#!/usr/bin/env python
# coding: utf-8

# # HyperStream Tutorial 3: Stream composition
#
# We will be using the tool created in the previous tutorial and we will
# compose the output of the stream with a new one.

# In[1]:

get_ipython().run_line_magic('load_ext', 'watermark')

import sys
from datetime import datetime

# The parent directory must be on the path *before* the imports below run.
sys.path.append("../")  # Add parent dir in the Path

from hyperstream import HyperStream
from hyperstream import TimeInterval
from hyperstream.utils import UTC
from utils import plot_high_chart

get_ipython().run_line_magic('watermark', '-v -m -p hyperstream -g')

# In[2]:

hs = HyperStream(loglevel=20)
# print() with a single parenthesised argument behaves identically on
# Python 2 and Python 3, unlike the bare print statement.
print(hs)

reader_tool = hs.plugins.example.tools.csv_reader('data/sea_ice.csv')
sea_ice_stream = hs.channel_manager.memory.get_or_create_stream("sea_ice")

# Timezone-aware interval over which both tools below are executed.
ti = TimeInterval(datetime(1990, 1, 1).replace(tzinfo=UTC),
                  datetime(2012, 1, 1).replace(tzinfo=UTC))

# The reader has no source streams; it writes into the in-memory stream.
reader_tool.execute(sources=[], sink=sea_ice_stream, interval=ti)

# list(...) makes the slice valid whether items() returns a list or a
# view/iterator; only the first ten entries are shown.
for key, value in list(sea_ice_stream.window().items())[:10]:
    print('[%s]: %s' % (key, value))

# ## Stream composition
#
# We can compose a chain of streams using different tools to get a new stream.
# As an example, we can use the tool **csv_reader** to generate a stream from a
# csv file. Then, we can apply the tool **list_mean**, that computes the mean of
# all the values of each instance of a stream, and outputs a new stream.
# Finally, we can define the new stream to store the output in **memory** or in
# a **MongoDB** database. In this case, we will store the final Stream in the
# MongoDB database.
#
# |~stream||tool||stream||tool||stream|
# |:-:|:-:|:-:|:-:|:-:|:-:|:-:|:-:|:-:|
# | csv_file | $\rightarrow$ | reader_tool | $\rightarrow$ | sea_ice_stream | $\rightarrow$ | list_mean_tool | $\rightarrow$ | sea_ice_means_stream |
# |filesystem||memory||memory||memory||MongoDB|

# In[3]:

list_mean_tool = hs.tools.list_mean()
# The sink is created on the mongo channel rather than the memory channel.
sea_ice_means_stream = hs.channel_manager.mongo.get_or_create_stream('sea_ice_means')
list_mean_tool.execute(sources=[sea_ice_stream], sink=sea_ice_means_stream, interval=ti)

for key, value in list(sea_ice_means_stream.window().items())[:10]:
    print('[%s]: %s' % (key, value))

# ## Visualization
#
# We can now plot all the values of the last computed window. In this case
# there is only one window with all the data computed by the tool.

# In[4]:

# Split the (timestamp, value) pairs into two parallel sequences; the
# timestamps are converted to strings before being handed to the chart.
my_time, my_data = zip(*[(str(key), value)
                         for key, value in sea_ice_means_stream.window().items()])

plot_high_chart(my_time, my_data, type="high_stock",
                title='Mean of sea levels in the Artic and the Antartica', yax='meters')

# In[ ]: