Skip to content

Instantly share code, notes, and snippets.

@ptgolden
Last active October 18, 2024 02:19
Show Gist options
  • Save ptgolden/8d836d11b9c6b2211e5f606ed6203960 to your computer and use it in GitHub Desktop.
Save ptgolden/8d836d11b9c6b2211e5f606ed6203960 to your computer and use it in GitHub Desktop.
import concurrent.futures
import gzip
import io
import os
import multiprocessing
import json
import queue
import threading
from typing import Dict, Optional, Tuple
from kgx.sink import RdfSink
from kgx.source import JsonlSource
from kgx.transformer import Transformer
# Number of JSONL lines batched into a single queue message, for the node
# file and the edge file respectively.
NODE_CHUNK_SIZE = 100
EDGE_CHUNK_SIZE = 100
# Default input and output paths used when this file is run as a script.
JSONL_NODES_FILENAME = './graph_nodes.jsonl'
JSONL_EDGES_FILENAME = './graph_edges.jsonl'
NTRIPLES_OUTPUT_FILENAME = './out.ntriples'
def init_kgx():
    """Create a KGX JSONL source / RDF sink pair and return record converters.

    The sink is constructed against ``/dev/null``; before every conversion its
    file handle (``FH``) is swapped for a fresh in-memory buffer so that the
    bytes written for one record can be captured and returned as a string.

    Returns:
        A ``(convert_edge, convert_node)`` tuple of callables. Each takes one
        decoded JSONL record (a dict) and returns the text the sink wrote for
        it, or ``""`` when the source returns ``None`` for the record.
    """
    transformer = Transformer(stream=True)
    jsonl_source = JsonlSource(owner=transformer)
    rdf_sink = RdfSink(owner=transformer, filename="/dev/null")

    def fresh_buffer():
        """Point the sink at a new empty buffer and return that buffer."""
        capture = io.BytesIO()
        rdf_sink.FH = capture  # type: ignore
        return capture

    def convert_edge(edge: Dict):
        """Return the sink output for one edge record ("" if it is rejected)."""
        capture = fresh_buffer()
        parsed = jsonl_source.read_edge(edge)
        if parsed is not None:
            # Element 3 of the tuple is what the sink is given to write
            # (presumably the edge data dict -- confirm against kgx).
            rdf_sink.write_edge(parsed[3])
            return capture.getvalue().decode()
        return ""

    def convert_node(node: Dict):
        """Return the sink output for one node record ("" if it is rejected)."""
        capture = fresh_buffer()
        parsed = jsonl_source.read_node(node)
        if parsed is not None:
            # Element 1 of the tuple is what the sink is given to write
            # (presumably the node data dict -- confirm against kgx).
            rdf_sink.write_node(parsed[1])
            return capture.getvalue().decode()
        return ""

    return convert_edge, convert_node
def kgx_worker(
    jsonl_queue: queue.Queue[Optional[Tuple[str, bytes]]],
    ntriples_queue: queue.Queue[Optional[str]],
):
    """Convert chunks of JSONL records to N-Triples text until told to stop.

    Each message taken from ``jsonl_queue`` is either ``None`` (stop) or a
    ``(record_type, lines)`` tuple, where ``record_type`` is ``"node"`` or
    ``"edge"`` and ``lines`` is a bytes blob of newline-separated JSON
    documents. The converted text for the whole chunk is put on
    ``ntriples_queue`` as a single string.

    Args:
        jsonl_queue: Source of chunk messages; ``task_done()`` is called once
            per non-sentinel message.
        ntriples_queue: Destination for the converted chunk strings.

    Raises:
        Whatever ``json.loads`` or the KGX converters raise for a bad record.
        The chunk is still marked done first, so a producer blocked in
        ``jsonl_queue.join()`` is not left waiting on it.
    """
    convert_edge, convert_node = init_kgx()
    converters = {"node": convert_node, "edge": convert_edge}

    while True:
        message = jsonl_queue.get()
        if message is None:
            break
        try:
            record_type, lines = message
            # Unknown record types are parsed but produce no output.
            convert = converters.get(record_type)
            pieces = []
            for line in lines.split(b"\n"):
                # Blank lines are not JSON documents; json.loads would raise.
                if not line.strip():
                    continue
                data = json.loads(line)
                if convert is not None:
                    pieces.append(convert(data))
            ntriples_queue.put("".join(pieces))
        finally:
            # Mark the chunk done only after its output (if any) is queued,
            # and do so even on failure so join() cannot hang on this chunk.
            jsonl_queue.task_done()
def ntriples_writer(
    ntriples_queue: queue.Queue[Optional[str]],
    output_file_path: str,
):
    """Drain ``ntriples_queue`` into a gzip-compressed text file.

    Strings are written in the order they are received. A ``None`` message
    ends the loop and closes the file.

    Args:
        ntriples_queue: Queue of text chunks; ``task_done()`` is called once
            for every chunk written (not for the ``None`` sentinel).
        output_file_path: Path of the gzip file to create or overwrite.
    """
    # Pin the encoding: without it the text layer falls back to the locale's
    # default, which can fail or corrupt non-ASCII literals on some systems.
    with gzip.open(output_file_path, "wt", encoding="utf-8") as fd:
        while True:
            triples_string = ntriples_queue.get()
            if triples_string is None:
                break
            fd.write(triples_string)
            ntriples_queue.task_done()
def ntriples_counter(ntriples_queue: queue.Queue[Optional[str]]) -> int:
    """Consume ``ntriples_queue`` and count the triples seen, writing nothing.

    Every non-blank line of every received string counts as one triple. A
    ``None`` message ends the loop.

    Args:
        ntriples_queue: Queue of text chunks; ``task_done()`` is called once
            for every chunk counted (not for the ``None`` sentinel).

    Returns:
        The total number of non-blank lines received before the sentinel.
    """
    total_triples = 0
    while True:
        triples_string = ntriples_queue.get()
        if triples_string is None:
            break
        # Count only non-blank lines: "".split("\n") is [""], so a plain
        # len() would report one triple for an empty chunk.
        total_triples += sum(
            1 for line in triples_string.split("\n") if line.strip()
        )
        ntriples_queue.task_done()
    return total_triples
def _enqueue_jsonl_chunks(jsonl_filename, record_type, chunk_size, jsonl_queue):
    """Read a JSONL file and queue it as ``(record_type, chunk)`` messages.

    Each chunk is a bytes blob of up to ``chunk_size`` newline-separated
    lines. Blank lines are skipped so that no empty chunk is ever queued.
    """
    batch = []
    with open(jsonl_filename, "rb") as fd:
        for line in fd:
            if not line.strip():
                continue
            batch.append(line)
            if len(batch) >= chunk_size:
                jsonl_queue.put((record_type, b"".join(batch).strip()))
                batch = []
    if batch:
        jsonl_queue.put((record_type, b"".join(batch).strip()))


def jsonl_to_ntriples_parallel(
    nodes_jsonl_filename: str,
    edges_jsonl_filename: str,
    ntriples_filename: str,
):
    """Convert KGX node and edge JSONL files to N-Triples using worker processes.

    The input files are split into chunks of ``NODE_CHUNK_SIZE`` /
    ``EDGE_CHUNK_SIZE`` lines and fed to ``kgx_worker`` processes through a
    bounded queue. Their output is collected by an ``ntriples_writer`` thread,
    which writes it to ``ntriples_filename`` (gzip-compressed, since that is
    how ``ntriples_writer`` opens the file).

    Args:
        nodes_jsonl_filename: Path of the JSONL file containing nodes.
        edges_jsonl_filename: Path of the JSONL file containing edges.
        ntriples_filename: Path of the output file to create.

    Raises:
        Any exception raised while reading the inputs, or re-raised from a
        worker process via ``Future.result()``.
    """
    num_workers = os.cpu_count() or 1

    with multiprocessing.Manager() as manager:
        # Bounded queues give back-pressure so the reader cannot run far
        # ahead of the workers, nor the workers far ahead of the writer.
        jsonl_queue: queue.Queue[Optional[Tuple[str, bytes]]] = manager.Queue(
            maxsize=num_workers * 2
        )
        ntriples_queue: queue.Queue[Optional[str]] = manager.Queue(maxsize=1000)

        # Actually write the output: the requested filename used to be
        # ignored and the converted triples were only counted and discarded.
        writer_thread = threading.Thread(
            target=ntriples_writer,
            args=(ntriples_queue, ntriples_filename),
            daemon=True,
        )
        writer_thread.start()

        # The context manager shuts the pool down instead of leaking it.
        with concurrent.futures.ProcessPoolExecutor() as executor:
            worker_futures = [
                executor.submit(kgx_worker, jsonl_queue, ntriples_queue)
                for _ in range(num_workers)
            ]
            try:
                _enqueue_jsonl_chunks(
                    nodes_jsonl_filename, "node", NODE_CHUNK_SIZE, jsonl_queue
                )
                _enqueue_jsonl_chunks(
                    edges_jsonl_filename, "edge", EDGE_CHUNK_SIZE, jsonl_queue
                )
                # Wait until every queued chunk has been marked done.
                jsonl_queue.join()
            finally:
                # Always release the workers (one sentinel each), even if
                # reading the input failed; otherwise they would block on
                # get() forever and the executor shutdown would hang.
                for _ in range(num_workers):
                    jsonl_queue.put(None)

            # Surface any exception raised inside a worker process.
            # FIXME: a worker that dies early can still leave the producer
            # blocked on a full queue; detect and log that case.
            for future in worker_futures:
                future.result()

        # All workers have exited, so every chunk they produced is already
        # queued ahead of this sentinel.
        ntriples_queue.put(None)
        writer_thread.join()
if __name__ == "__main__":
    # Script entry point: convert the default node and edge JSONL files,
    # using the module-level path constants.
    jsonl_to_ntriples_parallel(
        nodes_jsonl_filename=JSONL_NODES_FILENAME,
        edges_jsonl_filename=JSONL_EDGES_FILENAME,
        ntriples_filename=NTRIPLES_OUTPUT_FILENAME,
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment