In [1]:
!apt-get install openjdk-8-jdk-headless > /dev/null
!wget http://mirrors.tuna.tsinghua.edu.cn/apache/spark/spark-2.4.1/spark-2.4.1-bin-hadoop2.7.tgz
!tar xf spark-2.4.1-bin-hadoop2.7.tgz
!pip install findspark
--2019-04-19 05:46:15--  http://mirrors.tuna.tsinghua.edu.cn/apache/spark/spark-2.4.1/spark-2.4.1-bin-hadoop2.7.tgz
Resolving mirrors.tuna.tsinghua.edu.cn (mirrors.tuna.tsinghua.edu.cn)... 101.6.8.193, 2402:f000:1:408:8100::1
Connecting to mirrors.tuna.tsinghua.edu.cn (mirrors.tuna.tsinghua.edu.cn)|101.6.8.193|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 230778742 (220M) [application/octet-stream]
Saving to: ‘spark-2.4.1-bin-hadoop2.7.tgz’

spark-2.4.1-bin-had 100%[===================>] 220.09M  3.69MB/s    in 70s     

2019-04-19 05:47:26 (3.13 MB/s) - ‘spark-2.4.1-bin-hadoop2.7.tgz’ saved [230778742/230778742]

Collecting findspark
  Downloading https://files.pythonhosted.org/packages/b1/c8/e6e1f6a303ae5122dc28d131b5a67c5eb87cbf8f7ac5b9f87764ea1b1e1e/findspark-1.3.0-py2.py3-none-any.whl
Installing collected packages: findspark
Successfully installed findspark-1.3.0
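If the install worked, Java 8 is now on the PATH; an optional sanity check before wiring up the environment variables (not part of the original run) is:
In [ ]:
# Optional: OpenJDK should report version 1.8.x
!java -version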
In [0]:
"""
We are running these lines because we are operating on Google Colab
"""
from google.colab import drive
drive.mount('/content/gdrive')

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.1-bin-hadoop2.7"
os.chdir('/content/gdrive/My Drive/finch/spark/text_classification/imdb')
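A wrong path here only fails later inside findspark, so a minimal sanity check (assuming the paths set above) is:
In [ ]:
import os
# Both directories must exist for findspark.init() to succeed
assert os.path.isdir(os.environ["JAVA_HOME"]), "JAVA_HOME not found"
assert os.path.isdir(os.environ["SPARK_HOME"]), "SPARK_HOME not found"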
In [0]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer, IDF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
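findspark.init() reads SPARK_HOME and prepends Spark's Python bindings to sys.path, which is what makes the pyspark imports above resolve in a plain Python kernel. A quick way to confirm the wiring (a sketch, not part of the original run):
In [ ]:
import pyspark
# Should match the distribution downloaded earlier
print(pyspark.__version__)  # expected: 2.4.1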
In [0]:
def get_idx2word(_index_from=3):
  """Build an index-to-word mapping for the Keras IMDB dataset.

  Keras shifts the raw word index by `_index_from` to reserve the
  first indices for special tokens, so we apply the same shift
  before inverting the mapping.
  """
  word2idx = tf.keras.datasets.imdb.get_word_index()  # `tf` is imported in the next cell
  word2idx = {k: (v + _index_from) for k, v in word2idx.items()}
  word2idx["<pad>"] = 0
  word2idx["<start>"] = 1
  word2idx["<unk>"] = 2
  idx2word = {idx: w for w, idx in word2idx.items()}
  return idx2word


def make_df(x, y):
  """Convert integer sequences and labels into a Spark DataFrame of (label, words)."""
  # `sess` and `idx2word` are module-level names defined in later cells
  return sess.createDataFrame(
    [(int(y_), [idx2word[idx] for idx in x_]) for x_, y_ in zip(x, y)],
    ['label', 'words'])
In [5]:
import tensorflow as tf

idx2word = get_idx2word()
# Keep the 20,000 most frequent words; rarer words are mapped to <unk>
(X_train, y_train), (X_test, y_test) = tf.keras.datasets.imdb.load_data(num_words=20000)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/imdb_word_index.json
1646592/1641221 [==============================] - 0s 0us/step
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/imdb.npz
17465344/17464789 [==============================] - 0s 0us/step
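Before handing the data to Spark, it is worth decoding one example to confirm the mapping (an illustrative sketch; by Keras convention each review starts with the <start> token):
In [ ]:
# Decode the first 20 tokens of the first review; labels: 0 = negative, 1 = positive
print(' '.join(idx2word[idx] for idx in X_train[0][:20]))
print('label:', y_train[0])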
In [6]:
sess = SparkSession.builder.appName('imdb').getOrCreate()

pipeline = Pipeline(stages=[
  CountVectorizer(inputCol='words', outputCol='tf'),       # raw term counts
  IDF(inputCol='tf', outputCol='tf_idf'),                  # reweight by inverse document frequency
  LogisticRegression(featuresCol='tf_idf', regParam=1.),   # L2-regularized classifier
])

df_train = make_df(X_train, y_train)
df_test = make_df(X_test, y_test)

prediction = pipeline.fit(df_train).transform(df_test)
print("Testing Accuracy: {:.3f}".format(
  MulticlassClassificationEvaluator(metricName='accuracy').evaluate(prediction)))
Testing Accuracy: 0.882
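Accuracy is only one view of the result; the same prediction DataFrame can be reused for other metrics, e.g. F1 (a follow-up sketch using the evaluator from above):
In [ ]:
# Only the metric name changes; the predictions are reused
print("Testing F1: {:.3f}".format(
  MulticlassClassificationEvaluator(metricName='f1').evaluate(prediction)))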