#!/usr/bin/env python
# coding: utf-8

# # Solution-1: Introduction to PySpark RDD

# ## Import and initialize SparkContext

# In[1]:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("solution-1").getOrCreate()
sc = spark.sparkContext

# ## Create Parallelized Collections

# In[2]:

data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
type(distData)

# ## TODO: Read external data file

# ### Use SparkContext's textFile function to read in a text file

# In[3]:

textFilePath = './emails.txt'
emails = sc.textFile(textFilePath)
type(emails)

# ## Generate a list of data, from 1 to 10

# In[4]:

data = list(range(1, 11))
print(data)

# ## Parallelize the data with 2 partitions

# In[5]:

numbers = sc.parallelize(data, 2)

# ## Print RDD

# In[6]:

# printing the RDD itself only shows its description;
# collect() returns its elements to the driver
print(numbers)
print(numbers.collect())

# ## Get only even numbers, and collect them

# In[7]:

numbers.filter(lambda x: x % 2 == 0).collect()

# ## TODO: find emails with hotmail domain

# In[ ]:

emails.filter(lambda e: '@hotmail' in e).collect()

# ## Square all the numbers in the list using the map operation

# In[ ]:

numbers.map(lambda x: x * x).collect()

# ## Use flatMap to apply a function that returns a list and flatten the result

# In[ ]:

# map keeps one output list per element; flatMap flattens those lists
m = numbers.map(lambda x: [x**2, x**3]).collect()
fm = numbers.flatMap(lambda x: [x**2, x**3]).collect()
print(m)
print(fm)

# ## TODO: separate username and domain from all emails
#
# ### eg: marshuang80@gmail.com -> [marshuang80, gmail.com]

# In[ ]:

# Hint: use the Python split() function
username_domain = emails.map(lambda x: x.split('@'))
username_domain.collect()

# ## RDD Key-Value Pairs

# In[ ]:

username_domain.keys().collect()

# In[ ]:

username_domain.values().collect()

# ## Reduce by key

# In[ ]:

data = ["a", "b", "a", "a", "b", "b", "b", "b"]
rdd = sc.parallelize(data)
pairRDD = rdd.map(lambda x: (x, 1))
pairRDD.reduceByKey(lambda x, y: x + y).collect()

# ## TODO: count the number of domains with the same username

# In[ ]:

# map each [username, domain] pair to (username, 1) so that
# reduceByKey can sum the counts per username
username_domain = username_domain.map(lambda x: (x[0], 1))
print("** Results from mapping to (username, 1) **")
print(username_domain.top(3))
print("\n** Results from reduceByKey **")
username_domain.reduceByKey(lambda val1, val2: val1 + val2).collect()

# ## Very important to stop Spark

# In[ ]:

# release the resources held by this Spark application
sc.stop()
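
# ## Appendix: optional sketches (partitions and counting)

# A minimal, self-contained sketch added for illustration; it is not part of
# the original exercise, and the app name "solution-1-appendix" is made up.
# It re-creates a SparkSession (the context above was stopped), shows how the
# 2 partitions requested earlier can be inspected with getNumPartitions() and
# glom(), and shows countByValue() as a driver-side alternative to the
# map + reduceByKey counting pattern used above.

# In[ ]:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("solution-1-appendix").getOrCreate()
sc = spark.sparkContext

numbers = sc.parallelize(range(1, 11), 2)
print(numbers.getNumPartitions())  # 2
print(numbers.glom().collect())    # one list per partition, e.g. [[1..5], [6..10]]

letters = sc.parallelize(["a", "b", "a", "a", "b", "b", "b", "b"])
# distributed aggregation: returns an RDD of (key, count) pairs
print(letters.map(lambda x: (x, 1)).reduceByKey(lambda a, b: a + b).collect())
# driver-side shortcut: returns a dict, suitable for small results only
print(dict(letters.countByValue()))

spark.stop()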