import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import re
import os
import seaborn as sns
import dask.dataframe as dd
import spacy
from spacy_readability import Readability
from dask import delayed
from dask_ml.model_selection import train_test_split
from gensim.models import Word2Vec, Doc2Vec
from gensim.models.doc2vec import TaggedDocument
from gensim.corpora import Dictionary
from gensim.similarities import Similarity
from gensim.models import LsiModel
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from sklearn.manifold import TSNE
# from sklearn.model_selection import train_test_split
# from gensim.test.utils import common_dictionary, common_corpus
# from tqdm import tqdm
sns.set_style('darkgrid')
# Data roots for the two machines this notebook has run on; the cells that read data use `laptop`.
laptop = '/media/seapea/Blade HDD/_Storage/Data/yelp_dataset/'
tower = '/run/media/seapea/HDD1TB_1/SharedSpace/_Large_datasets/Yelp/'
# Needs the en_core_web_md model package installed; otherwise spacy.load raises OSError [E050].
# The tagger and NER components are disabled.
nlp = spacy.load('en_core_web_md', disable=['tagger', 'ner'])
# Append the spacy_readability component as the last pipeline step.
read = Readability(nlp)
nlp.add_pipe(read, last=True)
%matplotlib inline
--------------------------------------------------------------------------- OSError Traceback (most recent call last) <ipython-input-3-52b22e7789b4> in <module> 24 laptop = '/media/seapea/Blade HDD/_Storage/Data/yelp_dataset/' 25 tower = '/run/media/seapea/HDD1TB_1/SharedSpace/_Large_datasets/Yelp/' ---> 26 nlp = spacy.load('en_core_web_md', disable=['tagger', 'ner']) 27 read = Readability(nlp) 28 nlp.add_pipe(read, last=True) ~/anaconda3/lib/python3.7/site-packages/spacy/__init__.py in load(name, **overrides) 25 if depr_path not in (True, False, None): 26 deprecation_warning(Warnings.W001.format(path=depr_path)) ---> 27 return util.load_model(name, **overrides) 28 29 ~/anaconda3/lib/python3.7/site-packages/spacy/util.py in load_model(name, **overrides) 137 elif hasattr(name, "exists"): # Path or Path-like to model data 138 return load_model_from_path(name, **overrides) --> 139 raise IOError(Errors.E050.format(name=name)) 140 141 OSError: [E050] Can't find model 'en_core_web_md'. It doesn't seem to be a shortcut link, a Python package or a valid path to a data directory.
# Read in a sample of the larger dataset (the first N_SAMPLE_PARTS parquet parts)
# for trial and error. Setting 'review_id' as the index per part was tried and left disabled.
N_SAMPLE_PARTS = 5
df = pd.concat(
    [pd.read_parquet('{}parquet/part.{}.parquet'.format(laptop, i)) for i in range(N_SAMPLE_PARTS)],
    axis=0,
)
# Wrap the in-memory sample as a Dask dataframe with 4096-row partitions.
ddf = dd.from_pandas(df, chunksize=2**12)
# usr = dd.read_json('G:/SharedSpace/_Large_datasets/Yelp/user.json', lines=True, blocksize=2**28)
# biz = dd.read_json('E:/_Large_datasets/Yelp/business.json', lines=True)#, blocksize=2**28)
# Lazily read the full review file (one JSON object per line) in 4 MiB blocks.
rev = dd.read_json(laptop + 'review.json',
lines=True, blocksize=2**22) # lower blocksize (i.e. 2**22) made this work in Linux
# Drop unused columns; per the head() output, review_id, stars, useful and text remain.
rev = rev.drop(['funny', 'cool', 'date', 'user_id', 'business_id'], axis=1)
# Remove reviews whose text is an exact duplicate of another review's text.
rev = rev.drop_duplicates(subset='text')
# rev = rev.set_index('review_id')
# rev.to_parquet(laptop + 'parquet/')
rev.head()
review_id | stars | useful | text | |
---|---|---|---|---|
0 | Q1sbwvVQXV2734tPgoKj4Q | 1 | 6 | Total bill for this horrible service? Over $8G... |
1 | GJXCdrto3ASJOqKeVWPi6Q | 5 | 0 | I *adore* Travis at the Hard Rock's new Kelly ... |
2 | 2TzJjDVDEuAW6MR5Vuc1ug | 5 | 3 | I have to say that this office really has it t... |
3 | yi0R0Ugj_xUx_Nek0-_Qig | 5 | 0 | Went in for a lunch. Steak sandwich was delici... |
4 | 11a8sVPMUFtaC7_ABRkmtw | 1 | 7 | Today was my second out of three sessions I ha... |
Throughout this project, I have found working with Dask to be very painful. It is not covered anywhere in the course, even though it reflects a more realistic working environment, where there is too much data to analyze in memory. Arithmetic computations and other analyses across Dask dataframe partitions pose challenges we haven't seen when working only in memory.
Some basic issues encountered with Dask include:
Because of these limitations, some of the work and discovery efforts behind the scenes might not be displayed in the notebook.
Let's try some basic Dask computations, verifying integrity and observing the distribution of review ratings.
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16,9))
# Materialise the column once: `rev` is a lazy Dask frame read from review.json,
# so every .compute() re-runs the whole graph. Computing once halves the I/O.
useful = rev.useful.compute()
# Log-scaled histogram; presumably +2 keeps the log argument positive for zero
# (or small negative) vote counts.
sns.distplot(np.log(useful + 2), ax=ax1)
sns.boxplot(useful, ax=ax2)
<matplotlib.axes._subplots.AxesSubplot at 0x7f7603d42e80>
# Text of the review(s) with the highest 'useful' vote count.
top_useful = rev.useful.max().compute()
is_top = rev.useful == top_useful
rev[is_top].text.compute()
203142 Dinner for 1.\n\n- Preface\nI went to Amy's Ba... Name: text, dtype: object
Another simple task to test the library: we will try the TfidfVectorizer. I'm using it instead of the plain CountVectorizer because its (L2-normalized) weights stay between zero and one, which suits the visualization I want to build.
@delayed
def ret_count(ddf):
    """Fit a 30-feature TF-IDF model on review text.

    Uses unigrams and bigrams with English stop words removed. ``ddf`` is the
    text column passed in (``rev.text`` below; dask.delayed computes it before
    the call).

    Returns ``(matrix, feature_names)``: the sparse TF-IDF matrix and the
    vectorizer's feature names.
    """
    tfidf = TfidfVectorizer(stop_words='english', ngram_range=(1, 2), max_features=30)
    matrix = tfidf.fit_transform(ddf)
    feature_names = tfidf.get_feature_names()
    return matrix, feature_names


# NOTE(review): this rebinds the global `ddf` (the Dask sample built from the
# parquet parts) to the sparse TF-IDF matrix. Cells that expect `ddf` to be a
# DataFrame (e.g. `tag(ddf)`) only work if they run before this cell.
ddf, names = ret_count(rev.text).compute()
print(names)
['best', 'came', 'chicken', 'come', 'definitely', 'did', 'didn', 'don', 'food', 'friendly', 'good', 'got', 'great', 'just', 'like', 'little', 'love', 'nice', 'order', 'ordered', 'people', 'place', 'really', 'restaurant', 'service', 'staff', 'time', 'try', 've', 'went']
Our vectorizer didn't overload the system!
Unfortunately, the counts are exceedingly high and overpower the other observations making this unhelpful.
def top4(val, ax, mat=None):
    """Plot the distribution of the weights in feature column ``val`` on ``ax``.

    ``mat`` defaults to the global ``ddf``, which is expected to be the sparse
    document-term matrix returned by ``ret_count``. Only the requested column is
    densified; converting the whole matrix to dense on every call is unnecessary.
    """
    if mat is None:
        mat = ddf
    column = mat[:, val]
    # Sparse matrices densify via .toarray(); dense inputs are used as-is.
    column = column.toarray() if hasattr(column, 'toarray') else np.asarray(column)
    sns.distplot(column.ravel(), kde=True, ax=ax)


fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(16,9), sharex=True, sharey=True)
axes = [ax1, ax2, ax3, ax4]
# NOTE: the vectorizer's columns are ordered alphabetically by feature name (see the
# printed `names`), so these are the first four features alphabetically, not the
# four most frequent ones.
for val, ax in enumerate(axes):
    top4(val, ax)
    ax.set_title(names[val])
After some hours of troubleshooting (not shown in this project), I am now comfortable working with the distributed dataframes.
In a previous project, I limited my text subset to one million characters, because spaCy raises an error when a single text is longer than `nlp.max_length` (1,000,000 characters by default), since the whole text has to be held in memory at once. I have since discovered that spaCy provides a lazy, batched interface for this: `nlp.pipe`. It streams the documents through the pipeline in batches, so we stay within memory constraints. This has been a helpful discovery.
The steps below will be the analyses we take to find explicit value in reviews.
Maybe the frequency of certain words is more prevalent in more valuable reviews
Are there combinations of words (phrases) that exist frequently?
Might be a factor
Might also
Also checking if industries respond differently
# Earlier CountVectorizer attempt, disabled by wrapping it in a string literal.
# Its printed feature list is kept below for comparison with the lemmatised run.
'''@delayed
def ret_count(ddf):
vec = CountVectorizer(stop_words='english', ngram_range=(1,2), max_features=50)
X = vec.fit_transform(ddf)
names = vec.get_feature_names()
return X, names
ddf, names = ret_count(rev.text).compute()
print(names)'''
['amazing', 'best', 'better', 'came', 'chicken', 'come', 'day', 'definitely', 'delicious', 'did', 'didn', 'don', 'experience', 'food', 'friendly', 'going', 'good', 'got', 'great', 'just', 'know', 'like', 'little', 'love', 'make', 'menu', 'new', 'nice', 'night', 'order', 'ordered', 'people', 'place', 'pretty', 'really', 'recommend', 'restaurant', 'right', 'said', 'say', 'service', 'staff', 'sure', 'time', 'try', 've', 'wait', 'want', 'way', 'went']
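We will try this again with some preprocessing: lemmatization should increase the diversity of the words selected as features. For example, 'order' and 'ordered' both appear in the list above and would collapse into the single lemma 'order', freeing a feature slot.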
# Basic text cleaning
def fix_nl(mytext):
    r"""Flatten line breaks in ``mytext`` into single spaces.

    Every run of line-break characters (``\n``, ``\r\n``, blank lines) becomes
    one space, so words on either side of a break stay separate
    ("Preface\nI went" -> "Preface I went"). Runs of spaces then collapse to one,
    and leading/trailing whitespace is stripped.
    """
    text = re.sub(r'[\r\n]+', ' ', mytext)
    return re.sub(r' {2,}', ' ', text).strip()
# Lemmatisation helper
def lemma_sent(sent):
    """Join the lower-cased lemmas of ``sent`` with spaces.

    Tokens flagged as punctuation or stop words are skipped. ``sent`` is any
    iterable of spaCy-style tokens (objects with ``lemma_``, ``is_punct`` and
    ``is_stop``).
    """
    kept = []
    for token in sent:
        if token.is_punct or token.is_stop:
            continue
        kept.append(token.lemma_.lower())
    return ' '.join(kept)
@delayed
def process_n_count(series):
    """Fit a 300-feature unigram/bigram CountVectorizer on lemmatised text.

    Each entry of ``series`` is cleaned with ``fix_nl``, parsed with
    ``nlp.pipe`` and reduced by ``lemma_sent`` to lower-cased lemmas without
    punctuation or stop words. Only the fitted vectorizer is returned; the
    document-term matrix produced while fitting is discarded.
    """
    cleaned = series.apply(fix_nl)
    lemmatised = [lemma_sent(doc) for doc in nlp.pipe(cleaned)]
    counter = CountVectorizer(stop_words='english', ngram_range=(1, 2), max_features=300)
    counter.fit_transform(lemmatised)
    return counter
@delayed
def test(series, vec):
    """Transform held-out review text with an already-fitted vectorizer.

    The text goes through the same preprocessing ``process_n_count`` applies
    before fitting: ``fix_nl`` cleaning, ``nlp.pipe`` parsing and
    ``lemma_sent`` lemmatisation. This makes the tokens line up with a
    vocabulary fitted on lemmas.

    Parameters
    ----------
    series : Series of raw review strings (dask.delayed computes it before the call).
    vec : fitted sklearn vectorizer, e.g. the one loaded from ``vec.joblib``.

    Returns
    -------
    Sparse document-term matrix with one row per entry of ``series``, in order.
    """
    cleaned = series.apply(fix_nl)
    # Transform the lemmatised text, not the raw strings: a vocabulary fitted on
    # lemmas would otherwise miss inflected forms and miscount features.
    lemmas = [lemma_sent(doc) for doc in nlp.pipe(cleaned)]
    return vec.transform(lemmas)
# This cell will test the steps of my approach against a much smaller dataframe
# Clean the in-memory sample's text column (newline handling via fix_nl).
df.text = df.text.apply(fix_nl)
# Parse the first 5,000 reviews; list() materialises the lazy nlp.pipe stream.
new_txt = list(nlp.pipe(df.iloc[:5000].text))
cvec = CountVectorizer(stop_words='english', max_features=100)
# Fit on lemmatised text; the cell displays the resulting 5000 x 100 count matrix.
cvec.fit_transform([lemma_sent(txt) for txt in new_txt])
<5000x100 sparse matrix of type '<class 'numpy.int64'>' with 52872 stored elements in Compressed Sparse Row format>
# Apply the fitted vocabulary to the next 1,000 (unseen) reviews, lemmatised the same way.
held_out = [lemma_sent(doc) for doc in nlp.pipe(df.iloc[5000:6000].text)]
cvec.transform(held_out)
<1000x100 sparse matrix of type '<class 'numpy.int64'>' with 10395 stored elements in Compressed Sparse Row format>
The transformation appears to work against new data! We will now attempt the same process against the entire 'rev' dataset.
Note: This is the same tactic as we tried above (commented out), but we are hoping for better, more accurate results since we are reducing the text to lemmas and cleaning it more.
# Fixed seed so the split is reproducible across sessions. Without it every run
# draws a new split, and a vectorizer fitted on an earlier run's X_train (as
# loaded from vec.joblib below) may already have seen rows of this X_test.
SPLIT_SEED = 42
X_train, X_test, y_train, y_test = train_test_split(
    rev[['text', 'useful']], rev.useful, test_size=0.5, random_state=SPLIT_SEED
)
# Had to run this training and compute command after emptying as much in memory as possible.
# process_n_count applies fix_nl element-wise, so it needs the text Series, not the
# two-column frame. Refit and re-dump once so vec.joblib matches this seeded split.
# new_vec = process_n_count(X_train.text).compute()
from joblib import dump, load
# dump(new_vec, "vec.joblib")
# NOTE: joblib.load unpickles the file; only load vec.joblib files you created yourself.
new_vec = load('vec.joblib')
myX = test(X_test.text, new_vec).compute()
sns.set_style('darkgrid')
# plt.style.use('ggplot')
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(16,9), sharey=True)
# myX was built from X_test.text, so its rows follow X_test's row order. The
# groups must therefore come from X_test (not X_train) and be expressed as row
# *positions*: Dask index labels are not guaranteed to be positions into the
# sparse matrix.
test_useful = np.asarray(X_test.useful.compute())
if len(test_useful) != myX.shape[0]:
    raise ValueError('X_test and myX are not row-aligned')
myInd1 = np.flatnonzero(test_useful > 0)
myInd0 = np.flatnonzero(test_useful == 0)
n_features = myX.shape[1]
# Column sums = total count of each vocabulary term within each group.
ax1.bar(range(n_features), np.asarray(myX[myInd1].sum(axis=0)).ravel(), width=1)
ax1.set_title('Reviews with useful > 0')
ax2.set_title('Reviews with useful == 0')
ax2.bar(range(n_features), np.asarray(myX[myInd0].sum(axis=0)).ravel(), width=1)
<BarContainer object of 300 artists>
# Heatmap
# NOTE(review): nlp.pipe yields parsed docs lazily (elsewhere in this notebook it is
# consumed with list(...) or a loop). As written, this line only creates an unconsumed
# generator, so nothing is parsed and no heatmap is produced. Presumably a placeholder;
# TODO finish or remove.
nlp.pipe(rev.text)
After some trial, error, and research, I have concluded that the model computation does not differ whether it runs on one large chunk or on several. This is based on a blog post by the library's author, Radim Řehůřek, on using multiprocessing for faster running times. If the content can be processed in parallel, each piece does not require information from the entire corpus, so the work can be run in pieces.
We will verify if this can be done below. Within gensim, we can access the document and word vectors after training updates to see if the model updates.
# Tagging function with delayed capability
@delayed
def tag(df):
    """Build a Doc2Vec vocabulary from the ``text`` column of ``df``.

    Each review is cleaned with ``fix_nl``, parsed with ``nlp.pipe`` (parser and
    NER disabled), and reduced to lower-cased lemmas without punctuation or stop
    words. Review ``i`` (in row order) is tagged ``[i]``.

    Returns ``(model, tagged_docs)``. Only ``build_vocab`` is called, so the
    model's vectors are untrained.
    """
    # Clean into a local Series instead of assigning back to df.text, so the
    # caller's frame is not modified (e.g. when a pandas frame is passed directly).
    texts = df.text.apply(fix_nl)
    tags = []
    for i, doc in enumerate(nlp.pipe(texts, disable=['ner', 'parser'])):
        words = [tok.lemma_.lower() for tok in doc if not tok.is_punct and not tok.is_stop]
        tags.append(TaggedDocument(words, [i]))
    vec = Doc2Vec(min_count=1)
    vec.build_vocab(tags)
    return vec, tags


# NOTE(review): requires `ddf` to still be a DataFrame with a `text` column; the
# TF-IDF cell rebinds `ddf` to a sparse matrix.
vec1, tags1 = tag(ddf).compute()
# Snapshot doc vector 0 before further training. .copy() matters: indexing the
# docvecs can return a view into the model's vector array, which training would
# then update in place, making a before/after comparison meaningless.
test_arr = vec1.docvecs[0].copy()
# vec1 has only been through build_vocab (tag() does not train it), so these
# neighbours reflect the initial, untrained vectors.
vec1.docvecs.most_similar(0, topn=5)
[(17033, 0.38854920864105225), (23295, 0.3664354085922241), (12518, 0.3616259694099426), (12915, 0.3600383400917053), (14743, 0.35090503096580505)]
# Words most similar to the combined words of the first tagged review.
vec1.wv.most_similar(positive=tags1[0].words, topn=5)
[('cantor', 0.3950194716453552), ('allllways', 0.39479339122772217), ('chitown', 0.3757461905479431), ('fantastico', 0.35876384377479553), ('sick', 0.35497692227363586)]
vec2, tags2 = tag(rev).compute()
# Pass total_examples by keyword (positional order differs across gensim versions)
# and count only the corpus actually supplied (tags2); gensim uses it to track
# progress and decay the learning rate.
# NOTE(review): tags2 reuses integer tags 0..len(tags2)-1, which overlap the tags in
# tags1, and vec1's vocabulary was built from tags1 only. Confirm this is the intended update.
vec1.train(tags2, total_examples=len(tags2), epochs=5)
# Copy, so comparisons with earlier snapshots are not against a live view into the model.
test_arr1 = vec1.docvecs[0].copy()
vec1.docvecs.most_similar(0, topn=5)
[(17033, 0.38854920864105225), (23295, 0.3664354085922241), (12518, 0.3616259694099426), (12915, 0.3600383400917053), (14743, 0.35090503096580505)]
# Same word-similarity query after the extra training pass.
vec1.wv.most_similar(positive=tags1[0].words, topn=5)
[('apartment', 0.9960299134254456), ('spicy', 0.9960122108459473), ('oil', 0.9959917664527893), ('dry', 0.9959776401519775), ('b', 0.9959765672683716)]
# Number of positions where the two doc-vector snapshots differ (0 => identical).
# Only conclusive if test_arr was a copy rather than a view of the model's vectors.
np.count_nonzero(test_arr != test_arr1)
0
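It appears the document vectors within the model do not return different results from the most_similar method after the extra training. I had read this somewhere but wanted to verify it. Comparing the test arrays also shows zero differences between the document vectors before and after training. Caveat: indexing `vec1.docvecs[0]` can return a view into the model's vector array rather than a copy. In that case `test_arr` and `test_arr1` share memory, and the comparison reports zero differences even if training changed the vector. Likewise, `most_similar` may reuse cached normalized vectors computed before training. Copying the vector (`.copy()`) before training is needed to make this check conclusive.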
Moving forward, some Google queries and documentation digging show Doc2Vec does not allow the same training update approach that word vectors allow, as I suspected. We will have to work with the word embeddings instead.
We are actually able to
Where does model.wv.similarity_matrix fit in?
vec = CountVectorizer(max_features=100, ngram_range=(1, 2), stop_words='english')


@delayed
def vectorize(df):
    """Fit the module-level ``vec`` on ``df.text`` and return the count matrix.

    Fitting mutates the global ``vec``, so its vocabulary is available afterwards.
    """
    documents = df.text
    return vec.fit_transform(documents)


# Runs the whole review text through a single task.
it = vectorize(rev).compute()
--------------------------------------------------------------------------- KeyboardInterrupt Traceback (most recent call last) <ipython-input-71-bd7a2913c3c3> in <module> 5 return vec.fit_transform(df.text) 6 ----> 7 it = vectorize(rev).compute() ~/anaconda3/envs/py36/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs) 173 dask.base.compute 174 """ --> 175 (result,) = compute(self, traverse=False, **kwargs) 176 return result 177 ~/anaconda3/envs/py36/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs) 444 keys = [x.__dask_keys__() for x in collections] 445 postcomputes = [x.__dask_postcompute__() for x in collections] --> 446 results = schedule(dsk, keys, **kwargs) 447 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)]) 448 ~/anaconda3/envs/py36/lib/python3.6/site-packages/dask/threaded.py in get(dsk, result, cache, num_workers, pool, **kwargs) 80 get_id=_thread_get_id, 81 pack_exception=pack_exception, ---> 82 **kwargs 83 ) 84 ~/anaconda3/envs/py36/lib/python3.6/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs) 478 # Main loop, wait on tasks to finish, insert new ones 479 while state["waiting"] or state["ready"] or state["running"]: --> 480 key, res_info, failed = queue_get(queue) 481 if failed: 482 exc, tb = loads(res_info) ~/anaconda3/envs/py36/lib/python3.6/site-packages/dask/local.py in queue_get(q) 142 143 def queue_get(q): --> 144 return q.get() 145 146 ~/anaconda3/envs/py36/lib/python3.6/queue.py in get(self, block, timeout) 162 elif timeout is None: 163 while not self._qsize(): --> 164 self.not_empty.wait() 165 elif timeout < 0: 166 raise ValueError("'timeout' must be a non-negative number") ~/anaconda3/envs/py36/lib/python3.6/threading.py in wait(self, timeout) 293 try: # restore state no matter what (e.g., KeyboardInterrupt) 294 if timeout is None: --> 295 
waiter.acquire() 296 gotit = True 297 else: KeyboardInterrupt:
# Gensim dictionary restricted to the CountVectorizer's features (faster than
# building one from every token in the corpus).
mydct = Dictionary([vec.get_feature_names()])
# Tokenise with the vectorizer's own analyzer so tokens match the dictionary: it
# lower-cases, drops punctuation and English stop words, and emits the same
# uni/bigrams. A plain str.split() keeps case and punctuation ("Great!"), so most
# tokens would never match the lower-cased feature names.
analyzer = vec.build_analyzer()
mycorp = [mydct.doc2bow(analyzer(txt)) for txt in rev.text.compute()]
mylsi = LsiModel(mycorp, id2word=mydct, num_topics=100)
# Project the first review. Its bag-of-words is mycorp[0], which avoids the
# rev.text.loc[0] label lookup that scans every partition and can match several rows.
mylsi[mycorp[0]]
# Gensim dictionary restricted to the CountVectorizer's features (faster than
# building one from every token in the corpus).
mydct = Dictionary([vec.get_feature_names()])
# Tokenise with the vectorizer's own analyzer so tokens match the dictionary
# (lower-cased, punctuation and English stop words removed, same uni/bigrams).
# str.split() would keep case and punctuation and miss most features.
analyzer = vec.build_analyzer()
mycorp = [mydct.doc2bow(analyzer(txt)) for txt in rev.text.compute()]
# Two topics, so each review can be plotted as a 2-D point.
mylsi = LsiModel(mycorp, id2word=mydct, num_topics=2)
# Project the first review, whose bag-of-words is mycorp[0].
mylsi[mycorp[0]]
[(0, 0.8622897373758738), (1, -0.7724632421528443)]
# Reuse the bag-of-words corpus the LSI model was trained on instead of looking up
# each review with rev.text.loc[i].compute(), which re-reads the whole dataset for
# every i (and can match several rows per label).
mybow = mycorp
mylsa = [mylsi[bow] for bow in mybow]
# LSI projections are sparse (topic_id, weight) lists and may omit topics, so look
# weights up by topic id rather than by list position. Topics missing from a
# non-empty projection get 0.0, which keeps x and y the same length and aligned
# per review. Reviews with an empty projection are skipped, as before.
x = []
y = []
for val in mylsa:
    if not val:
        continue
    weights = dict(val)
    x.append(weights.get(0, 0.0))
    y.append(weights.get(1, 0.0))
fig, ax = plt.subplots(figsize=(16,9))
sns.scatterplot(x=x, y=y, alpha=0.1, ax=ax)
<matplotlib.axes._subplots.AxesSubplot at 0x7f39c2184f98>
tsne = TSNE()
# Dense (n_reviews, n_topics) matrix of LSI weights; topics omitted from a
# review's sparse projection are 0.0.
lsi_dense = np.array([[dict(val).get(k, 0.0) for k in range(mylsi.num_topics)] for val in mylsa])
# Call fit_transform on the TSNE *instance* with that array. The class-level call
# TSNE.fit_transform(mylsi) passed the LsiModel itself as `self`.
# NOTE(review): t-SNE over every review will likely be very slow; consider a sample.
tsne.fit_transform(lsi_dense)
7.533877018931459
Compare the text to both and check soft cosine similarity?