Created July 13, 2021 07:25
create_tf_records.py
import sys
import os
import gzip
import json
import io
import argparse
import concurrent.futures

from tokenizers import Tokenizer
import tensorflow as tf
from google.cloud import storage
def create_args(args=argparse.Namespace()):
    # input: gzipped BigQuery JSON exports on GCS, one directory per language
    args.data_bucket = 'sfr-tpu-prem2-bucket'
    args.data_bucket_path = 'enijkamp/bigquery_json'
    args.data_langs = ['c', 'cpp', 'cs', 'css', 'go', 'html', 'java', 'js', 'obj', 'perl', 'php', 'py', 'ruby']
    args.tokenizer_file = '/export/home/gptc/byte-level-bpe.tokenizer.json'
    args.run_num_processes = 96

    # dataset size
    # 50,000 samples -> 500 MB
    # 2 TB / 500 MB * 50,000 samples = (2 * 1,000,000) MB / 500 MB * 50,000 samples = 200,000,000 total samples
    # 200,000,000 total samples / 13 languages ~= 15,000,000 samples per lang
    # 15,000,000 / 50,000 = 300 files per language

    # output: TFRecord files, written either to GCS or to a local directory
    args.out_gs = False
    args.out_bucket_path = '/export/home/gptc/bigcode_bpe'
    args.out_records_per_lang = 15_000_000
    args.out_min_len = 128
    args.out_max_len = 16_384

    return args
def process_files(args):
    def to_gs_out_file(f):
        return f'gs://{args.data_bucket}/{args.out_bucket_path}/{f[len(args.data_bucket_path)+1:]}'

    def to_local_out_file(f):
        path = f'{args.out_bucket_path}/{f[len(args.data_bucket_path)+1:]}'
        os.makedirs(os.path.dirname(path), exist_ok=True)
        return path

    def process(lang, files_chunks, do_map):
        to_out_file = to_gs_out_file if args.out_gs else to_local_out_file
        total_num_records_for_lang = 0
        for files_chunk in files_chunks:
            if total_num_records_for_lang > args.out_records_per_lang:
                break
            for f, (num_records, mean_len_record) in zip(files_chunk, do_map(process_file, [(args, f, to_out_file(f)) for f in files_chunk])):
                print(f, num_records, mean_len_record)
                print(f'lang={lang} file={f} mean_len_record={int(mean_len_record)} num_records={num_records} total_num_records_for_lang={total_num_records_for_lang}', flush=True)
                total_num_records_for_lang += num_records
        print(f'finished {lang} with {total_num_records_for_lang} total records', flush=True)

    def process_chunks(lang, files_chunks):
        if args.run_num_processes == 1:
            # sequential map, useful for debugging without a process pool
            def do_map(f, args_list):
                for f_args in args_list:
                    yield f(*f_args)
            process(lang, files_chunks, do_map)
        else:
            with concurrent.futures.ProcessPoolExecutor(args.run_num_processes) as executor:
                def map_with_zip_args(f, args_list):
                    for result in executor.map(f, *zip(*args_list)):
                        yield result
                process(lang, files_chunks, map_with_zip_args)

    def yield_chunks(lang):
        # list all .json blobs for the language and split them into chunks of run_num_processes files
        storage_client = storage.Client()
        files = []
        for blob in storage_client.list_blobs(args.data_bucket, prefix=f'{args.data_bucket_path}/{lang}/'):
            if '.json' in blob.name:
                files.append(blob.name)
        for files_chunk in partition(l=files, n=args.run_num_processes):
            yield files_chunk

    for lang in args.data_langs:
        print(f'processing {lang}', flush=True)
        process_chunks(lang, yield_chunks(lang))
def process_file(args, in_file, out_file):
    if args.out_gs:
        # note: the GCS output branch is incomplete, mv/rm are not implemented here
        storage_client = storage.Client()
        target_blob = storage.Blob(bucket=storage_client.bucket(args.out_bucket_path), name=out_file)
        if target_blob.exists(storage_client):
            print(f'skipping {out_file}', flush=True)
            return 0, 0.

        def mv(p1, p2):
            raise NotImplementedError()

        def rm(f):
            raise NotImplementedError()
    else:
        if os.path.exists(out_file):
            print(f'skipping {out_file}', flush=True)
            return 0, 0.

        def mv(f1, f2):
            os.rename(f1, f2)

        def rm(f):
            if os.path.exists(f):
                os.remove(f)

    # write to a temporary file first, then rename, so interrupted files are not mistaken for finished ones
    out_file_tmp = f'{out_file}.tmp'
    rm(out_file_tmp)

    print(f'processing {in_file} -> {out_file_tmp}', flush=True)

    tokenizer = create_tokenizer(args.tokenizer_file)
    data_iter = create_gs_data_iter(args.data_bucket, in_file)

    n, mean_len = 0, 0.
    with tf.io.TFRecordWriter(out_file_tmp) as writer:
        for sample in data_iter:
            record = tokenizer.encode(sample).ids
            if args.out_min_len <= len(record) <= args.out_max_len:
                write_to_file(writer, record)
                n += 1
                # running mean of the record length
                mean_len += (len(record) - mean_len) / n
                if n % 10_000 == 0:
                    print(n, int(mean_len), in_file)

    print(f'finalizing {out_file_tmp} -> {out_file}', flush=True)
    mv(out_file_tmp, out_file)

    return n, mean_len
def create_tokenizer(file):
    return Tokenizer.from_file(file)


def create_gs_data_iter(data_bucket, data_file):
    # stream samples from a gzipped JSON-lines file stored on GCS
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(data_bucket)
    blob = bucket.blob(data_file)
    data = io.BytesIO(blob.download_as_string())
    with gzip.open(data) as f:
        for line in f:
            j = json.loads(line)
            if 'content' in j:
                yield j['content']


def _int64_feature(value):
    return tf.train.Feature(int64_list=tf.train.Int64List(value=value))


def write_to_file(writer, data):
    # each example stores the token ids as an int64 feature named 'text'
    feature = {'text': _int64_feature(data)}
    tf_example = tf.train.Example(features=tf.train.Features(feature=feature))
    writer.write(tf_example.SerializeToString())


def partition(l, n):
    # split list l into consecutive chunks of size n
    return [l[i:i + n] for i in range(0, len(l), n)]
def main():
    args = create_args()
    process_files(args)
    print('done.', flush=True)


if __name__ == '__main__':
    main()
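
For a quick sanity check of the output, the written records can be read back with tf.data. The sketch below is not part of the original gist; it only assumes the schema defined in write_to_file above (a single variable-length int64 feature named 'text' holding the token ids) and takes a placeholder path to one of the produced TFRecord files.

check_tf_records.py (hypothetical companion snippet)

# Minimal sketch for inspecting the TFRecords written by create_tf_records.py.
# Assumes the schema from write_to_file: one variable-length int64 feature
# named 'text' per example, holding the BPE token ids.
import tensorflow as tf


def parse_example(serialized):
    # 'text' holds a variable number of token ids per record, hence VarLenFeature
    features = {'text': tf.io.VarLenFeature(tf.int64)}
    parsed = tf.io.parse_single_example(serialized, features)
    return tf.sparse.to_dense(parsed['text'])


def inspect(path, num_samples=3):
    dataset = tf.data.TFRecordDataset(path).map(parse_example)
    for ids in dataset.take(num_samples):
        ids = ids.numpy()
        print(f'len={len(ids)} first_ids={ids[:16].tolist()}')


if __name__ == '__main__':
    # placeholder path: substitute an actual file produced by create_tf_records.py
    inspect('/export/home/gptc/bigcode_bpe/<lang>/<file>')

Given the length filter in process_file, every printed length should fall between out_min_len=128 and out_max_len=16,384 tokens.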