#!/usr/bin/env python
# coding: utf-8
"""Explore the most popular Twitter trend of one location with PySpark.

The script (exported from a notebook; ``# In[NN]`` markers are kept as
section separators):

1. fetches the trends for ``WOEID`` and charts the five with the largest
   tweet volume,
2. searches for tweets about the top trend,
3. charts the most used hashtags, the most retweeted tweets and the most
   liked tweets among the search results,
4. prints how many of the retrieved tweets were created on each date.

It uses the tweepy 3.x method names ``API.trends_place`` and ``API.search``.
"""

# In[18]:

import math
import os
import re
import sys

import matplotlib.pyplot as plt
import numpy as np
import tweepy  # To consume Twitter's API
from dateutil import parser
from IPython.display import display
from pyspark import SparkConf, SparkContext
from pyspark.sql import *  # noqa: F401,F403 - kept from the original notebook
from pyspark.sql import Row, SparkSession
from pyspark.sql import functions as func
from pyspark.sql.functions import bround, col, desc, lit
from pyspark.sql.types import *  # noqa: F401,F403 - kept from the original notebook
from pyspark.sql.types import (
    DateType,
    IntegerType,
    LongType,
    StringType,
    StructField,
    StructType,
)


# In[21]:

conf = SparkConf().setAppName('pyspark')


# In[22]:

sc = SparkContext(conf=conf)

# The session is created here, before the first cell that needs it. In the
# original export it was only created after a cell that already called
# spark.createDataFrame, so a top-to-bottom run failed with a NameError.
spark = SparkSession(sc)


# In[25]:

# Credentials may be supplied through the environment so they do not have to
# be committed with the script; the fallbacks are the original empty
# placeholders and can still be edited in place.
# Consume:
CONSUMER_KEY = os.environ.get('TWITTER_CONSUMER_KEY', '')
CONSUMER_SECRET = os.environ.get('TWITTER_CONSUMER_SECRET', '')

# Access:
ACCESS_TOKEN = os.environ.get('TWITTER_ACCESS_TOKEN', '')
ACCESS_SECRET = os.environ.get('TWITTER_ACCESS_SECRET', '')

# WOEID of the location whose trends are analysed.
WOEID = 395269  # Caracas

# The standard search endpoint returns at most 100 tweets per request, so the
# 1500 tweets the analysis asks for have to be fetched page by page.
SEARCH_PAGE_SIZE = 100
MAX_TWEETS = 1500

# Matches URLs (scheme://host/path...) so they can be removed from tweet text.
URL_PATTERN = re.compile(
    r'\w+:\/{2}[\d\w-]+(\.[\d\w-]+)*(?:(?:\/[^\s/]*))*')


# In[26]:

def twitter():
    """Return a tweepy API client authenticated with the module-level keys."""
    # Authentication and access using keys:
    auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
    auth.set_access_token(ACCESS_TOKEN, ACCESS_SECRET)

    # Return API with authentication:
    return tweepy.API(auth)


def _plot_pie(frame, value_column, title, fontsize, radius):
    """Draw ``frame[value_column]`` as a pie chart labelled by ``frame['Tweet']``.

    Each chart gets its own figure, and the title is set on the pie's own
    axes after it exists. Nothing is drawn when ``frame`` has no rows.
    """
    if frame.empty:
        print("Nothing to plot for: " + title)
        return
    plt.figure()
    axis = plt.subplot(121, aspect='equal')
    frame.plot(kind='pie', y=value_column, ax=axis, autopct='%1.1f%%',
               startangle=90, shadow=False, labels=frame['Tweet'],
               legend=False, fontsize=fontsize, radius=radius)
    axis.set_title(title, fontsize=40)


# In[29]:

# Enter the WOEID of a location to know the MOST POPULAR TWEETS in that location.
# (The original export contained this cell twice, as In[27] and In[29]; it is
# kept once so the Twitter API is not queried twice for the same data.)
api = twitter()
place = api.trends_place(id=WOEID)

# Trends without a reported volume cannot be ranked, so they are skipped.
trend_rows = []
for location in place:
    for trend in location["trends"]:
        if trend["tweet_volume"] is not None:
            trend_rows.append([trend["name"], trend["tweet_volume"]])

if not trend_rows:
    raise RuntimeError(
        "No trend with a tweet volume was returned for WOEID %d" % WOEID)

# Find the top 5 popular tweet topics based on tweet volume.
pop_list_tweet = spark.createDataFrame(trend_rows, ["Tweet", "TweetVolume"])
pop_list_tweet = (
    pop_list_tweet
    .withColumn("Tweet", pop_list_tweet["Tweet"].cast(StringType()))
    .withColumn("TweetVolume",
                pop_list_tweet["TweetVolume"].cast(IntegerType()))
)
pop_list_tweet = pop_list_tweet.sort(desc("TweetVolume")).limit(5)

# Collect once so the chart and the chosen topic come from the same result.
pop = pop_list_tweet.toPandas()
mostPopular = pop['Tweet'].iloc[0]

_plot_pie(pop, 'TweetVolume', "Most Popular Tweets ", fontsize=35, radius=7)

# Search the most popular tweet topic and retrieve tweets on it. The result is
# materialised as a list because it is iterated more than once below.
tweets = list(
    tweepy.Cursor(api.search, q=mostPopular, count=SEARCH_PAGE_SIZE)
    .items(MAX_TWEETS)
)

lists = []
for tweet in tweets:
    # 'hashtags' may be missing from the entities mapping.
    for hashtag in tweet.entities.get('hashtags') or []:
        lists.append(hashtag['text'])
print(lists)


# In[30]:

# Get the top popular hashtags used in tweets for the topic. An explicit
# schema is used so that an empty hashtag list still yields a DataFrame.
hashtag_schema = StructType([StructField("Hashtags", StringType(), True)])
df = spark.createDataFrame([(tag,) for tag in lists], hashtag_schema)
df.show()

# Calculate percentages of the top hashtags.
total = df.count()
result = (
    df.groupBy('Hashtags').count()
    .withColumn('Total', func.lit(total))
    .withColumn('Percentage', (col('count') / col('Total') * 100))
    .withColumn('%', bround('Percentage', 2))
    .sort(desc('%'))
    .limit(5)
)

# Plot popular hashtags.
pdf1 = result.toPandas()
if pdf1.empty:
    print("No hashtags found in the retrieved tweets")
else:
    pdf1.plot(kind='barh', x='Hashtags', y='%', colormap='winter_r')


# In[31]:

# Details about the tweets on the popular topic
# (Tweet, Retweet, Likes, Creation Date).
list_tweet = []
for tweet in tweets:
    # Drop URLs and newlines from the tweet text.
    data = URL_PATTERN.sub('', tweet.text).replace("\n", "")
    list_tweet.append([
        data,
        tweet.retweet_count,
        tweet.favorite_count,
        str(parser.parse(str(tweet.created_at))),
    ])

# Explicit schema: type inference fails when no tweets were retrieved.
tweet_schema = StructType([
    StructField("Tweet", StringType(), True),
    StructField("Retweet", LongType(), True),
    StructField("Likes", LongType(), True),
    StructField("Created_At", StringType(), True),
])
tweet_df = spark.createDataFrame(list_tweet, tweet_schema)
tweet_df = (
    tweet_df
    .withColumn("Retweet", tweet_df["Retweet"].cast(IntegerType()))
    .withColumn("Likes", tweet_df["Likes"].cast(IntegerType()))
    .withColumn("Created_At", tweet_df["Created_At"].cast(DateType()))
)
tweet_df.show()


# In[32]:

# Retrieve the top 5 most retweeted tweets.
max_retweet = tweet_df.groupBy('Tweet').agg(
    func.sum("Retweet").alias('Retweet'))
max_retweet = max_retweet.sort(desc('Retweet')).limit(5)
max_retweet.show()

pdf2 = max_retweet.toPandas()
_plot_pie(pdf2, 'Retweet', "Most Retweeted Tweets on " + mostPopular,
          fontsize=46, radius=25)


# In[34]:

# Retrieve the top 5 most liked tweets.
max_likes = tweet_df.groupBy('Tweet').agg(func.sum("Likes").alias('Likes'))
max_likes = max_likes.sort(desc('Likes')).limit(5)
max_likes.show()

pdf3 = max_likes.toPandas()
_plot_pie(pdf3, 'Likes', "Most Liked Tweets on " + mostPopular,
          fontsize=40, radius=8)


# In[35]:

# Number of retrieved tweets per creation date (Created_At was cast to a
# date above), most frequent first.
popular_month = tweet_df.groupBy('Created_At').count()
# Keep the sorted DataFrame; DataFrame.show() itself returns None.
mon_popular = popular_month.sort(desc('count'))
mon_popular.show()

# Outside a notebook the figures only appear once show() is called.
plt.show()


# In[ ]: