This notebook requires that Aerospike Database is running.
!asd >& /dev/null
!pgrep -x asd >/dev/null && echo "Aerospike database is running!" || echo "**Aerospike database is not running!**"
# IP Address or DNS name for one host in your Aerospike cluster
import socket
AS_HOST =socket.gethostname()
# Name of one of your namespaces. Type 'show namespaces' at the aql prompt if you are not sure
AS_NAMESPACE = "test"
AS_PORT = 3000 # Usually 3000, but change here if not
AS_CONNECTION_STRING = AS_HOST + ":"+ str(AS_PORT)
# Aerospike Spark Connector settings when running in Binder
import os
# Path to the Aerospike Spark connector jar (if running locally, set this to your local connector jar; SPARK_HOME is not needed)
SPARK_HOME = "/opt/spark-nb/spark-dir-link"
AEROSPIKE_JAR_PATH = "/opt/spark-nb/aerospike-jar-link"
os.environ["PYSPARK_SUBMIT_ARGS"] = '--jars ' + AEROSPIKE_JAR_PATH + ' pyspark-shell'
Please follow the instructions below instead of the setup above if you are running this notebook in a different environment from the one provided by the Aerospike Intro-Notebooks container.
# IP Address or DNS name for one host in your Aerospike cluster
AS_HOST = "<seed-host-ip>"
# Name of one of your namespaces. Type 'show namespaces' at the aql prompt if you are not sure
AS_NAMESPACE = "<namespace>"
AEROSPIKE_SPARK_JAR_VERSION="<spark-connector-version>"
AS_PORT = 3000 # Usually 3000, but change here if not
AS_CONNECTION_STRING = AS_HOST + ":"+ str(AS_PORT)
# Set SPARK_HOME path.
SPARK_HOME = '<spark-home-dir>'
# Please download the appropriate Aerospike Connect for Spark from the [download page](https://enterprise.aerospike.com/enterprise/download/connectors/aerospike-spark/notes.html)
# Set `AEROSPIKE_JAR_PATH` with path to the downloaded binary
import os
AEROSPIKE_JAR_PATH= "<aerospike-jar-dir>/aerospike-spark-assembly-"+AEROSPIKE_SPARK_JAR_VERSION+".jar"
os.environ["PYSPARK_SUBMIT_ARGS"] = '--jars ' + AEROSPIKE_JAR_PATH + ' pyspark-shell'
# Next we locate the Spark installation using the SPARK_HOME path set above
import findspark
# findspark.init()
# Initialize findspark with SPARK_HOME path only when running in binder. If running locally, make sure virtualenv with findspark is activated.
findspark.init(SPARK_HOME)
import pyspark
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
conf=sc._conf.setAll([("aerospike.namespace",AS_NAMESPACE),
("aerospike.seedhost",AS_CONNECTION_STRING)])
sc.stop()
sc = pyspark.SparkContext(conf=conf.setMaster("local[1]"))
spark = SparkSession(sc)
spark
# sqlContext = SQLContext(sc)
Aerospike is schemaless; Spark, however, adheres to a schema. After the schema is decided upon (either inferred or provided), the data within the bins must honor those types.
To infer the schema, the connector samples a set of records (configurable through aerospike.schema.scan) to decide the names of the bins/columns and their types. This implies that the derived schema depends entirely on the sampled records.
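As an illustration, here is a minimal sketch of an inference-based read that raises the number of records scanned for schema inference; it assumes the py_input_data set created later in this notebook already exists, and 100 is just an illustrative sample count.
inferredDF = spark.read \
    .format("aerospike") \
    .option("aerospike.set", "py_input_data") \
    .option("aerospike.schema.scan", 100) \
    .load()
inferredDF.printSchema()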
Note that __key was not part of the provided schema. So how can one query using __key? We can simply add __key to the provided schema with the appropriate type. Similarly, we can add __gen, __ttl, etc.
schemaWithPK = StructType([
StructField("__key",IntegerType(), False),
StructField("id", IntegerType(), False),
StructField("name", StringType(), False),
StructField("age", IntegerType(), False),
StructField("salary",IntegerType(), False)])
We recommend that you provide a schema for queries that involve collection data types (CDTs) such as lists, maps, and mixed types. Using schema inference for CDTs may cause unexpected issues.
Spark assumes that the underlying data store (Aerospike in this case) follows a strict schema for all the records within a table. However, Aerospike is a NoSQL database and is schemaless. For further information on how the Spark connector reconciles those differences, visit the Flexible schema page.
import random
num_records=200
schema = StructType(
[
StructField("id", IntegerType(), True),
StructField("name", StringType(), True)
]
)
inputBuf = []
for i in range(1, num_records) :
name = "name" + str(i)
id_ = i
inputBuf.append((id_, name))
inputRDD = spark.sparkContext.parallelize(inputBuf)
inputDF=spark.createDataFrame(inputRDD,schema)
#Write the Sample Data to Aerospike
inputDF \
.write \
.mode('overwrite') \
.format("aerospike") \
.option("aerospike.writeset", "py_input_data")\
.option("aerospike.updateByKey", "id") \
.save()
If none of the column types in the user-specified schema match the bin types of a record in Aerospike, a record with NULLs is returned in the result set.
Use Spark's filter() to deal with such NULL records, e.g. df.filter("gender is null").show() to inspect them (or "gender is not null" to drop them), where df is a DataFrame and gender is a column from the user-specified schema whose bin type did not match.
If the mismatch is limited to only some of the columns in the user-specified schema, NULL is returned just for those columns in the result set. Note: at this point there is no way to tell a NULL caused by a missing value in the original data apart from a NULL caused by a type mismatch, so all NULLs have to be treated as missing values. Columns that are not part of the schema are automatically filtered out of the result set by the connector.
Please note that if any field is set to NOT nullable i.e. nullable = false, your query will error out if there’s a type mismatch between an Aerospike bin and the column type specified in the user-specified schema.
schemaIncorrect = StructType(
[
StructField("id", IntegerType(), True),
StructField("name", IntegerType(), True) ##Note incorrect type of name bin
]
)
flexSchemaInference=spark \
.read \
.format("aerospike") \
.schema(schemaIncorrect) \
.option("aerospike.set", "py_input_data").load()
flexSchemaInference.show(5)
##notice that all contents of the name column are null due to the schema mismatch, since aerospike.schema.flexible = true (the default)
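As suggested above, filter() can be used to separate the NULL rows; a one-line check against the DataFrame we just read:
#expected to be 0 here, since every name bin failed the type match
flexSchemaInference.filter("name is not null").count()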
When aerospike.schema.flexible is set to false, a mismatch between the user-specified schema and the schema of a record in Aerospike at the bin/column level will cause your query to error out.
#When strict matching is set, we will get an exception due to type mismatch with schema provided.
try:
errorDFStrictSchemaInference=spark \
.read \
.format("aerospike") \
.schema(schemaIncorrect) \
.option("aerospike.schema.flexible" ,"false") \
.option("aerospike.set", "py_input_data").load()
errorDFStrictSchemaInference.show(5)
except Exception as e:
    #The read throws an error due to the type mismatch between the bin and the provided schema
    print(e)
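Similarly, as noted earlier, declaring a mismatched column as NOT nullable (nullable = false) is expected to make the read fail even with the default flexible schema; a hedged sketch:
schemaNotNullable = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", IntegerType(), False)  ##incorrect type and nullable = false
])
try:
    spark.read.format("aerospike") \
        .schema(schemaNotNullable) \
        .option("aerospike.set", "py_input_data") \
        .load().show(5)
except Exception as e:
    print(e)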
# We create age vs salary data, using three different Gaussian distributions
!pip install matplotlib
!pip install numpy
!pip install pandas
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import math
# Make sure we get the same results every time this workbook is run
# Otherwise we are occasionally exposed to results not working out as expected
np.random.seed(12345)
# Create covariance matrix from std devs + correlation
def covariance_matrix(std_dev_1,std_dev_2,correlation):
return [[std_dev_1 ** 2, correlation * std_dev_1 * std_dev_2],
[correlation * std_dev_1 * std_dev_2, std_dev_2 ** 2]]
# Return a bivariate sample given means/std dev/correlation
def age_salary_sample(distribution_params,sample_size):
mean = [distribution_params["age_mean"], distribution_params["salary_mean"]]
cov = covariance_matrix(distribution_params["age_std_dev"],distribution_params["salary_std_dev"],
distribution_params["age_salary_correlation"])
return np.random.multivariate_normal(mean, cov, sample_size).T
# Define the characteristics of our age/salary distribution
age_salary_distribution_1 = {"age_mean":25,"salary_mean":50000,
"age_std_dev":1,"salary_std_dev":5000,"age_salary_correlation":0.3}
age_salary_distribution_2 = {"age_mean":45,"salary_mean":80000,
"age_std_dev":4,"salary_std_dev":8000,"age_salary_correlation":0.7}
age_salary_distribution_3 = {"age_mean":35,"salary_mean":70000,
"age_std_dev":2,"salary_std_dev":9000,"age_salary_correlation":0.1}
distribution_data = [age_salary_distribution_1,age_salary_distribution_2,age_salary_distribution_3]
# Sample age/salary data for each distribution
sample_size_1 = 100
sample_size_2 = 120
sample_size_3 = 80
sample_sizes = [sample_size_1,sample_size_2,sample_size_3]
group_1_ages,group_1_salaries = age_salary_sample(age_salary_distribution_1,sample_size=sample_size_1)
group_2_ages,group_2_salaries = age_salary_sample(age_salary_distribution_2,sample_size=sample_size_2)
group_3_ages,group_3_salaries = age_salary_sample(age_salary_distribution_3,sample_size=sample_size_3)
ages=np.concatenate([group_1_ages,group_2_ages,group_3_ages])
salaries=np.concatenate([group_1_salaries,group_2_salaries,group_3_salaries])
print("Data created")
# Plot the sample data
group_1_colour, group_2_colour, group_3_colour ='red','blue', 'pink'
plt.xlabel('Age',fontsize=10)
plt.ylabel("Salary",fontsize=10)
plt.scatter(group_1_ages,group_1_salaries,c=group_1_colour,label="Group 1")
plt.scatter(group_2_ages,group_2_salaries,c=group_2_colour,label="Group 2")
plt.scatter(group_3_ages,group_3_salaries,c=group_3_colour,label="Group 3")
plt.legend(loc='upper left')
plt.show()
# Turn the above records into a Data Frame
# First of all, create an array of arrays
inputBuf = []
for i in range(0, len(ages)) :
id = i + 1 # Avoid counting from zero
name = "Individual: {:03d}".format(id)
# Note we need to make sure values are typed correctly
# salary will have type numpy.float64 - if it is not cast as below, an error will be thrown
age = float(ages[i])
salary = int(salaries[i])
inputBuf.append((id, name,age,salary))
# Convert to an RDD
inputRDD = spark.sparkContext.parallelize(inputBuf)
# Convert to a data frame using a schema
schema = StructType([
StructField("id", IntegerType(), True),
StructField("name", StringType(), True),
StructField("age", DoubleType(), True),
StructField("salary",IntegerType(), True)
])
inputDF=spark.createDataFrame(inputRDD,schema)
#Write the data frame to Aerospike, the id field is used as the primary key
inputDF \
.write \
.mode('overwrite') \
.format("aerospike") \
.option("aerospike.set", "salary_data")\
.option("aerospike.updateByKey", "id") \
.save()
#Aerospike DB needs a Primary key for record insertion. Hence, you must identify the primary key column
#using for example .option(“aerospike.updateByKey”, “id”), where “id” is the name of the column that you’d
#like to be the Primary key, while loading data from the DB.
insertDFWithSchema=spark \
.read \
.format("aerospike") \
.schema(schema) \
.option("aerospike.set", "salary_data") \
.option("aerospike.updateByKey", "id") \
.load()
sqlView="inserttable"
#
# V2 datasource doesn't allow insert into a view.
#
insertDFWithSchema.createTempView(sqlView)
spark.sql("select * from inserttable").show()
# Create a Spark DataFrame by using the Connector Schema inference mechanism
# The fields preceded with __ are metadata fields - key/digest/expiry/generation/ttl
# By default you just get everything, with no column ordering, which is why it looks untidy
# Note we don't get anything in the '__key' field as we have not chosen to save the key as a bin.
# Use .option("aerospike.sendKey", True) to do this
loadedDFWithoutSchema = (
spark.read.format("aerospike") \
.option("aerospike.set", "salary_data") \
.load()
)
loadedDFWithoutSchema.show(10)
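If you want __key to come back populated, the write itself must store the key, as mentioned above; a minimal sketch re-writing the same DataFrame with aerospike.sendKey:
inputDF \
    .write \
    .mode('overwrite') \
    .format("aerospike") \
    .option("aerospike.set", "salary_data") \
    .option("aerospike.updateByKey", "id") \
    .option("aerospike.sendKey", True) \
    .save()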
# If we explicitly set the schema, using the previously created schema object
# we effectively type the rows in the Data Frame
loadedDFWithSchema=spark \
.read \
.format("aerospike") \
.schema(schema) \
.option("aerospike.set", "salary_data").load()
loadedDFWithSchema.show(5)
Sampling a specified number of records from Aerospike considerably reduces data movement between Aerospike and the Spark cluster. Depending on the aerospike.partition.factor setting, you may get more records than desired; use this property in conjunction with Spark's limit() function to get the exact number of records. The sample read is not randomized, so sample more than you need and use Spark's sample() function to randomize if you see fit. You can use aerospike.recordspersecond in conjunction with sampling to control the load on the Aerospike server.
For more information, please see the documentation page.
#number_of_spark_partitions (num_sp)=2^{aerospike.partition.factor}
#total number of records = Math.ceil((float)aerospike.sample.size/num_sp) * (num_sp)
#use lower partition factor for more accurate sampling
setname="py_input_data"
sample_size=101
df3=spark.read.format("aerospike") \
.option("aerospike.partition.factor","2") \
.option("aerospike.set",setname) \
.option("aerospike.sample.size","101") \
.load()
df4=spark.read.format("aerospike") \
.option("aerospike.partition.factor","6") \
.option("aerospike.set",setname) \
.option("aerospike.sample.size","101") \
.load()
#Notice that more records were read than requested due to the underlying partitioning logic related to the partition factor as described earlier, hence we use Spark limit() function additionally to return the desired number of records.
count3=df3.count()
count4=df4.count()
#Note how limit got only 101 records from df4.
dfWithLimit=df4.limit(101)
limitCount=dfWithLimit.count()
print("count3= ", count3, " count4= ", count4, " limitCount=", limitCount)
# Schema specification
aliases_type = StructType([
StructField("first_name",StringType(),False),
StructField("last_name",StringType(),False)
])
id_type = StructType([
StructField("first_name",StringType(),False),
StructField("last_name",StringType(),False),
StructField("aliases",ArrayType(aliases_type),False)
])
street_adress_type = StructType([
StructField("street_name",StringType(),False),
StructField("apt_number",IntegerType(),False)
])
address_type = StructType([
StructField("zip",LongType(),False),
StructField("street",street_adress_type,False),
StructField("city",StringType(),False)
])
workHistory_type = StructType([
StructField ("company_name",StringType(),False),
StructField( "company_address",address_type,False),
StructField("worked_from",StringType(),False)
])
person_type = StructType([
StructField("name",id_type,False),
StructField("SSN",StringType(),False),
StructField("home_address",ArrayType(address_type),False),
StructField("work_history",ArrayType(workHistory_type),False)
])
# JSON data location
complex_data_json="resources/nested_data.json"
# Read data in using prepared schema
cmplx_data_with_schema=spark.read.schema(person_type).json(complex_data_json)
# Save data to Aerospike
cmplx_data_with_schema \
.write \
.mode('overwrite') \
.format("aerospike") \
.option("aerospike.writeset", "complex_input_data") \
.option("aerospike.updateByKey", "SSN") \
.save()
loadedComplexDFWithSchema=spark \
.read \
.format("aerospike") \
.option("aerospike.set", "complex_input_data") \
.schema(person_type) \
.load()
loadedComplexDFWithSchema.show(5)
import pandas
import matplotlib
import matplotlib.pyplot as plt
#convert Spark df to pandas df
pdf = loadedDFWithSchema.toPandas()
# Describe the data
pdf.describe()
#Histogram - Age
age_min, age_max = int(np.amin(pdf['age'])), math.ceil(np.amax(pdf['age']))
age_bucket_size = 5
print(age_min,age_max)
pdf[['age']].plot(kind='hist',bins=range(age_min,age_max,age_bucket_size),rwidth=0.8)
plt.xlabel('Age',fontsize=10)
plt.legend(loc=None)
plt.show()
#Histogram - Salary
salary_min, salary_max = int(np.amin(pdf['salary'])), math.ceil(np.amax(pdf['salary']))
salary_bucket_size = 5000
pdf[['salary']].plot(kind='hist',bins=range(salary_min,salary_max,salary_bucket_size),rwidth=0.8)
plt.xlabel('Salary',fontsize=10)
plt.legend(loc=None)
plt.show()
# Heatmap
age_bucket_count = math.ceil((age_max - age_min)/age_bucket_size)
salary_bucket_count = math.ceil((salary_max - salary_min)/salary_bucket_size)
x = [[0 for i in range(salary_bucket_count)] for j in range(age_bucket_count)]
for i in range(len(pdf['age'])):
age_bucket = math.floor((pdf['age'][i] - age_min)/age_bucket_size)
salary_bucket = math.floor((pdf['salary'][i] - salary_min)/salary_bucket_size)
x[age_bucket][salary_bucket] += 1
plt.title("Salary/Age distribution heatmap")
plt.xlabel("Salary in '000s")
plt.ylabel("Age")
plt.imshow(x, cmap='YlOrRd', interpolation='nearest',extent=[salary_min/1000,salary_max/1000,age_min,age_max],
origin="lower")
plt.colorbar(orientation="horizontal")
plt.show()
Queries in this category use the primary key __key or __digest in the predicate, with no OR between two bins. With batch get queries we can apply filters on metadata columns such as __gen or __ttl. To do this, these columns should be exposed through the schema.
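A hedged sketch of exposing a metadata column through the schema (it assumes __ttl maps to an integer column and reuses the salary_data set):
schemaWithMeta = StructType([
    StructField("__key", IntegerType(), False),
    StructField("__ttl", IntegerType(), False),
    StructField("name", StringType(), False),
    StructField("salary", IntegerType(), False)])
metaDF = spark.read.format("aerospike") \
    .schema(schemaWithMeta) \
    .option("aerospike.set", "salary_data") \
    .option("aerospike.keyType", "int") \
    .load().where("__key = 100")
metaDF.show()  # __ttl is now a regular column and could also appear in the where clause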
# Basic PKey query
batchGet1= spark \
.read \
.format("aerospike") \
.option("aerospike.set", "salary_data") \
.option("aerospike.keyType", "int") \
.load().where("__key = 100") \
batchGet1.show()
#Note ASDB only supports equality test with PKs in primary key query.
#So, a where clause with "__key >10", would result in scan query!
# Batch get, primary key based query
from pyspark.sql.functions import col
somePrimaryKeys= list(range(1,10))
someMoreKeys= list(range(12,14))
batchGet2= spark \
.read \
.format("aerospike") \
.option("aerospike.set", "salary_data") \
.option("aerospike.keyType", "int") \
.load().where((col("__key").isin(somePrimaryKeys)) | ( col("__key").isin(someMoreKeys)))
batchGet2.show(5)
Batch get queries can also use __digest:
- __digest can have only two types: BinaryType (the default type) or StringType.
- If __digest is StringType, then set aerospike.digestType to string.
- Records retrieved through a __digest batch get call will have a null primary key (i.e. __key is null).
#convert digests to a list of byte[]
digest_list=batchGet2.select("__digest").rdd.flatMap(lambda x: x).collect()
#convert digest to hex string for querying. Only digests of type hex string and byte[] array are allowed.
string_digest=[ ''.join(format(x, '02x') for x in m) for m in digest_list]
#option("aerospike.digestType", "string") hints to assume that __digest type is string in schema inference.
#please note that __key retrieved in this case is null. So be careful when using retrieved keys in a downstream query!
batchGetWithDigest= spark \
.read \
.format("aerospike") \
.option("aerospike.set", "salary_data") \
.option("aerospike.digestType", "string") \
.load().where(col("__digest").isin(string_digest))
batchGetWithDigest.show()
#digests can be mixed with primary keys as well
batchGetWithDigestAndKey= spark \
.read \
.format("aerospike") \
.option("aerospike.set", "salary_data") \
.option("aerospike.digestType", "string") \
.option("aerospike.keyType", "int") \
.load().where(col("__digest").isin(string_digest[0:1]) | ( col("__key").isin(someMoreKeys)))
batchGetWithDigestAndKey.show()
#please note the nulls in the key column in both dataframes
# This query will run as a scan, which will be slower
somePrimaryKeys= list(range(1,10))
scanQuery1= spark \
.read \
.format("aerospike") \
.option("aerospike.set", "salary_data") \
.option("aerospike.keyType", "int") \
.load().where((col("__key").isin(somePrimaryKeys)) | ( col("age") >50 ))
scanQuery1.show()
scala_predexp= sc._jvm.com.aerospike.spark.utility.AerospikePushdownExpressions
#id % 5 == 0 => get rows where mod(col("id"), 5) == 0
#Equivalent Java Exp: Exp.eq(Exp.mod(Exp.intBin("a"), Exp.`val`(5)), Exp.`val`(0))
expIntBin=scala_predexp.intBin("id") # id is the name of column
expMODIntBinEqualToZero=scala_predexp.eq(scala_predexp.mod(expIntBin, scala_predexp.val(5)),scala_predexp.val(0))
expMODIntBinToBase64= scala_predexp.build(expMODIntBinEqualToZero).getBase64()
#expMODIntBinToBase64= "kwGTGpNRAqJpZAUA"
pushdownset = "py_input_data"
pushDownDF =spark\
.read \
.format("aerospike") \
.schema(schema) \
.option("aerospike.set", pushdownset) \
.option("aerospike.pushdown.expressions", expMODIntBinToBase64) \
.load()
pushDownDF.count() #should get 39 records; of the 199 records written, 39 have an id divisible by 5
pushDownDF.show(2)
See https://www.aerospike.com/docs/connect/processing/spark/reference.html for a detailed description of the above properties.
In this section we take the data we read from Aerospike and apply a clustering algorithm to it.
We assume the data is composed of multiple data sets, each having a multivariate Gaussian distribution.
We don't know how many clusters there are, so we try cluster counts from 1 through 20.
We compare the quality of the results using the Bayesian Information Criterion - https://en.wikipedia.org/wiki/Bayesian_information_criterion - and pick the best.
!pip install scikit-learn
from sklearn.mixture import GaussianMixture
# We take the data we previously loaded from Aerospike (via the pandas DataFrame pdf)
ages=pdf['age']
salaries=pdf['salary']
#age_salary_matrix=np.matrix([ages,salaries]).T
age_salary_matrix=np.asarray([ages,salaries]).T
# Find the optimal number of clusters
optimal_cluster_count = 1
best_bic_score = GaussianMixture(1).fit(age_salary_matrix).bic(age_salary_matrix)
for count in range(2, 21): # count = 1 was already scored above; try up to 20 clusters
gm=GaussianMixture(count)
gm.fit(age_salary_matrix)
if gm.bic(age_salary_matrix) < best_bic_score:
best_bic_score = gm.bic(age_salary_matrix)
optimal_cluster_count = count
print("Optimal cluster count found to be "+str(optimal_cluster_count))
Next we fit our model using the optimal cluster count, and print out the discovered means and covariance matrices.
gm = GaussianMixture(optimal_cluster_count)
gm.fit(age_salary_matrix)
estimates = []
# Collect the estimated parameters for each cluster
for index in range(0,optimal_cluster_count):
estimated_mean_age = round(gm.means_[index][0],2)
estimated_mean_salary = round(gm.means_[index][1],0)
estimated_age_std_dev = round(math.sqrt(gm.covariances_[index][0][0]),2)
estimated_salary_std_dev = round(math.sqrt(gm.covariances_[index][1][1]),0)
estimated_correlation = round(gm.covariances_[index][0][1] / ( estimated_age_std_dev * estimated_salary_std_dev ),3)
row = [estimated_mean_age,estimated_mean_salary,estimated_age_std_dev,estimated_salary_std_dev,estimated_correlation]
estimates.append(row)
pd.DataFrame(estimates,columns = ["Est Mean Age","Est Mean Salary","Est Age Std Dev","Est Salary Std Dev","Est Correlation"])
distribution_data_as_rows = []
for distribution in distribution_data:
row = [distribution['age_mean'],distribution['salary_mean'],distribution['age_std_dev'],
distribution['salary_std_dev'],distribution['age_salary_correlation']]
distribution_data_as_rows.append(row)
pd.DataFrame(distribution_data_as_rows,columns = ["Mean Age","Mean Salary","Age Std Dev","Salary Std Dev","Correlation"])
You can see that the algorithm provides good estimates of the original parameters.
Next we generate new age/salary pairs for each of the distributions and look at how accurately they are classified.
def prediction_accuracy(model,age_salary_distribution,sample_size):
# Generate new values
new_ages,new_salaries = age_salary_sample(age_salary_distribution,sample_size)
#new_age_salary_matrix=np.matrix([new_ages,new_salaries]).T
new_age_salary_matrix=np.asarray([new_ages,new_salaries]).T
# Find which cluster the mean would be classified into
#mean = np.matrix([age_salary_distribution['age_mean'],age_salary_distribution['salary_mean']])
#mean = np.asarray([age_salary_distribution['age_mean'],age_salary_distribution['salary_mean']])
mean = np.asarray(np.matrix([age_salary_distribution['age_mean'],age_salary_distribution['salary_mean']]))
mean_cluster_index = model.predict(mean)[0]
# How would new samples be classified
classification = model.predict(new_age_salary_matrix)
# How many were classified correctly
correctly_classified = len([ 1 for x in classification if x == mean_cluster_index])
return correctly_classified / sample_size
prediction_accuracy_results = [None for x in range(3)]
for index, age_salary_distribution in enumerate(distribution_data):
prediction_accuracy_results[index] = prediction_accuracy(gm,age_salary_distribution,1000)
overall_accuracy = sum(prediction_accuracy_results)/ len(prediction_accuracy_results)
print("Accuracies for each distribution : "," ,".join(map('{:.2%}'.format,prediction_accuracy_results)))
print("Overall accuracy : ",'{:.2%}'.format(overall_accuracy))
aerolookup allows you to look up records corresponding to a set of keys stored in a Spark DF, streaming or otherwise. It supports flexible schemas; to use them, set aerospike.schema.flexible to true in the SparkConf object.
alias = StructType([StructField("first_name", StringType(), False),
StructField("last_name", StringType(), False)])
name = StructType([StructField("first_name", StringType(), False),
StructField("aliases", ArrayType(alias), False)])
street_adress = StructType([StructField("street_name", StringType(), False),
StructField("apt_number", IntegerType(), False)])
address = StructType([StructField("zip", LongType(), False),
StructField("street", street_adress, False),
StructField("city", StringType(), False)])
work_history = StructType([StructField("company_name", StringType(), False),
StructField("company_address", address, False),
StructField("worked_from", StringType(), False)])
output_schema = StructType([StructField("name", name, False),
StructField("SSN", StringType(), False),
StructField("home_address", ArrayType(address), False)])
ssns = [["825-55-3247"], ["289-18-1554"], ["756-46-4088"],
["525-31-0299"], ["456-45-2200"], ["200-71-7765"]]
#Create a set of PKs whose records you'd like to look up in the Aerospike database
customerIdsDF=spark.createDataFrame(ssns,["SSN"])
from pyspark.sql import SQLContext
scala_py_util= sc._jvm.com.aerospike.spark.PythonUtil #Import the scala object
gateway_df=scala_py_util.aerolookup(
customerIdsDF._jdf, #Please note ._jdf
'SSN',
'complex_input_data', #complex_input_data is the set in Aerospike database that you are using to look up the keys stored in SSN DF
output_schema.json(),
'test',
{} # may use this map to pass any aerospike configuration
)
aerolookup_df=pyspark.sql.DataFrame(gateway_df,spark)
#aerolookup_df=pyspark.sql.DataFrame(gateway_df,spark._wrapped)
#Note the wrapping of java object into python.sql.DataFrame
aerolookup_df.show()
Secondary index queries can be controlled with the following options:
- To disable secondary index queries, set aerospike.sindex.enable to false (by default it is set to true).
- A specific secondary index can be chosen with aerospike.sindex. If it is not set, the connector appropriately selects a secondary index for query execution.
- A secondary index filter can be supplied with aerospike.sindex.filter. This feature may be used to filter CDTs at the database side itself, which is not immediately possible to achieve using standard Spark filters.
si_records=50
si_schema = StructType([StructField("id", IntegerType(), True),
StructField("name", StringType(), True),
StructField("array", ArrayType(IntegerType()), True)])
si_set= "py_si_set"
siBuf = []
for i in range(1, si_records) :
id_ = i
name = "name" + str(i)
arr= [x for x in range(i, i+3) ]
siBuf.append((id_, name, arr))
siRDD = spark.sparkContext.parallelize(siBuf)
siInputDF=spark.createDataFrame(siRDD,si_schema)
#Write the secondary index Data to Aerospike
siInputDF.write.mode('overwrite').format("aerospike") \
.option("aerospike.writeset", si_set).option("aerospike.updateByKey", "id").save()
We create secondary indices py_id_idx, py_name_idx and py_arr_idx over the respective bins. You can list all secondary indices in a namespace with the sindexList(namespace) API. This API assumes that a SparkSession is already created and that its runtime configuration contains information such as the hostname and namespace.
!pip install aerospike
#index names to be created
num_idx= "py_id_idx"
str_idx= "py_name_idx"
arr_idx= "py_arr_idx"
import aerospike
from aerospike import exception as ex
pyClient = aerospike.client({"hosts": [(AS_HOST, AS_PORT)]}).connect()
try:
    pyClient.index_integer_create('test', si_set, "id", num_idx)
    pyClient.index_string_create('test', si_set, "name", str_idx)
    pyClient.index_list_create('test', si_set, 'array', aerospike.INDEX_NUMERIC, arr_idx)
except ex.IndexFoundError as e:
    print(e)
#Print all "test" namsespace indices
print(scala_py_util.sindexList("test"))
When aerospike.sindex is not set, the connector automatically selects an appropriate secondary index for the query.
# automatically an appropriate secondary index is selected
siIdDF = spark.read.format("aerospike").schema(si_schema)\
.option("aerospike.set", si_set)\
.option("aerospike.partition.factor",1)\
.option("aerospike.sindex.enable","true")\
.load()
siIdDF.where(siIdDF.id >= 40).show() #should get 10 records
#search for `using secondary index: py_id_idx` in info logs
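Conversely, secondary index pushdown can be switched off with aerospike.sindex.enable, so the same predicate runs without a secondary index; a minimal sketch:
noSiDF = spark.read.format("aerospike").schema(si_schema)\
    .option("aerospike.set", si_set)\
    .option("aerospike.sindex.enable","false")\
    .load()
noSiDF.where(noSiDF.id >= 40).show() #should also get 10 records, served without a secondary index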
Set aerospike.sindex to use a specific index for the query.
#user specified index "aerospike.sindex"
siNameDF = spark.read.format("aerospike").schema(si_schema)\
.option("aerospike.set", si_set)\
.option("aerospike.partition.factor",1)\
.option("aerospike.sindex","py_name_idx")\
.load()
siNameDF.where(siNameDF.name == "name1").show() #should get 1 record
#search for `using secondary index: py_name_idx` in info logs
#user specified filter in JSON format
arrayQuery =r'''{ "name": "array", "type": "NUMERIC", "colType": 1, "value": 10 }''' # "name" is bin name, colType =1 indicates sindex over array datatype.
siArrayDF = spark.read.format("aerospike").schema(si_schema)\
.option("aerospike.set", si_set).option("aerospike.partition.factor",1)\
.option("aerospike.sindex.filter",arrayQuery)\
.option("aerospike.sindex","py_arr_idx").load()
siArrayDF.show() #should print 3 records, since only ids 8, 9 and 10 contain 10 in their array bin