Skip to content

Instantly share code, notes, and snippets.

View qharlie's full-sized avatar

charlie qharlie

  • USA
View GitHub Profile
C:\Users\charl\anaconda3\envs\prql\python.exe C:/Users/charl/workspace/prql/python/src/main.py
********************************************************************************
Parsing /../resource/expressions.prql
********************************************************************************
from otb_table
left_join some_other_table [otb_id=some_id]
select [
otb_name,
some_other_name,
def memoize(f):
""" Memoization decorator for a function taking one or more arguments. """
class memodict(dict):
def __getitem__(self, *key):
return dict.__getitem__(self, key)
def __missing__(self, key):
ret = self[key] = f(*key)
return ret
@qharlie
qharlie / main.py
Created June 26, 2020 06:09
Trying to visualize the Environment in evaluate()
# Copyright 2018 Tensorforce Team. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
@qharlie
qharlie / NATs-Replay.py
Created August 20, 2019 02:25 — forked from bdowling/NATs-Replay.py
These are just some sample scripts to dump the nats stream and replay it to a local NATs server.
#!/usr/bin/env python
from nats.aio.client import Client as NATS
import json
import asyncio
import logging
import argparse
import gzip
import glob
import time
import util.data as btu
from strategy import SMAStrategy
from joblib import Parallel, delayed
tickers = btu.get_nasdaq100()
def test_ticker(ticker):
cash = 100000
c = btu.run_backtest(SMAStrategy, ticker, '2019-05-01', '2019-05-14',cash)
percent_gain = ((c.broker.getvalue() - cash) / cash) * 100
print('Final Portfolio Value: {} for {} executing {} orders ( {}% )'.format(c.broker.getvalue(), ticker, len(c.broker.orders),percent_gain ))
import backtrader as bt
import coloredlogs, logging
logger = logging.getLogger(__name__)
coloredlogs.install(fmt='%(name)s - %(levelname)s %(message)s', level='DEBUG')
class BaseStrategy(bt.Strategy):
def log(self, txt, dt=None):
import backtrader as bt
from strategy import BaseStrategy
class SMAStrategy(BaseStrategy):
def __init__(self):
super().__init__()
self.sma_fast = bt.ind.ExponentialMovingAverage(period=10)
self.sma_slow = bt.ind.ExponentialMovingAverage(period=30)
import backtrader as bt
from collections import defaultdict
class KitchenSinkAnalyzer(bt.Analyzer):
params = dict(sma_period=10, slow_sma_period=30, rsi_period=14, mean_period=10, moment_period=12, percent_period=15)
def start(self):
self.smas = {data: bt.indicators.SMA(data, period=self.p.sma_period)
for data in self.datas}
import backtrader as bt
from collections import defaultdict
class KitchenSinkAnalyzer(bt.Analyzer):
params = dict(sma_period=10, slow_sma_period=30, rsi_period=14, mean_period=10, moment_period=12, percent_period=15)
def start(self):
self.smas = {data: bt.indicators.SMA(data, period=self.p.sma_period)
for data in self.datas}
import backtrader as bt
import util.data as btu
import os
def run_stock_scanner(tickers, _from, _to, analyzers):
cerebro = bt.Cerebro()
for ticker in tickers:
DataFactory = btu.get_store().getdata