'created' then did 'anything' - Macintosh
%matplotlib inline
COHORT_QUERY = "SELECT C4 FROM %s WHERE C5 = 'account.created' AND C3 = 'Macintosh'"
REST_QUERY = "SELECT C4 FROM %s"
TITLE = "'created' then did 'anything' - Macintosh"
EVENT_STORAGE = "s3n://net-mozaws-prod-us-west-2-pipeline-analysis/fxa-retention/data/"
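The queries above assume the weekly event CSVs are headerless, so spark-csv assigns positional column names (C0, C1, ...). Judging only from the queries, C3 holds the platform, C4 the anonymized user id, and C5 the event name; the concrete schema is not shown in this notebook, so the sketch below is an inference, not the documented layout.

# Inferred shape of one row in events-YYYY-MM-DD.csv (hypothetical values;
# only the columns the queries touch are labeled):
#   C0 .. C2: other event fields (timestamps etc., unknown here)
#   C3: platform, e.g. 'Macintosh'
#   C4: anonymized user id, e.g. 'a1b2c3'
#   C5: event name, e.g. 'account.created'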
import os
from datetime import date, timedelta, datetime
import pandas
IN_IPYTHON = True
try:
    __IPYTHON__
except NameError:
    IN_IPYTHON = False
today = date.today()
# today = datetime.strptime('2015-11-08', '%Y-%m-%d').date()
# The Monday two weeks before this week's Monday (the original negated
# today.weekday(), which walked forward instead of back)
last_monday = today - timedelta(days=today.weekday(), weeks=2)
WEEK_RANGE = pandas.date_range(end=last_monday, periods=15, freq='W-MON')
WEEKS = WEEK_RANGE.map(lambda x: x.strftime('%Y-%m-%d'))
if not IN_IPYTHON:
    EVENT_STORAGE = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', '..', 'tools', 'out')
# Pre-fill two len(WEEKS) x len(WEEKS) matrices with zeros
OUT_DATA = []
VOLUME_DATA = []
for _ in range(0, len(WEEKS)):
    OUT_DATA.append([0] * len(WEEKS))
    VOLUME_DATA.append([0] * len(WEEKS))
def week_file(storage, week):
    return os.path.join(storage, 'events-' + week + '.csv')
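As a quick sanity check, a minimal sketch of what this cell yields, assuming today is pinned to 2015-11-08 as in the commented-out override (the printed values follow from that assumption):

from datetime import timedelta, datetime
import pandas

today = datetime.strptime('2015-11-08', '%Y-%m-%d').date()  # a Sunday, weekday() == 6
last_monday = today - timedelta(days=today.weekday(), weeks=2)  # 2015-10-19
weeks = pandas.date_range(end=last_monday, periods=15, freq='W-MON').map(
    lambda d: d.strftime('%Y-%m-%d'))
print weeks[0], weeks[-1]               # 2015-07-13 2015-10-19
print week_file('/tmp/out', weeks[-1])  # /tmp/out/events-2015-10-19.csv (week_file from above)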
import pandas, os, sys
# Only initialize Spark if testing locally.
# Otherwise a SparkContext is already running inside the notebook environment.
try:
    from pyspark import SparkContext
except ImportError:
    import dev_env
    from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
IN_IPYTHON = True
try:
    __IPYTHON__
except NameError:
    IN_IPYTHON = False
    sc = SparkContext('local')
    sys.path.append(os.path.realpath(os.curdir))
    from ipynb_generators.parts.weeks import WEEKS, WEEK_RANGE, OUT_DATA, VOLUME_DATA, EVENT_STORAGE, week_file
    COHORT_QUERY = "SELECT C4 FROM %s WHERE C5 = 'account.created'"
    REST_QUERY = "SELECT C4 FROM %s"
    print "Not in IPython, creating SparkContext manually, using fake data!"
# sc will be global in IPython
sqlContext = SQLContext(sc)
for x in range(0, len(WEEKS)):
    saved_uids = None
    saved_uids_count = None
    idx = 0
    for week in WEEKS[x:]:
        df = sqlContext.load(source='com.databricks.spark.csv', header='false', path=week_file(EVENT_STORAGE, week))
        table_name = 'week' + week.replace('-', '_')
        df.registerTempTable(table_name)
        if saved_uids is None:
            # First week of the cohort: everyone who created an account
            cohort_events = sqlContext.sql(COHORT_QUERY % table_name)
            new_uids = cohort_events.map(lambda p: p.C4).distinct()
            saved_uids = new_uids
            saved_uids_count = int(new_uids.count())
            VOLUME_DATA[x][idx] = saved_uids_count
            OUT_DATA[x][idx] = 100
        else:
            # Later weeks: cohort members who did anything at all
            secondary_events = sqlContext.sql(REST_QUERY % table_name)
            new_uids_created_events = secondary_events.map(lambda p: p.C4).distinct()
            retention_uids = saved_uids.intersection(new_uids_created_events)
            retained_count = int(retention_uids.count())
            if saved_uids_count > 0:
                percentage = int((float(retained_count) / float(saved_uids_count)) * 100)
            else:
                percentage = 0
            OUT_DATA[x][idx] = percentage
            VOLUME_DATA[x][idx] = retained_count
        idx += 1
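To make the retention arithmetic concrete, here is the same computation with plain Python sets (hypothetical ids; RDD.intersection on the distinct uids behaves like set intersection):

cohort = set(['u1', 'u2', 'u3', 'u4'])  # distinct uids with 'account.created' in the cohort week
active_later = set(['u2', 'u4', 'u9'])  # distinct uids with any event in a later week
retained = cohort & active_later        # set(['u2', 'u4'])
percentage = int(float(len(retained)) / len(cohort) * 100)
print percentage                        # 50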
DATA_FRAME = pandas.DataFrame(OUT_DATA, index=WEEK_RANGE, columns=range(0, len(WEEKS)))
VOLUME_DATA_FRAME = pandas.DataFrame(VOLUME_DATA, index=WEEK_RANGE, columns=range(0, len(WEEKS)))
if not IN_IPYTHON:
    print DATA_FRAME
    print VOLUME_DATA_FRAME
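Each row of OUT_DATA is one cohort (indexed by its starting week) and each column is the offset in weeks since that cohort's first week, so only the first len(WEEKS) - x cells of row x are ever filled and the rest stay 0. A toy illustration with made-up numbers, for three weeks:

#              offset 0  offset 1  offset 2
# 2015-07-13        100        42        31   # full history observable
# 2015-07-20        100        45         0   # offset 2 not yet observable
# 2015-07-27        100         0         0   # newest cohort: only week 0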
/usr/lib/spark/python/pyspark/sql/context.py:535: UserWarning: load is deprecated. Use read.load() instead.

Py4JJavaErrorTraceback (most recent call last)
<ipython-input-7-559a2f99de8e> in <module>()
---> 34 df = sqlContext.load(source='com.databricks.spark.csv', header='false', path=week_file(EVENT_STORAGE, week))
...
Py4JJavaError: An error occurred while calling o46.load.
: java.lang.ClassNotFoundException: Failed to find data source: com.databricks.spark.csv. Please find packages at http://spark-packages.org
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.lookupDataSource(ResolvedDataSource.scala:77)
    ...
Caused by: java.lang.ClassNotFoundException: com.databricks.spark.csv.DefaultSource
    ...
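The Py4JJavaError above means the com.databricks.spark.csv data source was not on the classpath when the notebook's SparkContext started. A hedged sketch of the usual remedy for this Spark 1.x setup; the artifact coordinates (Scala 2.10, version 1.5.0) are an assumption and must match the cluster's build:

import os

# Must be set before the SparkContext is created (assumed coordinates):
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-csv_2.10:1.5.0 pyspark-shell'

# Equivalent shell form when launching the notebook server:
#   pyspark --packages com.databricks:spark-csv_2.10:1.5.0

# The deprecation warning also suggests the newer reader API:
#   df = sqlContext.read.format('com.databricks.spark.csv') \
#        .options(header='false').load(week_file(EVENT_STORAGE, week))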
import seaborn
import matplotlib.pyplot as plt

IN_IPYTHON = True
try:
    __IPYTHON__
except NameError:
    IN_IPYTHON = False

if IN_IPYTHON:
    if "DATA_FRAME" in locals():
        seaborn.set(style='white')
        plt.figure(figsize=(16, 12))
        plt.title(TITLE, {'fontsize': 26})
        seaborn.heatmap(DATA_FRAME, annot=True, fmt='d', yticklabels=WEEKS, xticklabels=range(0, len(WEEKS)))
        # Rotate the y tick labels so the week names read horizontally
        locs, labels = plt.yticks()
        plt.setp(labels, rotation=0)
        # Axis label font
        font = {
            'weight': 'bold',
            'size': 22
        }
        # Label axes
        plt.ylabel('Cohort Starting Week', **font)
        plt.xlabel('Retention Weeks', **font)
        print VOLUME_DATA_FRAME
    # WEEKLY_DATA, when present, appears to be produced by a different
    # generator part: one percentage series per line to plot.
    if "WEEKLY_DATA" in locals():
        print WEEKLY_DATA
        fig = plt.figure(figsize=(16, 12))
        plt.title(TITLE, {'fontsize': 26})
        ax = fig.add_subplot(111)
        for item in WEEKLY_DATA:
            val = WEEKLY_DATA[item]
            plt.plot(val)
            # Annotate each point with its percentage value
            for x, y in zip(range(15), val):
                ax.annotate('%s' % y, xy=(x, y), size=15)
        plt.ylabel('Percentage')
        plt.setp(plt.gca().get_xticklabels(), rotation=45, horizontalalignment='right')
        plt.xticks(range(15), WEEKS)
        plt.legend(WEEKLY_DATA.keys(), loc='lower left', prop={'size': 16})
        plt.ylim(-10, 110)
        plt.show()