import re
import json
import time
import threading
from collections import defaultdict
from itertools import chain
from requests import post, get
from math import sqrt
from wabbit_wappa import VW

import numpy as np
import pandas as pd
pd.options.display.mpl_style = 'default'
import seaborn as sb
import matplotlib.pylab as pylab
import matplotlib.pyplot as plt
pylab.rcParams['figure.figsize'] = (12.0, 8.0)
pylab.rcParams['font.family'] = 'Bitstream Vera Sans'

def wikipedia_updates(callback):
    endpoint = "http://wikistream.inkdroid.org/socket.io/1"
    session_id = post(endpoint).content.decode('utf-8').split(':')[0]
    xhr_endpoint = "/".join((endpoint, "xhr-polling", session_id))
    while True:
        t = time.time() * 1000000
        response = get(xhr_endpoint, params={'t': t}).content.decode('utf-8')
        chunks = re.split(u'\ufffd[0-9]+\ufffd', response)
        for chunk in chunks:
            parts = chunk.split(':', 3)
            if len(parts) == 4:
                try:
                    payload = json.loads(parts[3])['args'][0]
                except Exception:
                    raise ValueError('Received non-json data: ' + chunk)
                if not callback(payload):
                    return

max_edits = 500
edits = []

def simple_callback(edit):
    edit['timestamp'] = time.time()
    edits.append(edit)
    return len(edits) < max_edits

thr = threading.Thread(target=wikipedia_updates, args=(simple_callback,))
thr.start()

reg = re.compile(r'^.*wiki/(.*)')
c = len(edits)
n = min(5, c)
print('Last %d of %d edits...\n' % (n, c))
for edit in edits[-n:]:
    wiki_name = edit['wikipediaLong']
    page_name = reg.match(edit['pageUrl']).group(1)
    comment = edit['comment']
    print('\t'.join((wiki_name, page_name, comment)), '\n', flush=True)
if c == max_edits:
    print('Done.\n')
else:
    print('Still streaming.\n')

df = pd.DataFrame(edits)
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='s')
df.head()

p = df.wikipedia.value_counts().plot(kind='barh')

df[df.wikipediaShort=='wd'].robot.value_counts()

ts = pd.pivot_table(df, values='delta', index='timestamp', columns='wikipediaShort')
ts.head()

sec_counts = ts.resample('s', how='count')
sec_counts.head()

p = sec_counts.plot()

p = sec_counts.cumsum().plot()

# Helper function to get all length-n ngrams
def _ngrams(text, n):
    return (text[i:i+n] for i in range(len(text) - (n-1)))

# Helper function to get all words, padded by a space on either side
def _words(text):
    return (s.center(len(s) + 2) for s in text.split())

# Helper function to filter out tokens that are all whitespace or numerals
def _filter(word):
    return (not word.isspace()) and (not word.strip().isnumeric())

# Get all ngrams for n1...n2, and optionally whole words
def ngrams(text, n1, n2, include_words=False):
    if n1 < 1:
        raise ValueError('n1 must be 1 or more')
    if n2 < n1:
        raise ValueError('n2 must not be less than n1')
    return filter(_filter, chain(_words(text) if include_words else (),
                                 chain.from_iterable(_ngrams(text, n) for n in range(n1, n2+1))))

print(list(ngrams(' the quick brown fox 1997', 1, 2, include_words=True)))

skip_reg = re.compile(r'[(),]+')
split_reg = re.compile(r'[ _-]+')

def page_name(edit):
    if edit['wikipediaShort'] == 'wd' or \
       edit['wikipediaShort'] == 'en' or \
       edit['wikipediaShort'] == 'co' or \
       edit['namespace'] != 'article':
        return False
    name = split_reg.sub(' ', skip_reg.sub('', edit['page'].lower().strip()))
    return name.center(len(name) + 2)

test_edit = {'wikipediaShort': 'xx', 'namespace': 'article', 'page': 'Testing, Testing_ (one-two) '}
print(page_name(test_edit))

# Create a new ringbuffer for the given number of predictions
def new_ringbuffer(capacity):
    return np.zeros((capacity, 2), dtype=np.int8) - 1

# Calculate accuracy (% correct predictions) over those rows which have been filled in
def accuracy(ringbuffer):
    # Filter identifying rows which have been filled in
    valid = np.sum(ringbuffer, axis=1) >= 0
    # Filter identifying rows where prediction == correct value
    matched = ringbuffer[:,0] == ringbuffer[:,1]
    # Accuracy is number of rows which are valid AND matched, over number of valid rows
    return np.sum(np.logical_and(valid, matched)) / np.sum(valid)

# Return the number of valid (filled-in) rows
def valid_size(ringbuffer):
    return np.sum(np.sum(ringbuffer, axis=1) >= 0)

# Add a new test result to the ringbuffer, deleting the oldest entry if necessary,
# and returning a reference to the modified buffer
def add_entry(ringbuffer, actual, predicted):
    capacity = len(ringbuffer)
    # Overwrite the bottom row of the array
    ringbuffer[capacity-1, :] = (actual, predicted)
    # Roll the newest entry around to the top of the array, pushing the rest down
    return np.roll(ringbuffer, 1, axis=0)
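# A quick worked example of the ringbuffer helpers above, using a throwaway
# 3-slot buffer (_demo_rb is purely illustrative). Two of the three recorded
# predictions are correct, so the rolling accuracy should come out at 2/3.
_demo_rb = new_ringbuffer(3)
_demo_rb = add_entry(_demo_rb, actual=0, predicted=0)  # correct
_demo_rb = add_entry(_demo_rb, actual=1, predicted=2)  # wrong
_demo_rb = add_entry(_demo_rb, actual=2, predicted=2)  # correct
print(valid_size(_demo_rb), accuracy(_demo_rb))        # 3 valid rows, accuracy 2/3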
# How many classes to allow in the model (must be provided up front)
max_expected_classes = 30
# When to stop, if not interrupted
max_edits = 100000
# How many tests to calculate score over
k = 500
# Ringbuffer to hold results of each test
rb = new_ringbuffer(k)
# How many bits in range of VW hash function:
# larger = potentially better accuracy but more memory
hashbits = 29
# How many edits we've received, and trained on, respectively
received = 0
trained = 0
# List of the wiki names we've seen, to be populated as we go along
labels = []
# Maps each wiki name -> its index in labels list
label_to_idx = {}
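# For a feel of what the model below actually sees, here is roughly what one
# edit turns into before it reaches VW. The edit dict is made up, but it carries
# the same keys ('wikipediaShort', 'namespace', 'page') that page_name() checks;
# the resulting character ngrams and padded whole words become VW's binary features.
_example_edit = {'wikipediaShort': 'xx', 'namespace': 'article', 'page': 'Quick brown fox'}
_example_features = set(ngrams(page_name(_example_edit), 1, 3, include_words=True))
print(len(_example_features), sorted(_example_features)[:10])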
def vw_callback(edit):
    name = page_name(edit)
    if not name:
        return True
    global rb, received, trained
    received += 1
    # Set up mappings for this wiki if this is the first time we've seen it
    label = edit['wikipedia'].replace(' Wikipedia', '')
    if label in label_to_idx:
        idx = label_to_idx[label]
    else:
        # If we have exhausted our preset class limit, just skip it! Bit kludgey...
        if len(labels) == max_expected_classes:
            print("Ignoring class %d" % len(labels))
            return True
        labels.append(label)
        idx = len(labels) - 1
        label_to_idx[label] = idx
    # Generate binary features -- unigrams, bigrams, trigrams, and whole words
    features = set(ngrams(name, 1, 3, include_words=True))
    # Test example first -- does model get it right?
    raw_line = vw.make_line(features=features)
    # print(raw_line)
    result = vw.send_line(raw_line).prediction
    prediction = int(result) - 1  # NB: VW class labels start at 1
    # Add prediction to ringbuffer
    rb = add_entry(rb, idx, prediction)
    # Train on this example only if prediction was wrong
    if idx == prediction:
        # print('%s [%s] OK' % (name, labels[idx]), flush=True)
        pass
    else:
        actual = idx if idx >= len(labels) else labels[idx]
        predicted = prediction if prediction >= len(labels) else labels[prediction]
        tested = valid_size(rb)
        print('%s [%s] Failed!\nPrediction: %s, Received: %d, Trained: %d, Last-%d Accuracy: %0.3f' %
              (name, actual, predicted, received, trained, tested, accuracy(rb)), flush=True)
        raw_line = vw.make_line(idx+1, features=features)
        vw.send_line(raw_line)
        trained += 1
    # Stop if we've hit max_edits
    return received < max_edits

# Start up local VW process
vw = VW(loss_function='hinge', b=hashbits, ect=max_expected_classes, hash='all', q='nn')
print(vw.command)
try:
    wikipedia_updates(vw_callback)
finally:
    vw.close()

confusion = np.zeros((len(labels), len(labels)))
for (x, y) in rb:
    # Skip rows that were never filled in (-1, -1) and predictions of classes we never saw
    if x < 0 or y < 0 or y >= len(labels):
        continue
    confusion[x, y] += 1
samples_per_wiki = np.asarray(confusion.sum(axis=1))
sorted_idxs = np.argsort(samples_per_wiki)[::-1]
sorted_confusion = confusion[sorted_idxs,:][:,sorted_idxs]
sorted_confusion.shape

fig, ax = plt.subplots(1)
p = ax.pcolormesh(sorted_confusion, cmap='Reds')
fig.colorbar(p)
labels_array = np.array(labels)
ax.set_yticks(np.arange(sorted_confusion.shape[0])+0.5, minor=False)
ax.set_xticks(np.arange(sorted_confusion.shape[1])+0.5, minor=False)
ax.set_xticklabels(labels_array[sorted_idxs], minor=False)
ax.set_yticklabels(labels_array[sorted_idxs], minor=False)
plt.xticks(rotation=90)
plt.show()

# Normalise each row by its row total (keepdims keeps the divisor as a column
# vector so the division is row-wise)
normalized = sorted_confusion / sorted_confusion.sum(axis=1, keepdims=True)
np.fill_diagonal(normalized, 0)
edge_weights = pd.DataFrame(data=normalized, index=labels_array[sorted_idxs],
                            columns=labels_array[sorted_idxs]).stack()
edge_weights.index.names = ('source', 'target')
edge_weights.name = 'value'
edge_weights.iloc[edge_weights.nonzero()].to_csv('cm.csv', header=True, index=True)

from IPython.lib.display import IFrame
IFrame('graph.html', 650, 650)
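# Optional sanity check: read the exported edge list back in to confirm the CSV
# has the source / target / value columns that the visualisation in graph.html
# presumably expects.
pd.read_csv('cm.csv').head()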