Created
October 25, 2017 05:54
-
-
Save oiehot/9ca3a0f21ab5636bb6506c4e6f2cb061 to your computer and use it in GitHub Desktop.
clien board watcher
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import threading | |
import datetime | |
import re | |
import sqlite3 | |
import requests | |
import log | |
from urllib.parse import urljoin, urlparse | |
from bs4 import BeautifulSoup | |
from telegram.bot import Bot | |
class ClienWatcher:
    """Poll a board page periodically and broadcast newly seen matching posts.

    Each poll downloads ``url``, reads the subject, link and timestamp of
    every ``div.item`` row, and hands the rows accepted by ``match_func`` to
    ``_on_detect``.  Posts are remembered in a SQLite table keyed by
    (Date, Subject) so that each one is broadcast only once.
    """

    # Seconds to wait for an HTTP response; without a timeout a stalled
    # connection would block the polling thread forever.
    REQUEST_TIMEOUT_SEC = 10

    def __init__(self, bot, db_path, table_name, url, update_interval_sec, match_func):
        """Store the configuration and make sure the posts table exists.

        Args:
            bot: Object whose ``broadcast(text)`` method is called for every
                newly detected post.
            db_path: Path of the SQLite database file.
            table_name: Name of the table that records detected posts.  It is
                interpolated into SQL text, so it must come from trusted
                configuration.
            url: Address of the board list page to poll.
            update_interval_sec: Delay between two polls, in seconds.
            match_func: Callable invoked as ``match_func(date, subject)``; a
                truthy result marks the post as one to track.
        """
        self.bot = bot
        self.db_path = db_path
        self.db_table_name = table_name
        self.url = url
        self.update_interval_sec = update_interval_sec
        self.match_func = match_func
        self.active = False
        self._timer = None  # pending threading.Timer for the next poll, if any
        self._init_db()

    def _init_db(self):
        """Initialize the database: create the posts table if it is missing."""
        # A table name cannot be bound as a SQL parameter, hence the string
        # interpolation.
        q = 'create table if not exists %s(Date TEXT, Subject TEXT, Contents TEXT, Sent INT)' % (self.db_table_name)
        conn = sqlite3.connect(self.db_path)
        try:
            conn.execute(q)
        finally:
            conn.close()  # released even when the statement fails

    def start(self):
        """Start watching; does nothing when the watcher is already active."""
        if not self.active:
            self.active = True
            self._update()

    def stop(self):
        """Stop watching and cancel the poll that is currently scheduled."""
        if not self.active:
            return
        self.active = False
        # Cancel the pending timer so that a later start() cannot end up with
        # two polling chains running side by side.
        if self._timer is not None:
            self._timer.cancel()
            self._timer = None

    def status(self):
        """Log the current configuration of the watcher."""
        log.info('db_path: %s' % self.db_path)
        log.info('db_table_name: %s' % self.db_table_name)
        log.info('watching url: %s' % self.url)
        log.info('update interval sec: %d' % self.update_interval_sec)

    def _update(self):
        """Run one poll and schedule the next one while the watcher is active.

        Network errors are logged and do not end the polling loop.  Any other
        exception still propagates, but only after the next poll has been
        scheduled, so a single failure cannot silently stop the watcher.
        """
        if not self.active:
            return
        try:
            self._crawl_once()
        except requests.RequestException as err:
            log.info('[ERROR] request failed while polling %s: %s' % (self.url, err))
        finally:
            if self.active:
                # Schedule the next update.
                self._timer = threading.Timer(self.update_interval_sec, self._update)
                self._timer.start()

    def _crawl_once(self):
        """Crawl the list page once and forward matching posts to ``_on_detect``."""
        resp = requests.get(self.url, timeout=self.REQUEST_TIMEOUT_SEC)
        if resp.status_code != 200:
            return
        soup = BeautifulSoup(resp.text, 'html.parser')
        for item in soup.select('div.item'):
            try:
                subject_elem = item.select('a.list-subject')[0]
                subject = subject_elem.string.strip()
                contents_url = subject_elem['href']
                date = item.select('span.timestamp')[0].string.strip()
            except (IndexError, AttributeError, KeyError):
                # The row lacks a subject link, an href or a timestamp, or one
                # of them has no single text node; skip it rather than abort
                # the whole poll.
                continue
            if self.match_func(date, subject):
                self._on_detect(date, subject, contents_url)

    def _is_relative_url(self, url):
        """Return True when ``url`` has no network location (a relative path)."""
        return not urlparse(url).netloc

    def _get_contents_from_url(self, url):
        """Extract only the text content of a post.

        Args:
            url: Post address; a relative address is resolved against
                ``self.url``.

        Returns:
            The text of the first ``body`` element inside the first
            ``div.post-article`` element, or ``''`` when the response status
            is not 200 or either element is missing.

        Raises:
            requests.RequestException: If the HTTP request itself fails.
        """
        if self._is_relative_url(url):
            url = urljoin(self.url, url)
        resp = requests.get(url, timeout=self.REQUEST_TIMEOUT_SEC)
        if resp.status_code != 200:
            return ''
        soup = BeautifulSoup(resp.text, 'html.parser')
        articles = soup.select('div.post-article')
        if not articles:
            return ''
        bodies = articles[0].select('body')
        if not bodies:
            return ''
        return bodies[0].get_text()

    def _summary(self, contents, length=140):
        """Shorten the text to at most ``length`` characters and remove line breaks."""
        # Slicing already copes with text shorter than ``length``.
        return contents[:length].replace('\n', ' ')

    def _on_detect(self, date, subject, contents_url):
        """Handle a post that matched the filter.

        A post whose (date, subject) pair is not in the table yet is fetched,
        stored with ``Sent`` set to 1, and broadcast through ``self.bot``.
        A post that is already stored is ignored.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            cur = conn.cursor()
            q = 'select count(*) from %s' % (self.db_table_name) + ' where Date=? and Subject=?'
            cur.execute(q, (date, subject))
            (count,) = cur.fetchone()
            if count > 0:
                return  # already recorded, so it was handled before

            # Read the body of the post.
            contents = self._get_contents_from_url(contents_url)

            # Save to the database.
            q = 'insert into %s(Date, Subject, Contents, Sent)' % (self.db_table_name) + ' values (?, ?, ?, ?)'
            cur.execute(q, (date, subject, contents, 1))
            log.info('[INSERT] date: "%s", subject: "%s"' % (date, subject))
            conn.commit()

            # Send the message through the bot given to the constructor; a
            # module-level ``bot`` only exists when this file runs as a script.
            self.bot.broadcast('제목: %s\n%s (...)' % (subject, self._summary(contents)))
        finally:
            conn.close()  # released on every path, including errors
def keyword_match_func(date, subject, contents=None):
    """Return True when the subject matches one of the keyword patterns.

    ``date`` and ``contents`` are accepted but not inspected; only
    ``subject`` is tested.  Each pattern is applied with ``re.match``, i.e.
    anchored at the start of the subject.
    """
    patterns = ['.*토르.*']
    return any(re.match(pattern, subject) is not None for pattern in patterns)
if __name__ == '__main__':
    # Script entry point: start the bot, then watch one board with the
    # keyword filter.  The bot and the watcher are given the same database
    # file and the same interval.
    db_path = 'd:/project/a/src/db/clien.db'
    interval = 60  # seconds

    bot = Bot(
        token='000000000:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa',
        db_path=db_path,
        update_interval_sec=interval,
    )
    bot.status()
    bot.start()

    watcher = ClienWatcher(
        bot=bot,
        db_path=db_path,
        table_name='board_park',
        url='https://www.clien.net/service/board/park',
        update_interval_sec=interval,
        match_func=keyword_match_func,
    )
    watcher.status()
    watcher.start()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment