##########################################################################################
def csv_to_df(input_filename):
    """Load daily counts from a CSV file into a date-indexed DataFrame.

    Only the columns ``D``, ``M``, ``Y`` (day, month, year) and ``CNT`` are
    read. The three date parts are joined into a ``Date`` column parsed with
    the ``%d-%m-%Y`` format.

    Args:
        input_filename: Path (or anything else ``pd.read_csv`` accepts)
            of the CSV file to read.

    Returns:
        A DataFrame sorted by date and indexed by ``Date``, with columns
        ``CNT`` and ``WDN``. ``WDN`` is the pandas weekday number of each
        date (Monday=0 ... Sunday=6).
    """
    raw = pd.read_csv(input_filename, usecols=['D', 'M', 'Y', 'CNT'])
    parts = raw[['D', 'M', 'Y']].astype(str)
    dates = pd.to_datetime(
        parts['D'] + '-' + parts['M'] + '-' + parts['Y'],
        format='%d-%m-%Y')
    # Build a fresh frame rather than assigning into a column slice of the
    # raw one; assigning into a slice is what raises SettingWithCopyWarning.
    df = pd.DataFrame({'Date': dates, 'CNT': raw['CNT']})
    df = df.sort_values(by='Date')
    # Vectorised weekday lookup instead of a per-row apply().
    df['WDN'] = df['Date'].dt.weekday
    return df.set_index('Date')
##########################################################################################
# Weekend start/end by pandas weekday number (Monday=0 ... Sunday=6): 5 - Saturday (start), 0 - Monday (end)
def ws(df):
    """Return (weekend start, weekend end) index pairs for a time series.

    A weekend starts at a row whose ``WDN`` is 5 and ends at the first
    later row whose ``WDN`` is 0. With the weekday numbering produced by
    ``csv_to_df`` (Monday=0 ... Sunday=6) that is Saturday to Monday.

    Each start is matched with the first end that comes strictly after it,
    so a series beginning on a Sunday or Monday no longer pairs a Saturday
    with the Monday that precedes it. A trailing start with no later end
    is dropped.

    Args:
        df: DataFrame with a ``WDN`` column. Its index is assumed to be
            sorted in ascending order (``csv_to_df`` sorts by date).

    Returns:
        An iterator of ``(start, end)`` tuples taken from ``df.index``.
    """
    wknd_start = df[df['WDN'] == 5].index
    remaining_ends = iter(df[df['WDN'] == 0].index)

    starts, ends = [], []
    end = next(remaining_ends, None)
    for start in wknd_start:
        # Skip ends that are not after this start (e.g. a leading Monday).
        while end is not None and end <= start:
            end = next(remaining_ends, None)
        if end is None:
            # No end left for this or any later start.
            break
        starts.append(start)
        ends.append(end)
    return zip(starts, ends)
###########################################################################################
# Highlighted weekends
def highlight_wknds(w, wknds_highlight_color):
    """Build one rectangle shape description per weekend span.

    Args:
        w: Iterable of items whose element 0 is the rectangle's left x
            value and element 1 its right x value.
        wknds_highlight_color: Fill colour used for every rectangle.

    Returns:
        A list of dicts, one per item of ``w``. Each describes a
        borderless, half-transparent rectangle drawn below the traces,
        spanning the full plot height (``yref='paper'``, y from 0 to 1).
    """
    shapes_list = []
    for span in w:
        rectangle = {
            'type': 'rect',
            'xref': 'x',
            'yref': 'paper',
            'x0': span[0],
            'y0': 0,
            'x1': span[1],
            'y1': 1,
            'fillcolor': wknds_highlight_color,
            'opacity': 0.5,
            'layer': 'below',
            'line_width': 0,
        }
        shapes_list.append(rectangle)
    return shapes_list
###########################################################################################
# Plot TS and highlight weekends
def plot_ts(df, title_text):
    """Show the ``CNT`` series as a line chart with weekends shaded.

    Args:
        df: DataFrame whose index supplies the x values and whose ``CNT``
            column supplies the y values; it is also passed to ``ws`` to
            locate the weekend spans.
        title_text: Title of the y axis.

    The figure is displayed with ``fig.show()``; nothing is returned.
    """
    weekend_shapes = highlight_wknds(
        ws(df), wknds_highlight_color='lightskyblue')
    axis_title_font = {"size": 12}

    series_trace = go.Scatter(x=df.index, y=df['CNT'], line_color='blue')
    fig = go.Figure(data=[series_trace])
    # Fixed-size canvas, weekend rectangles and a range slider under the x axis.
    fig.update_layout(
        shapes=weekend_shapes,
        autosize=False,
        width=1800,
        height=450,
        xaxis_rangeslider_visible=True,
    )
    fig.update_xaxes(
        title_text="Date",
        title_font=axis_title_font,
        title_standoff=25,
        tickangle=-90,
        # Ask for as many ticks as there are rows.
        nticks=df.shape[0],
    )
    fig.update_yaxes(
        title_text=title_text,
        title_font=axis_title_font,
        title_standoff=25,
    )
    fig.show()
###########################################################################################
# Plot ACF
def plot_acf(df, alpha):
    """Show the autocorrelation function of ``df['CNT']`` with a CI band.

    The ACF is computed for every lag from 0 to ``len(df) - 1`` with
    ``stattools.acf``. For lags >= 1 the half-width of each returned
    confidence interval is drawn as a shaded band mirrored around zero.
    The autocorrelations are drawn as markers with vertical stems.

    Args:
        df: DataFrame with a ``CNT`` column.
        alpha: Significance level forwarded to ``stattools.acf`` for the
            confidence intervals.

    The figure is displayed with ``fig.show()``; nothing is returned.
    """
    n_obs = df.shape[0]
    acf, confint = stattools.acf(
        df['CNT'].values, nlags=n_obs - 1, qstat=False, fft=False,
        alpha=alpha, missing='none')

    # Half-width of each interval for lags >= 1. (upper - lower) / 2 holds
    # whatever the signs of the bounds are; the previous sign-based formula
    # over-estimated the band when both bounds were negative.
    half_width = (confint[1:, 1] - confint[1:, 0]) / 2
    lags = np.arange(n_obs)

    fig = go.Figure()
    fig.update_layout(width=900, height=450)
    fig.update_xaxes(
        tickangle=-90,
        title_text="Lag",
        title_font={"size": 12},
        title_standoff=25,
        nticks=32)
    fig.update_yaxes(
        title_text="Autocorrelation",
        title_font={"size": 12},
        title_standoff=25,
        nticks=10)

    # Upper and lower halves of the CI band, each filled towards zero.
    band_color = 'rgba(135, 206, 250, 0.5)'
    for band in (half_width, -half_width):
        fig.add_trace(go.Scatter(x=lags[1:], y=band,
                                 fill='tozeroy',
                                 fillcolor=band_color,
                                 line_color=band_color,
                                 showlegend=False,
                                 name='CI'))
    fig.add_trace(go.Scatter(x=lags, y=acf,
                             mode='markers', line_color='blue',
                             name='Autocorr'))

    # One vertical stem per lag plus the zero line, set in a single layout
    # update instead of one add_shape() call per lag.
    stem_style = dict(color="Blue", width=1.5)
    shapes = [dict(type="line", x0=lag, y0=0, x1=lag, y1=value,
                   line=stem_style)
              for lag, value in zip(lags, acf)]
    shapes.append(dict(type="line",
                       x0=0, y0=0, x1=len(lags), y1=0,
                       line=dict(color="Blue", width=1)))
    fig.update_layout(shapes=shapes)
    fig.show()
###########################################################################################
# Plot TS decomposition
def plot_ts_decomp(df, model, period):
    """Show the seasonal decomposition of ``df['CNT']`` as four stacked plots.

    Args:
        df: DataFrame with a ``CNT`` column.
        model: Forwarded to ``seasonal_decompose`` as ``model``.
        period: Forwarded to ``seasonal_decompose`` as ``period``.

    The observed, trend, seasonal and residual components are drawn in
    separate rows. NaN entries of a component are dropped before plotting.
    The figure is displayed with ``fig.show()``; nothing is returned.
    """
    decomposition = seasonal_decompose(df['CNT'],
                                       model=model, period=period)
    components = [
        ("Observed", decomposition.observed),
        ("Trend", decomposition.trend),
        ("Seasonality", decomposition.seasonal),
        ("Residuals", decomposition.resid),
    ]
    fig = make_subplots(rows=len(components), cols=1,
                        subplot_titles=[title for title, _ in components])
    for row, (title, series) in enumerate(components, start=1):
        # Drop missing points once, keeping index and values aligned.
        present = series.dropna()
        # add_trace(row=, col=) replaces the deprecated append_trace().
        fig.add_trace(go.Scatter(
            x=present.index,
            y=present.values,
            name=title), row=row, col=1)
    fig.update_layout(height=1000, width=750,
                      title_text="Time series decomposition")
    fig.show()
###########################################################################################
# ADF test
def calc_adf(x, a):
    """Run ``stattools.adfuller`` on ``x`` and print a labelled report.

    Args:
        x: Data forwarded to ``stattools.adfuller``.
        a: Forwarded to ``stattools.adfuller`` as ``autolag``.

    Each returned value is printed on its own line as ``label: value``.
    The critical-values mapping is expanded to one line per entry.
    Nothing is returned.
    """
    labels = ['ADF', 'p-value', 'Used Lag', 'Nobs', 'Critical Values',
              'ICbest']
    report = []
    # zip() stops at the shorter sequence, so a shorter result tuple
    # simply yields fewer report lines.
    for label, value in zip(labels, stattools.adfuller(x, autolag=a)):
        if label == 'Critical Values':
            details = '\n'.join(f' {level}: {threshold}'
                                for level, threshold in value.items())
            report.append(f'{label}:\n' + details)
        else:
            report.append(f'{label}: {value}')
    print('\n'.join(report))
###########################################################################################
# KPSS test
def calc_kpss(x, n):
    """Run ``stattools.kpss`` on ``x`` and print a labelled report.

    Args:
        x: Data forwarded to ``stattools.kpss``.
        n: Forwarded to ``stattools.kpss`` as ``nlags``.

    Each returned value is printed on its own line as ``label: value``.
    The critical-values mapping is expanded to one line per entry.
    Nothing is returned.
    """
    labels = ['statistic', 'p_value', 'n_lags', 'critical_values']
    report = []
    for label, value in zip(labels, stattools.kpss(x, nlags=n)):
        if label == 'critical_values':
            details = '\n'.join(f' {level}: {threshold}'
                                for level, threshold in value.items())
            report.append('Critical Values:\n' + details)
        else:
            report.append(f'{label}: {value}')
    print('\n'.join(report))
###########################################################################################
# Histogram
def _log10_or_zero(value):
    """Map one message size onto the log10 axis used by ``plot_msgsz``."""
    if value > 0:
        return np.log10(value)
    if value <= 0:
        # Non-positive sizes have no logarithm; they are binned at 0.
        return 0
    # Only reached when both comparisons are False (e.g. NaN).
    # NOTE(review): -1 is kept from the original code; confirm whether
    # missing sizes should be dropped instead of binned at -1.
    return -1


def plot_msgsz(df, x_scale):
    """Show overlaid histograms of ``MSGSZ``, one per distinct ``MN`` value.

    Args:
        df: DataFrame with ``MN`` and ``MSGSZ`` columns.
        x_scale: ``'lin'`` to plot the sizes as they are, or ``'log10'`` to
            plot their base-10 logarithm (non-positive sizes become 0).

    Raises:
        ValueError: If a required column is missing (``'Wrong DF!'``) or
            ``x_scale`` is not a supported value (``'Wrong x-scale!'``).

    The figure is displayed with ``fig.show()``; nothing is returned.
    """
    if ('MN' not in df.columns) or ('MSGSZ' not in df.columns):
        raise ValueError('Wrong DF!')
    if x_scale not in ('lin', 'log10'):
        raise ValueError('Wrong x-scale!')

    colors = ['blue', 'yellow', 'red', 'green', 'magenta']
    fig = go.Figure()
    for position, month in enumerate(df['MN'].unique()):
        sizes = df[df['MN'] == month]['MSGSZ'].values
        if x_scale == 'log10':
            x = [_log10_or_zero(size) for size in sizes]
        else:
            x = list(sizes)
        fig.add_trace(go.Histogram(
            x=x,
            name=month,
            # Cycle through the palette so months beyond the fifth are
            # still plotted instead of being silently dropped.
            marker_color=colors[position % len(colors)]))
    # Overlay the histograms
    fig.update_layout(barmode='overlay',
                      bargap=0.2,
                      bargroupgap=0.1)
    # Reduce opacity to see all the histograms
    fig.update_traces(opacity=0.85)
    fig.show()
###########################################################################################
# Boxplot
def plot_box(df):
    """Show one box plot of ``MSGSZ`` per ``MN`` value, ordered by ``M``.

    Args:
        df: DataFrame with ``M``, ``MN`` and ``MSGSZ`` columns. The boxes
            appear in the order the ``MN`` values are met after sorting
            the rows by ``M``.

    The figure is displayed with ``fig.show()``; nothing is returned.
    """
    fig = go.Figure()
    fig.update_layout(width=1200, height=900)
    # Use the frame passed in; the previous body ignored ``df`` and read a
    # module-level ``df_msgsz`` instead.
    ordered_months = df[['M', 'MN']].sort_values(by='M')['MN'].unique()
    for month in ordered_months:
        fig.add_trace(
            go.Box(y=df[df['MN'] == month]['MSGSZ'],
                   name=month,
                   boxpoints='all',
                   jitter=0.5,
                   pointpos=-1.8))
    fig.show()
###########################################################################################