Last active
May 12, 2023 17:58
-
-
Save jackhftang/7399552e0902421a6de276b8e7483e4c to your computer and use it in GitHub Desktop.
Nim HttpClientPool
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncdispatch | |
import deques | |
import options | |
import httpclient | |
import strformat | |
export httpclient # for AsyncResponse | |
#[
  Wraps AsyncHttpClient to add pooling and request timeouts. See also
  https://github.com/nim-lang/Nim/issues/7413
]#
type
  ResourcePool*[T] = object
    ## Pool for a pre-defined number of resources: a resource has to be
    ## dequeued before it is used and enqueued again afterwards.
    resources: Deque[T]        ## resources that are currently available
    queuers: Deque[Future[T]]  ## futures of callers waiting for a resource
proc rlen*[T](pool: ResourcePool[T]): int =
  ## Number of resources currently held by the pool and ready to be taken.
  result = len(pool.resources)
proc qlen*[T](pool: ResourcePool[T]): int =
  ## Number of queued futures that are still waiting for a resource.
  result = len(pool.queuers)
proc dequeue*[T](pool: var ResourcePool[T]): Future[T] =
  ## Takes one resource from the pool.
  ##
  ## The returned future is already completed when a resource is available;
  ## otherwise it is appended to the queue of waiters and left pending.
  let waiter = newFuture[T]("ResourcePool.dequeue")
  if pool.resources.len > 0:
    waiter.complete(pool.resources.popFirst())
  else:
    pool.queuers.addLast(waiter)
  return waiter
proc tryDequeue*[T](pool: var ResourcePool[T]): Option[T] =
  ## Takes one resource from the pool without waiting: `some(resource)` when
  ## one is available, `none` otherwise.
  if pool.resources.len > 0:
    some(pool.resources.popFirst())
  else:
    none(T)
proc enqueue*[T](pool: var ResourcePool[T], item: T) =
  ## Adds one resource to the pool.
  ##
  ## If a caller is already waiting, the oldest waiting future is completed
  ## with `item`; otherwise `item` is stored for a later dequeue.
  if pool.queuers.len == 0:
    pool.resources.addLast(item)
    return
  let waiter = pool.queuers.popFirst()
  waiter.complete(item)
type
  RequestTimeoutError* = object of CatchableError ## \
    ## Error used to fail a pooled request that ran past its timeout.

  HttpClientPool* = ref object
    ## A pool of `AsyncHttpClient` instances shared between requests.
    size: int                               ## value given at construction
    clients: ResourcePool[AsyncHttpClient]  ## clients available for checkout
proc newHttpClientPool*(size: int): HttpClientPool =
  ## Creates a pool that holds `size` freshly created `AsyncHttpClient`s.
  var clients = ResourcePool[AsyncHttpClient]()
  for _ in 1..size:
    clients.enqueue(newAsyncHttpClient())
  HttpClientPool(size: size, clients: clients)
proc size*(pool: HttpClientPool): int =
  ## The size the pool was created with.
  result = pool.size
proc len*(pool: HttpClientPool): int =
  ## Number of clients that are currently available in the pool.
  result = rlen(pool.clients)
proc request*(
  pool: HttpClientPool,
  url: string,
  httpMethod: string,
  body = "",
  headers: HttpHeaders = nil,
  multipart: MultipartData = nil,
  timeout = 5000,
): Future[AsyncResponse] {.async.} =
  ## Performs an HTTP request with a client borrowed from `pool`.
  ##
  ## * `httpMethod` - request method as a string, e.g. `"GET"`.
  ## * `body`, `headers`, `multipart` - forwarded to the client's `request`.
  ## * `timeout` - limit in milliseconds. The timer starts once a client has
  ##   been obtained, so time spent waiting for a free client is not counted.
  ##
  ## Fails with `RequestTimeoutError` when the timer fires first, otherwise
  ## with whatever error the underlying request failed with.
  var client = await pool.clients.dequeue()
  # Runs on every exit path. `client` is read when the scope is left, so a
  # replacement made by the timeout handler is what goes back into the pool.
  defer: pool.clients.enqueue(client)

  let outcome = newFuture[AsyncResponse]("request")

  proc onResponse(done: Future[AsyncResponse]) =
    # The timeout may already have settled `outcome`; then this is a no-op.
    if outcome.finished: return
    if done.failed: outcome.fail(done.readError())
    else: outcome.complete(done.read())

  proc onTimeout(timer: Future[void]) =
    if outcome.finished: return
    client.close()
    # The timed-out request is not cancelled and keeps its reference to the
    # old client. Putting that client back would let the next borrower share
    # it with the abandoned request, so the pool gets a fresh one instead.
    client = newAsyncHttpClient()
    outcome.fail newException(RequestTimeoutError, fmt"timeout={timeout}ms")

  let pending = client.request(url, httpMethod, body, headers, multipart)
  let timer = sleepAsync(timeout)
  pending.addCallback onResponse
  timer.addCallback onTimeout
  return await outcome
proc request*(client: HttpClientPool, url: string,
              httpMethod = HttpGet, body = "", headers: HttpHeaders = nil,
              multipart: MultipartData = nil,
              timeout = 5000): Future[AsyncResponse] {.inline.} =
  ## Overload that takes the method as an `HttpMethod` value (default
  ## `HttpGet`) and forwards to the string-based `request`.
  let verb = $httpMethod
  client.request(url, verb, body, headers, multipart, timeout)
proc responseContent(resp: AsyncResponse): Future[string] {.async.} =
  ## Reads the whole body of `resp` and returns it as a string.
  ##
  ## Raises ``HttpRequestError`` carrying the status line when the response
  ## code is a client error (4xx) or a server error (5xx).
  let code = resp.code
  let failed = code.is4xx or code.is5xx
  if failed:
    raise newException(HttpRequestError, resp.status)
  result = await readAll(resp.bodyStream)
proc head*(client: HttpClientPool, url: string): Future[AsyncResponse] =
  ## Performs a HEAD request to `url` with a client taken from the pool.
  ##
  ## Declared for `HttpClientPool` like the other verb helpers in this
  ## module. `head` for `HttpClient`/`AsyncHttpClient` is already available
  ## through the re-exported `httpclient` module, so redefining it here with
  ## the same signature would only shadow/duplicate the standard one.
  ##
  ## Body, headers and timeout are left at the defaults of `request`.
  client.request(url, HttpHead)
proc get*(client: HttpClientPool, url: string): Future[AsyncResponse] =
  ## Performs a GET request to `url` with a client taken from the pool.
  ##
  ## Body, headers and timeout are left at the defaults of `request`.
  ## The future produced by `request` is returned as is; wrapping it in
  ## another async proc would only add an extra future per call.
  client.request(url, HttpGet)
proc getContent*(client: HttpClientPool,
                 url: string): Future[string] {.async.} =
  ## Performs a GET request to `url` and returns the response body.
  ##
  ## Fails with ``HttpRequestError`` on a 4xx or 5xx response.
  let response = await client.get(url)
  result = await response.responseContent()
proc delete*(client: HttpClientPool, url: string): Future[AsyncResponse] =
  ## Performs a DELETE request to `url` with a client taken from the pool.
  ##
  ## Body, headers and timeout are left at the defaults of `request`.
  ## The future produced by `request` is returned as is; wrapping it in
  ## another async proc would only add an extra future per call.
  client.request(url, HttpDelete)
proc deleteContent*(client: HttpClientPool,
                    url: string): Future[string] {.async.} =
  ## Performs a DELETE request to `url` and returns the response body.
  ##
  ## Fails with ``HttpRequestError`` on a 4xx or 5xx response.
  let response = await client.delete(url)
  result = await response.responseContent()
proc post*(client: HttpClientPool, url: string, body = "",
           multipart: MultipartData = nil): Future[AsyncResponse] =
  ## Performs a POST request to `url` with a client taken from the pool,
  ## sending `body` and/or `multipart`.
  ##
  ## Headers and timeout are left at the defaults of `request`.
  ## The future produced by `request` is returned as is; wrapping it in
  ## another async proc would only add an extra future per call.
  client.request(url, $HttpPost, body, multipart = multipart)
proc postContent*(client: HttpClientPool, url: string, body = "",
                  multipart: MultipartData = nil): Future[string]
                  {.async.} =
  ## Performs a POST request to `url` and returns the response body.
  ##
  ## Fails with ``HttpRequestError`` on a 4xx or 5xx response.
  let response = await client.post(url, body, multipart)
  result = await response.responseContent()
proc put*(client: HttpClientPool, url: string, body = "",
          multipart: MultipartData = nil): Future[AsyncResponse] =
  ## Performs a PUT request to `url` with a client taken from the pool,
  ## sending `body` and/or `multipart`.
  ##
  ## Headers and timeout are left at the defaults of `request`.
  ## The future produced by `request` is returned as is; wrapping it in
  ## another async proc would only add an extra future per call.
  client.request(url, $HttpPut, body, multipart = multipart)
proc putContent*(client: HttpClientPool, url: string, body = "",
                 multipart: MultipartData = nil): Future[string] {.async.} =
  ## Performs a PUT request to `url` and returns the response body.
  ##
  ## Fails with ``HttpRequestError`` on a 4xx or 5xx response.
  let response = await client.put(url, body, multipart)
  result = await response.responseContent()
proc patch*(client: HttpClientPool, url: string, body = "",
            multipart: MultipartData = nil): Future[AsyncResponse] =
  ## Performs a PATCH request to `url` with a client taken from the pool,
  ## sending `body` and/or `multipart`.
  ##
  ## Headers and timeout are left at the defaults of `request`.
  ## The future produced by `request` is returned as is; wrapping it in
  ## another async proc would only add an extra future per call.
  client.request(url, $HttpPatch, body, multipart = multipart)
proc patchContent*(client: HttpClientPool, url: string, body = "",
                   multipart: MultipartData = nil): Future[string]
                   {.async.} =
  ## Performs a PATCH request to `url` and returns the response body.
  ##
  ## Fails with ``HttpRequestError`` on a 4xx or 5xx response.
  let response = await client.patch(url, body, multipart)
  result = await response.responseContent()
@gabbhack Just to mention, and for future readers: parts of the code are copied from std/httpclient (I replaced multisync with async). Originally I just wanted a quick implementation of an async HTTP client that supports timeout and pooling in order to carry on with my work. If someone wants to improve the code, I would recommend working directly on the standard library or starting a more feature-rich HTTP library.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
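I think functions that only do
`return await ...`
or `result = await ...`
can be replaced with equivalent non-async ones, without changing the return type. Example:
`proc get*(client: HttpClientPool, url: string): Future[AsyncResponse] {.async.} = result = await client.request(url, HttpGet)`
to
`proc get*(client: HttpClientPool, url: string): Future[AsyncResponse] = client.request(url, HttpGet)`
So this is a normal function, without the "asynchronous" overhead.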