Sys.getenv('R_PROFILE_USER')
# Sys.setenv(SPARK_HOME='/usr/hdp/2.4.2.0-258/spark/R/lib/SparkR/profile/shell.R') # commented out after adding SPARK_HOME to .bashrc
.libPaths(c(file.path(Sys.getenv('SPARK_HOME'), 'R', 'lib'), .libPaths()))
Sys.getenv('SPARK_HOME')

options(warn = -1) # disable warnings

# Load the required packages
x <- c("sparklyr", "dplyr", "knitr", "repr", "tidytext", "tidyr", "purrr",
       "ggplot2", "viridis", "gridExtra", "rbokeh")
lapply(x, require, character.only = TRUE, quietly = TRUE)

SPARK_HOME <- Sys.getenv('SPARK_HOME')

# Initiating a Spark context: local (for 'low memory' tasks only!)
# sc <- spark_connect(master = "local", spark_home = SPARK_HOME)

# Initiating a Spark context: YARN (for loading bigger datasets)
sc <- spark_connect(master = "yarn-client", spark_home = SPARK_HOME)
# sc <- spark_connect(master = "yarn-client", spark_home = "/usr/hdp/2.4.2.0-258/spark")

# Load the book reviews (a very big dataset)
books <- spark_read_json(sc, name = "books", path = "amazon/reviews_Books_5.json")

# Preview the first rows, truncating the review text for display
books %>%
    mutate(reviewText_trunc = as.character(rpad(reviewText, 30, '...'))) %>%
    select(asin, helpful, overall, reviewText_trunc, reviewTime, reviewerID,
           reviewerName, summary, unixReviewTime) %>%
    head(3) %>%
    collect()

# sdf_schema(books)

# Inspect the schema of the two columns of interest
books %>%
    mutate(reviewText_trunc = as.character(rpad(reviewText, 20, '...'))) %>%
    select(reviewText, overall) %>%
    sdf_schema() %>%
    print()

# Distribution of star ratings
books %>%
    count(overall, sort = TRUE) %>%
    collect()

# Keep only the extreme ratings (1 and 5 stars) with non-empty review text
reviews <- books %>%
    filter(overall == 1 | overall == 5) %>%
    filter(reviewText != '')

reviews %>%
    mutate(reviewText_trunc = as.character(rpad(reviewText, 30, '...'))) %>%
    select(overall, reviewText_trunc) %>%
    head(3) %>%
    collect()

# tbl_cache(sc, reviews, force = TRUE)
## tbl_uncache(sc, books)

reviews %>%
    count() %>%
    collect()

# Binarize the rating into a 0/1 label (1 star -> 0, 5 stars -> 1)
bin_reviews <- reviews %>%
    ft_binarizer(threshold = 2.5, input.col = 'overall', output.col = 'label') %>%
    select(reviewText, overall, label)

# Split each review into a list of words
tokenized_reviews <- bin_reviews %>%
    ft_tokenizer(input.col = 'reviewText', output.col = 'word')

tokenized_reviews %>%
    mutate(reviewText_trunc = as.character(rpad(reviewText, 20, '...'))) %>%
    select(reviewText_trunc, overall, label, word) %>%
    head(1) %>%
    collect()

tokenized_reviews %>%
    mutate(reviewText_trunc = as.character(rpad(reviewText, 30, '...'))) %>%
    select(reviewText_trunc, overall, label, word) %>%
    head(3)

# Unnest a small sample of the token lists locally, one word per row
tokenized_reviews %>%
    select(word) %>%
    head(10) %>%
    collect() %>%
    mutate(word = lapply(word, as.character)) %>%
    unnest(word) %>%
    head(5)

# Persist the binarized reviews
spark_write_json(bin_reviews, "amazon/bin_reviews.json", mode = NULL, options = list())
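# NOTE (sketch): the "amazon/unnested_reviews_json" dataset read below (one word
# per row) is not produced anywhere in this notebook. One possible way to create
# it from tokenized_reviews is Spark SQL's explode(), which sparklyr passes
# through inside mutate(); the select() list and the output path here are
# assumptions, not the original pipeline:
# tokenized_reviews %>%
#     mutate(word = explode(word)) %>%
#     select(overall, label, word) %>%
#     spark_write_json(path = "amazon/unnested_reviews_json")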
options(repr.plot.width = 12, repr.plot.height = 5)

# Read back the unnested (one word per row) reviews, drop very short words,
# strip non-alphanumeric characters, and count words per label
unnested_reviews <- spark_read_json(sc, name = "unnested_reviews_json",
                                    path = "amazon/unnested_reviews_json") %>%
    filter(length(word) > 2) %>%
    mutate(word2 = regexp_replace(word, "[^a-zA-Z0-9]+", "")) %>%
    group_by(label) %>%
    count(word2, sort = TRUE)

# Top 10 words in positive and negative reviews
positive_reviews <- unnested_reviews %>%
    filter(label == "1") %>%
    arrange(desc(n)) %>%
    head(10) %>%
    collect()

negative_reviews <- unnested_reviews %>%
    filter(label == "0") %>%
    arrange(desc(n)) %>%
    head(10) %>%
    collect()

total_reviews <- positive_reviews %>%
    bind_rows(negative_reviews) %>%
    arrange(desc(n))

total_reviews$label <- factor(total_reviews$label, labels = c("negative", "positive"))

# Bar charts of the top words, colored by label and by count
g <- total_reviews %>%
    ggplot(aes(x = reorder(word2, -n))) +
    xlab("words in reviews") +
    ylab("count") +
    theme_bw()

g + geom_col(aes(y = n, fill = label)) +
    scale_fill_manual(values = c("#74d130", "#274a7c"))

g + geom_col(aes(y = n, fill = n)) +
    scale_fill_viridis() +
    facet_grid(. ~ label)

# Interactive rbokeh versions of the same bar charts, side by side
p1 <- positive_reviews %>%
    figure() %>%
    ly_bar(x = word2, y = n, hover = TRUE, color = NULL) %>%
    x_axis(label = "positive words") %>%
    y_axis(label = "count")

p2 <- negative_reviews %>%
    figure() %>%
    ly_bar(x = word2, y = n, hover = TRUE) %>%
    x_axis(label = "negative words") %>%
    y_axis(label = "count")

grid_plot(list(p2, p1), same_axes = TRUE, width = 900, height = 350)

# 80/20 train/test split (sdf_partition also accepts a seed for reproducibility)
partitions <- unnested_reviews %>%
    sdf_partition(trainingData = 0.8, testData = 0.2)

partitions$trainingData %>% head(3) %>% collect()
partitions$testData %>% head(3) %>% collect()

.libPaths()
Sys.getenv("R_HOME")
sessionInfo()
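# Cleanup (not part of the original analysis): close the Spark connection
# once the session is finished.
# spark_disconnect(sc)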