Below, a seed address for one host in your Aerospike database cluster is required.
Check that the given namespace is available, and that your feature key is located at AS_FEATURE_KEY_PATH.
Finally, review https://www.aerospike.com/enterprise/download/connectors/ to ensure AEROSPIKE_SPARK_JAR_VERSION is correct.
# IP address or DNS name for one host in your Aerospike cluster
AS_HOST = "127.0.0.1"
# Name of one of your namespaces. Type 'show namespaces' at the aql prompt if you are not sure
AS_NAMESPACE = "test"
# Location of the Aerospike enterprise feature key file
AS_FEATURE_KEY_PATH = "/etc/aerospike/features.conf"
# Version of the Aerospike Connect for Spark jar to download
AEROSPIKE_SPARK_JAR_VERSION = "2.5.0"
# Usually 3000, but change here if not
AS_PORT = 3000
# host:port seed string handed to the Spark connector
AS_CONNECTION_STRING = f"{AS_HOST}:{AS_PORT}"
# Next we locate the Spark installation - this will be found using the SPARK_HOME environment variable that you will have set
# if you followed the repository README
# Locate the Spark installation via the SPARK_HOME environment variable
# and add it to sys.path so that `import pyspark` below succeeds.
import findspark
findspark.init()
# Connect to (or start) an H2O cluster; h2o.init() prints the cluster
# status table seen in the output below.
import h2o
h2o.init()
Checking whether there is an H2O instance running at http://localhost:54321 . connected.
H2O_cluster_uptime: | 24 days 15 hours 18 mins |
H2O_cluster_timezone: | America/Los_Angeles |
H2O_data_parsing_timezone: | UTC |
H2O_cluster_version: | 3.30.1.2 |
H2O_cluster_version_age: | 1 month and 11 days |
H2O_cluster_name: | H2O_from_python_kmatty_mnldpz |
H2O_cluster_total_nodes: | 1 |
H2O_cluster_free_memory: | 3.057 Gb |
H2O_cluster_total_cores: | 16 |
H2O_cluster_allowed_cores: | 16 |
H2O_cluster_status: | locked, healthy |
H2O_connection_url: | http://localhost:54321 |
H2O_connection_proxy: | {"http": null, "https": null} |
H2O_internal_security: | False |
H2O_API_Extensions: | Amazon S3, XGBoost, Algos, AutoML, Core V3, TargetEncoder, Core V4 |
Python_version: | 3.7.5 final |
# Here we download the Aerospike Spark jar
import os
import urllib
import urllib.request
def aerospike_spark_jar_download_url(version=AEROSPIKE_SPARK_JAR_VERSION):
    """Return the download URL for the Aerospike Connect for Spark jar.

    Args:
        version: Connector version string; defaults to the module-level
            AEROSPIKE_SPARK_JAR_VERSION.

    Returns:
        Full URL string for the jar artifact.
    """
    DOWNLOAD_PREFIX = "https://www.aerospike.com/enterprise/download/connectors/aerospike-spark/"
    DOWNLOAD_SUFFIX = "/artifact/jar"
    # Bug fix: the original ignored `version` and always used the global
    # constant, so an explicit version argument had no effect.
    return DOWNLOAD_PREFIX + version + DOWNLOAD_SUFFIX
def download_aerospike_spark_jar(version=AEROSPIKE_SPARK_JAR_VERSION):
    """Download the Aerospike Spark connector jar unless already present.

    Args:
        version: Connector version string; defaults to the module-level
            AEROSPIKE_SPARK_JAR_VERSION.

    Returns:
        Absolute path of the jar file in the current working directory.
    """
    # Bug fix: the original ignored `version` and always used the global
    # constant, so an explicit version argument had no effect.
    jar_name = "aerospike-spark-assembly-" + version + ".jar"
    if not os.path.exists(jar_name):
        # Requires `import urllib.request` (see top-of-file imports).
        urllib.request.urlretrieve(aerospike_spark_jar_download_url(version), jar_name)
    else:
        print(jar_name + " already downloaded")
    return os.path.join(os.getcwd(), jar_name)
# Fetch the connector jar (no-op if already downloaded) and tell PySpark
# to put it on the driver/executor classpath when the context starts.
AEROSPIKE_JAR_PATH = download_aerospike_spark_jar()
os.environ["PYSPARK_SUBMIT_ARGS"] = f"--jars {AEROSPIKE_JAR_PATH} pyspark-shell"
aerospike-spark-assembly-2.5.0.jar already downloaded
import pyspark
from pyspark.context import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import StringType, StructField, StructType, ArrayType, IntegerType, MapType, LongType, DoubleType
from pysparkling import *
Get a Spark session object and set the required Aerospike configuration properties.
Set up Spark and point the Aerospike connector at AS_HOST.
# Reuse an active SparkContext if one exists, otherwise create one; the new
# context picks up PYSPARK_SUBMIT_ARGS, so the Aerospike jar is loaded.
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
sqlContext = SQLContext(sc)
# Aerospike connector settings: namespace, seed host and feature-key path
# (values defined in the configuration cell at the top of this file).
spark.conf.set("aerospike.namespace",AS_NAMESPACE)
spark.conf.set("aerospike.seedhost",AS_CONNECTION_STRING)
spark.conf.set("aerospike.keyPath",AS_FEATURE_KEY_PATH )
# Start the Sparkling Water H2O context on top of the Spark session.
# H2OContext is provided by `from pysparkling import *` above.
h2oContext = H2OContext.getOrCreate()
Connecting to H2O server at http://192.168.1.6:54321 ... successful.
H2O_cluster_uptime: | 22 secs |
H2O_cluster_timezone: | America/Los_Angeles |
H2O_data_parsing_timezone: | UTC |
H2O_cluster_version: | 3.30.1.2 |
H2O_cluster_version_age: | 1 month and 11 days |
H2O_cluster_name: | sparkling-water-kmatty_local-1602784872166 |
H2O_cluster_total_nodes: | 1 |
H2O_cluster_free_memory: | 794 Mb |
H2O_cluster_total_cores: | 16 |
H2O_cluster_allowed_cores: | 16 |
H2O_cluster_status: | locked, healthy |
H2O_connection_url: | http://192.168.1.6:54321 |
H2O_connection_proxy: | null |
H2O_internal_security: | False |
H2O_API_Extensions: | XGBoost, Algos, Amazon S3, Sparkling Water REST API Extensions, AutoML, Core V3, TargetEncoder, Core V4 |
Python_version: | 3.7.5 final |
Sparkling Water Context: * Sparkling Water Version: 3.30.1.2-1-2.4 * H2O name: sparkling-water-kmatty_local-1602784872166 * cluster size: 1 * list of used nodes: (executorId, host, port) ------------------------ (0,192.168.1.6,54321) ------------------------ Open H2O Flow in browser: http://192.168.1.6:54323 (CMD + click in Mac OSX)
# We create age vs salary data, using three different Gaussian distributions
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import math
# Create covariance matrix from std devs + correlation
def covariance_matrix(std_dev_1, std_dev_2, correlation):
    """Build the 2x2 covariance matrix for two variables.

    Args:
        std_dev_1: standard deviation of the first variable.
        std_dev_2: standard deviation of the second variable.
        correlation: Pearson correlation between the two variables.

    Returns:
        Nested list [[var1, cov], [cov, var2]].
    """
    off_diagonal = correlation * std_dev_1 * std_dev_2
    return [
        [std_dev_1 ** 2, off_diagonal],
        [off_diagonal, std_dev_2 ** 2],
    ]
# Return a bivariate sample given means/std dev/correlation
def age_salary_sample(distribution_params, sample_size):
    """Draw a bivariate-normal (age, salary) sample.

    Args:
        distribution_params: dict with keys age_mean, salary_mean,
            age_std_dev, salary_std_dev, age_salary_correlation.
        sample_size: number of individuals to draw.

    Returns:
        2 x sample_size array: row 0 is ages, row 1 is salaries.
    """
    means = [distribution_params["age_mean"], distribution_params["salary_mean"]]
    cov = covariance_matrix(
        distribution_params["age_std_dev"],
        distribution_params["salary_std_dev"],
        distribution_params["age_salary_correlation"],
    )
    draws = np.random.multivariate_normal(means, cov, sample_size)
    return draws.T
# Define the characteristics of our age/salary distribution
# Parameters for three (age, salary) populations with different means,
# spreads and age/salary correlations.
age_salary_distribution_1 = dict(
    age_mean=25, salary_mean=50000,
    age_std_dev=1, salary_std_dev=5000, age_salary_correlation=0.3,
)
age_salary_distribution_2 = dict(
    age_mean=45, salary_mean=80000,
    age_std_dev=4, salary_std_dev=10000, age_salary_correlation=0.7,
)
age_salary_distribution_3 = dict(
    age_mean=35, salary_mean=70000,
    age_std_dev=2, salary_std_dev=9000, age_salary_correlation=0.1,
)
distribution_data = [
    age_salary_distribution_1,
    age_salary_distribution_2,
    age_salary_distribution_3,
]
# Sample age/salary data for each distributions
# Draw one (ages, salaries) sample per population, then stack the three
# groups into flat age and salary vectors.
group_1_ages, group_1_salaries = age_salary_sample(age_salary_distribution_1, sample_size=100)
group_2_ages, group_2_salaries = age_salary_sample(age_salary_distribution_2, sample_size=120)
group_3_ages, group_3_salaries = age_salary_sample(age_salary_distribution_3, sample_size=80)
ages = np.concatenate((group_1_ages, group_2_ages, group_3_ages))
salaries = np.concatenate((group_1_salaries, group_2_salaries, group_3_salaries))
print("Data created")
Data created
# Turn the sampled arrays into a typed Spark DataFrame.
# Build the rows first as plain Python tuples: Spark's type verifier
# rejects numpy scalars, so age/salary must be cast to float/int.
inputBuf = []
for i, (age_value, salary_value) in enumerate(zip(ages, salaries)):
    # 1-based primary key; renamed from `id` to avoid shadowing the builtin
    record_id = i + 1
    name = "Individual: {:03d}".format(record_id)
    inputBuf.append((record_id, name, float(age_value), int(salary_value)))
# Convert to an RDD
inputRDD = spark.sparkContext.parallelize(inputBuf)
# Convert to a data frame using an explicit schema (also reused when
# reading the set back from Aerospike below)
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", DoubleType(), True),
    StructField("salary", IntegerType(), True),
])
inputDF = spark.createDataFrame(inputRDD, schema)
#Write the data frame to Aerospike, the id field is used as the primary key
# mode('overwrite') replaces any records already stored under the same key
# in the "salary_data" set.
inputDF \
.write \
.mode('overwrite') \
.format("com.aerospike.spark.sql") \
.option("aerospike.set", "salary_data")\
.option("aerospike.updateByKey", "id") \
.save()
# If we explicitly set the schema, using the previously created schema object
# we effectively type the rows in the Data Frame
# (without .schema() the connector would have to infer the types itself)
loadedDFWithSchema=spark \
.read \
.format("com.aerospike.spark.sql") \
.schema(schema) \
.option("aerospike.set", "salary_data").load()
# Preview the first few records read back from Aerospike
loadedDFWithSchema.show(5)
+---+---------------+-----------------+------+ | id| name| age|salary| +---+---------------+-----------------+------+ |239|Individual: 239|31.83300818606226| 74975| |101|Individual: 101|43.01299505505053| 73747| |194|Individual: 194|40.82834439786344| 63853| | 31|Individual: 031|25.38038331484876| 52375| |139|Individual: 139|47.62537494799876| 80100| +---+---------------+-----------------+------+ only showing top 5 rows
#Save into an H2OFrame using a Key. A key is an entry in the H2O Key value store that maps to an object in H2O.
loadedDFWithSchema.write.format("h2o").option("key", "key_one").save()
#List the current contents of the H2O cluster, you can use the h2o.ls.
h2o.ls()
# Retrieve the frame back from the H2O key-value store by its key and
# print per-column summary statistics.
h2oframe = h2o.get_frame("key_one")
h2oframe.summary()
id | name | age | salary | |
---|---|---|---|---|
type | int | string | real | int |
mins | 1.0 | NaN | 22.405590847347618 | 37748.0 |
mean | 150.5 | NaN | 35.593540086982685 | 67127.00666666667 |
maxs | 300.0 | NaN | 60.312589253321136 | 107261.0 |
sigma | 86.74675786448738 | NaN | 8.788476744518679 | 15177.875046143428 |
zeros | 0 | 0 | 0 | 0 |
missing | 0 | 0 | 0 | 0 |
0 | 239.0 | Individual: 239 | 31.83300818606226 | 74975.0 |
1 | 101.0 | Individual: 101 | 43.01299505505053 | 73747.0 |
2 | 194.0 | Individual: 194 | 40.82834439786344 | 63853.0 |
3 | 31.0 | Individual: 031 | 25.38038331484876 | 52375.0 |
4 | 139.0 | Individual: 139 | 47.62537494799876 | 80100.0 |
5 | 14.0 | Individual: 014 | 25.41226437694945 | 50203.0 |
6 | 142.0 | Individual: 142 | 35.49930947093095 | 66239.0 |
7 | 272.0 | Individual: 272 | 32.59037083790934 | 51935.0 |
8 | 76.0 | Individual: 076 | 25.066279193638437 | 50236.0 |
9 | 147.0 | Individual: 147 | 44.565530108647465 | 77111.0 |