# Sum of squares of 1..10, computed across the cluster
sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]).map(lambda x: x**2).sum()

# Use NLTK's stopword list if it is available; otherwise filter nothing
try:
    import nltk.corpus as corpus
    stopwords = set(corpus.stopwords.words())
except ImportError:
    stopwords = set()

# Most common words in "THE DEVELOPMENT OF EMBROIDERY IN AMERICA"
rdd = sc.textFile("example.txt")
rdd \
    .flatMap(lambda line: line.split()) \
    .map(lambda word: word.strip().lower()) \
    .filter(lambda word: word not in stopwords) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .map(lambda (key, cnt): (cnt, key)) \
    .top(10)

%%time
# Transformations are lazy: defining the RDD returns almost immediately
rdd = sc.parallelize(xrange(10**8)).map(lambda x: float(x) ** 2)

%%time
# The work only happens when an action such as count() forces evaluation
_ = rdd.count()

# The SparkContext provided to this session
sc

# Load a Python iterable into an RDD
sc.parallelize(range(10))

# Load a text file; each line is a separate element in the RDD
sc.textFile("example.txt")

# Load several text files at once (comma-separated paths)
sc.textFile("example.txt,example2.txt").collect()[-1001:-991]

# Sort presidents by last name
rdd = sc.parallelize(["Barack Hussein Obama",
                      "George Walker Bush",
                      "William Jefferson Clinton"])
rdd.sortBy(keyfunc=lambda k: k.split(" ")[-1]).collect()

# Inner and full outer joins on (key, value) RDDs
rdd1 = sc.parallelize([("a", 1), ("b", 2), ("c", 3)])
rdd2 = sc.parallelize([("a", 6), ("b", 7), ("b", 8), ("d", 9)])
rdd1.join(rdd2).collect()
rdd1.fullOuterJoin(rdd2).collect()

from pyspark.sql import SQLContext, Row
import pandas as pd

# Wake County real estate records: one fixed-width record per line
raw_real_estate = sc.textFile("all.txt")
print raw_real_estate.take(1)[0][:250]

# Slice each fixed-width record into named fields
wake_county_real_estate = raw_real_estate.map(lambda row: dict(
    owner=row[0:35].strip().title(),
    last_name=row[0:35].strip().title().split(",")[0],
    address=row[70:105].strip().title(),
    sale_price=int(row[273:284].strip() or -1),
    value=int(row[305:316].strip() or -1),
    use=int(row[653:656].strip() or -1),
    heated_area=int(row[471:482].strip() or -1),
    year_built=int(row[455:459].strip() or -1),
    height=row[509:510].strip(),
))

# Infer a schema from the Row objects and register the result for Spark SQL
sqlContext = SQLContext(sc)
schemaWake = sqlContext.inferSchema(wake_county_real_estate.map(lambda d: Row(**d)))
schemaWake.registerTempTable("wake")

# Owners with "Church" in the name, use code 66, and assessed value over $4,000,000
pd.DataFrame.from_records(
    sqlContext.sql("""SELECT DISTINCT owner, address, year_built, value
                      FROM wake
                      WHERE value > 4000000
                        AND use = 66
                        AND owner LIKE '%Church%' """).collect(),
    columns=["Name", "Street", "Year Built", "Value"])

# Highest assessed value among properties whose owner matches the pattern below
sqlContext.sql("""SELECT MAX(value) AS price
                  FROM wake
                  WHERE owner LIKE 'Goodnight, James H% & Ann B%'
                  GROUP BY last_name """).collect()[0].price

from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import DecisionTree
from pyspark.mllib import feature
from pyspark.mllib.stat import Statistics
from random import choice

# Keep records with use codes 7 and 34, label use 7 as class 1,
# and use [year_built, heated_area] as the two features
subset = wake_county_real_estate.filter(lambda d: d["use"] in [7, 34])
subset = subset.filter(lambda d: d["heated_area"] > 0 and d["year_built"] > 1900) \
    .map(lambda d: LabeledPoint(
        1 if d["use"] == 7 else 0,
        [d["year_built"], d["heated_area"]]))
subset.take(2)

# Train a decision tree on 70% of the data and hold out 30% for testing
(trainingData, testData) = subset.randomSplit([0.7, 0.3])
tree = DecisionTree.trainClassifier(trainingData, numClasses=2, categoricalFeaturesInfo={})

# Fraction of held-out records the tree misclassifies
predictions = tree.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() / float(testData.count())
print 'Test Error = ' + str(testErr)

# This is much better than the error of a random classifier
labelsAndPredictions = testData.map(lambda lp: (lp.label, choice([0, 1])))
testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() / float(testData.count())
print 'Test Error = ' + str(testErr)
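
# The coin-flip baseline above is a weak one; a stricter sanity check is to
# always predict the majority class. The sketch below is not from the original
# notebook: it assumes the `tree`, `trainingData`, and `testData` variables
# defined above and a Spark release whose DecisionTreeModel exposes
# toDebugString(), depth(), and numNodes().

# Inspect how complex the fitted tree became
print tree.toDebugString()
print 'depth = %d, nodes = %d' % (tree.depth(), tree.numNodes())

# Majority-class baseline: always predict the most common training label
label_counts = trainingData.map(lambda lp: lp.label).countByValue()
majority_label = max(label_counts, key=label_counts.get)
baselineErr = testData.filter(lambda lp: lp.label != majority_label).count() / float(testData.count())
print 'Majority-class baseline error = ' + str(baselineErr)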