import java.sql.{Date, Timestamp}

spark.version

// string to date: with no format argument, to_date expects the default yyyy-MM-dd pattern
import org.apache.spark.sql.functions.to_date

val df = Seq(
  ("notebook", "2019-01-01"),
  ("notebook", "2019-01-10"),
  ("small_phone", "2019-01-15"),
  ("small_phone", "2019-01-30")
).toDF("device", "purchase_date").sort("device", "purchase_date")

df.dtypes

df.withColumn("purchase_date", to_date($"purchase_date")).dtypes

// string to date with a custom format
import org.apache.spark.sql.functions.to_date

val df = Seq(
  ("notebook", "27/12/2019"),
  ("notebook", "01/12/2019"),
  ("small_phone", "23/01/2019"),
  ("small_phone", "27/12/2019")
).toDF("device", "purchase_date").sort("device", "purchase_date")

%%dataframe
df

%%dataframe
df.withColumn("purchase_date", to_date($"purchase_date", "dd/MM/yyyy"))

// string to timestamp: with no format argument, to_timestamp expects the default yyyy-MM-dd HH:mm:ss pattern
val df = Seq(
  ("notebook", "2019-01-01 00:00:00"),
  ("notebook", "2019-01-10 13:00:00"),
  ("small_phone", "2019-01-15 12:00:00"),
  ("small_phone", "2019-01-30 09:30:00")
).toDF("device", "purchase_time").sort("device", "purchase_time")

df.dtypes

import org.apache.spark.sql.functions.to_timestamp

df.withColumn("purchase_time", to_timestamp($"purchase_time")).dtypes

// strings in a custom format: to_timestamp without a format cannot parse these, so pass the pattern explicitly
val df = Seq(
  ("notebook", "27/12/2019 12:00"),
  ("notebook", "01/12/2019 00:00"),
  ("small_phone", "23/01/2019 12:00"),
  ("small_phone", "27/12/2019 12:00")
).toDF("device", "purchase_time").sort("device", "purchase_time")

df.dtypes

%%dataframe
df.withColumn("purchase_time", to_timestamp($"purchase_time"))

%%dataframe
df.withColumn("purchase_time", to_timestamp($"purchase_time", "d/M/y H:m"))

// timestamp to date: to_date drops the time-of-day part
val df = Seq(
  ("notebook", Timestamp.valueOf("2019-01-29 12:00:00")),
  ("notebook", Timestamp.valueOf("2019-01-01 00:00:00")),
  ("small_phone", Timestamp.valueOf("2019-01-15 23:00:00")),
  ("small_phone", Timestamp.valueOf("2019-01-01 09:00:00"))
).toDF("device", "purchase_time").sort("device", "purchase_time")

df.dtypes

%%dataframe
df.withColumn("purchase_date", to_date($"purchase_time"))

// date to timestamp: to_timestamp sets the time-of-day part to midnight
import java.sql.Date
import org.apache.spark.sql.functions.to_timestamp

val df = Seq(
  ("notebook", Date.valueOf("2019-01-29")),
  ("notebook", Date.valueOf("2019-01-01")),
  ("small_phone", Date.valueOf("2019-01-15")),
  ("small_phone", Date.valueOf("2019-01-01"))
).toDF("device", "purchase_date").sort("device", "purchase_date")

%%dataframe
df

%%dataframe
df.withColumn("purchase_time", to_timestamp($"purchase_date"))

// format a timestamp as a string, here as year-month
import java.sql.Timestamp
import org.apache.spark.sql.functions.date_format

val df = Seq(
  ("notebook", Timestamp.valueOf("2019-01-29 12:00:00")),
  ("notebook", Timestamp.valueOf("2019-01-01 00:00:00")),
  ("small_phone", Timestamp.valueOf("2019-01-15 23:00:00")),
  ("small_phone", Timestamp.valueOf("2019-01-01 09:00:00"))
).toDF("device", "purchase_time").sort("device", "purchase_time")

%%dataframe
df.withColumn("formatted_purchase_time", date_format($"purchase_time", "y-MM"))

// add a column with the current timestamp (evaluated when the query runs)
import org.apache.spark.sql.functions.current_timestamp

val df = Seq(
  ("foo"),
  ("bar"),
  ("baz")
).toDF("col1")

%%dataframe
%%scan
df
  .withColumn("now", current_timestamp)

// add a column with the current date
import org.apache.spark.sql.functions.current_date

val df = Seq(
  ("foo"),
  ("bar"),
  ("baz")
).toDF("col1")

%%dataframe
%%scan
df
  .withColumn("today", current_date)

// extract the hour of day from a timestamp (the string column is cast implicitly)
import org.apache.spark.sql.functions.hour

val df = Seq(
  ("foo", "2019-01-01 01:00:00.000"),
  ("bar", "2019-01-01 12:30:00.000"),
  ("baz", "2019-01-01 23:01:00.000")
).toDF("col1", "some_timestamp")

%%dataframe
df

%%dataframe
%%scan
df
  .withColumn("hour", hour($"some_timestamp"))
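// Aside (not in the original notebook): the same pattern as hour() above also works for the
// other component extractors in org.apache.spark.sql.functions, such as year, month,
// dayofmonth and minute. A minimal sketch, reusing the df with the some_timestamp column:
import org.apache.spark.sql.functions.{year, month, dayofmonth, minute}

%%dataframe
df
  .withColumn("year", year($"some_timestamp"))
  .withColumn("month", month($"some_timestamp"))
  .withColumn("day", dayofmonth($"some_timestamp"))
  .withColumn("minute", minute($"some_timestamp"))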
// compute the start of the week (Sunday) that each date falls in
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions._

// dummy dataframe for testing
val df = Seq(
  ("2018-12-28"),
  ("2019-01-01"),
  ("2019-01-04")
).toDF("source_date")

%%dataframe
df

// turn a day name (like "Wed") into its position in the week (e.g. 3), with Sunday as 0
def dayNameToIndex(col: Column): Column = {
  when(col.isNull, null)
    .when(col === "Sun", 0)
    .when(col === "Mon", 1)
    .when(col === "Tue", 2)
    .when(col === "Wed", 3)
    .when(col === "Thu", 4)
    .when(col === "Fri", 5)
    .when(col === "Sat", 6)
}

%%dataframe
// need to use expr because the number of days to subtract is a column value
df
  .withColumn("day_index", dayNameToIndex(date_format(col("source_date"), "E")))
  .withColumn("week_start", expr("date_sub(source_date, day_index)"))
  .drop("day_index")
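// Aside (not in the original notebook): for Sunday-based weeks the same result can be obtained
// without the helper function, using next_day from org.apache.spark.sql.functions.
// next_day(d, "Sun") is the first Sunday strictly after d, so subtracting 7 days lands on the
// Sunday on or before d. A minimal sketch against the same df:
import org.apache.spark.sql.functions.{date_sub, next_day}

%%dataframe
df
  .withColumn("week_start", date_sub(next_day($"source_date", "Sun"), 7))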