Producing and Consuming Kafka Messages and Plotting the Results, Using a Python Producer and a Spark Consumer

To run this notebook you must already have created a Kafka topic

Imports

We use utility functions from the hops library to make Kafka configuration simple

Dependencies:

  • hops-util-py
  • confluent-kafka
In [1]:
from hops import kafka
from hops import tls
from hops import hdfs
from confluent_kafka import Producer, Consumer
import numpy as np
from pyspark.sql.types import StructType, StructField, FloatType, TimestampType
Starting Spark application (YARN application_1538483294796_0037, kind: pyspark, state: idle)
SparkSession available as 'spark'.

Constants

Update the TOPIC_NAME field to the name of the Kafka topic that you want to read from and write to. Update the OUTPUT_PATH field to where the output data should be written.

In [2]:
TOPIC_NAME = "test"
OUTPUT_PATH = "/Projects/" + hdfs.project_name() + "/Resources/data-csv"
CHECKPOINT_PATH = "/Projects/" + hdfs.project_name() + "/Resources/checkpoint-csv"

Produce some Messages to the Topic

Specify the configuration using hops-util-py

In [3]:
config = {
    "bootstrap.servers": kafka.get_broker_endpoints(),
    "security.protocol": kafka.get_security_protocol(),
    "ssl.ca.location": tls.get_ca_chain_location(),
    "ssl.certificate.location": tls.get_client_certificate_location(),
    "ssl.key.location": tls.get_client_key_location(),
    "group.id": "something"
}
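The hops and tls utilities above resolve broker endpoints and TLS credentials from within a Hopsworks project. For orientation only, an equivalent hand-written configuration for a local, unsecured broker might look like the following sketch (the broker address and group id are hypothetical, not values from this notebook):

```python
# Hypothetical configuration for a local, unsecured Kafka broker;
# stands in for the hops/tls lookups available inside a Hopsworks project.
config = {
    "bootstrap.servers": "localhost:9092",  # hypothetical broker address
    "security.protocol": "PLAINTEXT",       # no TLS outside the cluster
    "group.id": "something",
}
```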

Create a producer with the confluent-kafka API

In [4]:
producer = Producer(config)

producer.produce is an asynchronous call, so we create a callback to be notified when messages are delivered

In [5]:
def delivery_callback(err, msg):
    """
    Optional per-message delivery callback (triggered by poll() or flush())
    when a message has been successfully delivered or permanently
    failed delivery (after retries).
    """
    if err:
        print("Message failed delivery: {}".format(err))
    else:
        print('Message: {} delivered to topic: {}, partition: {}, offset: {}, timestamp: {}'.format(msg.value(), msg.topic(), msg.partition(), msg.offset(), msg.timestamp()))

Produce 100 random-number messages to the topic

In [15]:
normal_rnd_dist = np.random.normal(0, 0.1, 100)
for i in range(0, 100):
    producer.produce(TOPIC_NAME, str(normal_rnd_dist[i]), "key", callback=delivery_callback)
    
# Trigger the sending of all messages to the brokers, 20sec timeout
producer.flush(20)
Message: -0.113515966706715 delivered to topic: test, partition: 1, offset: 450, timestamp: (1, 1538570713617L)
Message: -0.13372629136129505 delivered to topic: test, partition: 1, offset: 451, timestamp: (1, 1538570713617L)
Message: -0.030109530115267787 delivered to topic: test, partition: 1, offset: 452, timestamp: (1, 1538570713617L)
...
Message: 0.07143653458156844 delivered to topic: test, partition: 1, offset: 549, timestamp: (1, 1538570713618L)
0
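Each message is produced as the string representation of a float (`str(normal_rnd_dist[i])`), and the consumer later recovers the number by casting the payload back to a float. A small sketch, independent of Kafka, shows that this string round trip preserves the values (the seed is arbitrary, chosen only to make the sketch reproducible):

```python
import numpy as np

rng = np.random.default_rng(seed=42)  # arbitrary seed for reproducibility
samples = rng.normal(0, 0.1, 100)

# What producer.produce() sends as each message value:
payloads = [str(x) for x in samples]

# What a consumer recovers by parsing the string back to a float:
recovered = [float(p) for p in payloads]

assert all(a == b for a, b in zip(samples, recovered))
```

Note that Spark's FloatType used later in this notebook is single precision, so the values stored in the sink are rounded to 32 bits even though the string payload itself carries full double precision.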

Consume the Kafka Topic using Spark and Write to a Sink

The snippet below creates a streaming DataFrame with Kafka as the data source. Spark is lazy, so it will not start streaming data from Kafka into the DataFrame until we specify an output sink (which we do later in this notebook).

In [7]:
#lazy
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", kafka.get_broker_endpoints()) \
  .option("kafka.security.protocol",kafka.get_security_protocol()) \
  .option("kafka.ssl.truststore.location", tls.get_trust_store()) \
  .option("kafka.ssl.truststore.password", tls.get_key_store_pwd()) \
  .option("kafka.ssl.keystore.location", tls.get_key_store()) \
  .option("kafka.ssl.keystore.password", tls.get_key_store_pwd()) \
  .option("kafka.ssl.key.password", tls.get_trust_store_pwd()) \
  .option("kafka.ssl.endpoint.identification.algorithm", "") \
  .option("subscribe", TOPIC_NAME) \
  .load()

When using Kafka as a data source, Spark gives us a default Kafka schema, printed below

In [8]:
df.printSchema()
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

We are using the Spark Structured Streaming engine, which means that we can express stream queries just as we would in batch jobs.

Below we transform the input stream to select only the message values and their timestamps

In [12]:
messages = df.selectExpr("CAST(value AS STRING)", "timestamp").selectExpr("CAST(value AS FLOAT)", "timestamp")
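The two-step cast above first decodes Kafka's binary payload into a string and then parses that string as a float. The same transformation on a single raw record value, sketched in plain Python (the byte string is one of the sample values produced earlier):

```python
# One Kafka record value as Spark sees it: a binary (bytes) column.
raw = b"-0.113515966706715"

as_string = raw.decode("utf-8")  # the effect of CAST(value AS STRING)
as_float = float(as_string)      # the effect of CAST(value AS FLOAT)
```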

Specify the output query and set the sink of the streaming job to a CSV file in HopsFS.

By using checkpointing and a write-ahead log (WAL), Spark gives us end-to-end exactly-once fault tolerance

In [16]:
query = messages \
       .writeStream \
       .format("csv") \
       .option("path", OUTPUT_PATH) \
       .option("checkpointLocation", CHECKPOINT_PATH) \
       .start()

Run the streaming job. In theory, streaming jobs run forever.

However, for this notebook example we just read for 20 seconds and dump the output to the sink CSV file

In [17]:
query.awaitTermination(timeout=20) # 20 seconds timeout
query.stop()

Sometimes there is a delay before the Spark job starts writing to the sink.

Before going on to the next step in this notebook, go to your HDFS `OUTPUT_PATH` and verify that the CSV output is not empty.

If it is empty, re-run the query above.

Read the Data from the Sink

In [19]:
schema = StructType([
    StructField("value", FloatType(), True),
    StructField("timestamp", TimestampType(), True)])

df1 = spark.read \
     .format("csv") \
     .option("header", "false") \
     .schema(schema) \
     .load(OUTPUT_PATH)
In [20]:
df1.printSchema()
root
 |-- value: float (nullable = true)
 |-- timestamp: timestamp (nullable = true)

Visualize the DataFrame using SparkMagic

This visualization currently only works in Python 2.*

This command copies the Spark DataFrame from the cluster to the local machine and converts it to a pandas DataFrame named "df1". This pandas DataFrame is available in all cells started with the SparkMagic %%local magic and can be used for visualizations and plotting.

In [24]:
%%spark -o df1

Below is SparkMagic's default plotting

In [26]:
%%local
df1

Install matplotlib on the local machine in case it is not already installed

In [ ]:
%%bash
pip install --user matplotlib
In [39]:
%%local
import matplotlib.pyplot as plt
import numpy as np
%matplotlib inline 
from pylab import rcParams
rcParams['figure.figsize'] = 15, 10
hist, bins = np.histogram(df1["value"], bins=50)
width = 0.7 * (bins[1] - bins[0])
center = (bins[:-1] + bins[1:]) / 2
plt.bar(center, hist, align='center', width=width)
plt.title("Histogram of values")
plt.xlabel("value")
plt.ylabel("count")
plt.show()
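The cell above builds the bar chart by hand from np.histogram: `hist` holds the per-bin counts, `bins` holds the 51 bin edges, and each bar is centered between a consecutive pair of edges. The same bookkeeping on synthetic data (any normally distributed array works; the seed here is arbitrary):

```python
import numpy as np

# Synthetic stand-in for the df1["value"] column.
values = np.random.default_rng(seed=0).normal(0, 0.1, 1000)

hist, bins = np.histogram(values, bins=50)

width = 0.7 * (bins[1] - bins[0])    # bars drawn slightly narrower than a bin
center = (bins[:-1] + bins[1:]) / 2  # midpoint of each pair of adjacent edges

assert hist.sum() == values.size     # every sample falls into some bin
assert len(bins) == len(hist) + 1    # 50 bins means 51 edges
```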