Last active
June 19, 2021 07:35
-
-
Save gildor/c05930cc165d0a05cda85b5c8cfd1b27 to your computer and use it in GitHub Desktop.
Simple non-blocking extension function for OkHttp Call that wraps request to Kotlin Coroutine and saves response to File
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import kotlinx.coroutines.experimental.* | |
import okhttp3.* | |
import okio.Buffer | |
import okio.Okio | |
import java.io.File | |
import java.io.IOException | |
/** | |
* Custom coroutine dispatcher for blocking calls | |
*/ | |
val OK_IO = newFixedThreadPoolContext(5, "OK_IO") | |
/** | |
* Invokes OkHttp Call and saves successful result to [output] | |
* | |
* Warning: Dispatcher in [blockingDispatcher] executes blocking calls | |
* [progress] callback returns downloaded bytes and total bytes, but total not always available | |
*/ | |
suspend fun Call.downloadAndSaveTo( | |
output: File, | |
bufferSize: Long = DEFAULT_BUFFER_SIZE.toLong(), | |
blockingDispatcher: CoroutineDispatcher = OK_IO, | |
progress: ((downloaded: Long, total: Long) -> Unit)? = null | |
): File = withContext(blockingDispatcher) { | |
suspendCancellableCoroutine<File> { cont -> | |
cont.invokeOnCompletion { | |
cancel() | |
} | |
enqueue(object : Callback { | |
override fun onFailure(call: Call?, e: IOException) { | |
cont.resumeWithException(e) | |
} | |
override fun onResponse(call: Call?, response: Response) { | |
if (!response.isSuccessful) { | |
cont.resumeWithException(IOException("Unexpected HTTP code: ${response.code()}")) | |
return | |
} | |
try { | |
val body = response.body() | |
if (body == null) { | |
cont.resumeWithException(IllegalStateException("Body is null")) | |
return | |
} | |
val contentLength = body.contentLength() | |
val buffer = Buffer() | |
var finished = false | |
Okio.buffer(Okio.sink(output)).use { out -> | |
body.source().use { source -> | |
var totalLength = 0L | |
while (cont.isActive) { | |
val read = source.read(buffer, bufferSize) | |
if (read == -1L) { | |
finished = true | |
break | |
} | |
out.write(buffer, read) | |
out.flush() | |
totalLength += read | |
progress?.invoke(totalLength, contentLength) | |
} | |
} | |
} | |
if (finished) { | |
cont.resume(output) | |
} else { | |
cont.resumeWithException(IOException("Download cancelled")) | |
} | |
} catch (e: Exception) { | |
cont.resumeWithException(e) | |
} | |
} | |
}) | |
} | |
} | |
fun main(args: Array<String>) = runBlocking { | |
// 2 different requests | |
val gist = Request.Builder().get() | |
.url("https://gist.githubusercontent.com/gildor/c05930cc165d0a05cda85b5c8cfd1b27/raw/d1a4f7369b22c4d6d40d5e5c3eb30588c72d97f2/OkHttpDownloader.kt") | |
.build() | |
val octocat = Request.Builder().get() | |
.url("https://assets-cdn.github.com/images/modules/logos_page/Octocat.png") | |
.build() | |
val client = OkHttpClient() | |
try { | |
println("Before download") | |
// Run 2 tasks in parallel | |
val task1 = async { | |
client.newCall(gist).downloadAndSaveTo(File("/tmp/gist.kt")) { progress, total -> | |
val percent = (progress.toFloat() / total) * 100 | |
println("Download task 1 $percent: $progress/$total") | |
} | |
} | |
val task2 = async { | |
// Just run it, we can do that in parallel, just wrap to async{} coroutine builder | |
client.newCall(octocat).downloadAndSaveTo(File("/tmp/Octocat.png")) { progress, total -> | |
val percent = (progress.toFloat() / total) * 100 | |
println("Download task 2 $percent: $progress/$total") | |
} | |
} | |
// await for results of each task, but run requests in parallel | |
val result1 = task1.await() | |
println("After successful download task 1: $result1") | |
val result2 = task2.await() | |
println("After successful download task 2: $result2") | |
} catch (e: Exception) { | |
println("Download error: $e") | |
throw e | |
} | |
// Cancellation is also supported using Call.cancel() | |
// call.cancel() | |
// Or using coroutine context or Job | |
// coroutineContext.cancel() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment