A DataFrame is a distributed dataset organized into named rows and columns, like a table in a relational database.
l = [("Alice", 1)]
print sqlContext.createDataFrame(l).collect()
print sqlContext.createDataFrame(l, ["name", "age"]).collect()
[Row(_1=u'Alice', _2=1)] [Row(name=u'Alice', age=1)]
rdd = sc.parallelize(l)
sqlContext.createDataFrame(rdd).collect()
[Row(_1=u'Alice', _2=1)]
df = sqlContext.createDataFrame(rdd, ["name", "age"])
df.collect()
[Row(name=u'Alice', age=1)]
print sqlContext.createDataFrame(rdd, "a: string, b: int").collect()
rdd = sc.parallelize(l)
rdd = rdd.map(lambda row: row[1])
print sqlContext.createDataFrame(rdd, "int").collect()
[Row(a=u'Alice', b=1)] [Row(value=1)]
from pyspark.sql import Row
rdd = sc.parallelize([("Alice", 12)])
Person = Row("name", "age")
person = rdd.map(lambda r: Person(*r))
df2 = sqlContext.createDataFrame(person)
df2.collect()
[Row(name=u'Alice', age=12)]
from pyspark.sql.types import *
schema = StructType([
StructField("name", StringType(), True),
StructField("age", IntegerType(), True)
])
df3 = sqlContext.createDataFrame(rdd, schema)
df3.collect()
[Row(name=u'Alice', age=12)]
import pandas
print sqlContext.createDataFrame(df.toPandas()).collect()
print sqlContext.createDataFrame(pandas.DataFrame([[1, 2]])).collect()
[Row(name=u'Alice', age=1)] [Row(0=1, 1=2)]
df.toDF("f1", "f2").collect()
[Row(f1=u'Alice', f2=1)]
df.toJSON().first()
u'{"name":"Alice","age":1}'
df.toPandas()
|   | name  | age |
|---|-------|-----|
| 0 | Alice | 1   |
sqlContext.registerDataFrameAsTable(df, "table1")
sqlContext.registerDataFrameAsTable(df2, "table2")
print sqlContext.tableNames()
[u'table1', u'table2']
df3 = sqlContext.tables()
print df3
print df3.filter("tableName = 'table1'").first()
DataFrame[database: string, tableName: string, isTemporary: boolean] Row(database=u'', tableName=u'table1', isTemporary=True)
sqlContext.dropTempTable("table1")
sqlContext.dropTempTable("table2")
sqlContext.registerFunction("stringLengthString", lambda x: len(x))
sqlContext.sql("SELECT stringLengthString('test')").collect()
[Row(stringLengthString(test)=u'4')]
from pyspark.sql.types import IntegerType
sqlContext.registerFunction("stringLengthInt", lambda x: len(x), IntegerType())
sqlContext.sql("SELECT stringLengthInt('test')").collect()
[Row(stringLengthInt(test)=4)]
sqlContext.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
sqlContext.sql("SELECT stringLengthInt('test')").collect()
[Row(stringLengthInt(test)=4)]
l = [("Alice", 2, 12), ("Bob", 5, 25)]
rdd = sc.parallelize(l)
df = sqlContext.createDataFrame(rdd, "name: string, age: int, height: int")
df.collect()
df.createTempView("people")
df2 = sqlContext.sql("select * from people")
df.repartition(10).rdd.getNumPartitions()
10
data = df.union(df).repartition("age")
data.show()
+-----+---+------+
| name|age|height|
+-----+---+------+
|  Bob|  5|    25|
|  Bob|  5|    25|
|Alice|  2|    12|
|Alice|  2|    12|
+-----+---+------+
data = data.repartition(7, "age")
data.show()
+-----+---+------+
| name|age|height|
+-----+---+------+
|Alice|  2|    12|
|  Bob|  5|    25|
|Alice|  2|    12|
|  Bob|  5|    25|
+-----+---+------+
data.rdd.getNumPartitions()
7
data = data.repartition("name", "age")
data.show()
+-----+---+------+
| name|age|height|
+-----+---+------+
|  Bob|  5|    25|
|  Bob|  5|    25|
|Alice|  2|    12|
|Alice|  2|    12|
+-----+---+------+
# withColumn(colName, col)
# Returns a new DataFrame by adding a column or replacing the existing column that has the same name.
df.withColumn("age2", df.age + 2).collect()
[Row(name=u'Alice', age=2, height=12, age2=4), Row(name=u'Bob', age=5, height=25, age2=7)]
df.withColumnRenamed("age", "age2").collect()
[Row(name=u'Alice', age2=2, height=12), Row(name=u'Bob', age2=5, height=25)]
print df.select(df.age.cast("string").alias("ages")).collect()
print df.select(df.age.cast(StringType()).alias("ages")).collect()
[Row(ages=u'2'), Row(ages=u'5')] [Row(ages=u'2'), Row(ages=u'5')]
df.agg({"age": "max"}).collect()
[Row(max(age)=5)]
from pyspark.sql import functions as F
df.agg(F.min(df.age)).collect()
[Row(min(age)=2)]
gdf = df.groupBy(df.name)
sorted(gdf.agg({"*": "count"}).collect())
[Row(name=u'Alice', count(1)=1), Row(name=u'Bob', count(1)=1)]
from pyspark.sql import functions as F
sorted(gdf.agg(F.min(df.age)).collect())
[Row(name=u'Alice', min(age)=2), Row(name=u'Bob', min(age)=5)]
from pyspark.sql.functions import *
df_as1 = df.alias("df_as1")
df_as2 = df.alias("df_as2")
joined_df = df_as1.join(df_as2, col("df_as1.name") == col("df_as2.name"), "inner")
joined_df.select("df_as1.name", "df_as2.name", "df_as2.age").collect()
[Row(name=u'Bob', name=u'Bob', age=5), Row(name=u'Alice', name=u'Alice', age=2)]
df.printSchema()
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- height: integer (nullable = true)
df.schema
StructType(List(StructField(name,StringType,true),StructField(age,IntegerType,true),StructField(height,IntegerType,true)))
df.storageLevel
StorageLevel(False, False, False, False, 1)
df.count()
2
print df.groupBy().sum("age").collect()
print df.groupBy().sum("age", "height").collect()
[Row(sum(age)=7)] [Row(sum(age)=7, sum(height)=37)]
df.groupBy().avg("age").collect()
[Row(avg(age)=3.5)]
df.groupBy().avg("age", "height").collect()
[Row(avg(age)=3.5, avg(height)=18.5)]
df.columns
['name', 'age', 'height']
print df.name
print df["name"]
print df.age + 1
Column<name> Column<name> Column<(age + 1)>
# cube(*col): Create a multi-dimensional cube for the current DataFrame using the specified columns, so we can run aggregation on them.
df.cube("name", df.age).count().orderBy("name", "age").show()
+-----+----+-----+
| name| age|count|
+-----+----+-----+
| null|null|    2|
| null|   2|    1|
| null|   5|    1|
|Alice|null|    1|
|Alice|   2|    1|
|  Bob|null|    1|
|  Bob|   5|    1|
+-----+----+-----+
df.describe(["age"]).show()
+-------+------------------+
|summary|               age|
+-------+------------------+
|  count|                 2|
|   mean|               3.5|
| stddev|2.1213203435596424|
|    min|                 2|
|    max|                 5|
+-------+------------------+
df.describe().show()
+-------+-----+------------------+-----------------+
|summary| name|               age|           height|
+-------+-----+------------------+-----------------+
|  count|    2|                 2|                2|
|   mean| null|               3.5|             18.5|
| stddev| null|2.1213203435596424|9.192388155425117|
|    min|Alice|                 2|               12|
|    max|  Bob|                 5|               25|
+-------+-----+------------------+-----------------+
df.distinct().count()
2
df.dtypes
[('name', 'string'), ('age', 'int'), ('height', 'int')]
df.explain()
== Physical Plan ==
Scan ExistingRDD[name#81,age#82,height#83]
df.explain(True)
== Parsed Logical Plan ==
LogicalRDD [name#81, age#82, height#83]

== Analyzed Logical Plan ==
name: string, age: int, height: int
LogicalRDD [name#81, age#82, height#83]

== Optimized Logical Plan ==
LogicalRDD [name#81, age#82, height#83]

== Physical Plan ==
Scan ExistingRDD[name#81,age#82,height#83]
df.groupBy().avg().collect()
[Row(avg(age)=3.5, avg(height)=18.5)]
df.groupBy("name").agg({"age": "mean"}).collect()
[Row(name=u'Bob', avg(age)=5.0), Row(name=u'Alice', avg(age)=2.0)]
df.groupBy(df.name).avg().collect()
[Row(name=u'Bob', avg(age)=5.0, avg(height)=25.0), Row(name=u'Alice', avg(age)=2.0, avg(height)=12.0)]
df.groupBy(["name", df.age]).count().collect()
[Row(name=u'Bob', age=5, count=1), Row(name=u'Alice', age=2, count=1)]
print df.groupBy().max("age").collect()
print df.groupBy().max("age", "height").collect()
[Row(max(age)=5)] [Row(max(age)=5, max(height)=25)]
print df.groupBy().mean("age").collect()
print df.groupBy().mean("age", "height").collect()
[Row(avg(age)=3.5)] [Row(avg(age)=3.5, avg(height)=18.5)]
from pyspark.sql import Row
df = sc.parallelize([
Row(name="Alice", age=5, height=80),
Row(name="Alice", age=5, height=80),
Row(name="Alice", age=10, height=80)
]).toDF()
df.dropDuplicates().show()
+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
| 10|    80|Alice|
+---+------+-----+
df.dropDuplicates(["name", "height"]).show()
+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
+---+------+-----+
print df.select("age", "name").collect()
print df2.select("name", "height").collect()
df.crossJoin(df2.select("height")).select("age", "name", df2.height).collect()
[Row(age=5, name=u'Alice'), Row(age=5, name=u'Alice'), Row(age=10, name=u'Alice')] [Row(name=u'Alice', height=12), Row(name=u'Bob', height=25)]
[Row(age=5, name=u'Alice', height=12), Row(age=5, name=u'Alice', height=25), Row(age=5, name=u'Alice', height=12), Row(age=5, name=u'Alice', height=25), Row(age=10, name=u'Alice', height=12), Row(age=10, name=u'Alice', height=25)]
df.drop("age").collect()
[Row(height=80, name=u'Alice'), Row(height=80, name=u'Alice'), Row(height=80, name=u'Alice')]
df.drop(df.age).collect()
[Row(height=80, name=u'Alice'), Row(height=80, name=u'Alice'), Row(height=80, name=u'Alice')]
df.join(df2, df.name == df2.name, "inner").drop(df.name).drop(df.age).collect()
[Row(height=80, name=u'Alice', age=2, height=12), Row(height=80, name=u'Alice', age=2, height=12), Row(height=80, name=u'Alice', age=2, height=12)]
df.join(df2, "name", "inner").drop("age", "height").collect()
[Row(name=u'Alice'), Row(name=u'Alice'), Row(name=u'Alice')]
df.join(df2, df.name == df2.name, 'outer').select(df.name, df2.height).collect()
[Row(name=None, height=25), Row(name=u'Alice', height=12), Row(name=u'Alice', height=12), Row(name=u'Alice', height=12)]
df.join(df2, 'name', 'outer').select('name', df.height).collect()
[Row(name=u'Bob', height=None), Row(name=u'Alice', height=80), Row(name=u'Alice', height=80), Row(name=u'Alice', height=80)]
cond = [df.name == df2.name, df.age == df2.age]
df.join(df2, cond, 'outer').select(df.name, df2.age).collect()
[Row(name=None, age=2), Row(name=None, age=5), Row(name=u'Alice', age=None), Row(name=u'Alice', age=None), Row(name=u'Alice', age=None)]
df.join(df2, 'name').select(df.name, df2.height).collect()
[Row(name=u'Alice', height=12), Row(name=u'Alice', height=12), Row(name=u'Alice', height=12)]
df.join(df2, ['name', 'age']).select(df.name, df.age).collect()
[]
l = [("Alice", 2, 12), ("Bob", 5, 25)]
rdd = sc.parallelize(l)
df = sqlContext.createDataFrame(rdd, "name: string, age: int, height: int")
print df.filter(df.age > 3).collect()
print df.filter("age > 3").collect()
print df.where("age=2").collect()
[Row(name=u'Bob', age=5, height=25)] [Row(name=u'Bob', age=5, height=25)] [Row(name=u'Alice', age=2, height=12)]
df.first()
Row(name=u'Alice', age=2, height=12)
df.head()
Row(name=u'Alice', age=2, height=12)
print df.limit(1).collect()
print df.limit(0).collect()
[Row(name=u'Alice', age=2, height=12)] []
# orderBy
print df.sort(df.age.desc()).collect()
print df.sort("age", ascending=False).collect()
print df.orderBy(df.age.desc()).collect()
from pyspark.sql.functions import *
print df.sort(asc("age")).collect()
print df.sort(desc("age"), "name").collect()
print df.orderBy(["age", "name"], ascending=[0, 1]).collect()
[Row(name=u'Bob', age=5, height=25), Row(name=u'Alice', age=2, height=12)]
[Row(name=u'Bob', age=5, height=25), Row(name=u'Alice', age=2, height=12)]
[Row(name=u'Bob', age=5, height=25), Row(name=u'Alice', age=2, height=12)]
[Row(name=u'Alice', age=2, height=12), Row(name=u'Bob', age=5, height=25)]
[Row(name=u'Bob', age=5, height=25), Row(name=u'Alice', age=2, height=12)]
[Row(name=u'Bob', age=5, height=25), Row(name=u'Alice', age=2, height=12)]
print df.filter(df.name.endswith("ice")).collect()
df.filter(df.name.endswith("ice$")).collect()
[Row(name=u'Alice', age=2, height=12)]
[]
# getField(name): an expression that gets a field by name out of a struct (StructType) column.
from pyspark.sql import Row
df1 = sc.parallelize([Row(r=Row(a=1, b="b"))]).toDF()
df1.select(df1.r.getField("b")).show()
df1.select(df1.r.getField("a")).show()
+---+
|r.b|
+---+
|  b|
+---+

+---+
|r.a|
+---+
|  1|
+---+
# A row containing a list and a dictionary; elements are accessed with getItem() or [] indexing.
df1 = sc.parallelize([([1, 2], {"key": "value"})]).toDF(["l", "d"])
df1.select(df1.l.getItem(0), df1.d.getItem("key")).show()
df1.select(df1.l[0], df1.d["key"]).show()
+----+------+
|l[0]|d[key]|
+----+------+
|   1| value|
+----+------+

+----+------+
|l[0]|d[key]|
+----+------+
|   1| value|
+----+------+
from pyspark.sql import Row
df1 = sc.parallelize([Row(name=u"Tom", height=80), Row(name=u"Alice", height=None)]).toDF()
print df1.filter(df1.height.isNotNull()).collect()
print df1.filter(df1.height.isNull()).collect()
[Row(height=80, name=u'Tom')] [Row(height=None, name=u'Alice')]
print df[df.name.isin("Bob", "Mike")].collect()
print df[df.age.isin(1, 2, 3)].collect()
[Row(name=u'Bob', age=5, height=25)] [Row(name=u'Alice', age=2, height=12)]
df.filter(df.name.like("Al%")).collect()
[Row(name=u'Alice', age=2, height=12)]
from pyspark.sql import functions as F
df.select(df.name, F.when(df.age > 3, 1).otherwise(0)).show()
+-----+-------------------------------------+
| name|CASE WHEN (age > 3) THEN 1 ELSE 0 END|
+-----+-------------------------------------+
|Alice|                                    0|
|  Bob|                                    1|
+-----+-------------------------------------+
df.na.replace(["Alice", "Bob"], ["A", "B"], "name").show()
+----+---+------+
|name|age|height|
+----+---+------+
|   A|  2|    12|
|   B|  5|    25|
+----+---+------+
df.rollup("name", df.age).count().orderBy("name", "age").show()
+-----+----+-----+
| name| age|count|
+-----+----+-----+
| null|null|    2|
|Alice|null|    1|
|Alice|   2|    1|
|  Bob|null|    1|
|  Bob|   5|    1|
+-----+----+-----+
# sample(withReplacement, fraction, seed=None)
df.sample(False, 0.5, 42).count()
2
# sampleBy(col, fractions, seed=None)
dataset = sqlContext.range(0, 100).select((col("id") % 3).alias("key"))
sampled = dataset.sampleBy("key", fractions={0: 0.1, 1: 0.2}, seed=0)
sampled.groupBy("key").count().orderBy("key").show()
+---+-----+
|key|count|
+---+-----+
|  0|    5|
|  1|    9|
+---+-----+
df.selectExpr("age * 2", "abs(age)").collect()
[Row((age * 2)=4, abs(age)=2), Row((age * 2)=10, abs(age)=5)]
# show(n=20, truncate=True)
# truncate – If set to True, truncate strings longer than 20 chars by default. If set to a number greater than one, truncates long strings to length truncate and align cells right.
df.show(truncate=3)
+----+---+------+
|name|age|height|
+----+---+------+
| Ali|  2|    12|
| Bob|  5|    25|
+----+---+------+
row = Row(name="Alice", age=11)
print row
print row["name"], row["age"]
print row.name, row.age
print "name" in row
print "wrong_key" in row
Row(age=11, name='Alice')
Alice 11
Alice 11
True
False
# Row also can be used to create another Row like class, then it could be used to create Row objects
Person = Row("name", "age")
print Person
print Person("Alice", 11)
<Row(name, age)>
Row(name='Alice', age=11)
# asDict(recursive=False)
print Row(name="Alice", age=11).asDict()
row = Row(key=1, value=Row(name="a", age=2))
print row.asDict()
print row.asDict(True)
{'age': 11, 'name': 'Alice'}
{'value': Row(age=2, name='a'), 'key': 1}
{'value': {'age': 2, 'name': 'a'}, 'key': 1}