Skip to content

Instantly share code, notes, and snippets.

@gary136
Last active August 23, 2020 10:07
Show Gist options
  • Save gary136/20970376b341f4199979a4570db5a113 to your computer and use it in GitHub Desktop.
Save gary136/20970376b341f4199979a4570db5a113 to your computer and use it in GitHub Desktop.
twstock.py
import pandas as pd
import requests
import numpy as np
from io import StringIO
import time
import matplotlib.pyplot as plt
import html5lib
from datetime import datetime, timedelta
import calendar
import datetime as dt
from functools import reduce
import json
import os
from collections import Counter
# 小程式
def revr(data, nmbr):
cols = data.columns.tolist()
cols = cols[nmbr:] + cols[:nmbr]
data = data[cols]
return data
def add_months(sourcedate, months):
sourcedate = datetime.strptime(sourcedate, "%Y%m%d").date() if type(sourcedate)==str else sourcedate
month = sourcedate.month - 1 + months
year = sourcedate.year + month // 12
month = month % 12 + 1
day = min(sourcedate.day, calendar.monthrange(year,month)[1])
return dt.date(year, month, day)
def fullpathname(outdir, outname):
if not os.path.exists(outdir):
os.mkdir(outdir)
return os.path.join(outdir, outname)
# class TwFinanceStatement:
class TwFinanceRatio:
# 對照表
m_mapping = {'上市':'sii','上櫃':'otc'}
base_url = 'https://mops.twse.com.tw/mops/web/'
p_mapping = {
# '損益表':base_url+'t163sb04','資產負債表':base_url+'t163sb05',
'營益分析':base_url+'t163sb06'
,'財務結構分析':base_url+'t51sb02'}
old_p_mapping = {
# '損益表':base_url+'t51sb08','資產負債表':base_url+'t51sb07',
'營益分析':base_url+'t51sb06'
,'財務結構分析':base_url+'ajax_t51sb02'}
def __init__(self, mkt_type, year, season, purpose, \
m_mapping=m_mapping, p_mapping=p_mapping, old_p_mapping=old_p_mapping, smp=True):
year = year if year < 1000 else year-1911
self.year = year #yyy
self.season = '0'+str(season) if type(season)==int else season
self.mkt_type = m_mapping[mkt_type] if mkt_type in m_mapping else None
self.purpose = purpose
p_mapping = p_mapping if year>=102 else old_p_mapping
self.url = p_mapping[self.purpose] if self.purpose in p_mapping else None
self.smp = smp
def add_raw(self):
form = {'encodeURIComponent':1,
'step':1,
'firstin':1,
'off':1,
'TYPEK':self.mkt_type,
'year':str(self.year),
'season':self.season
}
form['ifrs']='Y' if self.year>=102 else 'N'
headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) \
AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}
r = requests.post(self.url, form, headers=headers)
r.encoding = 'utf8'
dfs = pd.read_html(StringIO(r.text))
self.raw_data = dfs
def unify(self):
if self.purpose=='營益分析':
dfs = [i for i in self.raw_data if i.shape[0]>5]
dfs = [i for i in dfs if i.shape[1]>5]
data = dfs[0]
data.columns = ['公司代號','公司名稱','營業收入(百萬元)','毛利率(%)','營業利益率(%)','稅前純益率(%)','稅後純益率(%)']
data = data[data['公司代號']!='公司代號']
data = data.reset_index().drop('index', axis=1)
data['財報年度'] = self.year+1911 #yyyy
data['季'] = self.season
data = revr(data,-2)
self.data = data
elif self.purpose=='財務結構分析':
dfs = [i for i in self.raw_data if i.shape[0]>5]
dfs = [i for i in dfs if i.shape[1]>5]
data = dfs[0]
data.columns = data.columns.get_level_values(1)
data = data[data['公司代號']!='公司代號']
data = data.reset_index().drop('index', axis=1)
data.rename(columns={'股東權益報酬率(%)':'權益報酬率(%)'
,'不動產、廠房及設備週轉率(次)':'固定資產週轉率(次)'
,'負債佔資產比率(%)':'負債比率(%)'
,'公司簡稱':'公司名稱'}, inplace=True)
data['財報年度'] = self.year+1911 #yyyy
# data['季'] = self.season
data = revr(data,-1)
if self.smp==True :
clms = list(data.columns)
clms = [i for i in clms if i not in \
['平均收現日數','平均售貨日數','平均銷貨日數'
,'長期資金佔不動產、廠房及設備比率(%)','純益率(%)','長期資金佔固定資產比率(%)'
,'應收款項收現日數','稅前純益佔實收資本比率(%)','營業利益佔實收資本比率(%)']]
data = data[clms]
self.data = data
def siiPrice(str_date):
url = f'https://www.twse.com.tw/exchangeReport/MI_INDEX?response=csv&date={str_date}&type=ALL'
r = requests.get(url)
if r.text=='':
return None
df = pd.read_csv(StringIO(r.text.replace("=", "")),
header=["證券代號" in l for l in r.text.split("\n")].index(True)-1)
df.drop(['Unnamed: 16','最後揭示買價', '最後揭示買量', '最後揭示賣價', '最後揭示賣量'], axis=1, inplace=True)
for i in ['成交股數', '成交筆數', '成交金額', '開盤價', '最高價', '最低價', '收盤價', '漲跌價差', '本益比']:
df[i] = df[i].apply(lambda x: pd.to_numeric(x.replace(",", ""), errors='coerce') if type(x)!=float else x)
df['股價日期'] = datetime(int(str_date[:4]),int(str_date[4:6]),int(str_date[6:]),0,0,0)
df = revr(df,-1)
return df
def otcPrice(str_date):
url = f'https://www.tpex.org.tw/web/stock/aftertrading/otc_quotes_no1430/stk_wn1430_result.php?l=zh-tw&o=htm&d={str_date}&se=AL&s=0,asc,0'
r = requests.get(url)
if r.text=='':
return None
df = pd.read_html(StringIO(r.text))[0]
df.columns = df.columns.get_level_values(1)
df.drop(['次日漲停價','次日跌停價'], axis=1, inplace=True)
spl = str_date.split('/')
df['股價日期'] = datetime(int(spl[0])+1911,int(spl[1]),int(spl[2]),0,0,0)
df = revr(df,-1)
return df
def TwPrice(dt, mkt):
# 對照表
mkt_mapping = {'上市':'sii','上櫃':'otc'}
dt_mapping = {}
if '/' not in dt:
dt_mapping['sii_date']=dt
dt_mapping['otc_date']=f'{int(dt[:-4])-1911}/{dt[-4:-2]}/{dt[-2:]}'
else:
dt_mapping['sii_date']=f'{int(dt[:-6])+1911}{dt[-5:-3]}{dt[-2:]}'
dt_mapping['otc_date']=dt
mkt = mkt_mapping[mkt]
dt = dt_mapping[f'{mkt}_date']
price = siiPrice(dt) if mkt=='sii' else otcPrice(dt)
return price
class TwRevenue:
m_mapping = {'上市':'sii','上櫃':'otc'}
base_url = 'https://mops.twse.com.tw/nas/t21/{}/t21sc03_{}_{}_0.html'
def __init__(self, mkt_type, year, month, url=base_url, m_mapping=m_mapping):
self.mkt_type = m_mapping[mkt_type] if mkt_type in m_mapping else None
year = year if year < 1000 else year-1911
self.year = year #yyy
self.month = month
self.url = url.format(self.mkt_type, self.year, self.month)
def add_raw(self):
headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36'}
r = requests.get(self.url, headers=headers)
r.encoding = 'big5'
dfs = pd.read_html(StringIO(r.text))
self.raw_data = dfs
def unify(self, smp=True):
data = pd.concat([data for data in self.raw_data if data.shape[1] <= 11 and data.shape[1] > 5]\
,axis=0,ignore_index=True)
if 'levels' in dir(data.columns):
data.columns = data.columns.get_level_values(1)
else:
data = data[list(range(0,10))]
column_index = data.index[(data[0] == '公司代號')][0]
data.columns = data.iloc[column_index]
data['當月營收'] = pd.to_numeric(data['當月營收'], 'coerce')
data = data[~data['當月營收'].isnull()]
data = data[~data['公司代號'].str.contains('合計')]
if '備註' not in data.columns:
data['備註'] = '-'
data.drop_duplicates(subset='公司代號', keep='first', inplace=True)
# 置換例外資料
if (self.year,self.month)==(102,1):
data.replace(to_replace='不適用', value=np.NaN, inplace=True)
data = data.drop('備註', axis=1)
data['年'] = self.year
data['月'] = self.month
if smp==True:
clms = ['年','月','公司代號','公司名稱','當月營收','去年當月營收','去年同月增減(%)']
else:
clms = ['年','月','公司代號','公司名稱','當月營收','去年當月營收','去年同月增減(%)'\
,'上月營收','上月比較增減(%)','前期比較增減(%)','去年累計營收','當月累計營收']
data = data[clms]
data = data.rename(columns={'當月營收':f'{self.month}月營收'
,'去年當月營收':f'去年{self.month}月營收'
,'去年同月增減(%)':f'{self.month}月營收年成長(%)'})
self.data = data
def join_tej(d, fileName='tej.csv'):
tej = pd.read_csv(fileName)
tej = tej[['ID', 'NAME', 'MKT', 'INDUSTRY_NAME_1', 'INDUSTRY_NAME_2']]
tej['ID'] = tej['ID'].astype(str)
d = pd.merge(d, tej, how='inner', left_on='公司代號', right_on='ID')
d = d.drop(['ID', 'NAME'], axis=1)
d.rename(columns = {'MKT':'市場'
,'INDUSTRY_NAME_1':'主產業'
,'INDUSTRY_NAME_2':'次產業'}
, inplace=True)
d['市場'] = d['市場'].apply(lambda x:'上市' if x=='TSE' else '上櫃')
cols = list(d.columns)
cols = cols[:2]+cols[-3:]+cols[2:-3]
d = d[cols]
pe1 = d.groupby('主產業')['本益比'].median().reset_index().rename(columns={'本益比':'PE1 中位數'})
pe2 = d.groupby('次產業')['本益比'].median().reset_index().rename(columns={'本益比':'PE2 中位數'})
d = pd.merge(pd.merge(d,pe1),pe2)
for i in ['PE1 中位數', 'PE2 中位數']:
d[i] = d[i].replace(0, 1e-4)
return d
def join_tej_drv(d, fileName='tej_drv.csv'):
tej = pd.read_csv(fileName)
tej = tej[['ID', 'NAME', 'MKT', 'INDUSTRY_NAME_1', 'INDUSTRY_NAME_2', 'INDUSTRY_SELF_DIFINED']]
tej['ID'] = tej['ID'].astype(str)
d = pd.merge(d, tej, how='inner', left_on='公司代號', right_on='ID')
d = d.drop(['ID', 'NAME'], axis=1)
d.rename(columns = {'MKT':'市場'
,'INDUSTRY_NAME_1':'主產業'
,'INDUSTRY_NAME_2':'次產業'
,'INDUSTRY_SELF_DIFINED':'自定產業'}, inplace=True)
d['市場'] = d['市場'].apply(lambda x:'上市' if x=='TSE' else '上櫃')
cols = list(d.columns)
cols = cols[:2]+cols[-4:]+cols[2:-4]
d = d[cols]
pe1 = d.groupby('主產業')['本益比'].median().reset_index().rename(columns={'本益比':'PE1 中位數'})
pe2 = d.groupby('次產業')['本益比'].median().reset_index().rename(columns={'本益比':'PE2 中位數'})
pe3 = d.groupby('自定產業')['本益比'].median().reset_index().rename(columns={'本益比':'PE3 中位數'})
d = reduce(lambda x,y:pd.merge(x,y), (d,pe1,pe2,pe3))
for i in ['PE1 中位數', 'PE2 中位數', 'PE3 中位數']:
d[i] = d[i].replace(0, 1e-4)
return d
# 營益分析 inner join 財務結構分析
def genRatio(y):
# financeStatement - 年度財務比率數值
x = TwFinanceRatio('上市', y, 4, '營益分析')
x.add_raw()
x.unify()
incm = x.data
x = TwFinanceRatio('上市', y, None, '財務結構分析')
x.add_raw()
x.unify()
baln = x.data
sii = pd.merge(incm[[i for i in incm.columns if i!='季']], baln, how='inner', on=['財報年度', '公司代號', '公司名稱'])
print(f'{y} stage = siiRatio', end=' ')
x = TwFinanceRatio('上櫃', y, 4, '營益分析')
x.add_raw()
x.unify()
incm = x.data
x = TwFinanceRatio('上櫃', y, None, '財務結構分析')
x.add_raw()
x.unify()
baln = x.data
otc = pd.merge(incm[[i for i in incm.columns if i!='季']], baln, how='inner', on=['財報年度', '公司代號', '公司名稱'])
print('otcRatio')
ratio = pd.concat([sii,otc], axis=0, ignore_index=True)
return ratio
# 近3月營收 YOY(%)
def last3MthRvnuGrw(y,last_m=3):
str_last_m = '0'+str(last_m) if last_m<=9 else str(last_m)
last_ymd = f'{y}{str_last_m}01'
YmList = [add_months(last_ymd, i).strftime("%Y%m") for i in range(0,-3,-1)]
r_container = {}
for seq,ym in enumerate(YmList):
y = int(ym[:4])
m = ym[4:]
m = int(m[1]) if m[0]=='0' else int(m)
x = TwRevenue('上市', y, m)
x.add_raw()
x.unify()
sii_r = x.data
if seq==2:
print('siiRvnu', end=' ')
x = TwRevenue('上櫃', y, m)
x.add_raw()
x.unify()
otc_r = x.data
if seq==2:
print('otcRvnu')
r = pd.concat([sii_r,otc_r], axis=0, ignore_index=True)
r = r.drop(['年','月'], axis=1)
r_container[(y,m)]=r
m_list = [i[1] for i in r_container.keys()]
rvnu = reduce(lambda x,y:pd.merge(x,y,on=['公司代號','公司名稱'],how='inner'), list(r_container.values()))
rvnu = reduce(lambda x,y:pd.merge(x,y,on=['公司代號','公司名稱'],how='inner'), list(r_container.values()))
rvnu['近3月營收 平均YOY(%)'] = round((rvnu[f'{m_list[0]}月營收年成長(%)']\
+rvnu[f'{m_list[1]}月營收年成長(%)']+rvnu[f'{m_list[2]}月營收年成長(%)'])/3,2)
rvnu['近3月累計營收'] = rvnu[f'{m_list[0]}月營收']+rvnu[f'{m_list[1]}月營收']+rvnu[f'{m_list[2]}月營收']
rvnu['去年近3月累計營收'] = rvnu[f'去年{m_list[0]}月營收']\
+rvnu[f'去年{m_list[1]}月營收']+rvnu[f'去年{m_list[2]}月營收']
rvnu['近3月累計營收 YOY(%)'] = round((rvnu['近3月累計營收']/rvnu['去年近3月累計營收']-1)*100,2)
rvnu = rvnu[['公司代號','公司名稱','近3月營收 平均YOY(%)','近3月累計營收 YOY(%)']]
return rvnu
# 取得該月首交易日
def FirstTxnDate(eg_yr, mkt, mh=4, dt=1, ft=siiPrice):
cn_yr = eg_yr-1911
mh = str(mh) if mh>=10 else '0'+str(mh)
dt = str(dt) if dt>=10 else '0'+str(dt)
test_ymd = f'{eg_yr}{mh}{dt}'
d_test = ft(test_ymd)
while d_test is None:
d_plus_one = datetime.strptime(test_ymd, "%Y%m%d").date()+ timedelta(days=1)
test_ymd = datetime.strftime(d_plus_one, "%Y%m%d")
d_test = siiPrice(test_ymd)
eg_yr,mh,dt = test_ymd[:4],test_ymd[4:6],test_ymd[6:]
if mkt=='sii':
return f'{eg_yr}{mh}{dt}'
elif mkt=='otc':
return f'{cn_yr}/{mh}/{dt}'
else:
return None
# 取得該月末交易日
def LastTxnDate(eg_yr, mkt, mh=12, dt=31, ft=siiPrice):
cn_yr = eg_yr-1911
mh = str(mh) if mh>=10 else '0'+str(mh)
dt = str(dt) if dt>=10 else '0'+str(dt)
test_ymd = f'{eg_yr}{mh}{dt}'
d_test = ft(test_ymd)
while d_test is None:
d_plus_one = datetime.strptime(test_ymd, "%Y%m%d").date()+ timedelta(days=-1)
test_ymd = datetime.strftime(d_plus_one, "%Y%m%d")
d_test = siiPrice(test_ymd)
eg_yr,mh,dt = test_ymd[:4],test_ymd[4:6],test_ymd[6:]
if mkt=='sii':
return f'{eg_yr}{mh}{dt}'
elif mkt=='otc':
return f'{cn_yr}/{mh}/{dt}'
else:
return None
# 取得該交易日價格
def allPrice(dt):
sii_p = TwPrice(dt, '上市')
sii_p = sii_p[['股價日期', '證券代號', '證券名稱', '收盤價']]
sii_p = sii_p.rename(columns = {'證券代號':'公司代號', '證券名稱':'公司名稱'})
otc_p = TwPrice(dt, '上櫃')
otc_p = otc_p[['股價日期', '代號', '名稱', '收盤']]
otc_p = otc_p.rename(columns = {'代號':'公司代號', '名稱':'公司名稱', '收盤':'收盤價'})
price = pd.concat([sii_p,otc_p])
return price
############### (財務比率 left join 近3月營收 left join 測試日收盤價格) inner join TEJ ###############
def gen(y, fileName=None, tej=True, tej_drv=False):
# financeRatio - 年度財務比率數值
if fileName==None:
ratio = genRatio(y)
else:
ratio = pd.read_csv(fileName)
ratio = ratio[ratio.財報年度==y]
# revenue - 隔年1~3月的營收
rvnu = last3MthRvnuGrw(y+1) # 隔年
# price - 財報+隔年1~3月的營收 揭露後首個交易日的收盤價
dt = FirstTxnDate(eg_yr=y+1, mkt='sii', mh=4, dt=11) # 隔年
price = allPrice(dt)
price = price.drop('股價日期', axis=1).rename(columns = {'收盤價':'年報公佈後收盤價'})
# merge & process data
for i in [ratio,rvnu,price]:
i['公司代號'] = i['公司代號'].astype(str)
data = reduce(lambda x,y:pd.merge(x,y,on=['公司代號','公司名稱'],how='left'), [ratio,rvnu,price])
# data['每股盈餘(元)'] = data['每股盈餘(元)'].fillna(1e-4)
data = data.fillna(1e-4)
def flt(x):
try:
return float(x)
except ValueError:
# return 0
return 1e-4
for i in list(filter(lambda x: x not in ['財報年度', '公司代號', '公司名稱', '股價日期'], data.columns)):
data[i] = data[i].apply(flt)
data['本益比'] = round(data['年報公佈後收盤價'] / data['每股盈餘(元)'], 2)
cols = list(data.columns)
cols = cols[1:3]+cols[:1]+cols[3:]
data = data[cols]
# tej
if tej==True:
data = join_tej(data)
elif tej_drv==True:
data = join_tej_drv(data)
return data
#########################################################################################################
# 計算 YOY
def drv(d_current, d_last, spc_cmpr=None):
cmpr = ['營業收入(百萬元)', '毛利率(%)', '營業利益率(%)', \
'稅後純益率(%)', '資產報酬率(%)', '權益報酬率(%)', '現金再投資比率(%)'] \
if spc_cmpr==None else spc_cmpr
# 營業收入分開處理
names_mapping = {}
for c in [c for c in cmpr if '營業收入' not in c]:
names_mapping[c] = f'{c[:-3]} YOY(%)'
orgn_col = ['公司代號']+cmpr
drv_col = ['公司代號']+['營業收入 YOY(%)']+list(names_mapping.values())
# 計算差異
d_cmpr = pd.merge(d_current[orgn_col], d_last[orgn_col], how='left', on=['公司代號'])
d_cmpr['營業收入 YOY(%)'] = round(((d_cmpr['營業收入(百萬元)_x'] / d_cmpr['營業收入(百萬元)_y'])-1)*100, 2)
for c in names_mapping:
d_cmpr[names_mapping[c]] = d_cmpr[f'{c}_x'] - d_cmpr[f'{c}_y']
d_cmpr = d_cmpr[drv_col]
x = pd.merge(d_current, d_cmpr, how='inner', on=['公司代號'])
x = x.fillna(1e-4)
return x
# 推算還原價格
# etf 股利
class twEtfDiv:
def __init__(self, year, start='0101', end='1231'):
year = year if year > 1000 else year+1911
self.year = year #yyyy
self.start = start
self.end = end
self.url = f"https://www.twse.com.tw/exchangeReport/TWT49U?response=json&strDate={year}{start}&endDate={year}{end}"
self.headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36'}
def add_raw(self):
r = requests.get(self.url, headers=self.headers)
d = json.loads(r.text)
data = pd.DataFrame(data=d['data'], columns=d['fields'])
data = data[data.股票代號.str.startswith('0')]
self.raw_data = data
def unify(self):
self.raw_data['詳細資料'] = self.raw_data['詳細資料'].apply(lambda x:x.split("'")[1][9:])
self.raw_data.drop(['漲停價格','跌停價格','開盤競價基準','最近一次申報資料 季別/日期',\
'最近一次申報每股 (單位)淨值','最近一次申報每股 (單位)盈餘'], axis=1, inplace=True)
self.raw_data['資料日期'] = self.raw_data['詳細資料'].apply(lambda x:x[-8:])
self.raw_data = self.raw_data[self.raw_data['除權息前收盤價']!=self.raw_data['減除股利參考價']]
def stockDividend(dtl):
headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36'}
basic_url = 'https://www.twse.com.tw/zh/'
url = basic_url+dtl
r = requests.get(url, headers=headers)
d = pd.read_html(StringIO(r.text))[0]
tgt = d.iloc[4][1].split(' ')[0]
time.sleep(3)
return tgt
def cashDividend(dtl):
headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36'}
basic_url = 'https://www.twse.com.tw/zh/'
url = basic_url+dtl
r = requests.get(url, headers=headers)
d = pd.read_html(StringIO(r.text))[0]
tgt = d.iloc[2][1].split(' ')[0]
time.sleep(3)
return tgt
x1 = self.raw_data[self.raw_data['權/息']=='息']
x1['現金股利'] = x1['權值+息值']
x1['股票股利'] = 0
x2 = self.raw_data[self.raw_data['權/息']=='權']
x2['現金股利'] = 0
x2['股票股利'] = x2['詳細資料'].apply(stockDividend)
x12 = self.raw_data[self.raw_data['權/息']=='權息']
x12['現金股利']=x12['詳細資料'].apply(cashDividend)
x12['股票股利']=x12['詳細資料'].apply(stockDividend)
d_all = pd.concat([x1,x2,x12],ignore_index=True)
d_all = d_all[['資料日期', '股票代號', '股票名稱', '現金股利', '股票股利', '詳細資料']]
self.data = d_all
# 公司股利
class twDividend:
m_mapping = {'上市':'sii','上櫃':'otc'}
def __init__(self, mkt_type, year, m_mapping=m_mapping):
year = year if year <= 1000 else year-1911
self.mkt_type = m_mapping[mkt_type]
self.year = str(year) #yyy
self.url = f"https://mops.twse.com.tw/mops/web/ajax_t108sb27"
self.headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36'}
def add_raw(self):
form = {
"step": "1",
"firstin": "1",
"TYPEK": self.mkt_type,
"year": self.year,
"type": "2"
}
r = requests.post(self.url, data=form, headers=self.headers)
self.raw_data = pd.read_html(StringIO(r.text))[0]
def unify(self):
d = self.raw_data
d.columns = d.columns.get_level_values(-1)
d = d[d['公司代號']!='公司代號']
d['資料日期'] = d[['除權交易日' , '除息交易日']].apply(lambda x:x[0] if x[0] is not np.nan else x[1], axis=1)
for i in ['盈餘分配之股東現金股利(元/股)' , '法定盈餘公積、資本公積發放之現金(元/股)', \
'盈餘轉增資配股(元/股)' , '法定盈餘公積、資本公積轉增資配股(元/股)']:
d[i] = d[i].replace(to_replace='-', value=np.nan)
d['現金股利'] = d[['盈餘分配之股東現金股利(元/股)' , '法定盈餘公積、資本公積發放之現金(元/股)']].apply\
(lambda x:float(x[0])+float(x[1]) if x[0] is not np.nan and x[1] is not np.nan else np.nan, axis=1)
d['股票股利'] = d[['盈餘轉增資配股(元/股)' , '法定盈餘公積、資本公積轉增資配股(元/股)']].apply\
(lambda x:(float(x[0])+float(x[1]))*100 if x[0] is not np.nan and x[1] is not np.nan else np.nan, axis=1)
d = d[['資料日期', '公司代號', '公司名稱', '現金股利', '股票股利']]
d = d.fillna(0)
d = d[(d['現金股利']!=0) | (d['股票股利']!=0)]
d = d.drop_duplicates()
def f(x):
x = x.split('/')
return str(int(x[0])+1911)+x[1]+x[2]
d['資料日期'] = d['資料日期'].apply(f)
d['MKT'] = '上市' if self.mkt_type=='sii' else '上櫃'
cols = list(d.columns)
d = d[cols[:1]+cols[-1:]+cols[1:-1]]
self.data = d
# 減資換發新股
class twCptRdc:
m_mapping = {'上市':'sii','上櫃':'otc'}
def __init__(self, mkt_type, year, start='0101', end='1231', m_mapping=m_mapping):
eg_year = year if year > 1000 else year+1911 #yyyy
cn_year = year if year < 1000 else year-1911
self.mkt_type = m_mapping[mkt_type]
if self.mkt_type=='sii':
self.year = eg_year
self.full_start = f'{eg_year}{start}'
self.full_end = f'{eg_year}{end}'
self.url = f'https://www.twse.com.tw/exchangeReport/TWTAUU?response=json&strDate={self.full_start}&endDate={self.full_end}'
elif self.mkt_type=='otc':
self.year = cn_year
self.full_start = f'{cn_year}/{start[:2]}/{start[2:]}'
self.full_end = f'{cn_year}/{end[:2]}/{end[2:]}'
self.url = f'https://www.tpex.org.tw/web/stock/exright/revivt/revivt_result.php?l=zh-tw&d={self.full_start}&ed={self.full_end}&s=0,asc,0&o=csv'
self.headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36'}
def add_raw(self):
if self.mkt_type=='sii':
r = requests.get(self.url, headers=self.headers)
d = json.loads(r.text)
data = pd.DataFrame(data=d['data'], columns=d['fields'])
data = data.rename(columns={'停止買賣前收盤價格':'停止買賣前價格'})
self.raw_data = data
elif self.mkt_type=='otc':
r = requests.get(self.url, headers=self.headers)
d = pd.read_csv(StringIO(r.text), header = ["股票代號" in l for l in r.text.split("\n")].index(True))
d = d[~d.股票代號.isnull()]
d.股票代號 = d.股票代號.astype(int)
d.股票代號 = d.股票代號.astype(str)
data = d.rename(columns={'恢復買賣日期 ':'恢復買賣日期'
,'最後交易日之收盤價格':'停止買賣前價格'
,'減資恢復買賣開始日參考價格':'恢復買賣參考價'
,'開始交易基準價':'開盤競價基準'})
data['恢復買賣日期'] = data['恢復買賣日期'].apply(lambda x:f'{x[:3]}/{x[3:5]}/{x[5:]}')
self.raw_data = data
def unify(self):
data = self.raw_data
data = data[data['停止買賣前價格']!=data['恢復買賣參考價']]
data.drop(['漲停價格','跌停價格','開盤競價基準','除權參考價'], axis=1, inplace=True)
if self.mkt_type=='sii':
data['詳細資料'] = data['詳細資料'].apply(lambda x:x.split("'")[1][9:])
def rdc(x):
p1, p2, reason, dtl = x[0], x[1], x[2], x[3]
if reason=='彌補虧損':
exchange_ratio,refund = float(p1)/float(p2)*1000,0
else:
stockId = dtl.split('STK_NO=')[1].rstrip()
headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36'}
basic_url = 'https://www.twse.com.tw/zh/'
url = basic_url+dtl
r = requests.get(url, headers=headers)
d = pd.read_html(StringIO(r.text))[0]
exchange_ratio = float(d.iloc[3][1].split(' ')[0])
refund = float(d.iloc[4][1].split(' ')[0])
time.sleep(4)
return exchange_ratio,refund
tmpDtl = data[['停止買賣前價格','恢復買賣參考價','減資原因','詳細資料']].apply(rdc, axis=1)
elif self.mkt_type=='otc':
# 因較難取得詳細資料簡化計算
def rdc(x):
p1, p2 = x[0], x[1]
exchange_ratio,refund = float(p1)/float(p2)*1000,0
return exchange_ratio,refund
tmpDtl = data[['停止買賣前價格','恢復買賣參考價']].apply(rdc, axis=1)
data['每千股換發股數'] = tmpDtl.apply(lambda x:x[0])
data['每股退還金額'] = tmpDtl.apply(lambda x:x[1])
cols = ['恢復買賣日期', '股票代號', '名稱', '停止買賣前價格', '恢復買賣參考價', '減資原因', '每千股換發股數', '每股退還金額']
data = data[cols]
data = data.rename(columns={'恢復買賣日期':'資料日期'})
for i in ['每千股換發股數', '每股退還金額']:
data[i] = data[i].apply(lambda x:round(x,2))
for i in ['停止買賣前價格', '恢復買賣參考價', '每千股換發股數', '每股退還金額']:
data[i] = data[i].astype(float)
def f(x):
x = x.split('/')
return str(int(x[0])+1911)+x[1]+x[2]
data['資料日期'] = data['資料日期'].apply(f)
data['MKT'] = '上市' if self.mkt_type=='sii' else '上櫃'
cols = list(data.columns)
data = data[cols[:1]+cols[-1:]+cols[1:-1]]
self.data = data
# data - 回測日價格 - 還原價格 - 年報酬 - 同期台灣50之報酬
class RvnCmpt:
def __init__(self,data,data_next,start_date='0401',end_date='0401'):
self.data=data
self.data_next=data_next
self.stm_year=list(self.data.財報年度)[0]
self.test_start_year=self.stm_year+1
self.test_end_year=self.stm_year+2 if self.data_next is not None else self.test_start_year
self.start_date=start_date
self.end_date=end_date
self.start_ymd=f'{self.test_start_year}{self.start_date}'
self.end_ymd=f'{self.test_end_year}{self.end_date}'
def addPrice(self):
if self.data_next is not None:
data = pd.merge(self.data, self.data_next[['公司代號','年報公佈後收盤價']], how='inner', on='公司代號')
data = data[data['年報公佈後收盤價_y']!=0]
data.rename(columns = {'年報公佈後收盤價_x':'年報公佈後收盤價'
,'年報公佈後收盤價_y':'次年年報公佈後收盤價'}, inplace=True)
else:
# get price that day
dt = self.end_ymd # yyyymmdd
sii_dt = dt
otc_dt = f'{(int(dt[:4])-1911)}/{dt[4:6]}/{dt[6:]}'
# scraping
sii_price = siiPrice(sii_dt)
sii_price = sii_price[['證券代號', '證券名稱', '收盤價']]
sii_price = sii_price.rename(columns = {'證券代號':'公司代號', '證券名稱':'公司名稱'})
otc_price = otcPrice(otc_dt)
otc_price = otc_price[['代號', '名稱', '收盤']]
otc_price = otc_price.rename(columns = {'代號':'公司代號', '名稱':'公司名稱', '收盤':'收盤價'})
p = pd.concat([sii_price,otc_price])
p = p[(~p.公司名稱.str.contains('筆')) & (p.收盤價!='----')]
p = p[['公司代號', '收盤價']]
p.收盤價 = p.收盤價.apply(float)
# combine
data = pd.merge(self.data,p,how='inner',on='公司代號')
data = data[data['收盤價']!=0]
data = data.rename(columns = {'收盤價':'測試日收盤價'})
self.data=data
def adjDiv(self):
if self.data_next is not None:
original_price_name = '次年年報公佈後收盤價'
div = pd.concat([pd.read_csv(f'./div/div_{self.test_start_year}.csv')\
,pd.read_csv(f'./div/div_{self.test_end_year}.csv')], ignore_index=True)
div = div[(div['資料日期']>=int(f'{self.test_start_year}{self.start_date}'))\
&(div['資料日期']<int(f'{self.test_end_year}{self.end_date}'))]
else:
original_price_name = '測試日收盤價'
div = pd.read_csv(f'./div/div_{self.test_start_year}.csv')
div = div[(div['資料日期']>=int(f'{self.test_start_year}{self.start_date}'))\
&(div['資料日期']<int(f'{self.test_end_year}{self.end_date}'))]
div = div.rename(columns={'股票代號':'公司代號'})
div = div[['公司代號', '現金股利', '股票股利']]
div.公司代號 = div.公司代號.astype(str)
self.div = div
self.data = pd.merge(self.data,div,how='left',on='公司代號')
self.data['現金股利'] = self.data['現金股利'].fillna(0)
self.data['股票股利'] = self.data['股票股利'].fillna(0)
self.data['還原股利後價格'] = self.data[[original_price_name, '現金股利', '股票股利']].apply\
(lambda x:(x[0]*(1000+x[2])/1000)+x[1], axis=1)
data = self.data.drop_duplicates()
# address companines issueing multiple dividents
mtp_div_companies = [k for (k,v) in Counter(data.公司代號).items() if v > 1]
if mtp_div_companies!=[]:
def recalculate_div(companyId,data):
data_exclude = data[data.公司代號!=companyId]
tgt = data[data.公司代號==companyId]
tgt = tgt.reset_index().drop('index', axis=1)
div_cols = ['公司代號','現金股利','股票股利']
recalculate_cols = ['還原股利後價格']
same_cols = ['公司代號']+list(filter(lambda x:x not in div_cols+recalculate_cols, list(tgt.columns)))
tgt_div = tgt[div_cols]
tgt_div = tgt_div.groupby('公司代號').sum().reset_index()
tgt_same = tgt.loc[[0]][same_cols]
tgt = pd.merge(tgt_same,tgt_div,how='left',on='公司代號')
p = '次年年報公佈後收盤價' if '測試日收盤價' not in same_cols else '測試日收盤價'
tgt['還原股利後價格'] = tgt[[p, '現金股利', '股票股利']].apply\
(lambda x:(x[0]*(1000+x[2])/1000)+x[1], axis=1)
data = pd.concat([data_exclude,tgt],ignore_index=True)
return data
for i in mtp_div_companies:
data = recalculate_div(i,data)
self.data = data
def adjRdc(self):
original_price_name = '還原股利後價格'
if self.data_next is not None:
rdc = pd.concat([pd.read_csv(f'./rdc/rdc_{self.test_start_year}.csv')\
,pd.read_csv(f'./rdc/rdc_{self.test_end_year}.csv')], ignore_index=True)
rdc = rdc[(rdc['資料日期']>=int(f'{self.test_start_year}{self.start_date}'))\
&(rdc['資料日期']<int(f'{self.test_end_year}{self.end_date}'))]
else:
rdc = pd.read_csv(f'./rdc/rdc_{self.test_start_year}.csv')
rdc = rdc[(rdc['資料日期']>=int(f'{self.test_start_year}{self.start_date}'))\
&(rdc['資料日期']<int(f'{self.test_end_year}{self.end_date}'))]
rdc = rdc.rename(columns={'股票代號':'公司代號'})
rdc = rdc[['公司代號', '每千股換發股數', '每股退還金額']]
rdc.公司代號 = rdc.公司代號.astype(str)
self.rdc = rdc
self.data = pd.merge(self.data,rdc,how='left',on='公司代號')
self.data['每千股換發股數'] = self.data['每千股換發股數'].fillna(0)
self.data['每股退還金額'] = self.data['每股退還金額'].fillna(0)
self.data['還原股利及減資價格'] = self.data[[original_price_name, '每千股換發股數', '每股退還金額']].apply\
(lambda x:x[0] if x[1]==0 else x[0]*(x[1]/1000)+x[2], axis=1)
self.data = self.data.drop_duplicates()
def getRvn(self):
self.data = self.data.drop('還原股利後價格', axis=1)
self.data = self.data.rename(columns={'還原股利及減資價格':'還原股價'})
self.data['年報酬(%)'] = round(((self.data['還原股價']/self.data['年報公佈後收盤價'])-1)*100,2)
self.data['還原股價'] = self.data['還原股價'].apply(lambda x:round(x,2))
def get_tw50(self):
etf = pd.read_csv(f'./etf/etf_{self.test_start_year}.csv')
etf = etf[(etf['資料日期']>=int(f'{self.test_start_year}{self.start_date}'))\
&(etf['資料日期']<int(f'{self.test_end_year}{self.end_date}'))]
def index(dt):
d_test = siiPrice(dt)
while d_test is None:
dt = str(int(dt)+1)
d_test = siiPrice(dt)
d = d_test
d = d[d['證券代號']=='0050']
d = list(d.收盤價)[0]
return d
ix1 = index(self.start_ymd)
tw50_div = etf[etf['股票代號']=='0050']
tw50_div = list(tw50_div.現金股利)[0]
ix2 = index(self.end_ymd)
rev = round(((ix2+tw50_div)/ix1 -1)*100, 2)
self.tw50 = ix1,ix2,rev
def cmpr_tw50(self):
self.data['勝過台灣50'] = self.data['年報酬(%)'].apply(lambda x:'Y' if x>self.tw50[2] else 'N')
self.exc_pcnt = round(len([i for i in list(self.data['勝過台灣50'])\
if i!='N']) / len(list(self.data['勝過台灣50'])) * 100, 2)
@staticmethod
def back_testing(x,f):
print(f'財報年度 = {x.stm_year}, 回測期間 = {x.start_ymd}~{x.end_ymd}')
data = f(x.data)
# # address companines issueing multiple dividents
# data = data.drop_duplicates()
# mtp_div_companies = [k for (k,v) in Counter(data.公司代號).items() if v > 1]
# if mtp_div_companies!=[]:
# def recalculate_div(companyId,data,tw_50_rvn = x.tw50[2]):
# data_exclude = data[data.公司代號!=companyId]
# x = data[data.公司代號==companyId]
# x = x.reset_index().drop('index', axis=1)
# div_cols = ['公司代號','現金股利','股票股利']
# recalculate_cols = ['還原股價','年報酬(%)','勝過台灣50']
# same_cols = ['公司代號']+list(filter(lambda x:x not in div_cols+recalculate_cols, list(x.columns)))
# x_div = x[div_cols]
# x_div = x_div.groupby('公司代號').sum().reset_index()
# x = x.loc[[0]]
# x_same = x[same_cols]
# x = pd.merge(x_same,x_div,how='left',on='公司代號')
# p = '次年年報公佈後收盤價' if '測試日收盤價' not in same_cols else '測試日收盤價'
# x['還原股價'] = x[[p, '現金股利', '股票股利']].apply\
# (lambda x:(x[0]*(1000+x[2])/1000)+x[1], axis=1)
# x['年報酬(%)'] = round(((x['還原股價']/x['年報公佈後收盤價'])-1)*100,2)
# x['勝過台灣50'] = x['年報酬(%)'].apply(lambda x:'Y' if x>tw_50_rvn else 'N')
# data = pd.concat([data_exclude,x],ignore_index=True)
# return data
# for i in mtp_div_companies:
# data = recalculate_div(i,data)
rev = np.array(data['年報酬(%)'])
rev = rev[~np.isnan(rev)]
rev = np.array(list(filter(lambda x:x!=np.inf, rev)))
r = round(rev.mean(), 2)
exc_rev = r - x.tw50[2]
exc_pcnt = round(len([i for i in list(data['勝過台灣50'])\
if i!='N']) / len(list(data['勝過台灣50'])) * 100, 2)
print(f'台灣50指數 {x.tw50[0]} -> {x.tw50[1]}, 年報酬 = {x.tw50[2]}%, 報酬>台灣50個股比例 = {x.exc_pcnt}%')
print(f'篩選股數 = {data.shape[0]}')
print(f'年報酬 = {r}%, 超額報酬 = {exc_rev}%, 報酬>台灣50個股比例 = {exc_pcnt}%\n')
return data,r
@staticmethod
def SearchBestCondition(drv_ds, f, graph=True):
prm_list = []
y_list = list(drv_ds.keys())
for y in y_list:
if y!=y_list[-1]:
x = RvnCmpt(drv_ds[y],drv_ds[y+1])
else: # 不完整年度處理
dt = LastTxnDate(y+1, 'sii')[-4:] # 隔年
x = RvnCmpt(drv_ds[y],None,end_date=dt)
x.addPrice()
x.adjDiv()
x.adjRdc()
x.getRvn()
x.get_tw50()
x.cmpr_tw50()
data,r = RvnCmpt.back_testing(x,f)
prm_list.append([data,x.tw50[2],r])
data_array,tw50_array,slct_array = np.array(prm_list)[:,0],np.array(prm_list)[:,1],np.array(prm_list)[:,2]
if graph==True:
plt.figure(figsize=(9,4))
y_array = np.array(list(range(y_list[1],y_list[-1]+2)))
plt.plot(y_array,tw50_array,label='tw50')
plt.plot(y_array,slct_array,label='自選etf')
plt.title('tw50 v.s 自選etf')
plt.xlabel('Year', fontsize=12)
plt.ylabel('報酬率', fontsize=12)
plt.legend()
def hstRevn(t):
return round((reduce(lambda x,y:(100+x)*(100+y), t)/100**len(t)-1)*100, 2)
print('歷史績效:')
print(f'近三年報酬 tw50={hstRevn(tw50_array[-3:])}% v.s 自選etf={hstRevn(slct_array[-3:])}%')
print(f'近五年報酬 tw50={hstRevn(tw50_array[-5:])}% v.s 自選etf={hstRevn(slct_array[-5:])}%')
print(f'近八年報酬 tw50={hstRevn(tw50_array)}% v.s 自選etf={hstRevn(slct_array)}%')
return data_array,tw50_array,slct_array
@staticmethod
def select(d, cdts):
for i in cdts:
d = d[i]
return d
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment