Skip to content

Instantly share code, notes, and snippets.

@vwrs
Last active May 22, 2019 07:20
Show Gist options
  • Select an option

  • Save vwrs/a5941c9d64fa8b1303d0df826af4face to your computer and use it in GitHub Desktop.

Select an option

Save vwrs/a5941c9d64fa8b1303d0df826af4face to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
import logging
import sys
from nvdloader import NVDLoader
def main():
    """CLI entry point: preprocess NVD feeds and pickle the result.

    Expects exactly one argument, the CVSS version ('2' or '3');
    exits with status 1 and a usage message otherwise.
    """
    args = sys.argv
    if len(args) != 2 or args[1] not in ['2', '3']:
        print('Usage: $ python %s 2|3 (CVSS ver.)' % args[0])
        sys.exit(1)

    loader = NVDLoader(datadir='../data/raw/', ver=int(args[1]))
    saveas = None  # filename
    print('preprocessing ...')
    loader.nvd = loader.preprocessing()
    # output filename stem: explicit override, else nvd-v<ver>
    prefix = saveas if saveas is not None else f'nvd-v{loader.ver}'
    loader.save_dictionary(prefix + '.dict')
    # drop all columns excluding timestamp, descriptions and CVSS
    print('\ndropping extra columns ...')
    loader.nvd.drop(
        labels=['configurations', 'cve', 'impact', 'description',
                'lastModifiedDate'], axis=1, inplace=True)
    loader.nvd.to_pickle(prefix + '.pkl')
    print('---\nsaved to %s.' % (prefix + '.pkl'))
if __name__ == '__main__':
    # bare INFO-level messages (no timestamps/levels) so progress prints
    # from the loader come through unadorned
    logging.basicConfig(level=logging.INFO, format='%(message)s')
    main()
Display the source blob
Display the rendered blob
Raw
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
# -*- coding: utf-8 -*-
import time
import pandas as pd
from pathlib import Path
from tqdm import tqdm
from gensim.corpora import Dictionary
from gensim.parsing import preprocess_documents
class NVDLoader:
    """Load NVD JSON feeds into a DataFrame and prepare CVSS training data."""

    def __init__(self, datadir, ver):
        """Read every feed under `datadir` and keep rows scored with CVSS `ver`.

        datadir: directory containing the NVD *.json feed files.
        ver: CVSS version to extract, 2 or 3 (AssertionError otherwise).
        """
        assert ver in [2, 3]
        self.datadir = datadir
        self.ver = ver
        # full concatenated feed; pruned in place by drop_extra_rows below
        self.nvd = self.load_jsons()
        print('CVSS version: ', self.ver)
        print('\nverifying ...')
        self.drop_extra_rows()
        self.metrics = self.get_metrics(ver)
def load_jsons(self):
'''concat all json files in the directory `self.datadir`.'''
p = Path(self.datadir)
globpath = '*.json'
print('loading %s/%s ...' % (str(p.resolve()), globpath))
jsons = list(p.glob(globpath))
time.sleep(1)
list_ = [
pd.DataFrame.from_dict(
pd.read_json(j).CVE_Items.to_dict()
).transpose()
for j in tqdm(jsons)]
return pd.concat(list_)
def drop_extra_rows(self):
len_orig = len(self.nvd)
# remove rows which haven't evaluated yet
empty = dict()
nvd = self.nvd.query('impact != @empty')
# remove rows which don't have CVSS v2 or v3 Score
nvd = nvd[nvd.impact.apply(
lambda x: True if f'baseMetricV{self.ver}' in x.keys() else False)]
nvd = nvd.reset_index(drop=True)
print('removed %d rows.' % (len_orig-len(nvd)))
# drop description duplicates
print('dropping duplicates (description) ...')
nvd['description'] = nvd.cve.apply(
lambda x: x['description']['description_data'][0]['value'])
print('%d duplicates found.' %
len(nvd[nvd.duplicated(subset='description')]))
nvd = nvd.drop_duplicates(subset='description',
keep='last').reset_index(drop=True)
print('\n# of rows: %d (original) -> %d (processed)'
% (len_orig, len(nvd)))
self.nvd = nvd
def cvss2label(self, cvss):
'''
convert CVSS Score into labels
x < 4.0 ... 0
4.0 <= x < 7.0 ... 1
7.0 <= x < 9.0 ... 2
9.0 <= x ... 3
'''
if cvss < 4.0:
return 0
elif cvss < 7.0:
return 1
elif cvss < 9.0:
return 2
else:
return 3
@staticmethod
def get_metrics(ver):
if ver == 2:
metricdict = dict(
AV=['L', 'A', 'N'],
AC=['H', 'M', 'L'],
Au=['M', 'S', 'N'],
C=['N', 'P', 'C'],
I=['N', 'P', 'C'],
A=['N', 'P', 'C']
)
else:
raise NotImplementedError()
metrics = metricdict.keys()
print('CVSS metrics:\n', metrics)
return metrics
def append_metrics(self, df, vector_ser):
if self.ver == 2:
vector_ser = vector_ser.apply(lambda x: x.strip('()').split('/'))
else:
raise NotImplementedError()
# append labels as new columns of df
for i, m in enumerate(self.metrics):
df[m] = vector_ser.map(lambda x: x[i].split(':')[1])
return df
    def preprocessing(self):
        """Build the training frame: CVSS score/level, metric columns, and
        tokenized descriptions restricted to the fitted dictionary.

        Returns a new DataFrame; also fits self.dct via create_dictionary
        as a side effect. Rows with a baseScore of 0.0 or with 5 or fewer
        in-vocabulary tokens are dropped.
        """
        # add columns
        nvd = self.nvd
        # per-row CVSS payload (dict with baseScore, vectorString, ...)
        cvss = self.nvd.impact.apply(
            lambda x: x[f'baseMetricV{self.ver}'][f'cvssV{self.ver}'])
        nvd['cvss_score'] = cvss.map(lambda x: round(x['baseScore'], 1))
        # a 0.0 score carries no severity signal; drop those rows
        nvd = nvd[nvd.cvss_score != 0.0]
        print(f'''
CVSS Score
Min: {nvd.cvss_score.min():.1f}
Max: {nvd.cvss_score.max():.1f}
Mean: {nvd.cvss_score.mean():.1f}
''')
        print('converting CVSS score into four levels ...')
        nvd['cvss_level'] = nvd.cvss_score.map(self.cvss2label)
        print('parsing CVSS metrics ...')
        nvd = self.append_metrics(nvd, cvss.map(lambda x: x['vectorString']))
        # keep the raw text around before tokenization overwrites nothing
        nvd['desc_orig'] = nvd.description
        print('preprocessing descriptions ...')
        # apply all functions in `gensim.parsing.preprocessing.DEFAULT_FILTERS`
        nvd['desc_processed'] = preprocess_documents(nvd.description)
        print('constructing a word dictionary ...')
        # fits self.dct; must run before the token filtering below
        self.create_dictionary(nvd.desc_processed)
        # restrict tokens to the filtered vocabulary, then map to ids
        nvd['desc_processed'] = nvd.desc_processed.apply(
            lambda x: [xx for xx in x if xx in self.dct.token2id.keys()])
        nvd['desc_id'] = nvd.desc_processed.copy().apply(
            lambda x: [self.dct.token2id[xx] for xx in x])
        print('removing len(desc_id) <= 5')
        # very short documents are too sparse to train on
        nvd = nvd[nvd.desc_id.apply(lambda x: len(x) > 5)]\
            .reset_index(drop=True)
        return nvd
def create_dictionary(self, documents):
dct = Dictionary(documents)
len_orig = max(dct.token2id.values())
print('filtering by following conditions ...')
print('document frequencies >= 5 or appeared in 50% of all documents')
dct.filter_extremes(no_below=5, no_above=0.5)
print('# of vocabularies: %d (original) -> %d (processed)'
% (len_orig, max(dct.token2id.values())))
self.dct = dct
    def save_dictionary(self, filename):
        """Persist the fitted word dictionary (self.dct) to `filename`."""
        self.dct.save(filename)
    def load_dictionary(self, filename):
        """Replace self.dct with a Dictionary previously saved to `filename`."""
        self.dct = Dictionary.load(filename)
def my_preprocess_documents(self, series):
print('converting to lower case ..')
descs = series.str.lower()
print('removing stop words ...')
from gensim.parsing.preprocessing import remove_stopwords
descs = descs.apply(remove_stopwords)
print('removing parentheses ...')
import re
symbols = '(){}<>\*'
rgx = re.compile(f'[{symbols}]')
descs = [rgx.sub('', d) for d in descs]
print('stemming ...')
from gensim.parsing.porter import PorterStemmer
p = PorterStemmer()
descs = p.stem_documents(descs)
print('splitting ...')
descs = list(map(lambda x: x.split(), descs))
print('done.')
print(descs[:5])
return descs
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment