@mserranom
Last active May 7, 2018 16:08
Example of a Blurr transformation schema written in a Python, convention-over-configuration DSL instead of YAML. The YAML version (a streaming DTC and a window DTC) comes first, followed by the equivalent Python.
```yaml
Type: Blurr:Transform:Streaming
Version: '2018-03-01'
Description: New York Stock Exchange Transformations
Name: nyse

Import:
  - { Module: datetime, Identifiers: [ datetime ] }

Identity: source.symbol

Time: datetime.strptime(source.date, '%Y-%m-%d')

Stores:
  - Type: Blurr:Store:Memory
    Name: memory

Aggregates:
  - Type: Blurr:Aggregate:Identity
    Name: company
    Store: memory
    Fields:
      - Name: country
        Value: source.country

  - Type: Blurr:Aggregate:Block
    Name: stats
    Store: memory
    Split: (source.timestamp - stats._time_start).days > 1
    When: source.symbol in ['AAPL', 'MSFT', 'GOOG', 'FB']
    Fields:
      - Name: close
        Type: float
        Value: source.close
      - Name: volatility
        Type: float
        Value: (float(source.high) / float(source.low)) - 1
      - Name: volume
        Type: float
        Value: source.volume
```
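To make the streaming semantics concrete, here is a plain-Python sketch of what the `nyse` DTC computes per record. It does not use Blurr: `StatsBlock`, `run_streaming` and the dict-shaped records are hypothetical, and it assumes that each `Value:` expression simply overwrites its field and that a split opens a fresh block before the current record is applied.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List

# Symbols accepted by the block's When: condition.
TRACKED = {'AAPL', 'MSFT', 'GOOG', 'FB'}


@dataclass
class StatsBlock:
    """Hypothetical stand-in for one 'stats' block (Blurr:Aggregate:Block)."""
    time_start: datetime
    close: float = 0.0
    volatility: float = 0.0
    volume: float = 0.0


def run_streaming(records: List[dict]) -> Dict[str, List[StatsBlock]]:
    """Group records by symbol and split each symbol's stream into blocks."""
    blocks: Dict[str, List[StatsBlock]] = {}
    for source in records:
        if source['symbol'] not in TRACKED:                       # When:
            continue
        identity = source['symbol']                               # Identity:
        time = datetime.strptime(source['date'], '%Y-%m-%d')      # Time:
        history = blocks.setdefault(identity, [])
        # Split: open a new block when more than one day separates this record
        # from the start of the current block. Subtracting datetimes (rather
        # than day-of-month integers) stays correct across month boundaries.
        if not history or (time - history[-1].time_start).days > 1:
            history.append(StatsBlock(time_start=time))
        block = history[-1]
        # Assumed semantics: each Value: expression overwrites the field.
        block.close = float(source['close'])
        block.volatility = float(source['high']) / float(source['low']) - 1
        block.volume = float(source['volume'])
    return blocks
```

With AAPL records dated 2018-01-31, 2018-02-01 and 2018-02-05, the first two share a block and the third opens a new one. Comparing `.day` values instead (5 - 31) would never trigger that split.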
```yaml
Type: Blurr:Transform:Window
Version: '2018-03-01'
Name: moving_averages

SourceDTC: nyse

Anchor:
  Condition: nyse.stats.volatility < 0.04

Aggregates:
  - Type: Blurr:Aggregate:Window
    Name: last_3
    WindowType: count
    WindowValue: -3
    Source: nyse.stats
    Fields:
      - Name: close_avg
        Type: float
        Value: sum(source.close) / len(source.close)
      - Name: max_volatility
        Type: float
        Value: max(source.volatility)
```
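The window DTC can be sketched the same way. The following assumptions are not taken from Blurr: `Stats` and `moving_averages` are hypothetical names, a count window of `-3` covers the three blocks immediately before the anchor block (excluding the anchor itself), and anchors without three preceding blocks are skipped.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Stats:
    """Hypothetical stand-in for one nyse.stats block."""
    close: float
    volatility: float


def moving_averages(blocks: List[Stats], count: int = 3,
                    threshold: float = 0.04) -> List[dict]:
    """Evaluate the last_3 window for every block that satisfies the anchor."""
    results = []
    for i, anchor in enumerate(blocks):
        if not anchor.volatility < threshold:      # Anchor: Condition
            continue
        window = blocks[max(0, i - count):i]       # WindowValue: -3 (count)
        if len(window) < count:                    # assumed: skip short windows
            continue
        closes = [b.close for b in window]
        results.append({
            'anchor': i,
            'close_avg': sum(closes) / len(closes),
            'max_volatility': max(b.volatility for b in window),
        })
    return results
```

In the Python DSL, `close_avg` receives a list like `closes` here, which is why its parameter is typed `List[float]`.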
```python
from datetime import datetime
from typing import List

from blurr import WindowType
from blurr.annotations import StreamingDTC, WindowDTC, Identity, Block, Window
from blurr.store import MemoryStore

store = MemoryStore()


def parse_time(source) -> datetime:
    return datetime.strptime(source.date, '%Y-%m-%d')


# Equivalent of the 'nyse' streaming DTC: identity, time and store.
streaming_dtc = StreamingDTC(
    Identity=lambda source: source.symbol,
    Time=parse_time,
    Store=store)


# By convention, each method name is a field name, its return annotation
# is the field type and its body is the Value expression.
@Identity(DTC=streaming_dtc)
class CompanyInfoAggregate:
    def country(source) -> str:
        return source.country


@Block(DTC=streaming_dtc,
       When=lambda source: source.symbol in ['AAPL', 'MSFT', 'GOOG', 'FB'],
       Split=lambda source, block: (source.timestamp - block._time_start).days > 1)
class StatsAggregate:
    def close(source) -> float:
        return source.close

    def volatility(source) -> float:
        return (float(source.high) / float(source.low)) - 1

    def volume(source) -> float:
        return source.volume


# Equivalent of the 'moving_averages' window DTC.
window_dtc = WindowDTC(
    source=streaming_dtc,
    anchor_condition=lambda source: source.volatility < 0.04)


# Parameter names select which source field's window values are passed in.
@Window(DTC=window_dtc,
        Source=StatsAggregate,
        WindowType=WindowType.count,
        WindowValue=-3)
class MovingAveragesWindow:
    def close_avg(close: List[float]) -> float:
        return sum(close) / len(close)

    def max_volatility(volatility: List[float]) -> float:
        return max(volatility)
```
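One way decorators like these could work by convention is to read each method name as the field name, the return annotation as the field type, and the function itself as the `Value:` expression. The `aggregate` decorator and `evaluate` helper below are hypothetical and written without Blurr; records are plain dicts rather than attribute-style `source` objects.

```python
import inspect
from typing import Any, Callable, Dict, Optional, get_type_hints


def aggregate(kind: str, when: Optional[Callable[[Any], bool]] = None):
    """Hypothetical decorator: derive a field spec from a class of functions."""
    def wrap(cls):
        fields = []
        # vars() preserves definition order, so fields keep source order.
        for name, fn in vars(cls).items():
            if name.startswith('_') or not inspect.isfunction(fn):
                continue
            return_type = get_type_hints(fn).get('return')
            fields.append({
                'Name': name,                                    # method name
                'Type': getattr(return_type, '__name__', None),  # annotation
                'Value': fn,                                     # expression
            })
        cls.spec = {'Type': kind, 'Name': cls.__name__, 'Fields': fields}
        cls.when = when
        return cls
    return wrap


def evaluate(cls, source: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Apply the When: filter, then compute every field for one record."""
    if cls.when is not None and not cls.when(source):
        return None
    return {f['Name']: f['Value'](source) for f in cls.spec['Fields']}


@aggregate('Block', when=lambda source: source['symbol'] in {'AAPL', 'MSFT', 'GOOG', 'FB'})
class StatsAggregate:
    def close(source) -> float:
        return float(source['close'])

    def volatility(source) -> float:
        return float(source['high']) / float(source['low']) - 1
```

`StatsAggregate.spec` then carries the same Name/Type/Value triples that the YAML `Fields:` list spells out by hand, which is the configuration the DSL aims to remove.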