Skip to content

Instantly share code, notes, and snippets.

@berryp
Created November 15, 2017 10:25
Show Gist options
  • Save berryp/9ad5acaa0c9c77306ce9ad006cbeebff to your computer and use it in GitHub Desktop.
Save berryp/9ad5acaa0c9c77306ce9ad006cbeebff to your computer and use it in GitHub Desktop.
aiodocker with Hyper.sh support
import asyncio
import json
import logging
import os
from pathlib import Path
import re
import ssl
import aiohttp
from yarl import URL
from .utils import httpize, parse_result
# Sub-API classes
from .containers import DockerContainers, DockerContainer
from .events import DockerEvents
from .exceptions import DockerError
from .images import DockerImages
from .logs import DockerLog
from .swarm import DockerSwarm
from .services import DockerServices
from .tasks import DockerTasks
from .requests_aws4auth import AWS4Auth
from .volumes import DockerVolumes, DockerVolume
# Public names exported by a star-import of this module: the Docker client
# itself plus the sub-API classes and the error type imported above.
__all__ = (
'Docker',
'DockerContainers',
'DockerContainer',
'DockerEvents',
'DockerError',
'DockerImages',
'DockerLog',
'DockerSwarm',
'DockerServices',
'DockerTasks',
'DockerVolumes',
'DockerVolume',
)
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Unix socket locations probed, in this order, by Docker.__init__ when neither
# an explicit URL nor the DOCKER_HOST environment variable gives the address.
_sock_search_paths = [
Path('/run/docker.sock'),
Path('/var/run/docker.sock'),
]
# Accepted API version format: "v<digits>.<digits>", e.g. "v1.26".
_rx_version = re.compile(r'^v\d+\.\d+$')
# Matches a leading "tcp://" or "http://" scheme on the daemon address; used
# both to pick a TCP connector and to rewrite the scheme to "https://".
_rx_tcp_schemes = re.compile(r'^(tcp|http)://')
class MockRequest:
    """
    Minimal stand-in for a ``requests.Request`` object.

    The library used to handle AWS4 authentication is designed for the
    python-requests library and expects to be handed a request object.
    The real ``Request`` initializer expects things we don't have, so
    this class carries only what is set here: the URL, the HTTP method
    and a headers mapping.
    """

    def __init__(self, url, method, headers=None):
        # A falsy ``headers`` value (None or an empty mapping) is replaced
        # by a fresh dict so every instance has its own headers container.
        if not headers:
            headers = {}
        self.method = method
        self.url = url
        self.headers = headers
class Docker:
    '''
    Asynchronous client for a Docker-compatible HTTP API.

    Requests go through an aiohttp client session. When ``credentials``
    are supplied, every request issued by ``_query()`` additionally
    carries AWS4 signature headers for the ``hyper`` service.
    '''

    def __init__(self,
                 url=None,
                 connector=None,
                 session=None,
                 ssl_context=None,
                 api_version='v1.26',
                 credentials=None):
        '''
        Resolve the endpoint, build the connector/session and attach the
        sub-API helpers.

        :param url: endpoint such as ``tcp://host:port``, ``http://...``
            or ``unix:///path``. Falls back to ``$DOCKER_HOST`` and then
            to the first existing socket in ``_sock_search_paths``.
        :param connector: pre-built aiohttp connector. When given, no
            connector is created and the resolved host is kept unchanged.
        :param session: pre-built aiohttp client session. When omitted,
            one is created on top of the connector.
        :param ssl_context: SSL context for TCP endpoints when
            ``DOCKER_TLS_VERIFY=1``. When omitted, it is built from the
            ``DOCKER_*`` environment variables.
        :param api_version: API version path segment, e.g. ``'v1.26'``.
        :param credentials: mapping with the keys ``accesskey``,
            ``secretkey`` and ``region`` used to sign requests, or None
            to send unsigned requests.
        :raises ValueError: if no connector is given and either no host
            could be resolved or the host has no recognised scheme; also
            when TLS is requested without a usable ``DOCKER_CERT_PATH``.
        '''
        docker_host = url  # rename
        if docker_host is None:
            docker_host = os.environ.get('DOCKER_HOST', None)
        if docker_host is None:
            # Last resort: probe the well-known local socket paths.
            for sockpath in _sock_search_paths:
                if sockpath.is_socket():
                    docker_host = 'unix://' + str(sockpath)
                    break
        self.docker_host = docker_host

        assert _rx_version.search(api_version) is not None, \
            'Invalid API version format'
        self.api_version = api_version
        self.credentials = credentials

        if connector is None:
            if docker_host is None:
                # Fail with a clear message instead of a TypeError from
                # running the scheme regex against None.
                raise ValueError('Missing valid docker_host.')
            if _rx_tcp_schemes.search(docker_host):
                if os.environ.get('DOCKER_TLS_VERIFY', '0') == '1':
                    # Honour a caller-supplied context; only fall back to
                    # the environment-derived one when none was given.
                    if ssl_context is None:
                        ssl_context = self._docker_machine_ssl_context()
                    docker_host = _rx_tcp_schemes.sub('https://', docker_host)
                else:
                    # Without DOCKER_TLS_VERIFY the connection is plain.
                    ssl_context = None
                connector = aiohttp.TCPConnector(ssl_context=ssl_context)
                self.docker_host = docker_host
            elif docker_host.startswith('unix://'):
                connector = aiohttp.UnixConnector(docker_host[7:])
                # dummy hostname for URL composition
                self.docker_host = "unix://localhost"
            else:
                raise ValueError('Missing protocol scheme in docker_host.')
        self.connector = connector

        if session is None:
            session = aiohttp.ClientSession(connector=self.connector)
        self.session = session

        self.events = DockerEvents(self)
        self.containers = DockerContainers(self)
        self.swarm = DockerSwarm(self)
        self.services = DockerServices(self)
        self.tasks = DockerTasks(self)
        self.images = DockerImages(self)
        self.volumes = DockerVolumes(self)

        # legacy aliases
        self.pull = self.images.pull
        self.push = self.images.push

    async def close(self):
        '''
        Stop the event subscription and close the HTTP session.
        '''
        await self.events.stop()
        await self.session.close()

    async def auth(self, **credentials):
        '''
        Send the given credentials to the ``auth`` endpoint and return the
        decoded JSON response.
        '''
        # NOTE(review): the Docker Engine API documents this endpoint as
        # POST /auth -- confirm that GET is intended for the target service.
        response = await self._query_json(
            "auth",
            "GET",
            data=credentials,
        )
        return response

    async def version(self):
        '''
        Return the decoded JSON response of the ``version`` endpoint.
        '''
        data = await self._query_json("version")
        return data

    def _canonicalize_url(self, path):
        '''
        Build the full request URL: ``<docker_host>/<api_version>/<path>``.
        '''
        return URL("{self.docker_host}/{self.api_version}/{path}".format(
            self=self, path=path))

    def _auth_headers(self, url, method):
        '''
        Get AWS4 authentication headers

        :param url: the request URL object (its ``human_repr()`` is signed).
        :param method: HTTP method name.
        :returns: the headers produced by the AWS4 signer.
        '''
        signed_url = url.human_repr()
        # Rewrite only the scheme. A blanket replace('tcp', 'https') would
        # also rewrite "tcp" inside the host name or the path.
        if signed_url.startswith('tcp://'):
            signed_url = 'https://' + signed_url[len('tcp://'):]
        req = MockRequest(signed_url, method)
        auth = AWS4Auth(self.credentials['accesskey'],
                        self.credentials['secretkey'],
                        self.credentials['region'], 'hyper')
        return auth(req).headers

    async def _query(self,
                     path,
                     method='GET',
                     *,
                     params=None,
                     data=None,
                     headers=None,
                     timeout=None):
        '''
        Get the response object by performing the HTTP request.
        The caller is responsible to finalize the response object.

        :raises DockerError: when the response status is 4xx or 5xx; the
            response is read and closed before raising.
        '''
        url = self._canonicalize_url(path)
        # Work on a copy so the caller's mapping is never mutated.
        headers = headers.copy() if headers else {}
        if headers and 'Content-Type' not in headers:
            headers['Content-Type'] = 'application/json'
        # Attach the Hyper auth headers, but only when credentials exist;
        # signing without them would fail on the credential lookup.
        if self.credentials:
            headers.update(self._auth_headers(url, method))
        response = await self.session.request(
            method,
            url,
            params=httpize(params),
            headers=headers,
            data=data,
            timeout=timeout)
        if (response.status // 100) in [4, 5]:
            what = await response.read()
            content_type = response.headers.get('Content-Type', '')
            response.close()
            if content_type == 'application/json':
                raise DockerError(response.status,
                                  json.loads(what.decode('utf8')))
            else:
                raise DockerError(response.status, {
                    "message": what.decode('utf8')
                })
        return response

    async def _query_json(self,
                          path,
                          method='GET',
                          *,
                          params=None,
                          data=None,
                          headers=None,
                          timeout=None):
        '''
        A shorthand of _query() that treats the input as JSON.

        ``data`` that is neither ``str``/``bytes`` nor None is serialised
        with ``json.dumps``; the response is parsed as JSON and returned.
        '''
        # Copy so the caller's mapping is not modified.
        headers = {} if headers is None else headers.copy()
        headers['Content-Type'] = 'application/json'
        # Leave None alone: serialising it would send the literal body
        # "null" on requests that are meant to carry no payload.
        if data is not None and not isinstance(data, (str, bytes)):
            data = json.dumps(data)
        response = await self._query(
            path,
            method,
            params=params,
            data=data,
            headers=headers,
            timeout=timeout)
        data = await parse_result(response, 'json')
        return data

    async def _websocket(self, path, **params):
        '''
        Open a websocket to ``path`` and return it.

        When no keyword parameters are given, stdin, stdout, stderr and
        stream are all requested.
        '''
        if not params:
            params = {
                'stdin': True,
                'stdout': True,
                'stderr': True,
                'stream': True
            }
        url = self._canonicalize_url(path)
        # ws_connect() does not have params arg.
        url = url.with_query(httpize(params))
        ws = await self.session.ws_connect(
            url,
            protocols=['chat'],
            origin='http://localhost',
            autoping=True,
            autoclose=True)
        return ws

    @staticmethod
    def _docker_machine_ssl_context():
        '''
        Create a SSLContext object using DOCKER_* env vars.

        Loads ``ca.pem``, ``cert.pem`` and ``key.pem`` from the directory
        named by ``DOCKER_CERT_PATH``.

        :raises ValueError: if ``DOCKER_CERT_PATH`` is not set.
        '''
        # Validate the environment first so a missing variable is reported
        # before any SSL object is created.
        certs_path = os.environ.get('DOCKER_CERT_PATH', None)
        if certs_path is None:
            raise ValueError("Cannot create ssl context, "
                             "DOCKER_CERT_PATH is not set!")
        certs_path = Path(certs_path)

        # NOTE(review): per the ssl docs a bare SSLContext does not enable
        # certificate verification by default -- confirm whether verify_mode
        # should be set here.
        context = ssl.SSLContext(ssl.PROTOCOL_TLS)
        # _RESTRICTED_SERVER_CIPHERS is a private, undocumented attribute
        # and is not guaranteed to exist; keep the interpreter's default
        # cipher list when it is absent.
        restricted_ciphers = getattr(ssl, '_RESTRICTED_SERVER_CIPHERS', None)
        if restricted_ciphers is not None:
            context.set_ciphers(restricted_ciphers)
        context.load_verify_locations(cafile=str(certs_path / 'ca.pem'))
        context.load_cert_chain(
            certfile=str(certs_path / 'cert.pem'),
            keyfile=str(certs_path / 'key.pem'))
        return context
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment