# notebook parameters
import os
import sys
# replace this with your notebook working directory
sys.path.append("/root/telco-churn-augmentation/")
# Cluster manager to connect to ("yarn" here) and the app name shown in its UI.
spark_master = "yarn"
app_name = "churn-analytics"
# Input location: prefix + base name of the upstream ETL output, read as parquet.
input_prefix = ""
input_file = "churn-etl"
# Output location and write behavior; "overwrite" replaces any existing results.
output_prefix = ""
output_mode = "overwrite"
output_kind = "parquet"
input_kind = "parquet"
# JVM heap sizing for the Spark driver and each executor.
driver_memory = '8g'
executor_memory = '8g'
import pyspark

# Spark configuration, including the RAPIDS accelerator settings that route
# SQL work onto the GPU. Values are passed through Builder.config, which
# stringifies bools/ints/floats the same way the chained form did.
_spark_settings = {
    "spark.eventLog.enabled": True,
    "spark.eventLog.dir": ".",
    "spark.driver.memory": driver_memory,
    "spark.executor.memory": executor_memory,
    "spark.rapids.memory.pinnedPool.size": "2G",
    "spark.sql.shuffle.partitions": 16,
    "spark.sql.files.maxPartitionBytes": "4096MB",
    "spark.rapids.sql.enabled": True,
    "spark.executor.cores": 4,
    "spark.task.cpus": 1,
    "spark.rapids.sql.concurrentGpuTasks": 2,
    "spark.task.resource.gpu.amount": .5,
    "spark.rapids.sql.variableFloatAgg.enabled": True,
    # Log an explanation for any operation that could not run on the GPU.
    "spark.rapids.sql.explain": "NOT_ON_GPU",
    "spark.rapids.sql.decimalType.enabled": "True",
}

_builder = pyspark.sql.SparkSession.builder.master(spark_master).appName(app_name)
for _key, _value in _spark_settings.items():
    _builder = _builder.config(_key, _value)
session = _builder.getOrCreate()
session
import churn.eda
import churn.etl
# Propagate the notebook-level I/O parameters (prefix, write mode, file formats)
# to the shared ETL helpers so subsequent read/write calls pick them up.
churn.etl.register_options(
    output_prefix = output_prefix,
    output_mode = output_mode,
    output_kind = output_kind,
    input_kind = input_kind
)
# Load the ETL output and build the EDA summary.
df = churn.etl.read_df(session, input_prefix + input_file)

# The original used the IPython cell magic `%%time`, which is a SyntaxError
# when this file runs as a plain Python script (magics only exist inside a
# notebook/IPython cell). Preserve the timing output with stdlib timing.
import time
_t0 = time.perf_counter()
summary = churn.eda.gen_summary(df)
print(f"gen_summary wall time: {time.perf_counter() - _t0:.2f}s")

# List the temp tables/views registered by the summary step (e.g. the cubes
# queried below). In a notebook this expression also displays the result.
session.catalog.listTables()
# Pull the pre-aggregated 3-month-tenure cube into pandas and give the span
# column a friendlier name for plotting.
grouped_by_quarters = (
    session.table("cube_3")
    .select("3_month_spans", "Contract", "PaperlessBilling", "Churn", "Count")
    .toPandas()
    .rename(columns={"3_month_spans": "tenure_in_quarters"})
)
import altair as alt

# Serialize chart data to JSON files rather than embedding it in the notebook.
alt.data_transformers.enable('json')

# Churn counts per tenure quarter, stacked by churn status and faceted by
# contract type (one column of bars per contract).
alt.Chart(grouped_by_quarters.dropna()).mark_bar().encode(
    x='tenure_in_quarters:O',
    y='sum(Count):Q',
    color='Churn:N',
    column='Contract:N',
)
# `altair` is already imported and the JSON data transformer already enabled
# above; the duplicated import/enable lines were redundant and are removed.
# Same data as the previous chart with the facet and color roles swapped:
# faceted by churn status, colored by contract type.
alt.Chart(grouped_by_quarters.dropna()).mark_bar().encode(
    x = 'tenure_in_quarters:O',
    y = 'sum(Count):Q',
    column = 'Churn:N',
    color = 'Contract:N'
)
# session.stop()