@eclipseo
Created September 3, 2022 13:22
import hashlib
import html
import os
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Union
from urllib.parse import urlparse

import helpers.main_helper as main_helper
from apis.onlyfans.classes.auth_model import create_auth
from apis.onlyfans.classes.hightlight_model import create_highlight
from apis.onlyfans.classes.message_model import create_message
from apis.onlyfans.classes.post_model import create_post
from apis.onlyfans.classes.story_model import create_story
from apis.onlyfans.classes.user_model import create_user
from apis.onlyfans.onlyfans import start
from classes.prepare_metadata import prepare_reformat
from modules.module_streamliner import StreamlinedDatascraper


class OnlyFansDataScraper(StreamlinedDatascraper):
    def __init__(self, api: start) -> None:
        self.api = api
        StreamlinedDatascraper.__init__(self, self)

    # Scrapes the API for content
    async def media_scraper(
        self,
        post_result: Union[create_story, create_post, create_message],
        subscription: create_user,
        formatted_directory: Path,
        api_type: str,
    ):
        authed = subscription.get_authed()
        api = authed.api
        site_settings = api.get_site_settings()
        if not site_settings:
            return
        new_set: dict[str, Any] = {}
        new_set["content"] = []
        directories: list[Path] = []
        if api_type == "Stories":
            pass
        if api_type == "Archived":
            pass
        if api_type == "Posts":
            pass
        if api_type == "Messages":
            pass
        download_path = formatted_directory
        model_username = subscription.username
        date_format = site_settings.date_format
        locations = self.media_types
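        # Walk every configured media type and build a metadata entry for this
        # post under each one.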
        for media_type, alt_media_types in locations.__dict__.items():
            date_today = datetime.now()
            master_date = datetime.strftime(date_today, "%d-%m-%Y %H:%M:%S")
            file_directory_format = site_settings.file_directory_format
            post_id = post_result.id
            new_post = {}
            new_post["medias"] = []
            new_post["archived"] = False
            rawText = ""
            text = ""
            previews = []
            date = None
            price = None
            if isinstance(post_result, create_story):
                date = post_result.createdAt
            if isinstance(post_result, create_post):
                if post_result.isReportedByMe:
                    continue
                rawText = post_result.rawText
                text = post_result.text
                previews = post_result.preview
                date = post_result.postedAt
                price = post_result.price
                new_post["archived"] = post_result.isArchived
            if isinstance(post_result, create_message):
                if post_result.isReportedByMe:
                    continue
                text = post_result.text
                previews = post_result.previews
                date = post_result.createdAt
                price = post_result.price
                if api_type == "Mass Messages":
                    media_user = post_result.fromUser
                    media_username = media_user.username
                    if media_username != model_username:
                        continue
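            # Normalize the post date; the API returns a sentinel value when a
            # timestamp is missing, in which case fall back to the running master date.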
            final_text = rawText if rawText else text
            if date == "-001-11-30T00:00:00+00:00":
                date_string = master_date
                date_object = datetime.strptime(master_date, "%d-%m-%Y %H:%M:%S")
            else:
                if not date:
                    date = master_date
                date_object = datetime.fromisoformat(date)
                date_string = date_object.replace(tzinfo=None).strftime(
                    "%d-%m-%Y %H:%M:%S"
                )
                master_date = date_string
            new_post["post_id"] = post_id
            new_post["user_id"] = subscription.id
            if isinstance(post_result, create_message):
                new_post["user_id"] = post_result.fromUser.id
            new_post["text"] = final_text
            new_post["postedAt"] = date_string
            new_post["paid"] = False
            new_post["preview_media_ids"] = previews
            new_post["api_type"] = api_type
            new_post["price"] = 0
            if price is None:
                price = 0
            if price:
                if all(media["canView"] for media in post_result.media):
                    new_post["paid"] = True
                new_post["price"] = price
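            # Resolve a download link for every media item attached to the post.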
            for media in post_result.media:
                media_id = media["id"]
                preview_link = ""
                link = await post_result.link_picker(
                    media, site_settings.video_quality
                )
                matches = ["us", "uk", "ca", "ca2", "de"]
                if not link:
                    continue
                url = urlparse(link)
                if not url.hostname:
                    continue
                subdomain = url.hostname.split(".")[0]
                print(media)
                if "files" in media:
                    preview_link = media["files"]["preview"]["url"]
                else:
                    preview_link = media["preview"]
                if any(subdomain in nm for nm in matches):
                    subdomain = url.hostname.split(".")[1]
                    if "upload" in subdomain:
                        continue
                    if "convert" in subdomain:
                        link = preview_link
                rules = [link == "", preview_link == ""]
                if all(rules):
                    continue
                new_media: dict[str, Any] = dict()
                new_media["media_id"] = media_id
                new_media["links"] = []
                new_media["media_type"] = media_type
                new_media["preview"] = False
                new_media["created_at"] = new_post["postedAt"]
                if isinstance(post_result, create_story):
                    date_object = datetime.fromisoformat(media["createdAt"])
                    date_string = date_object.replace(tzinfo=None).strftime(
                        "%d-%m-%Y %H:%M:%S"
                    )
                    new_media["created_at"] = date_string
                if int(media_id) in new_post["preview_media_ids"]:
                    new_media["preview"] = True
                for xlink in link, preview_link:
                    if xlink:
                        new_media["links"].append(xlink)
                        break
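                # Keep only media that belongs to the current media-type bucket and
                # does not hit an ignored keyword, then derive the target filename.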
                if media["type"] not in alt_media_types:
                    continue
                matches = [
                    s for s in site_settings.ignored_keywords if s in final_text
                ]
                if matches:
                    print("Ignoring - ", f"PostID: {post_id}")
                    continue
                filename = link.rsplit("/", 1)[-1]
                filename, ext = os.path.splitext(filename)
                ext = ext.__str__().replace(".", "").split("?")[0]
                final_api_type = (
                    os.path.join("Archived", api_type)
                    if new_post["archived"]
                    else api_type
                )
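                # Assemble the options prepare_reformat needs to build the final
                # directory and file name from the configured format strings.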
                option: dict[str, Any] = {}
                option = option | new_post
                option["site_name"] = api.site_name
                option["media_id"] = media_id
                option["filename"] = filename
                option["api_type"] = final_api_type
                option["media_type"] = media_type
                option["ext"] = ext
                option["profile_username"] = authed.username
                option["model_username"] = model_username
                option["date_format"] = date_format
                option["postedAt"] = new_media["created_at"]
                option["text_length"] = site_settings.text_length
                option["directory"] = download_path
                option["preview"] = new_media["preview"]
                option["archived"] = new_post["archived"]
                prepared_format = prepare_reformat(option)
                file_directory = await prepared_format.reformat_2(
                    file_directory_format
                )
                prepared_format.directory = file_directory
                file_path = await prepared_format.reformat_2(
                    site_settings.filename_format
                )
                new_media["directory"] = os.path.join(file_directory)
                new_media["filename"] = os.path.basename(file_path)
                if file_directory not in directories:
                    directories.append(file_directory)
                new_media["linked"] = None
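                # Cross-reference media already scraped under other API types so the
                # same file is flagged as "linked" rather than treated as new.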
                for k, v in subscription.temp_scraped:
                    if k == api_type:
                        continue
                    if k == "Archived":
                        v = getattr(v, api_type, [])
                    if v:
                        for post in v:
                            found_medias = []
                            medias = post.media
                            if medias:
                                for temp_media in medias:
                                    temp_filename = temp_media.get("filename")
                                    if temp_filename:
                                        if temp_filename == new_media["filename"]:
                                            found_medias.append(temp_media)
                                    else:
                                        continue
                            # found_medias = [x for x in medias
                            #     if x["filename"] == new_media["filename"]]
                            if found_medias:
                                for found_media in found_medias:
                                    found_media["linked"] = api_type
                                new_media["linked"] = post["api_type"]
                                new_media[
                                    "filename"
                                ] = f"linked_{new_media['filename']}"
                new_post["medias"].append(new_media)
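            # Merge media into an existing entry for this post id, or start a new one.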
            found_post = [x for x in new_set["content"] if x["post_id"] == post_id]
            if found_post:
                found_post = found_post[0]
                found_post["medias"] += new_post["medias"]
            else:
                new_set["content"].append(new_post)
        new_set["directories"] = directories
        return new_set

    async def process_mass_messages(
        self, authed: create_auth, mass_messages: list[create_message]
    ):
        def compare_message(queue_id, remote_messages):
            for message in remote_messages:
                if "isFromQueue" in message and message["isFromQueue"]:
                    if queue_id == message["queueId"]:
                        return message

        global_found = []
        chats = []
        api = authed.get_api()
        site_settings = api.get_site_settings()
        config = api.config
        if not (config and site_settings):
            return
        settings = config.settings
        salt = settings.random_string
        encoded = f"{salt}"
        encoded = encoded.encode("utf-8")
        hash = hashlib.md5(encoded).hexdigest()
        profile_directory = authed.directory_manager.profile.metadata_directory
        mass_message_path = profile_directory.joinpath("Mass Messages.json")
        chats_path = profile_directory.joinpath("Chats.json")
        if os.path.exists(chats_path):
            chats = main_helper.import_json(chats_path)
        date_object = datetime.today()
        date_string = date_object.strftime("%d-%m-%Y %H:%M:%S")
        for mass_message in mass_messages:
            if "status" not in mass_message:
                mass_message["status"] = ""
            if "found" not in mass_message:
                mass_message["found"] = {}
            if "hashed_ip" not in mass_message:
                mass_message["hashed_ip"] = ""
            mass_message["hashed_ip"] = mass_message.get("hashed_ip", hash)
            mass_message["date_hashed"] = mass_message.get("date_hashed", date_string)
            if mass_message["isCanceled"]:
                continue
            queue_id = mass_message["id"]
            text = mass_message["textCropped"]
            text = html.unescape(text)
            mass_found = mass_message["found"]
            media_type = mass_message.get("mediaType")
            media_types = mass_message.get("mediaTypes")
            if mass_found or (not media_type and not media_types):
                continue
            identifier = None
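            # First try to match the queued message against the locally cached chats.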
            if chats:
                list_chats = chats
                for chat in list_chats:
                    identifier = chat["identifier"]
                    messages = chat["messages"]["list"]
                    mass_found = compare_message(queue_id, messages)
                    if mass_found:
                        mass_message["found"] = mass_found
                        mass_message["status"] = True
                        break
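            # Cache miss: search the account's messages through the API instead.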
            if not mass_found:
                list_chats = authed.search_messages(text=text, limit=2)
                if not list_chats:
                    continue
                for item in list_chats["list"]:
                    user = item["withUser"]
                    identifier = user["id"]
                    messages = []
                    print("Getting Messages")
                    keep = ["id", "username"]
                    list_chats2 = [x for x in chats if x["identifier"] == identifier]
                    if list_chats2:
                        chat2 = list_chats2[0]
                        messages = chat2["messages"]["list"]
                        messages = authed.get_messages(
                            identifier=identifier, resume=messages
                        )
                        for message in messages:
                            message["withUser"] = {
                                k: item["withUser"][k] for k in keep
                            }
                            message["fromUser"] = {
                                k: message["fromUser"][k] for k in keep
                            }
                        mass_found = compare_message(queue_id, messages)
                        if mass_found:
                            mass_message["found"] = mass_found
                            mass_message["status"] = True
                            break
                    else:
                        item2 = {}
                        item2["identifier"] = identifier
                        item2["messages"] = authed.get_messages(identifier=identifier)
                        chats.append(item2)
                        messages = item2["messages"]["list"]
                        for message in messages:
                            message["withUser"] = {
                                k: item["withUser"][k] for k in keep
                            }
                            message["fromUser"] = {
                                k: message["fromUser"][k] for k in keep
                            }
                        mass_found = compare_message(queue_id, messages)
                        if mass_found:
                            mass_message["found"] = mass_found
                            mass_message["status"] = True
                            break
            if not mass_found:
                mass_message["status"] = False
        main_helper.export_json(chats, chats_path)
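        # Second pass: refresh any "found" message whose cached copy was hashed with
        # a different salt or is more than a day old, then collect the results.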
        for mass_message in mass_messages:
            found = mass_message["found"]
            if found and found["media"]:
                user = found["withUser"]
                identifier = user["id"]
                date_hashed_object = datetime.strptime(
                    mass_message["date_hashed"], "%d-%m-%Y %H:%M:%S"
                )
                next_date_object = date_hashed_object + timedelta(days=1)
                if mass_message["hashed_ip"] != hash or date_object > next_date_object:
                    print("Getting Message By ID")
                    x = await authed.get_message_by_id(
                        identifier=identifier, identifier2=found["id"], limit=1
                    )
                    new_found = x["result"]["list"][0]
                    new_found["withUser"] = found["withUser"]
                    mass_message["found"] = new_found
                    mass_message["hashed_ip"] = hash
                    mass_message["date_hashed"] = date_string
                global_found.append(found)
        main_helper.export_json(mass_messages, mass_message_path)
        return global_found

    async def get_all_stories(self, subscription: create_user):
        master_set: list[create_highlight | create_story] = []
        master_set.extend(await subscription.get_stories())
        master_set.extend(await subscription.get_archived_stories())
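        # Highlights come back as categories; each one is fetched again by id to
        # pull the stories it contains.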
        highlights = await subscription.get_highlights()
        valid_highlights: list[create_highlight | create_story] = []
        for highlight in highlights:
            highlight = await subscription.get_highlights(hightlight_id=highlight.id)
            valid_highlights.extend(highlight)
        master_set.extend(valid_highlights)
        return master_set

    async def get_all_subscriptions(
        self,
        authed: create_auth,
        identifiers: list[int | str] = [],
        refresh: bool = True,
    ):
        results = await authed.get_subscriptions(
            identifiers=identifiers, refresh=refresh
        )
        results.sort(key=lambda x: x.subscribedByData["expiredAt"])
        return results