Skip to content

Instantly share code, notes, and snippets.

@Garciat
Last active February 22, 2025 08:07
Show Gist options
  • Save Garciat/4112888e649f37d032ed671265781b8c to your computer and use it in GitHub Desktop.
Save Garciat/4112888e649f37d032ed671265781b8c to your computer and use it in GitHub Desktop.
Manage your VS Code Remote Tunnel with Python 3 + CGI.
# pip install google-api-python-client python-dotenv
from typing import Optional, List, Tuple, Dict
from dataclasses import dataclass
import http.cookies
import os
import subprocess
import sys
import urllib.parse
from dotenv import load_dotenv
load_dotenv()
# Shell command line that launches the VS Code tunnel; the same string is used
# as the `pgrep -f` pattern to find the running process.
VSCODE_COMMAND = os.environ.get('VSCODE_COMMAND')
# OAuth client ID handed to the Google sign-in script and used as the expected
# audience when verifying ID tokens.
GOOGLE_CLIENT_ID = os.environ.get('GOOGLE_CLIENT_ID')
# Allow-list of e-mail addresses read from the comma-separated AUTH_USER_EMAIL
# variable. Left as None when the variable is unset: calling .split() on the
# missing value used to raise AttributeError at import time, before do_auth()
# could report the misconfiguration.
_auth_user_email_raw = os.environ.get('AUTH_USER_EMAIL')
AUTH_USER_EMAILS = (
    None
    if _auth_user_email_raw is None
    else [
        email.strip()
        for email in _auth_user_email_raw.split(',')
        if email.strip()
    ]
)
# Name of the cookie that carries the Google ID token.
AUTH_TOKEN_COOKIE = 'auth_token'
@dataclass
class Request:
    """An incoming HTTP request as exposed by the CGI environment."""

    method: str  # upper-cased REQUEST_METHOD, e.g. 'GET'
    uri: urllib.parse.ParseResult  # parsed REQUEST_URI
    headers: List[Tuple[str, str]]  # not populated yet; always empty
    cookies: http.cookies.SimpleCookie  # parsed from HTTP_COOKIE
    body: bytes  # raw request body read from stdin

    @staticmethod
    def parse_cgi() -> 'Request':
        """Build a Request from the CGI environment variables and stdin.

        Returns:
            A Request whose ``headers`` list is empty (header parsing is
            still a TODO).

        Raises:
            KeyError: if REQUEST_METHOD or REQUEST_URI is not set.
        """
        method = os.environ['REQUEST_METHOD'].upper()
        uri = urllib.parse.urlparse(os.environ['REQUEST_URI'])

        # CGI scripts should read at most CONTENT_LENGTH bytes: the server is
        # not required to signal EOF on stdin, so an unbounded read() can
        # block. Fall back to reading everything when the variable is missing
        # or not a valid non-negative integer.
        try:
            content_length = int(os.environ.get('CONTENT_LENGTH') or '')
        except ValueError:
            content_length = None
        if content_length is not None and content_length >= 0:
            body = sys.stdin.buffer.read(content_length)
        else:
            body = sys.stdin.buffer.read()

        cookies = http.cookies.SimpleCookie()
        cookies.load(os.environ.get('HTTP_COOKIE') or '')
        return Request(
            method=method,
            uri=uri,
            headers=[],  # TODO
            cookies=cookies,
            body=body,
        )
class Response:
    """An outgoing CGI response: header list, cookies and a text body."""

    headers: List[Tuple[str, str]]
    cookies: http.cookies.SimpleCookie
    body: str

    def __init__(self):
        # These containers must be created per instance. As class attributes
        # they were shared, so a header or cookie added to one Response
        # leaked into every other Response created in the same process.
        self.headers = []
        self.cookies = http.cookies.SimpleCookie()
        self.body = ''

    def set_html(self):
        """Append a ``Content-Type: text/html`` header."""
        self.headers.append(('Content-Type', 'text/html'))

    def print(self, s=''):
        """Append *s* followed by a newline to the body."""
        self.body += s
        self.body += '\n'

    def write_cgi(self):
        """Write the response to stdout: headers, cookies, blank line, body."""
        for (name, value) in self.headers:
            print(f'{name}: {value}')
        # Only emit Set-Cookie lines when there are cookies; printing the
        # empty output() string would end the header section early.
        if self.cookies:
            print(self.cookies.output(sep='\n'))
        # A single empty line separates the headers from the body.
        print()
        print(self.body)

    @staticmethod
    def from_text(s):
        """Return an HTML response whose body is exactly *s*."""
        res = Response()
        res.set_html()
        res.body = s
        return res

    @staticmethod
    def from_console_text(s: Optional[str] = None):
        """Return an HTML response that opens a dark-themed ``<pre>`` block.

        If *s* is given it is appended as the first line of output.
        """
        res = Response()
        res.set_html()
        res.print('<style>html{color-scheme:dark}</style>')
        res.print('<pre>')
        if s is not None:
            res.print(s)
        return res
def pgrep_command(cmd: str) -> Optional[int]:
    """Find a running process whose full command line matches *cmd*.

    Runs ``pgrep -f cmd``.

    Returns:
        The first PID reported by pgrep, or None when pgrep exits non-zero
        or prints nothing.
    """
    proc = subprocess.run(['pgrep', '-f', cmd], capture_output=True)
    if proc.returncode != 0:
        return None
    # pgrep prints one PID per line and may report several matches; passing
    # the whole multi-line output to int() raised ValueError.
    pids = proc.stdout.decode().split()
    if not pids:
        return None
    return int(pids[0])
def pgrep_children(pid: int) -> List[int]:
    """Return the PIDs printed by ``pgrep -P pid``.

    An empty list is returned when pgrep exits with a non-zero status.
    """
    result = subprocess.run(['pgrep', '-P', str(pid)], capture_output=True)
    if result.returncode == 0:
        listing = result.stdout.decode().strip()
        return [int(line) for line in listing.splitlines()]
    return []
def pgrep_whole_tree(pid: int) -> List[int]:
    """Return *pid* followed by all of its descendants.

    The order is depth-first pre-order: each process is listed before its
    own children, and children appear in the order pgrep_children() reports
    them.
    """
    collected = []
    pending = [pid]
    while pending:
        current = pending.pop()
        collected.append(current)
        # Push children reversed so the first child is popped (visited) first.
        pending.extend(reversed(pgrep_children(current)))
    return collected
@dataclass
class ProcessInfo:
    """One process as reported by ``ps``: its PID and command line."""

    pid: int  # process ID
    command: str  # text of the `command` column
def ps_info(pid: int) -> ProcessInfo:
    """Look up *pid* with ``ps`` and return its PID and command line.

    Raises:
        Exception: if ps exits with a non-zero status.
    """
    result = subprocess.run(
        ['ps', '-o', 'pid=,command=', '-p', str(pid)],
        capture_output=True,
    )
    if result.returncode != 0:
        raise Exception('ps_info() fail')
    # The output is "<pid> <command line>"; split only on the first space so
    # the command keeps its own arguments.
    line = result.stdout.decode().strip()
    reported_pid, command = line.split(' ', 1)
    return ProcessInfo(pid=int(reported_pid), command=command)
def kill_pid(pid: int, *, kill_opt: str):
    """Send a signal to *pid* using the ``kill`` command.

    Args:
        pid: process to signal.
        kill_opt: signal name or number without the leading dash,
            e.g. ``'TERM'`` or ``'9'``.

    Raises:
        Exception: if kill exits with a non-zero status. The message carries
            the attempted command and kill's stderr.
    """
    proc = subprocess.run(['kill', '-' + kill_opt, str(pid)], capture_output=True)
    if proc.returncode != 0:
        # The old message named a non-existent kill_term() and dropped the
        # reason reported by kill.
        detail = proc.stderr.decode(errors='replace').strip()
        raise Exception(f'kill_pid() fail: kill -{kill_opt} {pid}: {detail}')
def exec_background(args):
    """Spawn *args* as a child process without waiting for it.

    The child's stdout and stderr are discarded.
    """
    discard = subprocess.DEVNULL
    popen_options = dict(close_fds=True, stdout=discard, stderr=discard)
    subprocess.Popen(args, **popen_options)
def command_show(res, cmd):
    """Report the process tree and captured output of the running command.

    Args:
        res: response object to append text to (via ``print`` and ``body``).
        cmd: command line used as the ``pgrep -f`` pattern.
    """
    import html  # local import: only this function needs it

    cmd_pid = pgrep_command(cmd)
    if cmd_pid is None:
        res.print('Not running')
        return
    res.print()
    res.print('Processes:')
    for pid in pgrep_whole_tree(cmd_pid):
        # Command lines and tunnel output are written into an HTML page, so
        # escape markup characters instead of emitting them raw.
        res.print(html.escape(repr(ps_info(pid)), quote=False))
    res.print()
    res.print('Output:')
    try:
        with open('../tmp/vscode-tunnel', 'rt') as f:
            res.body += html.escape(f.read(), quote=False)
    except FileNotFoundError:
        # The process may be running without ever having been started from
        # this page, in which case the output file does not exist.
        res.print('(no output file)')
def command_start(res, cmd):
    """Start *cmd* in the background unless it is already running.

    The output file is emptied first and then receives the command's stdout
    and stderr.

    Args:
        res: response object to append status text to.
        cmd: shell command line to launch.
    """
    cmd_pid = pgrep_command(cmd)
    if cmd_pid is not None:
        res.print('Already running')
        return
    output_file = '../tmp/vscode-tunnel'
    # Opening in 'wb' mode creates the file or truncates it to zero length.
    with open(output_file, 'wb'):
        pass
    # Redirections are applied left to right: stdout must be pointed at the
    # file *before* `2>&1`, otherwise stderr is duplicated onto the original
    # (discarded) stdout and never reaches the file.
    args = ['bash', '-c', f'{cmd} >{output_file} 2>&1 &']
    res.print(f'Running: {args}')
    exec_background(args)
def command_kill(res, cmd, *, kill_opt):
    """Signal the running command and every process in its tree.

    Args:
        res: response object to append status text to.
        cmd: command line used as the ``pgrep -f`` pattern.
        kill_opt: signal name or number passed on to kill_pid().
    """
    cmd_pid = pgrep_command(cmd)
    if cmd_pid is None:
        res.print('Not running')
        return
    for pid in pgrep_whole_tree(cmd_pid):
        res.print(f'Killing {pid}')
        try:
            kill_pid(pid, kill_opt=kill_opt)
        except Exception as exc:
            # A descendant can exit on its own once its parent is signalled.
            # Report the failure and keep going so one failed kill does not
            # leave the rest of the tree alive.
            res.print(f'Failed to kill {pid}: {exc}')
def main(req: Request) -> Response:
res = Response.from_console_text()
import getpass
res.print(f'User: {getpass.getuser()}')
res.print(f'Python: {sys.executable}')
res.print(f'CWD: {os.getcwd()}')
cmd = VSCODE_COMMAND
arg = req.uri.query
match arg:
case 'clear':
redirect = req.uri._replace(query='').geturl()
secs = 5
res.headers.append(('Clear-Site-Data', '"*"'))
res.headers.append(('Refresh', f'{secs}; {redirect}'))
res.printf(f'Sent Clear-Site-Data. Refreshing in {secs} seconds.')
case 'kill':
command_kill(res, cmd, kill_opt='TERM')
case 'kill9':
command_kill(res, cmd, kill_opt='9')
case 'start':
command_start(res, cmd)
case _:
command_show(res, cmd)
return res
def do_auth(req: Request) -> Optional[Response]:
if GOOGLE_CLIENT_ID is None:
raise Exception('Missing: GOOGLE_CLIENT_ID')
if AUTH_USER_EMAILS is None:
raise Exception('Missing: AUTH_USER_EMAIL')
match req.method:
case 'GET':
return handle_auth_get(req)
case 'POST':
return handle_auth_post(req)
def handle_auth_get(req: Request) -> Optional[Response]:
    """Check the auth cookie on a GET request.

    Returns:
        The sign-in page when there is no auth cookie; a response that
        expires the cookie and refreshes the page when the token fails
        verification; None when the token verifies.

    Raises:
        PermissionError: propagated from verify_auth_token() when the token
            is valid but the user is not allowed.
    """
    import google.auth.exceptions

    token_cookie = req.cookies.get(AUTH_TOKEN_COOKIE)
    if token_cookie is None:
        return display_auth_page()
    try:
        verify_auth_token(token_cookie.value)
    except (ValueError, google.auth.exceptions.GoogleAuthError):
        # ValueError covers malformed or expired tokens as well as the
        # InvalidValue case handled before, so a garbage cookie resets the
        # session instead of producing a traceback page.
        # The redirect is computed outside the f-string: reusing the same
        # quote character inside an f-string expression is a SyntaxError
        # before Python 3.12.
        redirect = req.uri._replace(query='').geturl()
        res = Response()
        res.cookies[AUTH_TOKEN_COOKIE] = ''
        res.cookies[AUTH_TOKEN_COOKIE]['secure'] = True
        res.cookies[AUTH_TOKEN_COOKIE]['expires'] = -1  # already in the past
        res.headers.append(('Refresh', f'1; {redirect}'))
        return res
    return None
def display_auth_page() -> Response:
    """Return the HTML sign-in page.

    The page loads the Google sign-in script, renders a sign-in button, and
    POSTs the received credential back to the current URL before reloading.
    GOOGLE_CLIENT_ID is substituted into the script as the client id.
    """
    page_template = """
<!DOCTYPE html>
<html>
<body>
<script src="https://accounts.google.com/gsi/client" async></script>
<script>
async function handleCredentialResponse(result) {
const response = await fetch(window.location.toString(), {
method: "POST",
body: result.credential,
});
console.log(await response.text());
window.location.reload();
}
window.onload = function () {
google.accounts.id.initialize({
client_id: "%s",
use_fedcm_for_prompt: true,
callback: handleCredentialResponse,
});
google.accounts.id.renderButton(
document.getElementById("buttonDiv"),
{ theme: "outline", size: "large" } // customization attributes
);
google.accounts.id.prompt(); // also display the One Tap dialog
}
</script>
<div id="buttonDiv"></div>
</body>
</html>
"""
    markup = page_template % (GOOGLE_CLIENT_ID, )
    return Response.from_text(markup)
def handle_auth_post(req: Request) -> Response:
    """Accept an ID token posted by the sign-in page and store it in a cookie.

    The request body is the raw token. It is verified before the cookie is
    set, so a verification failure propagates as an exception and no cookie
    is issued.
    """
    token = req.body.decode()
    verify_auth_token(token)
    res = Response()
    res.set_html()
    res.cookies[AUTH_TOKEN_COOKIE] = token
    res.cookies[AUTH_TOKEN_COOKIE]['secure'] = True
    # The page script never reads the cookie, so keep it away from
    # JavaScript, and do not send it on cross-site sub-requests.
    res.cookies[AUTH_TOKEN_COOKIE]['httponly'] = True
    res.cookies[AUTH_TOKEN_COOKIE]['samesite'] = 'Lax'
    res.body = 'OK'
    return res
def verify_auth_token(token):
    """Verify a Google ID token and check the user against the allow-list.

    Args:
        token: the ID token string.

    Raises:
        PermissionError: if the token's e-mail is missing, marked as not
            verified, or not in AUTH_USER_EMAILS.
        Exception: whatever ``id_token.verify_oauth2_token`` raises for a
            token that fails verification.
    """
    from google.oauth2 import id_token
    from google.auth.transport import requests

    idinfo = id_token.verify_oauth2_token(token, requests.Request(), GOOGLE_CLIENT_ID)
    # .get() so a token without an e-mail claim is rejected as an invalid
    # user instead of surfacing as a KeyError.
    email = idinfo.get('email')
    # Reject only an explicit "not verified"; an absent claim is tolerated.
    # NOTE(review): presumably Google always includes email_verified -- confirm.
    email_verified = idinfo.get('email_verified', True)
    if email is None or email_verified is not True or email not in AUTH_USER_EMAILS:
        raise PermissionError('Invalid user')
def handle_request(req: Request) -> Response:
    """Authenticate *req*, then run main() if authentication allows it.

    When do_auth() returns a response, that response is sent as-is;
    otherwise the request is handed to main().
    """
    auth_response = do_auth(req)
    return main(req) if auth_response is None else auth_response
# CGI entry point: parse the request, handle it, and write the response.
if __name__ == '__main__':
    try:
        req = Request.parse_cgi()
        res = handle_request(req)
        res.write_cgi()
    except Exception:
        # Top-level boundary: show the traceback as the page instead of an
        # opaque server error. Catching Exception rather than using a bare
        # except lets SystemExit and KeyboardInterrupt propagate on their
        # own, so no separate re-raise clause is needed.
        import traceback
        Response.from_console_text(traceback.format_exc()).write_cgi()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment