Last active
May 23, 2020 16:36
-
-
Save gwsu2008/1c091ffe3399be21ae28af9460da37f1 to your computer and use it in GitHub Desktop.
Python Scripts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Useful Python scripts and tips | |
""" |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Node(object):
    """Binary-tree node holding a value and optional left/right children.

    Attributes are exposed through paired properties: ``get_*`` is read-only,
    while ``set_*`` can be both read and assigned
    (e.g. ``node.set_value = 3``).
    """

    def __init__(self):
        self.left = None
        self.right = None
        self.value = None

    # Plain accessor functions; they are wrapped into properties below.
    def _read_value(self):
        return self.value

    def _read_left(self):
        return self.left

    def _read_right(self):
        return self.right

    def _write_left(self, left_node):
        self.left = left_node

    def _write_value(self, value):
        self.value = value

    def _write_right(self, right_node):
        self.right = right_node

    # Read-only views.
    get_value = property(_read_value)
    get_left = property(_read_left)
    get_right = property(_read_right)

    # Readable and assignable views sharing the getters above.
    set_left = property(_read_left, _write_left)
    set_value = property(_read_value, _write_value)
    set_right = property(_read_right, _write_right)

    def create_tree(self):
        """Interactively build a tree in pre-order from ``input()``.

        Returns:
            The root ``Node`` of the subtree that was read, or ``None`` when
            the user enters ``-1`` for this position.
        """
        sentinel = str(-1)
        entered = input("Enter the node data(-1 for null)")
        if entered == sentinel:
            # The sentinel marks "no child here".
            return None

        fresh = Node()
        fresh.set_value = entered
        print("Enter the left child of {}".format(entered))
        fresh.set_left = self.create_tree()
        print("Enter the right child of {}".format(entered))
        fresh.set_right = self.create_tree()
        return fresh
if __name__ == '__main__':
    # Build a tree from interactive input and show the root's value.
    node = Node()
    root_node = node.create_tree()
    if root_node is None:
        # create_tree() returns None when the very first answer is -1;
        # dereferencing it would raise AttributeError.
        print("Empty tree")
    else:
        print(root_node.get_value)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from datetime import datetime | |
from cryptography.hazmat.backends import default_backend | |
from cryptography.hazmat.primitives import hashes | |
from cryptography.hazmat.primitives import serialization | |
from cryptography.hazmat.primitives.asymmetric import padding | |
from botocore.signers import CloudFrontSigner | |
def rsa_signer(message):
    """Sign ``message`` with the RSA private key stored in ``path/to/key.pem``.

    The key is loaded unencrypted (``password=None``) and the signature uses
    PKCS#1 v1.5 padding with SHA-1.

    Args:
        message: Bytes to sign.

    Returns:
        The signature produced by the private key.
    """
    with open('path/to/key.pem', 'rb') as key_file:
        pem_bytes = key_file.read()
    private_key = serialization.load_pem_private_key(
        pem_bytes,
        password=None,
        backend=default_backend(),
    )
    signature = private_key.sign(message, padding.PKCS1v15(), hashes.SHA1())
    return signature
key_id = 'xyx'
url = 'http://d2949o5mkkp72v.cloudfront.net/hello.txt'
# ``datetime`` is already the class (``from datetime import datetime``), so it
# is called directly; ``datetime.datetime(...)`` would raise AttributeError.
expire_date = datetime(2017, 1, 1)
cloudfront_signer = CloudFrontSigner(key_id, rsa_signer)
# Create a signed url that will be valid until the specific expiry date
# provided using a canned policy.
signed_url = cloudfront_signer.generate_presigned_url(url, date_less_than=expire_date)
print(signed_url)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import inspect | |
class Singleton(type):
    """Metaclass that hands out one shared instance per class.

    The first call to a class using this metaclass constructs the instance
    with the given arguments; every later call returns that same object and
    ignores its arguments.
    """

    # Maps each class to its single instance.
    _instances = {}

    def __call__(cls, *args, **kwargs):
        registry = cls._instances
        if cls in registry:
            return registry[cls]
        registry[cls] = super().__call__(*args, **kwargs)
        return registry[cls]
class MyLogger(metaclass=Singleton):
    """Process-wide logger that prefixes each message with its call site.

    Because the class uses the ``Singleton`` metaclass, ``__init__`` (and
    therefore ``logging.basicConfig``) runs only for the first instantiation.
    """

    logger = None

    def __init__(self):
        stream_handler = logging.StreamHandler()
        logging.basicConfig(
            level=logging.INFO,
            format="%(asctime)s - %(threadName)s - %(message)s",
            handlers=[stream_handler],
        )
        self.logger = logging.getLogger(__name__ + '.logger')

    @staticmethod
    def __get_call_info():
        """Return ``(filename, function, lineno)`` of the caller's caller."""
        # Frame 0 is this helper, frame 1 is the logging method that called
        # it (``info``), frame 2 is the code that called that method.
        caller = inspect.stack()[2]
        return caller.filename, caller.function, caller.lineno

    def info(self, message, *args):
        """Log ``message`` at INFO level, prefixed with file, function and line.

        ``args`` are forwarded to ``Logger.info`` for %-style interpolation.
        """
        fn, func, ln = self.__get_call_info()
        message = "{} - {} at line {}: {}".format(fn, func, ln, message)
        self.logger.info(message, *args)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import urllib.request as ul | |
import argparse | |
def getCode(url):
    """Fetch ``url`` and write the decoded response body to standard output.

    Args:
        url: Any URL that ``urllib.request.urlopen`` can open.

    The body is decoded with ``bytes.decode()``'s default codec (UTF-8).
    """
    # The snippet never imported ``sys``; import it locally so the call to
    # ``sys.stdout.write`` no longer raises NameError.
    import sys

    # Use the response as a context manager so the connection is closed even
    # if reading fails.
    with ul.urlopen(url) as response:
        raw = response.read()
    code = raw.decode()
    sys.stdout.write(code)
if __name__ == '__main__':
    # Command-line entry point: the mandatory --url option names the page
    # whose content is written to stdout.
    parser = argparse.ArgumentParser(description="Hostname")
    parser.add_argument(
        "--url",
        action="store",
        dest="url",
        required=True,
    )
    args = parser.parse_args()
    url = args.url
    getCode(url)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Simple iOS tests, showing accessing elements and getting/setting text from them. | |
""" | |
import unittest | |
import os | |
from random import randint | |
from appium import webdriver | |
from time import sleep | |
class SimpleIOSTests(unittest.TestCase): | |
def setUp(self): | |
# set up appium | |
app = os.path.join(os.path.dirname(__file__), | |
'/Users/gsu/Downloads', | |
'proteus-mobile-app.app') | |
app = os.path.abspath(app) | |
self.driver = webdriver.Remote( | |
command_executor='http://127.0.0.1:4723/wd/hub', | |
desired_capabilities={ | |
'app': app, | |
'platformName': 'iOS', | |
'platformVersion': '9.3', | |
'deviceName': 'iPhone 6' | |
}) | |
def tearDown(self): | |
self.driver.quit() | |
def _populate(self): | |
# populate text fields with two random numbers | |
els = [self.driver.find_element_by_accessibility_id('Sign In'), | |
self.driver.find_element_by_accessibility_id('Register')] | |
screenshot_folder = os.getenv('SCREENSHOT_PATH', '') | |
self.driver.save_screenshot(screenshot_folder + "/screenshot_main.png") | |
self.assertTrue(1, "This is just a test") | |
def test_ui_computation(self): | |
# populate text fields with values | |
self._populate() | |
# trigger computation by using the button | |
self.driver.find_element_by_accessibility_id('Register').click() | |
# is sum equal ? | |
# sauce does not handle class name, so get fourth element | |
sum = self.driver.find_element_by_name('Terms & Conditions').text | |
screenshot_folder = os.getenv('SCREENSHOT_PATH', '') | |
self.driver.save_screenshot(screenshot_folder + "/screenshot_register.png") | |
self.assertEqual(1, 1) | |
@unittest.skip("no scroll") | |
def test_scroll(self): | |
els = self.driver.find_elements_by_class_name('UIAButton') | |
els[5].click() | |
sleep(1) | |
try: | |
el = self.driver.find_element_by_accessibility_id('OK') | |
el.click() | |
sleep(1) | |
except: | |
pass | |
el = self.driver.find_element_by_xpath('//UIAMapView[1]') | |
location = el.location | |
self.driver.swipe(start_x=location['x'], start_y=location['y'], end_x=0.5, end_y=location['y'], duration=800) | |
if __name__ == '__main__':
    # Collect every test method of SimpleIOSTests and run them verbosely.
    loader = unittest.TestLoader()
    suite = loader.loadTestsFromTestCase(SimpleIOSTests)
    runner = unittest.TextTestRunner(verbosity=2)
    runner.run(suite)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
To debug the program, send the SIGUSR2 to the PID. You can find the PID by using 'ps aux | grep python' | |
kill -SIGUSR2 {PID} | |
""" | |
import signal | |
import pdb | |
def open_debug(sig, frame):
    """Signal handler that drops the running process into the pdb debugger.

    Args:
        sig: Number of the signal that was delivered.
        frame: Stack frame that was executing when the signal arrived.
    """
    # print() call instead of the Python 2 print statement, matching the
    # print("sleep now") call later in this file.
    print('sig', sig)
    pdb.set_trace()
# Install the debugger hook for SIGUSR2.
signal.signal(signal.SIGUSR2, open_debug)

# a long-running example
import time

pause_seconds = 5
while True:
    print("sleep now")
    time.sleep(pause_seconds)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import boto3 | |
# The calls to AWS STS AssumeRole must be signed with the access key ID | |
# and secret access key of an existing IAM user or by using existing temporary | |
# credentials such as those from another role. (You cannot call AssumeRole
# with the access key for the root account.) The credentials can be in | |
# environment variables or in a configuration file and will be discovered | |
# automatically by the boto3.client() function. For more information, see the | |
# Python SDK documentation: | |
# http://boto3.readthedocs.io/en/latest/reference/services/sts.html#client | |
# create an STS client object that represents a live connection to the | |
# STS service | |
# Client for the STS service.
sts_client = boto3.client('sts')

# Ask STS for temporary credentials for the target role; the response carries
# them under the 'Credentials' key.
assumedRoleObject = sts_client.assume_role(
    RoleArn="arn:aws:iam::account-of-role-to-assume:role/name-of-role",
    RoleSessionName="AssumeRoleSession1",
)
credentials = assumedRoleObject['Credentials']

# Build an S3 resource that authenticates with the temporary credentials
# instead of the caller's own.
temporary_session_keys = {
    'aws_access_key_id': credentials['AccessKeyId'],
    'aws_secret_access_key': credentials['SecretAccessKey'],
    'aws_session_token': credentials['SessionToken'],
}
s3_resource = boto3.resource('s3', **temporary_session_keys)

# List the buckets visible to the assumed role.
for bucket in s3_resource.buckets.all():
    print(bucket.name)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import boto3 | |
import time | |
# AWS region used for the EC2 client created in lambda_handler.
region = 'eu-west-1'
# Shell script passed to the new instance as user data. It reads the
# instance id from the metadata endpoint, mounts the EFS file system at
# /moodledata, archives it into a timestamped tarball, moves the tarball to
# S3 and finally terminates the instance it is running on.
user_data_script = """#!/bin/bash
instanceid=$(curl http://169.254.169.254/latest/meta-data/instance-id)
cd /
mkdir moodledata
mount -t nfs4 -o nfsvers=4.1,rsize=1048576,wsize=1048576,hard,timeo=600,retrans=2 fs-xxxxxxxxxxc.efs.eu-west-1.amazonaws.com:/ moodledata
tar czf mooodledata-backup-$(date +%d-%m-%Y_%H-%M).tar.gz /moodledata
aws s3 mv mooodledata-backup-*.tar.gz s3://xxxxxxxxx/
aws ec2 terminate-instances --instance-ids $instanceid --region eu-west-1 """
# Amazon Linux (ami-ebd02392)
def lambda_handler(event, context):
    """Launch one EC2 instance that runs ``user_data_script`` at boot.

    Args:
        event: Lambda invocation event (not read).
        context: Lambda runtime context (not read).

    Returns:
        None. The ``run_instances`` response is not passed back.
    """
    launch_spec = {
        'ImageId': 'ami-ebd02392',
        'MinCount': 1,
        'MaxCount': 1,
        'KeyName': 'xxxxx-key',
        'InstanceType': 't2.micro',
        'SecurityGroups': ['default'],
        'IamInstanceProfile': {'Name': 'EFSBackupRole'},
        'UserData': user_data_script,
    }
    ec2 = boto3.client('ec2', region_name=region)
    ec2.run_instances(**launch_spec)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Only let CRITICAL records from the AWS SDK loggers through.
for sdk_logger_name in ('botocore', 'boto3'):
    logging.getLogger(sdk_logger_name).setLevel(logging.CRITICAL)

# Root configuration: DEBUG and above goes to debug.log, which is truncated
# on every start (filemode 'w').
logging.basicConfig(
    filename='debug.log',
    filemode='w',
    level=logging.DEBUG,
    datefmt='%Y-%m-%d %H:%M:%S',
    format='%(asctime)s %(funcName)s():%(lineno)i: (%(threadName)-10s): %(levelname)s: %(message)s',
)

# Additional console handler on the root logger with a shorter format,
# limited to INFO and above.
formatter = logging.Formatter("(%(threadName)-10s) %(levelname)s: %(message)s")
console = logging.StreamHandler()
console.setFormatter(formatter)
console.setLevel(logging.INFO)
logging.getLogger('').addHandler(console)

# Module-level logger for this file.
logger = logging.getLogger(__name__)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import logging.handlers | |
# Module logger that accepts everything from DEBUG upwards.
log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG)

# Send records to the local syslog socket, tagged with module and function.
formatter = logging.Formatter('%(module)s.%(funcName)s: %(message)s')
handler = logging.handlers.SysLogHandler(address='/dev/log')
handler.setFormatter(formatter)
log.addHandler(handler)
def hello():
    """Emit one DEBUG and one CRITICAL record through the module logger."""
    samples = (
        (log.debug, 'this is debug'),
        (log.critical, 'this is critical'),
    )
    for emit, text in samples:
        emit(text)
if __name__ == '__main__':
    # Emit the sample log records only when run as a script, not on import.
    hello()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import sys | |
from logging.handlers import TimedRotatingFileHandler | |
# Shared record layout used by both the console and the file handler.
FORMATTER = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
# Path of the log file written by get_file_handler().
LOG_FILE = "my_app.log"
def get_console_handler():
    """Return a new handler that writes ``FORMATTER``-formatted records to stdout."""
    handler = logging.StreamHandler(stream=sys.stdout)
    handler.setFormatter(FORMATTER)
    return handler
def get_file_handler():
    """Return a new handler writing to ``LOG_FILE``, rotated at midnight."""
    handler = TimedRotatingFileHandler(filename=LOG_FILE, when='midnight')
    handler.setFormatter(FORMATTER)
    return handler
def get_logger(logger_name):
    """Return the named logger, configured for console and file output.

    Args:
        logger_name: Name passed to ``logging.getLogger``.

    Returns:
        The logger, set to DEBUG, with one console and one file handler
        attached and propagation to parent loggers disabled.
    """
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)  # better to have too much log than not enough
    # logging.getLogger returns the same object for the same name, so attach
    # handlers only once; otherwise every repeated call would add another
    # pair and each record would be emitted multiple times.
    if not logger.handlers:
        logger.addHandler(get_console_handler())
        logger.addHandler(get_file_handler())
    # with this pattern, it's rarely necessary to propagate the error up to parent
    logger.propagate = False
    return logger
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Python - Regular Expression | |
------------------------------------------------------------------------------------------ | |
match checks for a match only at the beginning of the string, while search checks for a match anywhere in the string | |
re.I - Performs case-insensitive matching. | |
re.L - Interprets words according to the current locale. This interpretation affects the alphabetic group (\w and \W), as well as word boundary behavior (\b and \B). | |
re.M - Makes $ match the end of a line (not just the end of the string) and makes ^ match the start of any line (not just the start of the string). | |
re.S - Makes a period (dot) match any character, including a newline. | |
re.U - Interprets letters according to the Unicode character set. This flag affects the behavior of \w, \W, \b, \B. | |
re.X - Permits "cuter" regular expression syntax. It ignores whitespace (except inside a set [] or when escaped by a backslash) and treats unescaped # as a comment marker. | |
""" | |
import re | |
# Demonstrates re.match (anchored at the start of the string), re.search
# (matches anywhere) and re.sub.
line = "Cats are smarter than dogs"

matchObj = re.match(r'(.*) are (.*?) .*', line, re.M | re.I)
if matchObj:
    print("match --> matchObj.group() : ", matchObj.group())
    print("matchObj.group(1) : ", matchObj.group(1))
    print("matchObj.group(2) : ", matchObj.group(2))
else:
    print("No match!!")

searchObj = re.search(r'dogs', line, re.M | re.I)
if searchObj:
    print("search --> searchObj.group() : ", searchObj.group())
else:
    print("Nothing found!!")

phone = "2004-959-559 # This is Phone Number"

# Delete Python-style comments
num = re.sub(r'#.*$', "", phone)
print("Phone Num : ", num)

# Remove anything other than digits
num = re.sub(r'\D', "", phone)
print("Phone Num : ", num)

#====================
# result:
# match --> matchObj.group() :  Cats are smarter than dogs
# matchObj.group(1) :  Cats
# matchObj.group(2) :  smarter
# search --> searchObj.group() :  dogs
# Phone Num :  2004-959-559
# Phone Num :  2004959559
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import print_function #should be on the top | |
import threading | |
import time | |
class MyThread(threading.Thread):
    """Thread that announces its start, waits one second, then announces its end."""

    def run(self):
        # ``self.name`` replaces the deprecated ``getName()`` accessor.
        print("{} started!".format(self.name))  # "Thread-x started!"
        time.sleep(1)  # Pretend to work for a second
        print("{} finished!".format(self.name))  # "Thread-x finished!"
if __name__ == '__main__':
    # Start four threads named Thread-1..Thread-4, pausing 0.9 seconds after
    # each start so their one-second runs overlap.
    for x in range(4):
        thread_name = "Thread-{}".format(x + 1)
        mythread = MyThread(name=thread_name)
        mythread.start()
        time.sleep(.9)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import queue | |
import multiprocessing | |
import time | |
import threading | |
def worker(q):
    """Drain ``q``: print each item and pause one second after it.

    Returns as soon as the queue reports empty or a non-blocking ``get``
    raises ``queue.Empty``.

    Args:
        q: Queue-like object offering ``empty()`` and ``get(block)``.
    """
    while True:
        if q.empty():
            return
        try:
            row = q.get(False)
        except queue.Empty:
            # Another consumer took the item between the check and the get.
            return
        print(row)
        time.sleep(1)
def main():
    """Fill a multiprocessing queue with 0..99 and drain it with 10 processes."""
    print('creating queue')
    q = multiprocessing.Queue()

    print('enqueuing')
    for item in range(100):
        q.put(item)

    num_processes = 10
    pool = [
        multiprocessing.Process(target=worker, args=(q,))
        for _ in range(num_processes)
    ]
    for index, proc in enumerate(pool):
        print('launching process {0}'.format(index))
        proc.start()

    # Wait for every worker to finish.
    for proc in pool:
        proc.join()
if __name__ == '__main__':
    # Run the demo only when executed as a script, not on import.
    main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#------------------------------------------------------------------------------------------ | |
""" | |
Python Built-In Class Attributes | |
__dict__: Dictionary containing the class's namespace. | |
__doc__: Class documentation string or none, if undefined. | |
__name__: Class name. | |
__module__: Module name in which the class is defined. This attribute is "__main__" in interactive mode. | |
__bases__: A possibly empty tuple containing the base classes, in the order of their occurrence in the base class list. | |
""" | |
""" | |
------------------------------------------------------------------------------------------ | |
Python - Join | |
>>>str = "-"; | |
>>>seq = ("a", "b", "c"); # This is sequence of strings. | |
>>>print str.join( seq ); | |
a-b-c | |
>>> params = {"server":"mpilgrim", "database":"master", "uid":"sa", "pwd":"secret"} | |
>>> ["%s=%s" % (k, v) for k, v in params.items()] | |
['server=mpilgrim', 'uid=sa', 'database=master', 'pwd=secret'] | |
>>> ";".join(["%s=%s" % (k, v) for k, v in params.items()]) | |
'server=mpilgrim;uid=sa;database=master;pwd=secret' | |
""" | |
""" | |
Python Classes (definition) | |
------------------------------------------------------------------------------------------ | |
Class: A user-defined prototype for an object that defines a set of attributes that characterize any object of the class. The attributes are data members (class variables and instance variables) and methods, accessed via dot notation. | |
Class variable: A variable that is shared by all instances of a class. | |
Data member: A class variable or instance variable that holds data associated with a class and its objects. | |
Function overloading: The assignment of more than one behavior to a particular function. The operation performed varies by the types of objects or arguments involved. | |
Instance variable: A variable that is defined inside a method and belongs only to the current instance of a class. | |
Inheritance: The transfer of the characteristics of a class to other classes that are derived from it. | |
Instance: An individual object of a certain class. | |
Instantiation: The creation of an instance of a class. | |
Method : A special kind of function that is defined in a class definition. No method overloading because it's a dynamic language. But multimethod would work. | |
Object: A unique instance of a data structure that's defined by its class. | |
Operator overloading: The assignment of more than one function to an operator. | |
""" | |
"""
Python - Slicing
#------------------------------------------------------------------------------------------
pets = ["dog", "cat", "bird", "lizard"]
pets[:2] # results ["dog", "cat"]
pets[1:] # results ["cat", "bird", "lizard"]
pets[::2] # results ["dog", "bird"]
veg = "tomato"
veg[:1] # "t"
veg[::2] # "tmt"
"""
"""
Python - Formatting | |
#------------------------------------------------------------------------------------------ | |
%s - String (or any object with a string representation, like numbers) | |
%d - Integers | |
%f - Floating point numbers | |
%.<# digits>f - Floating numbers with a fixed amount of digits to the right of the dot. | |
%x/%X - Integers in hex representation (lowercase/uppercase) | |
%c - ASCII code | |
%o - octal (base 8) | |
'%d' % 20 >>> result '20'
'%.3f %.2f' % (20, 1.0/3) >>> result '20.000 0.33'
""" | |
""" | |
Python - Strings | |
#------------------------------------------------------------------------------------------ | |
""" | |
# Tour of common str operations on a sample string.
astring = "Hello world!"
print(len(astring))  # result: 12 - len() also works on lists
print(astring.index("o"))  # result: 4 (indices start at 0)
print(astring.count("l"))  # result: 3
print(astring[3:7])  # result: "lo w" (characters at indices 3 to 6)
print(astring.upper())  # result: upper case
print(astring.lower())  # result: lower case
afewords = astring.split(" ")  # split by space
print(astring.startswith("Hello"))  # result: True
print(astring.endswith("asdfasdfasdf"))  # result: False
#------------------------------------------------------------------------------------------ | |
""" | |
Python - Abstract Class | |
Yes. Python has Abstract Base Class module allows to enforce that a derived class implements a particular method using a special @abstractmethod decorator on that method. | |
""" | |
from abc import ABCMeta, abstractmethod | |
class Animal(metaclass=ABCMeta):
    """Abstract base class: subclasses must override ``say_something``.

    The metaclass is passed in the class header; the Python 2
    ``__metaclass__`` attribute is ignored on Python 3, which left the
    class instantiable and the abstract method unenforced.
    """

    @abstractmethod
    def say_something(self):
        """Return the base greeting; overrides may extend it via ``super()``."""
        return "I'm an animal!"
class Cat(Animal):
    """Concrete ``Animal`` that appends its own sound to the base greeting."""

    def say_something(self):
        """Return the base class greeting followed by " - Miauuu"."""
        s = super(Cat, self).say_something()
        return "%s - %s" % (s, "Miauuu")


# Example session (kept as comments so the file stays importable):
#   c = Cat()
#   c.say_something()
#   -> "I'm an animal! - Miauuu"
#------------------------------------------------------------------------------------------ | |
""" | |
Python - Generators | |
""" | |
data = 'Drapsicle'
# Generator expression that walks the string backwards; list() consumes it.
# Written as a print() call instead of the Python 2 print statement.
print(list(data[i] for i in range(len(data) - 1, -1, -1)))
def Reverse(data):
    """Yield the items of ``data`` from last to first.

    Args:
        data: Any object supporting ``len()`` and integer indexing.
    """
    position = len(data)
    while position > 0:
        position -= 1
        yield data[position]
#------------------------------------------------------------------------------------------ | |
""" | |
Python - Fibonacci function | |
""" | |
def fibonacci(max_int):
    """Print and return the Fibonacci numbers strictly below ``max_int``.

    The sequence starts 1, 1, 2, 3, ...

    Args:
        max_int: Exclusive upper bound for the generated numbers.

    Returns:
        The list of Fibonacci numbers below ``max_int``. It is empty when
        ``max_int`` is 1 or less. Previously the list was only printed, so
        callers could not use it.
    """
    result = []
    a, b = 0, 1
    while b < max_int:
        result.append(b)
        a, b = b, a + b
    print(result)
    return result
def fib(a=1, b=1):
    """Endless generator of a Fibonacci-style sequence seeded with ``a``, ``b``.

    Args:
        a: First value to yield.
        b: Second value to yield.

    Yields:
        ``a``, ``b``, ``a + b``, ... where each later term is the sum of the
        two before it. Never terminates on its own.
    """
    current, upcoming = a, b
    while True:
        yield current
        current, upcoming = upcoming, current + upcoming
from itertools import islice | |
# Take the first ten values of the endless generator. The bare expression
# discarded the list when run as a script, so print it instead.
print(list(islice(fib(), 10)))
#------------------------------------------------------------------------------------------ | |
""" | |
Python - OS libraries | |
""" | |
import sys | |
import os | |
import platform | |
import tempfile | |
# Command-line arguments; the first one is always the script name.
print('Number of arguments:', len(sys.argv), 'arguments.')
print('Argument List:', str(sys.argv))

system_type = platform.system().lower()
pid = os.getpid()
# Full paths of the regular files directly inside /etc (no recursion:
# only the first os.walk entry is used).
file_list = [os.path.join("/etc/", fn) for fn in next(os.walk("/etc/"))[2]]

dir_name = tempfile.mkdtemp(dir='/tmp')
try:
    # The context manager closes the temporary file even if a write or read
    # fails; NamedTemporaryFile deletes the file on close by default.
    with tempfile.NamedTemporaryFile(mode='w+t', dir=dir_name) as temp:
        print("temp", temp)
        print("temp name", temp.name)
        temp.write("some data\n")
        temp.seek(0)
        print(temp.read())
finally:
    # mkdtemp never cleans up after itself; remove the now-empty directory
    # so repeated runs do not litter /tmp.
    os.rmdir(dir_name)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def dispatch_if(operator, x, y):
    """Apply the arithmetic operation named by ``operator`` to ``x`` and ``y``.

    Args:
        operator: One of ``'add'``, ``'sub'``, ``'mul'`` or ``'div'``.
        x: Left operand.
        y: Right operand.

    Returns:
        The result of the operation, or ``None`` for an unknown operator.
    """
    if operator == 'add':
        return x + y
    if operator == 'sub':
        return x - y
    if operator == 'mul':
        return x * y
    if operator == 'div':
        return x / y
    return None
def dispatch_dict(operator, x, y):
    """Dictionary-based equivalent of ``dispatch_if``.

    Each operation is wrapped in a lambda so only the selected one is
    evaluated (for example, no division happens unless ``'div'`` is asked for).

    Args:
        operator: One of ``'add'``, ``'sub'``, ``'mul'`` or ``'div'``.
        x: Left operand.
        y: Right operand.

    Returns:
        The result of the operation, or ``None`` for an unknown operator.
    """
    return {
        'add': lambda: x + y,
        'sub': lambda: x - y,
        'mul': lambda: x * y,
        'div': lambda: x / y,
    }.get(operator, lambda: None)()


# Example results (kept as comments so the file stays importable):
#   dispatch_if('mul', 2, 8)        -> 16
#   dispatch_dict('mul', 2, 8)      -> 16
#   dispatch_if('unknown', 2, 8)    -> None
#   dispatch_dict('unknown', 2, 8)  -> None
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def send_request(method, url, headers=None, data=None):
    """Send an authenticated HTTP request and return the decoded JSON body.

    Authenticates with HTTP basic auth as ``ec2-user``, using the instance id
    from ``global_config_props['matillion_instance']['instance_id']`` as the
    password.

    NOTE(review): ``verify=False`` turns off TLS certificate verification for
    every request. Confirm this is intended.

    Args:
        method: HTTP verb; its lower-cased name is looked up on ``requests``.
        url: Target URL.
        headers: Optional headers; defaults to a JSON content type.
        data: Optional request body, forwarded only when not ``None``.

    Returns:
        The parsed JSON response, or ``None`` when the connection fails.
    """
    fetch = getattr(requests, method.lower())
    user_name = 'ec2-user'
    user_password = global_config_props['matillion_instance']['instance_id']
    if headers is None:
        headers = {'Content-Type': 'application/json'}

    request_kwargs = {
        'auth': (user_name, user_password),
        'headers': headers,
        'verify': False,
    }
    if data is not None:
        request_kwargs['data'] = data

    try:
        response = fetch(url, **request_kwargs)
        return response.json()
    except requests.exceptions.ConnectionError:
        logger.error("Connection Error while attempting {}".format(url))
        return None
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Start three workers running do_now with different arguments and remember
# them in ``threads`` (expected to be a list defined earlier).
for worker_label, worker_arg in (("Thread-1", 2), ("Thread-2", 3), ("Thread-3", 4)):
    thread = threading.Thread(name='do_now', target=do_now, args=(worker_label, worker_arg))
    thread.start()
    threads.append(thread)

# Report whether each worker is still running.
# ``Thread.isAlive()`` was removed in Python 3.9; ``is_alive()`` is the
# supported spelling. The "stopped" branch also used to log the bound method
# object because the call parentheses were missing.
for thread in threads:
    if thread.is_alive():
        logger.info("{} {}".format(thread.name, thread.is_alive()))
    else:
        logger.info("stopped {} {}".format(thread.name, thread.is_alive()))
    # NOTE(review): the original indentation was lost; the pause is assumed
    # to sit inside the loop (one check every five seconds). Confirm.
    time.sleep(5)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment