Created
December 8, 2023 10:18
-
-
Save apeckham/e803cf1ee8f64e470324458e1d152a05 to your computer and use it in GitHub Desktop.
Firestore import in parallel
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import concurrent.futures
import json
import math
from datetime import datetime, timezone

from google.cloud import firestore
from tqdm import tqdm
# Module-level Firestore client shared by all worker threads that run
# process_chunk. The "xxxxx" values look like redacted placeholders;
# substitute the real database and project IDs before running.
client = firestore.Client(database="xxxxx", project="xxxxx")
def process_chunk(chunk, pbar):
    """Write one chunk of JSON lines to Firestore as a single batch.

    Each non-blank line is parsed as a JSON object. Its ``"key"`` entry is
    used as the document ID and its ``"fields"`` entry as the document data.
    A ``"now"`` field holding the current UTC time is added to the data
    before the write is staged on the batch.

    Args:
        chunk: Sequence of JSON-encoded text lines. Whitespace-only lines
            are skipped instead of crashing ``json.loads``.
        pbar: Progress reporter exposing ``update(n)``; it is advanced by
            ``len(chunk)`` once the batch has been committed.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
        KeyError: If a parsed document lacks ``"key"`` or ``"fields"``.
    """
    batch = client.batch()
    # The collection reference does not change per document; build it once.
    collection = client.collection("xxxxxx")
    staged = 0
    for line in chunk:
        if not line.strip():
            continue
        doc = json.loads(line)
        doc_data = doc["fields"]
        # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated and
        # returns a naive value that is easy to misinterpret.
        doc_data["now"] = datetime.now(timezone.utc)
        batch.set(collection.document(doc["key"]), doc_data)
        staged += 1
    # Avoid a pointless round trip when the chunk held only blank lines.
    if staged:
        batch.commit()
    # Count every line (including skipped blanks) so the bar still reaches
    # the caller's line-count total.
    pbar.update(len(chunk))
def read_file_in_chunks(file, chunk_size=500):
    """Yield successive lists of lines taken from an iterable of lines.

    Args:
        file: Any iterable producing lines (for example an open text file).
        chunk_size: Number of lines per yielded list. The final list may be
            shorter when the input does not divide evenly.

    Yields:
        Lists of lines in input order. A fresh list is started after each
        yield, so lists already handed out are never modified.
    """
    pending = []
    for row in file:
        pending.append(row)
        if len(pending) != chunk_size:
            continue
        yield pending
        pending = []
    # Flush whatever is left over after the last full chunk.
    if len(pending) > 0:
        yield pending
def parallel_import(filename, num_threads):
    """Import a line-delimited JSON file into Firestore using a thread pool.

    The file is read twice: once to count its lines so the progress bar has
    a total, then again to split it into chunks (via
    ``read_file_in_chunks``) that are submitted to ``process_chunk`` on
    worker threads.

    Args:
        filename: Path of the file to import, one JSON document per line.
        num_threads: Maximum number of worker threads in the pool.

    Raises:
        Exception: The first exception raised while processing any chunk.
            It is re-raised only after every submitted chunk has finished,
            so one bad chunk does not stop the others from being attempted.
    """
    # JSON interchange text is UTF-8; do not depend on the locale default.
    with open(filename, "r", encoding="utf-8") as file:
        # The bar is advanced by lines processed, so its total is the
        # number of lines, not the number of chunks.
        total_lines = sum(1 for _ in file)
        file.seek(0)

        with tqdm(total=total_lines) as pbar:
            with concurrent.futures.ThreadPoolExecutor(
                max_workers=num_threads
            ) as executor:
                futures = [
                    executor.submit(process_chunk, chunk, pbar)
                    for chunk in read_file_in_chunks(file)
                ]
            # Leaving the executor block waits for all workers. Asking each
            # future for its result surfaces worker exceptions that would
            # otherwise be silently discarded.
            for future in futures:
                future.result()
if __name__ == "__main__":
    # Script entry point: import the (placeholder-named) input file using
    # eight worker threads.
    parallel_import(filename="xxxxx.json", num_threads=8)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment