#!/usr/bin/env python
# coding: utf-8

# Spark SQL DataFrame walkthrough, exported from a Jupyter notebook.
#
# Expects `people.json` and `people.txt` to be readable from the working
# directory. `sc` and `sqlContext` are not defined in this file; presumably
# they are created by pyspark's shell.py, which the first cell executes --
# confirm against the Spark install in use.

# In[1]:

import os
import sys

# Respect a SPARK_HOME that is already set in the environment; otherwise fall
# back to the local install this notebook was written against. (Assigning the
# path unconditionally would make the check below unreachable.)
spark_home = os.environ.setdefault(
    'SPARK_HOME', '/Users/liang/Downloads/spark-1.4.1-bin-hadoop2.6/')
if not spark_home:
    raise ValueError('SPARK_HOME enviroment variable is not set')

# Make the pyspark package and its bundled py4j importable.
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.8.2.1-src.zip'))

# Run pyspark's shell bootstrap in this module's namespace.
# exec(compile(...)) is used instead of execfile() so the cell works on both
# Python 2 and Python 3.
shell_path = os.path.join(spark_home, 'python/pyspark/shell.py')
with open(shell_path) as shell_file:
    exec(compile(shell_file.read(), shell_path, 'exec'))


# # DataFrames
# A DataFrame is a distributed collection of data organized into named
# columns. It is conceptually equivalent to a table in a relational database
# or a data frame in R/Python, but with richer optimizations under the hood.
# DataFrames can be constructed from a wide array of sources such as:
# structured data files, tables in Hive, external databases, or existing RDDs.

# ## Load data from a JSON file

# In[2]:

df = sqlContext.read.json("people.json")
df.show()


# In[3]:

df.printSchema()


# In[4]:

# Project a single column.
df.select("name").show()


# In[5]:

# Project a column together with an expression computed from another column.
df.select(df['name'], df['age'] + 1).show()


# In[6]:

# Keep only the rows where age is greater than 21.
df.filter(df['age'] > 21).show()


# In[7]:

# Count the rows for each distinct age.
df.groupBy("age").count().show()


# ## Load data from a text file
# Create an RDD and then transform the RDD into a DataFrame.

# In[8]:

from pyspark.sql import Row

# Load a text file and convert each line to a Row.
# Each line is split on "," into a name (field 0) and an integer age (field 1).
lines = sc.textFile("people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
people  # notebook display only; has no effect when run as a script


# In[9]:

people.collect()


# In[10]:

schemaPeople = sqlContext.createDataFrame(people)
# schemaPeople = people.toDF() is another way to create a DataFrame
schemaPeople  # notebook display only; has no effect when run as a script


# In[11]:

schemaPeople.collect()


# In[12]:

# Infer the schema, and register the DataFrame as a table.
schemaPeople = sqlContext.createDataFrame(people)
schemaPeople.registerTempTable("people")

# SQL can be run over DataFrames that have been registered as a table.
teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

# Map every row of the query result to a display string, then print them.
teenNames = teenagers.map(lambda p: "Name: " + p.name)
for teenName in teenNames.collect():
    # Parenthesised single-argument form prints the same on Python 2 and 3.
    print(teenName)