Last active
June 23, 2023 13:12
-
-
Save UtsavChokshiCNU/54de3050a2179ae3d3953aa2e95bb0f7 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
"""Command line interface for chalice. | |
Contains commands for deploying chalice. | |
""" | |
from __future__ import annotations | |
import functools | |
import logging | |
import os | |
import platform | |
import sys | |
import traceback | |
import uuid | |
from typing import Optional # noqa | |
import botocore.exceptions | |
import chalice | |
import click | |
from chalice import __version__ as chalice_version | |
from chalice.app import WebsocketEvent | |
from chalice.cli.factory import CLIFactory | |
from chalice.config import Config # noqa | |
from chalice.constants import DEFAULT_STAGE_NAME | |
from chalice.deploy.validate import (ExperimentalFeatureError, validate_routes) | |
from websockets.exceptions import ConnectionClosedError | |
from websockets.sync.server import serve as websocket_serve | |
from urllib.parse import parse_qs, urlparse | |
connections = { | |
} | |
def message_handler(app_object, websocket): | |
global connections | |
conn_id = str(uuid.uuid1()) | |
connections[conn_id] = websocket | |
print((parse_qs(urlparse(websocket.request.path).query))) | |
base_event_dict = { | |
'requestContext': { | |
'stage': '', | |
'domainName': '', | |
'connectionId': conn_id | |
}, | |
'queryStringParameters': {k: v[0] for k, v in (parse_qs(urlparse(websocket.request.path).query).items())} | |
} | |
if app_object.websocket_handlers.get('$connect'): | |
app_object.websocket_handlers['$connect'].handler_function( | |
WebsocketEvent(base_event_dict, {})) | |
try: | |
for message in websocket: | |
print("in loop", message) | |
event_dict = { | |
'requestContext': { | |
'stage': '', | |
'domainName': '', | |
'connectionId': conn_id, | |
}, | |
'body': message | |
} | |
app_object.websocket_handlers['$default'].handler_function( | |
WebsocketEvent(event_dict, {})) | |
except ConnectionClosedError: | |
pass | |
if app_object.websocket_handlers.get('$disconnect'): | |
app_object.websocket_handlers['$disconnect'].handler_function( | |
WebsocketEvent(base_event_dict, {})) | |
del connections[conn_id] | |
def websocket_send(connection_id: str, message: str) -> None: | |
websocket = connections.get(connection_id) | |
if not websocket: | |
raise Exception("Invalid connection id") | |
websocket.send(message) | |
class LocalDevServer: | |
def __init__(self, app_object, config, host, port): | |
self.app_object = app_object | |
self.host = host | |
self.port = port | |
# self._wrapped_handler = functools.partial( | |
# handler_cls, app_object=app_object, config=config) | |
handler = functools.partial(message_handler, app_object) | |
self.server = websocket_serve(handler, host, port) | |
def serve_forever(self): | |
# type: () -> None | |
print("Serving on wss://%s:%s" % (self.host, self.port)) | |
self.server.serve_forever() | |
def shutdown(self): | |
# type: () -> None | |
# This must be called from another thread of else it | |
# will deadlock. | |
self.server.shutdown() | |
def _configure_cli_env_vars(): | |
# type: () -> None | |
# This will set chalice specific env vars so users can detect if | |
# we're running a Chalice CLI command. This is useful if you want | |
# conditional behavior only when we're actually running in Lambda | |
# in your app.py file. | |
os.environ['AWS_CHALICE_CLI_MODE'] = 'true' | |
def _configure_logging(level, format_string=None): | |
# type: (int, Optional[str]) -> None | |
if format_string is None: | |
format_string = "%(asctime)s %(name)s [%(levelname)s] %(message)s" | |
logger = logging.getLogger('') | |
logger.setLevel(level) | |
handler = logging.StreamHandler() | |
handler.setLevel(level) | |
formatter = logging.Formatter(format_string) | |
handler.setFormatter(formatter) | |
logger.addHandler(handler) | |
def get_system_info(): | |
# type: () -> str | |
python_info = "python {}.{}.{}".format(sys.version_info[0], | |
sys.version_info[1], | |
sys.version_info[2]) | |
platform_system = platform.system().lower() | |
platform_release = platform.release() | |
platform_info = "{} {}".format(platform_system, platform_release) | |
return "{}, {}".format(python_info, platform_info) | |
# create handler for each connection | |
def factory_create_local_server( | |
app_obj: chalice, config: Config, host: str, port: int | |
) -> LocalDevServer: | |
return LocalDevServer(app_obj, config, host, port) | |
@click.group() | |
@click.version_option(version=chalice_version, | |
message='%(prog)s %(version)s, {}' | |
.format(get_system_info())) | |
@click.option('--project-dir', | |
help='The project directory path (absolute or relative).' | |
'Defaults to CWD') | |
@click.option('--debug/--no-debug', | |
default=False, | |
help='Print debug logs to stderr.') | |
@click.pass_context | |
def cli(ctx, project_dir, debug=False): | |
# type: (click.Context, str, bool) -> None | |
if project_dir is None: | |
project_dir = os.getcwd() | |
elif not os.path.isabs(project_dir): | |
project_dir = os.path.abspath(project_dir) | |
if debug is True: | |
_configure_logging(logging.DEBUG) | |
_configure_cli_env_vars() | |
ctx.obj['project_dir'] = project_dir | |
ctx.obj['debug'] = debug | |
ctx.obj['factory'] = CLIFactory(project_dir, debug, environ=os.environ) | |
ctx.obj['factory'].create_local_server = factory_create_local_server | |
os.chdir(project_dir) | |
@cli.command() | |
@click.option('--host', default='127.0.0.1') | |
@click.option('--port', default=8000, type=click.INT) | |
@click.option('--stage', default=DEFAULT_STAGE_NAME, | |
help='Name of the Chalice stage for the local server to use.') | |
@click.option('--autoreload/--no-autoreload', | |
default=True, | |
help='Automatically restart server when code changes.') | |
@click.pass_context | |
def local(ctx, host='127.0.0.1', port=8000, stage=DEFAULT_STAGE_NAME, | |
autoreload=True): | |
factory = ctx.obj['factory'] # type: CLIFactory | |
from chalice.cli import reloader | |
# We don't create the server here because that will bind the | |
# socket and we only want to do this in the worker process. | |
server_factory = functools.partial( | |
create_local_server, factory, host, port, stage) | |
# When running `chalice local`, a stdout logger is configured | |
# so you'll see the same stdout logging as you would when | |
# running in lambda. This is configuring the root logger. | |
# The app-specific logger (app.log) will still continue | |
# to work. | |
logging.basicConfig( | |
stream=sys.stdout, level=logging.INFO, format='%(message)s') | |
if autoreload: | |
project_dir = factory.create_config_obj( | |
chalice_stage_name=stage).project_dir | |
print(project_dir) | |
rc = reloader.run_with_reloader( | |
server_factory, os.environ, project_dir) | |
# Click doesn't sys.exit() with the RC this function. The | |
# recommended way to do this is to use sys.exit() directly, | |
# see: https://github.com/pallets/click/issues/747 | |
sys.exit(rc) | |
run_local_server(factory, host, port, stage) | |
def create_local_server(factory, host, port, stage): | |
# type: (CLIFactory, str, int, str) -> LocalDevServer | |
config = factory.create_config_obj( | |
chalice_stage_name=stage | |
) | |
app_obj = config.chalice_app | |
app_obj.websocket_api.send = websocket_send | |
# Check that `chalice deploy` would let us deploy these routes, otherwise | |
# there is no point in testing locally. | |
routes = config.chalice_app.routes | |
validate_routes(routes) | |
server = factory.create_local_server(app_obj, config, host, port) | |
return server | |
def run_local_server(factory, host, port, stage): | |
# type: (CLIFactory, str, int, str) -> None | |
server = create_local_server(factory, host, port, stage) | |
# server.serve_forever() | |
def main(): | |
# type: () -> int | |
# click's dynamic attrs will allow us to pass through | |
# 'obj' via the context object, so we're ignoring | |
# these error messages from pylint because we know it's ok. | |
# pylint: disable=unexpected-keyword-arg,no-value-for-parameter | |
try: | |
return cli(obj={}) | |
except botocore.exceptions.NoRegionError: | |
click.echo("No region configured. " | |
"Either export the AWS_DEFAULT_REGION " | |
"environment variable or set the " | |
"region value in our ~/.aws/config file.", err=True) | |
return 2 | |
except ExperimentalFeatureError as e: | |
click.echo(str(e)) | |
return 2 | |
except Exception: | |
click.echo(traceback.format_exc(), err=True) | |
return | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
One needs to install :
pip install websockets