Last active
February 19, 2022 19:59
-
-
Save goraj/9e4c3ea2d4908005ec33a6c52e213ee6 to your computer and use it in GitHub Desktop.
nautilus parquet example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from nautilus_trader.backtest.data.wranglers import BarDataWrangler | |
from nautilus_trader.persistence.external.readers import ParquetReader | |
from nautilus_trader.persistence.catalog import DataCatalog | |
from functools import partial | |
from nautilus_trader.core.datetime import dt_to_unix_nanos | |
from nautilus_trader.model.c_enums.bar_aggregation import BarAggregation | |
from nautilus_trader.model.data.bar import Bar, BarType, BarSpecification | |
from nautilus_trader.model.enums import AggregationSource, PriceType | |
from nautilus_trader.persistence.external.core import process_files, write_objects | |
from nautilus_trader.model.identifiers import InstrumentId | |
from nautilus_trader.model.identifiers import Symbol | |
from nautilus_trader.model.identifiers import Venue | |
from nautilus_trader.model.instruments.equity import Equity | |
from nautilus_trader.model.objects import Money | |
from nautilus_trader.model.objects import Price | |
from nautilus_trader.model.objects import Quantity | |
from nautilus_trader.model.currencies import USD | |
from nautilus_trader.persistence.external.core import write_objects | |
from pathlib import Path | |
import os, shutil | |
def make_equity_instrument(name, exchange):
    """Build a USD-quoted ``Equity`` instrument for ``name`` on ``exchange``.

    Args:
        name: Ticker text; wrapped in ``Symbol`` and used both for the
            instrument id and as the native symbol.
        exchange: Venue text; wrapped in ``Venue`` for the instrument id.

    Returns:
        An ``Equity`` with two-decimal price precision, a 0.01 price
        increment, a multiplier and lot size of 1, no ISIN, and both
        timestamps set to 0.
    """
    native_symbol = Symbol(name)
    listing_venue = Venue(exchange)
    instrument_id = InstrumentId(symbol=native_symbol, venue=listing_venue)

    equity = Equity(
        instrument_id=instrument_id,
        native_symbol=native_symbol,
        currency=USD,
        price_precision=2,
        price_increment=Price.from_str("0.01"),
        multiplier=Quantity.from_int(1),
        lot_size=Quantity.from_int(1),
        isin=None,
        ts_event=0,
        ts_init=0,
    )
    return equity
def parser(data, instrument):
    """Yield the bars that ``BarDataWrangler`` produces from ``data``.

    Args:
        data: Tabular input exposing ``set_index`` and a ``"datetime"``
            column (presumably a pandas DataFrame read from parquet --
            confirm against the reader).
        instrument: Instrument whose ``id`` identifies the bar type and
            which is handed to the wrangler.

    Yields:
        Each item returned by ``BarDataWrangler.process`` for the
        re-indexed data.
    """
    instrument_id = instrument.id
    one_minute_last = BarSpecification(1, BarAggregation.MINUTE, PriceType.LAST)
    # EXTERNAL: the bars are already aggregated in the incoming data.
    bar_type = BarType(instrument_id, one_minute_last, AggregationSource.EXTERNAL)

    wrangler = BarDataWrangler(bar_type, instrument)
    indexed = data.set_index("datetime")
    yield from wrangler.process(indexed)
# The catalog is kept in a "catalog" directory under the current working directory.
CATALOG_PATH = f"{os.getcwd()}/catalog"

# Always start from an empty catalog: remove any previous directory, then recreate it.
if os.path.exists(CATALOG_PATH):
    shutil.rmtree(CATALOG_PATH)
os.mkdir(CATALOG_PATH)

# Directory holding one "<symbol>.parquet" file per ticker.
DATA_DIR = Path("/mnt/e/equity/universe/")
catalog = DataCatalog(CATALOG_PATH)
exchange = "NYSE"
tickers = ["SPY", "XBI"]

for ticker in tickers:
    instrument = make_equity_instrument(ticker, exchange)

    # Bind the instrument so the reader can call parser(data) with one argument.
    bar_parser = partial(parser, instrument=instrument)
    parquet_file = DATA_DIR / f"{instrument.native_symbol}.parquet"

    process_files(
        glob_path=parquet_file,
        reader=ParquetReader(parser=bar_parser),
        catalog=catalog,
    )

    # Store the instrument definition alongside its bars.
    # NOTE(review): the original indentation was lost; this call is assumed to
    # belong to the loop body so that every ticker's instrument is written.
    write_objects(catalog, [instrument])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment