Skip to content

Instantly share code, notes, and snippets.

@florimondmanca
Last active August 4, 2020 06:34
Show Gist options
  • Save florimondmanca/110133096a4d025eaea2dd08d9bcc380 to your computer and use it in GitHub Desktop.
Save florimondmanca/110133096a4d025eaea2dd08d9bcc380 to your computer and use it in GitHub Desktop.
Proof of concept for an `httpcore` Unix Domain Socket (UDS) transport, compatible with asyncio and trio. Also contains an anyio implementation example.
"""
Example using the transport directly.
"""
import json
from httpcore_uds import AsyncUDSTransport
async def main() -> None:
"""
An example that fetches info from the Docker UDS API.
The Docker daemon must be running.
"""
async with AsyncUDSTransport(path="/var/run/docker.sock") as transport:
method = b"GET"
url = (b"http", b"localhost", None, b"/info")
headers = [(b"host", b"")]
http_version, status_code, reason, headers, stream = await transport.request(
method, url, headers
)
assert http_version == b"HTTP/1.1"
assert status_code == 200, status_code
assert reason == b"OK"
body = b"".join([chunk async for chunk in stream])
info = json.loads(body.decode("utf-8"))
print("Containers:", info["Containers"])
if __name__ == "__main__":
print("AsyncIO...")
import asyncio
asyncio.run(main())
print("Trio...")
import trio
trio.run(main)
"""
Example using the transport with HTTPX.
"""
import httpx
from httpcore_uds import AsyncUDSTransport
async def main() -> None:
"""
An example that fetches info from the Docker UDS API.
The Docker daemon must be running.
"""
transport = AsyncUDSTransport(path="/var/run/docker.sock")
async with httpx.AsyncClient(transport=transport) as client:
response = await client.get("http://localhost/info")
response.raise_for_status()
info = response.json()
print("Containers:", info["Containers"])
if __name__ == "__main__":
print("AsyncIO...")
import asyncio
asyncio.run(main())
print("Trio...")
import trio
trio.run(main)
import asyncio
from typing import List, Optional, Tuple, Union
import httpcore
import sniffio
import trio
# Caution, this is private API.
from httpcore._async.http11 import AsyncHTTP11Connection
from httpcore._backends.asyncio import SocketStream as AsyncIOSocketStream
from httpcore._backends.trio import SocketStream as TrioSocketStream
async def open_unix_socket_stream(
path: str, *, timeout: dict
) -> Union[AsyncIOSocketStream, TrioSocketStream]:
connect_timeout = timeout.get("connect")
library = sniffio.current_async_library()
if library == "trio":
connect_timeout = float("inf") if connect_timeout is None else connect_timeout
with trio.move_on_after(connect_timeout):
try:
stream = await trio.open_unix_socket(path)
except trio.BrokenResourceError as exc:
raise httpcore.ConnectError(exc) from None
return TrioSocketStream(stream=stream)
raise httpcore.ConnectTimeout() from None
else:
assert library == "asyncio"
try:
reader, writer = await asyncio.wait_for(
asyncio.open_unix_connection(path), timeout=connect_timeout
)
except asyncio.TimeoutError as exc:
raise httpcore.ConnectTimeout(exc) from None
except OSError as exc:
raise httpcore.ConnectError(exc) from None
return AsyncIOSocketStream(reader, writer)
class AsyncUDSTransport(httpcore.AsyncHTTPTransport):
def __init__(self, path: str) -> None:
self.path = path
self._connection: Optional[httpcore.AsyncHTTPTransport] = None
async def _create_connection(
self, *, timeout: dict = None
) -> httpcore.AsyncHTTPTransport:
timeout = {} if timeout is None else timeout
socket = await open_unix_socket_stream(self.path, timeout=timeout)
return AsyncHTTP11Connection(socket=socket)
async def request(
self,
method: bytes,
url: Tuple[bytes, bytes, Optional[int], bytes],
headers: list = None,
stream: httpcore.AsyncByteStream = None,
timeout: dict = None,
) -> Tuple[bytes, int, bytes, List[Tuple[bytes, bytes]], httpcore.AsyncByteStream]:
if self._connection is None:
self._connection = await self._create_connection(timeout=timeout)
return await self._connection.request(method, url, headers, stream, timeout)
async def aclose(self) -> None:
if self._connection is not None:
await self._connection.aclose()
"""
Alternative implementation using anyio for multi-concurrency-library support,
instead of sniffio + asyncio + trio.
(Also supports curio as a result.)
"""
from typing import List, Optional, Tuple
import anyio
import httpcore
# Caution, this is private API.
from httpcore._async.http11 import AsyncHTTP11Connection
from httpcore._backends.base import AsyncSocketStream
class AnyIOSocketStream(AsyncSocketStream):
def __init__(self, socket: anyio.SocketStream) -> None:
self._socket = socket
def get_http_version(self) -> str:
return "HTTP/1.1"
async def read(self, n: int, timeout: dict) -> bytes:
read_timeout = timeout.get("read")
async with anyio.move_on_after(read_timeout):
return await self._socket.receive_some(n)
raise httpcore.ReadTimeout()
async def write(self, data: bytes, timeout: dict) -> None:
write_timeout = timeout.get("write")
async with anyio.move_on_after(write_timeout):
await self._socket.send_all(data)
return
raise httpcore.WriteTimeout()
async def aclose(self) -> None:
await self._socket.close()
async def open_unix_socket_stream(path: str, *, timeout: dict) -> AnyIOSocketStream:
connect_timeout = timeout.get("connect")
async with anyio.move_on_after(connect_timeout):
try:
socket = await anyio.connect_unix(path)
except OSError as exc:
raise httpcore.ConnectError(exc) from None
return AnyIOSocketStream(socket)
raise httpcore.ConnectTimeout() from None
class AsyncUDSTransport(httpcore.AsyncHTTPTransport):
def __init__(self, path: str) -> None:
self.path = path
self._connection: Optional[httpcore.AsyncHTTPTransport] = None
async def _create_connection(
self, *, timeout: dict = None
) -> httpcore.AsyncHTTPTransport:
timeout = {} if timeout is None else timeout
socket = await open_unix_socket_stream(self.path, timeout=timeout)
return AsyncHTTP11Connection(socket=socket)
async def request(
self,
method: bytes,
url: Tuple[bytes, bytes, Optional[int], bytes],
headers: list = None,
stream: httpcore.AsyncByteStream = None,
timeout: dict = None,
) -> Tuple[bytes, int, bytes, List[Tuple[bytes, bytes]], httpcore.AsyncByteStream]:
if self._connection is None:
self._connection = await self._create_connection(timeout=timeout)
return await self._connection.request(method, url, headers, stream, timeout)
async def aclose(self) -> None:
if self._connection is not None:
await self._connection.aclose()
async def main() -> None:
import httpx
transport = AsyncUDSTransport(path="/var/run/docker.sock")
async with httpx.AsyncClient(transport=transport) as client:
response: httpx.Response = await client.get("http://localhost/info")
response.raise_for_status()
info = response.json()
print("Containers:", info["Containers"])
if __name__ == "__main__":
print("AsyncIO...")
anyio.run(main, backend="asyncio")
print("Trio...")
anyio.run(main, backend="trio")
print("Curio...")
anyio.run(main, backend="curio")
httpcore
httpx
trio
# Alternative anyio implementation.
anyio
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment