In [1]:
import coursier._

interp.repositories() ++= Seq(MavenRepository("https://repo1.maven.org/maven2"))
Out[1]:
import coursier._

In [2]:
import $ivy.`org.apache.spark::spark-sql:2.3.1`
import $ivy.`sh.almond::almond-spark:0.4.0`
import $ivy.`com.github.julien-truffaut::monocle-core:1.5.0`
import $ivy.`com.github.julien-truffaut::monocle-macro:1.5.0`

import $ivy.`org.hablapps::spark-optics:0.1.0`

import org.apache.log4j.{Level, Logger}
Logger.getLogger("org").setLevel(Level.OFF)

import org.apache.spark.sql.functions._
import org.apache.spark.sql._
Out[2]:
import $ivy.$                                  

import $ivy.$                              

import $ivy.$                                               

import $ivy.$                                                


import $ivy.$                                             


import org.apache.log4j.{Level, Logger}

import org.apache.spark.sql.functions._

import org.apache.spark.sql._
In [3]:
val sparkSession = NotebookSparkSession.builder().master("local").appName("jupiter").getOrCreate()
sparkSession.sparkContext.setLogLevel("ERROR")
Loading spark-stubs
Getting spark JARs
Creating SparkSession
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Out[3]:
sparkSession: SparkSession = [email protected]
In [4]:
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)
case class Street(number: Int, name: String)
case class Address(city: String, street: Street)
case class Company(name: String, address: Address)
case class Employee(name: String, company: Company)
Out[4]:
defined class Street
defined class Address
defined class Company
defined class Employee

Spark lenses

Spark has a columnar format, and a column can be of any basic SQL type: integers, floats, strings, timestamps, dates. Spark also allows complex types such as structs (the equivalent of a product type in an ADT), arrays and maps. Here we focus on structs only. To make things easier, we first create case classes that will be our default structure. Due to issues with creating case classes in Jupyter, we already have them precompiled in the project, and they look like this:

case class Street(number: Int, name: String)
case class Address(city: String, street: Street)
case class Company(name: String, address: Address)
case class Employee(name: String, company: Company)
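For comparison, the struct that these case classes map to can also be written by hand with Spark's type API. This is only a sketch: the value names (streetType, addressType, companyType, employeeType) are made up for illustration, and the nullability flags are chosen to mirror what Spark usually derives from a case class (a primitive Int is non-nullable, everything else nullable).

```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Innermost struct first: each level embeds the full type of its children.
val streetType = StructType(Seq(
  StructField("number", IntegerType, nullable = false), // Int cannot be null
  StructField("name", StringType)                        // nullable defaults to true
))

val addressType = StructType(Seq(
  StructField("city", StringType),
  StructField("street", streetType)
))

val companyType = StructType(Seq(
  StructField("name", StringType),
  StructField("address", addressType)
))

// Top-level schema of an "employee" row: one plain column and one nested struct.
val employeeType = StructType(Seq(
  StructField("name", StringType),
  StructField("company", companyType)
))

// Prints the same kind of tree that printSchema shows for a DataFrame.
println(employeeType.treeString)
```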

In [5]:
//import org.habla.sparklens.{Employee,Company,Address,Street}
val employee = Employee("john", Company("awesome inc", Address("london", Street(23, "high street"))))
Out[5]:
employee: Employee = Employee(
  "john",
  Company("awesome inc", Address("london", Street(23, "high street")))
)
In [6]:
import sparkSession.implicits._
val df = List(employee).toDS.toDF
Out[6]:
import sparkSession.implicits._

df: DataFrame = [name: string, company: struct<name: string, address: struct<city: string, street: struct<number: int, name: string>>>]
In [7]:
df.show
df.printSchema
+----+--------------------+
|name|             company|
+----+--------------------+
|john|[awesome inc, [lo...|
+----+--------------------+

root
 |-- name: string (nullable = true)
 |-- company: struct (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- address: struct (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- street: struct (nullable = true)
 |    |    |    |-- number: integer (nullable = false)
 |    |    |    |-- name: string (nullable = true)

As you can see, we now have a DataFrame representation of the employee: name is a string, and company is a struct that in turn contains more complex types. Because the Dataset API is SQL-oriented, it's hard to modify a single element while keeping the structure the same, even for first-level data.

In [8]:
val employeeNameChanged = df.select(concat(df("name"),lit("!!!")).as("name"),df("company"))
employeeNameChanged.show
employeeNameChanged.printSchema
+-------+--------------------+
|   name|             company|
+-------+--------------------+
|john!!!|[awesome inc, [lo...|
+-------+--------------------+

root
 |-- name: string (nullable = true)
 |-- company: struct (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- address: struct (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- street: struct (nullable = true)
 |    |    |    |-- number: integer (nullable = false)
 |    |    |    |-- name: string (nullable = true)

Out[8]:
employeeNameChanged: DataFrame = [name: string, company: struct<name: string, address: struct<city: string, street: struct<number: int, name: string>>>]

And for the structs? Let's try to change the name of the company.

In [9]:
val companyNameChanged = df.select(
    df("name"),
    struct(
        concat(df("company.name"),lit("!!!")).as("name"),
        df("company.address")
    ).as("company")
)
companyNameChanged.show
companyNameChanged.printSchema
+----+--------------------+
|name|             company|
+----+--------------------+
|john|[awesome inc!!!, ...|
+----+--------------------+

root
 |-- name: string (nullable = true)
 |-- company: struct (nullable = false)
 |    |-- name: string (nullable = true)
 |    |-- address: struct (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- street: struct (nullable = true)
 |    |    |    |-- number: integer (nullable = false)
 |    |    |    |-- name: string (nullable = true)

Out[9]:
companyNameChanged: DataFrame = [name: string, company: struct<name: string, address: struct<city: string, street: struct<number: int, name: string>>>]

OMG!! 😱😱😱 We have to keep track of the name of the transformed element, and also of all its parents!!! If we had our case classes instead, how would we do this?

In [10]:
employee.copy(name = employee.name+"!!!")
employee.copy(company = employee.company.copy(name = employee.company.name+"!!!"))
Out[10]:
res9_0: Employee = Employee(
  "john!!!",
  Company("awesome inc", Address("london", Street(23, "high street")))
)
res9_1: Employee = Employee(
  "john",
  Company("awesome inc!!!", Address("london", Street(23, "high street")))
)

Well, it's shorter, but it's still a pain in the neck. Luckily we have optics :D
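In case optics are new to you: a lens is just a getter and a setter bundled together so that they can be composed. Here is a minimal hand-rolled sketch in plain Scala. SimpleLens and LensSketch are made-up names for illustration, the case classes are redeclared inside the object so they don't clash with the notebook's own, and this is not how Monocle implements its lenses.

```scala
object LensSketch {
  // Hypothetical minimal lens: a getter plus a setter for a part A inside a whole S.
  final case class SimpleLens[S, A](get: S => A, set: A => S => S) {
    // Read the focused value, transform it, and write it back.
    def modify(f: A => A): S => S = s => set(f(get(s)))(s)

    // Compose with a lens that looks deeper, from A into B.
    def andThen[B](other: SimpleLens[A, B]): SimpleLens[S, B] =
      SimpleLens[S, B](
        s => other.get(get(s)),
        b => s => set(other.set(b)(get(s)))(s)
      )
  }

  // Local copies of two of the notebook's case classes.
  case class Street(number: Int, name: String)
  case class Address(city: String, street: Street)

  val streetL = SimpleLens[Address, Street](_.street, st => a => a.copy(street = st))
  val nameL   = SimpleLens[Street, String](_.name, n => st => st.copy(name = n))

  // One lens from Address straight down to the street name.
  val addressStreetName = streetL andThen nameL

  val updated = addressStreetName.modify(_ + "!!!")(Address("london", Street(23, "high street")))
}
```

The nested copy calls are still there, but they are written once per field and then reused through composition.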

In [11]:
import monocle.Lens
import monocle.macros.GenLens

val company   : Lens[Employee, Company] = GenLens[Employee](_.company)
val address   : Lens[Company , Address] = GenLens[Company](_.address)
val street    : Lens[Address , Street]  = GenLens[Address](_.street)
val streetName: Lens[Street  , String]  = GenLens[Street](_.name)

val employeeStreet = company composeLens address composeLens street composeLens streetName
Out[11]:
import monocle.Lens

import monocle.macros.GenLens


company: Lens[Employee, Company] = [email protected]
address: Lens[Company, Address] = [email protected]8eb
street: Lens[Address, Street] = [email protected]
streetName: Lens[Street, String] = [email protected]
employeeStreet: monocle.PLens[Employee, Employee, String, String] = [email protected]
In [12]:
val streetChanger:Employee => Employee = employeeStreet.modify(_ + "!!!")
streetChanger(employee)
Out[12]:
streetChanger: Employee => Employee = <function1>
res11_1: Employee = Employee(
  "john",
  Company("awesome inc", Address("london", Street(23, "high street!!!")))
)

That easy? I wish there were something like this in Spark...

In [15]:
import org.hablapps.sparkOptics.Lens
import org.hablapps.sparkOptics.syntax._
val lens = Lens("company.address.street.name")(df.schema)
val transformedDF = df.select(lens.modify(concat(_,lit("!!!"))):_*)
transformedDF.printSchema
transformedDF.as[Employee].head
root
 |-- name: string (nullable = true)
 |-- company: struct (nullable = false)
 |    |-- name: string (nullable = true)
 |    |-- address: struct (nullable = false)
 |    |    |-- city: string (nullable = true)
 |    |    |-- street: struct (nullable = false)
 |    |    |    |-- number: integer (nullable = true)
 |    |    |    |-- name: string (nullable = true)

Out[15]:
import org.hablapps.sparkOptics.Lens

import org.hablapps.sparkOptics.syntax._

lens: Lens = Lens(company.address.street.name)
transformedDF: DataFrame = [name: string, company: struct<name: string, address: struct<city: string, street: struct<number: int, name: string>>>]
res14_5: Employee = Employee(
  "john",
  Company("awesome inc", Address("london", Street(23, "high street!!!")))
)

Hold on, explain that to me. Start from the beginning, and make it like Monocle. OK, let's create our first lens.

In [16]:
import org.hablapps.sparkOptics.ProtoLens.ProtoLens

val companyProtoLens: ProtoLens = Lens("company")
// The column name plays the role of "_.company" in "GenLens[Employee](_.company)":
// it is the element the lens focuses on within the structure.
val companyLens: Lens = companyProtoLens(df.schema)
// Providing the schema plays the role of "Employee" in "GenLens[Employee](_.company)":
// it is the context of the lens.
Out[16]:
import org.hablapps.sparkOptics.ProtoLens.ProtoLens


companyProtoLens: types.StructType => Lens = <function1>
companyLens: Lens = Lens(company)

The first difference between Monocle and sparkOptics: because Scala is statically typed, Monocle gives you a compile error if you try GenLens[Employee](_.unknownField). Spark SQL is dynamically typed, but lenses still help make your transformations a little safer.

In [17]:
import scala.util.Try
val unknownFieldLens:ProtoLens = Lens("unknownField")
Try{unknownFieldLens(df.schema)}
Out[17]:
import scala.util.Try

unknownFieldLens: types.StructType => Lens = <function1>
res16_2: Try[Lens] = Failure(
  java.lang.AssertionError: assertion failed: the column unknownField not found in [name,company]
)

It's not a compile error, but it's something! You can create a ProtoLens (a lens with only the column name defined), but when you try to build a Lens from it against a schema that lacks that column, you get an error: you can't create invalid lenses! Now let's see how to compose lenses.
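The fail-fast idea described above can be approximated in plain Spark by walking the schema before building any column expression. The helper below is a hypothetical sketch (resolvePath and demoSchema are made-up names) and is not taken from spark-optics; it assumes field names contain no dots.

```scala
import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType}

// Hypothetical helper: follow a dotted path through a schema,
// returning the type found at the end or a message saying where it broke.
def resolvePath(schema: StructType, path: String): Either[String, DataType] = {
  def go(current: DataType, remaining: List[String], seen: List[String]): Either[String, DataType] =
    (current, remaining) match {
      case (dt, Nil) => Right(dt) // whole path consumed
      case (st: StructType, head :: tail) =>
        st.fields.find(_.name == head) match {
          case Some(field) => go(field.dataType, tail, seen :+ head)
          case None        => Left(s"column $head not found in [${st.fieldNames.mkString(",")}]")
        }
      case (other, head :: _) =>
        Left(s"cannot look up $head inside ${seen.mkString(".")}: ${other.simpleString} is not a struct")
    }
  go(schema, path.split('.').toList, Nil)
}

// Small stand-in schema so the sketch runs without the notebook's df.
val demoSchema = StructType(Seq(
  StructField("name", StringType),
  StructField("company", StructType(Seq(StructField("name", StringType))))
))

val found   = resolvePath(demoSchema, "company.name")  // a Right holding the field's type
val missing = resolvePath(demoSchema, "unknownField")  // a Left with an error message
```

Returning an Either instead of throwing lets the caller decide how to react, which is close in spirit to wrapping the lens construction in a Try as in the cell above.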

In [18]:
import org.apache.spark.sql.types.StructType
val companyL: Lens = Lens("company")(df.schema)
val companySchema = df.schema.fields.find(_.name == "company").get.dataType.asInstanceOf[StructType]
val addressL = Lens("address")(companySchema)
val addressSchema = companySchema.fields.find(_.name == "address").get.dataType.asInstanceOf[StructType]
val streetL = Lens("street")(addressSchema)
val streetSchema = addressSchema.fields.find(_.name == "street").get.dataType.asInstanceOf[StructType]
val streetNameL = Lens("name")(streetSchema)
val employeeCompanyStreetName = companyL composeLens addressL composeLens streetL composeLens streetNameL
val modifiedDF = df.select(employeeCompanyStreetName.set(lit("new street name")):_*)
modifiedDF.printSchema
modifiedDF.as[Employee].head
root
 |-- name: string (nullable = true)
 |-- company: struct (nullable = false)
 |    |-- name: string (nullable = true)
 |    |-- address: struct (nullable = false)
 |    |    |-- city: string (nullable = true)
 |    |    |-- street: struct (nullable = false)
 |    |    |    |-- number: integer (nullable = true)
 |    |    |    |-- name: string (nullable = false)

Out[18]:
import org.apache.spark.sql.types.StructType

companyL: Lens = Lens(company)
companySchema: StructType = StructType(
  StructField("name", StringType, true, {}),
  StructField(
    "address",
    StructType(
      StructField("city", StringType, true, {}),
      StructField(
        "street",
        StructType(
          StructField("number", IntegerType, false, {}),
          StructField("name", StringType, true, {})
        ),
        true,
        {}
      )
    ),
    true,
    {}
  )
)
addressL: Lens = Lens(address)
addressSchema: StructType = StructType(
  StructField("city", StringType, true, {}),
  StructField(
    "street",
    StructType(
      StructField("number", IntegerType, false, {}),
      StructField("name", StringType, true, {})
    ),
    true,
    {}
  )
)
streetL: Lens = Lens(street)
streetSchema: StructType = StructType(
  StructField("number", IntegerType, false, {}),
  StructField("name", StringType, true, {})
)
streetNameL: Lens = Lens(name)
employeeCompanyStreetName: Lens = Lens(company.address.street.name)
modifiedDF: DataFrame = [name: string, company: struct<name: string, address: struct<city: string, street: struct<number: int, name: string>>>]
res17_11: Employee = Employee(
  "john",
  Company("awesome inc", Address("london", Street(23, "new street name")))
)

Still too much code, passing the schema of each element every time... In Spark, schemas are recursive: they hold not only the schema of the current level but also that of all sub-elements. So we can take advantage of ProtoLenses.

In [19]:
val shorterLens =
  Lens("company")(df.schema) composeProtoLens Lens("address") composeProtoLens Lens("street") composeProtoLens Lens("name")
val modifiedDF = df.select(shorterLens.modify(upper):_*)
modifiedDF.printSchema
modifiedDF.as[Employee].head
root
 |-- name: string (nullable = true)
 |-- company: struct (nullable = false)
 |    |-- name: string (nullable = true)
 |    |-- address: struct (nullable = false)
 |    |    |-- city: string (nullable = true)
 |    |    |-- street: struct (nullable = false)
 |    |    |    |-- number: integer (nullable = true)
 |    |    |    |-- name: string (nullable = true)

Out[19]:
shorterLens: Lens = Lens(company.address.street.name)
modifiedDF: DataFrame = [name: string, company: struct<name: string, address: struct<city: string, street: struct<number: int, name: string>>>]
res18_3: Employee = Employee(
  "john",
  Company("awesome inc", Address("london", Street(23, "HIGH STREET")))
)

We first created a Lens and then composed it with ProtoLenses; during composition the lens extracts the schema of the selected element for you and checks that it exists. Still too much code? You can compose with a syntax closer to Spark's.

In [20]:
val flashLens = Lens("company.address.street.name")(df.schema)
val modifiedDF = df.select(flashLens.modify(upper):_*)
modifiedDF.printSchema
modifiedDF.as[Employee].head
root
 |-- name: string (nullable = true)
 |-- company: struct (nullable = false)
 |    |-- name: string (nullable = true)
 |    |-- address: struct (nullable = false)
 |    |    |-- city: string (nullable = true)
 |    |    |-- street: struct (nullable = false)
 |    |    |    |-- number: integer (nullable = true)
 |    |    |    |-- name: string (nullable = true)

Out[20]:
flashLens: Lens = Lens(company.address.street.name)
modifiedDF: DataFrame = [name: string, company: struct<name: string, address: struct<city: string, street: struct<number: int, name: string>>>]
res19_3: Employee = Employee(
  "john",
  Company("awesome inc", Address("london", Street(23, "HIGH STREET")))
)

Want to see how much code that example would have taken without lenses?

In [21]:
val mDF = df.select(df("name"),struct(
  df("company.name").as("name"),
  struct(
    df("company.address.city").as("city"),
    struct(
      df("company.address.street.number").as("number"),
      upper(df("company.address.street.name")).as("name")
    ).as("street")
  ).as("address")
).as("company"))
mDF.printSchema
val longCodeEmployee = mDF.as[Employee].head
longCodeEmployee == modifiedDF.as[Employee].head
root
 |-- name: string (nullable = true)
 |-- company: struct (nullable = false)
 |    |-- name: string (nullable = true)
 |    |-- address: struct (nullable = false)
 |    |    |-- city: string (nullable = true)
 |    |    |-- street: struct (nullable = false)
 |    |    |    |-- number: integer (nullable = true)
 |    |    |    |-- name: string (nullable = true)

Out[21]:
mDF: DataFrame = [name: string, company: struct<name: string, address: struct<city: string, street: struct<number: int, name: string>>>]
longCodeEmployee: Employee = Employee(
  "john",
  Company("awesome inc", Address("london", Street(23, "HIGH STREET")))
)
res20_3: Boolean = true

And this is only a structure four levels deep with just two elements per level; imagine a larger one. 😱
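The manual rebuild above follows a mechanical pattern: copy every sibling as-is and recurse into the one struct that lies on the path. A rough sketch of automating that in plain Spark is shown here. modifyNested is a made-up helper, not the spark-optics implementation, and it assumes field names contain no dots; a path that does not match the schema is silently left unchanged.

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, struct, upper}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical helper: build the select list that rewrites one nested field
// and carries every other field over untouched.
def modifyNested(schema: StructType, path: List[String], f: Column => Column, prefix: String = ""): Seq[Column] =
  schema.fields.toSeq.map { field =>
    val fullName = if (prefix.isEmpty) field.name else s"$prefix.${field.name}"
    (path, field.dataType) match {
      // Last path segment: apply the transformation, keep the field name.
      case (head :: Nil, _) if head == field.name =>
        f(col(fullName)).as(field.name)
      // Intermediate segment: rebuild this struct from its (possibly modified) children.
      case (head :: tail, st: StructType) if head == field.name =>
        struct(modifyNested(st, tail, f, fullName): _*).as(field.name)
      // Not on the path: copy as-is.
      case _ =>
        col(fullName).as(field.name)
    }
  }

// Small stand-in schema so the sketch runs without the notebook's df.
val sketchSchema = StructType(Seq(
  StructField("name", StringType),
  StructField("company", StructType(Seq(StructField("name", StringType))))
))

val sketchColumns = modifyNested(sketchSchema, List("company", "name"), upper)
```

With the notebook's df this would be called as df.select(modifyNested(df.schema, List("company", "address", "street", "name"), upper): _*).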

Why use these utilities? Why not Datasets?

The Dataset API is great, but it only works with well-defined case classes and can't work with interfaces. So when you need to abstract, you only have the DataFrame API. Using ProtoLenses, you can interact with the elements that different DataFrames have in common, which makes for simple, reusable and clear code. Do all your Kafka topics share common metadata fields? Create lenses for them.
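As a sketch of that reuse, the function below works on any DataFrame whose schema contains the given path, using only the Lens calls already shown in this notebook. modifyAt is a hypothetical helper name, and it assumes Lens accepts a path held in a String variable the same way it accepts a literal.

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.upper
import org.hablapps.sparkOptics.Lens
import org.hablapps.sparkOptics.syntax._

// Hypothetical helper: the lens is built from each DataFrame's own schema,
// so the same function can be applied to differently shaped DataFrames.
def modifyAt(path: String)(f: Column => Column)(df: DataFrame): DataFrame =
  df.select(Lens(path)(df.schema).modify(f): _*)

// A reusable transformation, defined once and applied wherever the path exists.
val upperStreetName: DataFrame => DataFrame =
  modifyAt("company.address.street.name")(upper)
```

Calling upperStreetName(df) on the employee DataFrame should give the same result as the flashLens cell above; on a DataFrame without that path, building the lens is expected to fail as in the unknownField example.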

In [22]:
// prune drops the focused field (street.name) and keeps the rest of the structure
df.select(flashLens.prune(Vector.empty):_*).schema
Out[22]:
res21: StructType = StructType(
  StructField("name", StringType, true, {}),
  StructField(
    "company",
    StructType(
      StructField("name", StringType, true, {}),
      StructField(
        "address",
        StructType(
          StructField("city", StringType, true, {}),
          StructField(
            "street",
            StructType(StructField("number", IntegerType, true, {})),
            false,
            {}
          )
        ),
        false,
        {}
      )
    ),
    false,
    {}
  )
)
In [23]:
// rename changes only the name of the focused field, here from "name" to "newName"
df.select(flashLens.rename("newName"):_*).schema
Out[23]:
res22: StructType = StructType(
  StructField("name", StringType, true, {}),
  StructField(
    "company",
    StructType(
      StructField("name", StringType, true, {}),
      StructField(
        "address",
        StructType(
          StructField("city", StringType, true, {}),
          StructField(
            "street",
            StructType(
              StructField("number", IntegerType, true, {}),
              StructField("newName", StringType, true, {})
            ),
            false,
            {}
          )
        ),
        false,
        {}
      )
    ),
    false,
    {}
  )
)
In [34]:
// modifyDF applies the change to the DataFrame directly; get selects the focused column
flashLens.modifyDF(c => concat(c,c))(df).select(flashLens.get).as[String].head
Out[34]:
res33: String = "high streethigh street"
In [36]:
flashLens.modifyDF(c => concat(c,c))(df).schema
Out[36]:
res35: StructType = StructType(
  StructField("name", StringType, true, {}),
  StructField(
    "company",
    StructType(
      StructField("name", StringType, true, {}),
      StructField(
        "address",
        StructType(
          StructField("city", StringType, true, {}),
          StructField(
            "street",
            StructType(
              StructField("number", IntegerType, true, {}),
              StructField("name", StringType, true, {})
            ),
            false,
            {}
          )
        ),
        false,
        {}
      )
    ),
    false,
    {}
  )
)