Last active
December 24, 2023 16:35
-
-
Save kissgyorgy/9e58881131aeea51ed0a2c8bbffbdba4 to your computer and use it in GitHub Desktop.
Python: HTTP over SSH
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import sys | |
from typing import Optional | |
import asyncssh | |
import httpx | |
class MySSHTCPSession(asyncssh.SSHTCPSession):
    """TCP-channel session that keeps every received chunk in memory."""

    def __init__(self):
        # Chunks are stored in arrival order in ``_buffer``.
        self._buffer = []

    def data_received(self, data: bytes, datatype: asyncssh.DataType) -> None:
        """Remember one incoming chunk; ``datatype`` is not inspected."""
        self._buffer += [data]

    def connection_lost(self, exc: Optional[Exception]) -> None:
        """Print the error to stderr when the channel went away with one."""
        if not exc:
            return
        print("Direct connection error:", str(exc), file=sys.stderr)
class HelloWorldTransport(httpx.AsyncBaseTransport):
    """
    An httpx transport that sends plain HTTP/1.0 requests through SSH.

    For every request a direct TCP channel is opened (through the SSH server
    on ``localhost``) to the request's host, a minimal HTTP/1.0 request is
    written, and everything the remote side sends until it closes the channel
    is parsed into an ``httpx.Response``.

    Only the request line and a ``Host`` header are sent; other request
    headers and the request body are not forwarded.
    """

    def __init__(self):
        # Created lazily on the first request, see ``connect``.
        self._conn = None

    async def connect(self):
        """Open the SSH connection that requests are tunnelled through."""
        self._conn = await asyncssh.connect("localhost")

    async def aclose(self) -> None:
        """Close the SSH connection, if one was opened.

        httpx calls this when the owning client is closed, so the connection
        no longer stays open after the client is done.
        """
        conn, self._conn = self._conn, None
        if conn is not None:
            conn.close()
            await conn.wait_closed()

    async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
        """Send ``request`` over a fresh SSH TCP channel and parse the reply.

        Raises:
            httpx.RemoteProtocolError: if the reply has no usable status line.
        """
        if self._conn is None:
            await self.connect()

        url = request.url
        # Honour an explicit port in the URL; fall back to plain HTTP's 80.
        chan, session = await self._conn.create_connection(
            MySSHTCPSession, url.host, url.port or 80
        )
        request_head = b"".join(
            (
                request.method.encode("ascii"),
                b" ",
                url.raw_path or b"/",
                b" HTTP/1.0\r\n",
                b"Host: ",
                url.netloc,
                b"\r\n\r\n",
            )
        )
        chan.write(request_head)
        chan.write_eof()
        # HTTP/1.0 without keep-alive: the server closing the channel marks
        # the end of the response.
        await chan.wait_closed()

        status_code, headers, body = self._parse_response(b"".join(session._buffer))
        return httpx.Response(
            status_code, headers=headers, stream=httpx.ByteStream(body)
        )

    @staticmethod
    def _parse_response(raw: bytes) -> tuple[int, list[tuple[bytes, bytes]], bytes]:
        """Split a raw HTTP reply into ``(status_code, headers, body)``.

        The body is returned untouched (line breaks included). Headers are a
        list of pairs so repeated header names are preserved; lines without a
        colon are skipped instead of aborting the whole response.
        """
        head, separator, body = raw.partition(b"\r\n\r\n")
        if not separator:
            # Tolerate peers that terminate lines with a bare LF.
            head, _, body = raw.partition(b"\n\n")

        head_lines = head.splitlines()
        try:
            status_code = int(head_lines[0].split(None, 2)[1])
        except (IndexError, ValueError) as err:
            raise httpx.RemoteProtocolError(
                f"Malformed HTTP status line: {raw[:80]!r}"
            ) from err

        headers = []
        for line in head_lines[1:]:
            name, colon, value = line.partition(b":")
            if colon:
                headers.append((name.strip(), value.strip()))
        return status_code, headers, body
async def run_client(url: str | httpx.URL) -> httpx.Response:
    """GET ``url`` through ``HelloWorldTransport`` and return the response.

    The client is used as an async context manager so it is closed even when
    the request raises.
    """
    async with httpx.AsyncClient(transport=HelloWorldTransport()) as client:
        return await client.get(url)
# Run a single request to completion and dump what came back.
request_coro = run_client("http://www.google.com")
response = asyncio.run(request_coro)
for label, attribute in (
    ("STATUS: ", "status_code"),
    ("HEADERS: ", "headers"),
    ("BODY: ", "text"),
):
    print(label, getattr(response, attribute))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import sys | |
from typing import Optional | |
import asyncssh | |
import httpx | |
class SSHSession(asyncssh.SSHClientSession):
    """Client session that collects everything the remote side sends."""

    def __init__(self):
        # Received pieces, in the order they arrived.
        self.buffer = []

    def data_received(self, data: bytes, datatype: asyncssh.DataType) -> None:
        """Append one received piece to ``buffer``; ``datatype`` is ignored."""
        self.buffer += [data]

    def connection_lost(self, exc: Optional[Exception]) -> None:
        """Write the error to stderr if the session ended with one."""
        if not exc:
            return
        print("Direct connection error:", str(exc), file=sys.stderr)
class SSHTransport(httpx.AsyncBaseTransport):
    """
    An httpx transport that speaks a minimal HTTP/1.0 dialect over an SSH session.

    Each request opens a new SSH session, writes a bare request line, reads
    everything until the session closes, and parses the text as an HTTP
    response. Request headers and body are not sent.
    """

    def __init__(
        self,
        host: str = "localhost",
        port: int = 8022,
        username: str = "walkman",
        password: str = "blabla",
    ):
        # NOTE: the defaults are the demo values this script was written
        # with; pass real credentials explicitly instead of editing them here.
        self._host = host
        self._port = port
        self._username = username
        self._password = password
        # Created lazily on the first request, see ``connect``.
        self._conn = None

    async def connect(self):
        """Open the SSH connection used for all subsequent requests."""
        self._conn = await asyncssh.connect(
            self._host, self._port, username=self._username, password=self._password
        )

    async def aclose(self) -> None:
        """Close the SSH connection, if one was opened.

        httpx calls this when the owning client is closed.
        """
        conn, self._conn = self._conn, None
        if conn is not None:
            conn.close()
            await conn.wait_closed()

    async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
        """Send the request line over a fresh SSH session and parse the reply.

        Raises:
            httpx.RemoteProtocolError: if the reply has no usable status line.
        """
        if self._conn is None:
            await self.connect()
        # Narrowing for type checkers only; ``connect`` always sets ``_conn``.
        assert self._conn is not None

        chan, session = await self._conn.create_session(SSHSession)
        chan.write(f"{request.method} {request.url.path} HTTP/1.0\r\n\r\n")
        chan.write_eof()
        # The remote side closing the session marks the end of the response.
        await chan.wait_closed()

        status_code, headers, body = self._parse_response("".join(session.buffer))
        return httpx.Response(status_code, headers=headers, content=body)

    @staticmethod
    def _parse_response(raw: str) -> tuple[int, list[tuple[str, str]], str]:
        """Split a raw HTTP reply into ``(status_code, headers, body)``.

        The body is returned untouched (line breaks included). Headers are a
        list of pairs so repeated header names are preserved; lines without a
        colon are skipped instead of aborting the whole response.
        """
        head, separator, body = raw.partition("\r\n\r\n")
        if not separator:
            # Tolerate peers that terminate lines with a bare LF.
            head, _, body = raw.partition("\n\n")

        head_lines = head.splitlines()
        try:
            status_code = int(head_lines[0].split(None, 2)[1])
        except (IndexError, ValueError) as err:
            raise httpx.RemoteProtocolError(
                f"Malformed HTTP status line: {raw[:80]!r}"
            ) from err

        headers = []
        for line in head_lines[1:]:
            name, colon, value = line.partition(":")
            if colon:
                headers.append((name.strip(), value.strip()))
        return status_code, headers, body
async def run_client(url: str | httpx.URL) -> httpx.Response:
    """GET ``url`` (relative to ``http://testserver``) through ``SSHTransport``.

    The client is used as an async context manager so it is closed even when
    the request raises.
    """
    async with httpx.AsyncClient(
        transport=SSHTransport(), base_url="http://testserver"
    ) as client:
        return await client.get(url)
# Run a single request to completion and dump what came back.
request_coro = run_client("/")
response = asyncio.run(request_coro)
for label, attribute in (("STATUS:", "status_code"), ("HEADERS:", "headers")):
    print(label, getattr(response, attribute))
# repr() keeps line breaks and other invisible characters in the body visible.
print("BODY:", repr(response.text))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import asyncssh | |
from flask import Flask | |
# The Flask application that is served over SSH.
app = Flask(__name__)


@app.route("/")
def hello_world() -> str:
    """Return the HTML snippet served for the root URL."""
    greeting = "<p>Hello, World!</p>"
    return greeting
async def wsgi_response(process: asyncssh.SSHServerProcess) -> None:
    """Serve one request read from an SSH process through the Flask ``app``.

    Reads a single ``METHOD TARGET VERSION`` line from the process's stdin,
    calls the WSGI application with a minimal environ, and writes an
    HTTP/1.0-style response to the process's stdout. The process exits with
    status 0 on success and 1 when the request line is malformed or the
    application fails.

    Response body chunks are decoded as text before being written, so binary
    bodies are not supported.
    """
    write = process.stdout.write

    def start_response(status, headers, exc_info=None):
        # ``exc_info`` is accepted because WSGI applications may pass it as
        # an optional third argument; it is not used here.
        write(f"HTTP/1.0 {status}\r\n")
        for name, value in headers:
            write(f"{name}: {value}\r\n")
        write("\r\n")
        # WSGI expects a bytes-accepting ``write`` callable to be returned.
        return lambda data: write(data.decode())

    # Strip the line terminator so it does not end up in SERVER_PROTOCOL.
    request_line = (await process.stdin.readline()).strip()
    parts = request_line.split(" ", 2)
    if len(parts) != 3:
        # Covers both an empty line (EOF) and a truncated request line.
        print("Bad request line:", repr(request_line))
        write("HTTP/1.0 400 Bad Request\r\n\r\n")
        process.exit(1)
        return

    method, target, version = parts
    print("REQUEST:", method, target, version)

    # WSGI keeps the query string separate from the path.
    path, _, query = target.partition("?")
    env = {
        "REQUEST_METHOD": method,
        "PATH_INFO": path,
        "QUERY_STRING": query,
        "SERVER_PROTOCOL": version,
        "wsgi.url_scheme": "http",
    }

    exit_status = 0
    body = None
    try:
        body = app(env, start_response)
        for chunk in body:
            write(chunk.decode())
    except Exception as e:
        # Top-level boundary for this request: report and signal failure.
        print("Exception:", type(e), e)
        exit_status = 1
    finally:
        # WSGI requires calling close() on the result if it provides one.
        close = getattr(body, "close", None)
        if close is not None:
            close()
    process.exit(exit_status)
class NoAuthSSHServer(asyncssh.SSHServer):
    """SSH server that lets every user in without authentication."""

    def begin_auth(self, username: str) -> bool:
        """Log the user name and report that no authentication is needed."""
        print("begin_auth", username)
        # Returning False accepts any username immediately.
        auth_required = False
        return auth_required
async def start_server() -> None:
    """Start the SSH listener on port 8022 (empty host string).

    Each SSH process is handed to ``wsgi_response``; the host key is read
    from ``ssh_host_ed25519_key``.
    """
    listen_options = {
        "server_host_keys": ["ssh_host_ed25519_key"],
        "process_factory": wsgi_response,
    }
    await asyncssh.create_server(NoAuthSSHServer, "", 8022, **listen_options)
# Create the loop explicitly: asyncio.get_event_loop() without a current loop
# is deprecated and raises RuntimeError on newer Python versions.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
server_coro = start_server()
# Start listening, then keep serving connections until interrupted.
loop.run_until_complete(server_coro)
loop.run_forever()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment