from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql import Window, Row
# Source bucket
s3_bucket = "s3://polakowo-yelp2/"
# Source data paths: Yelp
yelp_dir = s3_bucket + "yelp_dataset/"
business_path = yelp_dir + "business.json"
review_path = yelp_dir + "review.json"
user_path = yelp_dir + "user.json"
checkin_path = yelp_dir + "checkin.json"
tip_path = yelp_dir + "tip.json"
photo_path = yelp_dir + "photo.json"
# Source data paths: Demographics
demo_dir = s3_bucket + "demo_dataset/"
demo_path = demo_dir + "us-cities-demographics.json"
# Source data paths: Weather
weather_dir = s3_bucket + "weather_dataset/"
city_attr_path = weather_dir + "city_attributes.csv"
weather_temp_path = weather_dir + "temperature.csv"
weather_desc_path = weather_dir + "weather_description.csv"
# Target data paths
staging_dir = s3_bucket + "staging_data/"
business_attributes_path = staging_dir + "business_attributes"
cities_path = staging_dir + "cities"
addresses_path = staging_dir + "addresses"
categories_path = staging_dir + "categories"
business_categories_path = staging_dir + "business_categories"
business_hours_path = staging_dir + "business_hours"
businesses_path = staging_dir + "businesses"
reviews_path = staging_dir + "reviews"
users_path = staging_dir + "users"
elite_years_path = staging_dir + "elite_years"
friends_path = staging_dir + "friends"
checkins_path = staging_dir + "checkins"
tips_path = staging_dir + "tips"
photos_path = staging_dir + "photos"
city_weather_path = staging_dir + "city_weather"
def describe(df):
# The total number of records
print("--------------------------------------------")
print("count:")
print(df.count())
# The number of null values for each column
print("--------------------------------------------")
print("nulls:")
print(df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns]).collect())
# The maximum number of chars for each string column (useful for table definition in Redshift)
print("--------------------------------------------")
print("max_str_lengths:")
print({k: df.select(k).agg(F.max(F.length(k))).first() for k, v in df.dtypes if v.startswith('string')})
# Print schema
print("--------------------------------------------")
print("schema:")
df.printSchema()
# Print first record
print("--------------------------------------------")
print("example:")
print(df.first())
def write_csv(df, path):
# For tables with standardized text fields
# CSV can be parsed and processed incrementally
df.write\
.format('csv')\
.option("header", "true")\
.option("delimiter", "\t")\
.mode('overwrite')\
.save(path)
def write_json(df, path, n_partitions):
# JSON files have to be parsed as a whole by Amazon Redshift
# Redshift accepts JSON files of max. 4MB size (-> smart partitioning)
# What you gain from using JSON is stricter string handling
df.repartition(n_partitions).write\
.format('json')\
.option("nullValue", None)\
.mode('overwrite')\
.save(path)
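Given the 4 MB limit above, n_partitions should be chosen so that each output file stays small enough. A very rough way to derive it from the data itself, as a sketch (the string-serialization size estimate and the helper name are our assumptions):
def estimate_n_partitions(df, max_mb=4):
    # Very rough size estimate: length of each row serialized as a Python dict string
    approx_bytes = df.rdd.map(lambda row: len(str(row.asDict()))).sum()
    return max(1, int(approx_bytes / (max_mb * 1024 * 1024)) + 1)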
def write_parquet(df, path):
# The main format for our output files, as it combines pros of CSV and JSON formats
df.write.parquet(path, mode="overwrite")
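If downstream queries filter heavily on one column, the Parquet output could also be partitioned by it; a sketch of a hypothetical variant (the helper name and partition column are illustrative):
def write_parquet_partitioned(df, path, partition_col):
    # Writes one subdirectory per distinct value of partition_col
    df.write.parquet(path, mode="overwrite", partitionBy=partition_col)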
business_df = spark.read.json(business_path)
describe(business_df)
('nulls:', [Row(address=0, attributes=28836, business_id=0, categories=482, city=0, hours=44830, is_open=0, latitude=0, longitude=0, name=0, postal_code=0, review_count=0, stars=0, state=0)]) ('count:', 192609) schema: root |-- address: string (nullable = true) |-- attributes: struct (nullable = true) | |-- AcceptsInsurance: string (nullable = true) | |-- AgesAllowed: string (nullable = true) | |-- Alcohol: string (nullable = true) | |-- Ambience: string (nullable = true) | |-- BYOB: string (nullable = true) | |-- BYOBCorkage: string (nullable = true) | |-- BestNights: string (nullable = true) | |-- BikeParking: string (nullable = true) | |-- BusinessAcceptsBitcoin: string (nullable = true) | |-- BusinessAcceptsCreditCards: string (nullable = true) | |-- BusinessParking: string (nullable = true) | |-- ByAppointmentOnly: string (nullable = true) | |-- Caters: string (nullable = true) | |-- CoatCheck: string (nullable = true) | |-- Corkage: string (nullable = true) | |-- DietaryRestrictions: string (nullable = true) | |-- DogsAllowed: string (nullable = true) | |-- DriveThru: string (nullable = true) | |-- GoodForDancing: string (nullable = true) | |-- GoodForKids: string (nullable = true) | |-- GoodForMeal: string (nullable = true) | |-- HairSpecializesIn: string (nullable = true) | |-- HappyHour: string (nullable = true) | |-- HasTV: string (nullable = true) | |-- Music: string (nullable = true) | |-- NoiseLevel: string (nullable = true) | |-- Open24Hours: string (nullable = true) | |-- OutdoorSeating: string (nullable = true) | |-- RestaurantsAttire: string (nullable = true) | |-- RestaurantsCounterService: string (nullable = true) | |-- RestaurantsDelivery: string (nullable = true) | |-- RestaurantsGoodForGroups: string (nullable = true) | |-- RestaurantsPriceRange2: string (nullable = true) | |-- RestaurantsReservations: string (nullable = true) | |-- RestaurantsTableService: string (nullable = true) | |-- RestaurantsTakeOut: string (nullable = true) | |-- Smoking: string (nullable = true) | |-- WheelchairAccessible: string (nullable = true) | |-- WiFi: string (nullable = true) |-- business_id: string (nullable = true) |-- categories: string (nullable = true) |-- city: string (nullable = true) |-- hours: struct (nullable = true) | |-- Friday: string (nullable = true) | |-- Monday: string (nullable = true) | |-- Saturday: string (nullable = true) | |-- Sunday: string (nullable = true) | |-- Thursday: string (nullable = true) | |-- Tuesday: string (nullable = true) | |-- Wednesday: string (nullable = true) |-- is_open: long (nullable = true) |-- latitude: double (nullable = true) |-- longitude: double (nullable = true) |-- name: string (nullable = true) |-- postal_code: string (nullable = true) |-- review_count: long (nullable = true) |-- stars: double (nullable = true) |-- state: string (nullable = true) ('example:', [Row(address=u'2818 E Camino Acequia Drive', attributes=Row(AcceptsInsurance=None, AgesAllowed=None, Alcohol=None, Ambience=None, BYOB=None, BYOBCorkage=None, BestNights=None, BikeParking=None, BusinessAcceptsBitcoin=None, BusinessAcceptsCreditCards=None, BusinessParking=None, ByAppointmentOnly=None, Caters=None, CoatCheck=None, Corkage=None, DietaryRestrictions=None, DogsAllowed=None, DriveThru=None, GoodForDancing=None, GoodForKids=u'False', GoodForMeal=None, HairSpecializesIn=None, HappyHour=None, HasTV=None, Music=None, NoiseLevel=None, Open24Hours=None, OutdoorSeating=None, RestaurantsAttire=None, RestaurantsCounterService=None, RestaurantsDelivery=None, 
RestaurantsGoodForGroups=None, RestaurantsPriceRange2=None, RestaurantsReservations=None, RestaurantsTableService=None, RestaurantsTakeOut=None, Smoking=None, WheelchairAccessible=None, WiFi=None), business_id=u'1SWheh84yJXfytovILXOAQ', categories=u'Golf, Active Life', city=u'Phoenix', hours=None, is_open=0, latitude=33.5221425, longitude=-112.0184807, name=u'Arizona Biltmore Golf Club', postal_code=u'85016', review_count=5, stars=3.0, state=u'AZ')])
Unfold the deeply nested attributes field into a new table.
business_attributes_df = business_df.select("business_id", "attributes.*")
business_attributes_df.select("AcceptsInsurance").distinct().collect()
[Row(AcceptsInsurance=u'None'), Row(AcceptsInsurance=u'False'), Row(AcceptsInsurance=None), Row(AcceptsInsurance=u'True')]
def parse_boolean(x):
# Convert boolean strings to native boolean format
if x is None or x == 'None':
return None
if x == 'True':
return True
if x == 'False':
return False
parse_boolean_udf = F.udf(parse_boolean, T.BooleanType())
bool_attrs = [
"AcceptsInsurance",
"BYOB",
"BikeParking",
"BusinessAcceptsBitcoin",
"BusinessAcceptsCreditCards",
"ByAppointmentOnly",
"Caters",
"CoatCheck",
"Corkage",
"DogsAllowed",
"DriveThru",
"GoodForDancing",
"GoodForKids",
"HappyHour",
"HasTV",
"Open24Hours",
"OutdoorSeating",
"RestaurantsCounterService",
"RestaurantsDelivery",
"RestaurantsGoodForGroups",
"RestaurantsReservations",
"RestaurantsTableService",
"RestaurantsTakeOut",
"WheelchairAccessible"
]
for attr in bool_attrs:
business_attributes_df = business_attributes_df.withColumn(attr, parse_boolean_udf(attr))
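For reference, the same conversion can be expressed without a Python UDF, which Spark can usually optimize better; a sketch for a single column, not the method used in this notebook:
business_attributes_df = business_attributes_df.withColumn(
    "AcceptsInsurance",
    # 'None' strings and nulls fall through to null
    F.when(F.col("AcceptsInsurance") == "True", F.lit(True))
     .when(F.col("AcceptsInsurance") == "False", F.lit(False)))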
business_attributes_df.select("AcceptsInsurance").distinct().collect()
[Row(AcceptsInsurance=None), Row(AcceptsInsurance=True), Row(AcceptsInsurance=False)]
business_attributes_df.select("AgesAllowed").distinct().collect()
[Row(AgesAllowed=u'None'), Row(AgesAllowed=None), Row(AgesAllowed=u"u'18plus'"), Row(AgesAllowed=u"u'21plus'"), Row(AgesAllowed=u"u'allages'"), Row(AgesAllowed=u"u'19plus'")]
def parse_string(x):
# Clean and standardize strings
if x is None or x == '':
return None
# Some strings are of format u"u'string'"
return x.replace("u'", "").replace("'", "").lower()
parse_string_udf = F.udf(parse_string, T.StringType())
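A quick local sanity check of the parser (plain Python, illustrative input):
parse_string("u'21plus'")  # -> '21plus'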
str_attrs = [
"AgesAllowed",
"Alcohol",
"BYOBCorkage",
"NoiseLevel",
"RestaurantsAttire",
"Smoking",
"WiFi",
]
for attr in str_attrs:
business_attributes_df = business_attributes_df.withColumn(attr, parse_string_udf(attr))
business_attributes_df.select("AgesAllowed").distinct().collect()
[Row(AgesAllowed=u'none'), Row(AgesAllowed=u'19plus'), Row(AgesAllowed=None), Row(AgesAllowed=u'allages'), Row(AgesAllowed=u'21plus'), Row(AgesAllowed=u'18plus')]
business_attributes_df.select("RestaurantsPriceRange2").distinct().collect()
[Row(RestaurantsPriceRange2=u'3'), Row(RestaurantsPriceRange2=u'None'), Row(RestaurantsPriceRange2=None), Row(RestaurantsPriceRange2=u'1'), Row(RestaurantsPriceRange2=u'4'), Row(RestaurantsPriceRange2=u'2')]
def parse_integer(x):
# Convert integers masked as strings to native integer format
if x is None or x == 'None':
return None
return int(x)
parse_integer_udf = F.udf(parse_integer, T.IntegerType())
int_attrs = [
"RestaurantsPriceRange2",
]
for attr in int_attrs:
business_attributes_df = business_attributes_df.withColumn(attr, parse_integer_udf(attr))
business_attributes_df.select("RestaurantsPriceRange2").distinct().collect()
[Row(RestaurantsPriceRange2=None), Row(RestaurantsPriceRange2=1), Row(RestaurantsPriceRange2=3), Row(RestaurantsPriceRange2=4), Row(RestaurantsPriceRange2=2)]
business_attributes_df.select("business_id", "Ambience").where("Ambience is not null").first()
Row(business_id=u'QXAEGFB4oINsVuTFxEYKFQ', Ambience=u"{'romantic': False, 'intimate': False, 'classy': False, 'hipster': False, 'divey': False, 'touristy': False, 'trendy': False, 'upscale': False, 'casual': True}")
import ast
def parse_boolean_dict(x):
    # Convert dicts masked as strings into native string-to-boolean maps
if x is None or x == 'None' or x == '':
return None
return ast.literal_eval(x)
parse_boolean_dict_udf = F.udf(parse_boolean_dict, T.MapType(T.StringType(), T.BooleanType()))
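A quick local sanity check (plain Python, illustrative input):
parse_boolean_dict("{'romantic': False, 'casual': True}")  # -> {'romantic': False, 'casual': True}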
bool_dict_attrs = [
"Ambience",
"BestNights",
"BusinessParking",
"DietaryRestrictions",
"GoodForMeal",
"HairSpecializesIn",
"Music"
]
for attr in bool_dict_attrs:
business_attributes_df = business_attributes_df.withColumn(attr, parse_boolean_dict_udf(attr))
# Get all keys of the MapType
# [Row(key=u'romantic'), Row(key=u'casual'), ...
key_rows = business_attributes_df.select(F.explode(attr)).select("key").distinct().collect()
    # Convert each key into a column (with a proper name)
exprs = ["{}['{}'] as {}".format(attr, row.key, attr+"_"+row.key.replace('-', '_')) for row in key_rows]
business_attributes_df = business_attributes_df.selectExpr("*", *exprs).drop(attr)
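On Spark 2.3 or newer, the distinct keys could also be collected with map_keys instead of exploding whole map entries; a sketch for a single attribute, applicable before the loop above drops the map columns (assuming that Spark version):
key_rows = business_attributes_df\
    .select(F.explode(F.map_keys("Ambience")).alias("key"))\
    .distinct()\
    .collect()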
business_attributes_df.where("business_id = 'QXAEGFB4oINsVuTFxEYKFQ'")\
.select("business_id", *filter(lambda x: x.startswith("Ambience"), business_attributes_df.columns))\
.collect()
[Row(business_id=u'QXAEGFB4oINsVuTFxEYKFQ', Ambience_romantic=False, Ambience_casual=True, Ambience_trendy=False, Ambience_intimate=False, Ambience_hipster=False, Ambience_upscale=False, Ambience_divey=False, Ambience_touristy=False, Ambience_classy=False)]
describe(business_attributes_df)
('nulls:', [Row(business_id=0, AcceptsInsurance=185359, AgesAllowed=192486, Alcohol=144146, BYOB=192581, BYOBCorkage=191186, BikeParking=107210, BusinessAcceptsBitcoin=179524, BusinessAcceptsCreditCards=79476, ByAppointmentOnly=145756, Caters=151981, CoatCheck=189103, Corkage=191947, DogsAllowed=185174, DriveThru=189417, GoodForDancing=187867, GoodForKids=126299, HappyHour=187403, HasTV=144511, NoiseLevel=148730, Open24Hours=192596, OutdoorSeating=137786, RestaurantsAttire=143970, RestaurantsCounterService=192598, RestaurantsDelivery=140087, RestaurantsGoodForGroups=137891, RestaurantsPriceRange2=84430, RestaurantsReservations=140322, RestaurantsTableService=175436, RestaurantsTakeOut=130532, Smoking=189111, WheelchairAccessible=172650, WiFi=142545, Ambience_romantic=145072, Ambience_casual=145072, Ambience_trendy=145072, Ambience_intimate=145072, Ambience_hipster=145695, Ambience_upscale=145260, Ambience_divey=152636, Ambience_touristy=145072, Ambience_classy=145072, BestNights_sunday=189140, BestNights_thursday=189140, BestNights_monday=189140, BestNights_wednesday=189140, BestNights_saturday=189140, BestNights_friday=189140, BestNights_tuesday=189140, BusinessParking_valet=91169, BusinessParking_lot=91177, BusinessParking_validated=91180, BusinessParking_garage=91159, BusinessParking_street=91177, DietaryRestrictions_kosher=192557, DietaryRestrictions_dairy_free=192557, DietaryRestrictions_vegan=192557, DietaryRestrictions_vegetarian=192557, DietaryRestrictions_gluten_free=192557, DietaryRestrictions_soy_free=192557, DietaryRestrictions_halal=192557, GoodForMeal_lunch=162897, GoodForMeal_brunch=162895, GoodForMeal_dinner=162897, GoodForMeal_latenight=162897, GoodForMeal_dessert=162897, GoodForMeal_breakfast=162897, HairSpecializesIn_curly=191634, HairSpecializesIn_asian=191779, HairSpecializesIn_perms=191634, HairSpecializesIn_africanamerican=191779, HairSpecializesIn_straightperms=191779, HairSpecializesIn_kids=191634, HairSpecializesIn_coloring=191634, HairSpecializesIn_extensions=191634, Music_no_music=188653, Music_dj=187617, Music_live=188087, Music_karaoke=188128, Music_video=188126, Music_background_music=188131, Music_jukebox=188102)]) ('count:', 192609) schema: root |-- business_id: string (nullable = true) |-- AcceptsInsurance: string (nullable = true) |-- AgesAllowed: string (nullable = true) |-- Alcohol: string (nullable = true) |-- BYOB: string (nullable = true) |-- BYOBCorkage: string (nullable = true) |-- BikeParking: string (nullable = true) |-- BusinessAcceptsBitcoin: string (nullable = true) |-- BusinessAcceptsCreditCards: string (nullable = true) |-- ByAppointmentOnly: string (nullable = true) |-- Caters: string (nullable = true) |-- CoatCheck: string (nullable = true) |-- Corkage: string (nullable = true) |-- DogsAllowed: string (nullable = true) |-- DriveThru: string (nullable = true) |-- GoodForDancing: string (nullable = true) |-- GoodForKids: string (nullable = true) |-- HappyHour: string (nullable = true) |-- HasTV: string (nullable = true) |-- NoiseLevel: string (nullable = true) |-- Open24Hours: string (nullable = true) |-- OutdoorSeating: string (nullable = true) |-- RestaurantsAttire: string (nullable = true) |-- RestaurantsCounterService: string (nullable = true) |-- RestaurantsDelivery: string (nullable = true) |-- RestaurantsGoodForGroups: string (nullable = true) |-- RestaurantsPriceRange2: string (nullable = true) |-- RestaurantsReservations: string (nullable = true) |-- RestaurantsTableService: string (nullable = true) |-- RestaurantsTakeOut: string 
(nullable = true) |-- Smoking: string (nullable = true) |-- WheelchairAccessible: string (nullable = true) |-- WiFi: string (nullable = true) |-- Ambience_romantic: boolean (nullable = true) |-- Ambience_casual: boolean (nullable = true) |-- Ambience_trendy: boolean (nullable = true) |-- Ambience_intimate: boolean (nullable = true) |-- Ambience_hipster: boolean (nullable = true) |-- Ambience_upscale: boolean (nullable = true) |-- Ambience_divey: boolean (nullable = true) |-- Ambience_touristy: boolean (nullable = true) |-- Ambience_classy: boolean (nullable = true) |-- BestNights_sunday: boolean (nullable = true) |-- BestNights_thursday: boolean (nullable = true) |-- BestNights_monday: boolean (nullable = true) |-- BestNights_wednesday: boolean (nullable = true) |-- BestNights_saturday: boolean (nullable = true) |-- BestNights_friday: boolean (nullable = true) |-- BestNights_tuesday: boolean (nullable = true) |-- BusinessParking_valet: boolean (nullable = true) |-- BusinessParking_lot: boolean (nullable = true) |-- BusinessParking_validated: boolean (nullable = true) |-- BusinessParking_garage: boolean (nullable = true) |-- BusinessParking_street: boolean (nullable = true) |-- DietaryRestrictions_kosher: boolean (nullable = true) |-- DietaryRestrictions_dairy_free: boolean (nullable = true) |-- DietaryRestrictions_vegan: boolean (nullable = true) |-- DietaryRestrictions_vegetarian: boolean (nullable = true) |-- DietaryRestrictions_gluten_free: boolean (nullable = true) |-- DietaryRestrictions_soy_free: boolean (nullable = true) |-- DietaryRestrictions_halal: boolean (nullable = true) |-- GoodForMeal_lunch: boolean (nullable = true) |-- GoodForMeal_brunch: boolean (nullable = true) |-- GoodForMeal_dinner: boolean (nullable = true) |-- GoodForMeal_latenight: boolean (nullable = true) |-- GoodForMeal_dessert: boolean (nullable = true) |-- GoodForMeal_breakfast: boolean (nullable = true) |-- HairSpecializesIn_curly: boolean (nullable = true) |-- HairSpecializesIn_asian: boolean (nullable = true) |-- HairSpecializesIn_perms: boolean (nullable = true) |-- HairSpecializesIn_africanamerican: boolean (nullable = true) |-- HairSpecializesIn_straightperms: boolean (nullable = true) |-- HairSpecializesIn_kids: boolean (nullable = true) |-- HairSpecializesIn_coloring: boolean (nullable = true) |-- HairSpecializesIn_extensions: boolean (nullable = true) |-- Music_no_music: boolean (nullable = true) |-- Music_dj: boolean (nullable = true) |-- Music_live: boolean (nullable = true) |-- Music_karaoke: boolean (nullable = true) |-- Music_video: boolean (nullable = true) |-- Music_background_music: boolean (nullable = true) |-- Music_jukebox: boolean (nullable = true) ('example:', [Row(business_id=u'1SWheh84yJXfytovILXOAQ', AcceptsInsurance=None, AgesAllowed=None, Alcohol=None, BYOB=None, BYOBCorkage=None, BikeParking=None, BusinessAcceptsBitcoin=None, BusinessAcceptsCreditCards=None, ByAppointmentOnly=None, Caters=None, CoatCheck=None, Corkage=None, DogsAllowed=None, DriveThru=None, GoodForDancing=None, GoodForKids=u'False', HappyHour=None, HasTV=None, NoiseLevel=None, Open24Hours=None, OutdoorSeating=None, RestaurantsAttire=None, RestaurantsCounterService=None, RestaurantsDelivery=None, RestaurantsGoodForGroups=None, RestaurantsPriceRange2=None, RestaurantsReservations=None, RestaurantsTableService=None, RestaurantsTakeOut=None, Smoking=None, WheelchairAccessible=None, WiFi=None, Ambience_romantic=None, Ambience_casual=None, Ambience_trendy=None, Ambience_intimate=None, Ambience_hipster=None, 
Ambience_upscale=None, Ambience_divey=None, Ambience_touristy=None, Ambience_classy=None, BestNights_sunday=None, BestNights_thursday=None, BestNights_monday=None, BestNights_wednesday=None, BestNights_saturday=None, BestNights_friday=None, BestNights_tuesday=None, BusinessParking_valet=None, BusinessParking_lot=None, BusinessParking_validated=None, BusinessParking_garage=None, BusinessParking_street=None, DietaryRestrictions_kosher=None, DietaryRestrictions_dairy_free=None, DietaryRestrictions_vegan=None, DietaryRestrictions_vegetarian=None, DietaryRestrictions_gluten_free=None, DietaryRestrictions_soy_free=None, DietaryRestrictions_halal=None, GoodForMeal_lunch=None, GoodForMeal_brunch=None, GoodForMeal_dinner=None, GoodForMeal_latenight=None, GoodForMeal_dessert=None, GoodForMeal_breakfast=None, HairSpecializesIn_curly=None, HairSpecializesIn_asian=None, HairSpecializesIn_perms=None, HairSpecializesIn_africanamerican=None, HairSpecializesIn_straightperms=None, HairSpecializesIn_kids=None, HairSpecializesIn_coloring=None, HairSpecializesIn_extensions=None, Music_no_music=None, Music_dj=None, Music_live=None, Music_karaoke=None, Music_video=None, Music_background_music=None, Music_jukebox=None)])
write_parquet(business_attributes_df, business_attributes_path)
Take city and state_code from business.json and enrich them with demographics data.
demo_df = spark.read.json(demo_path)
describe(demo_df)
('nulls:', [Row(datasetid=0, fields=0, record_timestamp=0, recordid=0)]) ('count:', 2891) schema: root |-- datasetid: string (nullable = true) |-- fields: struct (nullable = true) | |-- average_household_size: double (nullable = true) | |-- city: string (nullable = true) | |-- count: long (nullable = true) | |-- female_population: long (nullable = true) | |-- foreign_born: long (nullable = true) | |-- male_population: long (nullable = true) | |-- median_age: double (nullable = true) | |-- number_of_veterans: long (nullable = true) | |-- race: string (nullable = true) | |-- state: string (nullable = true) | |-- state_code: string (nullable = true) | |-- total_population: long (nullable = true) |-- record_timestamp: string (nullable = true) |-- recordid: string (nullable = true) ('example:', [Row(datasetid=u'us-cities-demographics', fields=Row(average_household_size=2.73, city=u'Newark', count=76402, female_population=143873, foreign_born=86253, male_population=138040, median_age=34.6, number_of_veterans=5829, race=u'White', state=u'New Jersey', state_code=u'NJ', total_population=281913), record_timestamp=u'1970-01-01T01:00:00+01:00', recordid=u'85458783ecf5da6572ee00e7120f68eff4fd0d61')])
demo_df.groupby("fields.city", "fields.state_code").count().sort(F.col("count").desc()).first()
Row(city=u'Fontana', state_code=u'CA', count=5)
Seems like this table isn't keyed by city and state code. Let's find the candidate key.
demo_df.groupBy("fields.city", "fields.state_code").agg(F.collect_list(F.col("fields.race")).alias("races")).first()
Row(city=u'Asheville', state_code=u'NC', races=[u'White', u'Black or African-American', u'Asian', u'American Indian and Alaska Native', u'Hispanic or Latino'])
demo_df.groupBy("fields.city", "fields.state_code", "fields.race").count().sort(F.col("count").desc()).first()
Row(city=u'Jurupa Valley', state_code=u'CA', race=u'Black or African-American', count=1)
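A generic way to verify such a key is to check that no combination of the candidate columns occurs more than once; a small helper sketch (the helper name is ours):
def is_candidate_key(df, cols):
    # True if no combination of `cols` values appears more than once
    return df.groupBy(*cols).count().where("count > 1").count() == 0

is_candidate_key(demo_df.select("fields.*"), ["city", "state_code", "race"])  # -> True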
The candidate key is composed of the city, state_code and race fields. Each JSON object here seems to describe (1) the demographics of the city and (2) the number of people belonging to some race (the race and count fields). Since each record is unique by the city, state_code and race fields, while the other demographic fields are unique by only city and state_code (which means lots of redundancy), we need to transform the race column into columns corresponding to each of its values via the pivot function.
def prepare_race(x):
    # We want to make each race a stand-alone column, so each race value needs a proper column name
return x.replace(" ", "_").replace("-", "_").lower()
prepare_race_udf = F.udf(prepare_race, T.StringType())
# Group by all columns except race and count and convert race rows into columns
demo_df = demo_df.select("fields.*")\
.withColumn("race", prepare_race_udf("race"))
demo_df = demo_df.groupby(*set(demo_df.schema.names).difference(set(["race", "count"])))\
.pivot('race')\
.max('count')
describe(demo_df)
('nulls:', [Row(total_population=0, city=0, number_of_veterans=7, male_population=1, foreign_born=7, average_household_size=8, median_age=0, state=0, state_code=0, female_population=1, american_indian_and_alaska_native=57, asian=13, black_or_african_american=12, hispanic_or_latino=0, white=7)]) ('count:', 596) schema: root |-- total_population: long (nullable = true) |-- city: string (nullable = true) |-- number_of_veterans: long (nullable = true) |-- male_population: long (nullable = true) |-- foreign_born: long (nullable = true) |-- average_household_size: double (nullable = true) |-- median_age: double (nullable = true) |-- state: string (nullable = true) |-- state_code: string (nullable = true) |-- female_population: long (nullable = true) |-- american_indian_and_alaska_native: long (nullable = true) |-- asian: long (nullable = true) |-- black_or_african_american: long (nullable = true) |-- hispanic_or_latino: long (nullable = true) |-- white: long (nullable = true) ('example:', [Row(total_population=1567442, city=u'Philadelphia', number_of_veterans=61995, male_population=741270, foreign_born=205339, average_household_size=2.61, median_age=34.1, state=u'Pennsylvania', state_code=u'PA', female_population=826172, american_indian_and_alaska_native=17500, asian=122721, black_or_african_american=691186, hispanic_or_latino=219038, white=688130)])
# Merge city data with demographics data
cities_df = business_df.selectExpr("city", "state as state_code")\
.distinct()\
.join(demo_df, ["city", "state_code"], how="left")\
.withColumn("city_id", F.monotonically_increasing_id())
describe(cities_df)
count: 1258 nulls: [Row(city=0, state_code=0, total_population=1210, number_of_veterans=1210, male_population=1210, foreign_born=1210, average_household_size=1210, median_age=1210, state=1210, female_population=1210, american_indian_and_alaska_native=1211, asian=1210, black_or_african_american=1210, hispanic_or_latino=1210, white=1210, city_id=0)] max_str_lengths: {'city': Row(max(length(city))=50), 'state': Row(max(length(state))=14), 'state_code': Row(max(length(state_code))=3)} schema: root |-- city: string (nullable = true) |-- state_code: string (nullable = true) |-- total_population: long (nullable = true) |-- number_of_veterans: long (nullable = true) |-- male_population: long (nullable = true) |-- foreign_born: long (nullable = true) |-- average_household_size: double (nullable = true) |-- median_age: double (nullable = true) |-- state: string (nullable = true) |-- female_population: long (nullable = true) |-- american_indian_and_alaska_native: long (nullable = true) |-- asian: long (nullable = true) |-- black_or_african_american: long (nullable = true) |-- hispanic_or_latino: long (nullable = true) |-- white: long (nullable = true) |-- city_id: long (nullable = false) example: Row(city=u'Mesa', state_code=u'AZ', total_population=471833, number_of_veterans=31808, male_population=234998, foreign_born=57492, average_household_size=2.68, median_age=36.9, state=u'Arizona', female_population=236835, american_indian_and_alaska_native=16044, asian=14608, black_or_african_american=22699, hispanic_or_latino=131425, white=413010, city_id=0)
write_parquet(cities_df, cities_path)
Pull address information from business.json, but instead of city take the newly created city_id.
addresses_df = business_df.selectExpr("address", "latitude", "longitude", "postal_code", "city", "state as state_code")\
.join(cities_df.select("city", "state_code", "city_id"), ["city", "state_code"], how='left')\
.drop("city", "state_code")\
.distinct()\
.withColumn("address_id", F.monotonically_increasing_id())
describe(addresses_df)
-------------------------------------------- count: 178763 -------------------------------------------- nulls: [Row(address=0, latitude=0, longitude=0, postal_code=0, city_id=0, address_id=0)] -------------------------------------------- max_str_lengths: {'postal_code': Row(max(length(postal_code))=8), 'address': Row(max(length(address))=118)} -------------------------------------------- schema: root |-- address: string (nullable = true) |-- latitude: double (nullable = true) |-- longitude: double (nullable = true) |-- postal_code: string (nullable = true) |-- city_id: long (nullable = true) |-- address_id: long (nullable = false) -------------------------------------------- example: Row(address=u'3495 Lawrence Ave E', latitude=43.757291, longitude=-79.2293784, postal_code=u'M1H 1B2', city_id=8589934592, address_id=0)
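Note the city_id value of 8589934592 in the example above: monotonically_increasing_id() embeds the partition ID in the upper bits, so the generated IDs are unique and increasing but not consecutive. If consecutive surrogate keys were required, a window-based row_number would work instead, at the cost of pulling all rows through a single partition; a sketch:
# Consecutive alternative (expensive: forces a single-partition sort)
w = Window.orderBy("address", "postal_code")
addresses_df = addresses_df.withColumn("address_id", F.row_number().over(w))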
write_parquet(addresses_df, addresses_path)
First, create a list of unique categories and assign each of them an id.
business_df.select("business_id", "categories").first()
Row(business_id=u'1SWheh84yJXfytovILXOAQ', categories=u'Golf, Active Life')
import re
def parse_categories(categories):
    # Convert a comma-separated list of strings masked as a single string into a native list type
if categories is None:
return []
parsed = []
# Some strings contain commas, so they have to be extracted beforehand
require_attention = set(["Wills, Trusts, & Probates"])
for s in require_attention:
if categories.find(s) > -1:
parsed.append(s)
categories = categories.replace(s, "")
return list(filter(None, parsed + re.split(r",\s*", categories)))
parse_categories_udf = F.udf(parse_categories, T.ArrayType(T.StringType()))
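A quick local check of the special-case handling (plain Python):
parse_categories("Wills, Trusts, & Probates, Lawyers")
# -> ['Wills, Trusts, & Probates', 'Lawyers']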
business_categories_df = business_df.select("business_id", "categories")\
.withColumn("categories", parse_categories_udf("categories"))
# Convert the list of categories in each row into a set of rows
categories_df = business_categories_df.select(F.explode("categories").alias("category"))\
.dropDuplicates()\
.sort("category")\
.withColumn("category_id", F.monotonically_increasing_id())
describe(categories_df)
('nulls:', [Row(category=0, category_id=0)]) ('count:', 1298) schema: root |-- category: string (nullable = true) |-- category_id: long (nullable = false) ('example:', [Row(category=u'3D Printing', category_id=0)])
write_parquet(categories_df, categories_path)
For each record in business.json, convert the list of categories in the categories field into rows of business_id-category_id pairs.
import re
def zip_categories(business_id, categories):
# For each value in categories, zip it with business_id to form a pair
return list(zip([business_id] * len(categories), categories))
zip_categories_udf = F.udf(zip_categories, T.ArrayType(T.ArrayType(T.StringType())))
# Zip business_id's and categories and extract them into a new table called business_categories
business_categories_df = business_categories_df.select(F.explode(zip_categories_udf("business_id", "categories")).alias("cols"))\
.selectExpr("cols[0] as business_id", "cols[1] as category")\
.dropDuplicates()
business_categories_df = business_categories_df.join(categories_df, business_categories_df["category"] == categories_df["category"], how="left")\
.drop("category")
describe(business_categories_df)
-------------------------------------------- count: 788110 -------------------------------------------- nulls: [Row(business_id=0, category_id=0)] -------------------------------------------- max_str_lengths: {'business_id': Row(max(length(business_id))=22)} -------------------------------------------- schema: root |-- business_id: string (nullable = true) |-- category_id: long (nullable = true) -------------------------------------------- example: Row(business_id=u'BeNBXXzqyaHtNQI0mW7EMg', category_id=128849018882)
write_parquet(business_categories_df, business_categories_path)
To enable efficient querying based on business hours, split the time range string for each day of the week into "from" and "to" integers.
business_hours_df = business_df.select("business_id", "hours.*")
business_hours_df.where("Monday is not null").first()
Row(business_id=u'QXAEGFB4oINsVuTFxEYKFQ', Friday=u'9:0-1:0', Monday=u'9:0-0:0', Saturday=u'9:0-1:0', Sunday=u'9:0-0:0', Thursday=u'9:0-0:0', Tuesday=u'9:0-0:0', Wednesday=u'9:0-0:0')
def parse_hours(x):
    # Take "9:0-0:0" (9am-midnight) and transform it into {from: 900, to: 0}
if x is None:
return None
convert_to_int = lambda x: int(x.split(':')[0]) * 100 + int(x.split(':')[1])
return {
"from": convert_to_int(x.split('-')[0]),
"to": convert_to_int(x.split('-')[1])
}
parse_hours_udf = F.udf(parse_hours, T.StructType([
T.StructField('from', T.IntegerType(), nullable=True),
T.StructField('to', T.IntegerType(), nullable=True)
]))
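A quick local check (plain Python):
parse_hours("9:0-1:30")  # -> {'from': 900, 'to': 130}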
hour_attrs = [
"Monday",
"Tuesday",
"Wednesday",
"Thursday",
"Friday",
"Saturday",
"Sunday",
]
for attr in hour_attrs:
business_hours_df = business_hours_df.withColumn(attr, parse_hours_udf(attr))\
.selectExpr("*", attr+".from as "+attr+"_from", attr+".to as "+attr+"_to")\
.drop(attr)
business_hours_df.where("business_id = 'QXAEGFB4oINsVuTFxEYKFQ'").first()
Row(business_id=u'QXAEGFB4oINsVuTFxEYKFQ', Monday_from=900, Monday_to=0, Tuesday_from=900, Tuesday_to=0, Wednesday_from=900, Wednesday_to=0, Thursday_from=900, Thursday_to=0, Friday_from=900, Friday_to=100, Saturday_from=900, Saturday_to=100, Sunday_from=900, Sunday_to=0)
describe(business_hours_df)
-------------------------------------------- count: 192609 -------------------------------------------- nulls: [Row(business_id=0, Monday_from=56842, Monday_to=56842, Tuesday_from=49181, Tuesday_to=49181, Wednesday_from=47452, Wednesday_to=47452, Thursday_from=46706, Thursday_to=46706, Friday_from=47435, Friday_to=47435, Saturday_from=66748, Saturday_to=66748, Sunday_from=101273, Sunday_to=101273)] -------------------------------------------- max_str_lengths: {'business_id': Row(max(length(business_id))=22)} -------------------------------------------- schema: root |-- business_id: string (nullable = true) |-- Monday_from: integer (nullable = true) |-- Monday_to: integer (nullable = true) |-- Tuesday_from: integer (nullable = true) |-- Tuesday_to: integer (nullable = true) |-- Wednesday_from: integer (nullable = true) |-- Wednesday_to: integer (nullable = true) |-- Thursday_from: integer (nullable = true) |-- Thursday_to: integer (nullable = true) |-- Friday_from: integer (nullable = true) |-- Friday_to: integer (nullable = true) |-- Saturday_from: integer (nullable = true) |-- Saturday_to: integer (nullable = true) |-- Sunday_from: integer (nullable = true) |-- Sunday_to: integer (nullable = true) -------------------------------------------- example: Row(business_id=u'1SWheh84yJXfytovILXOAQ', Monday_from=None, Monday_to=None, Tuesday_from=None, Tuesday_to=None, Wednesday_from=None, Wednesday_to=None, Thursday_from=None, Thursday_to=None, Friday_from=None, Friday_to=None, Saturday_from=None, Saturday_to=None, Sunday_from=None, Sunday_to=None)
write_parquet(business_hours_df, business_hours_path)
Take the remaining information and write it into the businesses table.
businesses_df = business_df.join(addresses_df, (business_df["address"] == addresses_df["address"])
& (business_df["latitude"] == addresses_df["latitude"])
& (business_df["longitude"] == addresses_df["longitude"])
& (business_df["postal_code"] == addresses_df["postal_code"]), how="left")\
.selectExpr("business_id", "address_id", "cast(is_open as boolean)", "name", "review_count", "stars")
describe(businesses_df)
-------------------------------------------- count: 196728 -------------------------------------------- nulls: [Row(business_id=0, address_id=0, is_open=0, name=0, review_count=0, stars=0)] -------------------------------------------- max_str_lengths: {'business_id': Row(max(length(business_id))=22), 'name': Row(max(length(name))=64)} -------------------------------------------- schema: root |-- business_id: string (nullable = true) |-- address_id: long (nullable = true) |-- is_open: boolean (nullable = true) |-- name: string (nullable = true) |-- review_count: long (nullable = true) |-- stars: double (nullable = true) -------------------------------------------- example: Row(business_id=u'nn8RDkUz0cWLcEhuga4f7Q', address_id=1271310320103, is_open=True, name=u'Handy AZ Man', review_count=19, stars=4.5)
write_parquet(businesses_df, businesses_path)
review_df = spark.read.json(review_path)
describe(review_df)
-------------------------------------------- count: 6685900 -------------------------------------------- nulls: [Row(business_id=0, cool=0, date=0, funny=0, review_id=0, stars=0, text=0, useful=0, user_id=0)] -------------------------------------------- max_str_lengths: {'date': Row(max(length(date))=19), 'text': Row(max(length(text))=5000), 'user_id': Row(max(length(user_id))=22), 'business_id': Row(max(length(business_id))=22), 'review_id': Row(max(length(review_id))=22)} -------------------------------------------- schema: root |-- business_id: string (nullable = true) |-- cool: long (nullable = true) |-- date: string (nullable = true) |-- funny: long (nullable = true) |-- review_id: string (nullable = true) |-- stars: double (nullable = true) |-- text: string (nullable = true) |-- useful: long (nullable = true) |-- user_id: string (nullable = true) -------------------------------------------- example: Row(business_id=u'ujmEBvifdJM6h6RLv4wQIg', cool=0, date=u'2013-05-07 04:34:36', funny=1, review_id=u'Q1sbwvVQXV2734tPgoKj4Q', stars=1.0, text=u'Total bill for this horrible service? Over $8Gs. These crooks actually had the nerve to charge us $69 for 3 pills. I checked online the pills can be had for 19 cents EACH! Avoid Hospital ERs at all costs.', useful=6, user_id=u'hG7b0MtEbXx5QzbzE6C_VA')
The table can be used almost as-is; only minor transformations are required.
# date field looks more like a timestamp
reviews_df = review_df.withColumnRenamed("date", "ts")\
.withColumn("ts", F.to_timestamp("ts"))
describe(reviews_df)
-------------------------------------------- count: 6685900 -------------------------------------------- nulls: [Row(business_id=0, cool=0, ts=0, funny=0, review_id=0, stars=0, text=0, useful=0, user_id=0)] -------------------------------------------- max_str_lengths: {'text': Row(max(length(text))=5000), 'user_id': Row(max(length(user_id))=22), 'business_id': Row(max(length(business_id))=22), 'review_id': Row(max(length(review_id))=22)} -------------------------------------------- schema: root |-- business_id: string (nullable = true) |-- cool: long (nullable = true) |-- ts: timestamp (nullable = true) |-- funny: long (nullable = true) |-- review_id: string (nullable = true) |-- stars: double (nullable = true) |-- text: string (nullable = true) |-- useful: long (nullable = true) |-- user_id: string (nullable = true) -------------------------------------------- example: Row(business_id=u'ujmEBvifdJM6h6RLv4wQIg', cool=0, ts=datetime.datetime(2013, 5, 7, 4, 34, 36), funny=1, review_id=u'Q1sbwvVQXV2734tPgoKj4Q', stars=1.0, text=u'Total bill for this horrible service? Over $8Gs. These crooks actually had the nerve to charge us $69 for 3 pills. I checked online the pills can be had for 19 cents EACH! Avoid Hospital ERs at all costs.', useful=6, user_id=u'hG7b0MtEbXx5QzbzE6C_VA')
write_parquet(reviews_df, reviews_path)
user_df = spark.read.json(user_path)
describe(user_df)
-------------------------------------------- count: 1637138 -------------------------------------------- nulls: [Row(average_stars=0, compliment_cool=0, compliment_cute=0, compliment_funny=0, compliment_hot=0, compliment_list=0, compliment_more=0, compliment_note=0, compliment_photos=0, compliment_plain=0, compliment_profile=0, compliment_writer=0, cool=0, elite=0, fans=0, friends=0, funny=0, name=0, review_count=0, useful=0, user_id=0, yelping_since=0)] -------------------------------------------- max_str_lengths: {'yelping_since': Row(max(length(yelping_since))=19), 'friends': Row(max(length(friends))=359878), 'elite': Row(max(length(elite))=64), 'name': Row(max(length(name))=32), 'user_id': Row(max(length(user_id))=22)} -------------------------------------------- schema: root |-- average_stars: double (nullable = true) |-- compliment_cool: long (nullable = true) |-- compliment_cute: long (nullable = true) |-- compliment_funny: long (nullable = true) |-- compliment_hot: long (nullable = true) |-- compliment_list: long (nullable = true) |-- compliment_more: long (nullable = true) |-- compliment_note: long (nullable = true) |-- compliment_photos: long (nullable = true) |-- compliment_plain: long (nullable = true) |-- compliment_profile: long (nullable = true) |-- compliment_writer: long (nullable = true) |-- cool: long (nullable = true) |-- elite: string (nullable = true) |-- fans: long (nullable = true) |-- friends: string (nullable = true) |-- funny: long (nullable = true) |-- name: string (nullable = true) |-- review_count: long (nullable = true) |-- useful: long (nullable = true) |-- user_id: string (nullable = true) |-- yelping_since: string (nullable = true) -------------------------------------------- example: Row(average_stars=4.03, compliment_cool=1, compliment_cute=0, compliment_funny=1, compliment_hot=2, compliment_list=0, compliment_more=0, compliment_note=1, compliment_photos=0, compliment_plain=1, compliment_profile=0, compliment_writer=2, cool=25, elite=u'2015,2016,2017', fans=5, friends=u'c78V-rj8NQcQjOI8KP3UEA, alRMgPcngYSCJ5naFRBz5g, ajcnq75Z5xxkvUSmmJ1bCg, BSMAmp2-wMzCkhTfq9ToNg, jka10dk9ygX76hJG0gfPZQ, dut0e4xvme7QSlesOycHQA, l4l5lBnK356zBua7B-UJ6Q, 0HicMOOs-M_gl2eO-zES4Q, _uI57wL2fLyftrcSFpfSGQ, T4_Qd0YWbC3co6WSMw4vxg, iBRoLWPtWmsI1kdbE9ORSA, xjrUcid6Ymq0DoTJELkYyw, GqadWVzJ6At-vgLzK_SKgA, DvB13VJBmSnbFXBVBsKmDA, vRP9nQkYTeNioDjtxZlVhg, gT0A1iN3eeQ8EMAjJhwQtw, 6yCWjFPtp_AD4x93WAwmnw, 1dKzpNnib-JlViKv8_Gt5g, 3Bv4_JxHXq-gVLOxYMQX0Q, ikQyfu1iViYh8T0us7wiFQ, f1GGltNaB7K5DR1jf3dOmg, tgeFUChlh7v8bZFVl2-hjQ, -9-9oyXlqsMG2he5xIWdLQ, Adj9fBPVJad8vSs-mIP7gw, Ce49RY8CKXVsTifxRYFTsw, M1_7TLi8CbdA89nFLlH4iw, wFsNv-hqbW_F5-IRqfBN6g, 0Q1L7zXHocaUZ2gsG2XJeg, cBFgmOCBdhYa0xoFEAzp_g, VrD_AgiFvzqtlR15vir3SQ, cpE-7HK514Sr5vpSen9CEQ, F1UYelhPFB-zIKlt0ygIZg, CQAL1hvsLMCzuJf9AglsXw, 1KnY1wr15WfEWIRLB9IS6g, QWFQ-kXBiLbid-lm5Jr3dQ, nymT8liFugCrM16lTy0ZfQ, qj69bdd885heDvUPCyHd2Q, DySCZZcgbdrlHgEovk5y9w, lZMJIDuvhT9Dy4KyquLXyA, b_9Gn7wS93AoPZPR0dIJqQ, N07g1IaLh0_6sUjtiSRe4w, YdfPX_7DxSnKvvdCJ57iOw, 8GYryZPD22W7WgQ8kvMkEQ, cpQmAgOWatghp14h1pn1dQ, EnchhymLYMqftCRjqvVWmw, -JdfKhFktE7Zs9BMDFcPeQ, uWhC9eof98zPkvsalgaqJw, eyTlNDDaiPatfe6mheIZ0g, VfHq0o73aKsODvfAhwAQtg, kvD5tICngLAaQDujSFWupA, dXacwEhqi9-3_XT6JeH0Og, NfU0zDaTMEQ4-X9dbQWd9A, cTHWBdjSKbctSUIvWsgFxw, 3IEtCbSDF5t7RkZ20T6s9A, HJJXTrp6UybYyPdQ9DA0JA, JaXogQFVjzGRAeBvzamBHg, NUonfKkjS1iVqnNITtgXZg, D5vaJAYp0sOrGfsj9qvsMA, H27Ecbwwu4FGAlLgICourw, S8HrLmMiE4u8FWYWkNEoTw, Io36Y3xWQcIX9rYvPcYfXQ, J5mcqh8KxYpqjaLBNlwcig, -nTB3_08g06fD0GT8AtDBQ, 
wMpFA46lihK8oFns_5p65A, RZGFJHeomGJCWp3xcL3ejA, ZoQSzzXoSP1RxOD4Amv9Bg, qzM0EB0SkuuGIFv0adjQAQ, HuM6vvuveken-fPZ7d4olA, H3oukHpGpn9n_mJwSDSQyQ, PkmsJsQ8FIZe8eh8c_u96g, wSByVbwME4MzgkJaFyfvNg, YEVqknoDmrHAoUbHX0nPnA, li3vsK1XAPmeJYAUTYflHQ, MKc8yXi0glbPYt0Qb4PECw, fQPH6W9fksi27gkuUPnFaA, amrCMrDsoRetYFg2kwwdFA, 84dVQ6n6r2ezNaTuc7RkKA, yW9QjWY0olv5-uRKv3t_Kw, 5XJDj7c3eoidfQ3jW18Zgw, txSc6a6pIDctvwyBeu7Aqg, HFbbDCyyqP9xPkUlcxeIdg, hTUv5oh2do6Z3OppPuuiJA, gSqonG9J4fNM-fl_fE71AA, pd9mgTFpBTg5F9x-MsczNg, j3VE22V2GcHiH8UZxfFLfw, NYXlMW-T-3V4Jqr4r-i0Wg, btxgAZedxX8IWhMifA7Xkg, -Hp5mPLiRJNFnyeX5Ygzag, P6-DwVg6-t2JuQwIUEk0iQ, OI2TvxYvZrAodBG_RF53Xw, bHxf_VPKmZur1Bier-6A2A, Et_Sb39cVm81_Xe9HDM8ZQ, 5HwGl2UyYbaRq8aD6YC-fA, ZK228WMcCKLo5thcjD7rdw, iTf8wojwfm0NOi7dOiz3Nw, btYRxQYNJjpecflNHtFH0A, Kgo42FzpW_dXFgDKoewbtg, MNk_1Q_dqOY3xxHZKeO8VQ, AlwD504T9k0m5lkg3k5g6Q', funny=17, name=u'Rashmi', review_count=95, useful=84, user_id=u'l6BmjZMeQD3rDxWUbiAiow', yelping_since=u'2013-10-08 23:11:33')
# Drop fields that will be moved into separate tables and cast the timestamp field
users_df = user_df.drop("elite", "friends")\
.withColumn("yelping_since", F.to_timestamp("yelping_since"))
describe(users_df)
-------------------------------------------- count: 1637138 -------------------------------------------- nulls: [Row(average_stars=0, compliment_cool=0, compliment_cute=0, compliment_funny=0, compliment_hot=0, compliment_list=0, compliment_more=0, compliment_note=0, compliment_photos=0, compliment_plain=0, compliment_profile=0, compliment_writer=0, cool=0, fans=0, funny=0, name=0, review_count=0, useful=0, user_id=0, yelping_since=0)] -------------------------------------------- max_str_lengths: {'user_id': Row(max(length(user_id))=22), 'name': Row(max(length(name))=32)} -------------------------------------------- schema: root |-- average_stars: double (nullable = true) |-- compliment_cool: long (nullable = true) |-- compliment_cute: long (nullable = true) |-- compliment_funny: long (nullable = true) |-- compliment_hot: long (nullable = true) |-- compliment_list: long (nullable = true) |-- compliment_more: long (nullable = true) |-- compliment_note: long (nullable = true) |-- compliment_photos: long (nullable = true) |-- compliment_plain: long (nullable = true) |-- compliment_profile: long (nullable = true) |-- compliment_writer: long (nullable = true) |-- cool: long (nullable = true) |-- fans: long (nullable = true) |-- funny: long (nullable = true) |-- name: string (nullable = true) |-- review_count: long (nullable = true) |-- useful: long (nullable = true) |-- user_id: string (nullable = true) |-- yelping_since: timestamp (nullable = true) -------------------------------------------- example: Row(average_stars=4.03, compliment_cool=1, compliment_cute=0, compliment_funny=1, compliment_hot=2, compliment_list=0, compliment_more=0, compliment_note=1, compliment_photos=0, compliment_plain=1, compliment_profile=0, compliment_writer=2, cool=25, fans=5, funny=17, name=u'Rashmi', review_count=95, useful=84, user_id=u'l6BmjZMeQD3rDxWUbiAiow', yelping_since=datetime.datetime(2013, 10, 8, 23, 11, 33))
write_parquet(users_df, users_path)
The elite field is a comma-separated list of strings masked as a single string. Make a separate table out of it.
elite_years_df = user_df.select("user_id", "elite")\
.withColumn("year", F.explode(F.split(F.col("elite"), ",")))\
.where("year != '' and year is not null")\
.select(F.col("user_id"), F.col("year").cast("integer"))
describe(elite_years_df)
-------------------------------------------- count: 224499 -------------------------------------------- nulls: [Row(user_id=0, year=0)] -------------------------------------------- max_str_lengths: {'user_id': Row(max(length(user_id))=22)} -------------------------------------------- schema: root |-- user_id: string (nullable = true) |-- year: integer (nullable = true) -------------------------------------------- example: Row(user_id=u'l6BmjZMeQD3rDxWUbiAiow', year=2015)
write_parquet(elite_years_df, elite_years_path)
Basically the same procedure as for elite to get the table of user relationships. This can take some time.
friends_df = user_df.select("user_id", "friends")\
.withColumn("friend_id", F.explode(F.split(F.col("friends"), ", ")))\
.where("friend_id != '' and friend_id is not null")\
.select(F.col("user_id"), F.col("friend_id"))\
.distinct()
describe(friends_df)
-------------------------------------------- count: 75531114 -------------------------------------------- nulls: [Row(user_id=0, friend_id=0)] -------------------------------------------- max_str_lengths: {'friend_id': Row(max(length(friend_id))=22), 'user_id': Row(max(length(user_id))=22)} -------------------------------------------- schema: root |-- user_id: string (nullable = true) |-- friend_id: string (nullable = true) -------------------------------------------- example: Row(user_id=u'l6BmjZMeQD3rDxWUbiAiow', friend_id=u'wMpFA46lihK8oFns_5p65A')
write_parquet(friends_df, friends_path)
checkin_df = spark.read.json(checkin_path)
describe(checkin_df)
-------------------------------------------- count: 161950 -------------------------------------------- nulls: [Row(business_id=0, date=0)] -------------------------------------------- max_str_lengths: {'date': Row(max(length(date))=3004279), 'business_id': Row(max(length(business_id))=22)} -------------------------------------------- schema: root |-- business_id: string (nullable = true) |-- date: string (nullable = true) -------------------------------------------- example: Row(business_id=u'--1UhMGODdWsrMastO9DZw', date=u'2016-04-26 19:49:16, 2016-08-30 18:36:57, 2016-10-15 02:45:18, 2016-11-18 01:54:50, 2017-04-20 18:39:06, 2017-05-03 17:58:02')
Basically the same procedure as for friends to get the table of business_id:ts pairs.
checkins_df = checkin_df.selectExpr("business_id", "date as ts")\
.withColumn("ts", F.explode(F.split(F.col("ts"), ", ")))\
.where("ts != '' and ts is not null")\
.withColumn("ts", F.to_timestamp("ts"))
describe(checkins_df)
-------------------------------------------- count: 19089148 -------------------------------------------- nulls: [Row(business_id=0, ts=0)] -------------------------------------------- max_str_lengths: {'business_id': Row(max(length(business_id))=22)} -------------------------------------------- schema: root |-- business_id: string (nullable = true) |-- ts: timestamp (nullable = true) -------------------------------------------- example: Row(business_id=u'--1UhMGODdWsrMastO9DZw', ts=datetime.datetime(2016, 4, 26, 19, 49, 16))
write_parquet(checkins_df, checkins_path)
tip_df = spark.read.json(tip_path)
describe(tip_df)
-------------------------------------------- count: 1223094 -------------------------------------------- nulls: [Row(business_id=0, compliment_count=0, date=0, text=0, user_id=0)] -------------------------------------------- max_str_lengths: {'date': Row(max(length(date))=19), 'text': Row(max(length(text))=500), 'user_id': Row(max(length(user_id))=22), 'business_id': Row(max(length(business_id))=22)} -------------------------------------------- schema: root |-- business_id: string (nullable = true) |-- compliment_count: long (nullable = true) |-- date: string (nullable = true) |-- text: string (nullable = true) |-- user_id: string (nullable = true) -------------------------------------------- example: Row(business_id=u'VaKXUpmWTTWDKbpJ3aQdMw', compliment_count=0, date=u'2014-03-27 03:51:24', text=u'Great for watching games, ufc, and whatever else tickles yer fancy', user_id=u'UPw5DWs_b-e2JRBS-t37Ag')
Assign each record a unique id for convenience.
tips_df = tip_df.withColumnRenamed("date", "ts")\
.withColumn("ts", F.to_timestamp("ts"))\
.withColumn("tip_id", F.monotonically_increasing_id())
describe(tips_df)
-------------------------------------------- count: 1223094 -------------------------------------------- nulls: [Row(business_id=0, compliment_count=0, ts=0, text=0, user_id=0, tip_id=0)] -------------------------------------------- max_str_lengths: {'text': Row(max(length(text))=500), 'user_id': Row(max(length(user_id))=22), 'business_id': Row(max(length(business_id))=22)} -------------------------------------------- schema: root |-- business_id: string (nullable = true) |-- compliment_count: long (nullable = true) |-- ts: timestamp (nullable = true) |-- text: string (nullable = true) |-- user_id: string (nullable = true) |-- tip_id: long (nullable = false) -------------------------------------------- example: Row(business_id=u'VaKXUpmWTTWDKbpJ3aQdMw', compliment_count=0, ts=datetime.datetime(2014, 3, 27, 3, 51, 24), text=u'Great for watching games, ufc, and whatever else tickles yer fancy', user_id=u'UPw5DWs_b-e2JRBS-t37Ag', tip_id=0)
write_parquet(tips_df, tips_path)
photo_df = spark.read.json(photo_path)
describe(photo_df)
-------------------------------------------- count: 200000 -------------------------------------------- nulls: [Row(business_id=0, caption=0, label=0, photo_id=0)] -------------------------------------------- max_str_lengths: {'caption': Row(max(length(caption))=140), 'photo_id': Row(max(length(photo_id))=22), 'business_id': Row(max(length(business_id))=22), 'label': Row(max(length(label))=7)} -------------------------------------------- schema: root |-- business_id: string (nullable = true) |-- caption: string (nullable = true) |-- label: string (nullable = true) |-- photo_id: string (nullable = true) -------------------------------------------- example: Row(business_id=u'rcaPajgKOJC2vo_l3xa42A', caption=u'', label=u'inside', photo_id=u'MllA1nNpcp1kDteVg6OGUw')
Even though we do not store the photos themselves, this table is useful for knowing how many and what kind of photos were taken.
write_parquet(photo_df, photos_path)
Get the names of US cities supported by this dataset and assign each of them a city_id. This requires reading the cities table.
city_attr_df = spark.read\
.format('csv')\
.option("header", "true")\
.option("delimiter", ",")\
.load(city_attr_path)
describe(city_attr_df)
-------------------------------------------- count: 36 -------------------------------------------- nulls: [Row(City=0, Country=0, Latitude=0, Longitude=0)] -------------------------------------------- max_str_lengths: {'Latitude': Row(max(length(Latitude))=9), 'City': Row(max(length(City))=17), 'Longitude': Row(max(length(Longitude))=11), 'Country': Row(max(length(Country))=13)} -------------------------------------------- schema: root |-- City: string (nullable = true) |-- Country: string (nullable = true) |-- Latitude: string (nullable = true) |-- Longitude: string (nullable = true) -------------------------------------------- example: Row(City=u'Vancouver', Country=u'Canada', Latitude=u'49.24966', Longitude=u'-123.119339')
# We only want the list of US cities
cities = city_attr_df.where("Country = 'United States'")\
.select("City")\
.distinct()\
.rdd.flatMap(lambda x: x)\
.collect()
# The list of cities provided by the weather dataset
cities
[u'Phoenix', u'Dallas', u'San Antonio', u'Philadelphia', u'Los Angeles', u'Indianapolis', u'San Francisco', u'San Diego', u'Nashville', u'Detroit', u'Portland', u'Pittsburgh', u'Chicago', u'Atlanta', u'Las Vegas', u'Seattle', u'Kansas City', u'Saint Louis', u'Minneapolis', u'Houston', u'Jacksonville', u'Albuquerque', u'Miami', u'New York', u'Charlotte', u'Denver', u'Boston']
# The weather dataset doesn't provide us with the respective state codes though
# How do we know whether "Phoenix" is in AZ or TX?
# The most pragmatic solution is to assume each name refers to the biggest city of that name
# Let's find out which of those cities are referenced in the Yelp dataset and relevant to us
cities_df = spark.read.parquet(cities_path)
cities_df.printSchema()
root |-- city: string (nullable = true) |-- state_code: string (nullable = true) |-- total_population: long (nullable = true) |-- number_of_veterans: long (nullable = true) |-- male_population: long (nullable = true) |-- foreign_born: long (nullable = true) |-- average_household_size: double (nullable = true) |-- median_age: double (nullable = true) |-- state: string (nullable = true) |-- female_population: long (nullable = true) |-- american_indian_and_alaska_native: long (nullable = true) |-- asian: long (nullable = true) |-- black_or_african_american: long (nullable = true) |-- hispanic_or_latino: long (nullable = true) |-- white: long (nullable = true) |-- city_id: long (nullable = true)
cities_df.filter(F.col("city").isin(cities))\
.select("city")\
.distinct()\
.collect()
# The "cities" table includes 11 of the 36 cities provided by the weather dataset
[Row(city=u'Phoenix'), Row(city=u'Dallas'), Row(city=u'Los Angeles'), Row(city=u'San Diego'), Row(city=u'Pittsburgh'), Row(city=u'Las Vegas'), Row(city=u'Seattle'), Row(city=u'New York'), Row(city=u'Charlotte'), Row(city=u'Denver'), Row(city=u'Boston')]
# Now find their states (using Google or any other API)
weather_cities_df = [
Row(city='Phoenix', state_code='AZ'),
Row(city='Dallas', state_code='TX'),
Row(city='Los Angeles', state_code='CA'),
Row(city='San Diego', state_code='CA'),
Row(city='Pittsburgh', state_code='PA'),
Row(city='Las Vegas', state_code='NV'),
Row(city='Seattle', state_code='WA'),
Row(city='New York', state_code='NY'),
Row(city='Charlotte', state_code='NC'),
Row(city='Denver', state_code='CO'),
Row(city='Boston', state_code='MA')
]
weather_cities_schema = T.StructType([
T.StructField("city", T.StringType()),
T.StructField("state_code", T.StringType())
])
weather_cities_df = spark.createDataFrame(weather_cities_df, schema=weather_cities_schema)
# Join with the cities dataset to find matches
weather_cities_df = cities_df.join(weather_cities_df, ["city", "state_code"])\
.select("city", "city_id")\
.distinct()
Read temperatures recorded hourly, transform them into daily averages, and filter by our cities. Also, cities are columns here, so transform them into rows.
weather_temp_df = spark.read\
.format('csv')\
.option("header", "true")\
.option("delimiter", ",")\
.load(weather_temp_path)
describe(weather_temp_df)
--------------------------------------------
count:
45253
--------------------------------------------
nulls:
[Row(datetime=0, Vancouver=795, Portland=1, San Francisco=793, Seattle=3, Los Angeles=3, San Diego=1, Las Vegas=1, Phoenix=3, Albuquerque=1, Denver=1, San Antonio=1, Dallas=4, Houston=3, Kansas City=1, Minneapolis=13, Saint Louis=1, Chicago=3, Nashville=2, Indianapolis=7, Atlanta=6, Detroit=1, Jacksonville=1, Charlotte=3, Miami=805, Pittsburgh=3, Toronto=1, Philadelphia=3, New York=793, Montreal=3, Boston=3, Beersheba=798, Tel Aviv District=793, Eilat=792, Haifa=798, Nahariyya=797, Jerusalem=793)]
--------------------------------------------
max_str_lengths:
{'Eilat': Row(max(length(Eilat))=13), 'San Diego': Row(max(length(San Diego))=13), 'Chicago': Row(max(length(Chicago))=13), 'Philadelphia': Row(max(length(Philadelphia))=13), 'Denver': Row(max(length(Denver))=13), 'datetime': Row(max(length(datetime))=19), 'Dallas': Row(max(length(Dallas))=13), 'Nahariyya': Row(max(length(Nahariyya))=13), 'Vancouver': Row(max(length(Vancouver))=13), 'San Francisco': Row(max(length(San Francisco))=13), 'Indianapolis': Row(max(length(Indianapolis))=13), 'Phoenix': Row(max(length(Phoenix))=13), 'Pittsburgh': Row(max(length(Pittsburgh))=13), 'Nashville': Row(max(length(Nashville))=13), 'Albuquerque': Row(max(length(Albuquerque))=13), 'New York': Row(max(length(New York))=13), 'Los Angeles': Row(max(length(Los Angeles))=13), 'Atlanta': Row(max(length(Atlanta))=13), 'San Antonio': Row(max(length(San Antonio))=13), 'Toronto': Row(max(length(Toronto))=13), 'Haifa': Row(max(length(Haifa))=13), 'Charlotte': Row(max(length(Charlotte))=13), 'Miami': Row(max(length(Miami))=13), 'Kansas City': Row(max(length(Kansas City))=13), 'Detroit': Row(max(length(Detroit))=13), 'Saint Louis': Row(max(length(Saint Louis))=13), 'Tel Aviv District': Row(max(length(Tel Aviv District))=13), 'Montreal': Row(max(length(Montreal))=13), 'Houston': Row(max(length(Houston))=13), 'Jerusalem': Row(max(length(Jerusalem))=13), 'Boston': Row(max(length(Boston))=13), 'Minneapolis': Row(max(length(Minneapolis))=13), 'Jacksonville': Row(max(length(Jacksonville))=13), 'Beersheba': Row(max(length(Beersheba))=13), 'Las Vegas': Row(max(length(Las Vegas))=13), 'Portland': Row(max(length(Portland))=13), 'Seattle': Row(max(length(Seattle))=13)}
--------------------------------------------
schema:
root
 |-- datetime: string (nullable = true)
 |-- Vancouver: string (nullable = true)
 |-- Portland: string (nullable = true)
 |-- San Francisco: string (nullable = true)
 |-- Seattle: string (nullable = true)
 |-- Los Angeles: string (nullable = true)
 |-- San Diego: string (nullable = true)
 |-- Las Vegas: string (nullable = true)
 |-- Phoenix: string (nullable = true)
 |-- Albuquerque: string (nullable = true)
 |-- Denver: string (nullable = true)
 |-- San Antonio: string (nullable = true)
 |-- Dallas: string (nullable = true)
 |-- Houston: string (nullable = true)
 |-- Kansas City: string (nullable = true)
 |-- Minneapolis: string (nullable = true)
 |-- Saint Louis: string (nullable = true)
 |-- Chicago: string (nullable = true)
 |-- Nashville: string (nullable = true)
 |-- Indianapolis: string (nullable = true)
 |-- Atlanta: string (nullable = true)
 |-- Detroit: string (nullable = true)
 |-- Jacksonville: string (nullable = true)
 |-- Charlotte: string (nullable = true)
 |-- Miami: string (nullable = true)
 |-- Pittsburgh: string (nullable = true)
 |-- Toronto: string (nullable = true)
 |-- Philadelphia: string (nullable = true)
 |-- New York: string (nullable = true)
 |-- Montreal: string (nullable = true)
 |-- Boston: string (nullable = true)
 |-- Beersheba: string (nullable = true)
 |-- Tel Aviv District: string (nullable = true)
 |-- Eilat: string (nullable = true)
 |-- Haifa: string (nullable = true)
 |-- Nahariyya: string (nullable = true)
 |-- Jerusalem: string (nullable = true)
--------------------------------------------
example:
Row(datetime=u'2012-10-01 12:00:00', Vancouver=None, Portland=None, San Francisco=None, Seattle=None, Los Angeles=None, San Diego=None, Las Vegas=None, Phoenix=None, Albuquerque=None, Denver=None, San Antonio=None, Dallas=None, Houston=None, Kansas City=None, Minneapolis=None, Saint Louis=None, Chicago=None, Nashville=None, Indianapolis=None, Atlanta=None, Detroit=None, Jacksonville=None, Charlotte=None, Miami=None, Pittsburgh=None, Toronto=None, Philadelphia=None, New York=None, Montreal=None, Boston=None, Beersheba=None, Tel Aviv District=None, Eilat=u'309.1', Haifa=None, Nahariyya=None, Jerusalem=None)
# Extract the date string from the timestamp string to be able to group by day
# (substring positions are 1-based in Spark SQL)
weather_temp_df = weather_temp_df.select("datetime", *cities)\
    .withColumn("date", F.substring("datetime", 1, 10))\
    .drop("datetime")
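As an aside, the same day column can be obtained by parsing rather than slicing; a minimal sketch, assuming it runs instead of the cell above (so that datetime is still present) — weather_temp_alt_df is just an illustrative name:
# Sketch: parse the timestamp to a date, then format it back to 'yyyy-MM-dd'
# so the column stays a string, as the grouping below expects
weather_temp_alt_df = weather_temp_df.select("datetime", *cities)\
    .withColumn("date", F.date_format(F.to_date("datetime"), "yyyy-MM-dd"))\
    .drop("datetime")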
weather_temp_df.where("Phoenix is not null").first()
Row(Phoenix=u'296.6', Dallas=u'289.74', San Antonio=u'289.29', Philadelphia=u'285.63', Los Angeles=u'291.87', Indianapolis=u'283.85', San Francisco=u'289.48', San Diego=u'291.53', Nashville=u'287.41', Detroit=u'284.03', Portland=u'282.08', Pittsburgh=u'281.0', Chicago=u'284.01', Atlanta=u'294.03', Las Vegas=u'293.41', Seattle=u'281.8', Kansas City=u'289.98', Saint Louis=u'286.18', Minneapolis=u'286.87', Houston=u'288.27', Jacksonville=u'298.17', Albuquerque=u'285.12', Miami=u'299.72', New York=u'288.22', Charlotte=u'288.65', Denver=u'284.61', Boston=u'287.17', date=u'2012-10-01')
phoenix_rows = weather_temp_df.where("Phoenix is not null and date = '2012-10-01'").select("Phoenix").collect()
phoenix_rows
[Row(Phoenix=u'296.6'), Row(Phoenix=u'296.608508543'), Row(Phoenix=u'296.631487354'), Row(Phoenix=u'296.654466164'), Row(Phoenix=u'296.677444975'), Row(Phoenix=u'296.700423786'), Row(Phoenix=u'296.723402597'), Row(Phoenix=u'296.746381407'), Row(Phoenix=u'296.769360218'), Row(Phoenix=u'296.792339029'), Row(Phoenix=u'296.815317839')]
import numpy as np
# For data quality check
phoenix_mean_temp = np.mean([float(row.Phoenix) for row in phoenix_rows])
phoenix_mean_temp
296.7017392647272
# To transform city columns into rows, transform each city individually and union all dataframes
# (a single-pass alternative using stack() is sketched after the loop)
temp_df = None
for city in cities:
    # Get the average temperature in Kelvin for each day and city
    # (the raw readings are in Kelvin, e.g. 296.6 K is about 23 degrees Celsius)
    df = weather_temp_df.select("date", city)\
        .withColumnRenamed(city, "temperature")\
        .withColumn("temperature", F.col("temperature").cast("double"))\
        .withColumn("city", F.lit(city))\
        .groupBy("date", "city")\
        .agg(F.mean("temperature").alias("avg_temperature"))
    if temp_df is None:
        temp_df = df
    else:
        temp_df = temp_df.union(df)
weather_temp_df = temp_df
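The loop builds a long union plan (one branch per city); the same melt can be expressed in a single select with the SQL stack() generator. A minimal sketch, assuming the pre-melt dataframe is named wide_df and the same cities list is in scope:
# Sketch: melt all city columns in one pass with stack(n, name1, col1, ...),
# which emits one (city, temperature) row per listed pair; backticks are
# needed because some column names contain spaces
stack_expr = "stack({}, {}) as (city, temperature)".format(
    len(cities),
    ", ".join("'{0}', `{0}`".format(c) for c in cities))
melted_df = wide_df.select("date", F.expr(stack_expr))\
    .withColumn("temperature", F.col("temperature").cast("double"))\
    .groupBy("date", "city")\
    .agg(F.mean("temperature").alias("avg_temperature"))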
# Speed up further joins
weather_temp_df = weather_temp_df.repartition(1).cache()
weather_temp_df.count()
50949
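A quick shape check: 50949 = 27 cities in the cities list × 1887 distinct dates, i.e. the melt yields exactly one row per (date, city) pair.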
phoenix_mean_temp2 = weather_temp_df.where("city = 'Phoenix' and date = '2012-10-01'").collect()[0].avg_temperature
assert(phoenix_mean_temp == phoenix_mean_temp2)
# If the assertion passes, the melt preserved the per-day averages
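The exact equality above happens to hold here, but means computed by NumPy and Spark can differ in the last bits depending on summation order; a tolerance-based variant is more robust:
# Sketch: compare up to floating-point noise instead of exact equality
assert abs(phoenix_mean_temp - phoenix_mean_temp2) < 1e-9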
describe(weather_temp_df)
--------------------------------------------
count:
50949
--------------------------------------------
nulls:
[Row(date=0, city=0, avg_temperature=99)]
--------------------------------------------
max_str_lengths:
{'date': Row(max(length(date))=10), 'city': Row(max(length(city))=13)}
--------------------------------------------
schema:
root
 |-- date: string (nullable = true)
 |-- city: string (nullable = false)
 |-- avg_temperature: double (nullable = true)
--------------------------------------------
example:
Row(date=u'2012-10-19', city=u'Phoenix', avg_temperature=297.77833333333325)
Read the weather descriptions recorded hourly, pick the most frequent description on each day, and keep only our cities.
As with the temperatures, transform the city columns into rows.
weather_desc_df = spark.read\
    .format('csv')\
    .option("header", "true")\
    .option("delimiter", ",")\
    .load(weather_desc_path)
describe(weather_desc_df)
--------------------------------------------
count:
45253
--------------------------------------------
nulls:
[Row(datetime=0, Vancouver=793, Portland=1, San Francisco=793, Seattle=1, Los Angeles=1, San Diego=1, Las Vegas=1, Phoenix=1, Albuquerque=1, Denver=1, San Antonio=1, Dallas=1, Houston=1, Kansas City=1, Minneapolis=1, Saint Louis=1, Chicago=1, Nashville=1, Indianapolis=1, Atlanta=1, Detroit=1, Jacksonville=1, Charlotte=1, Miami=793, Pittsburgh=1, Toronto=1, Philadelphia=1, New York=793, Montreal=1, Boston=1, Beersheba=793, Tel Aviv District=793, Eilat=792, Haifa=793, Nahariyya=793, Jerusalem=793)]
--------------------------------------------
max_str_lengths:
{'Eilat': Row(max(length(Eilat))=28), 'San Diego': Row(max(length(San Diego))=28), 'Chicago': Row(max(length(Chicago))=35), 'Philadelphia': Row(max(length(Philadelphia))=28), 'Denver': Row(max(length(Denver))=32), 'datetime': Row(max(length(datetime))=19), 'Dallas': Row(max(length(Dallas))=32), 'Nahariyya': Row(max(length(Nahariyya))=28), 'Vancouver': Row(max(length(Vancouver))=28), 'San Francisco': Row(max(length(San Francisco))=32), 'Indianapolis': Row(max(length(Indianapolis))=32), 'Phoenix': Row(max(length(Phoenix))=28), 'Pittsburgh': Row(max(length(Pittsburgh))=32), 'Nashville': Row(max(length(Nashville))=32), 'Albuquerque': Row(max(length(Albuquerque))=35), 'New York': Row(max(length(New York))=35), 'Los Angeles': Row(max(length(Los Angeles))=28), 'Atlanta': Row(max(length(Atlanta))=32), 'San Antonio': Row(max(length(San Antonio))=35), 'Toronto': Row(max(length(Toronto))=28), 'Haifa': Row(max(length(Haifa))=28), 'Charlotte': Row(max(length(Charlotte))=32), 'Miami': Row(max(length(Miami))=28), 'Kansas City': Row(max(length(Kansas City))=28), 'Detroit': Row(max(length(Detroit))=28), 'Saint Louis': Row(max(length(Saint Louis))=32), 'Tel Aviv District': Row(max(length(Tel Aviv District))=28), 'Montreal': Row(max(length(Montreal))=28), 'Houston': Row(max(length(Houston))=32), 'Jerusalem': Row(max(length(Jerusalem))=28), 'Boston': Row(max(length(Boston))=28), 'Minneapolis': Row(max(length(Minneapolis))=35), 'Jacksonville': Row(max(length(Jacksonville))=28), 'Beersheba': Row(max(length(Beersheba))=20), 'Las Vegas': Row(max(length(Las Vegas))=28), 'Portland': Row(max(length(Portland))=28), 'Seattle': Row(max(length(Seattle))=28)}
--------------------------------------------
schema:
root
 |-- datetime: string (nullable = true)
 |-- Vancouver: string (nullable = true)
 |-- Portland: string (nullable = true)
 |-- San Francisco: string (nullable = true)
 |-- Seattle: string (nullable = true)
 |-- Los Angeles: string (nullable = true)
 |-- San Diego: string (nullable = true)
 |-- Las Vegas: string (nullable = true)
 |-- Phoenix: string (nullable = true)
 |-- Albuquerque: string (nullable = true)
 |-- Denver: string (nullable = true)
 |-- San Antonio: string (nullable = true)
 |-- Dallas: string (nullable = true)
 |-- Houston: string (nullable = true)
 |-- Kansas City: string (nullable = true)
 |-- Minneapolis: string (nullable = true)
 |-- Saint Louis: string (nullable = true)
 |-- Chicago: string (nullable = true)
 |-- Nashville: string (nullable = true)
 |-- Indianapolis: string (nullable = true)
 |-- Atlanta: string (nullable = true)
 |-- Detroit: string (nullable = true)
 |-- Jacksonville: string (nullable = true)
 |-- Charlotte: string (nullable = true)
 |-- Miami: string (nullable = true)
 |-- Pittsburgh: string (nullable = true)
 |-- Toronto: string (nullable = true)
 |-- Philadelphia: string (nullable = true)
 |-- New York: string (nullable = true)
 |-- Montreal: string (nullable = true)
 |-- Boston: string (nullable = true)
 |-- Beersheba: string (nullable = true)
 |-- Tel Aviv District: string (nullable = true)
 |-- Eilat: string (nullable = true)
 |-- Haifa: string (nullable = true)
 |-- Nahariyya: string (nullable = true)
 |-- Jerusalem: string (nullable = true)
--------------------------------------------
example:
Row(datetime=u'2012-10-01 12:00:00', Vancouver=None, Portland=None, San Francisco=None, Seattle=None, Los Angeles=None, San Diego=None, Las Vegas=None, Phoenix=None, Albuquerque=None, Denver=None, San Antonio=None, Dallas=None, Houston=None, Kansas City=None, Minneapolis=None, Saint Louis=None, Chicago=None, Nashville=None, Indianapolis=None, Atlanta=None, Detroit=None, Jacksonville=None, Charlotte=None, Miami=None, Pittsburgh=None, Toronto=None, Philadelphia=None, New York=None, Montreal=None, Boston=None, Beersheba=None, Tel Aviv District=None, Eilat=u'haze', Haifa=None, Nahariyya=None, Jerusalem=None)
# Extract the date string from the timestamp string to be able to group by day
weather_desc_df = weather_desc_df.select("datetime", *cities)\
    .withColumn("date", F.substring("datetime", 1, 10))\
    .drop("datetime")
weather_desc_df.where("Phoenix is not null").first()
Row(Phoenix=u'sky is clear', Dallas=u'mist', San Antonio=u'sky is clear', Philadelphia=u'broken clouds', Los Angeles=u'mist', Indianapolis=u'overcast clouds', San Francisco=u'light rain', San Diego=u'sky is clear', Nashville=u'mist', Detroit=u'sky is clear', Portland=u'scattered clouds', Pittsburgh=u'mist', Chicago=u'overcast clouds', Atlanta=u'light rain', Las Vegas=u'sky is clear', Seattle=u'sky is clear', Kansas City=u'sky is clear', Saint Louis=u'sky is clear', Minneapolis=u'broken clouds', Houston=u'sky is clear', Jacksonville=u'scattered clouds', Albuquerque=u'sky is clear', Miami=u'light intensity drizzle', New York=u'few clouds', Charlotte=u'mist', Denver=u'light rain', Boston=u'sky is clear', date=u'2012-10-01')
phoenix_rows = weather_desc_df.where("Phoenix is not null and date = '2012-12-10'").select("Phoenix").collect()
phoenix_rows
[Row(Phoenix=u'few clouds'), Row(Phoenix=u'scattered clouds'), Row(Phoenix=u'scattered clouds'), Row(Phoenix=u'sky is clear'), Row(Phoenix=u'sky is clear'), Row(Phoenix=u'sky is clear'), Row(Phoenix=u'sky is clear'), Row(Phoenix=u'sky is clear'), Row(Phoenix=u'sky is clear'), Row(Phoenix=u'sky is clear'), Row(Phoenix=u'sky is clear'), Row(Phoenix=u'sky is clear'), Row(Phoenix=u'sky is clear'), Row(Phoenix=u'sky is clear'), Row(Phoenix=u'few clouds'), Row(Phoenix=u'few clouds'), Row(Phoenix=u'few clouds'), Row(Phoenix=u'few clouds'), Row(Phoenix=u'few clouds'), Row(Phoenix=u'few clouds'), Row(Phoenix=u'few clouds'), Row(Phoenix=u'few clouds'), Row(Phoenix=u'few clouds'), Row(Phoenix=u'sky is clear')]
from collections import Counter
# For data quality check
phoenix_most_common_weather = Counter([row.Phoenix for row in phoenix_rows]).most_common()[0][0]
phoenix_most_common_weather
u'sky is clear'
# To transform city columns into rows, transform each city individually and union all dataframes
temp_df = None
for city in cities:
    # Get the most frequent description for each day and city; the secondary
    # sort key breaks count ties deterministically (row_number over tied
    # counts is otherwise nondeterministic)
    window = Window.partitionBy("date", "city").orderBy(F.desc("count"), "weather_description")
    df = weather_desc_df.select("date", city)\
        .withColumnRenamed(city, "weather_description")\
        .withColumn("city", F.lit(city))\
        .groupBy("date", "city", "weather_description")\
        .count()\
        .withColumn("order", F.row_number().over(window))\
        .where(F.col("order") == 1)\
        .drop("count", "order")
    if temp_df is None:
        temp_df = df
    else:
        temp_df = temp_df.union(df)
weather_desc_df = temp_df
# Speed up further joins
weather_desc_df = weather_desc_df.repartition(1).cache()
weather_desc_df.count()
50949
phoenix_most_common_weather2 = weather_desc_df.where("city = 'Phoenix' and date = '2012-12-10'").collect()[0].weather_description
assert(phoenix_most_common_weather == phoenix_most_common_weather2)
# If the assertion passes, the melt preserved the per-day modes
describe(weather_desc_df)
--------------------------------------------
count:
50949
--------------------------------------------
nulls:
[Row(date=0, city=0, weather_description=102)]
--------------------------------------------
max_str_lengths:
{'date': Row(max(length(date))=10), 'city': Row(max(length(city))=13), 'weather_description': Row(max(length(weather_description))=23)}
--------------------------------------------
schema:
root
 |-- date: string (nullable = true)
 |-- city: string (nullable = false)
 |-- weather_description: string (nullable = true)
--------------------------------------------
example:
Row(date=u'2013-09-10', city=u'Phoenix', weather_description=u'light rain')
# What was the weather in a city on the day a given review was posted?
# Join weather descriptions with temperatures, and keep only the cities present in the Yelp dataset
city_weather_df = weather_temp_df.join(weather_desc_df, ["city", "date"])\
    .join(weather_cities_df, "city")\
    .drop("city")\
    .distinct()\
    .withColumn("date", F.to_date("date"))
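weather_cities_df is a small lookup table compared to the weather facts, so hinting a broadcast join avoids shuffling the large side; an equivalent sketch, assuming the mapping fits comfortably in executor memory:
# Sketch: broadcast the small city-id lookup so the fact side isn't shuffled
city_weather_df = weather_temp_df.join(weather_desc_df, ["city", "date"])\
    .join(F.broadcast(weather_cities_df), "city")\
    .drop("city")\
    .distinct()\
    .withColumn("date", F.to_date("date"))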
describe(city_weather_df)
--------------------------------------------
count:
15096
--------------------------------------------
nulls:
[Row(date=0, avg_temperature=33, weather_description=34, city_id=0)]
--------------------------------------------
max_str_lengths:
{'weather_description': Row(max(length(weather_description))=23)}
--------------------------------------------
schema:
root
 |-- date: date (nullable = true)
 |-- avg_temperature: double (nullable = true)
 |-- weather_description: string (nullable = true)
 |-- city_id: long (nullable = true)
--------------------------------------------
example:
Row(date=datetime.date(2013, 11, 23), avg_temperature=285.16625, weather_description=u'sky is clear', city_id=146028888064)
write_parquet(city_weather_df, city_weather_path)
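Optionally, if downstream reads filter this table by time, the parquet files could be laid out under a derived year partition to enable pruning; a minimal sketch (the year column is a hypothetical helper, not part of the staging schema above):
# Sketch: write the weather facts partitioned by a derived 'year' column;
# readers filtering on year can then skip whole partitions
city_weather_df.withColumn("year", F.year("date"))\
    .write\
    .partitionBy("year")\
    .mode("overwrite")\
    .parquet(city_weather_path)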