Last active
June 4, 2022 10:22
-
-
Save kamoo1/af655f05700eb76bb29aec876493ed90 to your computer and use it in GitHub Desktop.
Get the complete reviews from the Google Play Store (beyond the 4400 limit)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json
from enum import Enum

from utils.logging import get_logger

# Module-level logger; Parser.parse uses it to report items left unconsumed.
logger = get_logger(__name__)
class Pattern(Enum):
    """How a declared field consumes its slot of the raw item list.

    ``SINGLE`` -- the slot holds one value (or one nested record).
    ``LIST``   -- the slot holds a list whose elements are parsed one by one.
    """

    # Value 0 was earmarked for an OPTIONAL pattern; nothing in this module
    # implements it, so it is intentionally not declared.
    SINGLE = 1
    LIST = 2
class Parser(object):
    """Positional decoder that turns a raw list into a dict of named fields.

    Subclasses describe their layout with public class attributes, one per
    slot of the raw list, written in the same order as the slots.  Each
    attribute is either

    * a bare ``Pattern`` -- the slot is kept as-is (wrapped by ``Value``), or
    * a ``(Pattern, ParserSubclass)`` pair -- the slot (or, for
      ``Pattern.LIST``, each element of the slot) is decoded by that subclass.

    Only attributes declared directly on the concrete class are used;
    inherited declarations are not collected.  Field order is the class
    body's definition order, which Python preserves from version 3.6 on.
    """

    def __init__(self, raw):
        """Store the raw item list and start with an empty result.

        :param raw: the sequence of slots to decode
        """
        self._items = raw
        self._cursor = 0
        self._result = dict()

    def _next_item(self):
        """Return the item under the cursor and advance the cursor by one."""
        ret = self._items[self._cursor]
        self._cursor += 1
        return ret

    def _remaining_item_count(self):
        """Return how many items have not been consumed yet."""
        return len(self._items) - self._cursor

    def _parse_field(self, key, pattern, _type):
        """Consume one slot and store its decoded value under ``key``.

        :param key: name of the field in the result dict
        :param pattern: ``Pattern.SINGLE`` or ``Pattern.LIST``
        :param _type: ``Parser`` subclass used to decode the slot content
        """
        if pattern is Pattern.SINGLE:
            if not self._remaining_item_count():
                # Field not exist: the raw list is shorter than the schema.
                self._result[key] = None
            else:
                item = self._next_item()
                self._result[key] = None if item is None else _type(item).parse()
        else:
            cursor_list = self._next_item() if self._remaining_item_count() else []
            if cursor_list is None:
                # A null slot is treated the same as an empty list.
                cursor_list = []
            self._result[key] = [_type(item).parse() for item in cursor_list]

    @classmethod
    def _fields(cls):
        """Yield ``(key, pattern, item_type)`` for every field declared on ``cls``.

        Private names and callables (methods such as ``parse`` itself) are
        skipped, as is anything whose pattern is not a ``Pattern`` member, so
        helper attributes on a subclass can never be mistaken for a slot.
        """
        for key, spec in vars(cls).items():
            if key.startswith("_") or callable(spec):
                continue
            if isinstance(spec, tuple) and len(spec) == 2:
                pattern, item_type = spec
            else:
                pattern, item_type = spec, None
            if not isinstance(pattern, Pattern):
                continue
            # A bare Pattern means "keep the raw value", i.e. decode with Value.
            yield key, pattern, Value if item_type is None else item_type

    def parse(self):
        """Decode the raw list according to the declared fields.

        Every call starts from the first slot with a fresh result dict, so
        calling ``parse()`` repeatedly yields equal results.  Items left over
        after all fields were consumed are reported at debug level.

        :return: dict mapping each declared field name to its decoded value
        """
        self._cursor = 0
        self._result = dict()
        for key, pattern, item_type in self._fields():
            self._parse_field(key, pattern, item_type)
        if self._remaining_item_count():
            logger.debug(
                "item left un-parsed\n"
                "self_type : {_class}\n"
                "item left :\n"
                "{items}".format(
                    _class=self.__class__.__name__,
                    items=self._items[self._cursor:]
                )
            )
        return self._result
class Value(Parser):
    """Leaf parser: hands the raw slot content back untouched."""

    def parse(self):
        """Return the wrapped item exactly as it was given to the constructor."""
        raw_payload = self._items
        return raw_payload
class TokenInfo(Parser):
    """Two-slot record that carries the pagination token of a response."""

    # Slot 0: meaning not identified.
    unknown = Pattern.SINGLE
    # Slot 1: the token; rsp_reviews hands it back as the next-page token.
    token = Pattern.SINGLE
class PictureInfo(Parser):
    """Four-slot picture record, referenced for user avatars and banners."""

    # Slot 0: meaning not identified.
    unknown1 = Pattern.SINGLE
    # Slot 1: kept as a raw value.
    array_offset = Pattern.SINGLE
    # Slot 2: meaning not identified.
    unknown2 = Pattern.SINGLE
    # Slot 3: list whose elements are kept as raw values.
    array = Pattern.LIST
class UserInfo1(Parser):
    """Two-slot user record: a name followed by a nested picture record."""

    # Slot 0: kept as a raw value.
    user_name = Pattern.SINGLE
    # Slot 1: decoded with PictureInfo.
    user_avatar_info = (Pattern.SINGLE, PictureInfo)
class TimeInfo(Parser):
    """Two-slot time record used for review and reply times."""

    # Slot 0: kept as a raw value (unit not established here).
    timestamp = Pattern.SINGLE
    # Slot 1: meaning not identified.
    unknown = Pattern.SINGLE
class AvatarInfoWrapper(Parser):
    """Two-slot wrapper around a picture record."""

    # Slot 0: decoded with PictureInfo.
    user_avatar_info = (Pattern.SINGLE, PictureInfo)
    # Slot 1: meaning not identified.
    unknown = Pattern.SINGLE
class UserInfo2(Parser):
    """Five-slot user record with id, name and two nested picture entries."""

    # Slots 0-1: kept as raw values.
    user_id = Pattern.SINGLE
    user_name = Pattern.SINGLE
    # Slot 2: meaning not identified.
    unknown = Pattern.SINGLE
    # Slot 3: decoded with AvatarInfoWrapper.
    user_avatar_info_wrapper = (Pattern.SINGLE, AvatarInfoWrapper)
    # Slot 4: decoded with PictureInfo.
    user_banner_info = (Pattern.SINGLE, PictureInfo)
class FeatureInfo(Parser):
    """Three-slot feature record: a name followed by two raw lists."""

    # Slot 0: kept as a raw value.
    feature_name = Pattern.SINGLE
    # Slots 1-2: lists whose elements are kept as raw values.
    feature_scores_0 = Pattern.LIST
    feature_scores_1 = Pattern.LIST
class FeaturesInfoWrapper(Parser):
    """Single-slot wrapper holding a list of feature records."""

    # Slot 0: list whose elements are each decoded with FeatureInfo.
    features_info = (Pattern.LIST, FeatureInfo)
class ReplyInfo(Parser):
    """Three-slot reply record: title, content and a nested time record."""

    # Slots 0-1: kept as raw values.
    reply_title = Pattern.SINGLE
    reply_content = Pattern.SINGLE
    # Slot 2: decoded with TimeInfo.
    reply_time_info = (Pattern.SINGLE, TimeInfo)
class ReviewInfo(Parser):
    """Fifteen-slot record describing one review.

    The attribute order below is the slot order of the raw list and must not
    be rearranged.
    """

    # Slot 0: kept as a raw value.
    review_id = Pattern.SINGLE
    # Slot 1: decoded with UserInfo1.
    user_info1 = (Pattern.SINGLE, UserInfo1)
    # Slots 2-4: kept as raw values.
    review_app_rating = Pattern.SINGLE
    review_title = Pattern.SINGLE
    review_content = Pattern.SINGLE
    # Slot 5: decoded with TimeInfo.
    review_time_info = (Pattern.SINGLE, TimeInfo)
    # Slot 6: kept as a raw value.
    review_rating = Pattern.SINGLE
    # Slot 7: decoded with ReplyInfo; None when the slot is null or absent.
    reply_info = (Pattern.SINGLE, ReplyInfo)
    # Slot 8: meaning not identified.
    unknown1 = Pattern.SINGLE
    # Slot 9: decoded with UserInfo2.
    user_info2 = (Pattern.SINGLE, UserInfo2)
    # Slot 10: kept as a raw value.
    app_version = Pattern.SINGLE
    # Slot 11: meaning not identified.
    unknown2 = Pattern.SINGLE
    # Slot 12: decoded with FeaturesInfoWrapper.
    features_info_wrapper = (Pattern.SINGLE, FeaturesInfoWrapper)
    # Slots 13-14: meaning not identified (a single value, then a list).
    unknown3 = Pattern.SINGLE
    unknown4 = Pattern.LIST
class ReviewsResponse(Parser):
    """Top-level two-slot layout decoded by rsp_reviews."""

    # Slot 0: list whose elements are each decoded with ReviewInfo.
    reviews_info = (Pattern.LIST, ReviewInfo)
    # Slot 1: decoded with TokenInfo; None when the slot is null or absent.
    token_info = (Pattern.SINGLE, TokenInfo)
def req_reviews(app_package_name, app_locale, token="null", count=100, sort=1):
    """
    Build the request that fetches one page of reviews for an app.

    :param app_package_name: package name of the app whose reviews are requested
    :param app_locale: locale such as ``en_US``; only the part before ``_`` is sent as ``hl``
    :param token: token needed for page > 1, each response contains token for next page;
        ``"null"`` (the default) or ``None`` asks for the first page
    :param count: page size
    :param sort: 1 = most helpful, 2 = newest, 3 = rating
    :return: url and kwargs for Scrapy request
    """
    lang = app_locale.split("_")[0]
    url = "https://play.google.com/_/PlayStoreUi/data/batchexecute?hl={lang}".format(lang=lang)

    # "null" is the historical "no token" sentinel; None is accepted too so
    # the value returned by rsp_reviews can be passed straight back in.
    page_token = None if token is None or token == "null" else token

    # Let json do the quoting: the inner request is itself a JSON document
    # embedded as a string in the outer one, so hand-written escaping breaks
    # as soon as a value contains a quote or backslash.  Compact separators
    # keep the output identical to the previous hand-built string.
    compact = {"separators": (",", ":"), "ensure_ascii": False}
    inner = json.dumps(
        [None, None, [2, int(sort), [int(count), None, page_token]], [app_package_name, 7]],
        **compact
    )
    magic = json.dumps([[["UsvDTd", inner, None, "generic"]]], **compact)

    kwargs = {
        "formdata": {"f.req": magic},
        "method": "POST",
        "headers": {"accept-encoding": "gzip, deflate, br"},
    }
    return url, kwargs
def rsp_reviews(body):
    """
    Decode a reviews response into the review list and the next-page token.

    :param body: response body as text or UTF-8 encoded bytes; everything
        before the first ``[[`` is skipped
    :return: ``(reviews_info, next_token)`` -- the decoded reviews and the
        token for the following page, or ``None`` when no token is present
    :raises ValueError: if ``body`` contains no ``[[`` or is not valid JSON
        from that point on
    """
    if isinstance(body, bytes):
        # Raw HTTP bodies arrive as bytes; str.index("[[") would reject them.
        body = body.decode("utf-8")

    envelope = json.loads(body[body.index("[["):])
    # The review data is itself a JSON document stored as a string in the
    # third element of the first envelope entry.
    payload = envelope[0][2]
    if payload is None:
        # Nothing to decode: report an empty page instead of failing inside
        # json.loads(None).
        return [], None

    parsed = ReviewsResponse(json.loads(payload)).parse()
    token_info = parsed["token_info"]
    next_token = token_info["token"] if token_info else None
    return parsed["reviews_info"], next_token
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi. Can you show some example code for fetching a package's reviews?
Thanks