Last active
April 29, 2024 12:54
-
-
Save gimmi/c4a830d3db6180441a4beceb138351b9 to your computer and use it in GitHub Desktop.
Python
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
from pathlib import Path | |
import json | |
# Location of the JSON defaults file: ``defaults.json`` in the directory that
# holds this module. Use ``.parent`` rather than joining ``'..'`` onto the file
# path: ``<module>.py/../defaults.json`` goes *through* a regular file, which
# POSIX systems reject with ENOTDIR.
default_path = Path(__file__).parent / 'defaults.json'

# Shared parser that the module-level helpers register arguments on.
parser = argparse.ArgumentParser()
def defaults_from_json(*args):
    """Point the module at a different JSON defaults file.

    The positional arguments are path segments passed straight to
    ``pathlib.Path``; the resulting path replaces the module-level
    ``default_path``.
    """
    global default_path
    replacement = Path(*args)
    default_path = replacement
def add_argument(*args, **kwargs):
    """Register an argument on the module-level ``parser``.

    All positional and keyword arguments are forwarded unchanged to
    ``parser.add_argument``.

    Returns:
        The ``argparse.Action`` created by the parser, so callers can inspect
        it (for example its ``dest``). Callers that ignore the return value
        are unaffected.
    """
    return parser.add_argument(*args, **kwargs)
def parse_args():
    """Parse the command line on top of the values in the JSON defaults file.

    The file at ``default_path`` must hold a JSON object. Each key becomes an
    attribute of the namespace handed to ``parser.parse_args``; options given
    on the command line overwrite those attributes.

    Returns:
        argparse.Namespace: the JSON values merged with the command-line
        values.

    Raises:
        OSError: the defaults file cannot be read.
        json.JSONDecodeError: the file does not contain valid JSON.
        TypeError: the top-level JSON value is not an object.
    """
    # Give json.loads the raw bytes so it detects the encoding itself
    # (UTF-8/16/32, with or without a BOM). Decoding with the platform locale
    # first would garble non-ASCII values and fail on a leading BOM.
    defaults = json.loads(default_path.read_bytes())
    namespace = argparse.Namespace(**defaults)
    return parser.parse_args(namespace=namespace)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import sys | |
from types import SimpleNamespace | |
from urllib.request import Request | |
import urllib.parse | |
import json | |
import argparse | |
from pathlib import Path | |
import email.utils | |
import shutil | |
def get_access_token(tenant, app_id, password, resource):
    """Request an Azure AD token using the client-credentials grant.

    Posts the application id and secret as a URL-encoded form to the tenant's
    ``oauth2/token`` endpoint and returns the ``access_token`` field of the
    JSON reply.

    Args:
        tenant: Tenant identifier placed in the token endpoint URL.
        app_id: Sent as ``client_id``.
        password: Sent as ``client_secret``.
        resource: Sent as ``resource``, the audience the token is issued for.

    Returns:
        The value of ``access_token`` from the response body.
    """
    # See https://docs.microsoft.com/en-us/azure/active-directory/azuread-dev/v1-oauth2-client-creds-grant-flow
    token_url = f'https://login.microsoftonline.com/{tenant}/oauth2/token'
    form_fields = {
        'grant_type': 'client_credentials',
        'client_id': app_id,
        'client_secret': password,
        'resource': resource,
    }
    token_request = Request(
        token_url,
        data=urllib.parse.urlencode(form_fields).encode(),
        headers={'Content-Type': 'application/x-www-form-urlencoded'},
        method='POST',
    )
    with urllib.request.urlopen(token_request) as response:
        return json.load(response)['access_token']
def split_blob_url(url):
    """Break a blob URL into ``(resource, container_name, blob_name)``.

    ``resource`` is the scheme and host only (no path, query or fragment).
    The first path segment is the container name and everything after it,
    slashes included, is the blob name. Any query string or fragment in
    ``url`` is dropped.

    Raises:
        ValueError: the path has fewer than two segments.
    """
    parts = urllib.parse.urlsplit(url)
    resource = parts._replace(path='', query='', fragment='').geturl()
    # The path starts with '/', so the first piece of the split is empty.
    _leading, container_name, blob_name = parts.path.split('/', 2)
    return (resource, container_name, blob_name)
def download_blob(tenant, app_id, password, blob_url, out_fileobj):
    """Download a blob and copy its bytes into ``out_fileobj``.

    A bearer token for the storage account is obtained with the given
    service-principal credentials. The blob is then fetched with a GET
    request and streamed into ``out_fileobj`` without loading it all into
    memory.

    Args:
        tenant, app_id, password: Credentials passed to ``get_access_token``.
        blob_url: Full URL of the blob; split by ``split_blob_url``.
        out_fileobj: Writable binary file object that receives the content.
    """
    resource, container_name, blob_name = split_blob_url(blob_url)

    # This require the "Storage Blob Data Reader" role on the service principal
    token = get_access_token(tenant, app_id, password, resource)

    request_headers = {
        'Authorization': f'Bearer {token}',
        'Date': email.utils.formatdate(timeval=None, localtime=False, usegmt=True),
        'x-ms-version': '2019-07-07',
    }
    blob_request = Request(
        f'{resource}/{container_name}/{blob_name}',
        headers=request_headers,
        method='GET',
    )
    with urllib.request.urlopen(blob_request) as response:
        # https://stackoverflow.com/a/42011644/66629
        shutil.copyfileobj(response, out_fileobj)
def put_blob(tenant, app_id, password, blob_url, path):
    """Upload a local file to ``blob_url`` as a block blob.

    A bearer token for the storage account is obtained with the given
    service-principal credentials. The file is then sent in a single PUT
    request whose body is streamed from the open file.

    Args:
        tenant, app_id, password: Credentials passed to ``get_access_token``.
        blob_url: Full URL of the destination blob; split by
            ``split_blob_url``.
        path: Path object of the file to upload (``stat()`` and
            ``open('rb')`` are called on it).

    Raises:
        urllib.error.HTTPError: the service answered with an error status.
    """
    (resource, container_name, blob_name) = split_blob_url(blob_url)
    # Writing a blob needs a role with write permission on the service
    # principal, e.g. "Storage Blob Data Contributor"; the Reader role is
    # not sufficient for uploads.
    access_token = get_access_token(tenant, app_id, password, resource)
    content_length = path.stat().st_size
    with path.open('rb') as file:
        req = Request(
            f'{resource}/{container_name}/{blob_name}',
            method='PUT',
            headers={
                'Authorization': f'Bearer {access_token}',
                'Date': email.utils.formatdate(timeval=None, localtime=False, usegmt=True),
                'x-ms-version': '2019-07-07',
                'x-ms-blob-type': 'BlockBlob',
                # An explicit length stops urllib from using chunked transfer
                # encoding for the file-object body.
                'Content-Length': str(content_length),
            },
            data=file,
        )
        # The response body is not needed, but the response must be closed so
        # the underlying connection is released.
        with urllib.request.urlopen(req):
            pass
if __name__ == '__main__':
    # Usage: <script> <blob-url> <destination-file>
    # Service-principal credentials are read from AZCOPY_* environment variables.
    tenant_id = os.environ['AZCOPY_TENANT_ID']
    application_id = os.environ['AZCOPY_SPA_APPLICATION_ID']
    client_secret = os.environ['AZCOPY_SPA_CLIENT_SECRET']

    source_url = sys.argv[1]
    target = Path(sys.argv[2])
    print(f'{source_url} => {target}')

    # Create the destination directory tree before opening the output file.
    target.parent.mkdir(parents=True, exist_ok=True)
    with target.open('wb') as out:
        download_blob(tenant_id, application_id, client_secret, source_url, out)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import urllib.request, json, base64, ssl | |
def http_get_json(username, password, url, verify=True):
    """GET ``url`` and return the response body decoded as JSON.

    Args:
        username, password: When ``password`` is truthy, both are sent as an
            HTTP Basic ``Authorization`` header (ASCII only). Otherwise no
            credentials are sent.
        url: Address to fetch.
        verify: Pass ``False`` to skip TLS certificate and host-name checks.

    Returns:
        The parsed JSON document.
    """
    if verify:
        tls_context = None
    else:
        # https://stackoverflow.com/a/58337431/66629
        tls_context = ssl.create_default_context()
        tls_context.check_hostname = False
        tls_context.verify_mode = ssl.CERT_NONE

    request = urllib.request.Request(url, headers={'Accept': 'application/json'})

    if password:
        raw_credentials = f'{username}:{password}'.encode('ascii')
        encoded_credentials = base64.b64encode(raw_credentials).decode('ascii')
        request.add_header('Authorization', f'Basic {encoded_credentials}')

    with urllib.request.urlopen(request, context=tls_context) as response:
        return json.load(response)
# Demo: fetch a few httpbin endpoints and print either the JSON or the error.
urls = [
    'https://httpbin.org/anything', # OK
    'https://httpbin.org/redirect-to?url=http%3A%2F%2Flocalhost%2Fanything', # OK
    'https://httpbin.org/status/404', # HTTP Error 404: NOT FOUND
    'https://httpbin.org/status/500', # HTTP Error 500: INTERNAL SERVER ERROR
]
for url in urls:
    try:
        print(f'---- GET {url}')
        # http_get_json takes (username, password, url). These endpoints need
        # no credentials, so pass None for both; passing only the URL raised
        # TypeError on every iteration.
        print(http_get_json(None, None, url))
    except Exception as e:
        # Demo boundary: show whatever went wrong and move on to the next URL.
        print(e)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import re | |
class OneLineFormatter(logging.Formatter):
    """Formatter that keeps every log record on a single line.

    Records are rendered as ``LEVEL:logger:message``. Any run of line breaks
    in the rendered text, together with the whitespace around it, is
    replaced by ``' | '``.
    """

    # One or more newlines plus any whitespace on either side.
    _LINE_BREAKS = re.compile(r'\s*\n+\s*')

    def __init__(self):
        super().__init__(fmt='%(levelname)s:%(name)s:%(message)s')

    def format(self, record):
        rendered = super().format(record)
        return self._LINE_BREAKS.sub(' | ', rendered)
def configure_logging():
    """Attach a one-line stream handler to the root logger at INFO level.

    A new ``logging.StreamHandler`` using ``OneLineFormatter`` is added on
    every call; existing handlers are left in place.
    """
    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(OneLineFormatter())

    root_logger = logging.getLogger()
    root_logger.addHandler(stream_handler)
    root_logger.setLevel(logging.INFO)
# Demo: emit one record per level, first with the root logger at INFO (the
# DEBUG record is filtered out), then again after lowering it to DEBUG.
configure_logging()

_DEMO_RECORDS = (
    (logging.debug, 'This is a DEBUG'),
    (logging.info, 'This is a INFO'),
    (logging.warning, 'This is a WARNING'),
    (logging.error, 'This is a ERROR'),
    (logging.critical, 'This is a CRITICAL'),
)

for emit, text in _DEMO_RECORDS:
    emit(text)

logging.getLogger().setLevel(logging.DEBUG)

for emit, text in _DEMO_RECORDS:
    emit(text)

# logging.exception appends the active traceback, which shows how the
# formatter collapses multi-line output.
try:
    raise Exception('message')
except Exception:
    logging.exception('This is a EXCEPTION')
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Inspired from https://stackoverflow.com/a/4628446/66629 | |
import sys | |
# Pull from the same iterator twice per step so consecutive command-line
# arguments come out as (key, value) pairs; an unpaired trailing argument is
# dropped by zip().
argv_iter = iter(sys.argv)
next(argv_iter)  # Discard script name
pairs = list(zip(argv_iter, argv_iter))

for key, val in pairs:
    print(f'{key} -> {val}')

print(dict(pairs))
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import subprocess | |
import sys | |
import textwrap | |
# subprocess.check_call(['scp -i', r'C:\Users\dev\.ssh\bastion-rke', os.path.join(os.path.dirname(__file__), 'xxx.py'), '[email protected]:/tmp/xxx.py']) | |
# subprocess.check_call(['ssh -i', r'C:\Users\dev\.ssh\bastion-rke [email protected] python3 /tmp/xxx.py']) | |
# subprocess.check_call(['ssh -i', r'C:\Users\dev\.ssh\bastion-rke [email protected] python3 -c print("llll")']) | |
# Run a Python script on the remote host by piping it to `python3 -` over ssh.
# The program name, each option, the key path and the host must be separate
# argv entries: 'ssh -i' as a single element is looked up as an executable
# literally named "ssh -i", and a key path fused with the host is passed to
# ssh as one bogus argument.
subprocess.run(['ssh', '-i', r'C:\Users\dev\.ssh\bastion-rke', '[email protected]', 'python3', '-', 'a', 'b', 'c'], check=True, text=True, input=textwrap.dedent('''
    import sys
    print(f'Running python code with {len(sys.argv)} args:')
    for i, arg in enumerate(sys.argv):
        print(f"Argument {i:>6}: {arg}")
'''))

# TODO big problems with interpreting windows line endings
subprocess.run(['ssh', '-i', r'C:\Users\dev\.ssh\bastion-rke', '[email protected]', 'bash -s'], check=True, text=True, input='echo "hello from bash"')

# NOTE(review): `docker run -it` asks for a TTY but ssh is invoked without
# `-t` here -- confirm this works as intended on the target host.
subprocess.run(['ssh', '-i', r'C:\Users\dev\.ssh\bastion-rke', '[email protected]', 'docker', 'run', '-it', '--rm', 'hello-world'], check=True)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import tarfile | |
# Print the member names of an uncompressed tar archive, then the raw bytes of
# every member that has file content.
# The context manager closes the archive even if reading a member fails.
with tarfile.open('file.tar', 'r:') as tar_file:
    print(tar_file.getnames())
    for member in tar_file.getmembers():
        print(f'-- name: {member.name} ---------------------')
        rfile = tar_file.extractfile(member)
        if rfile is None:
            # extractfile() returns None for members that are neither regular
            # files nor links (e.g. directories), so there is nothing to read.
            continue
        with rfile:
            print(rfile.read())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment