Copy of Tiziano Piccardi's DataSetAnonymization notebook, altered to preserve `useragent.is_bot` (exported as the `useragent_is_bot` column)
in both the citationusage and citationusagepageload tables.
import pyspark
import re
import pyspark.sql
from pyspark.sql import *
import pandas as pd
import matplotlib.pyplot as plt
import hashlib
import os.path
from pyspark.sql.functions import *
from datetime import timedelta, date
%matplotlib inline
# NOTE(review): `spark_hive` is never referenced in this notebook; every query goes
# through the `spark` session instead. HiveContext looks like a legacy entry point,
# so this line can probably be removed — confirm nothing outside this notebook uses it.
spark_hive = pyspark.sql.HiveContext(sc)
# Map every raw session token seen in either table (pageloads and clicks) to a
# dense integer session id. The token -> id table links released ids back to raw
# tokens, so session_ids.parquet should not be published with the datasets.
pageloads = spark.sql("select event.session_token from event.citationusagepageload").distinct()
citationusage = spark.sql("select event.session_token from event.citationusage").distinct()
all_tokens = pageloads.union(citationusage).distinct().rdd.zipWithIndex()
session_ids = sqlContext.createDataFrame(all_tokens.map(lambda r:
    Row(session_token=r[0].session_token, session_id=r[1])))
session_ids.write.parquet("session_ids.parquet")
# zipWithIndex runs after a shuffle (distinct), so the index each token gets can
# change if this lineage is ever re-evaluated (e.g. after cache eviction). Read the
# written mapping back so every later join uses exactly the persisted ids.
session_ids = spark.read.parquet("session_ids.parquet").cache()
session_ids.show()
+----------+--------------------+ |session_id| session_token| +----------+--------------------+ | 0|e7e800a1b82909b4c037| | 1|77f3df30d8f9c579d6b9| | 2|3de8636a9aaff633c5f2| | 3|f1829efe15aa1e26d59e| | 4|9b7ebe75853b1d8b75f4| | 5|6362ed76a063f821ed33| | 6|3e4e49cc8cf0d800587b| | 7|529ba0f5bd46fb0f51eb| | 8|926758e0d73fbcaf2bbb| | 9|ed34a5bcac1b29495f70| | 10|489e7f3c58cc2a6227bc| | 11|bcf4b3768079ed4ce754| | 12|2863173fc6f780634769| | 13|5759f8e9386d19ff5782| | 14|8466e9dfc2a38e7adde6| | 15|9a77459c516a1792a4a8| | 16|e77c252e139768583f51| | 17|264ee2575f6f4e4255e2| | 18|578fccd38d53a295f59f| | 19|cb014c9b031a633a776e| +----------+--------------------+ only showing top 20 rows
# Anonymous enwiki edits (event_user_is_anonymous = TRUE) to main-namespace,
# non-redirect pages after 2019-03-01. Each edit is reduced to its page id and the
# year/month/day/hour of the edit, and distinct() dedups edits per page and hour.
# `event_user_text_historical` is exposed as `ip`. Presumably this is the editor's
# IP for anonymous edits (confirm against the mediawiki_history schema). It is
# matched against the pageload `ip` column below.
query = """
SELECT page_id, year(event_timestamp) edit_year, month(event_timestamp) edit_month,
dayofmonth(event_timestamp) edit_day, hour(event_timestamp) edit_hour,
event_user_text_historical ip
FROM wmf.mediawiki_history
WHERE wiki_db = 'enwiki'
AND event_user_is_anonymous = TRUE
AND to_timestamp(event_timestamp) > '2019-03-01'
AND page_namespace = 0
AND page_is_redirect = FALSE
"""
anonymous_edits = spark.sql(query).distinct()
anonymous_edits
DataFrame[page_id: bigint, edit_year: int, edit_month: int, edit_day: int, edit_hour: int, ip: string]
# Full pageload table with all columns. This rebinds `pageloads`, which earlier
# held only the distinct session tokens.
pageloads = spark.sql("select * from event.citationusagepageload")
pageloads
DataFrame[dt: string, event: struct<action:string,dom_interactive_time:bigint,event_offset_time:bigint,mode:string,namespace_id:bigint,page_id:bigint,page_title:string,page_token:string,referrer:string,revision_id:bigint,session_token:string,skin:string>, ip: string, recvfrom: string, revision: bigint, schema: string, seqid: bigint, useragent: struct<browser_family:string,browser_major:string,browser_minor:string,device_family:string,is_bot:boolean,is_mediawiki:boolean,os_family:string,os_major:string,os_minor:string,wmf_app_version:string>, uuid: string, webhost: string, wiki: string, geocoded_data: map<string,string>, topic: string, year: bigint, month: bigint, day: bigint, hour: bigint]
Total pageload events:
# Pageload row count before any cleaning; baseline for the removal statistics at the end.
pageloads_original = pageloads.count()
pageloads_original
1829735489
Total unique sessions:
# Distinct pageload session tokens before any cleaning.
pageloads_original_sessions = pageloads.select("event.session_token").distinct().count()
pageloads_original_sessions
956199928
# Full click-event table with all columns. This rebinds `citationusage`, which
# earlier held only the distinct session tokens.
citationusage = spark.sql("select * from event.citationusage")
citationusage
DataFrame[dt: string, event: struct<action:string,citation_in_text_refs:bigint,dom_interactive_time:bigint,event_offset_time:bigint,ext_position:bigint,footnote_number:bigint,freely_accessible:boolean,in_infobox:boolean,link_occurrence:bigint,link_text:string,link_url:string,mode:string,namespace_id:bigint,page_id:bigint,page_title:string,page_token:string,referrer:string,revision_id:bigint,section_id:string,session_token:string,skin:string,citation_identifier_label:string>, ip: string, recvfrom: string, revision: bigint, schema: string, seqid: bigint, useragent: struct<browser_family:string,browser_major:string,browser_minor:string,device_family:string,is_bot:boolean,is_mediawiki:boolean,os_family:string,os_major:string,os_minor:string,wmf_app_version:string>, uuid: string, webhost: string, wiki: string, geocoded_data: map<string,string>, topic: string, year: bigint, month: bigint, day: bigint, hour: bigint]
Total click events:
# Click-event row count before any cleaning; baseline for the removal statistics at the end.
citationusage_original = citationusage.count()
citationusage_original
117186592
Unique sessions:
# Distinct click-event session tokens before any cleaning.
citationusage_original_sessions = citationusage.select("event.session_token").distinct().count()
citationusage_original_sessions
75305966
# Sessions with a pageload that shares IP, page and (year, month, day, hour)
# bucket with an anonymous edit. Their pageloads are dropped further down.
# NOTE(review): matching is on identical hour buckets, so an edit saved in the
# hour after the page load is not caught — confirm that gap is acceptable.
same_ip_page_and_hour = [
    pageloads.ip == anonymous_edits.ip,
    pageloads.year == anonymous_edits.edit_year,
    pageloads.month == anonymous_edits.edit_month,
    pageloads.day == anonymous_edits.edit_day,
    pageloads.hour == anonymous_edits.edit_hour,
    pageloads.event.page_id == anonymous_edits.page_id,
]
sessions_with_edits = (
    pageloads
    .join(anonymous_edits, same_ip_page_and_hour)  # list of Columns is AND-ed
    .select("event.session_token")
    .distinct()
)
sessions_with_edits
DataFrame[session_token: string]
Count the sessions to exclude:
# cache() keeps this small result for the anti-join below; count() materializes it.
sessions_with_edits.cache().count()
118919
Left anti-join (LEFT JOIN, then keep only rows without a match) to keep only the pageloads of sessions without edits:
# Expose the sessions to exclude to SQL. createOrReplaceTempView is the
# non-deprecated equivalent of registerTempTable.
sessions_with_edits.createOrReplaceTempView("sessions_with_edits")
# Anti-join: keep every pageload whose session token has no match among the
# sessions with anonymous edits. The LEFT JOIN leaves s.session_token NULL for
# those rows, including rows whose own token is NULL.
query = """
SELECT p.*
FROM event.citationusagepageload p
LEFT JOIN sessions_with_edits s
ON p.event.session_token = s.session_token
WHERE s.session_token IS NULL
"""
pageloads_clean = spark.sql(query)
pageloads_clean
DataFrame[dt: string, event: struct<action:string,dom_interactive_time:bigint,event_offset_time:bigint,mode:string,namespace_id:bigint,page_id:bigint,page_title:string,page_token:string,referrer:string,revision_id:bigint,session_token:string,skin:string>, ip: string, recvfrom: string, revision: bigint, schema: string, seqid: bigint, useragent: struct<browser_family:string,browser_major:string,browser_minor:string,device_family:string,is_bot:boolean,is_mediawiki:boolean,os_family:string,os_major:string,os_minor:string,wmf_app_version:string>, uuid: string, webhost: string, wiki: string, geocoded_data: map<string,string>, topic: string, year: bigint, month: bigint, day: bigint, hour: bigint]
Get the number of pageload events after the cleaning:
# Pageload rows left after removing sessions with edits (used in the removal statistics).
pageloads_anonymized = pageloads_clean.count()
Get the number of unique sessions:
# Distinct session tokens left after removing sessions with edits.
pageloads_anonymized_sessions = pageloads_clean.select("event.session_token").distinct().count()
Add the numeric session id, then drop the identifying fields (ip, raw session token, dt, topic, and every user-agent field except is_bot):
# Attach the numeric session id to every cleaned pageload (inner join on the raw
# session token) and parse the `dt` string into an `event_time` timestamp.
anonymous_pageloads_nested = (
    pageloads_clean.alias("pl")
    .join(
        session_ids.alias("ids"),
        pageloads_clean.event.session_token == session_ids.session_token,
    )
    .select("pl.*", "ids.session_id", to_timestamp("pl.dt").alias("event_time"))
)
anonymous_pageloads_nested.printSchema()
root |-- dt: string (nullable = true) |-- event: struct (nullable = true) | |-- action: string (nullable = true) | |-- dom_interactive_time: long (nullable = true) | |-- event_offset_time: long (nullable = true) | |-- mode: string (nullable = true) | |-- namespace_id: long (nullable = true) | |-- page_id: long (nullable = true) | |-- page_title: string (nullable = true) | |-- page_token: string (nullable = true) | |-- referrer: string (nullable = true) | |-- revision_id: long (nullable = true) | |-- session_token: string (nullable = true) | |-- skin: string (nullable = true) |-- ip: string (nullable = true) |-- recvfrom: string (nullable = true) |-- revision: long (nullable = true) |-- schema: string (nullable = true) |-- seqid: long (nullable = true) |-- useragent: struct (nullable = true) | |-- browser_family: string (nullable = true) | |-- browser_major: string (nullable = true) | |-- browser_minor: string (nullable = true) | |-- device_family: string (nullable = true) | |-- is_bot: boolean (nullable = true) | |-- is_mediawiki: boolean (nullable = true) | |-- os_family: string (nullable = true) | |-- os_major: string (nullable = true) | |-- os_minor: string (nullable = true) | |-- wmf_app_version: string (nullable = true) |-- uuid: string (nullable = true) |-- webhost: string (nullable = true) |-- wiki: string (nullable = true) |-- geocoded_data: map (nullable = true) | |-- key: string | |-- value: string (valueContainsNull = true) |-- topic: string (nullable = true) |-- year: long (nullable = true) |-- month: long (nullable = true) |-- day: long (nullable = true) |-- hour: long (nullable = true) |-- session_id: long (nullable = true) |-- event_time: timestamp (nullable = true)
# createOrReplaceTempView is the non-deprecated equivalent of registerTempTable.
anonymous_pageloads_nested.createOrReplaceTempView('anonymous_pageloads_nested')
# Flatten `event.*` into top-level columns and keep only non-identifying fields.
# ip, dt, topic and every useragent field except is_bot are not selected, and the
# raw session_token (one of the event.* fields) is dropped afterwards.
query = """
SELECT session_id, event_time, event.*, useragent.is_bot as useragent_is_bot,
recvfrom, revision, schema, seqid,
uuid, webhost, wiki, geocoded_data,
year, month, day, hour
FROM anonymous_pageloads_nested
"""
anonymous_pageloads = spark.sql(query).drop('session_token')
anonymous_pageloads.printSchema()
root |-- session_id: long (nullable = true) |-- event_time: timestamp (nullable = true) |-- action: string (nullable = true) |-- dom_interactive_time: long (nullable = true) |-- event_offset_time: long (nullable = true) |-- mode: string (nullable = true) |-- namespace_id: long (nullable = true) |-- page_id: long (nullable = true) |-- page_title: string (nullable = true) |-- page_token: string (nullable = true) |-- referrer: string (nullable = true) |-- revision_id: long (nullable = true) |-- skin: string (nullable = true) |-- useragent_is_bot: boolean (nullable = true) |-- recvfrom: string (nullable = true) |-- revision: long (nullable = true) |-- schema: string (nullable = true) |-- seqid: long (nullable = true) |-- uuid: string (nullable = true) |-- webhost: string (nullable = true) |-- wiki: string (nullable = true) |-- geocoded_data: map (nullable = true) | |-- key: string | |-- value: string (valueContainsNull = true) |-- year: long (nullable = true) |-- month: long (nullable = true) |-- day: long (nullable = true) |-- hour: long (nullable = true)
Sanity check:
# Sanity check: rows surviving the session-id join (compare with pageloads_anonymized).
anonymous_pageloads_count = anonymous_pageloads.count()
anonymous_pageloads_count
1828725320
Sanity check, number of unique sessions:
# Distinct session ids after anonymization (compare with pageloads_anonymized_sessions).
anonymous_pageloads_sessions_count = anonymous_pageloads.select("session_id").distinct().count()
anonymous_pageloads_sessions_count
956081009
# Persist the anonymized pageloads; the same path is read back after the cache is cleared.
anonymous_pageloads.write.parquet("anonymous_pageloads_april.parquet")
# Mirror the pageload cleaning: drop every click event from a session that also
# contains an anonymous edit (sessions_with_edits). Previously this step was
# missing, so those sessions' clicks (page ids and timestamps) stayed in the
# written click table.
citationusage_clean = citationusage.join(
    sessions_with_edits,
    citationusage.event.session_token == sessions_with_edits.session_token,
    "left_anti")
# Attach the numeric session id (inner join on the raw token; rows whose token is
# NULL find no match and are dropped) and parse `dt` into an `event_time` timestamp.
anonymous_citationusage_nested = citationusage_clean\
    .alias("cit")\
    .join(session_ids.alias("ids"), citationusage_clean.event.session_token == session_ids.session_token)\
    .select(["cit.*", "ids.session_id", to_timestamp("cit.dt").alias("event_time")])
anonymous_citationusage_nested.printSchema()
root |-- dt: string (nullable = true) |-- event: struct (nullable = true) | |-- action: string (nullable = true) | |-- citation_in_text_refs: long (nullable = true) | |-- dom_interactive_time: long (nullable = true) | |-- event_offset_time: long (nullable = true) | |-- ext_position: long (nullable = true) | |-- footnote_number: long (nullable = true) | |-- freely_accessible: boolean (nullable = true) | |-- in_infobox: boolean (nullable = true) | |-- link_occurrence: long (nullable = true) | |-- link_text: string (nullable = true) | |-- link_url: string (nullable = true) | |-- mode: string (nullable = true) | |-- namespace_id: long (nullable = true) | |-- page_id: long (nullable = true) | |-- page_title: string (nullable = true) | |-- page_token: string (nullable = true) | |-- referrer: string (nullable = true) | |-- revision_id: long (nullable = true) | |-- section_id: string (nullable = true) | |-- session_token: string (nullable = true) | |-- skin: string (nullable = true) | |-- citation_identifier_label: string (nullable = true) |-- ip: string (nullable = true) |-- recvfrom: string (nullable = true) |-- revision: long (nullable = true) |-- schema: string (nullable = true) |-- seqid: long (nullable = true) |-- useragent: struct (nullable = true) | |-- browser_family: string (nullable = true) | |-- browser_major: string (nullable = true) | |-- browser_minor: string (nullable = true) | |-- device_family: string (nullable = true) | |-- is_bot: boolean (nullable = true) | |-- is_mediawiki: boolean (nullable = true) | |-- os_family: string (nullable = true) | |-- os_major: string (nullable = true) | |-- os_minor: string (nullable = true) | |-- wmf_app_version: string (nullable = true) |-- uuid: string (nullable = true) |-- webhost: string (nullable = true) |-- wiki: string (nullable = true) |-- geocoded_data: map (nullable = true) | |-- key: string | |-- value: string (valueContainsNull = true) |-- topic: string (nullable = true) |-- year: long (nullable = true) |-- 
month: long (nullable = true) |-- day: long (nullable = true) |-- hour: long (nullable = true) |-- session_id: long (nullable = true) |-- event_time: timestamp (nullable = true)
# createOrReplaceTempView is the non-deprecated equivalent of registerTempTable.
anonymous_citationusage_nested.createOrReplaceTempView('anonymous_citationusage_nested')
# Flatten `event.*` into top-level columns and keep only non-identifying fields.
# ip, dt, topic and every useragent field except is_bot are not selected, and the
# raw session_token (one of the event.* fields) is dropped afterwards.
query = """
SELECT session_id, event_time, event.*, useragent.is_bot as useragent_is_bot,
recvfrom, revision, schema, seqid,
uuid, webhost, wiki, geocoded_data,
year, month, day, hour
FROM anonymous_citationusage_nested
"""
anonymous_citationusage = spark.sql(query).drop('session_token')
anonymous_citationusage.printSchema()
root |-- session_id: long (nullable = true) |-- event_time: timestamp (nullable = true) |-- action: string (nullable = true) |-- citation_in_text_refs: long (nullable = true) |-- dom_interactive_time: long (nullable = true) |-- event_offset_time: long (nullable = true) |-- ext_position: long (nullable = true) |-- footnote_number: long (nullable = true) |-- freely_accessible: boolean (nullable = true) |-- in_infobox: boolean (nullable = true) |-- link_occurrence: long (nullable = true) |-- link_text: string (nullable = true) |-- link_url: string (nullable = true) |-- mode: string (nullable = true) |-- namespace_id: long (nullable = true) |-- page_id: long (nullable = true) |-- page_title: string (nullable = true) |-- page_token: string (nullable = true) |-- referrer: string (nullable = true) |-- revision_id: long (nullable = true) |-- section_id: string (nullable = true) |-- skin: string (nullable = true) |-- citation_identifier_label: string (nullable = true) |-- useragent_is_bot: boolean (nullable = true) |-- recvfrom: string (nullable = true) |-- revision: long (nullable = true) |-- schema: string (nullable = true) |-- seqid: long (nullable = true) |-- uuid: string (nullable = true) |-- webhost: string (nullable = true) |-- wiki: string (nullable = true) |-- geocoded_data: map (nullable = true) | |-- key: string | |-- value: string (valueContainsNull = true) |-- year: long (nullable = true) |-- month: long (nullable = true) |-- day: long (nullable = true) |-- hour: long (nullable = true)
Sanity check:
# Click rows in the anonymized table; feeds the click removal statistics below.
citationusage_anonymized = anonymous_citationusage.count()
citationusage_anonymized
117186558
Sanity check, number of unique sessions:
# Distinct session ids in the anonymized click table; feeds the session removal statistics below.
citationusage_anonymized_sessions = anonymous_citationusage.select("session_id").distinct().count()
citationusage_anonymized_sessions
75305945
anonymous_citationusage.write.parquet("anonymous_citationusage_april.parquet")
# Click events removed by the anonymization. (a - b) / a is a fraction, so it is
# scaled by 100 before being printed under the "Percentage" label.
print("Number of click events removed:")
print(citationusage_original - citationusage_anonymized)
print("Percentage of the dataset removed:")
print("{:.6g}%".format(100 * (citationusage_original - citationusage_anonymized) / citationusage_original))
Number of clicks events removed: 34 Percentage of the dataset removed: 2.9013558138118737e-07
Number of removed sessions:
# Click-table sessions removed by the anonymization. The fraction is scaled by
# 100 so the printed value really is a percentage.
print("Number of sessions removed:")
print(citationusage_original_sessions - citationusage_anonymized_sessions)
print("Percentage of the dataset removed:")
print("{:.6g}%".format(100 * (citationusage_original_sessions - citationusage_anonymized_sessions) / citationusage_original_sessions))
Number of sessions removed: 21 Percentage of the dataset removed: 2.7886236795634494e-07
# Pageload events removed by the anonymization (label fixed: it printed
# "removed removed"). The fraction is scaled by 100 to match the "Percentage" label.
print("Number of pageloads removed:")
print(pageloads_original - pageloads_anonymized)
print("Percentage of the dataset removed:")
print("{:.6g}%".format(100 * (pageloads_original - pageloads_anonymized) / pageloads_original))
Number of pageload removed removed: 1010169 Percentage of the dataset removed: 0.0005520847172025311
# Pageload sessions removed by the anonymization. The fraction is scaled by 100
# so the printed value really is a percentage.
print("Number of sessions removed:")
print(pageloads_original_sessions - pageloads_anonymized_sessions)
print("Percentage of the dataset removed:")
print("{:.6g}%".format(100 * (pageloads_original_sessions - pageloads_anonymized_sessions) / pageloads_original_sessions))
Number of sessions removed: 118919 Percentage of the dataset removed: 0.00012436625073663466
Release cache:
# Drop every cached DataFrame, then reload the anonymized click table from the
# parquet file written above so the rest of the notebook works from disk.
spark.catalog.clearCache()
anonymous_citationusage = spark.read.parquet("anonymous_citationusage_april.parquet")
anonymous_citationusage
DataFrame[session_id: bigint, event_time: timestamp, action: string, citation_in_text_refs: bigint, dom_interactive_time: bigint, event_offset_time: bigint, ext_position: bigint, footnote_number: bigint, freely_accessible: boolean, in_infobox: boolean, link_occurrence: bigint, link_text: string, link_url: string, mode: string, namespace_id: bigint, page_id: bigint, page_title: string, page_token: string, referrer: string, revision_id: bigint, section_id: string, skin: string, citation_identifier_label: string, useragent_is_bot: boolean, recvfrom: string, revision: bigint, schema: string, seqid: bigint, uuid: string, webhost: string, wiki: string, geocoded_data: map<string,string>, year: bigint, month: bigint, day: bigint, hour: bigint]
Check count:
# Should match the pre-write sanity count (citationusage_anonymized).
anonymous_citationusage.count()
117186558
# Reload the anonymized pageloads from the parquet file written earlier.
anonymous_pageloads = spark.read.parquet("anonymous_pageloads_april.parquet")
anonymous_pageloads
DataFrame[session_id: bigint, event_time: timestamp, action: string, dom_interactive_time: bigint, event_offset_time: bigint, mode: string, namespace_id: bigint, page_id: bigint, page_title: string, page_token: string, referrer: string, revision_id: bigint, skin: string, useragent_is_bot: boolean, recvfrom: string, revision: bigint, schema: string, seqid: bigint, uuid: string, webhost: string, wiki: string, geocoded_data: map<string,string>, year: bigint, month: bigint, day: bigint, hour: bigint]
Check count:
# Should match the pre-write sanity count (anonymous_pageloads_count).
# NOTE(review): the recorded result (1828723456) is lower than the recorded
# pre-write count (1828725320), and nothing in this notebook explains the gap.
# Worth investigating before relying on the written file.
anonymous_pageloads.count()
1828723456
# Reloaded click schema: flattened event fields plus useragent_is_bot.
anonymous_citationusage.printSchema()
root |-- session_id: long (nullable = true) |-- event_time: timestamp (nullable = true) |-- action: string (nullable = true) |-- citation_in_text_refs: long (nullable = true) |-- dom_interactive_time: long (nullable = true) |-- event_offset_time: long (nullable = true) |-- ext_position: long (nullable = true) |-- footnote_number: long (nullable = true) |-- freely_accessible: boolean (nullable = true) |-- in_infobox: boolean (nullable = true) |-- link_occurrence: long (nullable = true) |-- link_text: string (nullable = true) |-- link_url: string (nullable = true) |-- mode: string (nullable = true) |-- namespace_id: long (nullable = true) |-- page_id: long (nullable = true) |-- page_title: string (nullable = true) |-- page_token: string (nullable = true) |-- referrer: string (nullable = true) |-- revision_id: long (nullable = true) |-- section_id: string (nullable = true) |-- skin: string (nullable = true) |-- citation_identifier_label: string (nullable = true) |-- useragent_is_bot: boolean (nullable = true) |-- recvfrom: string (nullable = true) |-- revision: long (nullable = true) |-- schema: string (nullable = true) |-- seqid: long (nullable = true) |-- uuid: string (nullable = true) |-- webhost: string (nullable = true) |-- wiki: string (nullable = true) |-- geocoded_data: map (nullable = true) | |-- key: string | |-- value: string (valueContainsNull = true) |-- year: long (nullable = true) |-- month: long (nullable = true) |-- day: long (nullable = true) |-- hour: long (nullable = true)
# Reloaded pageload schema: flattened event fields plus useragent_is_bot.
anonymous_pageloads.printSchema()
root |-- session_id: long (nullable = true) |-- event_time: timestamp (nullable = true) |-- action: string (nullable = true) |-- dom_interactive_time: long (nullable = true) |-- event_offset_time: long (nullable = true) |-- mode: string (nullable = true) |-- namespace_id: long (nullable = true) |-- page_id: long (nullable = true) |-- page_title: string (nullable = true) |-- page_token: string (nullable = true) |-- referrer: string (nullable = true) |-- revision_id: long (nullable = true) |-- skin: string (nullable = true) |-- useragent_is_bot: boolean (nullable = true) |-- recvfrom: string (nullable = true) |-- revision: long (nullable = true) |-- schema: string (nullable = true) |-- seqid: long (nullable = true) |-- uuid: string (nullable = true) |-- webhost: string (nullable = true) |-- wiki: string (nullable = true) |-- geocoded_data: map (nullable = true) | |-- key: string | |-- value: string (valueContainsNull = true) |-- year: long (nullable = true) |-- month: long (nullable = true) |-- day: long (nullable = true) |-- hour: long (nullable = true)
# createOrReplaceTempView is the non-deprecated equivalent of registerTempTable.
anonymous_pageloads.createOrReplaceTempView('anonymous_pageloads')
# Keep only enwiki main-namespace (0) pageloads, reduced to the columns needed to
# rebuild sessions, and tag each row with the synthetic action 'pageLoad'.
# This rebinds `pageloads` once more, to this narrow frame.
pageloads_query = """
select event_time, session_id, page_id, 'pageLoad' as action
from anonymous_pageloads
where wiki = 'enwiki'
AND namespace_id = 0
"""
pageloads = spark.sql(pageloads_query)
pageloads
DataFrame[event_time: timestamp, session_id: bigint, page_id: bigint, action: string]
# Preview a few enwiki main-namespace pageloads.
pageloads.show(5)
+-------------------+----------+--------+--------+ | event_time|session_id| page_id| action| +-------------------+----------+--------+--------+ |2019-04-04 06:08:18| 4497935| 3157978|pageLoad| |2019-04-06 14:04:01| 2565696|49312398|pageLoad| |2019-04-06 10:35:38| 3423790|28658666|pageLoad| |2019-03-29 01:03:23| 4172151| 1418277|pageLoad| |2019-03-24 15:34:54| 2019617|52839809|pageLoad| +-------------------+----------+--------+--------+ only showing top 5 rows
Get the unique session IDs used to filter the "citationusage" table (the pageload table is only a subsample).
# Sessions present in the filtered enwiki pageloads. Click events are restricted
# to these sessions by an inner join.
unique_sessions = pageloads.select("session_id").distinct()
# createOrReplaceTempView is the non-deprecated equivalent of registerTempTable.
anonymous_citationusage.createOrReplaceTempView('anonymous_citationusage')
unique_sessions.createOrReplaceTempView('unique_sessions')
events_query = """
select event_time, cu.session_id, page_id, action
from anonymous_citationusage cu
join unique_sessions us
on us.session_id=cu.session_id
where wiki = 'enwiki'
"""
events = spark.sql(events_query)
events
DataFrame[event_time: timestamp, session_id: bigint, page_id: bigint, action: string]
Merge click and pageload events into one time-ordered event list per session:
# Merge click events and pageloads into one event list per session, ordered by
# event_time. groupByKey collects each session's events once, instead of
# repeatedly concatenating lists inside reduceByKey.
# Timestamps have second resolution (see the preview above), so ties are common.
# Tied events used to keep an arbitrary order. Now 'pageLoad' sorts first among
# them, so the order is deterministic and a same-second click is never placed
# before the page load.
sessions_rdd = events.rdd.union(pageloads.rdd)\
    .map(lambda r: (r.session_id, (r.event_time, r.page_id, r.action)))\
    .groupByKey()\
    .map(lambda r: (r[0], sorted(r[1], key=lambda x: (x[0], x[2] != 'pageLoad'))))\
    .map(lambda r: Row(session_id=r[0], events=[
        Row(event_time=e[0], page_id=e[1], action=e[2]) for e in r[1]]))
sessions_rdd.take(1)
[Row(events=[Row(action='pageLoad', event_time=datetime.datetime(2019, 3, 25, 1, 56, 14), page_id=45278900)], session_id=0)]
# Materialize the per-session Row RDD as a DataFrame. This goes through the
# `spark` session used everywhere else rather than the legacy `sqlContext` wrapper.
sessions = spark.createDataFrame(sessions_rdd)
sessions.show(5)
+--------------------+----------+ | events|session_id| +--------------------+----------+ |[[pageLoad, 2019-...| 982515712| |[[pageLoad, 2019-...| 0| |[[pageLoad, 2019-...| 161480706| |[[pageLoad, 2019-...| 322961412| |[[pageLoad, 2019-...| 484442118| +--------------------+----------+ only showing top 5 rows
def count_missing_load(row):
    """Flag a session that has an event on a page it had not loaded yet.

    ``row.events`` is the session's event list, sorted by ``event_time`` where the
    sessions are built; each event exposes ``action`` and ``page_id``.

    Returns:
        1 if some non-'pageLoad' event occurs on a page with no earlier 'pageLoad'
        event in the session, otherwise 0 (so summing over sessions counts them).
    """
    loaded_pages = set()
    for event in row.events:
        # Compare by value: the previous `is not 'pageLoad'` relied on string
        # identity, which Python does not guarantee for equal strings.
        if event.action == 'pageLoad':
            loaded_pages.add(event.page_id)
        elif event.page_id not in loaded_pages:
            return 1
    return 0
# Fraction of sessions with at least one event on a page not loaded earlier in
# the session. The denominator counts unique_sessions, the same session set that
# `sessions` was built from.
errors_count = sessions.rdd.map(count_missing_load)
errors_rate = errors_count.sum()/unique_sessions.count()
errors_rate
8.327657225917923e-05
Number of sessions:
# Number of merged sessions (one row per session_id).
sessions.count()
956079217