import hashlib
import html
import os
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Union
from urllib.parse import urlparse

import helpers.main_helper as main_helper
from apis.onlyfans.classes.auth_model import create_auth
from apis.onlyfans.classes.hightlight_model import create_highlight
from apis.onlyfans.classes.message_model import create_message
from apis.onlyfans.classes.post_model import create_post
from apis.onlyfans.classes.story_model import create_story
from apis.onlyfans.classes.user_model import create_user
from apis.onlyfans.onlyfans import start
from classes.prepare_metadata import prepare_reformat
from modules.module_streamliner import StreamlinedDatascraper


class OnlyFansDataScraper(StreamlinedDatascraper):
    def __init__(self, api: start) -> None:
        self.api = api
        StreamlinedDatascraper.__init__(self, self)

    # Scrapes the API for content
    async def media_scraper(
        self,
        post_result: Union[create_story, create_post, create_message],
        subscription: create_user,
        formatted_directory: Path,
        api_type: str,
    ):
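        """Build a metadata dict for a single story, post, or message.

        Returns a dict with a "content" list (one entry per post, each
        carrying its "medias") plus the set of download "directories".
        """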
        authed = subscription.get_authed()
        api = authed.api
        site_settings = api.get_site_settings()
        if not site_settings:
            return
        new_set: dict[str, Any] = {}
        new_set["content"] = []
        directories: list[Path] = []
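        # Placeholders for category-specific preprocessing; all four
        # branches are currently no-ops.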
        if api_type == "Stories":
            pass
        if api_type == "Archived":
            pass
        if api_type == "Posts":
            pass
        if api_type == "Messages":
            pass
        download_path = formatted_directory
        model_username = subscription.username
        date_format = site_settings.date_format
        locations = self.media_types
        for media_type, alt_media_types in locations.__dict__.items():
            date_today = datetime.now()
            master_date = datetime.strftime(date_today, "%d-%m-%Y %H:%M:%S")
            file_directory_format = site_settings.file_directory_format
            post_id = post_result.id
            new_post = {}
            new_post["medias"] = []
            new_post["archived"] = False
            rawText = ""
            text = ""
            previews = []
            date = None
            price = None
            if isinstance(post_result, create_story):
                date = post_result.createdAt
            if isinstance(post_result, create_post):
                if post_result.isReportedByMe:
                    continue
                rawText = post_result.rawText
                text = post_result.text
                previews = post_result.preview
                date = post_result.postedAt
                price = post_result.price
                new_post["archived"] = post_result.isArchived
            if isinstance(post_result, create_message):
                if post_result.isReportedByMe:
                    continue
                text = post_result.text
                previews = post_result.previews
                date = post_result.createdAt
                price = post_result.price
                if api_type == "Mass Messages":
                    media_user = post_result.fromUser
                    media_username = media_user.username
                    if media_username != model_username:
                        continue
            final_text = rawText if rawText else text
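            # "-001-11-30T00:00:00+00:00" appears to be a placeholder the
            # API returns when no timestamp is set; fall back to the
            # current date in that case.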
            if date == "-001-11-30T00:00:00+00:00":
                date_string = master_date
                date_object = datetime.strptime(master_date, "%d-%m-%Y %H:%M:%S")
            else:
                if not date:
                    date = master_date
                try:
                    date_object = datetime.fromisoformat(date)
                except ValueError:
                    # master_date is in "%d-%m-%Y %H:%M:%S" form, not ISO 8601
                    date_object = datetime.strptime(date, "%d-%m-%Y %H:%M:%S")
                date_string = date_object.replace(tzinfo=None).strftime(
                    "%d-%m-%Y %H:%M:%S"
                )
                master_date = date_string
new_post["post_id"] = post_id | |
new_post["user_id"] = subscription.id | |
if isinstance(post_result, create_message): | |
new_post["user_id"] = post_result.fromUser.id | |
new_post["text"] = final_text | |
new_post["postedAt"] = date_string | |
new_post["paid"] = False | |
new_post["preview_media_ids"] = previews | |
new_post["api_type"] = api_type | |
new_post["price"] = 0 | |
if price is None: | |
price = 0 | |
if price: | |
if all(media["canView"] for media in post_result.media): | |
new_post["paid"] = True | |
else: | |
new_post["price"] = price | |
for media in post_result.media: | |
media_id = media["id"] | |
preview_link = "" | |
link = await post_result.link_picker(media, site_settings.video_quality) | |
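                # CDN hostnames may start with a region code (us, uk, ca,
                # ca2, de); in that case look at the second label instead.
                # "upload" hosts are skipped, and for "convert" hosts the
                # preview link is used in place of the full link.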
matches = ["us", "uk", "ca", "ca2", "de"] | |
if not link: | |
continue | |
url = urlparse(link) | |
if not url.hostname: | |
continue | |
subdomain = url.hostname.split(".")[0] | |
print(media) | |
if "files" in media: | |
preview_link = media["files"]["preview"]["url"] | |
else: | |
preview_link = media["preview"] | |
if any(subdomain in nm for nm in matches): | |
subdomain = url.hostname.split(".")[1] | |
if "upload" in subdomain: | |
continue | |
if "convert" in subdomain: | |
link = preview_link | |
rules = [link == "", preview_link == ""] | |
if all(rules): | |
continue | |
                new_media: dict[str, Any] = dict()
                new_media["media_id"] = media_id
                new_media["links"] = []
                new_media["media_type"] = media_type
                new_media["preview"] = False
                new_media["created_at"] = new_post["postedAt"]
                if isinstance(post_result, create_story):
                    date_object = datetime.fromisoformat(media["createdAt"])
                    date_string = date_object.replace(tzinfo=None).strftime(
                        "%d-%m-%Y %H:%M:%S"
                    )
                    new_media["created_at"] = date_string
                if int(media_id) in new_post["preview_media_ids"]:
                    new_media["preview"] = True
                for xlink in (link, preview_link):
                    if xlink:
                        new_media["links"].append(xlink)
                        break
                if media["type"] not in alt_media_types:
                    continue
                matches = [s for s in site_settings.ignored_keywords if s in final_text]
                if matches:
                    print("Ignoring - ", f"PostID: {post_id}")
                    continue
                filename = link.rsplit("/", 1)[-1]
                filename, ext = os.path.splitext(filename)
                ext = ext.replace(".", "").split("?")[0]
                final_api_type = (
                    os.path.join("Archived", api_type)
                    if new_post["archived"]
                    else api_type
                )
                option: dict[str, Any] = {}
                option = option | new_post
                option["site_name"] = api.site_name
                option["media_id"] = media_id
                option["filename"] = filename
                option["api_type"] = final_api_type
                option["media_type"] = media_type
                option["ext"] = ext
                option["profile_username"] = authed.username
                option["model_username"] = model_username
                option["date_format"] = date_format
                option["postedAt"] = new_media["created_at"]
                option["text_length"] = site_settings.text_length
                option["directory"] = download_path
                option["preview"] = new_media["preview"]
                option["archived"] = new_post["archived"]
                prepared_format = prepare_reformat(option)
                file_directory = await prepared_format.reformat_2(file_directory_format)
                prepared_format.directory = file_directory
                file_path = await prepared_format.reformat_2(
                    site_settings.filename_format
                )
                new_media["directory"] = os.path.join(file_directory)
                new_media["filename"] = os.path.basename(file_path)
                if file_directory not in directories:
                    directories.append(file_directory)
                new_media["linked"] = None
                for k, v in subscription.temp_scraped:
                    if k == api_type:
                        continue
                    if k == "Archived":
                        v = getattr(v, api_type, [])
                    if v:
                        for post in v:
                            found_medias = []
                            medias = post.media
                            if medias:
                                for temp_media in medias:
                                    temp_filename = temp_media.get("filename")
                                    if temp_filename:
                                        if temp_filename == new_media["filename"]:
                                            found_medias.append(temp_media)
                                    else:
                                        continue
                            # found_medias = [x for x in medias
                            #     if x["filename"] == new_media["filename"]]
                            if found_medias:
                                for found_media in found_medias:
                                    found_media["linked"] = api_type
                                new_media["linked"] = post["api_type"]
                                new_media["filename"] = f"linked_{new_media['filename']}"
                new_post["medias"].append(new_media)
            found_post = [x for x in new_set["content"] if x["post_id"] == post_id]
            if found_post:
                found_post = found_post[0]
                found_post["medias"] += new_post["medias"]
            else:
                new_set["content"].append(new_post)
        new_set["directories"] = directories
        return new_set

    async def process_mass_messages(
        self, authed: create_auth, mass_messages: list[create_message]
    ):
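        """Match queued mass messages against chat logs cached on disk and
        the remote API, persisting results to Chats.json and
        Mass Messages.json.
        """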
        def compare_message(queue_id, remote_messages):
            for message in remote_messages:
                if "isFromQueue" in message and message["isFromQueue"]:
                    if queue_id == message["queueId"]:
                        return message

        global_found = []
        chats = []
        api = authed.get_api()
        site_settings = api.get_site_settings()
        config = api.config
        if not (config and site_settings):
            return
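        # The config's random_string salt is MD5-hashed and stored on each
        # message as "hashed_ip"; a changed hash (or a stale date_hashed,
        # checked in the second pass) forces a re-fetch of the message.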
        settings = config.settings
        salt = settings.random_string
        encoded = f"{salt}"
        encoded = encoded.encode("utf-8")
        hash = hashlib.md5(encoded).hexdigest()
        profile_directory = authed.directory_manager.profile.metadata_directory
        mass_message_path = profile_directory.joinpath("Mass Messages.json")
        chats_path = profile_directory.joinpath("Chats.json")
        if os.path.exists(chats_path):
            chats = main_helper.import_json(chats_path)
        date_object = datetime.today()
        date_string = date_object.strftime("%d-%m-%Y %H:%M:%S")
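        # First pass: try to resolve each queued mass message, first from
        # the cached chats, then by searching the API for its text.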
        for mass_message in mass_messages:
            if "status" not in mass_message:
                mass_message["status"] = ""
            if "found" not in mass_message:
                mass_message["found"] = {}
            if "hashed_ip" not in mass_message:
                mass_message["hashed_ip"] = ""
            mass_message["hashed_ip"] = mass_message.get("hashed_ip", hash)
            mass_message["date_hashed"] = mass_message.get("date_hashed", date_string)
            if mass_message["isCanceled"]:
                continue
            queue_id = mass_message["id"]
            text = mass_message["textCropped"]
            text = html.unescape(text)
            mass_found = mass_message["found"]
            media_type = mass_message.get("mediaType")
            media_types = mass_message.get("mediaTypes")
            if mass_found or (not media_type and not media_types):
                continue
            identifier = None
            if chats:
                list_chats = chats
                for chat in list_chats:
                    identifier = chat["identifier"]
                    messages = chat["messages"]["list"]
                    mass_found = compare_message(queue_id, messages)
                    if mass_found:
                        mass_message["found"] = mass_found
                        mass_message["status"] = True
                        break
            if not mass_found:
                # These client calls are awaited on the assumption that
                # they are coroutines, like get_message_by_id below.
                list_chats = await authed.search_messages(text=text, limit=2)
                if not list_chats:
                    continue
                for item in list_chats["list"]:
                    user = item["withUser"]
                    identifier = user["id"]
                    messages = []
                    print("Getting Messages")
                    keep = ["id", "username"]
                    list_chats2 = [x for x in chats if x["identifier"] == identifier]
                    if list_chats2:
                        chat2 = list_chats2[0]
                        messages = chat2["messages"]["list"]
                        messages = await authed.get_messages(
                            identifier=identifier, resume=messages
                        )
                        for message in messages:
                            message["withUser"] = {k: item["withUser"][k] for k in keep}
                            message["fromUser"] = {
                                k: message["fromUser"][k] for k in keep
                            }
                        mass_found = compare_message(queue_id, messages)
                        if mass_found:
                            mass_message["found"] = mass_found
                            mass_message["status"] = True
                            break
                    else:
                        item2 = {}
                        item2["identifier"] = identifier
                        item2["messages"] = await authed.get_messages(
                            identifier=identifier
                        )
                        chats.append(item2)
                        messages = item2["messages"]["list"]
                        for message in messages:
                            message["withUser"] = {k: item["withUser"][k] for k in keep}
                            message["fromUser"] = {
                                k: message["fromUser"][k] for k in keep
                            }
                        mass_found = compare_message(queue_id, messages)
                        if mass_found:
                            mass_message["found"] = mass_found
                            mass_message["status"] = True
                            break
            if not mass_found:
                mass_message["status"] = False

        main_helper.export_json(chats, chats_path)
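        # Second pass: for matched messages that carry media, re-fetch the
        # full message if the config salt changed or the cached copy is
        # more than a day old.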
        for mass_message in mass_messages:
            found = mass_message["found"]
            if found and found["media"]:
                user = found["withUser"]
                identifier = user["id"]
                date_hashed_object = datetime.strptime(
                    mass_message["date_hashed"], "%d-%m-%Y %H:%M:%S"
                )
                next_date_object = date_hashed_object + timedelta(days=1)
                if mass_message["hashed_ip"] != hash or date_object > next_date_object:
                    print("Getting Message By ID")
                    x = await authed.get_message_by_id(
                        identifier=identifier, identifier2=found["id"], limit=1
                    )
                    new_found = x["result"]["list"][0]
                    new_found["withUser"] = found["withUser"]
                    mass_message["found"] = new_found
                    mass_message["hashed_ip"] = hash
                    mass_message["date_hashed"] = date_string
                global_found.append(found)
        main_helper.export_json(mass_messages, mass_message_path)
        return global_found

    async def get_all_stories(self, subscription: create_user):
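        """Collect the subscription's stories, archived stories, and the
        stories inside each of its highlights into one list.
        """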
        master_set: list[create_highlight | create_story] = []
        master_set.extend(await subscription.get_stories())
        master_set.extend(await subscription.get_archived_stories())
        highlights = await subscription.get_highlights()
        valid_highlights: list[create_highlight | create_story] = []
        for highlight in highlights:
            highlight = await subscription.get_highlights(hightlight_id=highlight.id)
            valid_highlights.extend(highlight)
        master_set.extend(valid_highlights)
        return master_set

    async def get_all_subscriptions(
        self,
        authed: create_auth,
        identifiers: list[int | str] = [],
        refresh: bool = True,
    ):
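        """Fetch subscriptions, sorted soonest-expiring first."""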
        results = await authed.get_subscriptions(
            identifiers=identifiers, refresh=refresh
        )
        results.sort(key=lambda x: x.subscribedByData["expiredAt"])
        return results