In [18]:
from pyspark import SparkContext, SparkConf
In [20]:
from pyspark import SparkContext, SparkConf
In [21]:
conf = SparkConf().setAppName('pyspark')
In [22]:
sc = SparkContext(conf=conf)
In [23]:
# General:
import sys
import tweepy           # To consume Twitter's API
import re
import math
import numpy as np
sc
Out[23]:

SparkContext

Spark UI

Version
v2.4.1
Master
local[*]
AppName
pyspark
In [24]:
# For plotting and visualization:
from IPython.display import display
import matplotlib.pyplot as plt
from pyspark.sql.types import *
from pyspark.sql import *
from pyspark.sql.functions import lit
from pyspark.sql import functions as func
from pyspark.sql.functions  import col
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import desc
from pyspark.sql.functions import bround
from pyspark.sql import Row
from dateutil import parser
from pyspark.sql.types import *
In [25]:
# Consume:
CONSUMER_KEY    = ''
CONSUMER_SECRET = ''

# Access:
ACCESS_TOKEN  = ''
ACCESS_SECRET = ''
In [26]:
def twitter():
    # Authentication and access using keys:
    auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
    auth.set_access_token(ACCESS_TOKEN, ACCESS_SECRET)

    # Return API with authentication:
    api = tweepy.API(auth)
    return api
In [27]:
#Enter WOEID oF Location to know MOST POPULAR TWEETS in that location
api = twitter()
tweet_pop = []
place = api.trends_place(id = 395269) #Caracas
for location in place:
    for trend in location["trends"]:
        tweet_pop.append(trend["name"])
        tweet_pop.append(trend["tweet_volume"])

# find top 5 popular tweet topic based on tweet volume        
pop_list_tweet = [tweet_pop[2*i:2*i+2] for i in range(0,math.ceil(len(tweet_pop)/2))]
pop_list_tweet = spark.createDataFrame(pop_list_tweet, ["Tweet", "TweetVolume"])
pop_list_tweet = pop_list_tweet.withColumn("Tweet", pop_list_tweet["Tweet"].cast(StringType())).withColumn("TweetVolume", pop_list_tweet["TweetVolume"].cast(IntegerType()))
pop_list_tweet = pop_list_tweet.sort(desc("TweetVolume")).limit(5)
popular = pop_list_tweet.select(col("Tweet")).limit(1).collect()
res={}
for i in popular:
    res.update(i.asDict())
mostPopular= res['Tweet']
plt.title("Most Popular Tweets  ",fontsize = 40)
pop=pop_list_tweet.toPandas()
ax1 = plt.subplot(121, aspect='equal')
pop.plot(kind='pie', y = 'TweetVolume', ax=ax1, autopct='%1.1f%%', 
 startangle=90, shadow=False, labels=pop['Tweet'], legend = False, fontsize= 35, radius=7)


#search the most poplar tweet topic and retrieve tweets on it
tweets = api.search(q=mostPopular, count=1500)
lists = []
for tweet in tweets[:1500]:
    hashtags = tweet.entities.get('hashtags')
    for hashtag in hashtags:
            lists.append(hashtag['text'])
print(lists)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
<ipython-input-27-2564ede2f093> in <module>
     10 # find top 5 popular tweet topic based on tweet volume
     11 pop_list_tweet = [tweet_pop[2*i:2*i+2] for i in range(0,math.ceil(len(tweet_pop)/2))]
---> 12 pop_list_tweet = spark.createDataFrame(pop_list_tweet, ["Tweet", "TweetVolume"])
     13 pop_list_tweet = pop_list_tweet.withColumn("Tweet", pop_list_tweet["Tweet"].cast(StringType())).withColumn("TweetVolume", pop_list_tweet["TweetVolume"].cast(IntegerType()))
     14 pop_list_tweet = pop_list_tweet.sort(desc("TweetVolume")).limit(5)

NameError: name 'spark' is not defined
In [28]:
spark = SparkSession(sc)
In [29]:
#Enter WOEID oF Location to know MOST POPULAR TWEETS in that location
api = twitter()
tweet_pop = []
place = api.trends_place(id = 395269) #Caracas
for location in place:
    for trend in location["trends"]:
        tweet_pop.append(trend["name"])
        tweet_pop.append(trend["tweet_volume"])

# find top 5 popular tweet topic based on tweet volume        
pop_list_tweet = [tweet_pop[2*i:2*i+2] for i in range(0,math.ceil(len(tweet_pop)/2))]
pop_list_tweet = spark.createDataFrame(pop_list_tweet, ["Tweet", "TweetVolume"])
pop_list_tweet = pop_list_tweet.withColumn("Tweet", pop_list_tweet["Tweet"].cast(StringType())).withColumn("TweetVolume", pop_list_tweet["TweetVolume"].cast(IntegerType()))
pop_list_tweet = pop_list_tweet.sort(desc("TweetVolume")).limit(5)
popular = pop_list_tweet.select(col("Tweet")).limit(1).collect()
res={}
for i in popular:
    res.update(i.asDict())
mostPopular= res['Tweet']
plt.title("Most Popular Tweets  ",fontsize = 40)
pop=pop_list_tweet.toPandas()
ax1 = plt.subplot(121, aspect='equal')
pop.plot(kind='pie', y = 'TweetVolume', ax=ax1, autopct='%1.1f%%', 
 startangle=90, shadow=False, labels=pop['Tweet'], legend = False, fontsize= 35, radius=7)


#search the most poplar tweet topic and retrieve tweets on it
tweets = api.search(q=mostPopular, count=1500)
lists = []
for tweet in tweets[:1500]:
    hashtags = tweet.entities.get('hashtags')
    for hashtag in hashtags:
            lists.append(hashtag['text'])
print(lists)
['SOULIV', 'SOULIV', 'LFC', 'LFC', 'LFC', 'Liverpool', 'PremierLeague', 'Football', 'EPl', 'Saintsfc', 'LFC', 'LFC', 'LFC', 'SOULIV', 'Liverpool', 'Southampton', 'LFC', 'LFC', 'Liverpool', 'SOTLIV', 'SOULIV', 'SOULIV', 'Liverpool', 'Hendo', 'SouLiv', 'LFC', 'YNWA', 'YNWA', 'LFC', 'Liverpool', 'LFC', 'YNWA', 'Saintsfc', 'LFC']
In [30]:
#get top popular hashtags used in tweets for the topic
rdd = sc.parallelize(lists)
new_row = Row("Hashtags")
df = rdd.map(new_row).toDF()
df.show()

#calculate percentages of the top hashtags
from pyspark.sql.types import StructType
total=df.count()
result=(df.groupBy('Hashtags').count()
    .withColumn('Total',func.lit(total))
    .withColumn('Percentage',(col('count')/col('Total') *100))
    .withColumn('%',bround('Percentage',2))
    .sort(desc('%'))
    .limit(5)
    )

#plot popular hashtags
pdf1=result.toPandas()
pdf1.plot(kind='barh',x='Hashtags',y='%',colormap='winter_r')
+-------------+
|     Hashtags|
+-------------+
|       SOULIV|
|       SOULIV|
|          LFC|
|          LFC|
|          LFC|
|    Liverpool|
|PremierLeague|
|     Football|
|          EPl|
|     Saintsfc|
|          LFC|
|          LFC|
|          LFC|
|       SOULIV|
|    Liverpool|
|  Southampton|
|          LFC|
|          LFC|
|    Liverpool|
|       SOTLIV|
+-------------+
only showing top 20 rows

Out[30]:
<matplotlib.axes._subplots.AxesSubplot at 0x7f6a6c857f98>
In [31]:
#Details about the tweets on popular topic (Tweet, Retweet, Likes, Creation Date)
list_tweet = []
for tweet in tweets:
    data = re.sub(r'\w+:\/{2}[\d\w-]+(\.[\d\w-]+)*(?:(?:\/[^\s/]*))*', '', tweet.text).replace("\n","")
    list_tweet.append(data)
    list_tweet.append(tweet.retweet_count)
    list_tweet.append(tweet.favorite_count)
    list_tweet.append(str(parser.parse(str(tweet.created_at))))
list_tweet = [list_tweet[4*i:4*i+4] for i in range(0,math.ceil(len(list_tweet)/4))]
tweet_df = spark.createDataFrame(list_tweet, ["Tweet", "Retweet","Likes","Created_At"])
tweet_df = tweet_df.withColumn("Retweet", tweet_df["Retweet"].cast(IntegerType())).withColumn("Likes", tweet_df["Likes"].cast(IntegerType())).withColumn("Created_At", tweet_df["Created_At"].cast(DateType()))
tweet_df.show()
+--------------------+-------+-----+----------+
|               Tweet|Retweet|Likes|Created_At|
+--------------------+-------+-----+----------+
|RT @AlexGoldberg_...|    293|    0|2019-04-05|
|RT @LFCUSA: WE AR...|    906|    0|2019-04-05|
|RT @_lisa_anderso...|      1|    0|2019-04-05|
|RT @brfootball: L...|    448|    0|2019-04-05|
|@live_forever07 @...|      0|    0|2019-04-05|
|RT @theinfonerds:...|      2|    0|2019-04-05|
|Another 3 point c...|      0|    0|2019-04-05|
|Premier League Ti...|      0|    0|2019-04-05|
|RT @robertmarawa:...|     70|    0|2019-04-05|
|RT @ActuFoot_: Ma...|    246|    0|2019-04-05|
|RT @BleacherRepor...|    596|    0|2019-04-05|
|Now with Liverpoo...|      0|    1|2019-04-05|
|I know as a unite...|      0|    0|2019-04-05|
|RT @NBCSportsSocc...|    724|    0|2019-04-05|
|RT @MelissaReddy_...|   1537|    0|2019-04-05|
|RT @RyanMason: An...|   1519|    0|2019-04-05|
|RT @HopkinsBRFC: ...|     16|    0|2019-04-05|
|RT @MaddockMirror...|    318|    0|2019-04-05|
|RT @DevilsOfUnite...|    498|    0|2019-04-05|
|RT @MaddockMirror...|    318|    0|2019-04-05|
+--------------------+-------+-----+----------+
only showing top 20 rows

In [32]:
#retrieve top 5 most retweeted tweets
max_retweet = tweet_df.groupBy('Tweet').agg(func.sum("Retweet").alias('Retweet'))
max_retweet = max_retweet.sort(desc('Retweet')).limit(5)
max_retweet.show()
plt.title("Most Retweeted Tweets on " + mostPopular,fontsize = 40)
pdf2=max_retweet.toPandas()
ax1 = plt.subplot(121, aspect='equal')
pdf2.plot(kind='pie', y = 'Retweet', ax=ax1, autopct='%1.1f%%', 
 startangle=90, shadow=False, labels=pdf2['Tweet'], legend = False, fontsize= 46, radius=25)
+--------------------+-------+
|               Tweet|Retweet|
+--------------------+-------+
|RT @MelissaReddy_...|   7685|
|RT @LFCUSA: WE AR...|   4530|
|RT @BleacherRepor...|   1788|
|RT @Fla_Humor: Em...|   1638|
|RT @MaddockMirror...|   1590|
+--------------------+-------+

Out[32]:
<matplotlib.axes._subplots.AxesSubplot at 0x7f6a691ec048>
In [34]:
#retrieve top 5 most liked tweets
max_likes = tweet_df.groupBy('Tweet').agg(func.sum("Likes").alias('Likes'))
max_likes = max_likes.sort(desc('Likes')).limit(5)
max_likes.show()
pdf3=max_likes.toPandas()
plt.title("Most Liked Tweets on " + mostPopular,fontsize = 40)
ax1 = plt.subplot(121, aspect='equal')
pdf3.plot(kind='pie', y = 'Likes', ax=ax1, autopct='%1.1f%%', startangle=90, shadow=False, labels=pdf3['Tweet'], legend = False, fontsize= 40, radius=8)
+--------------------+-----+
|               Tweet|Likes|
+--------------------+-----+
|💫 MOHAMED SALAH ...|   12|
|Now with Liverpoo...|    1|
|RT @RyanMason: An...|    0|
|RT @jason_lewisj2...|    0|
|❤ RT @LFCphoto: -...|    0|
+--------------------+-----+

Out[34]:
<matplotlib.axes._subplots.AxesSubplot at 0x7f6a6947ccc0>
In [35]:
popular_month=(tweet_df.groupBy('Created_At').count())
mon_popular = popular_month.sort(desc('count')).show()
+----------+-----+
|Created_At|count|
+----------+-----+
|2019-04-05|   94|
+----------+-----+

In [ ]: