Skip to content

Instantly share code, notes, and snippets.

@ag2s20150909
Last active October 27, 2024 08:02
Show Gist options
  • Save ag2s20150909/aab17b109af2368323a5e3f9868a3f4c to your computer and use it in GitHub Desktop.
Save ag2s20150909/aab17b109af2368323a5e3f9868a3f4c to your computer and use it in GitHub Desktop.
Parsing Newline-delimited JSON (NDJSON,application/x-ndjson) with Retrofit and kotlinx.serialization

Parsing Newline-delimited JSON (NDJSON,application/x-ndjson) with Retrofit and kotlinx.serialization

    val retrofit: Retrofit=Retrofit.Builder()
    //........
    .addConverterFactory(Json.newNdJsonFactory())
    .addConverterFactory(Json.asConverterFactory(
            "application/json; charset=UTF8".toMediaType())))
    //........
    .build()
 @POST("/api/generate")
    suspend fun generate(@Body body: GenerateRequest): Flow<GenerateResponse>
import kotlinx.coroutines.flow.Flow
import kotlinx.serialization.StringFormat
import kotlinx.serialization.serializer
import okhttp3.ResponseBody
import retrofit2.Converter
import retrofit2.Retrofit
import java.lang.reflect.ParameterizedType
import java.lang.reflect.Type
class NdJsonConverterFactory(private val format:StringFormat): Converter.Factory() {
override fun responseBodyConverter(
type: Type,
annotations: Array<out Annotation>,
retrofit: Retrofit
): Converter<ResponseBody, *>? {
if (getRawType(type) != Flow::class.java) {
return null
}
check(type is ParameterizedType) { "Flow return type must be parameterized as Flow<Foo> or Flow<out Foo>" }
val responseType = getParameterUpperBound(0, type)
val loader =format.serializersModule.serializer(responseType)
return NdJsonResponseBodyConverter(loader,format)
}
}
fun StringFormat.newNdJsonFactory()=NdJsonConverterFactory(this)
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.serialization.DeserializationStrategy
import kotlinx.serialization.StringFormat
import okhttp3.ResponseBody
import retrofit2.Converter
class NdJsonResponseBodyConverter<T>(
private val loader: DeserializationStrategy<T>,
private val format: StringFormat
) : Converter<ResponseBody, Flow<T>> {
override fun convert(value: ResponseBody): Flow<T> = callbackFlow {
value.charStream().forEachLine { line->
val obj=format.decodeFromString(loader,line)
trySendBlocking(obj)
}
awaitClose {
value.close()
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment