The "raw" data is divided among the following tables:
We want to join these together to reconstitute a training data set with this schema:
# notebook parameters
import os
spark_master = "local[*]"
app_name = "churn-etl"
input_files = dict(
    billing="billing_events",
    account_features="customer_account_features",
    internet_features="customer_internet_features",
    meta="customer_meta",
    phone_features="customer_phone_features"
)
output_file = "churn-etl"
output_prefix = ""
output_mode = "overwrite"
output_kind = "parquet"
input_kind = "parquet"
driver_memory = '8g'
executor_memory = '8g'
import pyspark
session = pyspark.sql.SparkSession.builder \
    .master(spark_master) \
    .appName(app_name) \
    .config("spark.eventLog.enabled", True) \
    .config("spark.eventLog.dir", ".") \
    .config("spark.driver.memory", driver_memory) \
    .config("spark.executor.memory", executor_memory) \
    .getOrCreate()
session
import churn.etl
churn.etl.register_options(
    spark_master=spark_master,
    app_name=app_name,
    input_files=input_files,
    output_prefix=output_prefix,
    output_mode=output_mode,
    output_kind=output_kind,
    input_kind=input_kind,
    driver_memory=driver_memory,
    executor_memory=executor_memory
)
from churn.etl import read_df
billing_events = read_df(session, input_files["billing"])
billing_events.printSchema()
from churn.etl import join_billing_data
customer_billing = join_billing_data(billing_events)
customer_billing
When we aggregated billing data, we also captured a unique list of customers in a temporary view. For convenience, we can access it as follows:
from churn.etl import customers as get_customers
customers = get_customers()
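Conceptually, that "unique list of customers" is just the distinct set of `customerID` values seen while aggregating billing data, analogous to `billing_events.select("customerID").distinct()`. A plain-Python sketch of the idea (illustrative only, with made-up rows; not the actual `churn.etl` implementation):

```python
# Hypothetical billing rows; in the notebook these live in a Spark DataFrame.
billing_rows = [
    {"customerID": "0001", "value": 29.85},
    {"customerID": "0002", "value": 56.95},
    {"customerID": "0001", "value": 29.85},  # repeat customers appear many times
]

# De-duplicate to get one entry per customer.
customer_ids = sorted({row["customerID"] for row in billing_rows})
```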
phone_features = read_df(session, input_files["phone_features"])
phone_features.printSchema()
from churn.etl import join_phone_features
customer_phone_features = join_phone_features(phone_features)
Whereas the phone features only record whether or not a customer has multiple lines, there are several internet-specific features in accounts:

- `InternetService` (one of `Fiber optic` or `DSL` in the "raw" data; its absence translates to `No` in the processed data)
- `OnlineSecurity` (`Yes` in the "raw" data if present; one of `No`, `Yes`, or `No internet service` in the processed data)
- `OnlineBackup` (`Yes` in the "raw" data if present; one of `No`, `Yes`, or `No internet service` in the processed data)
- `DeviceProtection` (`Yes` in the "raw" data if present; one of `No`, `Yes`, or `No internet service` in the processed data)
- `TechSupport` (`Yes` in the "raw" data if present; one of `No`, `Yes`, or `No internet service` in the processed data)
- `StreamingTV` (`Yes` in the "raw" data if present; one of `No`, `Yes`, or `No internet service` in the processed data)
- `StreamingMovies` (`Yes` in the "raw" data if present; one of `No`, `Yes`, or `No internet service` in the processed data)

This will lead to some slightly more interesting joins!
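What makes these joins interesting is that absence carries meaning: internet features are effectively left-joined onto the full customer list, and customers with no matching row must be filled in with explicit `No` / `No internet service` values. A plain-Python sketch of that fill logic, using hypothetical data (this is not the `churn.etl` code, which does the equivalent with Spark joins):

```python
# Hypothetical raw data: only customers *with* internet service have a row.
internet_rows = {
    "0001": {"InternetService": "DSL", "OnlineSecurity": "Yes"},
    # customer "0002" has no internet service, so there is no row at all
}
all_customers = ["0001", "0002"]

processed = []
for cid in all_customers:
    raw = internet_rows.get(cid)
    if raw is None:
        # Absence in the raw data becomes explicit values in the processed data.
        processed.append({"customerID": cid,
                          "InternetService": "No",
                          "OnlineSecurity": "No internet service"})
    else:
        processed.append({"customerID": cid,
                          "InternetService": raw["InternetService"],
                          "OnlineSecurity": raw.get("OnlineSecurity", "No")})
```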
internet_features = read_df(session, input_files["internet_features"])
internet_features.printSchema()
internet_features.show()
from churn.etl import join_internet_features
customer_internet_features = join_internet_features(internet_features)
account_features = read_df(session, input_files["account_features"])
account_features.printSchema()
account_features.show()
from churn.etl import join_account_features
customer_account_features = join_account_features(account_features)
account_meta = read_df(session, input_files["meta"])
account_meta.printSchema()
from churn.etl import process_account_meta
customer_account_meta = process_account_meta(account_meta)
from churn.etl import chained_join
from churn.etl import forcefloat
wide_data = chained_join(
    "customerID",
    customers,
    [
        customer_billing,
        customer_phone_features,
        customer_internet_features,
        customer_account_features,
        customer_account_meta
    ]
).select(
    "customerID",
    "gender",
    "SeniorCitizen",
    "Partner",
    "Dependents",
    "tenure",
    "PhoneService",
    "MultipleLines",
    "InternetService",
    "OnlineSecurity",
    "OnlineBackup",
    "DeviceProtection",
    "TechSupport",
    "StreamingTV",
    "StreamingMovies",
    "Contract",
    "PaperlessBilling",
    "PaymentMethod",
    forcefloat("MonthlyCharges"),
    forcefloat("TotalCharges"),
    "Churn"
)
wide_data.explain()
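The pattern behind `chained_join` is a fold: start from the base customer list and repeatedly join each feature table on the key column until one wide record per customer remains. A plain-Python sketch of that pattern with `functools.reduce` (illustrative only, with hypothetical rows; the real `churn.etl.chained_join` operates on Spark DataFrames):

```python
from functools import reduce

def chained_join_sketch(key, base, tables):
    """Fold each table's columns onto the base records, matching rows on `key`."""
    def join_one(acc, table):
        index = {row[key]: row for row in table}
        # Left-join semantics: rows with no match keep only their existing columns.
        return [{**row, **index.get(row[key], {})} for row in acc]
    return reduce(join_one, tables, base)

base = [{"customerID": "0001"}, {"customerID": "0002"}]
billing_tbl = [{"customerID": "0001", "TotalCharges": "29.85"}]
phone_tbl = [{"customerID": "0001", "PhoneService": "Yes"},
             {"customerID": "0002", "PhoneService": "No"}]

wide = chained_join_sketch("customerID", base, [billing_tbl, phone_tbl])
```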
%%time
from churn.etl import write_df
write_df(wide_data, output_file)
If we need to inspect individual components of our processing, we can: each constituent of these joins is registered as a temporary view. For example, we loaded `customers` earlier using a method from `churn.etl`, but it is also available as a table:
customers = session.table("customers")
customers.show()
We can see which tables are available by querying the session catalog:
tables = session.catalog.listTables()
[t.name for t in tables]
session.stop()