Created
May 17, 2023 11:44
-
-
Save jerryan999/c07e150d4f09503e0a7792ffe2ba3a8d to your computer and use it in GitHub Desktop.
How to use DuckDB to query local or S3 files and then run some analysis on them.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import gzip
import json
import os
import shutil
import time

import duckdb
# Open the DuckDB database file "cloudfront-log2.db" in read-write mode.
conn = duckdb.connect(database="cloudfront-log2.db", read_only=False)

# Install and load the extensions, then configure S3 access.
# The S3 key id / secret are empty strings here; fill them in before reading
# from S3 (NOTE(review): presumably blanked out by the author on purpose).
conn.execute('''
    INSTALL 'httpfs';
    LOAD 'httpfs';
    INSTALL json;
    LOAD json;
    SET s3_region='ap-northeast-1';
    SET s3_access_key_id='';
    SET s3_secret_access_key='';
    SET enable_progress_bar = true;
''')
# Create the `logs` table with one STRING column per log field.
# Every column is kept as STRING, so no type conversion happens at load time.
# This is a plain CREATE TABLE: it raises if `logs` already exists in the
# database file (e.g. when the script is run a second time).
conn.execute('''
    CREATE TABLE logs (
        date STRING,
        time STRING,
        x_edge_location STRING,
        sc_bytes STRING,
        c_ip STRING,
        cs_method STRING,
        cs_host STRING,
        cs_uri_stem STRING,
        sc_status STRING,
        cs_Referer STRING,
        cs_User_Agent STRING,
        cs_uri_query STRING,
        cs_Cookie STRING,
        x_edge_result_type STRING,
        x_edge_request_id STRING,
        x_host_header STRING,
        cs_protocol STRING,
        cs_bytes STRING,
        time_taken STRING,
        x_forwarded_for STRING,
        ssl_protocol STRING,
        ssl_cipher STRING,
        x_edge_response_result_type STRING,
        cs_protocol_version STRING,
        fle_status STRING,
        fle_encrypted_fields STRING,
        c_port STRING,
        time_to_first_byte STRING,
        x_edge_detailed_result_type STRING,
        sc_content_type STRING,
        sc_content_len STRING,
        sc_range_start STRING,
        sc_range_end STRING
    )
''')
# Glob patterns for the gzipped log files to load, one pattern per day.
file_regs = [
    '/Users/zhibin.an/cloudfront/E1JPIDV7XCO3TK.2023-04-24*.gz',
    '/Users/zhibin.an/cloudfront/E1JPIDV7XCO3TK.2023-04-25*.gz',
    '/Users/zhibin.an/cloudfront/E1JPIDV7XCO3TK.2023-04-26*.gz',
    '/Users/zhibin.an/cloudfront/E1JPIDV7XCO3TK.2023-04-27*.gz',
    '/Users/zhibin.an/cloudfront/E1JPIDV7XCO3TK.2023-04-28*.gz',
    '/Users/zhibin.an/cloudfront/E1JPIDV7XCO3TK.2023-04-29*.gz',
    '/Users/zhibin.an/cloudfront/E1JPIDV7XCO3TK.2023-04-30*.gz',
]

# Insert data: append the rows matched by each pattern to `logs`.
for file_reg in file_regs:
    print("file_reg: ", file_reg)
    # skip=2 drops the first two lines of each file (presumably the
    # "#Version" / "#Fields" header lines of the log format -- verify).
    # The '\t' escape is expanded by Python, so DuckDB receives a literal tab
    # character as the separator. ignore_errors=true skips unparsable rows.
    sql = (
        f"INSERT INTO logs SELECT * FROM read_csv('{file_reg}', "
        "skip=2, sep='\t', ignore_errors=true, auto_detect=TRUE)"
    )
    conn.execute(sql)
# Fetch some data: show the first 10 rows as a DataFrame.
df = conn.execute("SELECT * FROM logs limit 10").fetchdf()
print(df.head(10))

# Total count: number of rows loaded into `logs`.
d = conn.execute("SELECT count(*) FROM logs").fetchall()
print(d)
# For each edge location, count requests per URI and list the URIs ordered
# by request count, descending.
locations = ['DUB56-P1', 'DUB2-C1', 'BAH52-C1', 'BAH53-C1']

# The query text is the same for every location, so it is built once; the
# location is bound through the `?` placeholder instead of being spliced
# into the SQL string.
location_sql = """
    SELECT
        x_edge_location,
        cs_uri_stem,
        count(*) AS cnt
    FROM logs
    WHERE x_edge_location = ?
    GROUP BY 1, 2
    ORDER BY cnt DESC
"""

for loc in locations:
    df = conn.execute(location_sql, [loc]).fetchdf()
    print('------------------ {} start ---------------------'.format(loc))
    # Only the 30 most frequent URIs are printed.
    print(df.head(30))
    print('------------------- {} end ----------------------'.format(loc))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment