#!/usr/bin/env python
# coding: utf-8

# ## Analysis of Tweets from Ireland 8th

# ### Setup

# In[1]:

import os

import pyspark
from pyspark.sql import SQLContext

# Add the elasticsearch-hadoop jar.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /home/jovyan/elasticsearch-hadoop-6.2.2.jar pyspark-shell'

conf = pyspark.SparkConf()
# Point to the master.
conf.setMaster("spark://tweetsets.library.gwu.edu:7101")
conf.setAppName("ireland-8th-analysis")
conf.set("spark.driver.bindAddress", "0.0.0.0")
# Don't hog all of the cores.
conf.set("spark.cores.max", "3")
# Specify a port for the block manager (which runs as part of the worker). The range 7003-7028 is set
# to be open in the Spark worker container.
conf.set("spark.blockManager.port", "7003")

# Create the context.
sc = pyspark.SparkContext(conf=conf)

# Configure for the Elasticsearch cluster and index.
es_conf = {"es.nodes": "tweetsets.library.gwu.edu",
           "es.port": "9200",
           "es.resource": "tweets-ba2157/doc",
           "es.read.field.as.array.include": "hashtags,text,urls"}

sqlContext = SQLContext(sc)
tweets_df = sqlContext.read.format("org.elasticsearch.spark.sql").options(**es_conf).load()
tweets_df.createOrReplaceTempView("tweets")


# ### Count

# In[2]:

tweets_df.count()


# ### Top hashtags

# In[3]:

hashtags_df = sqlContext.sql("SELECT hashtag, count(hashtag) FROM (SELECT explode(hashtags) hashtag FROM tweets) GROUP BY hashtag ORDER BY count(hashtag) DESC")
hashtags_df.show(50, truncate=False)


# ### Top users by all tweet types

# In[4]:

screen_name_df = sqlContext.sql("SELECT user_screen_name, count(user_screen_name) FROM tweets GROUP BY user_screen_name ORDER BY count(user_screen_name) DESC")
screen_name_df.show(50, truncate=False)


# ### Top users by original tweets only

# In[5]:

screen_name_orig_df = sqlContext.sql("SELECT user_screen_name, count(user_screen_name) FROM tweets WHERE tweet_type='original' GROUP BY user_screen_name ORDER BY count(user_screen_name) DESC")
screen_name_orig_df.show(50, truncate=False)


# ### Top URLs

# In[6]:

urls_df = sqlContext.sql("SELECT url, count(url) FROM (SELECT explode(urls) url FROM tweets) WHERE NOT url LIKE 'http://twitter.com%' GROUP BY url ORDER BY count(url) DESC")
urls_df.show(50, truncate=False)


# ### Top timezones

# In[7]:

tz_df = sqlContext.sql("SELECT user_time_zone, count(user_time_zone) FROM tweets GROUP BY user_time_zone ORDER BY count(user_time_zone) DESC")
tz_df.show(10, truncate=False)


# ### Top user languages

# In[8]:

lang_df = sqlContext.sql("SELECT user_language, count(user_language) FROM tweets GROUP BY user_language ORDER BY count(user_language) DESC")
lang_df.show(10, truncate=False)


# ### Top retweets

# In[9]:

rt_df = sqlContext.sql("SELECT CONCAT('https://twitter.com/', retweeted_quoted_screen_name, '/status/', retweet_quoted_status_id), count(retweet_quoted_status_id) FROM tweets GROUP BY retweet_quoted_status_id, retweeted_quoted_screen_name ORDER BY count(retweet_quoted_status_id) DESC")
rt_df.show(25, truncate=False)
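
# The aggregated result DataFrames above are small, so they can be pulled to the driver for local
# inspection or charting. A minimal sketch using the top-retweets result, assuming pandas is
# installed in the driver environment:

# In[ ]:

# Collect the 25 most retweeted/quoted tweets into a pandas DataFrame on the driver.
top_rt_pd = rt_df.limit(25).toPandas()
top_rt_pd.head()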

# ### Top trigrams (combinations of 3 words)

# In[11]:

from pyspark.ml.feature import NGram, RegexTokenizer, StopWordsRemover
from pyspark.sql.functions import explode, sort_array, udf
from pyspark.sql.types import ArrayType, StringType

# Text (using distinct to avoid counting duplicate texts).
text_df = tweets_df.select(explode("text").alias("text")).distinct()

# Tokenize.
tokenizer = RegexTokenizer(pattern="([:\.!?,]|'s|’s)*\\s+[‘]*", inputCol="text", outputCol="words")
tokenized_df = tokenizer.transform(text_df)

# Stopwords.
stop_words = StopWordsRemover.loadDefaultStopWords('english')
stop_words.extend(['rt', ' ', '-', '&', 'it’s', '', 'may', 'see', 'want', 'i’m', 'us', 'make', "we've",
                   "you're", "you've", "don't", "i’ve", 'it', 'they’re', 'don’t', 'lets', 'add'])
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words", stopWords=stop_words)
filtered_df = remover.transform(tokenized_df)


# Remove hashtags, URLs, and duplicate words.
def clean(arr):
    new_arr = set()
    for item in arr:
        add_to_arr = True
        for startswith in ('#', 'http'):
            if item.startswith(startswith):
                add_to_arr = False
        if add_to_arr:
            new_arr.add(item)
    return list(new_arr)


clean_udf = udf(lambda arr: clean(arr), ArrayType(StringType()))
clean_df = filtered_df.withColumn("clean_words", clean_udf(filtered_df.filtered_words))

# Sort the words so that trigrams are insensitive to word order.
sorted_df = clean_df.select(sort_array('clean_words').alias('sorted_words'))

ngram = NGram(n=3, inputCol="sorted_words", outputCol="ngrams")
ngram_df = ngram.transform(sorted_df).select(explode('ngrams').alias('ngrams'))
ngram_df.groupBy('ngrams').count().orderBy('count', ascending=False).show(50, truncate=False)
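
# The same cleaned, sorted word lists can be fed back through NGram with a different n.
# A minimal sketch for bigrams (n=2), reusing sorted_df from the cell above:

# In[ ]:

bigram = NGram(n=2, inputCol="sorted_words", outputCol="bigrams")
bigram_df = bigram.transform(sorted_df).select(explode("bigrams").alias("bigram"))
bigram_df.groupBy("bigram").count().orderBy("count", ascending=False).show(50, truncate=False)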