Aerospike Connect for Spark - H2O Tutorial for Python

Tested with Java 8, Spark 2.4.0, H2O 3.30.1.2, h2o_pysparkling_2.4, Python 3.7, and Aerospike Spark Connector 2.5

Setup

The cell below requires a seed address for your Aerospike database cluster.

Check that the given namespace is available and that your feature key file is located at the path set in AS_FEATURE_KEY_PATH.

Finally, review https://www.aerospike.com/enterprise/download/connectors/ to ensure AEROSPIKE_SPARK_JAR_VERSION is correct.

In [4]:
# IP Address or DNS name for one host in your Aerospike cluster
AS_HOST ="127.0.0.1"
# Name of one of your namespaces. Type 'show namespaces' at the aql prompt if you are not sure
AS_NAMESPACE = "test" 
AS_FEATURE_KEY_PATH = "/etc/aerospike/features.conf"
AEROSPIKE_SPARK_JAR_VERSION="2.5.0"

AS_PORT = 3000 # Usually 3000, but change here if not
AS_CONNECTION_STRING = AS_HOST + ":"+ str(AS_PORT)
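
Before moving on, it can help to sanity-check the values set above. The sketch below is not part of the original notebook: the helper names `connection_string`, `feature_key_present` and `seed_reachable` are hypothetical, and a successful TCP connection only shows that something is listening on the port, not that the namespace exists.

```python
import os
import socket


def connection_string(host, port=3000):
    """Build the host:port seed string, as AS_CONNECTION_STRING does above."""
    return host + ":" + str(port)


def feature_key_present(path):
    """Return True if a regular file exists at the feature key path."""
    return os.path.isfile(path)


def seed_reachable(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name resolution failures
        return False
```

In the notebook, `feature_key_present(AS_FEATURE_KEY_PATH)` and `seed_reachable(AS_HOST, AS_PORT)` would both be expected to return True before the Spark session is configured.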
In [5]:
# Next we locate the Spark installation - this will be found using the SPARK_HOME environment variable that you will have set 
# if you followed the repository README

import findspark
findspark.init()
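
If `findspark.init()` fails, a common cause is that SPARK_HOME is not set in the environment the notebook kernel was started from. The following is a small diagnostic sketch, not part of the original notebook; `spark_home_from_env` is a hypothetical helper name.

```python
import os


def spark_home_from_env(env=None):
    """Return the SPARK_HOME value from env, or None if it is unset or blank."""
    env = os.environ if env is None else env
    value = env.get("SPARK_HOME", "").strip()
    return value or None


spark_home = spark_home_from_env()
if spark_home is None:
    print("SPARK_HOME is not set in this environment")
else:
    print("SPARK_HOME =", spark_home)
```

As far as I know, `findspark.init()` also accepts the Spark installation path as its first argument, which may be an alternative to exporting the variable.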
In [1]:
import h2o
h2o.init()
Checking whether there is an H2O instance running at http://localhost:54321 . connected.
H2O_cluster_uptime: 24 days 15 hours 18 mins
H2O_cluster_timezone: America/Los_Angeles
H2O_data_parsing_timezone: UTC
H2O_cluster_version: 3.30.1.2
H2O_cluster_version_age: 1 month and 11 days
H2O_cluster_name: H2O_from_python_kmatty_mnldpz
H2O_cluster_total_nodes: 1
H2O_cluster_free_memory: 3.057 Gb
H2O_cluster_total_cores: 16
H2O_cluster_allowed_cores: 16
H2O_cluster_status: locked, healthy
H2O_connection_url: http://localhost:54321
H2O_connection_proxy: {"http": null, "https": null}
H2O_internal_security: False
H2O_API_Extensions: Amazon S3, XGBoost, Algos, AutoML, Core V3, TargetEncoder, Core V4
Python_version: 3.7.5 final
In [6]:
# Here we download the Aerospike Spark jar
import urllib.request  # urlretrieve lives in the urllib.request submodule
import os

def aerospike_spark_jar_download_url(version=AEROSPIKE_SPARK_JAR_VERSION):
    DOWNLOAD_PREFIX="https://www.aerospike.com/enterprise/download/connectors/aerospike-spark/"
    DOWNLOAD_SUFFIX="/artifact/jar"
    # Build the URL from the version argument rather than the global constant
    return DOWNLOAD_PREFIX+version+DOWNLOAD_SUFFIX

def download_aerospike_spark_jar(version=AEROSPIKE_SPARK_JAR_VERSION):
    JAR_NAME="aerospike-spark-assembly-"+version+".jar"
    # Only download the jar if it is not already in the working directory
    if not os.path.exists(JAR_NAME):
        urllib.request.urlretrieve(aerospike_spark_jar_download_url(version),JAR_NAME)
    else:
        print(JAR_NAME+" already downloaded")
    return os.path.join(os.getcwd(),JAR_NAME)

AEROSPIKE_JAR_PATH=download_aerospike_spark_jar()
os.environ["PYSPARK_SUBMIT_ARGS"] = '--jars ' + AEROSPIKE_JAR_PATH + ' pyspark-shell'
aerospike-spark-assembly-2.5.0.jar already downloaded
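
The PYSPARK_SUBMIT_ARGS value set above passes a single jar. If more jars are needed, `--jars` takes one comma-separated list. The sketch below shows one way to assemble the string; `pyspark_submit_args` is a hypothetical helper and the `/tmp` path is only an illustration.

```python
def pyspark_submit_args(jar_paths):
    """Build a PYSPARK_SUBMIT_ARGS value that passes the given jars to Spark."""
    if not jar_paths:
        return "pyspark-shell"
    # --jars expects a single comma-separated list of paths
    return "--jars " + ",".join(jar_paths) + " pyspark-shell"


print(pyspark_submit_args(["/tmp/aerospike-spark-assembly-2.5.0.jar"]))
# prints: --jars /tmp/aerospike-spark-assembly-2.5.0.jar pyspark-shell
```

The variable has to be set before the SparkContext is created, which is why the notebook sets it before the pyspark imports.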
In [10]:
import pyspark
from pyspark.context import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import StringType, StructField, StructType, ArrayType, IntegerType, MapType, LongType, DoubleType
from pysparkling import *

Get a Spark session object and set the required Aerospike configuration properties

Set up Spark and point the Aerospike connector at the seed host given by AS_CONNECTION_STRING (AS_HOST and AS_PORT)

In [11]:
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
sqlContext = SQLContext(sc)
spark.conf.set("aerospike.namespace",AS_NAMESPACE)
spark.conf.set("aerospike.seedhost",AS_CONNECTION_STRING)
spark.conf.set("aerospike.keyPath",AS_FEATURE_KEY_PATH )
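
The three `spark.conf.set` calls above can also be driven from a dictionary, which makes the settings easier to inspect or reuse. This is a sketch only: `aerospike_spark_settings` and `apply_settings` are hypothetical helpers, the property names are the ones used in the cell above, and a plain dict stands in for `spark.conf` so the example runs without Spark.

```python
def aerospike_spark_settings(namespace, host, port, feature_key_path):
    """Collect the connector properties used in this notebook in one dict."""
    return {
        "aerospike.namespace": namespace,
        "aerospike.seedhost": "{}:{}".format(host, port),
        "aerospike.keyPath": feature_key_path,
    }


def apply_settings(set_option, settings):
    """Call set_option(key, value) for every setting."""
    for key, value in settings.items():
        set_option(key, value)


# Demonstration with a plain dict standing in for spark.conf
recorded = {}
apply_settings(
    recorded.__setitem__,
    aerospike_spark_settings("test", "127.0.0.1", 3000, "/etc/aerospike/features.conf"),
)
print(recorded["aerospike.seedhost"])
# prints: 127.0.0.1:3000
```

In the notebook itself, `apply_settings(spark.conf.set, ...)` would take the place of the dict-based demonstration.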
In [12]:
h2oContext = H2OContext.getOrCreate()
Connecting to H2O server at http://192.168.1.6:54321 ... successful.
H2O_cluster_uptime: 22 secs
H2O_cluster_timezone: America/Los_Angeles
H2O_data_parsing_timezone: UTC
H2O_cluster_version: 3.30.1.2
H2O_cluster_version_age: 1 month and 11 days
H2O_cluster_name: sparkling-water-kmatty_local-1602784872166
H2O_cluster_total_nodes: 1
H2O_cluster_free_memory: 794 Mb
H2O_cluster_total_cores: 16
H2O_cluster_allowed_cores: 16
H2O_cluster_status: locked, healthy
H2O_connection_url: http://192.168.1.6:54321
H2O_connection_proxy: null
H2O_internal_security: False
H2O_API_Extensions: XGBoost, Algos, Amazon S3, Sparkling Water REST API Extensions, AutoML, Core V3, TargetEncoder, Core V4
Python_version: 3.7.5 final
Sparkling Water Context:
 * Sparkling Water Version: 3.30.1.2-1-2.4
 * H2O name: sparkling-water-kmatty_local-1602784872166
 * cluster size: 1
 * list of used nodes:
  (executorId, host, port)
  ------------------------
  (0,192.168.1.6,54321)
  ------------------------

  Open H2O Flow in browser: http://192.168.1.6:54323 (CMD + click in Mac OSX)

    

Create Sample Data and load it into Aerospike

In [13]:
# We create age vs salary data, using three different Gaussian distributions
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import math

# Create covariance matrix from std devs + correlation
def covariance_matrix(std_dev_1,std_dev_2,correlation):
    return [[std_dev_1 ** 2, correlation * std_dev_1 * std_dev_2], 
           [correlation * std_dev_1 * std_dev_2, std_dev_2 ** 2]]

# Return a bivariate sample given means/std dev/correlation
def age_salary_sample(distribution_params,sample_size):
    mean = [distribution_params["age_mean"], distribution_params["salary_mean"]]
    cov = covariance_matrix(distribution_params["age_std_dev"],distribution_params["salary_std_dev"],
                            distribution_params["age_salary_correlation"])
    return np.random.multivariate_normal(mean, cov, sample_size).T

# Define the characteristics of our age/salary distribution
age_salary_distribution_1 = {"age_mean":25,"salary_mean":50000,
                             "age_std_dev":1,"salary_std_dev":5000,"age_salary_correlation":0.3}

age_salary_distribution_2 = {"age_mean":45,"salary_mean":80000,
                             "age_std_dev":4,"salary_std_dev":10000,"age_salary_correlation":0.7}

age_salary_distribution_3 = {"age_mean":35,"salary_mean":70000,
                             "age_std_dev":2,"salary_std_dev":9000,"age_salary_correlation":0.1}

distribution_data = [age_salary_distribution_1,age_salary_distribution_2,age_salary_distribution_3]

# Sample age/salary data for each distribution
group_1_ages,group_1_salaries = age_salary_sample(age_salary_distribution_1,sample_size=100)
group_2_ages,group_2_salaries = age_salary_sample(age_salary_distribution_2,sample_size=120)
group_3_ages,group_3_salaries = age_salary_sample(age_salary_distribution_3,sample_size=80)

ages=np.concatenate([group_1_ages,group_2_ages,group_3_ages])
salaries=np.concatenate([group_1_salaries,group_2_salaries,group_3_salaries])

print("Data created")
Data created
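
The matrix returned by `covariance_matrix` has the variances on the diagonal and correlation * std_dev_1 * std_dev_2 off the diagonal, so the correlation can be recovered as cov[0][1] / sqrt(cov[0][0] * cov[1][1]). The sketch below restates the function in plain Python and checks this for the parameters of age_salary_distribution_1; `correlation_from_covariance` is a hypothetical helper added for illustration.

```python
import math


def covariance_matrix(std_dev_1, std_dev_2, correlation):
    """Same construction as in the cell above."""
    off_diagonal = correlation * std_dev_1 * std_dev_2
    return [[std_dev_1 ** 2, off_diagonal],
            [off_diagonal, std_dev_2 ** 2]]


def correlation_from_covariance(cov):
    """Recover the correlation coefficient from a 2x2 covariance matrix."""
    return cov[0][1] / math.sqrt(cov[0][0] * cov[1][1])


# Parameters of age_salary_distribution_1: std devs 1 and 5000, correlation 0.3
cov = covariance_matrix(1, 5000, 0.3)
print(cov)
print(round(correlation_from_covariance(cov), 6))
```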
In [14]:
# Turn the above records into a Data Frame
# First of all, create an array of arrays
inputBuf = []

for i in range(len(ages)):
    id = i + 1 # Avoid counting from zero
    name = "Individual: {:03d}".format(id)
    # Values must be plain Python types that match the schema below.
    # ages[i] and salaries[i] are numpy.float64; an error is thrown if salary is not cast to int
    age = float(ages[i])
    salary = int(salaries[i])
    inputBuf.append((id, name, age, salary))

# Convert to an RDD 
inputRDD = spark.sparkContext.parallelize(inputBuf)
       
# Convert to a data frame using a schema
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", DoubleType(), True),
    StructField("salary",IntegerType(), True)
])

inputDF=spark.createDataFrame(inputRDD,schema)

# Write the data frame to Aerospike; the id field is used as the primary key
inputDF \
.write \
.mode('overwrite') \
.format("com.aerospike.spark.sql")  \
.option("aerospike.set", "salary_data")\
.option("aerospike.updateByKey", "id") \
.save()
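
The casts in the loop above matter because the schema declares age as DoubleType and salary as IntegerType. The sketch below isolates that conversion for a single record. `to_record` is a hypothetical helper, and the input values are made up for illustration (plain floats are used here, but `float()` and `int()` behave the same way on numpy.float64).

```python
def to_record(index, age, salary):
    """Build one (id, name, age, salary) tuple using plain Python types."""
    record_id = index + 1  # ids start at 1, as in the loop above
    name = "Individual: {:03d}".format(record_id)
    # int() truncates the fractional part of the salary
    return (record_id, name, float(age), int(salary))


record = to_record(30, 25.38038331484876, 52375.9)
print(record)
# prints: (31, 'Individual: 031', 25.38038331484876, 52375)
print([type(value).__name__ for value in record])
# prints: ['int', 'str', 'float', 'int']
```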

Step 1: Load data into a DataFrame using a user-specified schema

In [15]:
# If we explicitly set the schema, using the previously created schema object
# we effectively type the rows in the Data Frame

loadedDFWithSchema=spark \
.read \
.format("com.aerospike.spark.sql") \
.schema(schema) \
.option("aerospike.set", "salary_data").load()

loadedDFWithSchema.show(5)
+---+---------------+-----------------+------+
| id|           name|              age|salary|
+---+---------------+-----------------+------+
|239|Individual: 239|31.83300818606226| 74975|
|101|Individual: 101|43.01299505505053| 73747|
|194|Individual: 194|40.82834439786344| 63853|
| 31|Individual: 031|25.38038331484876| 52375|
|139|Individual: 139|47.62537494799876| 80100|
+---+---------------+-----------------+------+
only showing top 5 rows

Step 2: Load Data from Spark DataFrame into H2OFrame

In [ ]:
# Save into an H2OFrame under a key. A key is an entry in the H2O key-value store that maps to an object in H2O.
loadedDFWithSchema.write.format("h2o").option("key", "key_one").save()
In [20]:
# Use h2o.ls() to list the current contents of the H2O cluster
h2o.ls()

h2oframe = h2o.get_frame("key_one")

Step 3: Create a model using H2O libraries

In [21]:
h2oframe.summary()
         id                 name             age                 salary
type     int                string           real                int
mins     1.0                NaN              22.405590847347618  37748.0
mean     150.5              NaN              35.593540086982685  67127.00666666667
maxs     300.0              NaN              60.312589253321136  107261.0
sigma    86.74675786448738  NaN              8.788476744518679   15177.875046143428
zeros    0                  0                0                   0
missing  0                  0                0                   0
0        239.0              Individual: 239  31.83300818606226   74975.0
1        101.0              Individual: 101  43.01299505505053   73747.0
2        194.0              Individual: 194  40.82834439786344   63853.0
3        31.0               Individual: 031  25.38038331484876   52375.0
4        139.0              Individual: 139  47.62537494799876   80100.0
5        14.0               Individual: 014  25.41226437694945   50203.0
6        142.0              Individual: 142  35.49930947093095   66239.0
7        272.0              Individual: 272  32.59037083790934   51935.0
8        76.0               Individual: 076  25.066279193638437  50236.0
9        147.0              Individual: 147  44.565530108647465  77111.0
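
The notebook output stops at the frame summary. One possible next step, given that the sample data was drawn from three Gaussian groups, is to fit a K-means model on the age and salary columns. The sketch below is an assumption about how that could look, not the original author's method: `cluster_age_salary` is a hypothetical helper, and the choice of k=3, standardization and the seed value are illustrative.

```python
def cluster_age_salary(frame, k=3, seed=1234):
    """Fit H2O K-means on the age and salary columns and return the model.

    Assumes `frame` is an H2OFrame with numeric 'age' and 'salary' columns,
    such as h2oframe above, and that an H2O cluster is already running.
    """
    # Imported here so that defining the function does not require h2o
    from h2o.estimators import H2OKMeansEstimator

    # standardize=True puts age and salary on comparable scales before clustering
    model = H2OKMeansEstimator(k=k, standardize=True, seed=seed)
    model.train(x=["age", "salary"], training_frame=frame)
    return model
```

In the notebook this would be called as `model = cluster_age_salary(h2oframe)`, after which `model.centers()` and `model.predict(h2oframe)` could be used to inspect the cluster centers and assignments.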