from collections import OrderedDict
from datetime import datetime as dt, timedelta

from requests import get as fetch

from pyspark.sql import Row
from pyspark.sql.functions import col

%pylab inline
Populating the interactive namespace from numpy and matplotlib
sc.defaultParallelism
608
bucket = "telemetry-parquet"
prefix = "addons/v1"
# load the addons parquet dataset (one row per client/add-on per submission day)
%time dataset = sqlContext.read.load("s3://{}/{}".format(bucket, prefix), "parquet")
CPU times: user 12 ms, sys: 0 ns, total: 12 ms Wall time: 24.9 s
def fmt(the_date, date_format="%Y%m%d"):
    return dt.strftime(the_date, date_format)

def should_be_updated(target_day, test, recalc_window):
    # relies on the globals `yesterday` and `test_data`, which are assigned below
    # before this is ever called
    if fmt(yesterday - timedelta(recalc_window)) >= target_day:
        # the target day has aged out of the recalculation window; its numbers are final
        return False
    elif (test != "@testpilot-addon") and (test_data[test] > target_day):
        # the test had not launched yet on the target day
        # (string comparison is safe for %Y%m%d-formatted dates)
        return False
    return True
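# A minimal sanity check of should_be_updated (hypothetical dates and test ids;
# the real `yesterday` and `test_data` are assigned further down and overwrite these):
yesterday = dt(2016, 10, 3)
test_data = {"@min-vid": "20160901", "@late-test": "20160920"}
should_be_updated("20160901", "@min-vid", 30)    # False: aged out of the 30-day window
should_be_updated("20160905", "@min-vid", 30)    # True: in the window and the test existed
should_be_updated("20160910", "@late-test", 30)  # False: the test launched after this day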
#http://stackoverflow.com/questions/37584077/convert-a-standard-python-key-value-dictionary-list-to-pyspark-data-frame
def convert_to_row(d):
    # sort the keys so every Row has the same field order and toDF() infers one schema
    return Row(**OrderedDict(sorted(d.items())))
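# Quick illustration of convert_to_row (hypothetical values): keys come back sorted,
# so every Row carries the same field order
convert_to_row({"test": "@min-vid", "day": "20161003", "dau": 1})
# -> Row(dau=1, day='20161003', test='@min-vid')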
def dau(df, target_date):
    # distinct clients submitting on the target day
    df = df.filter(df.submission_date_s3 == target_date)
    return df.select("client_id").distinct().count()
def mau(df, target_date, past_days=28, date_format="%Y%m%d"):
    # distinct clients submitting in the `past_days` window ending on target_date
    # (both endpoints inclusive)
    target_day_date = dt.strptime(target_date, date_format)
    min_submission_date = dt.strftime(target_day_date - timedelta(past_days), date_format)
    max_submission_date = dt.strftime(target_day_date, date_format)
    df = df.filter(df.submission_date_s3 >= min_submission_date).\
         filter(df.submission_date_s3 <= max_submission_date)
    return df.select("client_id").distinct().count()
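# A toy sanity check of dau/mau (hypothetical client ids, built with the Row import above):
toy = sqlContext.createDataFrame([
    Row(client_id=u"a", submission_date_s3=u"20161003"),
    Row(client_id=u"a", submission_date_s3=u"20161003"),   # duplicate ping: counted once
    Row(client_id=u"b", submission_date_s3=u"20160920"),
])
dau(toy, "20161003")   # 1
mau(toy, "20161003")   # 2: "b" falls inside the 28-day window (>= 20160905)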
# snapshot each day's client list so pings that arrive after the submission date
# can be error-checked later
def record_tp_daily_count(df, target_date):
    df = df.filter(df.submission_date_s3 == target_date)
    s3_output = "s3n://net-mozaws-prod-us-west-2-pipeline-analysis/isegall/txp_dau_clients/v1/collection_date=" + fmt(dt.utcnow())
    df.select("client_id", "submission_date_s3").distinct().repartition(10).write.format("parquet").mode("overwrite").save(s3_output)
r = fetch('https://testpilot.firefox.com/api/experiments?format=json')
# map each test's addon_id to its launch day; created timestamps are truncated to
# day granularity (finer-grained time math deferred until it matters)
test_data = {el['addon_id']: fmt(dt.strptime(el['created'][:10], "%Y-%m-%d")) for el in r.json()['results']}
active_tests = test_data.keys()
recalc_window = 30
# offset by one day: pings for a given day keep arriving after it ends,
# so the newest complete day is one behind UTC now
today = dt.utcnow() - timedelta(1)
yesterday = today - timedelta(1)   # latest day we're retrieving data for
start_date = fmt(yesterday - timedelta(recalc_window - 1))
end_date = fmt(yesterday)
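# For the run captured below (generated_on 20161005 UTC) these work out to:
#   today      -> 20161004
#   yesterday  -> 20161003  (end_date; the first day processed below)
#   start_date -> "20160904", giving a 30-day recalculation window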
dataset.printSchema()
root
 |-- document_id: string (nullable = false)
 |-- client_id: string (nullable = false)
 |-- addon_id: string (nullable = true)
 |-- blocklisted: boolean (nullable = true)
 |-- name: string (nullable = true)
 |-- user_disabled: boolean (nullable = true)
 |-- app_disabled: boolean (nullable = true)
 |-- version: string (nullable = true)
 |-- scope: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- foreign_install: boolean (nullable = true)
 |-- has_binary_components: boolean (nullable = true)
 |-- install_day: integer (nullable = true)
 |-- update_day: integer (nullable = true)
 |-- signed_state: integer (nullable = true)
 |-- is_system: boolean (nullable = true)
 |-- submission_date_s3: string (nullable = true)
 |-- sample_id: string (nullable = true)
# restrict to the recalculation window up front
dataset = dataset.filter(dataset.submission_date_s3 >= start_date).\
                  filter(dataset.submission_date_s3 <= end_date)
dataset.first()
Row(document_id=u'594a9c85-923a-4d6d-9bad-62f1af76b512', client_id=u'da0ba608-0c0f-4cbe-bd8b-def610de700d', addon_id=u'e10srollout@mozilla.org', blocklisted=False, name=u'Multi-process staged rollout', user_disabled=False, app_disabled=False, version=u'1.2', scope=1, type=u'extension', foreign_install=False, has_binary_components=False, install_day=16925, update_day=17068, signed_state=None, is_system=True, submission_date_s3=u'20160924', sample_id=u'71')
# only looking at Test Pilot add-ons; trimming early is an optimization for speed
dataset = dataset.where(col("addon_id").isin(active_tests + ["@testpilot-addon"]))
tp_installed = dataset.filter(dataset.addon_id == "@testpilot-addon").select("client_id", "addon_id")
test_installed = dataset.where(col("addon_id").isin(active_tests))
# clients with both the Test Pilot add-on and at least one test add-on;
# drop the duplicated columns from the tp_installed side of the join
active_tp_users = tp_installed.join(test_installed, test_installed.client_id == tp_installed.client_id)\
                              .drop(tp_installed.client_id).drop(tp_installed.addon_id)
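# An equivalent formulation (a sketch, not what ran here): a left semi join keeps the
# test_installed rows whose client also has the Test Pilot add-on, with no duplicate
# columns to drop afterwards; `active_tp_users_alt` is a hypothetical name
active_tp_users_alt = test_installed.join(tp_installed.select("client_id"), "client_id", "leftsemi")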
active_tp_users.filter(active_tp_users.submission_date_s3 == "20161003").select('client_id').distinct().count()
44743
first_date = dt.strptime(start_date, "%Y%m%d")
date_format = "%Y%m%d"
records = []
target_dt = yesterday   # always calculate the previous day first, then walk backwards
while target_dt >= first_date:
    target_day = fmt(target_dt)
    for test in active_tests:
        # should_be_updated also checks whether the test was active on target_day
        if should_be_updated(target_day, test, recalc_window):
            print "We should update data for {t} on {d}".format(t=test, d=target_day)
            record = {"test": test, "day": target_day, "generated_on": fmt(dt.utcnow(), date_format)}
            record["dau"] = dau(active_tp_users.filter(active_tp_users.addon_id == test), target_day)
            print "dau: ", record["dau"]
            record["mau"] = mau(active_tp_users.filter(active_tp_users.addon_id == test), target_day)
            print "mau: ", record["mau"]
            records.append(record)
    print "We should update data for Test Pilot on {d}".format(d=target_day)
    record = {"test": "testpilot", "day": target_day, "generated_on": fmt(dt.utcnow(), date_format)}
    record["dau"] = dau(active_tp_users, target_day)
    print "dau: ", record["dau"]
    record["mau"] = mau(active_tp_users, target_day)
    print "mau: ", record["mau"]
    records.append(record)
    record_tp_daily_count(active_tp_users, target_day)
    target_dt -= timedelta(1)
We should update data for jid1-NeEaf3sAHdKHPA@jetpack on 20161003
dau:  13567
mau:  17210
We should update data for blok@mozilla.org on 20161003
dau:  14346
mau:  18186
We should update data for @min-vid on 20161003
dau:  14187
mau:  18976
We should update data for @activity-streams on 20161003
dau:  14920
mau:  26517
We should update data for tabcentertest1@mozilla.com on 20161003
dau:  10747
mau:  21115
We should update data for universal-search@mozilla.com on 20161003
dau:  15369
mau:  25561
We should update data for wayback_machine@mozilla.org on 20161003
dau:  15932
mau:  23362
We should update data for Test Pilot on 20161003
dau:  44743
mau:  67595
# # append previous data
# # for now, assume we have the previous day's data; later, calculate anything missing
# prev_dataframe = sqlContext.read.parquet("s3n://net-mozaws-prod-us-west-2-pipeline-analysis/isegall/fxa_mau_dau_daily/v1/collection_date=" + fmt(yesterday - timedelta(1)))
# records.extend(prev_dataframe.filter(prev_dataframe.day < fmt(today))
#                .map(lambda x: x.asDict()).collect())
records
[{'dau': 13567, 'day': '20161003', 'generated_on': '20161005', 'mau': 17210, 'test': u'jid1-NeEaf3sAHdKHPA@jetpack'},
 {'dau': 14346, 'day': '20161003', 'generated_on': '20161005', 'mau': 18186, 'test': u'blok@mozilla.org'},
 {'dau': 14187, 'day': '20161003', 'generated_on': '20161005', 'mau': 18976, 'test': u'@min-vid'},
 {'dau': 14920, 'day': '20161003', 'generated_on': '20161005', 'mau': 26517, 'test': u'@activity-streams'},
 {'dau': 10747, 'day': '20161003', 'generated_on': '20161005', 'mau': 21115, 'test': u'tabcentertest1@mozilla.com'},
 {'dau': 15369, 'day': '20161003', 'generated_on': '20161005', 'mau': 25561, 'test': u'universal-search@mozilla.com'},
 {'dau': 15932, 'day': '20161003', 'generated_on': '20161005', 'mau': 23362, 'test': u'wayback_machine@mozilla.org'},
 {'dau': 44743, 'day': '20161003', 'generated_on': '20161005', 'mau': 67595, 'test': 'testpilot'}]
df_updated = sc.parallelize(records).map(convert_to_row).toDF()
s3_output = "s3n://net-mozaws-prod-us-west-2-pipeline-analysis/isegall/txp_mau_dau_daily/v1/collection_date=" + fmt(today)
df_updated.repartition(10).write.format("parquet").mode("overwrite").save(s3_output)
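# Quick read-back check (a sketch): confirm the day's rollup landed where expected
sqlContext.read.parquet(s3_output).show()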