Last active
August 4, 2020 06:34
-
-
Save florimondmanca/110133096a4d025eaea2dd08d9bcc380 to your computer and use it in GitHub Desktop.
Proof of concept for an `httpcore` Unix Domain Socket (UDS) transport, compatible with asyncio and trio. Also contains an anyio implementation example.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Example using the transport directly. | |
""" | |
import json | |
from httpcore_uds import AsyncUDSTransport | |
async def main() -> None:
    """
    Fetch daemon info from the Docker API over its Unix domain socket.

    Sends ``GET /info`` through ``AsyncUDSTransport`` directly (without
    HTTPX), reads the full response body, decodes it as JSON and prints
    the ``Containers`` field.

    The Docker daemon must be running.
    """
    async with AsyncUDSTransport(path="/var/run/docker.sock") as transport:
        method = b"GET"
        url = (b"http", b"localhost", None, b"/info")
        request_headers = [(b"host", b"")]

        # The response headers are not needed here, so they are discarded
        # instead of rebinding the name used for the request headers.
        http_version, status_code, reason, _, stream = await transport.request(
            method, url, request_headers
        )

        try:
            assert http_version == b"HTTP/1.1"
            assert status_code == 200, status_code
            assert reason == b"OK"
            body = b"".join([chunk async for chunk in stream])
        finally:
            # Always release the response stream, including when one of the
            # checks above fails or reading the body raises.
            await stream.aclose()

        info = json.loads(body.decode("utf-8"))
        print("Containers:", info["Containers"])
if __name__ == "__main__":
    # Run the same example once per supported async library. Each library is
    # imported only right before it is used, after its banner is printed.
    print("AsyncIO...")
    import asyncio

    # asyncio.run() expects a coroutine object, hence the call to main().
    asyncio.run(main())

    print("Trio...")
    import trio

    # trio.run() expects the async function itself and calls it internally.
    trio.run(main)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Example using the transport with HTTPX. | |
""" | |
import httpx | |
from httpcore_uds import AsyncUDSTransport | |
async def main() -> None:
    """
    Query the Docker UDS API through HTTPX and print the container count.

    An ``AsyncUDSTransport`` bound to the Docker socket is handed to
    ``httpx.AsyncClient``, which then performs ``GET http://localhost/info``.

    The Docker daemon must be running.
    """
    uds_transport = AsyncUDSTransport(path="/var/run/docker.sock")

    async with httpx.AsyncClient(transport=uds_transport) as client:
        response = await client.get("http://localhost/info")
        # Turn 4xx/5xx responses into exceptions before parsing the body.
        response.raise_for_status()
        print("Containers:", response.json()["Containers"])
if __name__ == "__main__":
    # Exercise the HTTPX example on both asyncio and trio, in that order.
    # Each library is imported only after its banner has been printed.
    print("AsyncIO...")
    import asyncio

    # asyncio.run() takes a coroutine object.
    asyncio.run(main())

    print("Trio...")
    import trio

    # trio.run() takes the async function and invokes it itself.
    trio.run(main)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from typing import List, Optional, Tuple, Union | |
import httpcore | |
import sniffio | |
import trio | |
# Caution, this is private API. | |
from httpcore._async.http11 import AsyncHTTP11Connection | |
from httpcore._backends.asyncio import SocketStream as AsyncIOSocketStream | |
from httpcore._backends.trio import SocketStream as TrioSocketStream | |
async def open_unix_socket_stream(
    path: str, *, timeout: dict
) -> Union[AsyncIOSocketStream, TrioSocketStream]:
    """
    Connect to the Unix domain socket at `path` under the running async library.

    The async library is detected with `sniffio`; trio and asyncio are
    supported.

    Parameters:
        path: Filesystem path of the Unix domain socket.
        timeout: Timeout mapping; only the optional "connect" entry is read
            here. A missing or `None` value means no connect timeout.

    Returns:
        The httpcore socket stream wrapper matching the running library.

    Raises:
        httpcore.ConnectTimeout: The connect timeout elapsed.
        httpcore.ConnectError: The connection could not be established.
    """
    connect_timeout = timeout.get("connect")
    library = sniffio.current_async_library()

    if library == "trio":
        # trio needs a number for the deadline; infinity means "no timeout".
        connect_timeout = float("inf") if connect_timeout is None else connect_timeout

        with trio.move_on_after(connect_timeout):
            try:
                stream = await trio.open_unix_socket(path)
            except (OSError, trio.BrokenResourceError) as exc:
                # trio reports connect failures (missing socket file, refused
                # connection, ...) as OSError, so map it the same way the
                # asyncio branch does.
                raise httpcore.ConnectError(exc) from None
            return TrioSocketStream(stream=stream)

        # Only reached when the cancel scope expired before we could return.
        raise httpcore.ConnectTimeout() from None

    assert library == "asyncio"

    try:
        reader, writer = await asyncio.wait_for(
            asyncio.open_unix_connection(path), timeout=connect_timeout
        )
    except asyncio.TimeoutError as exc:
        raise httpcore.ConnectTimeout(exc) from None
    except OSError as exc:
        raise httpcore.ConnectError(exc) from None

    return AsyncIOSocketStream(reader, writer)
class AsyncUDSTransport(httpcore.AsyncHTTPTransport):
    """
    `httpcore` transport that sends requests over a Unix domain socket.

    A single HTTP/1.1 connection is opened lazily on the first request and
    reused for later requests until `aclose()` is called.
    """

    def __init__(self, path: str) -> None:
        """Store the socket `path`; no connection is opened yet."""
        self.path = path
        # Lazily created on first request; reset to None by aclose().
        self._connection: Optional[httpcore.AsyncHTTPTransport] = None

    async def _create_connection(
        self, *, timeout: Optional[dict] = None
    ) -> httpcore.AsyncHTTPTransport:
        """Open the Unix socket and wrap it in an HTTP/1.1 connection."""
        timeout = {} if timeout is None else timeout
        socket = await open_unix_socket_stream(self.path, timeout=timeout)
        return AsyncHTTP11Connection(socket=socket)

    async def request(
        self,
        method: bytes,
        url: Tuple[bytes, bytes, Optional[int], bytes],
        headers: Optional[list] = None,
        stream: Optional[httpcore.AsyncByteStream] = None,
        timeout: Optional[dict] = None,
    ) -> Tuple[bytes, int, bytes, List[Tuple[bytes, bytes]], httpcore.AsyncByteStream]:
        """
        Send a request over the Unix socket, connecting first if needed.

        All arguments are forwarded unchanged to the underlying connection's
        `request()`, and its result is returned as-is.
        """
        if self._connection is None:
            self._connection = await self._create_connection(timeout=timeout)
        return await self._connection.request(method, url, headers, stream, timeout)

    async def aclose(self) -> None:
        """
        Close the underlying connection, if one was opened.

        The cached connection is dropped before closing, so calling
        `aclose()` again is a no-op and a later `request()` opens a fresh
        connection instead of reusing a closed one.
        """
        connection, self._connection = self._connection, None
        if connection is not None:
            await connection.aclose()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Alternative implementation using anyio for multi-concurrency-library support, | |
instead of sniffio + asyncio + trio. | |
(Also supports curio as a result.) | |
""" | |
from typing import List, Optional, Tuple | |
import anyio | |
import httpcore | |
# Caution, this is private API. | |
from httpcore._async.http11 import AsyncHTTP11Connection | |
from httpcore._backends.base import AsyncSocketStream | |
class AnyIOSocketStream(AsyncSocketStream):
    """
    Adapter exposing an anyio socket stream through httpcore's
    `AsyncSocketStream` interface.
    """

    def __init__(self, socket: anyio.SocketStream) -> None:
        """Wrap an already-connected anyio socket stream."""
        self._socket = socket

    def get_http_version(self) -> str:
        """Report the HTTP version spoken on this stream (always HTTP/1.1)."""
        return "HTTP/1.1"

    async def read(self, n: int, timeout: dict) -> bytes:
        """
        Receive up to `n` bytes, honouring the optional "read" timeout.

        Raises `httpcore.ReadTimeout` when the timeout scope exits without
        the receive having returned.
        """
        async with anyio.move_on_after(timeout.get("read")):
            received = await self._socket.receive_some(n)
            return received

        # Falling out of the scope means the receive did not complete in time.
        raise httpcore.ReadTimeout()

    async def write(self, data: bytes, timeout: dict) -> None:
        """
        Send all of `data`, honouring the optional "write" timeout.

        Raises `httpcore.WriteTimeout` when the timeout scope exits without
        the send having completed.
        """
        async with anyio.move_on_after(timeout.get("write")):
            await self._socket.send_all(data)
            return None

        # Falling out of the scope means the send did not complete in time.
        raise httpcore.WriteTimeout()

    async def aclose(self) -> None:
        """Close the wrapped anyio socket."""
        await self._socket.close()
async def open_unix_socket_stream(path: str, *, timeout: dict) -> AnyIOSocketStream:
    """
    Connect to the Unix domain socket at `path` using anyio.

    Only the optional "connect" entry of `timeout` is read. An `OSError`
    raised while connecting is re-raised as `httpcore.ConnectError`; leaving
    the timeout scope without a connection raises `httpcore.ConnectTimeout`.
    """
    async with anyio.move_on_after(timeout.get("connect")):
        try:
            raw_socket = await anyio.connect_unix(path)
        except OSError as error:
            raise httpcore.ConnectError(error) from None
        else:
            return AnyIOSocketStream(raw_socket)

    # Only reached when the scope above was exited without returning.
    raise httpcore.ConnectTimeout() from None
class AsyncUDSTransport(httpcore.AsyncHTTPTransport):
    """
    `httpcore` transport that sends requests over a Unix domain socket.

    A single HTTP/1.1 connection is opened lazily on the first request and
    reused for later requests until `aclose()` is called.
    """

    def __init__(self, path: str) -> None:
        """Store the socket `path`; no connection is opened yet."""
        self.path = path
        # Lazily created on first request; reset to None by aclose().
        self._connection: Optional[httpcore.AsyncHTTPTransport] = None

    async def _create_connection(
        self, *, timeout: Optional[dict] = None
    ) -> httpcore.AsyncHTTPTransport:
        """Open the Unix socket and wrap it in an HTTP/1.1 connection."""
        timeout = {} if timeout is None else timeout
        socket = await open_unix_socket_stream(self.path, timeout=timeout)
        return AsyncHTTP11Connection(socket=socket)

    async def request(
        self,
        method: bytes,
        url: Tuple[bytes, bytes, Optional[int], bytes],
        headers: Optional[list] = None,
        stream: Optional[httpcore.AsyncByteStream] = None,
        timeout: Optional[dict] = None,
    ) -> Tuple[bytes, int, bytes, List[Tuple[bytes, bytes]], httpcore.AsyncByteStream]:
        """
        Send a request over the Unix socket, connecting first if needed.

        All arguments are forwarded unchanged to the underlying connection's
        `request()`, and its result is returned as-is.
        """
        if self._connection is None:
            self._connection = await self._create_connection(timeout=timeout)
        return await self._connection.request(method, url, headers, stream, timeout)

    async def aclose(self) -> None:
        """
        Close the underlying connection, if one was opened.

        The cached connection is dropped before closing, so calling
        `aclose()` again is a no-op and a later `request()` opens a fresh
        connection instead of reusing a closed one.
        """
        connection, self._connection = self._connection, None
        if connection is not None:
            await connection.aclose()
async def main() -> None:
    """
    Query the Docker UDS API through HTTPX and print the container count.

    HTTPX is imported inside the function; the request goes through an
    `AsyncUDSTransport` bound to the Docker socket.
    """
    import httpx

    uds_transport = AsyncUDSTransport(path="/var/run/docker.sock")

    async with httpx.AsyncClient(transport=uds_transport) as client:
        response: httpx.Response = await client.get("http://localhost/info")
        # Turn 4xx/5xx responses into exceptions before parsing the body.
        response.raise_for_status()
        print("Containers:", response.json()["Containers"])
if __name__ == "__main__":
    # Run the same example on every backend, in this order. Each entry pairs
    # the banner to print with the anyio backend name to run on.
    for banner, backend_name in (
        ("AsyncIO...", "asyncio"),
        ("Trio...", "trio"),
        ("Curio...", "curio"),
    ):
        print(banner)
        anyio.run(main, backend=backend_name)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
httpcore | |
httpx | |
trio | |
# Alternative anyio implementation. | |
anyio |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment