Last active
April 18, 2022 15:15
-
-
Save KokoseiJ/74250858daf092dffa8124fe2a5323db to your computer and use it in GitHub Desktop.
문갤문학 검색기 v3
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
## Copyright (C) 2022 파이썬 (KokoseiJ)
#
# mungalparser is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
import requests | |
from bs4 import BeautifulSoup as bs | |
import re | |
import sys | |
import time | |
import datetime | |
import threading | |
from queue import Queue | |
from functools import cached_property | |
# File that receives the search results.
FILE_NAME = "mungalsearch.txt"

# Gallery searched by default and the User-Agent header sent with requests.
GALL_ID = "dokidokilc"
USER_AGENT = "Just_Monika"

# URL templates: the list page takes the gallery id; the post view takes
# the gallery id and the post number.
GALL_URL = "https://gall.dcinside.com/mgallery/board/lists?id={}"
GALL_VIEW_URL = "https://gall.dcinside.com/mgallery/board/view/?id={}&no={}"

# Number of worker threads, and seconds to wait before retrying a failed
# page request.
THREAD_NUM = 16
ERROR_WAIT = 5

# Compiled title patterns; a post whose title matches any of them at its
# start is selected by filter().
PATTERNS = [
    re.compile(source)
    for source in (
        r"[\[\(]?문갤 ?(?:릴레이 ?)?문학(?:/[^\t]+)?[\]\)]",
        r"[\[\(]?문예 ?대회[\]\)]",
        # Disabled pattern kept for reference:
        # r"[\[\(]?시인 ?(?:대회)?[\]\)]",
    )
]

# Banner printed at start-up. The last line warns that excessive use may
# put load on the DCInside servers.
# NOTE(review): banner lines are reproduced as they appear in this copy of
# the file; confirm the spacing against the original.
SPLASH = (
    "===============\n"
    "= 문갤문학 검색기 v3 =\n"
    "= by 파이썬 =\n"
    "===============\n"
    "과도한 사용은 디시인사이드 서버에 부담을 줄 수 있습니다.\n"
)
class Gallery:
    """Pages through the post list of one gallery, optionally with threads.

    Parameters:
        gallid: gallery id substituted into GALL_URL.
        ua: value sent as the User-Agent header on every request.
        threads: number of worker threads created for gen_page_threaded().

    The worker threads are created once in __init__ and a thread can only
    be started once, so gen_page_threaded() can be consumed a single time
    per instance.
    """

    # Marker each worker enqueues after its last result so the consumer can
    # tell "this worker is done" apart from "the queue is momentarily empty".
    _WORKER_DONE = object()

    def __init__(self, gallid=GALL_ID, ua=USER_AGENT, threads=THREAD_NUM):
        self.gallid = gallid
        self.url = GALL_URL.format(gallid)

        self.session = requests.session()
        self.session.headers.update({
            "User-Agent": ua
        })

        self.threadnum = threads
        self.queue = Queue()
        # Guards current_page and finished, which the workers share.
        self.lock = threading.Lock()
        self.finished = 0
        self.current_page = 0

        self.threads = [
            threading.Thread(target=self.run)
            for _ in range(threads)
        ]

    @cached_property
    def last_page(self):
        """Number of the last list page (fetched once, then cached).

        Requests a very large page number and reads the page number out of
        the URL of the final response.
        NOTE(review): presumably the site redirects to the real last page;
        confirm. If the final URL has no ``&page=`` part this raises
        AttributeError.
        """
        r = self.session.get(f"{self.url}&page=999999999")
        page = re.search(r"&page=([0-9]+)", r.url).group(1)
        return int(page)

    def _get_page(self, page):
        """Fetch list page *page*, retrying until the request succeeds.

        Any exception from the request is reported on stdout and retried
        after ERROR_WAIT seconds. The retry is a loop rather than a
        recursive call so a long outage cannot exhaust the recursion limit.
        """
        while True:
            try:
                return self.session.get(f"{self.url}&page={page}")
            except Exception as e:
                print(
                    f"\n경고! 페이지 {page} 색인 중 {type(e)} 오류가 발생하였습니다. "
                    f"{ERROR_WAIT}초 후 재시도합니다..."
                )
                time.sleep(ERROR_WAIT)

    def get_page(self, page):
        """Return the posts of list page *page* as GallEntry objects.

        Rows whose ``data-type`` attribute is "icon_notice" are skipped.
        """
        r = self._get_page(page)
        soup = bs(r.content, features="lxml")
        rows = soup.find_all("tr", {"class": "ub-content us-post"})

        return [
            GallEntry(self, entry) for entry in rows
            if entry['data-type'] != "icon_notice"
        ]

    def gen_page(self, start=1):
        """Yield ``(page_number, entries)`` sequentially from *start* to last_page."""
        for current_page in range(start, self.last_page + 1):
            yield (current_page, self.get_page(current_page))

    def get_current_page(self):
        """Atomically hand out the next page number to fetch."""
        with self.lock:
            page = self.current_page
            self.current_page += 1
        return page

    def report_finish(self):
        """Atomically count one more finished worker."""
        with self.lock:
            self.finished += 1

    def run(self):
        """Worker loop: fetch pages until the page counter passes last_page.

        Each result is enqueued as ``(page_number, entries)``. The
        completion marker is enqueued in a ``finally`` block so that a
        worker that dies with an exception still signals the consumer
        instead of leaving it waiting forever.
        """
        try:
            while True:
                page = self.get_current_page()
                if page > self.last_page:
                    break
                self.queue.put((page, self.get_page(page)))
        finally:
            self.report_finish()
            self.queue.put(self._WORKER_DONE)

    def start(self):
        """Start every worker thread."""
        for thread in self.threads:
            thread.start()

    def gen_page_threaded(self, start=1):
        """Yield ``(page_number, entries)`` as worker threads produce them.

        Results arrive in completion order, not page order. The generator
        ends once every worker has enqueued its completion marker; because
        a worker enqueues all its results before its marker, no result is
        dropped and the consumer never blocks on an empty queue after the
        workers have finished.
        """
        self.current_page = start
        # Resolve last_page once up front so the workers do not each
        # trigger the probing request on their first iteration.
        self.last_page

        self.start()

        remaining = self.threadnum
        while remaining:
            item = self.queue.get()
            if item is self._WORKER_DONE:
                remaining -= 1
                continue
            yield item
class GallEntry:
    """Lazy view over one post row of a gallery list page.

    *gall* is the owning Gallery (its ``gallid`` is used to build the post
    URL) and *entry* is the parsed row element. Every field is read from
    the row on first access and cached afterwards.
    """

    def __init__(self, gall, entry):
        self.gall = gall
        self.entry = entry

    def _find_class(self, classname):
        """Return the row's ``<td>`` cell whose class is ``gall_<classname>``."""
        cell_class = f"gall_{classname}"
        return self.entry.find("td", {"class": cell_class})

    def _find_text(self, classname):
        """Return the text of the ``gall_<classname>`` cell."""
        cell = self._find_class(classname)
        return cell.text

    @cached_property
    def url(self):
        """View URL of the post, built from the gallery id and the post id."""
        return GALL_VIEW_URL.format(self.gall.gallid, self.id)

    @cached_property
    def id(self):
        """Post number, parsed as an int from the "num" cell."""
        number_text = self._find_text("num")
        return int(number_text)

    @cached_property
    def subject(self):
        """Raw text of the "subject" cell."""
        return self._find_text("subject")

    @cached_property
    def title(self):
        """Stripped text of the link inside the "tit" cell."""
        link = self._find_class("tit").find("a")
        return link.text.strip()

    @cached_property
    def author(self):
        """Stripped text of the "writer" cell."""
        writer_text = self._find_text("writer")
        return writer_text.strip()

    @cached_property
    def nick(self):
        """``data-nick`` attribute of the writer cell."""
        writer_cell = self._find_class("writer")
        return writer_cell['data-nick']

    @cached_property
    def userid(self):
        """``data-uid`` attribute of the writer cell."""
        writer_cell = self._find_class("writer")
        return writer_cell['data-uid']

    @cached_property
    def ip(self):
        """``data-ip`` attribute of the writer cell."""
        writer_cell = self._find_class("writer")
        return writer_cell['data-ip']

    @cached_property
    def is_anon(self):
        """True when the writer cell carries a non-empty ``data-ip``."""
        return True if self.ip else False

    @cached_property
    def is_halfanon(self):
        """True when the writer's nickname-icon file name starts with "fix".

        False when the writer cell has no ``writer_nikcon`` link at all.
        """
        nikcon_link = self._find_class("writer").find(
            "a", {"class": "writer_nikcon"}
        )
        if nikcon_link is None:
            return False
        icon_src = nikcon_link.find("img")['src']
        icon_name = icon_src.rsplit("/", 1)[-1]
        return icon_name.startswith("fix")

    @cached_property
    def is_gonik(self):
        """True when the writer is neither anon nor half-anon."""
        return not (self.is_anon or self.is_halfanon)

    @cached_property
    def nick_format(self):
        """Nickname followed by the IP (anon writers) or user id in parentheses."""
        anon = self.is_anon
        nick = self.nick
        ident = self.ip if anon else self.userid
        return f"{nick} ({ident})"

    @cached_property
    def date(self):
        """Post time parsed from the ``title`` attribute of the "date" cell."""
        iso_text = self._find_class("date")['title']
        return datetime.datetime.fromisoformat(iso_text)

    @cached_property
    def date_format(self):
        """Post time rendered as ``yy/mm/dd HH:MM:SS``."""
        posted_at = self.date
        return posted_at.strftime("%y/%m/%d %H:%M:%S")

    @cached_property
    def read_count(self):
        """View count, parsed as an int from the "count" cell."""
        count_text = self._find_text("count")
        return int(count_text)

    @cached_property
    def recommend(self):
        """Recommendation count, parsed as an int from the "recommend" cell."""
        recommend_text = self._find_text("recommend")
        return int(recommend_text)
def filter(entry):
    """Return True when *entry* should be included in the results.

    An entry is selected when its subject is exactly "문학" or when its
    title matches any of PATTERNS at the start of the title.
    """
    return entry.subject == "문학" or any(
        pattern.match(entry.title) for pattern in PATTERNS
    )
def run():
    """Scan every list page of the default gallery and collect matching posts.

    Returns:
        ``(total, filtered)`` where *total* is the number of entries seen
        on all pages and *filtered* is the list of entries accepted by
        filter(), with each post id appearing at most once.
    """
    gall = Gallery()
    filtered = []
    # A set keeps the duplicate check O(1) per entry; the same post can
    # show up on more than one page while the scan is in progress.
    seen_ids = set()
    total = 0

    for pagenum, entries in gall.gen_page_threaded():
        print(f"[*] {pagenum} 페이지 처리중...", end="\r")
        total += len(entries)
        for entry in entries:
            # filter() is evaluated first so entry.id is only parsed for
            # entries that actually match.
            if filter(entry) and entry.id not in seen_ids:
                filtered.append(entry)
                seen_ids.add(entry.id)

    return total, filtered
def format(num, entry):
    """Render one result as a numbered two-line record followed by a blank line.

    The first line holds the number, writer, title, post time and
    recommendation count; the second line holds the post URL.
    """
    summary = (
        f"{num}. {entry.nick_format} - {entry.title} "
        f"| {entry.date_format} | {entry.recommend}"
    )
    return f"{summary}\n{entry.url}\n\n"
def main():
    """Run the search interactively and write the results to FILE_NAME.

    Prints the banner and settings, counts down five seconds, runs the
    scan, sorts the matching posts by post id and writes them, preceded by
    a short header, to FILE_NAME.
    """
    print(SPLASH)

    # Show the regex source text. Formatting the compiled pattern object
    # itself would embed its repr ("re.compile('...')") between the quotes.
    patternstr = "\n".join(f'r"{pattern.pattern}"' for pattern in PATTERNS)

    print(
        f"갤러리 ID: {GALL_ID}\n"
        f"User-Agent: {USER_AGENT}\n"
        f"스레드 개수: {THREAD_NUM}\n\n"
        f"정규식 검색 패턴:\n",
        patternstr + "\n"
    )

    # Five-second countdown before any request is sent.
    for i in range(5, 0, -1):
        print(f"{i}초 뒤 검색을 시작합니다" + "." * (6 - i), end="\r")
        time.sleep(1)

    start_time = time.time()
    start_stamp = time.strftime(
        "%y/%m/%d %H:%M:%S", time.localtime(start_time)
    )
    print(f"\n시작 시간: {start_stamp}\n")

    total, filtered = run()

    elapsed_time = time.time() - start_time
    elapsed_format = time.strftime("%H:%M:%S", time.gmtime(elapsed_time))

    print("\n\n색인이 완료되었습니다.\n")
    print(f"소요시간: {elapsed_format}")
    print(f"총 게시글수: {total}, 검색된 게시글: {len(filtered)}")

    print("정렬을 시작합니다...")
    filtered.sort(key=lambda x: x.id)
    print("정렬이 완료되었습니다. 파일을 저장중입니다...")

    # Explicit UTF-8: the header and post titles contain Korean and other
    # non-ASCII text, which the platform default codec may not be able to
    # encode.
    with open(FILE_NAME, "w", encoding="utf-8") as f:
        f.write(
            "문갤문학 검색기 v3 by 파이썬\n\n"
            f"현재 시간: {start_stamp}\n"
            "정규식 검색 패턴:\n" +
            patternstr + "\n\n"
        )
        f.writelines(
            format(i, entry) for i, entry in enumerate(filtered, start=1)
        )

    print("파일을 저장했습니다. 프로그램을 종료합니다.")


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment