sparklyr - part 2
Please read the pyspark course here (points 1.1 to 1.3).
[Updated July 2019]
Open a terminal
Connect to one of the login nodes of the Hadoop Hortonworks Data Platform (HDP) cluster (remember to activate Forticlient VPN first!):
MY_CESGA_USER='abcdef' # my CESGA username
ssh $MY_CESGA_USER@hadoop3.cesga.es
On the HDP server you have to load the Anaconda module that provides the R distribution that works with Spark.
This command needs to be executed every time a new console is opened at CESGA:
module load anaconda2 # run once per console
To see the available modules type:
module available # module av
We could add environment variables to .bashrc to avoid calling Sys.setenv() in R. However, it is better to use Sys.setenv(), as the Anaconda version may change!
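A minimal sketch of the Sys.setenv() route (the SPARK_HOME path is the one used later in this tutorial; adapt the HDP version to your installation):
# point R at the cluster Spark installation, for this session only
Sys.setenv(SPARK_HOME = '/usr/hdp/2.4.2.0-258/spark')
Sys.getenv('SPARK_HOME') # verify
The .bashrc route, by contrast, looks like this: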
cd $HOME
# add this line to your .bashrc:
export R_PROFILE_USER=/usr/hdp/2.4.2.0-258/spark/R/lib/SparkR/profile/shell.R
# then reload .bashrc
source .bashrc
Launch R (version 3.5.1 as of July 2019):
R
Install the libraries as needed. When you finish, exit the interactive sparkR session.
> install.packages(c("sparklyr", "dplyr", "knitr", "repr", "tidytext", "tidyr", "purrr", "ggplot2", "viridis", "gridExtra", "rbokeh"))
> q()
From any new R session you can check that the profile variable is set:
Sys.getenv('R_PROFILE_USER')
To init a Jupyter notebook:
Open a new console window.
Connect to one of the login nodes of the Hadoop Hortonworks Data Platform (HDP) cluster:
ssh $MY_CESGA_USER@hadoop.cesga.es
Load Anaconda2 distribution:
module load anaconda2
Launch the Jupyter notebook:
start_jupyter
Follow the instructions in the console. You may be asked to paste a URL token into your browser.
Open the provided link in the browser.
The notebook you have launched is tightly integrated with Spark.
When you finish your interactive work, remember to close the notebook properly.
Tip: it is a good idea to leave two consoles open simultaneously, one for Jupyter (to open the notebook) and the other for Bash and sparkR commands.
First, we need to upload our data from our NFS HOME to the HDFS HOME directory.
See the pyspark course here (point 1.2) for HDFS basic commands.
Let's define some variables:
# my username
MY_CESGA_USER='abcdef'
# my HDFS HOME
MY_HDFS_HOME=/user/$MY_CESGA_USER/
# my NFS HOME
MY_NFS_HOME=$HOME
HDFS basic commands:
# import data into hdfs (reviews_Books_5.json):
hdfs dfs -put $MY_NFS_HOME/Rsession/sparklyr_start/amazon/reviews_Books_5.json # adapt to your path
# list files
hdfs dfs -ls
# list files in a directory
hdfs dfs -ls $MY_HDFS_HOME
# create a directory called 'data':
hdfs dfs -mkdir data
# delete a directory:
hdfs dfs -rm -r -f data
# delete files, folders from hdfs
hdfs dfs -rm -r -f amazon/reviews_Books_5.json
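If you prefer to stay inside R, the same HDFS commands can be issued with system(); a minimal sketch, assuming the hdfs client is on the PATH of your R session:
# run HDFS commands from R through the shell
system("hdfs dfs -ls")            # list our HDFS home
system("hdfs dfs -mkdir -p data") # create a directory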
Set R environment variables (needed for Jupyter notebooks in the cluster R installation).
# Sys.setenv(SPARK_HOME = '/usr/hdp/2.4.2.0-258/spark')
# commented out: SPARK_HOME is already set by the SparkR profile loaded through .bashrc
.libPaths(c(file.path(Sys.getenv('SPARK_HOME'), 'R', 'lib'), .libPaths()))
Sys.getenv('SPARK_HOME')
Load libraries:
options(warn = -1) # disable warnings
c("sparklyr", "dplyr", "knitr", "repr", "tidytext", "tidyr", "purrr", "ggplot2", "viridis", "gridExtra", "rbokeh")
lapply(x, require, character.only = TRUE, quietly = TRUE)
Currently there are three types of contexts: local, yarn-client and yarn-cluster (for the yarn modes, use screen if the job must keep running after your session closes). Defining a new context (sc) overwrites the previous one.
SPARK_HOME = Sys.getenv('SPARK_HOME')
# Initiating spark context: local (for 'low memory' tasks only!)
# sc <- spark_connect(master = "local", spark_home = SPARK_HOME)
# Initiating spark context: yarn (for loading bigger datasets)
sc <- spark_connect(master = "yarn-client", spark_home = SPARK_HOME)
# sc <- spark_connect(master = "yarn-client", spark_home = "/usr/hdp/2.4.2.0-258/spark")
reviews_Books_5.json
We will use a dataset of Amazon Product Data [1] that contains 8.9M book reviews from Amazon, spanning May 1996 to July 2014.
Dataset characteristics:
[1] J. McAuley, C. Targett, J. Shi, A. van den Hengel. Image-based recommendations on styles and substitutes. SIGIR, 2015. http://jmcauley.ucsd.edu/data/amazon/
I am translating into R the following tutorial: Sentiment analysis with Spark ML (material for the Machine Learning Workshop Galicia 2016).
books <- spark_read_json(sc, name = "books", path = "amazon/reviews_Books_5.json")
# very big dataset
Here I used the Hive function rpad to truncate the variable reviewText to 30 characters, which allows the table to display correctly. See more Hive functions in the References section below.
books %>%
mutate(reviewText_trunc = as.character(rpad(reviewText, 30, '...'))) %>%
select(asin, helpful, overall, reviewText_trunc, reviewTime, reviewerID, reviewerName, summary, unixReviewTime) %>%
head(3) %>%
collect()
asin | helpful | overall | reviewText_trunc | reviewTime | reviewerID | reviewerName | summary | unixReviewTime |
---|---|---|---|---|---|---|---|---|
000100039X | 0, 0 | 5 | Spiritually and mentally inspi | 12 16, 2012 | A10000012B7CGYKOMPQ4L | Adam | Wonderful! | 1355616000 |
000100039X | 0, 2 | 5 | This is one my must have books | 12 11, 2003 | A2S166WSCFIFP5 | adead_poet@hotmail.com "adead_poet@hotmail.com" | close to god | 1071100800 |
000100039X | 0, 0 | 5 | This book provides a reflectio | 01 18, 2014 | A1BM81XB4QHOA3 | Ahoro Blethends "Seriously" | Must Read for Life Afficianados | 1390003200 |
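For reference, Hive's rpad pads short strings and truncates longer ones to the target length. A quick illustration through the DBI interface that sparklyr exposes (assuming your Spark SQL version accepts a SELECT without a FROM clause):
library(DBI)
# rpad pads 'abc' up to 6 characters and truncates 'abcdefgh' down to 6
dbGetQuery(sc, "SELECT rpad('abc', 6, '.') AS padded, rpad('abcdefgh', 6, '.') AS truncated")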
# sdf_schema(books)
books %>%
select(reviewText, overall) %>%
sdf_schema() %>%
print()
$reviewText
$reviewText$name
[1] "reviewText"

$reviewText$type
[1] "StringType"

$overall
$overall$name
[1] "overall"

$overall$type
[1] "DoubleType"
Summary of the counts for each review score:
books %>%
count(overall, sort = TRUE) %>%
collect()
overall | n |
---|---|
5 | 4980815 |
4 | 2223094 |
3 | 955189 |
2 | 415110 |
1 | 323833 |
We will avoid neutral reviews by keeping only reviews with a 1- or 5-star overall score. We will also filter out the reviews that contain no text.
reviews <- books %>%
filter(overall == 1 | overall == 5) %>%
filter(reviewText != '')
reviews %>%
mutate(reviewText_trunc = as.character(rpad(reviewText, 30, '...'))) %>%
select(overall, reviewText_trunc) %>%
head(3) %>%
collect()
overall | reviewText_trunc |
---|---|
5 | Spiritually and mentally inspi |
5 | This is one my must have books |
5 | This book provides a reflectio |
We will use cache when the lineage of an RDD branches out, or when an RDD is used multiple times, as in a loop:
# tbl_cache(sc, 'reviews', force = TRUE) # the name must refer to a registered table
# tbl_uncache(sc, 'books')
Total row count:
reviews %>%
count() %>%
collect()
n |
---|
5304187 |
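An equivalent one-liner, if your sparklyr version provides it, is sdf_nrow():
# total row count computed by Spark, without building a dplyr query
sdf_nrow(reviews)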
So far, so good.
We will convert the numerical covariate overall to a binary (0/1) label ("binarize"): with a threshold of 2.5, a 1-star review maps to 0 and a 5-star review maps to 1. Then we'll divide the review text into word tokens ("tokenize").
bin_reviews <- reviews %>%
ft_binarizer(threshold = 2.5, input.col = 'overall', output.col = 'label') %>%
select(reviewText, overall, label)
tokenized_reviews <- bin_reviews %>%
ft_tokenizer(input.col = 'reviewText', output.col = 'word')
Visualize the resulting table and structure:
tokenized_reviews %>%
mutate(reviewText_trunc = as.character(rpad(reviewText, 20, '...'))) %>%
select(reviewText_trunc, overall, label, word) %>%
head(1) %>%
collect()
reviewText_trunc | overall | label | word |
---|---|---|---|
Spiritually and ment | 5 | 1 | spiritually, and, mentally, inspiring!, a, book, that, allows, you, to, question, your, morals, and, will, help, you, discover, who, you, really, are! |
tokenized_reviews %>%
mutate(reviewText_trunc = as.character(rpad(reviewText, 30, '...'))) %>%
select(reviewText_trunc, overall, label, word) %>%
head(3)
# Source:   lazy query [?? x 4]
# Database: spark_connection
  reviewText_trunc               overall label word
  <chr>                            <dbl> <dbl> <list>
1 Spiritually and mentally inspi       5     1 <list [22]>
2 This is one my must have books       5     1 <list [49]>
3 This book provides a reflectio       5     1 <list [40]>
Here we see that the variable word is a list-column. To process it we would have to use tidyr::unnest. However, working with R methods such as unnest implies that the dataset must be downloaded to memory, which is unfeasible given its size. Unfortunately, as of today there is no method for unnesting in sparklyr (see here), although it is listed as a feature request on GitHub. As an example, you can see what unnest can do in R:
tokenized_reviews %>%
select(word) %>%
head(10) %>%
collect() %>%
mutate(word = lapply(word, as.character)) %>%
unnest(word) %>%
head(5)
word |
---|
spiritually |
and |
mentally |
inspiring! |
a |
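A server-side alternative (not used in this tutorial) is to unnest directly in Spark SQL with Hive's explode(); a sketch, assuming we first register the tokenized table under a name of our choosing:
library(DBI)
# register the Spark DataFrame as a temporary table so plain SQL can query it
sdf_register(tokenized_reviews, "tokenized_reviews")
# LATERAL VIEW explode() gives each array element its own row, inside Spark
dbGetQuery(sc, "SELECT label, w AS word FROM tokenized_reviews LATERAL VIEW explode(word) tmp AS w LIMIT 10")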
bin_reviews in PySpark
When a method is not present in sparklyr, the easiest solution is resorting to PySpark, which exposes the Spark (Scala) programming model to Python.
A great tutorial to start with is this PySpark Course, by @javicacheiro.
We will start by feeding some data to PySpark, so first we must save our dataset bin_reviews to HDFS:
spark_write_json(bin_reviews, "amazon/bin_reviews.json", mode = NULL, options = list())
Just to be sure, we can check that the dataset has been saved:
$ hdfs dfs -ls amazon
Follow the PySpark transformations to the data in the notebook sparklyr_python.ipynb:
- tokenize the review text into a words variable;
- explode the words column to have each word in its own row.
Back to sparklyr
Import the transformed data into sparklyr again:
options(repr.plot.width=12, repr.plot.height=5)
unnested_reviews <- spark_read_json(sc, name = "unnested_reviews_json", path = "amazon/unnested_reviews_json") %>%
filter(length(word) > 2) %>%
mutate(word2 = regexp_replace(word, "[^a-zA-Z0-9]+", "")) %>%
group_by(label) %>%
count(word2, sort = TRUE)
Note the Hive/PostgreSQL function regexp_replace, used to remove punctuation and special marks.
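For intuition, the same pattern behaves like base R's gsub on a local string:
# strip everything that is not a letter or a digit (base R equivalent)
gsub("[^a-zA-Z0-9]+", "", '"great!!"') # -> "great"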
Positive reviews:
positive_reviews <- unnested_reviews %>%
filter(label=="1") %>%
arrange(desc(n)) %>%
head(10) %>%
collect()
Negative reviews:
negative_reviews <- unnested_reviews %>%
filter(label=="0") %>%
arrange(desc(n)) %>%
head(10) %>%
collect()
total_reviews <- positive_reviews %>%
bind_rows(negative_reviews) %>%
arrange(desc(n))
total_reviews$label <- factor(total_reviews$label, labels = c("negative", "positive"))
g <- total_reviews %>%
ggplot(aes(x=reorder(word2, -n))) +
xlab("words in reviews") + ylab("count") +
theme_bw()
Printing ggplot2 plots:
g +
geom_col(aes(y=n, fill = label)) +
scale_fill_manual(values=c("#74d130", "#274a7c"))
g +
geom_col(aes(y=n, fill = n)) +
scale_fill_viridis() +
facet_grid(. ~ label)
p1 <- positive_reviews %>%
figure() %>%
ly_bar(x = word2, y = n,
hover = TRUE, color = NULL) %>%
x_axis(label = "positive words") %>%
y_axis(label = "count")
p2 <- negative_reviews %>%
figure() %>%
ly_bar(x = word2, y = n,
hover = TRUE) %>%
x_axis(label = "negative words") %>%
y_axis(label = "count")
Printing bokeh plots:
grid_plot(list(p2, p1), same_axes = TRUE, width=900, height=350)
Split data into training and evaluation data for our model:
partitions <- unnested_reviews %>%
sdf_partition(trainingData = 0.8, testData = 0.2)
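If you need the split to be reproducible across runs, sdf_partition accepts a seed argument (a variant of the call above, with an arbitrary seed value):
partitions <- unnested_reviews %>%
sdf_partition(trainingData = 0.8, testData = 0.2, seed = 1099)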
partitions$trainingData %>%
head(3) %>%
collect()
label | word2 | n |
---|---|---|
0 | "and | 600 |
0 | "book" | 737 |
0 | "for | 619 |
partitions$testData %>%
head(3) %>%
collect()
label | word2 | n |
---|---|---|
0 | "all | 644 |
0 | "he | 970 |
0 | "if | 1209 |
Finally, some details about the R session:
.libPaths()
Sys.getenv("R_HOME")
sessionInfo()
R version 3.3.2 (2016-10-31)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: CentOS Linux 7 (Core)

locale:
 [1] LC_CTYPE=es_ES.UTF-8       LC_NUMERIC=C
 [3] LC_TIME=es_ES.UTF-8        LC_COLLATE=es_ES.UTF-8
 [5] LC_MONETARY=es_ES.UTF-8    LC_MESSAGES=es_ES.UTF-8
 [7] LC_PAPER=es_ES.UTF-8       LC_NAME=C
 [9] LC_ADDRESS=C               LC_TELEPHONE=C
[11] LC_MEASUREMENT=es_ES.UTF-8 LC_IDENTIFICATION=C

attached base packages:
[1] stats     graphics  grDevices utils     datasets  methods   base

other attached packages:
 [1] gridExtra_2.2.1   viridis_0.4.0     viridisLite_0.2.0 bindrcpp_0.2
 [5] rbokeh_0.5.0      purrr_0.2.2.2     tidyr_0.6.0       tidytext_0.1.3
 [9] repr_0.10         ggplot2_2.2.1     knitr_1.15.1      dplyr_0.7.1
[13] sparklyr_0.5.6    SparkR_1.6.1

loaded via a namespace (and not attached):
 [1] Rcpp_0.12.8        lattice_0.20-34    assertthat_0.1     rprojroot_1.1
 [5] digest_0.6.10      psych_1.6.9        mime_0.5           IRdisplay_0.4.4
 [9] R6_2.2.0           plyr_1.8.4         backports_1.0.4    evaluate_0.10
[13] httr_1.2.1         rlang_0.1.1        lazyeval_0.2.0     uuid_0.1-2
[17] rstudioapi_0.6     hexbin_1.27.1      Matrix_1.2-7.1     rmarkdown_1.6
[21] labeling_0.3       config_0.2         stringr_1.2.0      foreign_0.8-67
[25] htmlwidgets_0.8    munsell_0.4.3      shiny_1.0.3        broom_0.4.1
[29] janeaustenr_0.1.5  httpuv_1.3.3       gistr_0.3.6        pkgconfig_2.0.1
[33] base64enc_0.1-3    mnormt_1.5-5       htmltools_0.3.5    tibble_1.3.3
[37] codetools_0.2-15   withr_1.0.2        crayon_1.3.2       dbplyr_1.1.0
[41] SnowballC_0.5.1    grid_3.3.2         nlme_3.1-128       jsonlite_1.1
[45] xtable_1.8-2       gtable_0.2.0       DBI_0.7            magrittr_1.5
[49] scales_0.4.1       tokenizers_0.1.4   stringi_1.1.2      pryr_0.1.2
[53] reshape2_1.4.2     IRkernel_0.7.1     RColorBrewer_1.1-2 tools_3.3.2
[57] glue_1.1.1         maps_3.1.1         yaml_2.1.14        parallel_3.3.2
[61] colorspace_1.3-1   pbdZMQ_0.2-4       bindr_0.1