Skip to content

Instantly share code, notes, and snippets.

@gwsu2008
Last active May 23, 2020 16:36
Show Gist options
  • Save gwsu2008/1c091ffe3399be21ae28af9460da37f1 to your computer and use it in GitHub Desktop.
Save gwsu2008/1c091ffe3399be21ae28af9460da37f1 to your computer and use it in GitHub Desktop.
Python Scripts
"""
Useful Python scripts and tips
"""
class Node(object):
def __init__(self):
self.left = None
self.right = None
self.value = None
@property
def get_value(self):
return self.value
@property
def get_left(self):
return self.left
@property
def get_right(self):
return self.right
@get_left.setter
def set_left(self, left_node):
self.left = left_node
@get_value.setter
def set_value(self, value):
self.value = value
@get_right.setter
def set_right(self, right_node):
self.right = right_node
def create_tree(self):
_node = Node() #creating new node.
_x = input("Enter the node data(-1 for null)")
if(_x == str(-1)): #for defining no child.
return None
_node.set_value = _x #setting the value of the node.
print("Enter the left child of {}".format(_x))
_node.set_left = self.create_tree() #setting the left subtree.
print("Enter the right child of {}".format(_x))
_node.set_right = self.create_tree() #setting the right subtree.
return _node
if __name__ == '__main__':
node = Node()
root_node = node.create_tree()
print(root_node.get_value)
from datetime import datetime
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import padding
from botocore.signers import CloudFrontSigner
def rsa_signer(message):
with open('path/to/key.pem', 'rb') as key_file:
private_key = serialization.load_pem_private_key(key_file.read(), password=None, backend=default_backend())
return private_key.sign(message, padding.PKCS1v15(), hashes.SHA1())
key_id = 'xyx'
url = 'http://d2949o5mkkp72v.cloudfront.net/hello.txt'
expire_date = datetime.datetime(2017, 1, 1)
cloudfront_signer = CloudFrontSigner(key_id, rsa_signer)
# Create a signed url that will be valid until the specific expiry date provided using a canned policy.
signed_url = cloudfront_signer.generate_presigned_url(url, date_less_than=expire_date)
print(signed_url)
import logging
import inspect
class Singleton(type):
_instances = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
return cls._instances[cls]
class MyLogger(metaclass=Singleton):
logger = None
def __init__(self):
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(threadName)s - %(message)s",
handlers=[
logging.StreamHandler()
])
self.logger = logging.getLogger(__name__ + '.logger')
@staticmethod
def __get_call_info():
stack = inspect.stack()
# stack[1] gives previous function ('info' in our case)
# stack[2] gives before previous function and so on
fn = stack[2][1]
ln = stack[2][2]
func = stack[2][3]
return fn, func, ln
def info(self, message, *args):
message = "{} - {} at line {}: {}".format(*self.__get_call_info(), message)
self.logger.info(message, *args)
import urllib.request as ul
import argparse
def getCode(url):
raw = ul.urlopen(url).read()
code = raw.decode()
sys.stdout.write(code)
if __name__ == '__main__':
parser = argparse.ArgumentParser(description="Hostname")
parser.add_argument("--url",action="store",dest="url",required=True)
args = parser.parse_args()
url = args.url
getCode(url)
"""
Simple iOS tests, showing accessing elements and getting/setting text from them.
"""
import unittest
import os
from random import randint
from appium import webdriver
from time import sleep
class SimpleIOSTests(unittest.TestCase):
def setUp(self):
# set up appium
app = os.path.join(os.path.dirname(__file__),
'/Users/gsu/Downloads',
'proteus-mobile-app.app')
app = os.path.abspath(app)
self.driver = webdriver.Remote(
command_executor='http://127.0.0.1:4723/wd/hub',
desired_capabilities={
'app': app,
'platformName': 'iOS',
'platformVersion': '9.3',
'deviceName': 'iPhone 6'
})
def tearDown(self):
self.driver.quit()
def _populate(self):
# populate text fields with two random numbers
els = [self.driver.find_element_by_accessibility_id('Sign In'),
self.driver.find_element_by_accessibility_id('Register')]
screenshot_folder = os.getenv('SCREENSHOT_PATH', '')
self.driver.save_screenshot(screenshot_folder + "/screenshot_main.png")
self.assertTrue(1, "This is just a test")
def test_ui_computation(self):
# populate text fields with values
self._populate()
# trigger computation by using the button
self.driver.find_element_by_accessibility_id('Register').click()
# is sum equal ?
# sauce does not handle class name, so get fourth element
sum = self.driver.find_element_by_name('Terms & Conditions').text
screenshot_folder = os.getenv('SCREENSHOT_PATH', '')
self.driver.save_screenshot(screenshot_folder + "/screenshot_register.png")
self.assertEqual(1, 1)
@unittest.skip("no scroll")
def test_scroll(self):
els = self.driver.find_elements_by_class_name('UIAButton')
els[5].click()
sleep(1)
try:
el = self.driver.find_element_by_accessibility_id('OK')
el.click()
sleep(1)
except:
pass
el = self.driver.find_element_by_xpath('//UIAMapView[1]')
location = el.location
self.driver.swipe(start_x=location['x'], start_y=location['y'], end_x=0.5, end_y=location['y'], duration=800)
if __name__ == '__main__':
suite = unittest.TestLoader().loadTestsFromTestCase(SimpleIOSTests)
unittest.TextTestRunner(verbosity=2).run(suite)
"""
To debug the program, send the SIGUSR2 to the PID. You can find the PID by using 'ps aux | grep python'
kill -SIGUSR2 {PID}
"""
import signal
import pdb
def open_debug(sig, frame):
print 'sig', sig
pdb.set_trace()
signal.signal(signal.SIGUSR2, open_debug)
# a long-running example
import time
while True:
print("sleep now")
time.sleep(5)
import boto3
# The calls to AWS STS AssumeRole must be signed with the access key ID
# and secret access key of an existing IAM user or by using existing temporary
# credentials such as those from antoher role. (You cannot call AssumeRole
# with the access key for the root account.) The credentials can be in
# environment variables or in a configuration file and will be discovered
# automatically by the boto3.client() function. For more information, see the
# Python SDK documentation:
# http://boto3.readthedocs.io/en/latest/reference/services/sts.html#client
# create an STS client object that represents a live connection to the
# STS service
sts_client = boto3.client('sts')
# Call the assume_role method of the STSConnection object and pass the role
# ARN and a role session name.
assumedRoleObject = sts_client.assume_role(
RoleArn="arn:aws:iam::account-of-role-to-assume:role/name-of-role",
RoleSessionName="AssumeRoleSession1"
)
# From the response that contains the assumed role, get the temporary
# credentials that can be used to make subsequent API calls
credentials = assumedRoleObject['Credentials']
# Use the temporary credentials that AssumeRole returns to make a
# connection to Amazon S3
s3_resource = boto3.resource(
's3',
aws_access_key_id = credentials['AccessKeyId'],
aws_secret_access_key = credentials['SecretAccessKey'],
aws_session_token = credentials['SessionToken'],
)
# Use the Amazon S3 resource object that is now configured with the
# credentials to access your S3 buckets.
for bucket in s3_resource.buckets.all():
print(bucket.name)
import boto3
import time
region = 'eu-west-1'
user_data_script = """#!/bin/bash
instanceid=$(curl http://169.254.169.254/latest/meta-data/instance-id)
cd /
mkdir moodledata
mount -t nfs4 -o nfsvers=4.1,rsize=1048576,wsize=1048576,hard,timeo=600,retrans=2 fs-xxxxxxxxxxc.efs.eu-west-1.amazonaws.com:/ moodledata
tar czf mooodledata-backup-$(date +%d-%m-%Y_%H-%M).tar.gz /moodledata
aws s3 mv mooodledata-backup-*.tar.gz s3://xxxxxxxxx/
aws ec2 terminate-instances --instance-ids $instanceid --region eu-west-1 """
# Amazon Linux (ami-ebd02392)
def lambda_handler(event, context):
ec2 = boto3.client('ec2', region_name=region)
new_instance = ec2.run_instances(
ImageId='ami-ebd02392',
MinCount=1,
MaxCount=1,
KeyName='xxxxx-key',
InstanceType='t2.micro',
SecurityGroups=['default'],
IamInstanceProfile={'Name':'EFSBackupRole'},
UserData=user_data_script)
# Static variables
logging.getLogger('botocore').setLevel(logging.CRITICAL)
logging.getLogger('boto3').setLevel(logging.CRITICAL)
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s %(funcName)s():%(lineno)i: (%(threadName)-10s): %(levelname)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
filename='debug.log',
filemode='w')
console = logging.StreamHandler()
console.setLevel(logging.INFO)
formatter = logging.Formatter("(%(threadName)-10s) %(levelname)s: %(message)s")
console.setFormatter(formatter)
logging.getLogger('').addHandler(console)
logger = logging.getLogger(__name__)
import logging
import logging.handlers
log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG)
handler = logging.handlers.SysLogHandler(address = '/dev/log')
formatter = logging.Formatter('%(module)s.%(funcName)s: %(message)s')
handler.setFormatter(formatter)
log.addHandler(handler)
def hello():
log.debug('this is debug')
log.critical('this is critical')
if __name__ == '__main__':
hello()
import logging
import sys
from logging.handlers import TimedRotatingFileHandler
FORMATTER = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
LOG_FILE = "my_app.log"
def get_console_handler():
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(FORMATTER)
return console_handler
def get_file_handler():
file_handler = TimedRotatingFileHandler(LOG_FILE, when='midnight')
file_handler.setFormatter(FORMATTER)
return file_handler
def get_logger(logger_name):
logger = logging.getLogger(logger_name)
logger.setLevel(logging.DEBUG) # better to have too much log than not enough
logger.addHandler(get_console_handler())
logger.addHandler(get_file_handler())
# with this pattern, it's rarely necessary to propagate the error up to parent
logger.propagate = False
return logger
"""
Python - Regular Expression
------------------------------------------------------------------------------------------
match checks for a match only at the beginning of the string, while search checks for a match anywhere in the string
re.I - Performs case-insensitive matching.
re.L - Interprets words according to the current locale. This interpretation affects the alphabetic group (\w and \W), as well as word boundary behavior (\b and \B).
re.M - Makes $ match the end of a line (not just the end of the string) and makes ^ match the start of any line (not just the start of the string).
re.S - Makes a period (dot) match any character, including a newline.
re.U - Interprets letters according to the Unicode character set. This flag affects the behavior of \w, \W, \b, \B.
re.X - Permits "cuter" regular expression syntax. It ignores whitespace (except inside a set [] or when escaped by a backslash) and treats unescaped # as a comment marker.
"""
import re
line = "Cats are smarter than dogs";
matchObj = re.match( r'(.*) are (.*?) .*', line, re.M|re.I)
if matchObj:
print "match --> matchObj.group() : ", matchObj.group()
print "matchObj.group(1) : ", matchObj.group(1)
print "matchObj.group(2) : ", matchObj.group(2)
else:
print "No match!!"
searchObj = re.search( r'dogs', line, re.M|re.I)
if searchObj:
print "search --> searchObj.group() : ", searchObj.group()
else:
print "Nothing found!!"
matchObj.group() : Cats are smarter than dogs
matchObj.group(1) : Cats
matchObj.group(2) : smarter
phone = "2004-959-559 # This is Phone Number"
# Delete Python-style comments
num = re.sub(r'#.*$', "", phone)
print "Phone Num : ", num
# Remove anything other than digits
num = re.sub(r'\D', "", phone)
print "Phone Num : ", num
#====================
#result:
#No match!!
#search --> matchObj.group() : dogs
#Phone Num : 2004-959-559
#Phone Num : 2004959559
from __future__ import print_function #should be on the top
import threading
import time
class MyThread(threading.Thread):
def run(self):
print("{} started!".format(self.getName())) # "Thread-x started!"
time.sleep(1) # Pretend to work for a second
print("{} finished!".format(self.getName())) # "Thread-x finished!"
if __name__ == '__main__':
for x in range(4): # Four times...
mythread = MyThread(name = "Thread-{}".format(x + 1)) # ...Instantiate a thread and pass a unique ID to it
mythread.start() # ...Start the thread
time.sleep(.9) # ...Wait 0.9 seconds before starting another
import queue
import multiprocessing
import time
import threading
def worker(q):
while not q.empty():
try:
row = q.get(False)
print(row)
time.sleep(1)
except queue.Empty:
break
def main():
print('creating queue')
q = multiprocessing.Queue()
print('enqueuing')
for i in range(100):
q.put(i)
num_processes = 10
pool = []
for i in range(num_processes):
print('launching process {0}'.format(i))
p = multiprocessing.Process(target=worker, args=(q,))
p.start()
pool.append(p)
for p in pool:
p.join()
if __name__ == '__main__':
main()
#------------------------------------------------------------------------------------------
"""
Python Built-In Class Attributes
__dict__: Dictionary containing the class's namespace.
__doc__: Class documentation string or none, if undefined.
__name__: Class name.
__module__: Module name in which the class is defined. This attribute is "__main__" in interactive mode.
__bases__: A possibly empty tuple containing the base classes, in the order of their occurrence in the base class list.
"""
"""
------------------------------------------------------------------------------------------
Python - Join
>>>str = "-";
>>>seq = ("a", "b", "c"); # This is sequence of strings.
>>>print str.join( seq );
a-b-c
>>> params = {"server":"mpilgrim", "database":"master", "uid":"sa", "pwd":"secret"}
>>> ["%s=%s" % (k, v) for k, v in params.items()]
['server=mpilgrim', 'uid=sa', 'database=master', 'pwd=secret']
>>> ";".join(["%s=%s" % (k, v) for k, v in params.items()])
'server=mpilgrim;uid=sa;database=master;pwd=secret'
"""
"""
Python Classes (definition)
------------------------------------------------------------------------------------------
Class: A user-defined prototype for an object that defines a set of attributes that characterize any object of the class. The attributes are data members (class variables and instance variables) and methods, accessed via dot notation.
Class variable: A variable that is shared by all instances of a class.
Data member: A class variable or instance variable that holds data associated with a class and its objects.
Function overloading: The assignment of more than one behavior to a particular function. The operation performed varies by the types of objects or arguments involved.
Instance variable: A variable that is defined inside a method and belongs only to the current instance of a class.
Inheritance: The transfer of the characteristics of a class to other classes that are derived from it.
Instance: An individual object of a certain class.
Instantiation: The creation of an instance of a class.
Method : A special kind of function that is defined in a class definition. No method overloading because it's a dynamic language. But multimethod would work.
Object: A unique instance of a data structure that's defined by its class.
Operator overloading: The assignment of more than one function to an operator.
"""
Python - Slicing
#------------------------------------------------------------------------------------------
pets = [“dog”, “cat”, “bird”, ”lizard”]
pets[:2] # results [“dog”,”cat”]
pets[1:] # results [“cat”,”bird”,”lizard”]
pets[::2] # results [“dog”,”bird”]
veg = “tomato”
veg[:1] # “t”
veg[::2] # “tmt”
"""
Python - Formatting
#------------------------------------------------------------------------------------------
%s - String (or any object with a string representation, like numbers)
%d - Integers
%f - Floating point numbers
%.<# digits>f - Floating numbers with a fixed amount of digits to the right of the dot.
%x/%X - Integers in hex representation (lowercase/uppercase)
%c - ASCII code
%o - octal (base 8)
“%d” % 20 >>> result 20
“%.3f %.2f” % (20, ⅓) >>> result ‘20.000 0.33’
"""
"""
Python - Strings
#------------------------------------------------------------------------------------------
"""
astring = "Hello world!"
print len(astring) # result: 12 - same command also work on list
print astring.index("o") # result: 4 begins with 0
print astring.count("l") # result: 3
print astring[3:7] # result: lo w (count start 1)
print astring.upper() # result upper case
print astring.lower() # result lower case
afewords = astring.split(" ") # split by space
print astring.startswith("Hello") # return true
print astring.endswith("asdfasdfasdf") # return false
#------------------------------------------------------------------------------------------
"""
Python - Abstract Class
Yes. Python has Abstract Base Class module allows to enforce that a derived class implements a particular method using a special @abstractmethod decorator on that method.
"""
from abc import ABCMeta, abstractmethod
class Animal:
__metaclass__ = ABCMeta
@abstractmethod
def say_something(self): return "I'm an animal!"
class Cat(Animal):
def say_something(self):
s = super(Cat, self).say_something()
return "%s - %s" % (s, "Miauuu")
>>> c = Cat()
>>> c.say_something()
#"I'm an animal! - Miauu"
#------------------------------------------------------------------------------------------
"""
Python - Generators
"""
data = 'Drapsicle'
print list(data[i] for i in range(len(data)-1, -1, -1))
def Reverse(data):
for index in range(len(data)-1, -1, -1):
yield data[index]
#------------------------------------------------------------------------------------------
"""
Python - Fibonacci function
"""
def fibonacci(max_int):
result = []
a, b = 0, 1
while (b < max_int):
result.append(b)
a, b = b, a + b
print (result)
def fib(a=1, b=1):
while True:
yield a
a, b = b, a + b
from itertools import islice
list(islice(fib(),10))
#------------------------------------------------------------------------------------------
"""
Python - OS libraries
"""
import sys
import os
import platform
import tempfile
print 'Number of arguments:', len(sys.argv), 'arguments.'
print 'Argument List:', str(sys.argv)
# first argument is always the script name.
system_type = platform.system().lower()
pid = os.getpid()
file_list = [os.path.join("/etc/",fn) for fn in next(os.walk("/etc/"))[2]]
dir_name = tempfile.mkdtemp(dir='/tmp')
temp = tempfile.NamedTemporaryFile(mode='w+t',dir=dir_name)
try:
print "temp", temp
print "temp name", temp.name
temp.write("some data\n")
temp.seek(0)
print temp.read()
finally:
temp.close()
def dispatch_if(operator, x, y):
if operator == 'add':
return x + y
elif operator == 'sub':
return x - y
elif operator == 'mul':
return x * y
elif operator == 'div':
return x / y
else:
return None
def dispatch_dict(operator, x, y):
return {
'add': lambda: x + y,
'sub': lambda: x - y,
'mul': lambda: x * y,
'div': lambda: x / y,
}.get(operator, lambda: None)()
>>> dispatch_if('mul', 2, 8)
16
>>> dispatch_dict('mul', 2, 8)
16
>>> dispatch_if('unknown', 2, 8)
None
>>> dispatch_dict('unknown', 2, 8)
None
def send_request(method, url, headers=None, data=None):
global global_config_props
fetch = getattr(requests, method.lower())
user_name = 'ec2-user'
user_password = global_config_props['matillion_instance']['instance_id']
if headers is None:
headers = {'Content-Type': 'application/json'}
try:
if data is not None:
r = fetch(url, auth=(user_name, user_password), headers=headers, data=data, verify=False)
else:
r = fetch(url, auth=(user_name, user_password), headers=headers, verify=False)
return r.json()
except requests.exceptions.ConnectionError:
logger.error("Connection Error while attempting {}".format(url))
return None
thread = threading.Thread(name='do_now', target=do_now, args=("Thread-1", 2,))
thread.start()
threads.append(thread)
thread = threading.Thread(name='do_now', target=do_now, args=("Thread-2", 3,))
thread.start()
threads.append(thread)
thread = threading.Thread(name='do_now', target=do_now, args=("Thread-3", 4,))
thread.start()
threads.append(thread)
for thread in threads:
if thread.isAlive():
logger.info("{} {}".format(thread.name, thread.isAlive()))
else:
logger.info("stopped {} {}".format(thread.name, thread.isAlive))
time.sleep(5)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment