BulkVS bulk number download tool
#!/usr/bin/env python
# downloads a list of BulkVS phone numbers and writes them out as a CSV to stdout
# developed on python3.11, but should work in any modern python3
# before running, install deps:
#   virtualenv venv
#   source ./venv/bin/activate
#   pip install --upgrade aiohttp aiohttp-client-cache
# supply env vars with your BULKVS_EMAIL and BULKVS_APIKEY
# delete "cache.sqlite" to clear the HTTP cache, otherwise it's kept indefinitely
# this takes hours to run, downloads 500MB to 30GB, and makes 30k+ network requests to BulkVS
# run it sparingly!
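# example invocation (hypothetical filename and placeholder credentials):
#   BULKVS_EMAIL=you@example.com BULKVS_APIKEY=0123abcd python bulkvs_dump.py > numbers.csv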
# License: MIT
# Copyright (c) 2024 Eric Swanson
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import logging
import os
import sys
import asyncio

import aiohttp
# default the env vars to '' so BasicAuth doesn't raise on None before the asserts can fire
AUTH = aiohttp.BasicAuth(os.getenv('BULKVS_EMAIL', ''), os.getenv('BULKVS_APIKEY', ''))
assert AUTH[0], "Supply your BulkVS account email in BULKVS_EMAIL env var"
assert AUTH[1], "Supply your BulkVS api key in BULKVS_APIKEY env var"
CONCURRENCY = 5   # max concurrent NPAs processing at a time
MAX_LIMIT = 5000  # this is the max they will actually return
N_DIGITS = '23456789'  # valid first digits of an NPA or NXX
# used to track bandwidth and network calls into BulkVS
call_count = 0
bytes_downloaded = 0
# cache library is optional so make code that works without it too
try:
    import aiohttp_client_cache
except ImportError:
    print("Warning: unable to import aiohttp_client_cache", file=sys.stderr)
    def make_session():
        return aiohttp.ClientSession()
else:
    cache = aiohttp_client_cache.SQLiteBackend(
        cache_name='cache',
        allowable_codes=[200, 400, 404],
        expire_after=-1,
        autoclose=False,
    )
    def make_session():
        return aiohttp_client_cache.CachedSession(cache=cache)
def deduplicate_list(l, key=lambda x: x):  # preserves order
    seen = set()
    out = []
    for x in l:
        if key(x) in seen:
            continue
        out.append(x)
        seen.add(key(x))
    return out
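# e.g. deduplicate_list([3, 1, 3, 2, 1]) == [3, 1, 2]
# and with a key, only the first record per TN is kept:
#   deduplicate_list([{'TN': '212'}, {'TN': '212'}], key=lambda x: x['TN']) == [{'TN': '212'}]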
# ask BulkVS for numbers in an NPA/NXX
async def query(npa, nxx=None):
    global call_count, bytes_downloaded
    call_count += 1
    npa = int(npa)
    if nxx:
        nxx = int(nxx)
    assert npa >= 200 and npa < 1000, "NPA invalid"
    assert nxx is None or (nxx >= 200 and nxx < 1000), "NXX invalid"
    params = {
        'Limit': MAX_LIMIT,
        'Npa': npa,
    }
    if nxx:
        params['Nxx'] = nxx
    url = 'https://portal.bulkvs.com/api/v1.0/orderTn'
    async with make_session() as session:
        async with session.get(
            url,
            auth=AUTH,
            params=params,
            timeout=31,
        ) as resp:
            if resp.status == 404:
                return []  # empty list for 404
            resp.raise_for_status()
            js = await resp.json()
            # content_length can be None (e.g. chunked responses), so guard it
            bytes_downloaded += resp.content_length or 0
    for x in js:
        # TF numbers are given as 18XXNXXXXXX for some reason
        # so remove the leading 1 (removeprefix, unlike lstrip, strips at most one)
        x['TN'] = x['TN'].removeprefix('1')
    # weird bug, they don't actually filter to NPA, so do it for them
    # note, they also don't filter to NXX but we take advantage of that later
    js = [x for x in js if x['TN'].startswith(str(npa))]
    return js
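# illustrative shape of one returned record (field names taken from the CSV
# writer in handle_job_results below; the values here are made up):
#   {'TN': '2125551234', 'Tier': '0', 'Rate Center': 'NWYRCYZN01', 'State': 'NY',
#    'Per Minute Rate': '0.0010', 'Mrc': '0.15', 'Nrc': '0.00'}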
# find the area codes and/or exchanges with a given string prefix
async def find_npas(prefix):
    # note: prefix is a string, may have any number of digits between 1 and 6 inclusive
    global call_count, bytes_downloaded
    call_count += 1
    url = 'https://portal.bulkvs.com/tninv/'
    params = {'q': prefix}
    async with make_session() as session:
        async with session.get(
            url,
            auth=AUTH,
            params=params,
            timeout=91,
        ) as resp:
            resp.raise_for_status()
            js = await resp.json()
            # content_length can be None (e.g. chunked responses), so guard it
            bytes_downloaded += resp.content_length or 0
    area_codes = {}
    exchanges = {}
    # "js" will be a list of objects, each of which has two subobjects, header and data
    # header has title, num, and limit
    # if num >= limit, we missed some data and need a tighter prefix search
    # title will be Area Code or Exchanges
    # data is a list of more objects, each of which has properties:
    #   primary: the NPA for Area Code OR NPANXX for Exchanges
    #   secondary: string like "1043 Numbers available"
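    # illustrative response shape based on the description above (values made up):
    #   [{"header": {"title": "Area Code", "num": 2, "limit": 250},
    #     "data": [{"primary": "212", "secondary": "1043 Numbers available"}]},
    #    {"header": {"title": "Exchanges", "num": 1, "limit": 250},
    #     "data": [{"primary": "212555", "secondary": "42 Numbers available"}]}]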
    found_all_area_codes = found_all_exchanges = None
    for obj in js:
        if not obj['header']:
            continue
        title = obj['header']['title']
        data = obj['data']
        if title == 'Area Code':
            found_all_area_codes = obj['header']['num'] < obj['header']['limit']
            for item in data:
                area_codes[item['primary']] = int(item['secondary'].split(' ')[0])
        elif title == 'Exchanges':
            found_all_exchanges = obj['header']['num'] < obj['header']['limit']
            for item in data:
                exchanges[item['primary']] = int(item['secondary'].split(' ')[0])
    if not found_all_area_codes and area_codes:
        print("Warning: missed some area codes with prefix", prefix, file=sys.stderr)
    if not found_all_exchanges and exchanges:
        print("Warning: missed some exchanges with prefix", prefix, file=sys.stderr)
    return dict(area_codes=area_codes, exchanges=exchanges,
                found_all_area_codes=found_all_area_codes,
                found_all_exchanges=found_all_exchanges)
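# illustrative return value (keys as constructed above; counts made up):
#   {'area_codes': {'212': 1043}, 'exchanges': {'212555': 42},
#    'found_all_area_codes': True, 'found_all_exchanges': True}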
# handle a given NPA - this is our unit of parallelism
async def _process_npa(npa, expected):
    our_numbers = list()  # list of JSON objects
    numbers_set = set()   # set of just the telephone number strings
    # "expected" is the number of available telephone numbers
    print("Scraping NPA", npa, "with", expected, "expected telephone numbers", file=sys.stderr)
    # if the number of available numbers is < MAX_LIMIT, we can query in one shot
    res = await query(npa=npa)
    our_numbers.extend(res)
    numbers_set.update(set(x['TN'] for x in res))
    # XXX seems like expectations don't match for some NPAs
    # XXX tried hitting this code path every time but it didn't help
    if expected >= MAX_LIMIT:
        # otherwise we must query individual exchanges in that area code, so find them all
        exchanges = {}
        res = await find_npas(npa)
        exchanges.update(res['exchanges'])
        # possibly an area code contains more exchanges than we can actually get in one query
        # if that happens, we'll try a one-digit tighter prefix
        # i guess theoretically this could happen even with an NPA-N prefix, but the limit is 250 so it won't
        if not res['found_all_exchanges']:
            for digit in N_DIGITS:
                res = await find_npas(npa + digit)
                exchanges.update(res['exchanges'])
        # strip the NPA from the front of the exchange
        assert all(x.startswith(npa) for x in exchanges)
        exchanges = {x[3:]: y for x, y in exchanges.items()}
        # now query each npa / nxx to get numbers
        for nxx in sorted(list(exchanges)):
            prefix = str(npa) + str(nxx)
            nxx_num = exchanges[nxx]
            # set comprehension: find all numbers that we already know and are in this NPA/NXX
            known_numbers = {x for x in numbers_set if x.startswith(prefix)}
            # in theory one NXX could have 10,000 numbers and then we'd be hooped, as the max limit is 5000
            # for now just throw a warning, not sure what else to do
            if nxx_num >= MAX_LIMIT:
                print("Warning: expecting", nxx_num, "numbers for", npa, nxx, file=sys.stderr)
            # hey, we already found the expected number or more; skip scraping this NXX explicitly
            if len(known_numbers) >= nxx_num:
                print("Scraped NPA", npa, "exchange", nxx, "with", nxx_num,
                      "expected numbers, already know", len(known_numbers), file=sys.stderr)
                continue
            # scrape the NPA/NXX pair, which can return numbers from other NXXes
            # don't filter NXX on our side; see if we can resolve more than one at a time
            res = await query(npa=npa, nxx=nxx)
            our_numbers.extend(res)
            numbers_set.update(set(x['TN'] for x in res))
            known_numbers = {x for x in numbers_set if x.startswith(prefix)}
            print("Scraped NPA", npa, "exchange", nxx, "with", nxx_num,
                  "expected numbers, got", len(known_numbers), file=sys.stderr)
            #print(prefix, nxx_num, len(known_numbers), sep=",")
    # uniquify the numbers, as we might get multiple copies when we scrape multiple NXXes
    our_numbers = deduplicate_list(our_numbers, key=lambda x: x['TN'])
    print("Finished NPA", npa, "with", len(our_numbers), "numbers of expected", expected, file=sys.stderr)
    return our_numbers
async def process_npa(semaphore, npa, expected):
    # manage concurrency limit right here with this semaphore
    async with semaphore:
        try:
            return await _process_npa(npa, expected)
        except Exception as exc:
            print("EXCEPTION: Something went wrong with NPA", npa, repr(exc), file=sys.stderr)
            return []  # return an empty result so handle_job_results doesn't trip over None
num_completed = 0
num_npas = 0
def handle_job_results(results):
    global num_completed
    try:
        num_completed += 1
        npa = None
        if results:
            npa = results[0]['TN'][:3]
        print("*** Completed NPA {} #{:3d} of {}".format(npa, num_completed, num_npas), file=sys.stderr)
        for rec in results:
            try:
                # note: fields are joined naively; a value containing a comma would break the CSV
                print(",".join([
                    rec['TN'],
                    rec['Tier'],
                    rec['Rate Center'],
                    rec['State'],
                    rec['Per Minute Rate'] or "",
                    rec['Mrc'] or "",
                    rec['Nrc'] or "",
                ]))
            except Exception:
                print("EXCEPTION: Skipping record", rec, file=sys.stderr)
    except Exception as exc:
        print("EXCEPTION: Something went wrong with results!", repr(exc), file=sys.stderr)
async def main():
    logging.basicConfig(level=logging.WARNING)  # we don't really use this... oh well
    global num_npas
    # find all area codes and the number of available numbers in each code
    area_codes = {}
    for digit in N_DIGITS:
        res = await find_npas(digit)
        area_codes.update(res['area_codes'])
    total_expected_numbers = sum(area_codes.values())
    print("We expect to find", total_expected_numbers, "total telephone numbers in",
          len(area_codes), "distinct NPAs", file=sys.stderr)
    num_npas = len(area_codes)
    todo = sorted(list(area_codes.items()))
    # print the CSV header before either path, so the sequential path gets one too
    print("number,tier,rate center,state,per minute rate,mrc,nrc")
    if CONCURRENCY > 1:
        # the faster path
        sem = asyncio.Semaphore(CONCURRENCY)
        jobs = [
            process_npa(sem, npa, expected)
            for npa, expected in todo
        ]
        for job in asyncio.as_completed(jobs):
            handle_job_results(await job)
            print("*** DOWNLOADED {} bytes with {} requests so far".format(bytes_downloaded, call_count), file=sys.stderr)
    else:
        # the more easily debuggable path for development
        for npa, expected in todo:
            try:
                results = await _process_npa(npa, expected)
                handle_job_results(results)
                print("*** DOWNLOADED {} bytes with {} requests so far".format(bytes_downloaded, call_count), file=sys.stderr)
            except Exception as exc:
                print("EXCEPTION: Something went wrong processing one NPA", repr(exc), file=sys.stderr)

if __name__ == "__main__":
    asyncio.run(main())