## Introduction

riko is a pure Python framework for analyzing and processing streams of structured data. riko has synchronous and asynchronous APIs, supports parallel execution, and is well suited for processing RSS feeds [1]. riko also supplies a command-line interface for executing flows.
With riko, you can:

- Read csv/xml/json/html files (see the sketch after the footnotes below)
- Create text and data processing flows via modular pipes
- Parse, extract, and process RSS feeds
- Create awesome mashups [2], APIs, and maps
- Perform parallel processing via cpus/processors or threads
- and much more...
[1] Really Simple Syndication
[2] Mashup (web application hybrid)
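As a quick taste, reading a csv file takes just one source pipe. Below is a minimal sketch; the `pipecsv` module name, its conf keys, and the url are assumptions patterned after the source pipes shown later, not verified API:

>>> from riko.modules.pipecsv import pipe as csv
>>>
>>> # hypothetical url; each csv row becomes a dictionary, aka an item
>>> stream = csv(conf={'url': 'https://example.com/data.csv'})
>>> item = next(stream)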
## Requirements

riko has been tested and is known to work on Python 2.7 and PyPy2 5.1.1.

Feature | Dependency | Installation
---|---|---
Async API | Twisted | pip install riko[async]
Accelerated xml parsing | lxml [3] | pip install riko[lxml]

[3] If lxml isn't present, riko will default to the builtin Python xml parser.
## Word Count

In this example, we use several pipes to count the words on a webpage.
>>> ### Create a SyncPipe flow ###
>>> #
>>> # `SyncPipe` is a convenience class that creates chainable flows
>>> # and allows for parallel processing.
>>> from riko import get_path
>>> from riko.lib.collections import SyncPipe
>>>
>>> ### Set the pipe configurations ###
>>> #
>>> # Notes:
>>> # 1. `get_path` just looks up a file in the `data` directory
>>> # 2. the `detag` option will strip all html tags from the result
>>> url = get_path('users.jyu.fi.html') # 1
>>> fetch_conf = {'url': url, 'start': '<body>', 'end': '</body>', 'detag': True} # 2
>>> replace_conf = {'rule': {'find': '\n', 'replace': ' '}}
>>>
>>> flow = (SyncPipe('fetchpage', conf=fetch_conf)
... .strreplace(conf=replace_conf, assign='content')
... .stringtokenizer(conf={'delimiter': ' '}, emit=True)
... .count())
>>>
>>> stream = flow.output
>>> next(stream)
{'count': 70}
## Motivation

Yahoo! Pipes [4] was a user friendly web application used to "aggregate, manipulate, and mashup content from around the web". Wanting to create custom pipes, I came across pipe2py, which translated a Yahoo! Pipes pipe into Python code. pipe2py suited my needs at the time, but it was unmaintained and lacked asynchronous or parallel processing APIs.

riko addresses the shortcomings of pipe2py, but removes support for importing Yahoo! Pipes json workflow schemas. riko contains ~40 built-in modules, aka pipes, that allow you to programmatically perform most of the tasks Yahoo! Pipes allowed.
riko provides a number of benefits / differences from other stream processing applications such as Huginn, Flink, Spark, and Storm [5]. Namely:

- a small footprint (CPU and memory usage)
- native RSS support
- simple installation and usage
- a pure Python library with PyPy support
- modular pipes to filter, sort, and modify streams

The subsequent tradeoffs riko makes are:

- not distributed (i.e., it can't run on a cluster of servers)
- no GUI for creating flows
- doesn't continually monitor streams for new data
- can't react to specific events
- iterator (pull) based, so a stream only supports a single consumer [6]
The following table summarizes these observations:

Framework | Stream Type | Footprint | RSS | simple [7] | async | parallel | CEP [8] | distributed
---|---|---|---|---|---|---|---|---
riko | pull | small | √ | √ | √ | √ | |
pipe2py | pull | small | √ | √ | | | |
Huginn | push | med | √ | | [9] | | √ | √
Others | push | large | [10] | [11] | [12] | √ | √ | √
For more detailed information, please check out the FAQ.
[4] Yahoo discontinued Yahoo! Pipes in 2015, but you can view what remains.
[5] Huginn, Flink, Spark, and Storm
[6] You can mitigate this via the split module.
[7] Doesn't depend on outside services like MySQL, Kafka, YARN, ZooKeeper, or Mesos.
[8] Complex Event Processing
[9] Huginn doesn't appear to make async web requests.
[10] Many frameworks can't parse RSS streams without the use of 3rd party libraries.
[11] While most frameworks offer a local mode, many require integrating with a data ingestor (e.g., Flume/Kafka) to do anything useful.
[12] I can't find evidence that these frameworks offer an async api (apparently Spark doesn't).
## Usage

riko is intended to be used directly as a Python library.

### Fetching data and feeds

riko can fetch streams from both local and remote filepaths via source pipes. Each source pipe returns a stream, i.e., an iterator of dictionaries, aka items.
>>> from riko.modules.pipefetch import pipe as fetch
>>> from riko.modules.pipefetchsitefeed import pipe as fetchsitefeed
>>>
>>> ### Fetch an rss feed ###
>>> stream = fetch(conf={'url': 'https://news.ycombinator.com/rss'})
>>>
>>> ### Fetch the first rss feed found ###
>>> stream = fetchsitefeed(conf={'url': 'http://www.bbc.com/news'})
>>>
>>> ### View the fetched rss feed(s) ###
>>> #
>>> # Note: regardless of how you fetch an rss feed, it will have the same
>>> # structure
>>> item = next(stream)
>>> sorted(item.keys())
['author', 'author.name', 'author.uri', 'comments', 'content',
 'dc:creator', 'id', 'link', 'pubDate', 'summary', 'title',
 'updated', 'updated_parsed', 'y:id', 'y:published', 'y:title']
>>> item['title'], item['author'], item['link']
('Using NFC tags in the car', 'Liam Green-Hughes',
 'http://www.greenhughes.com/content/using-nfc-tags-car')
Please see the FAQ for a complete list of supported file types and protocols. Please see Fetching data and feeds for more examples.
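Local files work the same way. Here's a minimal sketch assuming the bundled sample file `feed.xml` ships in the `data` directory (as in the processing examples below):

>>> from riko import get_path
>>> from riko.modules.pipefetch import pipe as fetch
>>>
>>> # `get_path` resolves a file in the `data` directory to a local path
>>> stream = fetch(conf={'url': get_path('feed.xml')})
>>> isinstance(next(stream), dict)
True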
### Synchronous processing

riko can modify streams by combining any of the 40 built-in pipes.
>>> from riko import get_path
>>>
>>> ### Set the pipe configurations ###
>>> #
>>> # Notes:
>>> # 1. `get_path` just looks up a file in the `data` directory
>>> # 2. the `dotall` option is used to match `.*` across newlines
>>> fetch_conf = {'url': get_path('feed.xml')} # 1
>>> filter_rule = {'field': 'y:published', 'op': 'before', 'value': '2/5/09'}
>>> sub_conf = {'path': 'content.value'}
>>> match = r'(.*href=")([\w:/.@]+)(".*)'
>>> regex_rule = {'field': 'content', 'match': match, 'replace': '$2', 'dotall': True} # 2
>>> sort_conf = {'rule': {'sort_key': 'content', 'sort_dir': 'desc'}}
>>>
>>> ### Create a SyncPipe flow ###
>>> #
>>> # `SyncPipe` is a convenience class that creates chainable flows
>>> # and allows for parallel processing.
>>> #
>>> # The following flow will:
>>> # 1. fetch the rss feed
>>> # 2. filter for items published before 2/5/2009
>>> # 3. extract the path `content.value` from each feed item
>>> # 4. replace the extracted text with the last href url contained
>>> # within it
>>> # 5. reverse sort the items by the replaced url
>>> # 6. obtain the raw stream
>>> #
>>> # Note: sorting is not lazy so take caution when using this pipe
>>> from riko.lib.collections import SyncPipe
>>>
>>> flow = (SyncPipe('fetch', conf=fetch_conf) # 1
... .filter(conf={'rule': filter_rule}) # 2
... .subelement(conf=sub_conf, emit=True) # 3
... .regex(conf={'rule': regex_rule}) # 4
... .sort(conf=sort_conf)) # 5
>>>
>>> stream = flow.output # 6
>>> next(stream)
{'content': 'mailto:[email protected]'}
Please see Alternate workflow creation for an alternative (function based) method for creating a stream. Please see pipes for a complete list of available pipes.
### Parallel processing

An example using riko's parallel API to spawn a ThreadPool [13]:
>>> from riko import get_path
>>> from riko.lib.collections import SyncPipe
>>>
>>> ### Set the pipe configurations ###
>>> #
>>> # Notes:
>>> # 1. `get_path` just looks up a file in the `data` directory
>>> # 2. the `dotall` option is used to match `.*` across newlines
>>> url = get_path('feed.xml') # 1
>>> filter_rule1 = {'field': 'y:published', 'op': 'before', 'value': '2/5/09'}
>>> match = r'(.*href=")([\w:/.@]+)(".*)'
>>> regex_rule = {'field': 'content', 'match': match, 'replace': '$2', 'dotall': True} # 2
>>> filter_rule2 = {'field': 'content', 'op': 'contains', 'value': 'file'}
>>> sub_conf = {'path': 'content.value'}
>>> strtransform_conf = {'rule': {'transform': 'rstrip', 'args': '/'}}
>>>
>>> ### Create a parallel SyncPipe flow ###
>>> #
>>> # The following flow will:
>>> # 1. fetch the rss feed
>>> # 2. filter for items published before 2/5/2009
>>> # 3. extract the path `content.value` from each feed item
>>> # 4. replace the extracted text with the last href url contained
>>> # within it
>>> # 5. filter for items with local file urls (which happen to be rss
>>> # feeds)
>>> # 6. strip any trailing `/` from the url
>>> # 7. remove duplicate urls
>>> # 8. fetch each rss feed
>>> # 9. merge the feeds into a single stream of items
>>> flow = (SyncPipe('fetch', conf={'url': url}, parallel=True) # 1
... .filter(conf={'rule': filter_rule1}) # 2
... .subelement(conf=sub_conf, emit=True) # 3
... .regex(conf={'rule': regex_rule}) # 4
... .filter(conf={'rule': filter_rule2}) # 5
... .strtransform(conf=strtransform_conf) # 6
... .uniq(conf={'uniq_key': 'strtransform'}) # 7
... .fetch(conf={'url': {'subkey': 'strtransform'}})) # 8
>>>
>>> stream = flow.list # 9
>>> len(stream)
25
### Asynchronous processing

To enable asynchronous processing, you must install the async module:

pip install riko[async]

An example using riko's asynchronous API:
>>> from twisted.internet.task import react
>>> from twisted.internet.defer import inlineCallbacks
>>> from riko import get_path
>>> from riko.twisted.collections import AsyncPipe
>>>
>>> ### Set the pipe configurations ###
>>> #
>>> # Notes:
>>> # 1. `get_path` just looks up a file in the `data` directory
>>> # 2. the `dotall` option is used to match `.*` across newlines
>>> url = get_path('feed.xml') # 1
>>> filter_rule1 = {'field': 'y:published', 'op': 'before', 'value': '2/5/09'}
>>> match = r'(.*href=")([\w:/.@]+)(".*)'
>>> regex_rule = {'field': 'content', 'match': match, 'replace': '$2', 'dotall': True} # 2
>>> filter_rule2 = {'field': 'content', 'op': 'contains', 'value': 'file'}
>>> sub_conf = {'path': 'content.value'}
>>> strtransform_conf = {'rule': {'transform': 'rstrip', 'args': '/'}}
>>>
>>> ### Create an AsyncPipe flow ###
>>> #
>>> # See `Parallel processing` above for an explanation of the steps this
>>> # performs
>>> @inlineCallbacks
... def run(reactor):
...     flow = (AsyncPipe('fetch', conf={'url': url})
...         .filter(conf={'rule': filter_rule1})
...         .subelement(conf=sub_conf, emit=True)
...         .regex(conf={'rule': regex_rule})
...         .filter(conf={'rule': filter_rule2})
...         .strtransform(conf=strtransform_conf)
...         .uniq(conf={'uniq_key': 'strtransform'})
...         .fetch(conf={'url': {'subkey': 'strtransform'}}))
...
...     stream = yield flow.list
...     print(len(stream))
...
>>> react(run)
25
Please see the cookbook or ipython notebook for more examples.
[13] You can instead enable a ProcessPool by additionally passing threads=False to SyncPipe, i.e., SyncPipe('fetch', conf={'url': url}, parallel=True, threads=False).
## Installation

(You are using a virtualenv, right?)

At the command line, install riko using either pip (recommended)
pip install riko
or easy_install
easy_install riko
Please see the installation doc for more details.
## Design Principles

The primary data structures in riko are the item and the stream. An item is just a Python dictionary, and a stream is an iterator of items. You can create a stream manually with something as simple as [{'content': 'hello world'}]. You manipulate streams in riko via pipes. A pipe is simply a function that accepts either a stream or an item, and returns a stream. pipes are composable: you can use the output of one pipe as the input to another pipe.
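To illustrate composability, here is a minimal sketch that chains two of the built-in pipes described below by hand, without any convenience class:

>>> from riko.modules.pipereverse import pipe as reverse
>>> from riko.modules.pipecount import pipe as count
>>>
>>> # a stream is just an iterator of items, so a plain list works
>>> stream = [{'title': 'riko pt. 1'}, {'title': 'riko pt. 2'}]
>>> # the output of one pipe is a valid input to another
>>> next(count(reverse(stream)))
{'count': 2}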
riko pipes come in two flavors: operators and processors.

operators operate on an entire stream at once and are unable to handle individual items. Example operators include pipecount, pipefilter, and pipereverse.
>>> from riko.modules.pipereverse import pipe
>>>
>>> stream = [{'title': 'riko pt. 1'}, {'title': 'riko pt. 2'}]
>>> next(pipe(stream))
{'title': 'riko pt. 2'}
processors process individual items and can be parallelized across threads or processes. Example processors include pipefetchsitefeed, pipehash, pipeitembuilder, and piperegex.
>>> from riko.modules.pipehash import pipe
>>>
>>> item = {'title': 'riko pt. 1'}
>>> stream = pipe(item, field='title')
>>> next(stream)
{'title': 'riko pt. 1', 'hash': 2853617420}
Some processors, e.g., pipestringtokenizer, return multiple results.
>>> from riko.modules.pipestringtokenizer import pipe
>>>
>>> item = {'title': 'riko pt. 1'}
>>> tokenizer_conf = {'delimiter': ' '}
>>> stream = pipe(item, conf=tokenizer_conf, field='title')
>>> next(stream)
{'title': 'riko pt. 1',
 'stringtokenizer': [
     {'content': 'riko'},
     {'content': 'pt.'},
     {'content': '1'}]}
>>> # In this case, if we just want the result, we can `emit` it instead
>>> stream = pipe(item, conf=tokenizer_conf, field='title', emit=True)
>>> next(stream)
{'content': 'riko'}
operators are split into sub-types of aggregators and composers. aggregators, e.g., pipecount, combine all items of an input stream into a new stream with a single item, while composers, e.g., pipefilter, create a new stream containing some or all items of an input stream.
>>> from riko.modules.pipecount import pipe
>>>
>>> stream = [{'title': 'riko pt. 1'}, {'title': 'riko pt. 2'}]
>>> next(pipe(stream))
{'count': 2}
processors are split into sub-types of sources and transformers. sources, e.g., pipeitembuilder, can create a stream, while transformers, e.g., pipehash, can only transform items in a stream.
>>> from riko.modules.pipeitembuilder import pipe
>>>
>>> attrs = {'key': 'title', 'value': 'riko pt. 1'}
>>> next(pipe(conf={'attrs': attrs}))
{'title': 'riko pt. 1'}
The following table summarizes these observations:

type | sub-type | input | output | parallelizable? | creates streams?
---|---|---|---|---|---
operator | aggregator | stream | stream [14] | |
operator | composer | stream | stream | |
processor | source | item | stream | √ | √
processor | transformer | item | stream | √ |
If you are unsure of the type of pipe you have, check its metadata.
>>> from riko.modules.pipefetchpage import asyncPipe
>>> from riko.modules.pipecount import pipe
>>>
>>> asyncPipe.__dict__
{'type': 'processor', 'name': 'fetchpage', 'sub_type': 'source'}
>>> pipe.__dict__
{'type': 'operator', 'name': 'count', 'sub_type': 'aggregator'}
The SyncPipe and AsyncPipe classes (among other things) perform this check for you to allow for convenient method chaining and transparent parallelization.
>>> from riko.lib.collections import SyncPipe
>>>
>>> attrs = [
... {'key': 'title', 'value': 'riko pt. 1'},
... {'key': 'content', 'value': "Let's talk about riko!"}]
>>> flow = SyncPipe('itembuilder', conf={'attrs': attrs}).hash()
>>> flow.list[0]
{'title': 'riko pt. 1',
 'content': "Let's talk about riko!",
 'hash': 1346301218}
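Because hash is a processor, the same flow can transparently fan items out to a thread pool; a minimal sketch reusing the attrs from above (we just check for the new key rather than rely on dict ordering):

>>> flow = (SyncPipe('itembuilder', conf={'attrs': attrs}, parallel=True)
...     .hash())
>>> 'hash' in flow.list[0]
True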
Please see the cookbook for advanced examples, including how to wire in values from other pipes or accept user input.
[14] The output stream of an aggregator is an iterator of only 1 item.
## Command-line Interface

riko provides a command, runpipe, to execute workflows. A workflow is simply a file containing a function named pipe that creates a flow and processes the resulting stream.
flow.py

from __future__ import print_function
from riko.lib.collections import SyncPipe

conf1 = {'attrs': [{'value': 'https://google.com', 'key': 'content'}]}
conf2 = {'rule': [{'find': 'com', 'replace': 'co.uk'}]}


def pipe(test=False):
    flow = (SyncPipe('itembuilder', conf=conf1, test=test)
        .strreplace(conf=conf2))

    stream = flow.output

    for i in stream:
        print(i)
Now to execute flow.py, type the command runpipe flow. You should then see the following output in your terminal:
https://google.co.uk
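Since a workflow's pipe is an ordinary Python function, you should also be able to run it directly from a Python session without runpipe; a minimal sketch, assuming flow.py is importable from the current directory:

>>> # equivalent to `runpipe flow`
>>> from flow import pipe
>>> pipe()
https://google.co.uk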
runpipe will also search the examples directory for workflows. Type runpipe demo and you should see the following output:
something...
## Scripts

riko comes with a built-in task manager, manage. To install it:

pip install riko[develop]

To run the Python linter and nose tests:

manage lint
manage test
## Contributing

Please mimic the coding style/conventions used in this repo. If you add new classes or functions, please add the appropriate doc blocks with examples. Also, make sure the Python linter and nose tests pass.

Please see the contributing doc for more details.
## Credits

Shoutout to pipe2py for heavily inspiring riko. riko started out as a fork of pipe2py, but has since diverged so much that little (if any) of the original code-base remains.
## Project Structure

├── benchmarks
│ ├── __init__.py
│ └── parallel.py
├── bin
│ └── run
├── data/*
├── docs
│ ├── AUTHORS.rst
│ ├── CHANGES.rst
│ ├── COOKBOOK.rst
│ ├── FAQ.rst
│ ├── INSTALLATION.rst
│ └── TODO.rst
├── examples/*
├── helpers/*
├── riko
│ ├── __init__.py
│ ├── lib
│ │ ├── __init__.py
│ │ ├── autorss.py
│ │ ├── collections.py
│ │ ├── dotdict.py
│ │ ├── log.py
│ │ ├── tags.py
│ │ └── utils.py
│ ├── modules/*
│ └── twisted
│ ├── __init__.py
│ ├── collections.py
│ └── utils.py
├── tests
│ ├── __init__.py
│ ├── standard.rc
│ └── test_examples.py
├── CONTRIBUTING.rst
├── dev-requirements.txt
├── LICENSE
├── Makefile
├── manage.py
├── MANIFEST.in
├── optional-requirements.txt
├── py2-requirements.txt
├── README.rst
├── requirements.txt
├── setup.cfg
├── setup.py
└── tox.ini
## License

riko is distributed under the MIT License.