%%groovy
import com.twosigma.beakerx.widget.IntSlider

// Slider for choosing how many example rows to generate (100..10000 in steps of 100, default 5000).
sample_size = new IntSlider()
sample_size.min = 100
sample_size.max = 10000
sample_size.step = 100
sample_size.value = 5000
sample_size

%%groovy
// Publish the slider's current value on the shared beakerx object so the Python and Scala cells can read it.
beakerx.sample_size = sample_size.value
beakerx.sample_size

%%python
from beakerx.object import beakerx
beakerx.sample_size

import sys.process._

// Run the example-data generator with the selected sample size.
// `.!` runs the command and returns its exit code; the method-call form avoids the
// postfix-operator feature warning and the semicolon-inference pitfalls of `runme !`.
val runme = "python generate_example.py somedata " + beakerx.sample_size
val exitCode = runme.!
// Fail fast instead of continuing with whatever data is already present.
// NOTE(review): presumably generate_example.py produces the somedata.csv read from HDFS below — confirm.
if (exitCode != 0) sys.error(s"generate_example.py failed with exit code $exitCode")

%classpath add jar /mnt/mesos/sandbox
%classpath add jar /opt/spark/jars/*
%classpath add jar /opt/hadoop/share/hadoop/common/*
%classpath add jar /opt/hadoop/share/hadoop/common/lib/*
%classpath add jar /opt/hadoop/share/hadoop/hdfs/*
%classpath add jar /opt/hadoop/share/hadoop/hdfs/lib/*
%classpath add jar /opt/hadoop/share/hadoop/yarn/*
%classpath add jar /opt/hadoop/share/hadoop/yarn/lib/*
%classpath add jar /opt/hadoop/share/hadoop/mapreduce/*
%classpath add jar /opt/hadoop/share/hadoop/mapreduce/lib/*
%classpath add jar /opt/hadoop/share/hadoop/tools/lib/*

%%spark

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.ml.regression.GBTRegressor
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType, IntegerType}
import org.apache.spark.sql.{Encoders, SparkSession}
import org.apache.spark.ml.classification.LogisticRegression
import scala.math.{ pow, sqrt }

// Create a schema: two Double features and an Integer label.
val schemaStruct = StructType(
  StructField("x1", DoubleType) ::
  StructField("x2", DoubleType) ::
  StructField("y", IntegerType) :: Nil
)

// Use schema to read data from HDFS using default url; rows containing nulls are dropped.
val df = spark.read
  .option("header", true)
  .schema(schemaStruct)
  .csv("hdfs://name-0-node.hdfs.autoip.dcos.thisdcos.directory:9001/somedata.csv")
  .na.drop()
df.display()

// Break data into training and test (80/20 random split)
val Array(trainingData, testData) = df.randomSplit(Array(0.8, 0.2))
val labelColumn = "y"

// Use the VectorAssembler to combine the x1, x2 columns into a single vector column
val assembler = new VectorAssembler()
  .setInputCols(Array("x1", "x2"))
  .setOutputCol("features")

// logistic regression model; predictions are written to the "y_pred" column
val lr = new LogisticRegression()
  .setLabelCol(labelColumn)
  .setFeaturesCol("features")
  .setPredictionCol(labelColumn + "_pred")
  .setMaxIter(100)
  .setRegParam(0.1)
  .setElasticNetParam(0.8)

// Run simple pipeline
val stages = Array(assembler, lr)
val pipeline = new Pipeline().setStages(stages)
val model = pipeline.fit(trainingData)

// We'll make predictions using the model and the test data and get accuracy
// (fraction of test rows whose prediction equals the label).
val predictions = model.transform(testData)
val accuracy = predictions.filter("y == y_pred").count() / predictions.count().toDouble

// Recursively flatten arbitrarily nested Lists into a single flat List.
// flatMap builds the result in one pass (the old repeated `:::` appends were quadratic),
// and nested lists are flattened the same way wherever they appear in the input.
def flatten(l: List[Any]): List[Any] = l.flatMap {
  case nested: List[_] => flatten(nested)
  case other => List(other)
}

// Split positive and negative cases
val dfPOS = df.filter("y == 1")
val dfNEG = df.filter("y == 0")

// Fetch at most `n` (x1, x2) pairs to the driver as two aligned lists.
// Both columns come from the same query so the i-th x1 and x2 belong to the same row,
// and limit() avoids collecting the whole dataset when only a few points are plotted.
def firstPoints(frame: org.apache.spark.sql.DataFrame, n: Int): (List[Double], List[Double]) = {
  val rows = frame.select("x1", "x2").limit(n).collect().toList
  (rows.map(_.getDouble(0)), rows.map(_.getDouble(1)))
}

// Plot the first 300 records of positive and negative cases
val (plotPOSx1, plotPOSx2) = firstPoints(dfPOS, 300)
val (plotNEGx1, plotNEGx2) = firstPoints(dfNEG, 300)

println(" ")
val plot = new Plot { title = "Plot of raw data" }
val list = List(
  new Points {
    x = plotPOSx1.take(300)
    y = plotPOSx2.take(300)
    size = 5.0
    color = Color.red
  },
  new Points {
    x = plotNEGx1.take(300)
    y = plotNEGx2.take(300)
    size = 5.0
    color = Color.blue
  }
)
plot.add(list)

// Transform data (square each feature) and replot
val ax1 = plotPOSx1.take(300).map(pow(_,2))
val ax2 = plotPOSx2.take(300).map(pow(_,2))
val bx1 = plotNEGx1.take(300).map(pow(_,2))
val bx2 = plotNEGx2.take(300).map(pow(_,2))

val plot = new Plot { title = "Plot of data with features squared" }
val list = List(
  new Points {
    x = ax1
    y = ax2
    size = 5.0
    color = Color.red
  },
  new Points {
    x = bx1
    y = bx2
    size = 5.0
    color = Color.blue
  }
)
plot.add(list)

// w1 = x1^2, w2 = x2^2
val dfT = df.withColumn("w1", ((df.col("x1")*df.col("x1")))).withColumn("w2", ((df.col("x2")*df.col("x2"))))
val Array(trainingData, testData) = dfT.randomSplit(Array(0.8, 0.2))
val labelColumn = "y"

// Use the VectorAssembler to combine the w1, w2 columns into a single vector column
val assembler = new VectorAssembler()
  .setInputCols(Array("w1", "w2"))
  .setOutputCol("features")

// logistic regression model used above
val lr = new LogisticRegression()
  .setLabelCol(labelColumn)
  .setFeaturesCol("features")
  .setPredictionCol(labelColumn + "_pred")
  .setMaxIter(100)
  .setRegParam(0.1)
  .setElasticNetParam(0.8)

// Same pipeline as above
val stages = Array(assembler, lr)
val pipeline = new Pipeline().setStages(stages)
val model = pipeline.fit(trainingData)

// We'll make predictions using the model and the test data
val predictions = model.transform(testData)
val accuracy = predictions.filter("y == y_pred").count() / predictions.count().toDouble

// collect() returns every (y, y_pred) row, like head(count()) did, but in a single Spark
// job and without the Int overflow risk of count().toInt.
val evalData = predictions.select("y", "y_pred").collect()
beakerx.evalData = evalData
println("Data moved to BeakerX")

%%python
from beakerx.object import beakerx
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix, classification_report

# The rows appear to arrive as strings like "[label,prediction]": join them, strip the
# brackets, split into one token per row, then split each token on ',' into two columns.
dfEval = pd.DataFrame(' '.join(beakerx.evalData).replace('[','').replace(']','').split())[0].str.split(',', expand = True)
y_true = pd.to_numeric(dfEval[0], downcast='integer')
y_pred = pd.to_numeric(dfEval[1], downcast='integer')

# Print a text report of precision, recall, f1-score, support
print(classification_report(y_true, y_pred))

# Plot the confusion matrix
cm = confusion_matrix(y_true, y_pred)
ax = plt.subplot()
sns.heatmap(cm, annot=True, ax=ax, fmt='g')
ax.set_xlabel("Predicted labels")
ax.set_ylabel("True labels")
ax.set_title("Confusion Matrix")
ax