We will be using the tool created in the previous tutorial, and we will compose its output stream with a new one.
%load_ext watermark
import sys
from datetime import datetime
sys.path.append("../") # Add parent dir in the Path
from hyperstream import HyperStream
from hyperstream import TimeInterval
from hyperstream.utils import UTC
from utils import plot_high_chart
%watermark -v -m -p hyperstream -g
CPython 2.7.6
IPython 5.3.0
hyperstream 0.3.0-beta
compiler   : GCC 4.8.4
system     : Linux
release    : 3.19.0-80-generic
machine    : x86_64
processor  : x86_64
CPU cores  : 4
interpreter: 64bit
Git hash   : f0e911526041b91fe7999a8968c80618d410e741
hs = HyperStream(loglevel=20)
print hs
reader_tool = hs.plugins.example.tools.csv_reader('data/sea_ice.csv')
sea_ice_stream = hs.channel_manager.memory.get_or_create_stream("sea_ice")
ti = TimeInterval(datetime(1990, 1, 1).replace(tzinfo=UTC), datetime(2012, 1, 1).replace(tzinfo=UTC))
reader_tool.execute(sources=[], sink=sea_ice_stream, interval=ti)
for key, value in sea_ice_stream.window().items()[:10]:
    print '[%s]: %s' % (key, value)
HyperStream version 0.3.0-beta, connected to mongodb://localhost:27017/hyperstream, session id <no session>
[1990-02-01 00:00:00+00:00]: [13.33, 2.15]
[1990-03-01 00:00:00+00:00]: [13.44, 2.71]
[1990-04-01 00:00:00+00:00]: [12.16, 5.1]
[1990-05-01 00:00:00+00:00]: [10.84, 7.37]
[1990-06-01 00:00:00+00:00]: [9.12, 10.26]
[1990-07-01 00:00:00+00:00]: [6.44, 12.17]
[1990-08-01 00:00:00+00:00]: [4.92, 13.95]
[1990-09-01 00:00:00+00:00]: [4.5, 14.3]
[1990-10-01 00:00:00+00:00]: [6.67, 13.71]
[1990-11-01 00:00:00+00:00]: [9.58, 11.24]
We can compose a chain of streams, using a different tool at each step to produce a new stream. As an example, we can use the tool csv_reader to generate a stream from a CSV file. Then we can apply the tool list_mean, which computes the mean of all the values in each instance of a stream and outputs a new stream. Finally, we can choose whether the new stream stores its output in memory or in a MongoDB database. In this case, we will store the final stream in the MongoDB database.
~stream | | tool | | stream | | tool | | stream |
---|---|---|---|---|---|---|---|---|
csv_file | $\rightarrow$ | reader_tool | $\rightarrow$ | sea_ice_stream | $\rightarrow$ | list_mean_tool | $\rightarrow$ | sea_ice_means_stream |
filesystem | | memory | | memory | | memory | | MongoDB |
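To make the list_mean step concrete, here is a minimal plain-Python sketch of the same idea. It does not use HyperStream: a stream is imitated by a list of (timestamp, values) pairs, and the helper names `list_mean` and `apply_to_stream` are hypothetical, chosen only for this illustration. The three rows are copied from the sea_ice output printed earlier.

```python
from datetime import datetime

# Toy stand-in for a stream: a list of (timestamp, values) pairs.
# This is an assumption for illustration, not HyperStream's data model.
sea_ice = [
    (datetime(1990, 2, 1), [13.33, 2.15]),
    (datetime(1990, 3, 1), [13.44, 2.71]),
    (datetime(1990, 4, 1), [12.16, 5.1]),
]


def list_mean(values):
    """Arithmetic mean of the list of values held by one instance."""
    return sum(values) / float(len(values))


def apply_to_stream(stream, func):
    """Apply func to every value, keeping each timestamp unchanged."""
    return [(timestamp, func(values)) for timestamp, values in stream]


# Chain: source "stream" -> tool -> new "stream".
sea_ice_means = apply_to_stream(sea_ice, list_mean)

for timestamp, mean in sea_ice_means:
    print('[%s]: %s' % (timestamp, round(mean, 3)))
```

Each timestamp is carried over unchanged and only the value is replaced by its mean, which is the shape of the transformation the real tool applies between sea_ice_stream and sea_ice_means_stream.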
list_mean_tool = hs.tools.list_mean()
sea_ice_means_stream = hs.channel_manager.mongo.get_or_create_stream('sea_ice_means')
list_mean_tool.execute(sources=[sea_ice_stream], sink=sea_ice_means_stream, interval=ti)
for key, value in sea_ice_means_stream.window().items()[:10]:
    print '[%s]: %s' % (key, value)
[1990-02-01 00:00:00+00:00]: 7.74
[1990-03-01 00:00:00+00:00]: 8.075
[1990-04-01 00:00:00+00:00]: 8.63
[1990-05-01 00:00:00+00:00]: 9.105
[1990-06-01 00:00:00+00:00]: 9.69
[1990-07-01 00:00:00+00:00]: 9.305
[1990-08-01 00:00:00+00:00]: 9.435
[1990-09-01 00:00:00+00:00]: 9.4
[1990-10-01 00:00:00+00:00]: 10.19
[1990-11-01 00:00:00+00:00]: 10.41
We can now plot all the values of the last computed window. In this case there is only one window with all the data computed by the tool.
# Split the (timestamp, value) pairs into two parallel sequences for plotting
my_time, my_data = zip(*[(str(key), value) for key, value in sea_ice_means_stream.window().items()])
plot_high_chart(my_time, my_data, type="high_stock",
                title='Mean of sea levels in the Arctic and Antarctica', yax='meters')
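The `zip(*...)` call in the plotting code transposes a list of (timestamp, value) pairs into one sequence of timestamps and one sequence of values. A minimal standalone sketch of that idiom follows; the `pairs` list is a hypothetical stand-in for the stream items, with values copied from the means printed earlier.

```python
# Hypothetical (timestamp string, mean) pairs standing in for the stream items.
pairs = [
    ('1990-02-01 00:00:00+00:00', 7.74),
    ('1990-03-01 00:00:00+00:00', 8.075),
    ('1990-04-01 00:00:00+00:00', 8.63),
]

# zip(*pairs) regroups the data by position: all first elements together,
# then all second elements together.
times, values = zip(*pairs)

print(times)
print(values)
```

Note that the two-name unpacking raises a `ValueError` when the list of pairs is empty, so it is worth checking that the window actually contains data before plotting.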