HyperStream Tutorial 5: Workflows

Workflows define a graph of streams. Usually, the first stream will be a special "raw" stream that pulls in data from a custom data source. Workflows can have multiple time ranges, which will cause the streams to be computed on all of the ranges given.


In this tutorial, we will be ussing a time-series dataset about the temperature in different countries and cities. The dataset is availabel at The Census at School New Zeland. The necessary files for this tutorial are already included in the folder data/TimeSeriesDatasets_130207.

In particular, there are four files with the minimum and maximum temperatures in different cities of Asia, Australia, NZ and USA from 2000 to 2012. And the rainfall levels of New Zeland.


In [1]:
%load_ext watermark

import sys
sys.path.append("../") # Add parent dir in the Path

from hyperstream import HyperStream
from hyperstream import TimeInterval
from hyperstream.utils import UTC

from datetime import datetime
from utils import plot_high_chart
from utils import plot_multiple_stock
from dateutil.parser import parse

%watermark -v -m -p hyperstream -g

hs = HyperStream(loglevel=20)
print hs
CPython 2.7.6
IPython 5.3.0

hyperstream 0.3.0-beta

compiler   : GCC 4.8.4
system     : Linux
release    : 3.19.0-80-generic
machine    : x86_64
processor  : x86_64
CPU cores  : 4
interpreter: 64bit
Git hash   : f0e911526041b91fe7999a8968c80618d410e741
HyperStream version 0.3.0-beta, connected to mongodb://localhost:27017/hyperstream, session id <no session>

Reading the data

In the data folder there are four csv files with the names TempAsia.csv, TempAustralia.csv, TempNZ.csv and TempUSA.csv. The first column of each csv file contains a header with the names of the columns. The first one being the date and the following are the minimum and maximum temperature in different cities with the format cityMin and cityMax.

Here is an example of the first 5 rows of the TempAsia.csv file:


The format of the date has the form YYYYMmm where YYYY is the year and mm is the month. Because this format is not recognized by the default parser of the csv_reader tool, we will need to specify our own parser that first replaces the M by an hyphen - and then applies the dateutils.parser.

Then, we will use a tool to read each csv, and a Stream to store all the results of applying the tool. When we specify to the tool that there is a header row in the csv file, the value of each Stream instance will be a dictionary with the name of the column and its corresponding value. For example, a Stream instance with the 4 cities shown above will look like:

[2000-01-19 00:00:00+00:00]: {'BangkokMin': 24.0, 'BangkokMax': 32.8, 'TokyoMin': 4.2}
In [2]:
def dateparser(dt):
    return parse(dt.replace('M', '-')).replace(tzinfo=UTC)

ti_all = TimeInterval(datetime(1999, 1, 1).replace(tzinfo=UTC),
                      datetime(2013, 1, 1).replace(tzinfo=UTC))
ti_sample = TimeInterval(datetime(2007, 1, 1).replace(tzinfo=UTC),
                         datetime(2007, 3, 1).replace(tzinfo=UTC))

# M will be the Memory Channel
M = hs.channel_manager.memory

countries = ['Asia', 'Australia', 'NZ', 'USA']
temp_tools_csv = {}
temp_streams = {}
for country in countries:
    temp_tools_csv[country] = hs.plugins.example.tools.csv_reader(
            header=True, dateparser=dateparser)
    temp_streams[country] = M.get_or_create_stream(country)
    temp_tools_csv[country].execute(sources=[], sink=temp_streams[country],

Now that we have generated one Stream per each country, we can inspect the first Stream Instance of each Stream.

In [3]:
for country in countries:
    # Print two examples per stream
    print('\n{}: First Stream Instance'.format(country))
    key, value = temp_streams[country].window().first()
    print '[%s]: %s' % (key, value)
Asia: First Stream Instance
[2000-01-21 00:00:00+00:00]: {'NewDelhiMax': 20.1, 'NewDelhiMin': 8.1, 'HongKongMin': 14.3, 'KualaLumpurMin': 23.5, 'TokyoMax': 11.2, 'KualaLumpurMax': 32.2, 'HongKongMax': 19.5, 'BangkokMin': 24.0, 'BangkokMax': 32.8, 'TokyoMin': 4.2}

Australia: First Stream Instance
[2000-01-21 00:00:00+00:00]: {'BrisbaneMax': 28.2, 'MelbourneMin': 15.9, 'Melbournemax': 24.3, 'BrisbaneMin': 19.1, 'CanberraMin': 10.1, 'GoldCoastMax': 27.5, 'SydneyMax': 24.9, 'Canberramax': 24.5, 'GoldCoastMin': 20.2, 'SydneyMin': 17.7}

NZ: First Stream Instance
[2000-01-21 00:00:00+00:00]: {'WellingtonMin': 14.2, 'ChristchurchMin': 10.8, 'HamiltonMin': 12.4, 'DunedinMax': 18.2, 'WellingtonMax': 20.0, 'ChristchurchMax': 20.2, 'AucklandMax': 23.4, 'HamiltonMax': 23.8, 'DunedinMin': 8.8, 'AucklandMin': 15.5}

USA: First Stream Instance
[2000-01-21 00:00:00+00:00]: {'ChicagoMax': 1.8, 'LosAngelesMin': 10.0, 'HoustonMax': 21.6, 'NYMax': 4.6, 'SeattleMax': 7.9, 'SeattleMin': 1.4, 'ChicagoMin': -8.1, 'NYMin': -5.6, 'HoustonMin': 7.4, 'LosAngelesMax': 19.6}

Visualize the temperatures in one Country

Now, we can visualize the temperatures of all the cities in one country. First, we will create a list of all the cities in one of the Streams by looking at the first Stream Instance. Then, we will create a list of lists containing the temperature value of each city, together with their corresponding time. Then, we can use the function plot_multiple_stock created for this tutorial.

In [4]:
country = countries[0]
this_cities_list = [key for key, value in temp_streams[country].window().items()[0].value.iteritems()]

data = {city:[] for city in this_cities_list}
time = []
for key, values in temp_streams[country].window().items():
    for city, temperature in values.iteritems():
names = data.keys()
data = [value for key, value in data.iteritems()]
plot_multiple_stock(data, time=time, names=names, title='Temperatures in ' + country, ylabel='ºC')
Temperatures in Asia
In [ ]:
In [5]:
from hyperstream import StreamInstance
from hyperstream import StreamId

one_country_stream = temp_streams[country]

# It is similar to a database channel
A = hs.channel_manager.assets
this_cities_stream = A.get_or_create_stream('cities_{}'.format(country))

mapping = {}
for city in this_cities_list:
    mapping[city] = city

A.write_to_stream(stream_id=this_cities_stream.stream_id, data=StreamInstance(ti_all.end, mapping))

print this_cities_stream.window(TimeInterval.up_to_now()).items()
[StreamInstance(timestamp=datetime.datetime(2013, 1, 1, 0, 0, tzinfo=<bson.tz_util.FixedOffset object at 0x7f6a14186290>), value={u'NewDelhiMax': u'NewDelhiMax', u'NewDelhiMin': u'NewDelhiMin', u'HongKongMin': u'HongKongMin', u'KualaLumpurMin': u'KualaLumpurMin', u'TokyoMax': u'TokyoMax', u'KualaLumpurMax': u'KualaLumpurMax', u'HongKongMax': u'HongKongMax', u'BangkokMin': u'BangkokMin', u'BangkokMax': u'BangkokMax', u'TokyoMin': u'TokyoMin'})]
In [6]:
for city in this_cities_list:
    if not hs.plate_manager.meta_data_manager.contains(identifier='city_'+city):
        print("Adding " + city)
        hs.plate_manager.meta_data_manager.insert(parent='root', data=city,
                                                  tag='city', identifier='city_'+city)
In [7]:
cities_plate = hs.plate_manager.create_plate(plate_id='C', meta_data_id='city', parent_plate=None, 
                                             values=[], complement=True, description='Cities')
this_country_temps = []
for city in this_cities_list:
    print("Adding " + city)
                                                                         meta_data=(('city', city),))))
Adding NewDelhiMax
Adding NewDelhiMin
Adding HongKongMin
Adding KualaLumpurMin
Adding TokyoMax
Adding KualaLumpurMax
Adding HongKongMax
Adding BangkokMin
Adding BangkokMax
Adding TokyoMin

It is possible to create all the Streams passing a list to the splitter tool splitter_from_list. However, this could not be automated in a workflow

# TODO Ussing this new tool, it is not necessary to create a new stream. However, if it is a Stream it could be
# automated for any other countries
splitter_tool = hs.plugins.example.tools.splitter_from_list(element=None)

# TODO try to change the parameter name of MultiOutputTool splitting_stream to splitting_parameter
# or something that does not force you to think that it is a stream
splitter_tool.execute(source=one_country_stream, splitting_stream=this_cities_list, output_plate=cities_plate, 
                      interval=ti_all, input_plate_value=None, sinks=this_country_temps)

TODO: Ask Tom: Question: With the splitter_from_stream version we still need to create a list with the mapping... Then, I can not see the difference between using one or the other method. Answer: The list in the Stream is allowed to change over time, this makes the future Streams to be more robust to change (e.g. the number of houses in the SPHERE project). Also, there is a tool that uses a dictionary for the splitting criteria, that is splitter_of_dict that expects the splitter_stream to be None, and a mapping parameter containing the static mapping.

In [8]:
splitter_tool = hs.tools.splitter_from_stream(element=None, use_mapping_keys_only=False)

splitter_tool.execute(source=one_country_stream, splitting_stream=this_cities_stream, output_plate=cities_plate, 
                      interval=ti_all, input_plate_value=None, sinks=this_country_temps)
In [9]:
one_city = this_country_temps[0]
city_name = one_city.stream_id.meta_data[0][1]
my_time, my_data = zip(*[(key.__str__(), value) for key, value in one_city.window(ti_all).items()])

plot_high_chart(my_time, my_data, type="high_stock", title='Temperature in {}'.format(city_name), yax='ºC')
Temperature in NewDelhiMax

Put this all together as a workflow

In [10]:
w = hs.create_workflow(
    name="World climate data and statistics", 
    description="Climate data statistics of ceveral cities from Asia, Australia, New Zeland and USA",

First create the nodes

Nodes correspond to the Streams that we used above.

In [11]:
from collections import namedtuple
NodeDef = namedtuple('NodeDef', ['channel', 'stream_name', 'plate_ids'], verbose=False)

nodes = (
    NodeDef(S, "wearable",     ["H"]),
    NodeDef(M, "wearable_xl1", ["H"]),
    NodeDef(M, "window_5",     ["H"]),
    NodeDef(M, "window_300",   ["H"]),
    NodeDef(M, "arm_angle",    ["H"]),
    NodeDef(M, "inactivity",   ["H"])

# Simple object to hold nodes
class NodeCollection(object): 

N = NodeCollection()

for n in nodes:
    setattr(N, n.stream_name, w.create_node(channel=n.channel, stream_name=n.stream_name, plate_ids=n.plate_ids))
NameError                                 Traceback (most recent call last)
<ipython-input-11-f0156f57b82b> in <module>()
      4 nodes = (
----> 5     NodeDef(S, "wearable",     ["H"]),
      6     NodeDef(M, "wearable_xl1", ["H"]),
      7     NodeDef(M, "window_5",     ["H"]),

NameError: name 'S' is not defined