Last active
December 19, 2018 08:56
-
-
Save timfeirg/65b6821af0589bd5d1da51bb70af4a5b to your computer and use it in GitHub Desktop.
scrapy HBase cache storage
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from datetime import datetime | |
import happybase | |
import pymysql | |
from impala.dbapi import connect as hive_connect | |
from impala.util import as_pandas | |
from pymysql.cursors import DictCursor | |
from retrying import retry | |
from thriftpy.transport import TTransportException | |
from beehive.settings import HBASE_CONFIG | |
from beehive.utils import decode_bytes, convert_to_string | |
class HBaseClient(object):
    """Thin convenience wrapper around a `happybase.ConnectionPool`.

    Each operation borrows a connection from the pool for exactly as long as
    it needs it and hands it back afterwards.
    """

    def __init__(self, hive_config):
        """Create the underlying connection pool.

        Args:
            hive_config (dict): keyword arguments forwarded verbatim to
                `happybase.ConnectionPool` (despite the parameter name this
                is the HBase configuration).
        """
        self.hbase_pool = happybase.ConnectionPool(**hive_config)

    def write(self, table_name, row_key, data):
        """Write one row directly to HBase.

        Args:
            table_name: name of the HBase table.
            row_key: key of the row to write.
            data (dict): column -> value mapping, see `prepare_data`.

        Raises:
            TTransportException: thrift transport failures are not handled
                here (no retry) and propagate to the caller.
        """
        with self.hbase_pool.connection() as con:
            con.table(table_name).put(row_key, data)

    @staticmethod
    def get_modified_time(data):
        """Get max timestamp from one hbase row.

        Args:
            data (dict): raw hbase row from `table.row`, must include
                timestamps, i.e. every value is a `(value, timestamp_ms)`
                tuple.

        Returns:
            datetime: naive python datetime object in the local timezone
                (so the exact result of the example below depends on the
                timezone of the machine).

        >>> HBaseClient.get_modified_time({'name': ('some restaurant', 1489147687495)})
        datetime.datetime(2017, 3, 10, 20, 8, 7, 495000)
        """
        # HBase timestamps are in milliseconds, `fromtimestamp` wants seconds.
        newest_millis = max(cell[1] for cell in data.values())
        return datetime.fromtimestamp(newest_millis * 0.001)

    @staticmethod
    def prepare_data(data, column_family):
        """Prepend column family to dict keys, and ensure all values are either
        bytes or str, if there's numbers among values, try to convert them to
        str

        >>> HBaseClient.prepare_data({'name': '良好'}, 'r')
        {'r:name': '良好'}
        >>> HBaseClient.prepare_data({'average_cost': 95.0}, 'r')
        {'r:average_cost': '95.0'}
        """
        return {
            '{}:{}'.format(column_family, k): convert_to_string(v)
            for k, v in data.items()
        }

    @staticmethod
    def unprepare_data(data, column_family):
        """Remove column_family and timestamps (if present) from dict keys, and
        try to decode values using utf-8 (not guaranteed to decode though)

        Whether the row carries timestamps is decided by looking at a single
        value, so a row must be either all `(value, timestamp)` tuples or all
        plain values.

        >>> HBaseClient.unprepare_data({b'column_family:average_cost': (b'95.0', 1489147687495)}, 'column_family')
        {'average_cost': '95.0'}
        >>> HBaseClient.unprepare_data({b'column_family:average_cost': b'95.0'}, 'column_family')
        {'average_cost': '95.0'}
        """
        if not data:
            return data

        # +1 accounts for the ':' separating the family from the qualifier.
        prefix_length = len(column_family) + 1
        sample_value = next(iter(data.values()))
        if isinstance(sample_value, tuple):
            return {
                decode_bytes(k[prefix_length:]): decode_bytes(cell[0])
                for k, cell in data.items()
            }
        return {
            decode_bytes(k[prefix_length:]): decode_bytes(v)
            for k, v in data.items()
        }

    @retry(stop_max_attempt_number=2, retry_on_exception=lambda e: isinstance(e, TTransportException))
    def get_row(self, table_name, row_key, **kwargs):
        """Get a single row from hbase, any kwargs will get passed to `table.row`.

        The call is attempted at most twice; only `TTransportException`
        triggers the second attempt.
        """
        with self.hbase_pool.connection() as con:
            table = con.table(table_name)
            return table.row(row_key, **kwargs)

    def scan(self, table_name, **kwargs):
        """Iterate over the rows of a table, any kwargs will get passed to
        `table.scan`.

        `table.scan` is lazy: it only talks to HBase while it is being
        iterated. The rows are therefore yielded from *inside* the `with`
        block so that the pooled connection stays checked out until the
        iteration finishes (or the generator is closed), instead of being
        returned to the pool before the first row has been fetched.

        Yields:
            whatever `table.scan` yields, unchanged.
        """
        with self.hbase_pool.connection() as con:
            table = con.table(table_name)
            for item in table.scan(**kwargs):
                yield item

    def delete(self, table_name, row_key, **kwargs):
        """Delete a row (or part of it), any kwargs will get passed to
        `table.delete`."""
        with self.hbase_pool.connection() as con:
            con.table(table_name).delete(row_key, **kwargs)


# Module-level client instance built from the project's HBASE_CONFIG setting.
hbase = HBaseClient(HBASE_CONFIG)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
from scrapy.responsetypes import responsetypes | |
from scrapy.utils.python import to_bytes | |
from scrapy.utils.request import request_fingerprint | |
from six.moves import cPickle as pickle | |
from beehive.db import hbase | |
class HBaseCacheStorage(object):
    """Store cache in HBase, TTL is managed by HBase itself (specify TTL when
    create HBase table)

    literal blocks::

        create 'beehive_cache', { NAME => 'p', COMPRESSION => 'SNAPPY' , TTL => 259200}
    """

    def __init__(self, settings):
        """Read the table name and column family from the settings.

        Args:
            settings: mapping providing `HBASE_CACHE_TABLE_NAME` and
                `HBASE_CACHE_COLUMN_FAMILY`.
        """
        self.table_name = settings['HBASE_CACHE_TABLE_NAME']
        self.column_family = settings['HBASE_CACHE_COLUMN_FAMILY']

    def open_spider(self, spider):
        """Attach the shared HBase client imported from `beehive.db`."""
        self.db = hbase

    def close_spider(self, spider):
        """Nothing to release here."""
        pass

    def retrieve_response(self, spider, request):
        """Return the cached response for `request`, or None.

        None is returned both when there is no row for the request and when
        the stored entry cannot be turned back into a response object; in the
        latter case the error is logged and the entry is treated as a cache
        miss rather than stopping the whole process.
        """
        key = self._request_key(request)
        spider.logger.debug('Retriving cache key %s', key)
        hbase_data = self.db.get_row(self.table_name, key)
        if not hbase_data:
            return None  # not cached

        # NOTE(security): unpickling executes whatever the payload says, so
        # the cache table must only ever be written by trusted code.
        row = self.db.unprepare_data(hbase_data, self.column_family)
        data = pickle.loads(row['data'])
        url = data['url']
        status = data['status']
        headers = data['headers']
        body = data['body']
        respcls = responsetypes.from_args(headers=headers, url=url)
        spider.logger.debug(
            'Request cache %s: body type %s, header %s, respcls %s',
            request, type(body), headers, respcls,
        )
        try:
            response = respcls(url=url, headers=headers, status=status, body=body)
        except TypeError:
            # The stored entry does not fit the chosen response class. Log it
            # with the traceback and report a miss instead of halting the
            # crawl.
            spider.logger.exception(
                'Discarding unusable cache entry %s for %s', key, request)
            return None
        return response

    def store_response(self, spider, request, response):
        """Writing picklized object into hbase is probably not a good idea, but
        dealing with scrapy request object is incredibly difficult

        The status, url, headers and body of `response` are pickled together
        and written to the `data` column of the row keyed by the request.
        """
        key = self._request_key(request)
        data = {
            'status': response.status,
            'url': response.url,
            'headers': response.headers,
            'body': response.body,
        }
        wrapped = {'data': pickle.dumps(data, protocol=2)}
        hbase_data = self.db.prepare_data(wrapped, self.column_family)
        self.db.write(self.table_name, key, hbase_data)

    def _request_key(self, request):
        """Row key of a request: its fingerprint converted with `to_bytes`."""
        return to_bytes(request_fingerprint(request))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment