Data extracted from public HTML pages and written to the population_wpm_sections table.

Extraction details:
- enwiki-20190420-pages-articles-multistream.xml.bz2 dump file
- population_wpm_sections table written for use in other notebooks

%run -i 'data-defaults.py'
import urllib.error
import urllib.request

from bs4 import BeautifulSoup
from pyspark.sql import Row

# method to fetch page HTML for a revision and map section_ids to their parent section H2s
def extract_h2s(row):
    page_id = str(row['page_id'])
    rev_id = str(row['rev_id'])
    url = 'https://en.wikipedia.org/?oldid=' + rev_id
    sections = list()
    try:
        wpm_page = urllib.request.urlopen(url)
    except urllib.error.HTTPError:
        print('error fetching data for: ' + str(row))
        return sections  # return an empty list so callers can still iterate over the result
    soup = BeautifulSoup(wpm_page.read(), features="lxml")
    h2 = ""
    for span in soup.find_all(class_="mw-headline"):
        # remember the most recent H2 so lower-level headings are attributed to it
        if 'h2' == span.parent.name:
            h2 = span.get('id')
        sections.append(Row(page_id=page_id, section_h2=h2, section_id=span.get('id')))
    return sections
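A quick way to sanity-check the function is to run it against a single revision before the full loop below; the page_id and rev_id values here are placeholders rather than ids from the population.

# hypothetical spot check; the ids below are placeholders, not real population values
sample = {'page_id': 12345, 'rev_id': 886000000}
for section in extract_h2s(sample):
    print(section.section_h2, '->', section.section_id)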
import re

# only need revision ids for the 20190420 dump
WIKIPEDIA_XML_DUMPS = ['enwiki-20190420-pages-articles-multistream.xml.bz2']

def page_rev(entity, date):
    return Row(page_id=entity.id, rev_id=entity.revision.id, dt=date)

page_revs_rdd = sc.emptyRDD()
for file in WIKIPEDIA_XML_DUMPS:
    wikipedia = sqlContext.read.format('com.databricks.spark.xml').options(rowTag='page').load(file)
    # dump date (YYYYMMDD) is encoded in the dump file name
    dump_date = re.search(r'.*(\d{8}).*', file).group(1)
    # keep main-namespace, non-redirect articles with non-empty text
    articles = wikipedia\
        .filter("ns = '0'")\
        .filter("redirect._title is null") \
        .filter("revision.text._VALUE is not null") \
        .filter("length(revision.text._VALUE) > 0")
    daily_page_revs = sqlContext.createDataFrame(articles.rdd.map(lambda entity: page_rev(entity, dump_date)))
    page_revs_rdd = page_revs_rdd.union(daily_page_revs.rdd)

page_revs_merged = sqlContext.createDataFrame(page_revs_rdd)
page_revs_merged.registerTempTable("page_revs_date")
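Before restricting to the WP:M population, the temp table can be spot-checked to confirm that only the 20190420 dump date is present; this query is an illustrative sketch and was not part of the original extraction run.

# sketch: count revisions per dump date in the temp table
spark.sql("SELECT dt, COUNT(*) AS pages FROM page_revs_date GROUP BY dt").show()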
# extract WP:M page revisions for 20190420
query = """
SELECT DISTINCT page_id, rev_id
FROM
page_revs_date
WHERE page_id IN
(SELECT page_id FROM ryanmax.population_wpm_pages_with_extlinks)
AND dt = '20190420'
"""
wpm_revs_pandas = spark.sql(query).toPandas()
wpm_revs_pandas.describe()
  | page_id | rev_id |
---|---|---|
count | 3.259700e+04 | 3.259700e+04 |
mean | 2.118422e+07 | 8.676358e+08 |
std | 1.888627e+07 | 3.811546e+07 |
min | 2.500000e+01 | 4.303365e+08 |
25% | 3.237962e+06 | 8.614966e+08 |
50% | 1.728788e+07 | 8.830005e+08 |
75% | 3.660101e+07 | 8.891040e+08 |
max | 6.053808e+07 | 8.937050e+08 |
# fetch section headings for every revision in the population, with simple progress output
all_sections = list()
i = 0
for index, row in wpm_revs_pandas.iterrows():
    for section in extract_h2s(row):
        all_sections.append(section)
    i += 1
    if (i % 1000 == 0):
        print(str(i) + " pages fetched")

sections_df = spark.createDataFrame(all_sections)
1000 pages fetched 2000 pages fetched 3000 pages fetched 4000 pages fetched 5000 pages fetched 6000 pages fetched 7000 pages fetched 8000 pages fetched 9000 pages fetched 10000 pages fetched 11000 pages fetched 12000 pages fetched 13000 pages fetched 14000 pages fetched 15000 pages fetched 16000 pages fetched 17000 pages fetched 18000 pages fetched 19000 pages fetched 20000 pages fetched 21000 pages fetched 22000 pages fetched 23000 pages fetched 24000 pages fetched 25000 pages fetched 26000 pages fetched 27000 pages fetched 28000 pages fetched 29000 pages fetched 30000 pages fetched 31000 pages fetched 32000 pages fetched
sections_df.toPandas().describe()
  | page_id | section_h2 | section_id |
---|---|---|---|
count | 248540 | 248540 | 248540 |
unique | 32496 | 29280 | 74213 |
top | 602009 | References | References |
freq | 517 | 31411 | 30879 |
# write section data to table for later use
sections_df.registerTempTable("temp_population_wpm_sections")
sqlContext.sql("DROP TABLE IF EXISTS ryanmax.population_wpm_sections")
sqlContext.sql("CREATE TABLE ryanmax.population_wpm_sections AS SELECT * FROM temp_population_wpm_sections")
DataFrame[]
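Since the table is written for use in other notebooks, downstream notebooks can read it back directly; the aggregation below is only an illustrative sketch of that usage.

# sketch: reload the section data in a downstream notebook
sections = spark.sql("SELECT * FROM ryanmax.population_wpm_sections")
# e.g. most common H2 headings across the population
sections.groupBy("section_h2").count().orderBy("count", ascending=False).show(10)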