スクレイピングなどで集めたcsvファイルを分析のためにRDBに入れたいということはよくありますね。 ただし、いちいちインポートしたりbulk insertするのも面倒です。そして、なんなら収集と同時に取り込みも自動でやりたいところ。 そういう場合に役立つ道具がほしかったので作りました。
sawadyrr5/CsvImporter: Easy Csv to RDB Importer.
- 最低限, データソースのcsvファイル名と投入先テーブルを渡すと動作します.
- 取り込み時に列名の変換や, 元データに無い列の追加ができます.(データ基準日など)
- 処理速度対策のためにmultiple INSERTを使っています. ローカル環境で109万行を110~140秒程度で処理します.
DB-APIのconnectionオブジェクトを渡して, インスタンス化します.
import pymssql
from csvimporter import CsvImporter
MyConnection = pymssql.connect(
host="127.0.0.1",
user="hoge",
password="fuga",
database="db"
)
importer = CsvImporter(MyConnection)
.read_csv()
メソッドで行います.
importer.read_csv(file, table, header=False, mapping=None, skiprows=0)
file
: csvファイルパスを指定table
: テーブル名を指定header
: csvファイルのヘッダー有無を指定mapping
: csvファイルのヘッダー名変更を指定skiprows
: 先頭から読み飛ばす行数を指定
headerとmappingの可能な組み合わせは次の通りです.
csvファイルの列順がテーブルの列順と同じであるものとしてINSERTを行います. csvファイルとテーブルの列順が異なる場合は正常に動作しません.
指定できません(エラー)
csvファイルのヘッダーをもとにINSERTを行います. csvファイルとテーブルの列順が異なっていても対応できます. csvファイルの列とテーブル列に不一致がある場合は正常に動作しません.
csvファイルのヘッダーをmappingにしたがって変換した列名でINSERTを行います. csvファイルとテーブルの列順が異なっていても対応できます. 変換は{"旧列名": '新列名'}の形式で辞書を渡して行います. 変換後の列とテーブル列に不一致がある場合は正常に動作しません. 新列名が重複する場合はエラーとなります. mappingのkeyに無い列は無視されます.
.read_csv()
でファイルを読み込み後、列を追加することができます.
importer.add_column(option, name, type_code, value=None)
option
: 追加モードを指定(現在'f'(固定値)のみ対応)name
: 列名を指定type_code
: データ型を指定(1=char/varchar, 3=int/float, 4=datetime, 5=decimal/numeric)value
: 値を指定
例えば, 以下のようにすると, INSERT時に固定値の基準日列を付加することができます. 日次配信されるcsvデータに基準日を付けて保存するような場合に便利です.
importer.add_column(option='f', name="基準日", type_code=4, value='2018-02-11')
SQLを発行してRDBに書き込みを行います. 内部的にmultiple insertを利用しており, ローカル環境では109万行を110~140秒程度で処理できました.
importer.execute()
まとめると, 以下のようになります.
import pymssql
from csvimporter import CsvImporter
MyConnection = pymssql.connect(
host="127.0.0.1",
user="hoge",
password="fuga",
database="db"
)
file = 'hoge.csv' # ヘッダ有 列A/B/Cから成る
table = '[db].[dbo].[fuga]' # 列A/B/D/基準日から成る(列Dの内容はcsvの列Cと同じ)
mapping = {
"C": 'D'
}
importer = CsvImporter(MyConnection)
importer.read_csv(file, table, True, mapping)
importer.add_column(option='f', name="基準日", type_code=4, value='2018-02-11')
importer.execute()