Elasticsearch Bulk Indexing with a stream of JSON strings
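For context, the Bulk API expects newline-delimited JSON: each "action" line is followed by the document it applies to, one JSON object per line, and the payload must end with a newline. Below is a minimal sketch of the shape the pipe produces, with a made-up index name and documents (the pipe additionally appends one extra trailing newline per payload):

// Hypothetical example of a bulk payload for two documents destined for "my-index"
val delimiter = """{ "index" : { "_index" : "my-index" } }"""
val examplePayload: String =
  s"""$delimiter
     |{"id": 1, "title": "first"}
     |$delimiter
     |{"id": 2, "title": "second"}
     |""".stripMargin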
import cats.effect.IO
import fs2.{Pipe, Pull, Stream}

object JsonlPayloadPipe {

  /** Builds a pipe that transforms a stream of JSON strings into payloads for the Elasticsearch
    * Bulk API. Each input string represents a JSON document to be sent to the Bulk API. The JSONs
    * are gathered, interspersed with `delimiter`, and packaged into payload strings up to a
    * maximum size of `batchMax` bytes.
    *
    * @param delimiter
    *   The "action" ES should apply to the document following it. A typical value might be
    *   `{ "index" : { "_index" : "my-index" } }`
    * @param batchMax
    *   Maximum payload size in bytes
    * @return
    *   A pipe that yields a stream of row counts and payloads
    */
  def formatUpToMaxBytes(delimiter: String, batchMax: Int): Pipe[IO, String, (Int, String)] = {
    // The builder is created once and captured by the returned pipe, so a single
    // pipe value should not be applied to multiple streams running concurrently.
    val sb = new StringBuilder(batchMax)

    def finalizePayload(): String = {
      // ES requires an extra trailing newline at the end of bulk payloads
      sb.append("\n")
      val res = sb.result()
      sb.clear()
      res
    }

    // A recursive function that processes the stream while accumulating the
    // current payload size and the current number of rows in the payload
    def go(jsons: Stream[IO, String], currSize: Int, currNum: Int): Pull[IO, (Int, String), Unit] =
      // Separate the stream into the current Chunk and the remaining Stream
      jsons.pull.uncons.flatMap {
        case Some((headChunk, tailStream)) =>
          var i = 0
          var takeMore = true
          var size = currSize
          // Accumulate JSON rows into the string builder
          while (takeMore && i < headChunk.size) {
            val jsonRow = headChunk(i)
            // The `+ 2` accounts for the newlines after the delimiter and after the row.
            // Note that String#size counts characters, which equals bytes only for ASCII content.
            val newSize = size + delimiter.size + jsonRow.size + 2
            if (newSize < batchMax && jsonRow.nonEmpty) {
              // The Bulk API requires the action line and its document on separate lines
              sb.append(delimiter)
              sb.append("\n")
              sb.append(jsonRow)
              sb.append("\n")
              size = newSize
              i += 1
            } else {
              // Couldn't fit this jsonRow, don't take any more
              takeMore = false
            }
          }
          if (takeMore) {
            // We finished processing this chunk and still have more room
            go(tailStream, currSize = size, currNum = currNum + i)
          } else {
            // We are done with this chunk or have no more room in the string builder.
            // If we haven't accumulated any rows, stop with no output
            // (note: a row that is empty, or too large to ever fit, ends the stream here).
            // Otherwise emit the number of rows processed and the payload, and keep processing.
            if (currNum == 0)
              Pull.done
            else {
              // Recursively call ourselves with a Stream of the remaining JSON strings
              val remaining = Stream.chunk(headChunk.drop(i)) ++ tailStream
              val continue = go(remaining, currSize = 0, currNum = 0)
              Pull.output1((currNum, finalizePayload())) >> continue
            }
          }
        case None =>
          // The jsons stream is now empty.
          // If we haven't accumulated any rows, stop with no output.
          // Otherwise emit the number of rows processed and the payload.
          if (currNum == 0) Pull.done else Pull.output1((currNum, finalizePayload()))
      }

    rows => go(rows, 0, 0).stream
  }
}
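A minimal usage sketch, assuming cats-effect 3 and a hypothetical postBulk effect standing in for the actual HTTP call to the _bulk endpoint (an http4s or sttp client would normally fill that role); the index name and batch size are placeholders:

import cats.effect.{IO, IOApp}
import fs2.Stream

object BulkIndexExample extends IOApp.Simple {
  // Hypothetical stand-in for an HTTP POST to Elasticsearch's _bulk endpoint
  def postBulk(payload: String): IO[Unit] =
    IO.println(s"POST /_bulk with ${payload.length} chars")

  val delimiter = """{ "index" : { "_index" : "my-index" } }"""

  val docs: Stream[IO, String] =
    Stream.emits(List("""{"id": 1}""", """{"id": 2}""", """{"id": 3}"""))

  def run: IO[Unit] =
    docs
      .through(JsonlPayloadPipe.formatUpToMaxBytes(delimiter, batchMax = 5 * 1024 * 1024))
      .evalMap { case (rowCount, payload) =>
        IO.println(s"sending $rowCount rows") >> postBulk(payload)
      }
      .compile
      .drain
}

Each emitted element pairs the number of documents in the payload with the payload string itself, so callers can log progress or track throughput without re-parsing the batch.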