Last active
May 22, 2019 07:20
-
-
Save vwrs/a5941c9d64fa8b1303d0df826af4face to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # -*- coding: utf-8 -*- | |
| import logging | |
| import sys | |
| from nvdloader import NVDLoader | |
def main():
    """Build the preprocessed NVD dataset for the CVSS version given on the
    command line, then save the word dictionary (.dict) and the reduced
    DataFrame (.pkl) to the current directory."""
    args = sys.argv
    # exactly one positional argument is accepted: the CVSS version, 2 or 3
    if len(args) != 2 or args[1] not in ('2', '3'):
        print('Usage: $ python %s 2|3 (CVSS ver.)' % args[0])
        sys.exit(1)

    loader = NVDLoader(datadir='../data/raw/', ver=int(args[1]))
    saveas = None  # filename
    print('preprocessing ...')
    loader.nvd = loader.preprocessing()

    # output filename prefix: explicit name if given, else e.g. "nvd-v2"
    prefix = saveas if saveas is not None else f'nvd-v{loader.ver}'
    loader.save_dictionary(prefix + '.dict')

    # drop all columns excluding timestamp, descriptions and CVSS
    print('\ndropping extra columns ...')
    loader.nvd = loader.nvd.drop(
        labels=['configurations', 'cve', 'impact', 'description',
                'lastModifiedDate'], axis=1)
    loader.nvd.to_pickle(prefix + '.pkl')
    print('---\nsaved to %s.' % (prefix + '.pkl'))
if __name__ == '__main__':
    # bare-message INFO logging: '%(message)s' suppresses level/name prefixes
    logging.basicConfig(format='%(message)s', level=logging.INFO)
    main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # -*- coding: utf-8 -*- | |
| import time | |
| import pandas as pd | |
| from pathlib import Path | |
| from tqdm import tqdm | |
| from gensim.corpora import Dictionary | |
| from gensim.parsing import preprocess_documents | |
class NVDLoader:
    """Load NVD JSON feed files into a pandas DataFrame and preprocess them
    for CVSS severity experiments.

    Responsibilities: concatenate the raw ``*.json`` feeds, drop rows without
    the requested CVSS version, extract descriptions, derive score labels and
    per-metric columns, tokenise descriptions and build a gensim Dictionary
    (stored as ``self.dct``).
    """

    def __init__(self, datadir, ver):
        """
        datadir : str
            Directory containing the NVD ``*.json`` feed files.
        ver : int
            CVSS version to use; must be 2 or 3.
        """
        assert ver in [2, 3]
        self.datadir = datadir
        self.ver = ver
        self.nvd = self.load_jsons()
        print('CVSS version: ', self.ver)
        print('\nverifying ...')
        self.drop_extra_rows()
        self.metrics = self.get_metrics(ver)

    def load_jsons(self):
        '''Concatenate all JSON feed files in the directory `self.datadir`
        into a single DataFrame (one row per CVE item).'''
        p = Path(self.datadir)
        globpath = '*.json'
        print('loading %s/%s ...' % (str(p.resolve()), globpath))
        jsons = list(p.glob(globpath))
        time.sleep(1)  # let the print above flush before tqdm draws its bar
        frames = [
            pd.DataFrame.from_dict(
                pd.read_json(j).CVE_Items.to_dict()
            ).transpose()
            for j in tqdm(jsons)]
        return pd.concat(frames)

    def drop_extra_rows(self):
        '''Remove rows that are unusable for the chosen CVSS version and
        deduplicate by description; also adds a `description` column
        extracted from the raw `cve` dict.'''
        len_orig = len(self.nvd)
        # remove rows which haven't been evaluated yet (empty `impact` dict)
        empty = dict()
        nvd = self.nvd.query('impact != @empty')
        # remove rows which don't have a CVSS v2/v3 base metric block
        nvd = nvd[nvd.impact.apply(
            lambda x: f'baseMetricV{self.ver}' in x)]
        nvd = nvd.reset_index(drop=True)
        print('removed %d rows.' % (len_orig - len(nvd)))
        # drop description duplicates (first English description entry)
        print('dropping duplicates (description) ...')
        nvd['description'] = nvd.cve.apply(
            lambda x: x['description']['description_data'][0]['value'])
        print('%d duplicates found.' %
              len(nvd[nvd.duplicated(subset='description')]))
        nvd = nvd.drop_duplicates(subset='description',
                                  keep='last').reset_index(drop=True)
        print('\n# of rows: %d (original) -> %d (processed)'
              % (len_orig, len(nvd)))
        self.nvd = nvd

    def cvss2label(self, cvss):
        '''
        convert CVSS Score into labels
              x < 4.0 ... 0
        4.0 <= x < 7.0 ... 1
        7.0 <= x < 9.0 ... 2
        9.0 <= x       ... 3
        '''
        if cvss < 4.0:
            return 0
        elif cvss < 7.0:
            return 1
        elif cvss < 9.0:
            return 2
        else:
            return 3

    @staticmethod
    def get_metrics(ver):
        '''Return the metric names of the CVSS vector for `ver`, in the
        order they appear in the vector string. Only v2 is implemented.'''
        if ver == 2:
            metricdict = dict(
                AV=['L', 'A', 'N'],
                AC=['H', 'M', 'L'],
                Au=['M', 'S', 'N'],
                C=['N', 'P', 'C'],
                I=['N', 'P', 'C'],
                A=['N', 'P', 'C']
            )
        else:
            raise NotImplementedError()
        metrics = metricdict.keys()
        print('CVSS metrics:\n', metrics)
        return metrics

    def append_metrics(self, df, vector_ser):
        '''Parse CVSS vector strings (e.g. "(AV:N/AC:L/Au:N/C:P/I:P/A:P)")
        and append one column per metric to `df`.'''
        if self.ver == 2:
            vector_ser = vector_ser.apply(lambda x: x.strip('()').split('/'))
        else:
            raise NotImplementedError()
        # append labels as new columns of df; relies on the vector listing
        # its metrics in the same order as self.metrics
        for i, m in enumerate(self.metrics):
            df[m] = vector_ser.map(lambda x: x[i].split(':')[1])
        return df

    def preprocessing(self):
        '''Derive score, level and metric columns, tokenise the descriptions
        and build the word dictionary. Returns the processed DataFrame.'''
        nvd = self.nvd
        cvss = self.nvd.impact.apply(
            lambda x: x[f'baseMetricV{self.ver}'][f'cvssV{self.ver}'])
        nvd['cvss_score'] = cvss.map(lambda x: round(x['baseScore'], 1))
        nvd = nvd[nvd.cvss_score != 0.0]  # a 0.0 base score carries no signal
        print(f'''
        CVSS Score
        Min:  {nvd.cvss_score.min():.1f}
        Max:  {nvd.cvss_score.max():.1f}
        Mean: {nvd.cvss_score.mean():.1f}
        ''')
        print('converting CVSS score into four levels ...')
        nvd['cvss_level'] = nvd.cvss_score.map(self.cvss2label)
        print('parsing CVSS metrics ...')
        nvd = self.append_metrics(nvd, cvss.map(lambda x: x['vectorString']))
        nvd['desc_orig'] = nvd.description
        print('preprocessing descriptions ...')
        # apply all functions in `gensim.parsing.preprocessing.DEFAULT_FILTERS`
        nvd['desc_processed'] = preprocess_documents(nvd.description)
        print('constructing a word dictionary ...')
        self.create_dictionary(nvd.desc_processed)
        # keep only tokens that survived the dictionary frequency filter
        nvd['desc_processed'] = nvd.desc_processed.apply(
            lambda x: [xx for xx in x if xx in self.dct.token2id])
        nvd['desc_id'] = nvd.desc_processed.apply(
            lambda x: [self.dct.token2id[xx] for xx in x])
        print('removing len(desc_id) <= 5')
        nvd = nvd[nvd.desc_id.apply(lambda x: len(x) > 5)]\
            .reset_index(drop=True)
        return nvd

    def create_dictionary(self, documents):
        '''Build a frequency-filtered gensim Dictionary from tokenised
        documents and store it as `self.dct`.'''
        dct = Dictionary(documents)
        # bug fix: vocabulary size is len(dct); max(token2id.values())
        # is the highest id, i.e. size - 1
        len_orig = len(dct)
        print('filtering by following conditions ...')
        # bug fix: message previously mis-stated the kept condition
        print('keep tokens with document frequency >= 5 '
              'and appearing in <= 50% of all documents')
        dct.filter_extremes(no_below=5, no_above=0.5)
        print('# of vocabularies: %d (original) -> %d (processed)'
              % (len_orig, len(dct)))
        self.dct = dct

    def save_dictionary(self, filename):
        '''Persist the word dictionary to `filename`.'''
        self.dct.save(filename)

    def load_dictionary(self, filename):
        '''Load a previously saved word dictionary from `filename`.'''
        self.dct = Dictionary.load(filename)

    def my_preprocess_documents(self, series):
        '''Hand-rolled alternative to gensim's preprocess_documents:
        lowercase, remove stop words, strip bracket symbols, stem, split.'''
        print('converting to lower case ..')
        descs = series.str.lower()
        print('removing stop words ...')
        from gensim.parsing.preprocessing import remove_stopwords
        descs = descs.apply(remove_stopwords)
        print('removing parentheses ...')
        import re
        # bug fix: raw string — '\*' in a plain string is an invalid
        # escape sequence (same characters, no warning)
        symbols = r'(){}<>\*'
        rgx = re.compile(f'[{symbols}]')
        descs = [rgx.sub('', d) for d in descs]
        print('stemming ...')
        from gensim.parsing.porter import PorterStemmer
        p = PorterStemmer()
        descs = p.stem_documents(descs)
        print('splitting ...')
        descs = [d.split() for d in descs]
        print('done.')
        print(descs[:5])
        return descs
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment