#!/usr/bin/env python
# coding: utf-8
# # Environment Setup
# In[4]:
from os import path as p, chdir
if 'examples' in p.abspath('.'):
    chdir('..')
# # Examples
# ## Fetch a webpage
# In this example, we fetch the title of a webpage.
# In[5]:
from riko.modules.fetchpage import pipe
url = 'https://news.ycombinator.com/'
next(pipe(conf={'url': url, 'start': '<title>', 'end': '</title>'}))
# ## Fetch a webpage using an xpath
# Here, we fetch the first hackernews story link using an xpath.
# In[6]:
from riko.modules.xpathfetchpage import pipe
xpath = '/html/body/center/table/tr[3]/td/table/tr[1]/td[3]/a'
next(pipe(conf={'url': 'https://news.ycombinator.com/', 'xpath': xpath}))
# ## Word Count
# Here, we use several pipes to count the number of words on a webpage.
# In[8]:
### Create a SyncPipe flow ###
#
# `SyncPipe` is a convenience class that creates chainable flows
# and allows for parallel processing.
from riko.collections import SyncPipe
### Set the pipe configurations ###
#
# Notes:
# 1. the `detag` option will strip all html tags from the result
# 2. fetch the text contained inside the 'body' tag of the hackernews homepage
# 3. replace newlines with spaces and assign the result to 'content'
# 4. tokenize the resulting text using whitespace as the delimiter
# 5. count the number of times each token appears
# 6. obtain the raw stream
# 7. extract the first word and its count
url = 'https://news.ycombinator.com/'
fetch_conf = {'url': url, 'start': '<body>', 'end': '</body>', 'detag': True}  # 1
replace_conf = {'rule': [{'find': '\r\n', 'replace': ' '}, {'find': '\n', 'replace': ' '}]}
flow = (
    SyncPipe('fetchpage', conf=fetch_conf)                # 2
        .strreplace(conf=replace_conf, assign='content')  # 3
        .tokenizer(conf={'delimiter': ' '}, emit=True)    # 4
        .count(conf={'count_key': 'content'}))            # 5
stream = flow.output # 6
next(stream) # 7
# ## Fetching feeds
# `riko` can fetch RSS feeds from both local and remote filepaths via "source"
# `pipes`. Each "source" `pipe` returns a `stream`, i.e., an iterator of
# dictionaries, aka `items`.
# In[8]:
from riko.modules.fetch import pipe
### Fetch an RSS feed ###
stream = pipe(conf={'url': 'https://news.ycombinator.com/rss'})
item = next(stream)
item['title'], item['link'], item['comments']
# In[9]:
from riko.modules.fetchsitefeed import pipe
### Fetch the first RSS feed found ###
#
# Note: regardless of how you fetch an RSS feed, it will have the same
# structure
stream = pipe(conf={'url': 'http://arstechnica.com/rss-feeds/'})
item = next(stream)
item.keys()
# In[10]:
item['title'], item['author'], item['id']
# Please see the [FAQ](https://github.com/nerevu/riko/blob/master/docs/FAQ.rst) for a complete list of supported [file types](https://github.com/nerevu/riko/blob/master/docs/FAQ.rst#what-file-types-are-supported) and
# [protocols](https://github.com/nerevu/riko/blob/master/docs/FAQ.rst#what-protocols-are-supported). Please see [Fetching data and feeds](https://github.com/nerevu/riko/blob/master/COOKBOOK.rst#fetching-data-and-feeds) for more examples.
# ## Synchronous processing
# `riko` can modify `streams` via the 40 built-in `pipes`.
# In[11]:
from riko.collections import SyncPipe
### Set the pipe configurations ###
fetch_conf = {'url': 'https://news.ycombinator.com/rss'}
filter_rule = {'field': 'link', 'op': 'contains', 'value': '.com'}
xpath = '/html/body/center/table/tr[3]/td/table[2]/tr[1]/td/table/tr/td[3]/span/span'
xpath_conf = {'url': {'subkey': 'comments'}, 'xpath': xpath}
### Create a SyncPipe flow ###
#
# `SyncPipe` is a convenience class that creates chainable flows
# and allows for parallel processing.
#
# The following flow will:
# 1. fetch the hackernews RSS feed
# 2. filter for items with '.com' in the link
# 3. sort the items ascending by title
# 4. fetch the first comment from each item
# 5. flatten the result into one raw stream
# 6. extract the first item's content
#
# Note: sorting is not lazy (it must consume the entire stream), so use
# caution when applying this pipe
flow = (
    SyncPipe('fetch', conf=fetch_conf)               # 1
        .filter(conf={'rule': filter_rule})          # 2
        .sort(conf={'rule': {'sort_key': 'title'}})  # 3
        .xpathfetchpage(conf=xpath_conf))            # 4
stream = flow.output # 5
next(stream)['content'] # 6
# Please see [alternate workflow creation](https://github.com/nerevu/riko/blob/master/COOKBOOK.rst#synchronous-processing) for an alternative (function based) method for
# creating a `stream`. Please see [pipes](https://github.com/nerevu/riko/blob/master/docs/FAQ.rst#what-pipes-are-available) for a complete list of available `pipes`.
# ## Parallel processing
# An example using `riko`'s parallel API to spawn a `ThreadPool`. You can instead enable a `ProcessPool` by additionally passing `threads=False` to `SyncPipe`, i.e., `SyncPipe('fetch', conf={'url': url}, parallel=True, threads=False)`.
# In[12]:
from riko.collections import SyncPipe
### Set the pipe configurations ###
fetch_conf = {'url': 'https://news.ycombinator.com/rss'}
filter_rule = {'field': 'link', 'op': 'contains', 'value': '.com'}
xpath = '/html/body/center/table/tr[3]/td/table[2]/tr[1]/td/table/tr/td[3]/span/span'
xpath_conf = {'url': {'subkey': 'comments'}, 'xpath': xpath}
### Create a parallel SyncPipe flow ###
#
# The following flow will:
# 1. fetch the hackernews RSS feed
# 2. filter for items with '.com' in the article link
# 3. fetch the first comment from all items in parallel (using 4 workers)
# 4. flatten the result into one raw stream
# 5. extract the first item's content
#
# Note: no point in sorting after the filter since parallel fetching doesn't guarantee
# order
flow = (
    SyncPipe('fetch', conf=fetch_conf, parallel=True, workers=4)  # 1
        .filter(conf={'rule': filter_rule})                       # 2
        .xpathfetchpage(conf=xpath_conf))                         # 3
stream = flow.output # 4
next(stream)['content'] # 5
# ## Asynchronous processing
# To enable asynchronous processing, you must install the `async` module.
# `pip install riko[async]`
# An example using `riko`'s asynchronous API.
# In[2]:
from riko.bado import coroutine, react
from riko.collections import AsyncPipe
### Set the pipe configurations ###
fetch_conf = {'url': 'https://news.ycombinator.com/rss'}
filter_rule = {'field': 'link', 'op': 'contains', 'value': '.com'}
xpath = '/html/body/center/table/tr[3]/td/table[2]/tr[1]/td/table/tr/td[3]/span/span'
xpath_conf = {'url': {'subkey': 'comments'}, 'xpath': xpath}
### Create an AsyncPipe flow ###
#
# The following flow will:
# 1. fetch the hackernews RSS feed
# 2. filter for items with '.com' in the article link
# 3. asynchronously fetch the first comment from each item (using 4 connections)
# 4. flatten the result into one raw stream
# 5. extract the first item's content
#
# Note: no point in sorting after the filter since async fetching doesn't guarantee
# order
@coroutine
def run(reactor):
    stream = yield (
        AsyncPipe('fetch', conf=fetch_conf, connections=4)  # 1
            .filter(conf={'rule': filter_rule})             # 2
            .xpathfetchpage(conf=xpath_conf)                # 3
            .output)                                        # 4

    print(next(stream)['content'])                          # 5

try:
    react(run)
except SystemExit:
    pass
# # Design Principles
# The primary data structures in `riko` are the `item` and `stream`. An `item`
# is just a python dictionary, and a `stream` is an iterator of `items`. You can
# create a `stream` manually with something as simple as
# `[{'content': 'hello world'}]`. You manipulate `streams` in
# `riko` via `pipes`. A `pipe` is simply a function that accepts either a
# `stream` or `item`, and returns a `stream`. `pipes` are composable: you
# can use the output of one `pipe` as the input to another `pipe`.
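#
# As a quick illustration of composability, here's a minimal sketch (the
# pipes and stream below are arbitrary choices) that feeds the output of one
# `pipe` directly into another:
# In[ ]:
from riko.modules.reverse import pipe as reverse
from riko.modules.count import pipe as count
stream = [{'title': 'riko pt. 1'}, {'title': 'riko pt. 2'}]
# reverse the stream, then count the items in the reversed stream
next(count(reverse(stream)))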
#
# `riko` `pipes` come in two flavors: `operators` and `processors`.
# `operators` operate on an entire `stream` at once and are unable to handle
# individual items. Example `operators` include `pipecount`, `pipefilter`,
# and `pipereverse`.
# In[14]:
from riko.modules.reverse import pipe
stream = [{'title': 'riko pt. 1'}, {'title': 'riko pt. 2'}]
next(pipe(stream))
# `processors` process individual `items` and can be parallelized across
# threads or processes. Example `processors` include `pipefetchsitefeed`,
# `pipehash`, `pipeitembuilder`, and `piperegex`.
# In[15]:
from riko.modules.hash import pipe
item = {'title': 'riko pt. 1'}
stream = pipe(item, field='title')
next(stream)
# Some `processors`, e.g., `pipetokenizer`, return multiple results.
# In[16]:
from riko.modules.tokenizer import pipe
item = {'title': 'riko pt. 1'}
tokenizer_conf = {'delimiter': ' '}
stream = pipe(item, conf=tokenizer_conf, field='title')
next(stream)
# In[17]:
# In this case, if we just want the result, we can `emit` it instead
stream = pipe(item, conf=tokenizer_conf, field='title', emit=True)
next(stream)
# `operators` are split into sub-types of `aggregators`
# and `composers`. `aggregators`, e.g., `count`, combine
# all `items` of an input `stream` into a new `stream` with a single `item`;
# while `composers`, e.g., `filter`, create a new `stream` containing
# some or all `items` of an input `stream`.
# In[26]:
from riko.modules.count import pipe
stream = [{'title': 'riko pt. 1'}, {'title': 'riko pt. 2'}]
next(pipe(stream))
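# A `composer` like `filter`, by contrast, returns a new `stream` containing
# only the matching `items`. Here's a minimal sketch (the rule below is an
# arbitrary choice):
# In[ ]:
from riko.modules.filter import pipe
stream = [{'title': 'riko pt. 1'}, {'title': 'riko pt. 2'}]
# keep only items whose title contains 'pt. 2'
rule = {'field': 'title', 'op': 'contains', 'value': 'pt. 2'}
next(pipe(stream, conf={'rule': rule}))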
# `processors` are split into sub-types of `source` and `transformer`.
# `sources`, e.g., `itembuilder`, can create a `stream`, while
# `transformers`, e.g., `hash`, can only transform `items` in a `stream`.
# In[27]:
from riko.modules.itembuilder import pipe
attrs = {'key': 'title', 'value': 'riko pt. 1'}
next(pipe(conf={'attrs': attrs}))
# The following table summarizes these observations:
#
#
# type      | sub-type    | input  | output   | parallelizable? | creates streams?
# ----------|-------------|--------|----------|-----------------|-----------------
# operator  | aggregator  | stream | stream * |                 |
# operator  | composer    | stream | stream   |                 |
# processor | source      | item   | stream   | √               | √
# processor | transformer | item   | stream   | √               |
# If you are unsure of the type of `pipe` you have, check its metadata.
#
# `*` the output `stream` of an `aggregator` is an iterator of only 1 `item`.
# In[3]:
from riko.modules import fetchpage, count
fetchpage.async_pipe.__dict__
# In[29]:
count.pipe.__dict__
# The `SyncPipe` and `AsyncPipe` classes (among other things) perform this
# check for you to allow for convenient method chaining and transparent
# parallelization.
# In[30]:
from riko.collections import SyncPipe
attrs = [
    {'key': 'title', 'value': 'riko pt. 1'},
    {'key': 'content', 'value': "Let's talk about riko!"}]
flow = SyncPipe('itembuilder', conf={'attrs': attrs}).hash()
flow.list[0]
# Please see the [cookbook](https://github.com/nerevu/riko/blob/master/docs/COOKBOOK.rst) for advanced examples including how to wire in
# values from other pipes or accept user input.
# # Command-line Interface
# `riko` provides a command, `runpipe`, to execute `workflows`. A
# `workflow` is simply a file containing a function named `pipe` that creates
# a `flow` and processes the resulting `stream`.
# ## CLI Setup
# `flow.py`
# In[31]:
from __future__ import print_function
from riko.collections import SyncPipe
conf1 = {'attrs': [{'value': 'https://google.com', 'key': 'content'}]}
conf2 = {'rule': [{'find': 'com', 'replace': 'co.uk'}]}
def pipe(test=False):
    kwargs = {'conf': conf1, 'test': test}
    flow = SyncPipe('itembuilder', **kwargs).strreplace(conf=conf2)
    stream = flow.output

    for i in stream:
        print(i)
# ## CLI Usage
# Now to execute `flow.py`, type the command `runpipe flow`. You should
# then see the following output in your terminal:
# `https://google.co.uk`
# `runpipe` will also search the `examples` directory for `workflows`. Type
# `runpipe demo` and you should see the following output:
# `Deadline to clear up health law eligibility near 682`