Many real-world applications benefit from processing data as a stream. Spark provides Structured Streaming, a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. With Structured Streaming, you express a streaming computation the same way you would express a batch computation on static data, and Spark runs it incrementally as new data arrives. Moreover, Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing, which makes streaming applications easier to write.
In this simple example, we show how to count the words in text data that arrives over a network socket. The code uses the older DStream-based Spark Streaming API (StreamingContext) rather than the Structured Streaming DataFrame API, and it is adapted from the Spark Streaming Programming Guide. You can download the full code here.
We first import StreamingContext, the main entry point for DStream-based Spark Streaming, together with SparkContext, and create a local SparkContext.
from pyspark.streaming import StreamingContext
from pyspark import SparkContext
from bigdl.util.common import *
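# 'local[*]' runs Spark locally with one worker thread per CPU core. The socket
# receiver permanently occupies one thread, so at least two threads are needed.
sc = SparkContext('local[*]')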
Then we create a StreamingContext with a batch interval of 5 seconds, so the incoming data is cut into one batch (an RDD) every 5 seconds.
# Create a StreamingContext on top of sc with a batch interval of 5 seconds
ssc = StreamingContext(sc, 5)
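To make the batch interval concrete, here is a simplified pure-Python model of how arriving records could be grouped into micro-batches. The helper `group_into_batches` is hypothetical and the fixed-window rule is an assumption, not Spark's exact scheduling logic. Unlike the sketch, Spark still runs a (possibly empty) batch every interval even when no data arrived.

```python
from collections import defaultdict


def group_into_batches(records, interval):
    """Group (arrival_time_seconds, line) pairs into micro-batches.

    Simplified model (an assumption): a record arriving at time t belongs to
    the batch whose window [k * interval, (k + 1) * interval) contains t.
    Returns a dict mapping each window's start time to its list of lines.
    Windows that received no records get no entry.
    """
    batches = defaultdict(list)
    for t, line in records:
        window_start = (t // interval) * interval
        batches[window_start].append(line)
    return dict(batches)
```

For example, with a 5-second interval, records arriving at seconds 1 and 4 share the first batch, while records at seconds 5 and 12 land in the second and third batches.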
The following lines create a DStream that represents the stream of text received from a data server at localhost:9999, split each line into words, and count the words in each batch. The last line prints the first ten elements of each RDD generated by this DStream to the console.
# Create a DStream that will connect to hostname:port, like localhost:9999
lines = ssc.socketTextStream("localhost", 9999)
# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))
# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint()
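The flatMap, map and reduceByKey steps above can be mirrored in plain Python for a single batch, which makes it easy to check what one batch will produce. The function `count_words_in_batch` is a hypothetical illustration, not part of PySpark. Like `line.split(" ")` in the Spark code, it splits on single spaces, so two consecutive spaces yield an empty-string "word".

```python
from itertools import chain


def count_words_in_batch(lines):
    """Plain-Python analogue of flatMap -> map -> reduceByKey for one batch."""
    # flatMap: split each line on single spaces and flatten into one stream of words
    words = chain.from_iterable(line.split(" ") for line in lines)
    # map: pair every word with the count 1
    pairs = ((word, 1) for word in words)
    # reduceByKey(lambda x, y: x + y): sum the values of pairs that share a key
    counts = {}
    for word, one in pairs:
        counts[word] = counts.get(word, 0) + one
    return counts
```

Calling it with the two lines `"apache spark"` and `"apache hadoop"` in the same batch gives `{"apache": 2, "spark": 1, "hadoop": 1}`.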
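Nothing is received or processed until start() is called. awaitTermination(timeout) then blocks until the streaming computation stops or until timeout seconds have passed, whichever comes first; here it waits at most 20 seconds. Returning after the timeout does not stop the stream: it keeps running in the background until ssc.stop() is called.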
# Start the computation
ssc.start()
# Wait for the computation to terminate
ssc.awaitTermination(20)
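As an analogy only, the hypothetical `ToyStreamingContext` class below uses a background thread to mimic the timing behaviour of these calls: `start()` launches work that fires once per batch interval, `await_termination(timeout)` waits at most `timeout` seconds without stopping anything, and only `stop()` ends the work. It processes no data and is not part of PySpark.

```python
import threading


class ToyStreamingContext:
    """Hypothetical stand-in mimicking only start/await/stop timing, not Spark."""

    def __init__(self, batch_interval):
        self.batch_interval = batch_interval
        self.batches_run = 0
        self._stopped = threading.Event()
        self._worker = threading.Thread(target=self._run, daemon=True)

    def _run(self):
        # Event.wait returns False on timeout, so one empty "batch" runs per interval
        while not self._stopped.wait(self.batch_interval):
            self.batches_run += 1

    def start(self):
        self._worker.start()

    def await_termination(self, timeout=None):
        # Block until the worker ends or the timeout elapses; this does NOT stop it
        self._worker.join(timeout)

    def is_running(self):
        return self._worker.is_alive()

    def stop(self):
        # Signal the worker to exit and wait for it to finish
        self._stopped.set()
        self._worker.join()
```

After `ctx.start()` and `ctx.await_termination(0.1)`, `ctx.is_running()` is still `True`; only `ctx.stop()` makes it `False`.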
Once started, the streaming computation runs in the background. To feed it data on your machine, run Netcat as a data server in a separate terminal:
# TERMINAL: # Running Netcat
$ nc -lk 9999
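If Netcat is not available, a small Python server can play the same role. This is a minimal sketch using only the standard library (Python 3.8+ for `socket.create_server`); `make_server` and `serve_lines` are hypothetical helpers. socketTextStream interprets the received bytes as UTF-8 text separated by newlines, so each line is sent with exactly one trailing newline. Unlike `nc -lk`, this sketch serves a single client and then exits.

```python
import socket


def make_server(host="localhost", port=9999):
    """Create a listening TCP socket, playing the role of `nc -lk 9999`."""
    return socket.create_server((host, port))


def serve_lines(server, lines):
    """Accept one client (e.g. the Spark socket receiver) and send it each line.

    Each item is sent as UTF-8 text terminated by a single newline, which is
    how socketTextStream splits the incoming bytes into records.
    """
    conn, _ = server.accept()
    with conn:
        for line in lines:
            conn.sendall((line.rstrip("\n") + "\n").encode("utf-8"))
```

For example, calling `serve_lines(make_server(), sys.stdin)` (after `import sys`) forwards every line typed on standard input to the word count program, much like the Netcat session.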
Then type words into the terminal running the Netcat server; the word counts are printed in the terminal running the word count program.
# TERMINAL: # Running Netcat
$ nc -lk 9999
apache spark
apache hadoop
If everything works, you should see output similar to the following:
-------------------------------------------
Time: 2017-04-26 15:02:35
-------------------------------------------

-------------------------------------------
Time: 2017-04-26 15:02:40
-------------------------------------------
(u'apache', 1)
(u'spark', 1)

-------------------------------------------
Time: 2017-04-26 15:02:45
-------------------------------------------

-------------------------------------------
Time: 2017-04-26 15:02:50
-------------------------------------------
(u'apache', 1)
(u'hadoop', 1)

-------------------------------------------
Time: 2017-04-26 15:02:55
-------------------------------------------

-------------------------------------------
Time: 2017-04-26 15:03:00
-------------------------------------------
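The counts are per batch: "apache" appears with count 1 in two separate batches rather than reaching 2, because reduceByKey only aggregates within a single batch. For a running total across batches, the DStream API offers updateStateByKey, which requires checkpointing to be enabled with ssc.checkpoint(...). The pure-Python sketch below simulates that behaviour; `update_count` follows the shape of the update function updateStateByKey expects, while `simulate_running_counts` is a hypothetical helper standing in for Spark's state management.

```python
def update_count(new_values, running_count):
    """Update function in the shape updateStateByKey expects.

    new_values: this batch's values for one key (may be empty).
    running_count: the key's previous state, or None if it has none yet.
    """
    return sum(new_values) + (running_count or 0)


def simulate_running_counts(batches):
    """Feed batches of (word, 1) pairs through update_count; return the state after each batch."""
    state = {}
    history = []
    for pairs in batches:
        # Group this batch's values by key before calling the update function
        grouped = {}
        for word, value in pairs:
            grouped.setdefault(word, []).append(value)
        # Update every key that has new values or existing state
        for word in set(state) | set(grouped):
            state[word] = update_count(grouped.get(word, []), state.get(word))
        history.append(dict(state))
    return history
```

Replaying the four batches from the sample output (empty, "apache spark", empty, "apache hadoop") ends with `{"apache": 2, "spark": 1, "hadoop": 1}`. In PySpark, the same function would be passed as `pairs.updateStateByKey(update_count)`; that call is shown here as an untested sketch.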
For more information about Structured Streaming, you can refer to this site.