import os
import sys
# Point sys.path at a local Spark installation so pyspark can be imported.
# Adjust the path below to match your own Spark download.
os.environ.setdefault('SPARK_HOME', '/Users/liang/Downloads/spark-1.4.1-bin-hadoop2.6/')
spark_home = os.environ.get('SPARK_HOME')
if not spark_home:
    raise ValueError('SPARK_HOME environment variable is not set')
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.8.2.1-src.zip'))
# Launch the interactive PySpark shell (Python 2: execfile).
execfile(os.path.join(spark_home, 'python/pyspark/shell.py'))
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.4.1
      /_/

Using Python version 2.7.10 (default, May 28 2015 17:04:42)
SparkContext available as sc, HiveContext available as sqlContext.
A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources, such as structured data files, tables in Hive, external databases, or existing RDDs.
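For reference, Spark reads JSON as one object per line. A people.json matching the output below (the same content as the sample file shipped with the Spark examples) would look like:

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}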
df = sqlContext.read.json("people.json")
df.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
df.printSchema()
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)
df.select("name").show()
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+
df.select(df['name'], df['age'] + 1).show()
+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+
df.filter(df['age'] > 21).show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
df.groupBy("age").count().show()
+----+-----+
| age|count|
+----+-----+
|null|    1|
|  19|    1|
|  30|    1|
+----+-----+
Create an RDD and then transform it into a DataFrame
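Assuming the sample people.txt that ships with the Spark examples, the input is one comma-separated name,age record per line:

Michael, 29
Andy, 30
Justin, 19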
from pyspark.sql import Row
# Load a text file and convert each line to a Row.
lines = sc.textFile("people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
people
PythonRDD[33] at RDD at PythonRDD.scala:43
people.collect()
[Row(age=29, name=u'Michael'), Row(age=30, name=u'Andy'), Row(age=19, name=u'Justin')]
schemaPeople = sqlContext.createDataFrame(people)
# schemaPeople = people.toDF() is another way to create a DataFrame
schemaPeople
DataFrame[age: bigint, name: string]
schemaPeople.collect()
[Row(age=29, name=u'Michael'), Row(age=30, name=u'Andy'), Row(age=19, name=u'Justin')]
# The schema was inferred when the DataFrame was created; now register it as a table.
schemaPeople.registerTempTable("people")
# SQL can be run over DataFrames that have been registered as a table.
teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
# The results of SQL queries are DataFrames and support all the normal RDD operations.
teenNames = teenagers.map(lambda p: "Name: " + p.name)
for teenName in teenNames.collect():
    print teenName
Name: Justin
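The same query can also be expressed with the DataFrame operations shown earlier instead of SQL. A minimal sketch, assuming the schemaPeople DataFrame from above; it should return the same single row (Justin):

# Equivalent to the SQL query: filter on the age column, then project the name column.
teenagers2 = schemaPeople.filter((schemaPeople['age'] >= 13) & (schemaPeople['age'] <= 19)).select('name')
teenagers2.show()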