Skip to content

Instantly share code, notes, and snippets.

@brevityinmotion
Created June 9, 2021 04:58
Show Gist options
  • Save brevityinmotion/0d3a79c3d81f4afa3f3f426451544a01 to your computer and use it in GitHub Desktop.
Save brevityinmotion/0d3a79c3d81f4afa3f3f426451544a01 to your computer and use it in GitHub Desktop.
Functions to parse and normalize URLs and domains
import tldextract
import pandas as pd
def parseRootDomains(refinedBucketPath, programName):
    """Derive the unique root domains of a program and persist any new ones.

    Reads ``<refinedBucketPath><programName>/<programName>-domains.csv``,
    reduces every distinct value of its ``domain`` column to a root domain
    with ``processDomainRoots`` and compares the result with the roots already
    stored in ``<programName>-domains-roots.csv`` in the same location. When
    previously unseen roots exist, the roots file is rewritten with the union
    of the new and the stored roots.

    Args:
        refinedBucketPath: Path prefix of the refined data store. It is
            concatenated directly with ``programName``, so it must already
            end with a path separator.
        programName: Program identifier, used both as the folder name and as
            the file-name prefix.

    Returns:
        A DataFrame with a single ``domain`` column holding the roots that
        were not yet present in the roots file (empty when nothing is new).

    Raises:
        Whatever ``pd.read_csv`` raises when the domains file cannot be read;
        only the roots file is allowed to be missing.
    """
    programPrefix = refinedBucketPath + programName + '/' + programName
    storePathInitial = programPrefix + '-domains.csv'
    storePathRoots = programPrefix + '-domains-roots.csv'

    dfAllDomains = pd.read_csv(storePathInitial)
    # A set comprehension removes duplicate roots while they are computed.
    uniqueRoots = {
        processDomainRoots(val)
        for val in dfAllDomains['domain'].unique().tolist()
    }
    # Naming the column in the constructor (instead of assigning ``.columns``
    # afterwards) keeps an empty result valid; sorting makes the row order
    # deterministic.
    dfRoots = pd.DataFrame(sorted(uniqueRoots), columns=['domain'])

    try:
        dfInitialRoots = pd.read_csv(storePathRoots)
    except (OSError, pd.errors.EmptyDataError):
        # No usable roots file yet: start from an empty set of known roots.
        dfInitialRoots = pd.DataFrame([], columns=['domain'])

    # Rows flagged 'right_only' exist in the fresh roots but not in the file.
    dfMerged = dfInitialRoots.merge(dfRoots, how='outer', indicator=True)
    dfNewRoots = pd.DataFrame(
        dfMerged.loc[dfMerged['_merge'] == 'right_only', 'domain']
    )

    if len(dfNewRoots) > 0:
        # DataFrame.append was removed in pandas 2.0; pd.concat replaces it.
        dfRoots = pd.concat([dfNewRoots, dfInitialRoots]).drop_duplicates()
        dfRoots.to_csv(storePathRoots, index=False)
    return dfNewRoots
# Return only the root domains
def processDomainRoots(domainName):
    """Return the root domain of ``domainName``.

    The name is split with ``tldextract.extract``. When a suffix is
    recognised, the root is ``<domain>.<suffix>``. Otherwise the root is the
    last label of the subdomain joined to the domain.

    Args:
        domainName: Host name or URL accepted by ``tldextract.extract``.

    Returns:
        The root domain as a string. Empty parts are skipped, so the result
        never starts with a stray dot (for example when there is no suffix
        and no subdomain).
    """
    ext = tldextract.extract(domainName)
    # Compare by value: ``is not ''`` tests object identity, which is not a
    # reliable way to detect an empty string.
    if ext.suffix != '':
        labels = [ext.domain, ext.suffix]
    else:
        # No recognised suffix: use the label closest to the domain instead.
        labels = [ext.subdomain.split('.')[-1], ext.domain]
    return '.'.join(label for label in labels if label)
# Generate a list of all of the unique domains while parsing potentially missing child domains
def processBulkDomains(dfDomains):
    """Collect every unique domain name contained in or implied by a frame.

    The distinct values of the ``subdomain`` and ``domain`` columns are
    gathered (a missing column is reported with a message and skipped), and
    each value is expanded with ``processSingleDomain`` so that its parent
    domains are included as well.

    Args:
        dfDomains: DataFrame that may contain a ``subdomain`` column, a
            ``domain`` column, or both.

    Returns:
        A set with the collected values plus every name returned by
        ``processSingleDomain`` for them.
    """
    listDomains = []
    for column in ('subdomain', 'domain'):
        try:
            # Missing cells are dropped so only real names reach the parser.
            listDomains += dfDomains[column].dropna().unique().tolist()
        except KeyError:
            print('No ' + column + ' column')

    # Grow a set in place instead of re-concatenating the list for every
    # value, which copied the whole list on each iteration.
    setDomains = set(listDomains)
    for val in listDomains:
        setDomains.update(processSingleDomain(val))
    return setDomains
def storeAllDomains(programName, refinedBucketPath, lstDomains, programInputBucketPath):
    """Merge a batch of domains into the stored domain list of a program.

    The batch is compared with
    ``<refinedBucketPath><programName>/<programName>-domains.csv``. When it
    contains names that are not stored yet:

    * the new names are written to ``<programName>-domains-new.csv`` under
      ``programInputBucketPath`` (one per line, no header),
    * the combined, de-duplicated list is written back to the domains file
      and to ``<programName>-domains-all.csv`` under
      ``programInputBucketPath`` (one per line, no header),
    * ``parseRootDomains`` is called to refresh the roots file.

    Args:
        programName: Program identifier, used as folder name and file prefix.
        refinedBucketPath: Path prefix of the refined store; concatenated
            directly with ``programName``, so it must end with a separator.
        lstDomains: Iterable of domain names (a list, or a set such as the
            one returned by ``processBulkDomains``).
        programInputBucketPath: Path prefix of the program-input store; same
            trailing-separator requirement as ``refinedBucketPath``.

    Returns:
        The string ``'Success'``.
    """
    # pandas rejects unordered sets as DataFrame input, so materialise them.
    if isinstance(lstDomains, (set, frozenset)):
        lstDomains = list(lstDomains)
    dfDomains = pd.DataFrame(lstDomains).rename(columns={0: 'domain'})
    if dfDomains.empty and 'domain' not in dfDomains.columns:
        # An empty batch yields a frame without columns; give it the expected
        # column so the merge below still has a key to join on.
        dfDomains = pd.DataFrame([], columns=['domain'])
    print('Length of scope domains: ' + str(len(dfDomains)))

    storePath = refinedBucketPath + programName + '/' + programName + '-domains.csv'
    storePathUnique = programInputBucketPath + programName + '/' + programName + '-domains-all.csv'
    storePathNew = programInputBucketPath + programName + '/' + programName + '-domains-new.csv'

    try:
        dfExistingDomains = pd.read_csv(storePath)
    except (OSError, pd.errors.EmptyDataError):
        # No usable domains file yet: treat the stored list as empty.
        dfExistingDomains = pd.DataFrame([], columns=['domain'])
    print('Initial length of unique subdomains: ' + str(len(dfExistingDomains)))

    # Rows flagged 'right_only' are in the batch but not in the stored list.
    dfMerged = dfExistingDomains.merge(dfDomains, how='outer', indicator=True)
    dfNewDomains = pd.DataFrame(
        dfMerged.loc[dfMerged['_merge'] == 'right_only', 'domain']
    )
    print('Length of dfNewDomains ' + str(len(dfNewDomains)))

    if len(dfNewDomains) > 0:
        dfNewDomains.to_csv(storePathNew, header=False, index=False, sep='\n')
        # DataFrame.append was removed in pandas 2.0; pd.concat replaces it.
        dfDomains = pd.concat([dfDomains, dfExistingDomains]).drop_duplicates()
        dfDomains.to_csv(storePath, index=False)
        dfDomains.to_csv(storePathUnique, header=False, index=False, sep='\n')
        print('Updated length of unique subdomains: ' + str(len(dfDomains)))
        # Every time the domain all list is updated, also update the roots list
        parseRootDomains(refinedBucketPath, programName)
    return 'Success'
def processSingleDomain(domainName):
    """List a domain together with every parent level down to its root.

    The name is split with ``tldextract.extract``. The first entry is
    ``<domain>.<suffix>`` (or just ``<domain>`` when no suffix is
    recognised); each following entry prepends one more subdomain label,
    working from the label closest to the domain outwards.

    Args:
        domainName: Host name or URL accepted by ``tldextract.extract``.

    Returns:
        A list of names ordered from the root to the most specific level.
        Empty parts are skipped, so no entry starts with a stray dot.
    """
    ext = tldextract.extract(domainName)
    # Compare by value: ``is not ''`` tests object identity, which is not a
    # reliable way to detect an empty string.
    if ext.suffix != '':
        baseLabels = [ext.domain, ext.suffix]
    else:
        baseLabels = [ext.domain]
    currentName = '.'.join(label for label in baseLabels if label)

    domainList = []
    if currentName:
        domainList.append(currentName)

    # Walk the subdomain labels from right to left, adding one level per step.
    for label in reversed(ext.subdomain.split('.')):
        if not label:
            # ''.split('.') yields [''], i.e. there is no subdomain to add.
            continue
        currentName = label + '.' + currentName if currentName else label
        domainList.append(currentName)
    return domainList
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment