Last active
November 30, 2023 14:00
-
-
Save tamanobi/e114c32bf4ac8ebfd9440994ef9679d9 to your computer and use it in GitHub Desktop.
Pocket API から取得した Twitter URL をもとにメディアを S3 に保存するやつ。ついでに XMP も仕込んでいる
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
from requests.exceptions import HTTPError | |
from pathlib import Path | |
import re | |
import boto3 | |
from typing import List | |
import json | |
from libxmp import XMPFiles, XMPMeta, consts | |
import tempfile | |
# Matches tweet permalinks: group 1 is the account handle, group 2 is the
# numeric tweet ID. Both the legacy twitter.com host and the x.com host are
# accepted, optionally prefixed with "www." or "mobile.".
pattern = r"https://(?:www\.|mobile\.)?(?:twitter|x)\.com/([a-zA-Z0-9_]+)/status/(\d+)"

# Pocket consumer key; it is sent as `consumer_key` on every Pocket API call.
CONSUMER_KEY = ''
def save_access_token(access_token, file_name="access_token.txt"):
    """Write the access token to a text file.

    Args:
        access_token: Token string to store.
        file_name: Destination file; defaults to ``access_token.txt`` in the
            current working directory.

    An ``IOError`` raised while saving is reported on stdout rather than
    propagated to the caller.
    """
    try:
        # pathlib handles opening, writing and closing the file in one call.
        Path(file_name).write_text(access_token)
        print(f"Access token saved to {file_name}")
    except IOError as write_error:
        print(f"An error occurred while saving the access token: {write_error}")
# OAuth step 1: obtain a request token.
def get_request_token(timeout: float = 30.0):
    """Ask Pocket for an OAuth request token.

    Args:
        timeout: Seconds to wait for Pocket before giving up. Without a
            timeout, ``requests`` would wait indefinitely on a stalled
            connection.

    Returns:
        The ``code`` field of the JSON response, or ``None`` when the request
        fails for any reason (the error is printed).
    """
    url = 'https://getpocket.com/v3/oauth/request'
    headers = {'X-Accept': 'application/json'}
    payload = {'consumer_key': CONSUMER_KEY, 'redirect_uri': 'https://example.com'}
    try:
        response = requests.post(url, json=payload, headers=headers, timeout=timeout)
        response.raise_for_status()
        return response.json()['code']
    except HTTPError as http_err:
        print(f'HTTP error occurred: {http_err}')
    except Exception as err:
        # Also covers timeouts, connection failures and malformed responses.
        print(f'Other error occurred: {err}')
    return None
def access_token_path() -> Path:
    """Return the relative path of the file holding the Pocket access token."""
    token_file_name = "access_token.txt"
    return Path(token_file_name)
# After the user has authorized the request token, obtain the access token.
def get_access_token(request_token, timeout: float = 30.0):
    """Exchange an authorized request token for a Pocket access token.

    On success the token is also written to disk via ``save_access_token``.

    Args:
        request_token: The code previously returned by ``get_request_token``.
        timeout: Seconds to wait for Pocket before giving up. Without a
            timeout, ``requests`` would wait indefinitely on a stalled
            connection.

    Returns:
        The ``access_token`` field of the JSON response, or ``None`` when the
        request fails for any reason (the error is printed).
    """
    url = 'https://getpocket.com/v3/oauth/authorize'
    payload = {'consumer_key': CONSUMER_KEY, 'code': request_token}
    headers = {'X-Accept': 'application/json'}
    try:
        # NOTE(review): the parameters are sent as a GET query string; Pocket's
        # documentation appears to describe this endpoint as POST - confirm.
        response = requests.get(url, headers=headers, params=payload, timeout=timeout)
        response.raise_for_status()
        access_token = response.json()['access_token']
        save_access_token(access_token)
        return access_token
    except HTTPError as http_err:
        print(f'HTTP error occurred: {http_err}')
    except Exception as err:
        # Also covers timeouts, connection failures and malformed responses.
        print(f'Other error occurred: {err}')
    return None
# Fetch the saved URLs.
def get_saved_urls(access_token, timeout: float = 30.0):
    """Retrieve the user's saved Pocket items (``state`` = ``all``).

    Args:
        access_token: Pocket access token of the authorized user.
        timeout: Seconds to wait for Pocket before giving up. Without a
            timeout, ``requests`` would wait indefinitely on a stalled
            connection.

    Returns:
        The ``list`` field of the JSON response (the caller in this file
        iterates it as a mapping of item ID to item details), or ``None`` when
        the request fails for any reason (the error is printed).
    """
    url = 'https://getpocket.com/v3/get'
    payload = {'consumer_key': CONSUMER_KEY, 'access_token': access_token, 'state': 'all'}
    headers = {'X-Accept': 'application/json'}
    try:
        response = requests.post(url, json=payload, headers=headers, timeout=timeout)
        response.raise_for_status()
        return response.json()['list']
    except HTTPError as http_err:
        print(f'HTTP error occurred: {http_err}')
    except Exception as err:
        # Also covers timeouts, connection failures and malformed responses.
        print(f'Other error occurred: {err}')
    return None
def is_cache_exists(tweet_id: str) -> bool:
    """Report whether a usable cached response exists for ``tweet_id``.

    The cache counts as present only when the file returned by
    ``tweet_filepath`` exists and its contents parse as JSON.
    """
    cache_file = tweet_filepath(tweet_id)
    if cache_file.exists():
        try:
            json.loads(cache_file.read_text())
        except json.decoder.JSONDecodeError:
            # A file that is not valid JSON is treated as a missing cache.
            pass
        else:
            return True
    return False
def tweet_filepath(tweet_id: str) -> Path:
    """Return the cache file path for ``tweet_id``.

    The ``tweet`` directory (relative to the current working directory) is
    created if it does not exist yet; the file itself is not created.
    """
    cache_dir = Path("tweet")
    cache_dir.mkdir(exist_ok=True)
    return cache_dir.joinpath(f"{tweet_id}.txt")
def save_tweet(tweet_id: str, timeout: float = 30.0) -> bool:
    """Download the tweet's JSON from api.vxtwitter.com into the local cache.

    Args:
        tweet_id: Numeric tweet ID as a string.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, ``requests`` would wait indefinitely on a stalled
            connection.

    Returns:
        ``True`` when the response body was written to the cache file,
        ``False`` when the request failed or the status code was not 200.
    """
    p = tweet_filepath(tweet_id)
    try:
        response = requests.get(f"https://api.vxtwitter.com/x/status/{tweet_id}", timeout=timeout)
    except requests.RequestException as err:
        # A transport failure for one tweet is reported as "not saved" so the
        # caller can move on to the next one.
        print(f"skip: request failed: {err}")
        return False
    if response.status_code != 200:
        print("skip: response.status_code が 200 以外だったので")
        return False
    p.write_text(response.text)
    return True
def get_urls(tweet_id: str) -> List[str]:
    """Collect the unique media URLs recorded in the cached tweet JSON.

    URLs are taken from the ``mediaURLs`` list and from the ``url`` field of
    each ``media_extended`` entry.

    Args:
        tweet_id: Numeric tweet ID as a string.

    Returns:
        The de-duplicated URLs in first-seen order. An empty list is returned
        when the cache file is missing, is not valid JSON, or does not hold a
        JSON object.
    """
    try:
        data = json.loads(tweet_filepath(tweet_id).read_text())
    except FileNotFoundError:
        # Happens when the tweet could not be downloaded beforehand.
        print(f"cache file not found: {tweet_id}")
        return []
    except json.decoder.JSONDecodeError:
        print(f"JSONDecodeError: {tweet_id}")
        return []
    if not isinstance(data, dict):
        return []
    # Either key may be absent (or null); treat that as "no media".
    extracted = list(data.get('mediaURLs') or [])
    extracted += [media['url'] for media in (data.get('media_extended') or []) if 'url' in media]
    # dict.fromkeys de-duplicates while keeping a deterministic order.
    return list(dict.fromkeys(extracted))
def file_name(tweet_id: str, url: str) -> str:
    """Build the object name for a media URL.

    The name is the tweet ID, an underscore, and the last path segment of
    ``url`` with everything from the first ``?`` onwards removed.
    """
    last_segment = url.rsplit('/', 1)[-1]
    stem, _, _ = last_segment.partition('?')
    return f"{tweet_id}_{stem}"
def get_media_from_url(url, timeout: float = 30.0):
    """Download the media file at ``url``.

    Args:
        url: Address of the media file.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, ``requests`` would wait indefinitely on a stalled
            connection.

    Returns:
        The response body as ``bytes`` when the status code is 200, otherwise
        ``None``. ``None`` is also returned when the request itself fails.
    """
    try:
        response = requests.get(url, timeout=timeout)
    except requests.RequestException as err:
        # Report transport failures the same way as a non-200 answer so one
        # unreachable file does not abort the whole run.
        print(f"request failed: {err}")
        return None
    if response.status_code == 200:
        return response.content
    return None
def upload_to_wasabi(content, name):
    """Upload ``content`` to the Wasabi bucket under the key ``tweet/<name>``.

    Args:
        content: Object body passed to ``put_object``.
        name: File name appended to the ``tweet/`` key prefix.

    The endpoint and bucket are fixed. The credentials below are placeholders:
    while they are left blank, ``None`` is handed to boto3 instead of an empty
    string so that boto3 resolves credentials itself (for example from
    environment variables or the shared credentials file).
    """
    bucket_name = "tweet-douga"
    s3_file_name = f"tweet/{name}"
    wasabi_endpoint = "https://s3.ap-northeast-1.wasabisys.com"
    aws_access_key_id = ""
    aws_secret_access_key = ""
    s3 = boto3.client(
        's3',
        endpoint_url=wasabi_endpoint,
        # An explicit empty string would be used as a (useless) credential.
        aws_access_key_id=aws_access_key_id or None,
        aws_secret_access_key=aws_secret_access_key or None,
    )
    s3.put_object(Bucket=bucket_name, Key=s3_file_name, Body=content)
def update_xmp(memory_data: bytes, tweet_id: str, ) -> bytes:
    """Embed tweet metadata into a media file as XMP and return the new bytes.

    The data is written to a temporary file because ``XMPFiles`` is opened
    with a file path. Two properties are set under the ``consts.XMP_NS_DC``
    namespace: ``tweet_id`` and ``tweet`` (the full text of the cached tweet
    file).

    Args:
        memory_data: Raw bytes of the media file.
        tweet_id: Numeric tweet ID as a string.

    Returns:
        The contents of the temporary file after the update attempt. When the
        XMP packet cannot be written, a message is printed and the bytes are
        returned as the file holds them at that point.
    """
    # delete=False: the file must outlive this `with` so it can be reopened
    # by path; it is removed explicitly in the `finally` below.
    with tempfile.NamedTemporaryFile(delete=False) as temp_file:
        temp_file.write(memory_data)
        temp_file_path = temp_file.name
    try:
        xmpfile = XMPFiles(file_path=temp_file_path, open_forupdate=True)
        try:
            xmp = xmpfile.get_xmp()
            if xmp is None:
                # No existing XMP packet: start from an empty one.
                xmp = XMPMeta()
            xmp.set_property(consts.XMP_NS_DC, "tweet_id", tweet_id)
            xmp.set_property(consts.XMP_NS_DC, "tweet", tweet_filepath(tweet_id).read_text())
            if xmpfile.can_put_xmp(xmp):
                xmpfile.put_xmp(xmp)
            else:
                print("XMPを書き込めませんでした")
        finally:
            # Always release the file handle, even when a step above raises.
            xmpfile.close_file()
        return Path(temp_file_path).read_bytes()
    finally:
        # Remove the temporary file so repeated calls do not pile up files.
        Path(temp_file_path).unlink()
def cli():
    """Mirror the media of every tweet saved in Pocket to Wasabi.

    Steps:
      1. Obtain a Pocket access token: read it from the file given by
         ``access_token_path`` when present, otherwise run the interactive
         OAuth flow.
      2. Fetch the saved items and keep those whose URL matches ``pattern``.
      3. For each tweet, cache its JSON locally (unless already cached),
         download every media URL, embed the tweet metadata as XMP and upload
         the result.

    The run stops early with a message when no token can be obtained. Items
    without a URL and tweets that cannot be downloaded are skipped.
    """
    if not access_token_path().exists():
        # Obtain a request token.
        request_token = get_request_token()
        if request_token is None:
            print("Could not obtain a request token; aborting.")
            return
        # The user opens this URL in a browser and authorizes with Pocket.
        print(f"Please visit the following URL to authorize: https://getpocket.com/auth/authorize?request_token={request_token}&redirect_uri=https://example.com")
        input("> Press Enter after you've authorized the request token...")
        # Exchange the request token for an access token.
        access_token = get_access_token(request_token)
        if access_token is None:
            print("Could not obtain an access token; aborting.")
            return
    else:
        # strip(): tolerate a trailing newline in a hand-edited token file.
        access_token = access_token_path().read_text().strip()

    Path("tweet").mkdir(exist_ok=True)

    # get_saved_urls returns None on failure. Pocket also reportedly answers
    # with an empty JSON array when nothing is saved (TODO confirm); both
    # cases are treated as "no items".
    saved_urls = get_saved_urls(access_token) or {}
    for item_details in saved_urls.values():
        url = item_details.get('resolved_url') or item_details.get('given_url')
        if not url:
            continue
        match = re.match(pattern, url)
        if match is None:
            continue
        tweet_id = match.group(2)
        # Without a cached tweet there is nothing to read media URLs from.
        if not is_cache_exists(tweet_id) and not save_tweet(tweet_id):
            continue
        for media_url in get_urls(tweet_id):
            name = file_name(tweet_id, media_url)
            media = get_media_from_url(media_url)
            if media is None:
                print("メディアを取得できませんでした")
                continue
            print("メタ情報を書き込みます")
            updated_media = update_xmp(media, tweet_id)
            print(f"メタ情報を書き込みました: {tweet_id}")
            print(f"アップロードします: {media_url}")
            upload_to_wasabi(updated_media, name)
            print(f"アップロードしました: {media_url}")


if __name__ == "__main__":
    cli()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment