DataFrame basics

A DataFrame is a distributed collection of data organized into rows and columns, like a table in a relational database.
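
The examples below use the older SQLContext entry point. In Spark 2.0+ the same createDataFrame and sql methods are also available on SparkSession; a minimal sketch (the app name "demo" is arbitrary):

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("demo").getOrCreate()
spark.createDataFrame([("Alice", 1)], ["name", "age"]).collect()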

SQLContext: creating a DataFrame

From a list of tuples

In [1]:
l = [("Alice", 1)]
print sqlContext.createDataFrame(l).collect()
print sqlContext.createDataFrame(l, ["name", "age"]).collect()
[Row(_1=u'Alice', _2=1)]
[Row(name=u'Alice', age=1)]

From RDDs

In [2]:
rdd = sc.parallelize(l)
sqlContext.createDataFrame(rdd).collect()
Out[2]:
[Row(_1=u'Alice', _2=1)]
In [3]:
df = sqlContext.createDataFrame(rdd, ["name", "age"])
df.collect()
Out[3]:
[Row(name=u'Alice', age=1)]
In [4]:
print sqlContext.createDataFrame(rdd, "a: string, b: int").collect()
rdd = sc.parallelize(l)
rdd = rdd.map(lambda row: row[1])
print sqlContext.createDataFrame(rdd, "int").collect()
[Row(a=u'Alice', b=1)]
[Row(value=1)]

From Row objects

In [5]:
from pyspark.sql import Row
rdd = sc.parallelize([("Alice", 12)])
Person = Row("name", "age")
person = rdd.map(lambda r: Person(*r))
df2 = sqlContext.createDataFrame(person)
df2.collect()
Out[5]:
[Row(name=u'Alice', age=12)]
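
Row objects can also be built with keyword arguments and handed straight to createDataFrame, skipping the RDD step; a minimal sketch (the Bob row is made up for illustration):

from pyspark.sql import Row
sqlContext.createDataFrame([Row(name="Bob", age=20)]).collect()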

From a schema

In [6]:
from pyspark.sql.types import *
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
  ])
df3 = sqlContext.createDataFrame(rdd, schema)
df3.collect()
Out[6]:
[Row(name=u'Alice', age=12)]
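
StructType schemas can nest complex types such as arrays and maps; a minimal sketch with a hypothetical scores column:

from pyspark.sql.types import ArrayType
schema2 = StructType([
    StructField("name", StringType(), True),
    StructField("scores", ArrayType(IntegerType()), True)
  ])
sqlContext.createDataFrame([("Alice", [7, 9])], schema2).collect()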

From pandas

In [7]:
import pandas
print sqlContext.createDataFrame(df.toPandas()).collect()
print sqlContext.createDataFrame(pandas.DataFrame([[1, 2]])).collect()
[Row(name=u'Alice', age=1)]
[Row(0=1, 1=2)]
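
The numeric column names in the second result come from the unnamed pandas columns; naming them first avoids this. A minimal sketch:

print sqlContext.createDataFrame(pandas.DataFrame([[1, 2]], columns=["a", "b"])).collect()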

Format conversions

In [8]:
df.toDF("f1", "f2").collect()
Out[8]:
[Row(f1=u'Alice', f2=1)]
In [9]:
df.toJSON().first()
Out[9]:
u'{"name":"Alice","age":1}'
In [10]:
df.toPandas()
Out[10]:
    name  age
0  Alice    1

Creating temp tables

In [11]:
sqlContext.registerDataFrameAsTable(df, "table1")
sqlContext.registerDataFrameAsTable(df2, "table2")
print sqlContext.tableNames()
[u'table1', u'table2']
In [12]:
df3 = sqlContext.tables()
print df3
print df3.filter("tableName = 'table1'").first()
DataFrame[database: string, tableName: string, isTemporary: boolean]
Row(database=u'', tableName=u'table1', isTemporary=True)
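
While registered, the tables can be queried with plain SQL; a minimal sketch against table1:

sqlContext.sql("SELECT name FROM table1 WHERE age >= 1").collect()
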
In [13]:
sqlContext.dropTempTable("table1")
sqlContext.dropTempTable("table2")

Creating a UDF (User Defined Function)

In [14]:
sqlContext.registerFunction("stringLengthString", lambda x: len(x))
sqlContext.sql("SELECT stringLengthString('test')").collect()
Out[14]:
[Row(stringLengthString(test)=u'4')]
In [15]:
from pyspark.sql.types import IntegerType
sqlContext.registerFunction("stringLengthInt", lambda x: len(x), IntegerType())
sqlContext.sql("SELECT stringLengthInt('test')").collect()
Out[15]:
[Row(stringLengthInt(test)=4)]
In [16]:
sqlContext.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
sqlContext.sql("SELECT stringLengthInt('test')").collect()
Out[16]:
[Row(stringLengthInt(test)=4)]
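
The same logic can be used from the DataFrame API via pyspark.sql.functions.udf instead of SQL; a minimal sketch (the name_len alias is arbitrary):

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
slen = udf(lambda x: len(x), IntegerType())
df.select(slen(df.name).alias("name_len")).collect()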

Working with DataFrames

In [17]:
l = [("Alice", 2, 12), ("Bob", 5, 25)]
rdd = sc.parallelize(l)
df = sqlContext.createDataFrame(rdd, "name: string, age: int, height: int")
df.collect()

df.createTempView("people")
df2 = sqlContext.sql("select * from people")
In [18]:
df.repartition(10).rdd.getNumPartitions()
Out[18]:
10
In [19]:
data = df.union(df).repartition("age")
data.show()
+-----+---+------+
| name|age|height|
+-----+---+------+
|  Bob|  5|    25|
|  Bob|  5|    25|
|Alice|  2|    12|
|Alice|  2|    12|
+-----+---+------+

In [20]:
data = data.repartition(7, "age")
data.show()
+-----+---+------+
| name|age|height|
+-----+---+------+
|Alice|  2|    12|
|  Bob|  5|    25|
|Alice|  2|    12|
|  Bob|  5|    25|
+-----+---+------+

In [21]:
data.rdd.getNumPartitions()
Out[21]:
7
In [22]:
data = data.repartition("name", "age")
data.show()
+-----+---+------+
| name|age|height|
+-----+---+------+
|  Bob|  5|    25|
|  Bob|  5|    25|
|Alice|  2|    12|
|Alice|  2|    12|
+-----+---+------+
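
When the goal is only to reduce the number of partitions, coalesce avoids the full shuffle that repartition performs; a minimal sketch:

data.coalesce(1).rdd.getNumPartitions()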

In [23]:
# withColumn(colName, col)
# Returns a new DataFrame by adding a column or replacing the existing column that has the same name.
df.withColumn("age2", df.age + 2).collect()
Out[23]:
[Row(name=u'Alice', age=2, height=12, age2=4),
 Row(name=u'Bob', age=5, height=25, age2=7)]
In [24]:
df.withColumnRenamed("age", "age2").collect()
Out[24]:
[Row(name=u'Alice', age2=2, height=12), Row(name=u'Bob', age2=5, height=25)]
In [25]:
print df.select(df.age.cast("string").alias("ages")).collect()
print df.select(df.age.cast(StringType()).alias("ages")).collect()
[Row(ages=u'2'), Row(ages=u'5')]
[Row(ages=u'2'), Row(ages=u'5')]

Aggregating data

In [26]:
df.agg({"age": "max"}).collect()
Out[26]:
[Row(max(age)=5)]
In [27]:
from pyspark.sql import functions as F
df.agg(F.min(df.age)).collect()
Out[27]:
[Row(min(age)=2)]
In [28]:
gdf = df.groupBy(df.name)
sorted(gdf.agg({"*": "count"}).collect())
Out[28]:
[Row(name=u'Alice', count(1)=1), Row(name=u'Bob', count(1)=1)]
In [29]:
from pyspark.sql import functions as F
sorted(gdf.agg(F.min(df.age)).collect())
Out[29]:
[Row(name=u'Alice', min(age)=2), Row(name=u'Bob', min(age)=5)]
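
Several aggregates can be computed in a single pass by passing multiple expressions to agg; a minimal sketch reusing gdf from above:

sorted(gdf.agg(F.min(df.age), F.max(df.age), F.avg(df.height)).collect())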

Alias

In [30]:
from pyspark.sql.functions import *
df_as1 = df.alias("df_as1")
df_as2 = df.alias("df_as2")
joined_df = df_as1.join(df_as2, col("df_as1.name") == col("df_as2.name"), "inner")
joined_df.select("df_as1.name", "df_as2.name", "df_as2.age").collect()
Out[30]:
[Row(name=u'Bob', name=u'Bob', age=5),
 Row(name=u'Alice', name=u'Alice', age=2)]

Schema and statistics

In [31]:
df.printSchema()
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- height: integer (nullable = true)

In [32]:
df.schema
Out[32]:
StructType(List(StructField(name,StringType,true),StructField(age,IntegerType,true),StructField(height,IntegerType,true)))
In [33]:
df.storageLevel
Out[33]:
StorageLevel(False, False, False, False, 1)
In [34]:
df.count()
Out[34]:
2
In [35]:
print df.groupBy().sum("age").collect()
print df.groupBy().sum("age", "height").collect()
[Row(sum(age)=7)]
[Row(sum(age)=7, sum(height)=37)]
In [36]:
df.groupBy().avg("age").collect()
Out[36]:
[Row(avg(age)=3.5)]
In [37]:
df.groupBy().avg("age", "height").collect()
Out[37]:
[Row(avg(age)=3.5, avg(height)=18.5)]
In [38]:
df.columns
Out[38]:
['name', 'age', 'height']
In [39]:
print df.name
print df["name"]
print df.age + 1
Column<name>
Column<name>
Column<(age + 1)>
In [40]:
# cube(*cols): create a multi-dimensional cube for the current DataFrame using the specified columns, so we can run aggregations on them.
df.cube("name", df.age).count().orderBy("name", "age").show()
+-----+----+-----+
| name| age|count|
+-----+----+-----+
| null|null|    2|
| null|   2|    1|
| null|   5|    1|
|Alice|null|    1|
|Alice|   2|    1|
|  Bob|null|    1|
|  Bob|   5|    1|
+-----+----+-----+

In [41]:
df.describe(["age"]).show()
+-------+------------------+
|summary|               age|
+-------+------------------+
|  count|                 2|
|   mean|               3.5|
| stddev|2.1213203435596424|
|    min|                 2|
|    max|                 5|
+-------+------------------+

In [42]:
df.describe().show()
+-------+-----+------------------+-----------------+
|summary| name|               age|           height|
+-------+-----+------------------+-----------------+
|  count|    2|                 2|                2|
|   mean| null|               3.5|             18.5|
| stddev| null|2.1213203435596424|9.192388155425117|
|    min|Alice|                 2|               12|
|    max|  Bob|                 5|               25|
+-------+-----+------------------+-----------------+

In [43]:
df.distinct().count()
Out[43]:
2
In [44]:
df.dtypes
Out[44]:
[('name', 'string'), ('age', 'int'), ('height', 'int')]
In [45]:
df.explain()
== Physical Plan ==
Scan ExistingRDD[name#81,age#82,height#83]
In [46]:
df.explain(True)
== Parsed Logical Plan ==
LogicalRDD [name#81, age#82, height#83]

== Analyzed Logical Plan ==
name: string, age: int, height: int
LogicalRDD [name#81, age#82, height#83]

== Optimized Logical Plan ==
LogicalRDD [name#81, age#82, height#83]

== Physical Plan ==
Scan ExistingRDD[name#81,age#82,height#83]
In [47]:
df.groupBy().avg().collect()
Out[47]:
[Row(avg(age)=3.5, avg(height)=18.5)]
In [48]:
df.groupBy("name").agg({"age": "mean"}).collect()
Out[48]:
[Row(name=u'Bob', avg(age)=5.0), Row(name=u'Alice', avg(age)=2.0)]
In [49]:
df.groupBy(df.name).avg().collect()
Out[49]:
[Row(name=u'Bob', avg(age)=5.0, avg(height)=25.0),
 Row(name=u'Alice', avg(age)=2.0, avg(height)=12.0)]
In [50]:
df.groupBy(["name", df.age]).count().collect()
Out[50]:
[Row(name=u'Bob', age=5, count=1), Row(name=u'Alice', age=2, count=1)]
In [51]:
print df.groupBy().max("age").collect()
print df.groupBy().max("age", "height").collect()
[Row(max(age)=5)]
[Row(max(age)=5, max(height)=25)]
In [52]:
print df.groupBy().mean("age").collect()
print df.groupBy().mean("age", "height").collect()
[Row(avg(age)=3.5)]
[Row(avg(age)=3.5, avg(height)=18.5)]
In [53]:
from pyspark.sql import Row
df = sc.parallelize([
    Row(name="Alice", age=5, height=80),
    Row(name="Alice", age=5, height=80),
    Row(name="Alice", age=10, height=80)
  ]).toDF()
df.dropDuplicates().show()
+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
| 10|    80|Alice|
+---+------+-----+

In [54]:
df.dropDuplicates(["name", "height"]).show()
+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
+---+------+-----+

Join

In [55]:
print df.select("age", "name").collect()
print df2.select("name", "height").collect()
df.crossJoin(df2.select("height")).select("age", "name", df2.height).collect()
[Row(age=5, name=u'Alice'), Row(age=5, name=u'Alice'), Row(age=10, name=u'Alice')]
[Row(name=u'Alice', height=12), Row(name=u'Bob', height=25)]
Out[55]:
[Row(age=5, name=u'Alice', height=12),
 Row(age=5, name=u'Alice', height=25),
 Row(age=5, name=u'Alice', height=12),
 Row(age=5, name=u'Alice', height=25),
 Row(age=10, name=u'Alice', height=12),
 Row(age=10, name=u'Alice', height=25)]
In [56]:
df.drop("age").collect()
Out[56]:
[Row(height=80, name=u'Alice'),
 Row(height=80, name=u'Alice'),
 Row(height=80, name=u'Alice')]
In [57]:
df.drop(df.age).collect()
Out[57]:
[Row(height=80, name=u'Alice'),
 Row(height=80, name=u'Alice'),
 Row(height=80, name=u'Alice')]
In [58]:
df.join(df2, df.name == df2.name, "inner").drop(df.name).drop(df.age).collect()
Out[58]:
[Row(height=80, name=u'Alice', age=2, height=12),
 Row(height=80, name=u'Alice', age=2, height=12),
 Row(height=80, name=u'Alice', age=2, height=12)]
In [59]:
df.join(df2, "name", "inner").drop("age", "height").collect()
Out[59]:
[Row(name=u'Alice'), Row(name=u'Alice'), Row(name=u'Alice')]
In [60]:
df.join(df2, df.name == df2.name, 'outer').select(df.name, df2.height).collect()
Out[60]:
[Row(name=None, height=25),
 Row(name=u'Alice', height=12),
 Row(name=u'Alice', height=12),
 Row(name=u'Alice', height=12)]
In [61]:
df.join(df2, 'name', 'outer').select('name', df.height).collect()
Out[61]:
[Row(name=u'Bob', height=None),
 Row(name=u'Alice', height=80),
 Row(name=u'Alice', height=80),
 Row(name=u'Alice', height=80)]
In [62]:
cond = [df.name == df2.name, df.age == df2.age]
df.join(df2, cond, 'outer').select(df.name, df2.age).collect()
Out[62]:
[Row(name=None, age=2),
 Row(name=None, age=5),
 Row(name=u'Alice', age=None),
 Row(name=u'Alice', age=None),
 Row(name=u'Alice', age=None)]
In [63]:
df.join(df2, 'name').select(df.name, df2.height).collect()
Out[63]:
[Row(name=u'Alice', height=12),
 Row(name=u'Alice', height=12),
 Row(name=u'Alice', height=12)]
In [64]:
df.join(df2, ['name', 'age']).select(df.name, df.age).collect()
Out[64]:
[]
(The result is empty because no row in df shares both name and age with a row in df2.)

Filter

In [65]:
l = [("Alice", 2, 12), ("Bob", 5, 25)]
rdd = sc.parallelize(l)
df = sqlContext.createDataFrame(rdd, "name: string, age: int, height: int")

print df.filter(df.age > 3).collect()
print df.filter("age > 3").collect()
print df.where("age=2").collect()
[Row(name=u'Bob', age=5, height=25)]
[Row(name=u'Bob', age=5, height=25)]
[Row(name=u'Alice', age=2, height=12)]
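
Filter conditions can be combined with & (and), | (or), and ~ (not); each condition must be parenthesized because of Python operator precedence. A minimal sketch:

print df.filter((df.age > 1) & (df.height < 20)).collect()
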
In [66]:
df.first()
Out[66]:
Row(name=u'Alice', age=2, height=12)
In [67]:
df.head()
Out[67]:
Row(name=u'Alice', age=2, height=12)
In [68]:
print df.limit(1).collect()
print df.limit(0).collect()
[Row(name=u'Alice', age=2, height=12)]
[]
In [69]:
# orderBy
print df.sort(df.age.desc()).collect()
print df.sort("age", ascending=False).collect()
print df.orderBy(df.age.desc()).collect()

from pyspark.sql.functions import *
print df.sort(asc("age")).collect()
print df.sort(desc("age"), "name").collect()
print df.orderBy(["age", "name"], ascending=[0, 1]).collect()
[Row(name=u'Bob', age=5, height=25), Row(name=u'Alice', age=2, height=12)]
[Row(name=u'Bob', age=5, height=25), Row(name=u'Alice', age=2, height=12)]
[Row(name=u'Bob', age=5, height=25), Row(name=u'Alice', age=2, height=12)]
[Row(name=u'Alice', age=2, height=12), Row(name=u'Bob', age=5, height=25)]
[Row(name=u'Bob', age=5, height=25), Row(name=u'Alice', age=2, height=12)]
[Row(name=u'Bob', age=5, height=25), Row(name=u'Alice', age=2, height=12)]
In [70]:
print df.filter(df.name.endswith("ice")).collect()
df.filter(df.name.endswith("ice$")).collect()
[Row(name=u'Alice', age=2, height=12)]
Out[70]:
[]
In [71]:
# getField: gets a field by name from a struct (StructType) column.
from pyspark.sql import Row
df1 = sc.parallelize([Row(r=Row(a=1, b="b"))]).toDF()
df1.select(df1.r.getField("b")).show()
df1.select(df1.r.getField("a")).show()
+---+
|r.b|
+---+
|  b|
+---+

+---+
|r.a|
+---+
|  1|
+---+

In [72]:
# getItem: gets an element out of a list column by index, or out of a dict column by key.
df1 = sc.parallelize([([1, 2], {"key": "value"})]).toDF(["l", "d"])
df1.select(df1.l.getItem(0), df1.d.getItem("key")).show()
df1.select(df1.l[0], df1.d["key"]).show()
+----+------+
|l[0]|d[key]|
+----+------+
|   1| value|
+----+------+

+----+------+
|l[0]|d[key]|
+----+------+
|   1| value|
+----+------+

In [73]:
from pyspark.sql import Row
df1 = sc.parallelize([Row(name=u"Tom", height=80), Row(name=u"Alice", height=None)]).toDF()
print df1.filter(df1.height.isNotNull()).collect()
print df1.filter(df1.height.isNull()).collect()
[Row(height=80, name=u'Tom')]
[Row(height=None, name=u'Alice')]
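
Beyond filtering, nulls can be filled or dropped with the na functions; a minimal sketch on the same df1:

print df1.na.fill({"height": 0}).collect()
print df1.na.drop().collect()
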
In [74]:
print df[df.name.isin("Bob", "Mike")].collect()
print df[df.age.isin(1, 2, 3)].collect()
[Row(name=u'Bob', age=5, height=25)]
[Row(name=u'Alice', age=2, height=12)]
In [75]:
df.filter(df.name.like("Al%")).collect()
Out[75]:
[Row(name=u'Alice', age=2, height=12)]
In [76]:
from pyspark.sql import functions as F
df.select(df.name, F.when(df.age > 3, 1).otherwise(0)).show()
+-----+-------------------------------------+
| name|CASE WHEN (age > 3) THEN 1 ELSE 0 END|
+-----+-------------------------------------+
|Alice|                                    0|
|  Bob|                                    1|
+-----+-------------------------------------+
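
when calls can be chained for more than two branches before the final otherwise; a minimal sketch (the bucket labels are made up):

df.select(df.name, F.when(df.age > 4, "old").when(df.age > 1, "young").otherwise("baby").alias("bucket")).show()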

Working with samples

In [77]:
df.na.replace(["Alice", "Bob"], ["A", "B"], "name").show()
+----+---+------+
|name|age|height|
+----+---+------+
|   A|  2|    12|
|   B|  5|    25|
+----+---+------+

In [78]:
df.rollup("name", df.age).count().orderBy("name", "age").show()
+-----+----+-----+
| name| age|count|
+-----+----+-----+
| null|null|    2|
|Alice|null|    1|
|Alice|   2|    1|
|  Bob|null|    1|
|  Bob|   5|    1|
+-----+----+-----+

In [79]:
# sample(withReplacement, fraction, seed=None)
df.sample(False, 0.5, 42).count()
Out[79]:
2
In [80]:
# sampleBy(col, fractions, seed=None)
dataset = sqlContext.range(0, 100).select((col("id") % 3).alias("key"))
sampled = dataset.sampleBy("key", fractions={0: 0.1, 1: 0.2}, seed=0)
sampled.groupBy("key").count().orderBy("key").show()
+---+-----+
|key|count|
+---+-----+
|  0|    5|
|  1|    9|
+---+-----+

In [81]:
df.selectExpr("age * 2", "abs(age)").collect()
Out[81]:
[Row((age * 2)=4, abs(age)=2), Row((age * 2)=10, abs(age)=5)]
In [82]:
# show(n=20, truncate=True)
# truncate: if True, truncate strings longer than 20 characters by default;
# if set to a number greater than 1, truncate long strings to that length and right-align cells.
df.show(truncate=3)
+----+---+------+
|name|age|height|
+----+---+------+
| Ali|  2|    12|
| Bob|  5|    25|
+----+---+------+

Working with Row

In [83]:
row = Row(name="Alice", age=11)
print row
print row["name"], row["age"]
print row.name, row.age
print "name" in row
print "wrong_key" in row
Row(age=11, name='Alice')
Alice 11
Alice 11
True
False
In [84]:
# Row can also be used to create a Row-like class, which in turn constructs Row objects.
Person = Row("name", "age")
print Person
print Person("Alice", 11)
<Row(name, age)>
Row(name='Alice', age=11)
In [85]:
# asDict(recursive=False)
print Row(name="Alice", age=11).asDict()
row = Row(key=1, value=Row(name="a", age=2))
print row.asDict()
print row.asDict(True)
{'age': 11, 'name': 'Alice'}
{'value': Row(age=2, name='a'), 'key': 1}
{'value': {'age': 2, 'name': 'a'}, 'key': 1}