Skip to content

Instantly share code, notes, and snippets.

@harrylyx
Created March 1, 2022 03:25
Show Gist options
  • Save harrylyx/a22c59000c38b37aa3c77f3ec5f6419d to your computer and use it in GitHub Desktop.
Save harrylyx/a22c59000c38b37aa3c77f3ec5f6419d to your computer and use it in GitHub Desktop.
import os
import json
import asyncio
import logging
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
import pathlib
import pandas as pd
import akshare as ak
from tqdm import tqdm, trange
# --- Paths and download configuration --------------------------------------
DATA_PATH = pathlib.Path(__file__).parent / "data"
LOG_PATH = pathlib.Path(__file__).parent / "logs/fail.log"
START_DATE = "2000-01-01"
END_DATE = "2022-12-31"
MAX_WORKERS = 16

# Robustness fix: logging.FileHandler raises FileNotFoundError when the
# logs/ directory does not exist yet, so create it before wiring handlers.
LOG_PATH.parent.mkdir(parents=True, exist_ok=True)

# Configure the ROOT logger (no name given) so every module-level logging
# call in this script goes through the same handlers.
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter(
    "%(asctime)s - %(pathname)s[line:%(lineno)d] - %(funcName)s - %(levelname)s: %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
# File handler: record only failures to logs/fail.log.
fh = logging.FileHandler(LOG_PATH)
fh.setLevel(logging.ERROR)
fh.setFormatter(formatter)
# Stream handler: mirror INFO and above to the console.
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
ch.setFormatter(formatter)
# Attach both handlers to the root logger.
logger.addHandler(ch)
logger.addHandler(fh)

# English column names applied over akshare's Chinese-labelled history frame
# (order must match ak.stock_zh_a_hist's output columns).
columns = [
    "date",
    "open",
    "close",
    "high",
    "low",
    "volume",
    "turnover",
    "amplitude",
    "quote_change",
    "ups_and_downs",
    "turnover_rate",
]
def get_stock_daily_hist(symbol, start_date, end_date):
    """Fetch back-adjusted ("hfq") daily OHLCV history for one A-share symbol.

    Returns a DataFrame relabelled with the module-level English `columns`
    and with the "date" column parsed to datetime.
    """
    hist = ak.stock_zh_a_hist(
        symbol=symbol,
        period="daily",
        start_date=start_date,
        end_date=end_date,
        adjust="hfq",
    )
    hist.columns = columns
    hist["date"] = pd.to_datetime(hist["date"])
    return hist
def save_stock_daily_hist(stock_daily_df, symbol):
    """Split a daily-history DataFrame by calendar year and append each
    non-empty slice to data/stock_hist/<year>/<symbol>.h5.

    Parameters:
        stock_daily_df: DataFrame with a datetime "date" column (as returned
            by get_stock_daily_hist).
        symbol: stock code, used as the HDF5 file name.
    """
    start_year = datetime.strptime(START_DATE, "%Y-%m-%d").year
    end_year = datetime.strptime(END_DATE, "%Y-%m-%d").year
    # Bug fix: the original range(start_year, end_year) silently dropped
    # END_DATE's year (2022) — the upper bound must be inclusive.
    for year in range(start_year, end_year + 1):
        tmp_df = stock_daily_df[stock_daily_df["date"].dt.year == year]
        if tmp_df.empty:
            continue
        # makedirs(..., exist_ok=True) replaces the two racy
        # exists()/mkdir() pairs and creates intermediate dirs in one call.
        os.makedirs(DATA_PATH / f"stock_hist/{year}", exist_ok=True)
        tmp_df.to_hdf(
            DATA_PATH / f"stock_hist/{year}/{symbol}.h5",
            format="table",
            key="day",
            mode="a",
        )
    logging.info(f"{symbol} success")
def update_stock_hist(symbol, year):
    """Fetch one calendar year of daily bars for `symbol` and append them to
    the symbol's existing yearly HDF5 file; no-op when that file is absent.

    Failures to fetch are logged to the error handler and skipped.
    """
    file_path = DATA_PATH / f"stock_hist/{year}/{symbol}.h5"
    # Guard clause: only update symbols that were previously downloaded.
    if not os.path.exists(file_path):
        return
    try:
        df = get_stock_daily_hist(symbol, f"{year}0101", f"{year}1231")
    except Exception:
        # Bug fix: the original fell through after logging with `df` never
        # assigned, raising NameError on the append below. Bail out instead.
        logging.error(f"{symbol} {year} failed")
        return
    # Context manager guarantees the store is closed even if append raises
    # (the original leaked the handle on any append error).
    with pd.HDFStore(file_path, "a") as h5:
        h5.append("day", df)
    logging.info(f"{symbol} success")
def download_stock_hist(symbol, start_date, end_date):
    """Fetch the full daily history for one symbol and persist it by year."""
    save_stock_daily_hist(
        get_stock_daily_hist(symbol, start_date, end_date), symbol
    )
async def download_all():
    """Download full history (START_DATE..END_DATE) for every stock listed
    in data/stock_info/stock_code.json, fanning out over a thread pool.

    akshare calls are blocking network I/O, so each download runs in a
    ThreadPoolExecutor worker via loop.run_in_executor.
    """
    loop = asyncio.get_running_loop()
    # Bug fix: json.load(open(...)) leaked the file handle; use `with`.
    with open(DATA_PATH / "stock_info/stock_code.json") as f:
        stock_dict = json.load(f)
    tasks = []
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        for symbol, name in tqdm(stock_dict.items()):
            tasks.append(
                loop.run_in_executor(
                    executor, download_stock_hist, symbol, START_DATE, END_DATE
                )
            )
    await asyncio.gather(*tasks)
async def update_all(year=2022):
    """Append one calendar year of daily bars to every previously-downloaded
    stock's HDF5 file.

    Parameters:
        year: calendar year to fetch; defaults to 2022 (the original
            hard-coded value) for backward compatibility.
    """
    loop = asyncio.get_running_loop()
    # Bug fix: json.load(open(...)) leaked the file handle; use `with`.
    with open(DATA_PATH / "stock_info/stock_code.json") as f:
        stock_dict = json.load(f)
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        tasks = [
            loop.run_in_executor(executor, update_stock_hist, symbol, year)
            for symbol, name in tqdm(stock_dict.items())
        ]
    await asyncio.gather(*tasks)
if __name__ == "__main__":
    # Entry point: full historical download. Swap in the commented line
    # below to run the incremental yearly update instead.
    asyncio.run(download_all())
    # asyncio.run(update_all())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment