Created
October 5, 2022 03:39
-
-
Save thevar1able/fdd5925f8f42079f6edfe7c44c569c3b to your computer and use it in GitHub Desktop.
2ch webm scraper
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import sqlite3 | |
import requests | |
from collections import defaultdict | |
def create_schema():
    """Create the ``posts`` and ``files`` tables in ``db.sqlite``.

    Both statements use ``CREATE TABLE IF NOT EXISTS``, so calling this on a
    database that already holds one or both tables leaves the existing
    tables untouched instead of raising ``sqlite3.OperationalError``.
    """
    conn = sqlite3.connect('db.sqlite')
    try:
        c = conn.cursor()
        # posts.num is declared unique: at most one row per post number.
        c.execute('''CREATE TABLE IF NOT EXISTS posts
                     (num integer unique, parent integer, op integer, sticky integer, closed integer, endless integer, banned integer, timestamp integer, lasthit integer, name text, trip text, email text, subject text, comment text, tags text)''')
        # files.md5 is declared unique: at most one row per md5 value.
        c.execute('''CREATE TABLE IF NOT EXISTS files
                     (num integer, name text, fullname text, displayname text, path text, thumbnail text, md5 text unique, type integer, size integer, width integer, height integer, tn_width integer, tn_height integer, nsfw integer)''')
        conn.commit()
    finally:
        # Release the connection even when one of the statements fails.
        conn.close()
def load_thread(thread_id='46440864'):
    """Fetch one thread of the 2ch.hk ``/po/`` board and store what is new.

    Every post of the first thread in the JSON response is inserted into
    ``posts`` unless a row with the same ``num`` already exists; for each
    newly inserted post, its files are inserted into ``files`` unless a row
    with the same ``md5`` already exists. A summary is printed at the end.

    Args:
        thread_id: Thread number interpolated into the ``.json`` URL.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    # Fetch and parse before opening the database, so a failed request
    # cannot leave an open connection behind.
    r = requests.get('https://2ch.hk/po/res/{}.json'.format(thread_id))
    r.raise_for_status()
    # Parse the body once; the list is reused for the summary line below.
    posts = r.json()['threads'][0]['posts']

    post_counter, file_counter = 0, 0
    conn = sqlite3.connect('db.sqlite')
    try:
        c = conn.cursor()
        for post in posts:
            # Keys absent from the API object bind as NULL instead of
            # raising on a missing named parameter.
            post = defaultdict(lambda: None, post)
            c.execute('''SELECT num FROM posts WHERE num=?''', (post['num'],))
            if c.fetchone():
                # Post already stored; its files are skipped as well.
                continue
            c.execute('''INSERT INTO posts VALUES
                         (:num, :parent, :op, :sticky, :closed, :endless, :banned, :timestamp, :lasthit, :name, :trip, :email, :subject, :comment, :tags)''', post)
            post_counter += 1
            if not post['files']:
                continue
            for file in post['files']:
                file = defaultdict(lambda: None, file)
                c.execute('''SELECT md5 FROM files WHERE md5=?''', (file['md5'],))
                if c.fetchone():
                    continue
                # NOTE(review): :num is read from the file object itself; if
                # the API's file entries carry no `num`, this column stays
                # NULL -- confirm whether the owning post's num was intended.
                c.execute('''INSERT OR IGNORE INTO files VALUES
                             (:num, :name, :fullname, :displayname, :path, :thumbnail, :md5, :type, :size, :width, :height, :tn_width, :tn_height, :nsfw)''', file)
                file_counter += 1
        conn.commit()
    finally:
        # Closing without a commit discards a partially loaded thread.
        conn.close()
    print('Loaded {} posts'.format(len(posts)))
    print('Added {} posts, {} files'.format(post_counter, file_counter))
def download_file(file_name):
    """Download one file from 2ch.hk to ``'webm' + file_name`` on disk.

    Args:
        file_name: Path appended directly to ``https://2ch.hk`` to form the
            URL and to ``webm`` to form the local target path.

    Raises:
        requests.HTTPError: If the server answers with an error status; in
            that case no local file is created.
    """
    target = 'webm{}'.format(file_name)
    # The target can sit in nested directories that do not exist yet.
    target_dir = os.path.dirname(target)
    if target_dir:
        os.makedirs(target_dir, exist_ok=True)
    # The context manager releases the streamed connection on every path.
    with requests.get('https://2ch.hk{}'.format(file_name), stream=True) as r:
        # Do not save an error page as if it were the media file.
        r.raise_for_status()
        with open(target, 'wb') as f:
            for chunk in r.iter_content(chunk_size=1024):
                if chunk:
                    f.write(chunk)
    print('Downloaded {}'.format(file_name))
def get_files():
    """Return the ``path`` value of every row in the ``files`` table.

    Returns:
        A list with one entry per row of ``files`` in ``db.sqlite``.
    """
    conn = sqlite3.connect('db.sqlite')
    try:
        c = conn.cursor()
        c.execute('''SELECT path FROM files''')
        rows = c.fetchall()
    finally:
        # The original never closed this connection.
        conn.close()
    return [row[0] for row in rows]
def get_webms():
    """Return the stored file paths that end in ``.webm`` or ``.mp4``."""
    video_suffixes = ('.webm', '.mp4')
    videos = []
    for path in get_files():
        # str.endswith accepts a tuple, so one call tests both suffixes.
        if path.endswith(video_suffixes):
            videos.append(path)
    return videos
def form_aria2c_download_file():
    """Write ``aria2c.txt`` with one 2ch.hk URL per video not yet on disk."""
    pending = []
    for path in get_webms():
        # Skip anything that already exists under the local webm tree.
        if os.path.isfile('webm{}'.format(path)):
            continue
        pending.append(path)
    # The list is built before the file is opened, so a failure while
    # gathering paths leaves an existing aria2c.txt untouched.
    with open('aria2c.txt', 'w') as out:
        out.writelines('https://2ch.hk{}\n'.format(path) for path in pending)
if __name__ == '__main__':
    # Probe for the posts table to decide whether this is a first run.
    conn = sqlite3.connect('db.sqlite')
    try:
        c = conn.cursor()
        c.execute('''SELECT name FROM sqlite_master WHERE type='table' AND name='posts' ''')
        schema_exists = c.fetchone() is not None
    finally:
        # Close the probing connection before the functions below open
        # their own; the original left it open for the whole run.
        conn.close()
    if not schema_exists:
        create_schema()
    load_thread()
    form_aria2c_download_file()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment