Gets Cigna payer transparency files
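The script below pulls Cigna's transparency-in-coverage table-of-contents index for July 2022, extracts the signed CloudFront URL for each machine-readable file it references, and downloads them with up to 100 concurrent aiohttp requests, sorting the results into good/ and bad/ directories by HTTP status.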
import asyncio
import gzip
import json
import os
import time

import aiofiles
import aiohttp
import requests

FILES_PATH = "files"
GOOD_FILES = os.path.join(FILES_PATH, "good")
BAD_FILES = os.path.join(FILES_PATH, "bad")
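# Downloads are sorted by HTTP status: 200 responses land under
# files/good/<file-type>/ and anything else under files/bad/<file-type>/,
# where <file-type> (e.g. "table-of-contents") is parsed out of the
# flattened filename.
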
async def download_file(url, filename):
    # The file type is embedded in the flattened filename produced by
    # get_filename_from_url below
    ftype = filename.split('cost-transparency.mrf.')[-1].split('.reporting')[0]
    if not os.path.exists(os.path.join(GOOD_FILES, ftype)):
        os.mkdir(os.path.join(GOOD_FILES, ftype))
    if not os.path.exists(os.path.join(BAD_FILES, ftype)):
        os.mkdir(os.path.join(BAD_FILES, ftype))
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as r:
            if r.status != 200:
                fpath = os.path.join(BAD_FILES, ftype, filename)
                print(f"Error downloading file {filename} from url {url}: http response code {r.status}")
            else:
                fpath = os.path.join(GOOD_FILES, ftype, filename)
            # Write the body either way so failed responses can be inspected later
            async with aiofiles.open(fpath, mode='wb+') as f:
                await f.write(await r.read())

def get_file_urls(input_file):
    """Yield the signed file URL for every entry in the index's list-valued keys."""
    with open(input_file, "r") as f:
        data = json.load(f)
    for key in data.keys():
        d = data.get(key)
        if isinstance(d, list):
            for fileinfo_dict in d:
                file_url = get_file_url(fileinfo_dict)
                # Skip placeholder entries whose URL contains 'company_empty'
                if 'company_empty' not in file_url:
                    yield file_url

def read_in_chunks(file_object, chunk_size=1024):
    """Lazy generator to read a file piece by piece. Default chunk size: 1k."""
    data = file_object.read(chunk_size)
    while data:
        yield data
        data = file_object.read(chunk_size)
    file_object.close()

def get_file_url(fileinfo_dict):
    # Pull the last 'location' value out of the entry's repr; this sidesteps
    # having to know exactly how deeply the URL is nested in the entry
    strdict = str(fileinfo_dict)
    file_url = strdict.split("location':")[-1].strip().strip("}").strip("'")
    return file_url

def get_filename_from_url(file_url):
    # Drop the query string, then flatten the CloudFront path into one
    # filename: '/' becomes '.', '=' and '_' become '-'
    filename = file_url.split('?')[0].split('cloudfront.net')[-1].strip('/').replace('/', '.').replace("=", '-').replace('_', '-')
    return filename

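# Example of the flattening (the trailing file name here is hypothetical):
#   https://d25kgz5rikkq4n.cloudfront.net/cost_transparency/mrf/table-of-contents/reporting_month=2022-07/index.json
# becomes
#   cost-transparency.mrf.table-of-contents.reporting-month-2022-07.index.json
# from which download_file later extracts "table-of-contents" as the file type.
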
def print_file(filename):
    """Debug helper: print a downloaded file's JSON, handling gzipped payloads."""
    print(f"Downloaded file {filename}")
    with open(filename, "rb") as f:
        try:
            fdata = json.load(f)
            print(fdata)
        except UnicodeDecodeError:
            # Some of these come across as gzipped files, possibly based on file sizes
            print("Could not parse this one as json:")
            fdata = json.loads(gzip.decompress(open(filename, 'rb').read()))
            print(fdata)
        except json.JSONDecodeError:
            # Not JSON at all; rewind and dump the raw chunks for inspection
            print("Unrecognized file contents:")
            f.seek(0)
            for chonk in read_in_chunks(f):
                print(chonk)

async def download_file_from_url(url):
    filename = get_filename_from_url(url)
    await download_file(url, filename)
    return filename

async def gather_with_concurrency(n, *tasks):
    # Cap the number of in-flight downloads with a semaphore so thousands of
    # coroutines don't all open connections at once
    semaphore = asyncio.Semaphore(n)

    async def sem_task(task):
        async with semaphore:
            return await task

    return await asyncio.gather(*(sem_task(task) for task in tasks))

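# Note the arguments are bare coroutines, not asyncio.Tasks: each one only
# starts executing once its sem_task wrapper acquires the semaphore, so the
# cap genuinely limits concurrency while results still come back in order,
# just like plain asyncio.gather.
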
async def main():
    metadata_file = "2022-07-01_cigna-health-life-insurance-company_index.json"
    metadata_file_url = "https://d25kgz5rikkq4n.cloudfront.net/cost_transparency/mrf/table-of-contents/reporting_month=2022-07/2022-07-01_cigna-health-life-insurance-company_index.json?Expires=1660438220&Policy=eyJTdGF0ZW1lbnQiOlt7IlJlc291cmNlIjoiaHR0cHM6Ly9kMjVrZ3o1cmlra3E0bi5jbG91ZGZyb250Lm5ldC9jb3N0X3RyYW5zcGFyZW5jeS9tcmYvdGFibGUtb2YtY29udGVudHMvcmVwb3J0aW5nX21vbnRoPTIwMjItMDcvMjAyMi0wNy0wMV9jaWduYS1oZWFsdGgtbGlmZS1pbnN1cmFuY2UtY29tcGFueV9pbmRleC5qc29uIiwiQ29uZGl0aW9uIjp7IkRhdGVMZXNzVGhhbiI6eyJBV1M6RXBvY2hUaW1lIjoxNjYwNDM4MjIwfX19XX0_&Signature=LMaexmKyE5pVwnkPZY-JbpIJdVBFO9UAuX1bxAi4lG7epYGNeagBhu-x5SzUZroH~Bo3MwfEOj0hYQWzJi4Ir3y5GRhcjgoLmBaK5eGAChd~QuVMG9kdAst529mCichuXGqkeJLybFSf~1GBdOP1qpoBhHAeDuQqLDRMfQ991WtpGPw-xYctFpZ6hcmf1BTnlcbkjb8YzmIVOUmEpdhoqmtegoSbCVRtyM-t~VRlSmrqsckrds9reneY6lojMwzPZn0vRMMif0ttC9l059HC8iP4~9douwNYLx2ONqljrfXxv5uXjcOkq4iK~jidt0GtLJCIOxd3p3dlUx~j8BL~-Q__&Key-Pair-Id=K1NVBEPVH9LWJP"
    if not os.path.exists(FILES_PATH):
        os.mkdir(FILES_PATH)
    if not os.path.exists(GOOD_FILES):
        os.mkdir(GOOD_FILES)
    if not os.path.exists(BAD_FILES):
        os.mkdir(BAD_FILES)
    if not os.path.exists(metadata_file):
        # Fetch the index into the working directory so get_file_urls can read
        # it; download_file would file it away under files/good/ instead
        with open(metadata_file, "wb") as f:
            f.write(requests.get(metadata_file_url).content)
    file_urls_list = list(get_file_urls(metadata_file))
    file_futures = [download_file_from_url(url) for url in file_urls_list]
    count = len(file_futures)
    print(f"Starting download of {count} files.")
    start = time.time()
    result = await gather_with_concurrency(100, *file_futures)
    finish = time.time()
    total = finish - start
    print(f"Finished download of {len(file_urls_list)} files in {total} seconds")


if __name__ == "__main__":
    # 'await' is only valid inside a coroutine, so the top-level awaits are
    # wrapped in main() and driven with asyncio.run
    asyncio.run(main())