This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@pytest.fixture(autouse=True) | |
def mock_request(): | |
with responses.RequestsMock() as req: | |
yield req | |
@pytest.fixture() | |
def check_request(mock_request): | |
def _check_request(body): | |
if isinstance(body, dict): |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
FROM python:3.7-alpine3.9 | |
LABEL maintainer='[email protected]' | |
LABEL description='Python services base image' | |
# Don't periodically check PyPI to determine whether a new version of pip is available for download. | |
ENV PIP_DISABLE_PIP_VERSION_CHECK=on | |
# Disable package cache. | |
ENV PIP_NO_CACHE_DIR=off |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from sqlalchemy.dialects import postgresql | |
def upsert(session, model, fields, returning=False, return_columns=None): | |
table = model.__table__ | |
stmt = postgresql.insert(table).values(**fields) | |
pk_columns = table.primary_key.columns.keys() | |
update_cols = {col: val for col, val in fields.items() if col not in pk_columns} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import Any | |
import requests | |
REQUEST_DEFAULT_TIMEOUT = 5 | |
REQUEST_RETRIES = 5 | |
class SessionWithTimeout(requests.Session): | |
def request(self, *args: Any, **kwargs: Any): |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@contextmanager | |
def session(*args: Any, **kwargs: Any) -> Iterator[sa.orm.Session]: | |
with closing(session_factory(*args, **kwargs)) as _session: | |
try: | |
yield _session | |
except Exception: | |
_session.rollback() | |
raise | |
else: | |
_session.commit() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def threadpool(func: FUNC) -> FUNC: | |
@wraps(func) | |
def wrapper(*args: Any, **kwargs: Any) -> Awaitable[Any]: | |
loop = get_event_loop() | |
callback = partial(func, *args, **kwargs) | |
return loop.run_in_executor(None, callback) | |
return wrapper |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash | |
set -e | |
function db_ready(){ | |
python << END | |
import sys | |
import sqlalchemy | |
try: | |
engine_pro = sqlalchemy.create_engine("${DATABASE}").connect() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
server { | |
listen 80 default_server; | |
server_name _; | |
return 301 https://$host$request_uri; | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
version: '3.3' | |
services: | |
zookeeper: | |
image: confluentinc/cp-zookeeper:5.0.0 | |
restart: always | |
environment: | |
ZOOKEEPER_CLIENT_PORT: 2181 | |
ZOOKEEPER_TICK_TIME: 2000 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
server { | |
server_name rancher.afonasev.ru; # managed by Certbot | |
location / { | |
proxy_set_header Host $host; | |
proxy_set_header X-Forwarded-Proto $scheme; | |
proxy_set_header X-Forwarded-Port $server_port; | |
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; | |
proxy_pass http://localhost:8000; | |
proxy_http_version 1.1; |