Last active
January 11, 2024 03:56
-
-
Save jimwhite/df999c61f4c71a23e32a21979eb0c136 to your computer and use it in GitHub Desktop.
Fetch historical news using the Alpaca API, in raw JSON format, one article per file
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import json | |
from datetime import datetime, timezone | |
import time | |
import argparse | |
from alpaca.data.historical.news import NewsClient | |
from alpaca.data.requests import NewsRequest | |
from alpaca.data.timeframe import TimeFrame | |
def isoformatutc(d):
    """Return *d* as an ISO 8601 string, using a ``Z`` suffix for UTC.

    Timezone-aware datetimes are converted to UTC before formatting, so the
    result always ends in ``Z`` for them. Naive datetimes carry no offset
    information and are formatted unchanged (no suffix is added).

    :param d: the ``datetime`` to format.
    :return: the ISO 8601 representation of *d*.
    """
    # Only aware values can be converted; calling astimezone() on a naive
    # datetime would silently assume the machine's local timezone.
    if d.tzinfo is not None and d.utcoffset() is not None:
        d = d.astimezone(timezone.utc)
    return d.isoformat().replace("+00:00", "Z")
def write_news_to_json(n):
    """Write one news article to its own JSON file.

    The article is stamped with a ``fetched_at`` UTC timestamp (the dict
    passed in is modified in place) and saved under the current working
    directory as ``data/news_data_<prefix>/<id>_<updated_at>.json``, where
    ``<prefix>`` is the first seven characters of ``updated_at``
    (presumably ``YYYY-MM``, so articles end up grouped by month).

    :param n: article dict; must contain the keys ``id`` and ``updated_at``.
    :raises KeyError: if ``id`` or ``updated_at`` is missing.
    """
    n["fetched_at"] = isoformatutc(datetime.now(timezone.utc))
    year_mo = n["updated_at"][:7]
    news_data_path = os.path.abspath(f"data/news_data_{year_mo}")
    # exist_ok=True replaces a separate exists() check, which could race with
    # another process creating the same directory in between.
    os.makedirs(news_data_path, exist_ok=True)
    file_path = os.path.join(news_data_path, f"{n['id']}_{n['updated_at']}.json")
    with open(file_path, "wt", encoding="utf-8") as outfile:
        json.dump(n, outfile)
def _parse_news_timestamp(value):
    """Parse an ISO 8601 timestamp string into a ``datetime``.

    ``datetime.fromisoformat`` only accepts a trailing ``Z`` from Python 3.11
    onwards, so the suffix is rewritten as an explicit ``+00:00`` offset.
    """
    if value.endswith("Z"):
        value = value[:-1] + "+00:00"
    return datetime.fromisoformat(value)


def _store_news_page(news_list):
    """Write every article of one page to disk.

    :param news_list: non-empty iterable of article dicts.
    :return: the ``updated_at`` of the last article written, as a datetime.
    """
    for news in news_list:
        write_news_to_json(news)
    # Only the last article's timestamp is needed to track progress.
    return _parse_news_timestamp(news["updated_at"])


def get_all_the_news(start, end, timeframe: TimeFrame = TimeFrame.Day):
    """Download news between *start* and *end*, one JSON file per article.

    The first request is bounded by *start* and *end*; further pages are
    requested with the ``next_page_token`` of the previous response until no
    token is returned, a page comes back empty, or the last article written
    has an ``updated_at`` later than *end*.

    Follow-up requests carry only the page token, and a page is written in
    full before the *end* check runs, so a few articles updated after *end*
    may be saved.
    NOTE(review): confirm whether the API keeps the original date filter for
    paged requests.

    API credentials are read from the ``APCA_API_KEY_ID`` and
    ``APCA_API_SECRET_KEY`` environment variables.

    :param start: start of the requested range (sent with the first request).
    :param end: end of the requested range; compared against parsed
        ``updated_at`` values, so it presumably has to be timezone-aware.
    :param timeframe: forwarded to ``NewsRequest`` and shown in the log line.
    :return: None
    :raises KeyError: if a credential environment variable is not set.
    """
    msg = "Getting news "
    msg += f", timeframe: {timeframe}" if timeframe else ""
    msg += f" between dates: start={start}, end={end}"
    print(msg)

    step_size = 50
    news_client = NewsClient(
        api_key=os.environ["APCA_API_KEY_ID"],
        secret_key=os.environ["APCA_API_SECRET_KEY"],
        raw_data=True,
    )
    request_params = NewsRequest(
        include_content=True,
        sort="asc",
        timeframe=timeframe,
        start=start,
        end=end,
        limit=step_size,
    )
    print(f"getting news from {start}")
    news_response = news_client.get_news(request_params)
    news_list = news_response["news"]
    print(len(news_list))
    if not news_list:
        print("empty news list")
        return
    last_updated = _store_news_page(news_list)
    next_page_token = news_response.get("next_page_token")

    while next_page_token and last_updated <= end:
        request_params = NewsRequest(
            include_content=True,
            sort="asc",
            page_token=next_page_token,
            limit=step_size,
        )
        news_response = news_client.get_news(request_params)
        news_list = news_response.get("news")
        if not news_list:
            print("empty news list")
            break
        last_updated = _store_news_page(news_list)
        # .get() so a final page without the key ends the loop instead of
        # raising KeyError (matches how the first page is handled).
        next_page_token = news_response.get("next_page_token")
def download_news_year(year):
    """Download all news for one calendar year.

    The range runs from 1 January of *year* to 1 January of the following
    year, both at midnight UTC, and is fetched with a daily timeframe.

    :param year: calendar year as an integer.
    """
    year_start = datetime(year, 1, 1, tzinfo=timezone.utc)
    next_year_start = datetime(year + 1, 1, 1, tzinfo=timezone.utc)
    get_all_the_news(year_start, next_year_start, TimeFrame.Day)
# Command-line entry point: download one year of news and report how long
# it took. Usage: <script> --year 2023
parser = argparse.ArgumentParser()
parser.add_argument("--year", help="Enter the year", type=int)
args = parser.parse_args()
if not args.year:
    print("No year provided")
    # Raise SystemExit directly: the exit() helper is injected by the site
    # module for interactive use and is absent under `python -S`.
    raise SystemExit(1)

print(f"year: {args.year}")
start_time = time.time()
download_news_year(args.year)
print(f"{args.year} took {time.time() - start_time} sec")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment