RDD cơ bản

  • Programmer chỉ định số lượng partitions.
  • Driver tự phân chia partition đến các Workers tương ứng.
  • Master parameter chỉ định số lượng workers cụ thể.

Các hàm transformations

  • map(func): trả về tập dữ liệu phân tán mới bằng cách ánh xạ từng phần tử tập dữ liệu nguồn qua hàm func do programmer định nghĩa.
  • filter(func): trả về tập dữ liệu phân tán mới bằng cách lọc ra các phần tử tập dữ liệu nguồn thoả điều kiện hàm func định nghĩa.
  • distinct(): trả về tập dữ liệu phân tán mới chỉ chứa các phần tử riêng biệt từ tập dữ liệu nguồn.
  • flatMap(func): tương tự như map(), nhưng có thể ánh xạ các phần tử nguồn sang 0 hoặc nhiều phần tử ở tập dữ liệu mới. Hàm func thường trả về kiểu Seg thay vì phần tử đơn lẻ.
In [1]:
print "http://localhost:4040/jobs/"
http://localhost:4040/jobs/
In [2]:
rdd = sc.parallelize([1, 2, 3, 4])
rdd.map(lambda x: x * 2)
Out[2]:
PythonRDD[1] at RDD at PythonRDD.scala:48
In [3]:
rdd.filter(lambda x: x % 2 == 0)
Out[3]:
PythonRDD[2] at RDD at PythonRDD.scala:48
In [4]:
rdd = sc.parallelize([1, 4, 2, 2, 3])
rdd.distinct()
Out[4]:
PythonRDD[8] at RDD at PythonRDD.scala:48
In [5]:
rdd = sc.parallelize([1, 2, 3])
rdd.map(lambda x: [x, x + 5])
Out[5]:
PythonRDD[10] at RDD at PythonRDD.scala:48
In [6]:
rdd.flatMap(lambda x: [x, x + 5])
Out[6]:
PythonRDD[11] at RDD at PythonRDD.scala:48

Các hàm actions

  • reduce(func): aggregate từng phần tử tập dữ liệu thông qua hàm func, hàm func nhận 2 đối số và trả về 1 giá trị.
  • take(n): trả về mảng n phần tử.
  • collect(): trả về tất cả các phần tử. CHÚ Ý: phải đảm bảo máy Driver đủ dung lượng để chứa kết quả trả về.
  • takeOrdered(n, key=func): trả về n phần tử sắp xếp tăng dần hoặc sắp xếp theo hàm key.
In [7]:
rdd = sc.parallelize([1, 2, 3])
rdd.reduce(lambda a, b: a * b)
Out[7]:
6
In [8]:
rdd.take(2)
Out[8]:
[1, 2]
In [9]:
rdd.collect()
Out[9]:
[1, 2, 3]
In [10]:
rdd = sc.parallelize([5, 3, 1, 2])
rdd.takeOrdered(3, lambda s: -1 * s)
Out[10]:
[5, 3, 2]
In [11]:
rdd.takeOrdered(3)
Out[11]:
[1, 2, 3]
In [12]:
lines = sc.textFile("sample_text.txt", 4)
print lines.count()
5
In [13]:
print lines.count()
5
In [14]:
lines = sc.textFile("sample_text.txt", 4)
lines.cache()
print lines.count()
print lines.count()
5
5

Key-Value RDDs

  • Tương tự như Map Reduce, Spark hỗ trợ Key-Value pairs.
  • Mỗi phần tử của Pair RDD là một cặp tuple. ## Some Key-Value transformation
  • reduceByKey(func): trả về tập dữ liệu phân tán mới (K, V). Trong đó, các giá trị cho từng key được tổng hợp bằng hàm reduce func có dạng (V, V) -> V.
  • sortByKey(): trả về tập dữ liệu phân tán mới (K, V) sắp xếp tăng dần theo keys.
  • groupByKey(): trả về tập dữ liệu phân tán mới (K, Iterable).
In [15]:
rdd = sc.parallelize([(1, 2), (3, 4)])
rdd.collect()
Out[15]:
[(1, 2), (3, 4)]
In [16]:
rdd = sc.parallelize([(1, 2), (3, 4), (3, 6)])
rdd.reduceByKey(lambda a, b: a + b).collect()
Out[16]:
[(1, 2), (3, 10)]
In [17]:
rdd = sc.parallelize([(1, "a"), (2, "c"), (1, "b")])
rdd.sortByKey().collect()
Out[17]:
[(1, 'a'), (1, 'b'), (2, 'c')]
In [18]:
rdd.groupByKey().collect()
Out[18]:
[(1, <pyspark.resultiterable.ResultIterable at 0x1066d5e50>),
 (2, <pyspark.resultiterable.ResultIterable at 0x1064d4750>)]

X.join(Y)

  • Trả về tất cả các phần tử RDD keys khớp với X và Y.
  • Mỗi cặp có định dạng (k, (v1, v2)). Trong đó, (k, v1) thuộc X và (k, v2) thuộc Y.
In [19]:
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("a", 3)])
sorted(x.join(y).collect())
Out[19]:
[('a', (1, 2)), ('a', (1, 3))]

X.leftOuterJoin(Y)

  • Với mỗi phần tử (k, v) thuộc X, kết quả trả về có thể là:
    • Tất cả các cặp (k, (v, w)) với w thuộc Y.
    • Hoặc các cặp (k, (v, None)) nếu không có phần tử nào thuộc Y có key là k.
In [20]:
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2)])
sorted(x.leftOuterJoin(y).collect())
Out[20]:
[('a', (1, 2)), ('b', (4, None))]

X.rightOuterJoin(Y)

  • Với mỗi phần tử (k, w) thuộc Y, kết quả trả về có thể là:
    • Tất cả các cặp (k, (v, w)) với v thuộc X.
    • Hoặc các cặp (k, (None, w)) nếu không có phần tử nào thuộc X có key là k.
In [21]:
x = sc.parallelize([("a", 1)])
y = sc.parallelize([("a", 2), ("b", 4)])
sorted(x.rightOuterJoin(y).collect())
Out[21]:
[('a', (1, 2)), ('b', (None, 4))]

X.fullOuterJoin(Y)

  • Với mỗi phần tử (k, v) thuộc X, kết quả trả về có thể là:
    • Tất cả các cặp (k, (v, w)) với w thuộc Y.
    • Hoặc các cặp (k, (v, None)) nếu không có phần tử nào thuộc Y có key là k.
  • Với mỗi phần tử (k, w) thuộc Y, kết quả trả về có thể là:
    • Tất cả các cặp (k, (v, w)) với v thuộc X.
    • Hoặc các cặp (k, (None, w)) nếu không có phần tử nào thuộc X có key là k.
In [22]:
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("c", 8)])
sorted(x.fullOuterJoin(y).collect())
Out[22]:
[('a', (1, 2)), ('b', (4, None)), ('c', (None, 8))]