Last active
October 18, 2024 02:19
-
-
Save ptgolden/8d836d11b9c6b2211e5f606ed6203960 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import concurrent.futures | |
import gzip | |
import io | |
import os | |
import multiprocessing | |
import json | |
import queue | |
import threading | |
from typing import Dict, Optional, Tuple | |
from kgx.sink import RdfSink | |
from kgx.source import JsonlSource | |
from kgx.transformer import Transformer | |
# Number of JSONL records batched into a single work-queue message.
NODE_CHUNK_SIZE = 100
EDGE_CHUNK_SIZE = 100
# Default input/output paths used when run as a script.
JSONL_NODES_FILENAME = './graph_nodes.jsonl'
JSONL_EDGES_FILENAME = './graph_edges.jsonl'
NTRIPLES_OUTPUT_FILENAME = './out.ntriples'
def init_kgx():
    """Build per-process KGX converter callables.

    Returns:
        A ``(convert_edge, convert_node)`` pair of closures, each turning a
        single JSONL-decoded dict into its n-triples serialization string.
    """
    # Streaming transformer so KGX does not buffer the whole graph in memory.
    t = Transformer(stream=True)
    source = JsonlSource(owner=t)
    # RdfSink requires a filename at construction time; /dev/null is only a
    # placeholder, because reset_sink() swaps in an in-memory buffer per record.
    sink = RdfSink(owner=t, filename="/dev/null")
    def reset_sink():
        # HACK: redirect the sink's file handle to a fresh BytesIO so the
        # serialized triples can be captured instead of written to disk.
        buf = io.BytesIO()
        sink.FH = buf  # type: ignore
        return buf
    def convert_edge(edge: Dict) -> str:
        buf = reset_sink()
        record = source.read_edge(edge)
        if record is None:
            # KGX filtered/rejected the record; contribute no triples.
            return ""
        # NOTE(review): record[3] appears to be the edge-data element of the
        # KGX edge tuple — confirm against the installed KGX version.
        sink.write_edge(record[3])
        buf.seek(0)
        return buf.read().decode()
    def convert_node(node: Dict) -> str:
        buf = reset_sink()
        record = source.read_node(node)
        if record is None:
            # Record was filtered out; contribute no triples.
            return ""
        # NOTE(review): record[1] appears to be the node-data element of the
        # KGX node tuple — confirm against the installed KGX version.
        sink.write_node(record[1])
        buf.seek(0)
        return buf.read().decode()
    return convert_edge, convert_node
def kgx_worker(
    jsonl_queue: queue.Queue[Optional[Tuple[str, bytes]]],
    ntriples_queue: queue.Queue[Optional[str]],
):
    """Consume (record_type, jsonl_bytes) chunks and emit n-triples strings.

    Runs in a worker process: pulls chunks off ``jsonl_queue`` until a
    ``None`` sentinel arrives, converts each JSONL line via KGX, and pushes
    the concatenated n-triples text for the chunk onto ``ntriples_queue``.

    Args:
        jsonl_queue: source of ("node"|"edge", newline-joined JSONL bytes)
            messages; ``None`` terminates the loop.
        ntriples_queue: destination for the converted n-triples text.
    """
    # KGX objects are not shareable across processes; build them once here.
    convert_edge, convert_node = init_kgx()
    while True:
        message = jsonl_queue.get()
        if message is None:
            break
        record_type, lines = message
        parts = []
        for line in lines.split(b"\n"):
            # Tolerate blank lines (e.g. stray newlines inside a chunk)
            # instead of crashing json.loads on empty input.
            if not line:
                continue
            data = json.loads(line)
            if record_type == "node":
                parts.append(convert_node(data))
            elif record_type == "edge":
                parts.append(convert_edge(data))
        # Single join instead of quadratic string += inside the loop.
        ntriples_queue.put("".join(parts))
        jsonl_queue.task_done()
def ntriples_writer(
    ntriples_queue: queue.Queue[Optional[str]],
    output_file_path: str,
):
    """Drain n-triples strings from the queue into a gzip-compressed file.

    Blocks until a ``None`` sentinel is dequeued; every other item is written
    verbatim and acknowledged with ``task_done()`` so ``queue.join()`` works.
    """
    with gzip.open(output_file_path, "wt") as out:
        while (item := ntriples_queue.get()) is not None:
            out.write(item)
            ntriples_queue.task_done()
def ntriples_counter(ntriples_queue: queue.Queue[Optional[str]]):
    """Drain the queue, counting chunks and triples instead of writing them.

    Drop-in alternative to ntriples_writer for benchmarking the conversion
    pipeline without disk output.

    Returns:
        ``(num_chunks, total_triples)``. Previously these tallies were
        computed and silently discarded; returning them is backward
        compatible (a Thread target's return value is ignored).
    """
    num_chunks = 0
    total_triples = 0
    while True:
        triples_string = ntriples_queue.get()
        if triples_string is None:
            break
        stripped = triples_string.strip()
        # Bug fix: "".split("\n") yields [''], so an empty chunk used to be
        # miscounted as one triple; only count non-empty chunks.
        if stripped:
            total_triples += len(stripped.split("\n"))
        num_chunks += 1
        ntriples_queue.task_done()
    return num_chunks, total_triples
def _enqueue_jsonl_chunks(
    jsonl_queue: "queue.Queue[Optional[Tuple[str, bytes]]]",
    filename: str,
    record_type: str,
    chunk_size: int,
):
    """Read a JSONL file and enqueue (record_type, bytes) chunks of up to chunk_size lines."""
    num_lines = 0
    chunk = b""
    with open(filename, "rb") as fd:
        for line in fd:
            chunk += line
            num_lines += 1
            if num_lines >= chunk_size:
                jsonl_queue.put((record_type, chunk.strip()))
                chunk = b""
                num_lines = 0
    if chunk:
        jsonl_queue.put((record_type, chunk.strip()))


def jsonl_to_ntriples_parallel(
    nodes_jsonl_filename: str,
    edges_jsonl_filename: str,
    ntriples_filename: str,
):
    """Convert KGX node/edge JSONL files to a gzipped n-triples file in parallel.

    A pool of worker processes converts JSONL chunks to n-triples text while a
    writer thread serializes the results to ``ntriples_filename``.

    Args:
        nodes_jsonl_filename: path to the nodes JSONL input.
        edges_jsonl_filename: path to the edges JSONL input.
        ntriples_filename: path the gzip-compressed n-triples output is
            written to.
    """
    num_workers = os.cpu_count() or 1
    with multiprocessing.Manager() as manager:
        # Bounded so the file readers can't run far ahead of the converters.
        jsonl_queue: queue.Queue[Optional[Tuple[str, bytes]]] = manager.Queue(
            maxsize=num_workers * 2
        )
        ntriples_queue: queue.Queue[Optional[str]] = manager.Queue(maxsize=1000)
        # Bug fix: the output thread previously ran ntriples_counter, which
        # discarded every triple and left ntriples_filename unused — the
        # output file was never written.
        ntriples_thread = threading.Thread(
            target=ntriples_writer,
            args=(ntriples_queue, ntriples_filename),
            daemon=True,
        )
        ntriples_thread.start()
        # Context manager ensures the pool is shut down even on error; the
        # executor was previously never shut down.
        with concurrent.futures.ProcessPoolExecutor(
            max_workers=num_workers
        ) as executor:
            jsonl_converter_processes = [
                executor.submit(kgx_worker, jsonl_queue, ntriples_queue)
                for _ in range(num_workers)
            ]
            _enqueue_jsonl_chunks(
                jsonl_queue, nodes_jsonl_filename, "node", NODE_CHUNK_SIZE
            )
            _enqueue_jsonl_chunks(
                jsonl_queue, edges_jsonl_filename, "edge", EDGE_CHUNK_SIZE
            )
            # Wait for every chunk to be converted.
            jsonl_queue.join()
            # Signal writer thread and worker processes to complete.
            ntriples_queue.put(None)
            for _ in range(num_workers):
                jsonl_queue.put(None)
            # Wait for the writer thread to flush and close the output file.
            ntriples_thread.join()
            # Surface worker exceptions (result() re-raises them here).
            for process in jsonl_converter_processes:
                process.result()
if __name__ == "__main__":
    # Run the full conversion using the module-level default paths.
    jsonl_to_ntriples_parallel(
        JSONL_NODES_FILENAME,
        JSONL_EDGES_FILENAME,
        NTRIPLES_OUTPUT_FILENAME,
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment