Skip to content

Instantly share code, notes, and snippets.

@hylowaker
Last active August 10, 2018 06:43
Show Gist options
  • Save hylowaker/fe7abc2862b4f803b3409ab45a68af4a to your computer and use it in GitHub Desktop.
Save hylowaker/fe7abc2862b4f803b3409ab45a68af4a to your computer and use it in GitHub Desktop.
쓸데없는 함수조각
##
def combinations_simple(n: int, r: int):
    """Return the binomial coefficient "n choose r" using integer arithmetic.

    The r largest factors of n! are multiplied into a numerator and r! into
    a denominator; the result is their floor division.

    Raises:
        AssertionError: unless ``n >= r >= 0``.
    """
    assert n >= r >= 0
    numerator = 1
    denominator = 1
    for step in range(r):
        numerator *= n - step      # n, n-1, ..., n-r+1
        denominator *= r - step    # r, r-1, ..., 1
    return numerator // denominator
## numpy.argmax
def argmax_simple(iterable):
    """Return the position of the largest element (the first one on ties).

    Raises:
        ValueError: if the iterable is empty (raised by ``max``).
    """
    best_index, _ = max(enumerate(iterable), key=lambda pair: pair[1])
    return best_index
##
def rank_simple(vector):
    """Return the indices of ``vector`` in the order that sorts it ascending.

    Equal elements keep their original relative order because ``sorted`` is
    stable.
    """
    positions = range(len(vector))
    return sorted(positions, key=lambda pos: vector[pos])
# scipy.stats.rankdata
def rankdata(vector, method='average'):
    """Assign 1-based ranks to the elements of ``vector``, resolving ties.

    Args:
        vector: indexable sequence of mutually comparable values.
        method: how tied values are ranked - 'average' (mean of the ranks
            the group spans), 'max' (highest rank of the group) or 'min'
            (lowest rank of the group).

    Returns:
        A list with one rank per element, in the original element order.

    Raises:
        NameError: if ``method`` is not supported (only detected once the
            first group of values is processed, so not for empty input).
    """
    size = len(vector)
    order = rank_simple(vector)
    ordered = [vector[pos] for pos in order]
    ranks = [0] * size
    index_total = 0   # sum of the 0-based sorted positions in the current tie group
    run_length = 0    # number of elements in the current tie group
    for pos in range(size):
        index_total += pos
        run_length += 1
        run_continues = pos != size - 1 and not ordered[pos] != ordered[pos + 1]
        if run_continues:
            continue
        # The tie group ends at `pos`: work out its rank once, then assign it.
        if method == 'average':
            rank = index_total / run_length + 1
        elif method == 'max':
            rank = pos + 1
        elif method == 'min':
            rank = pos + 1 - run_length + 1
        else:
            raise NameError('Unsupported method')
        for member in range(pos - run_length + 1, pos + 1):
            ranks[order[member]] = rank
        index_total = 0
        run_length = 0
    return ranks
## numpy.ndarray.flatten
def flatten_nested_list(items, inplace=False, seqtypes=(list, tuple)):
    """Flatten arbitrarily nested sequences inside the list ``items``.

    Args:
        items: a list, possibly containing nested lists/tuples.
        inplace: when True, flatten ``items`` itself and return None;
            otherwise work on a deep copy and return it.
        seqtypes: types that are treated as nested sequences and expanded.
            Do not include ``str``: a one-character string expands to
            itself, so the expansion would never finish.

    Returns:
        The flattened copy, or None when ``inplace`` is True.
    """
    if not inplace:
        from copy import deepcopy
        items = deepcopy(items)
    i = 0
    # The list grows while nested sequences are spliced in, so the bound must
    # be re-read on every pass rather than fixed at the original length.
    while i < len(items):
        # Keep expanding position i until it holds a non-sequence element
        # (or the list ran out because an empty sequence was spliced in).
        while i < len(items) and isinstance(items[i], seqtypes):
            items[i:i+1] = items[i]
        i += 1
    return None if inplace else items
##
def is_all_elements_identical(iterable):
    """Return True when every element of ``iterable`` equals the first one.

    Works with any iterable (not only indexable sequences). An empty
    iterable is considered identical and yields True.
    """
    iterator = iter(iterable)
    try:
        first_elem = next(iterator)
    except StopIteration:
        return True
    return all(x == first_elem for x in iterator)
##
def chain_iterables(*iterables):
    """Yield every element of each given iterable, one iterable after another."""
    for source in iterables:
        yield from source
##
def findall_overlap(text: str, sub: str):
    """Yield the start index of every occurrence of ``sub`` in ``text``.

    Occurrences may overlap: each search resumes one character after the
    previous match start.
    """
    pos = text.find(sub)
    while pos != -1:
        yield pos
        pos = text.find(sub, pos + 1)
#
def count_overlap_simple(text: str, sub: str):
    """Count the occurrences of ``sub`` in ``text``, overlapping ones included.

    Every offset in ``range(len(text))`` is tested with ``str.startswith``.
    """
    hits = 0
    for offset in range(len(text)):
        if text.startswith(sub, offset):
            hits += 1
    return hits
# can also use regular expression...
##
def reverse_complement(seq: str):
    """Return the reverse complement of an upper-case DNA sequence.

    Each base is swapped with its partner (A<->T, G<->C) and the result is
    reversed.

    Raises:
        AssertionError: if ``seq`` contains a character other than
            A, T, G or C.
    """
    # Subset test: a valid sequence need not contain all four bases.
    assert set(seq) <= {"A", "T", "G", "C"}, "sequence may only contain A, T, G, C"
    return seq.translate(str.maketrans("ATGC", "TACG"))[::-1]
##
def read_forward(handle):
    """Skip whitespace-only lines and return the next line with content.

    Lines are pulled with ``handle.readline()``. The empty value that
    ``readline`` gives at end of input is returned as-is, so a falsy result
    means nothing but whitespace was left.
    """
    line = handle.readline()
    # Keep reading while the line is non-empty yet strips down to nothing.
    while line and not line.strip():
        line = handle.readline()
    return line
##
def read_fasta(handle):
    """Parse FASTA-formatted lines and yield ``(label, sequence)`` tuples.

    Args:
        handle: iterable of text lines (e.g. an open file).

    Yields:
        One tuple per record. ``label`` is the first whitespace-delimited
        word after '>' (the rest of the header is ignored, and a bare '>'
        gives an empty label); ``sequence`` is the record's lines joined
        together with surrounding whitespace stripped.

    Whitespace-only lines and lines starting with ';' are skipped. Sequence
    lines that appear before the first header are discarded.
    """
    label = None
    seq_frags = []
    for line in handle:
        if line.isspace() or line.startswith(';'):
            continue
        if line.startswith('>'):
            if label is not None:
                yield (label, ''.join(seq_frags))
            # A header may consist of '>' alone; fall back to an empty label
            # instead of failing on the missing first word.
            header_fields = line[1:].split(maxsplit=1)
            label = header_fields[0] if header_fields else ''
            seq_frags = []
        else:
            seq_frags.append(line.strip())
    # `is not None` (not truthiness) so a record with an empty label is kept.
    if label is not None:
        yield (label, ''.join(seq_frags))
## hierarchical clustering to Newick (.nwk) tree
from scipy.cluster import hierarchy
def getNewick(node, newick, parentdist, leaf_names):
    """Recursively build a Newick string for the tree rooted at ``node``.

    Args:
        node: tree node offering ``is_leaf()``, ``get_left()``,
            ``get_right()``, ``dist`` and ``id``.
        newick: text already built that must follow this subtree; pass ""
            for the root call.
        parentdist: ``dist`` of the parent node; the branch length written
            is ``parentdist - node.dist``.
        leaf_names: sequence indexed by a leaf's ``id`` giving its label.

    Returns:
        The Newick text for this subtree followed by ``newick``.
    """
    if node.is_leaf():
        return "%s:%.2f%s" % (leaf_names[node.id], parentdist - node.dist, newick)

    # Closing part of this internal node; the root (empty tail) ends the tree.
    if len(newick) > 0:
        tail = "):%.2f%s" % (parentdist - node.dist, newick)
    else:
        tail = ");"
    # The string is assembled right-to-left: left child first, then the right
    # child is prepended in front of it.
    with_left = getNewick(node.get_left(), tail, node.dist, leaf_names)
    with_right = getNewick(node.get_right(), ",%s" % (with_left), node.dist, leaf_names)
    return "(%s" % (with_right)
# Example usage: turn a hierarchical-clustering result into a Newick string.
# NOTE(review): `Z` and `leaf_names` are not defined in this snippet -
# presumably Z is a SciPy linkage matrix and leaf_names the labels indexed by
# leaf id; confirm against the caller.
tree = hierarchy.to_tree(Z,False)
# The resulting string is not stored here; assign it if it is needed.
getNewick(tree, "", tree.dist, leaf_names)
## Read .csv file from URL into Python 3.x
# csv.Error: iterator should return strings, not bytes (did you open the file in text mode?)
import csv
import urllib.request
import codecs
url = "ftp://ftp.ncbi.nlm.nih.gov/pub/pmc/file_list.csv"
# urlopen() yields bytes while csv.reader() needs text, so the stream is
# decoded lazily with codecs.iterdecode(). The `with` block closes the
# connection even if processing a row raises.
with urllib.request.urlopen(url) as ftpstream:
    csvfile = csv.reader(codecs.iterdecode(ftpstream, 'utf-8'))
    for line in csvfile:
        print(line)  # do something with line
## Spinner (Loading indicator)
import sys
import time
import threading
import signal
class Spinner:
    """Console spinner (loading indicator) animated from a background thread.

    Call ``start()`` before a long-running operation and ``stop()`` after
    it, or use the instance as a context manager::

        with Spinner():
            do_work()

    When created in the main thread, the constructor installs a SIGINT
    handler that stops the spinner and then raises KeyboardInterrupt.
    """

    busy = False     # True while the worker thread should keep animating
    delay = 0.1      # default number of seconds between frames
    _thread = None   # worker thread while the spinner is running

    @staticmethod
    def spinning_cursor():
        """Yield the spinner frames endlessly, one character at a time."""
        while True:
            for cursor in '|/-\\':
                yield cursor

    def __init__(self, delay=None):
        """Create a spinner.

        Args:
            delay: seconds between frames; falsy or zero values keep the
                class default. Numeric strings are accepted.
        """
        self.spinner_generator = self.spinning_cursor()
        if delay and float(delay):
            # Store the numeric value so time.sleep() also accepts "0.5".
            self.delay = float(delay)

        def _handle_keyboard_interrupt(signum, frame):
            self.stop()
            raise KeyboardInterrupt

        # signal.signal() raises ValueError outside the main thread, so the
        # handler is only installed when that is where we are.
        if threading.current_thread() is threading.main_thread():
            signal.signal(signal.SIGINT, _handle_keyboard_interrupt)

    def spinner_task(self):
        """Worker loop: draw a frame, wait ``delay``, back up over it."""
        while self.busy:
            sys.stdout.write(next(self.spinner_generator))
            sys.stdout.flush()
            time.sleep(self.delay)
            sys.stdout.write('\b')
            sys.stdout.flush()

    def start(self):
        """Start animating in a background thread (no-op if already running)."""
        if self._thread is not None and self._thread.is_alive():
            return
        self.busy = True
        # Daemon thread: a spinner that is never stopped must not keep the
        # interpreter alive at exit.
        self._thread = threading.Thread(target=self.spinner_task, daemon=True)
        self._thread.start()

    def stop(self):
        """Stop animating and wait until the worker thread has finished."""
        self.busy = False
        worker, self._thread = self._thread, None
        # Joining (instead of sleeping for `delay`) guarantees that nothing
        # more is written to stdout once stop() has returned.
        if worker is not None and worker is not threading.current_thread():
            worker.join()

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.stop()
        return False
# Example usage: show the spinner while some work is running.
spinner = Spinner()
spinner.start()
try:
    # ... some long-running operations
    # time.sleep(3)
    pass
finally:
    # Always stop the animation, even when the work above raises.
    spinner.stop()
#### lazy import
import sys
import importlib.util
def lazy(fullname):
    """Import ``fullname`` lazily and return the module object.

    A module already present in ``sys.modules`` is returned as-is.
    Otherwise a module is created whose loading is deferred by
    ``importlib.util.LazyLoader`` and registered in ``sys.modules`` under
    ``fullname``.

    Raises:
        ImportError: if no module named ``fullname`` can be found.
    """
    try:
        return sys.modules[fullname]
    except KeyError:
        pass
    spec = importlib.util.find_spec(fullname)
    if spec is None:
        raise ImportError("No module named %r" % fullname, name=fullname)
    loader = importlib.util.LazyLoader(spec.loader)
    # The spec must carry the lazy loader before the module is created.
    spec.loader = loader
    module = importlib.util.module_from_spec(spec)
    # Register the module so that later imports and repeated lazy() calls
    # share this object instead of building a new one.
    sys.modules[fullname] = module
    loader.exec_module(module)
    return module
# Example usage of lazy(): the returned objects are used like normal modules.
os = lazy("os")
# NOTE(review): "myown" is not defined anywhere in this file - presumably a
# placeholder for a user module that provides test(); confirm.
myown = lazy("myown")
print(os.name)
myown.test()
#### multi subprocess without multiprocess/thread module ####
from subprocess import Popen
from itertools import islice
import time  # used only to pause between polling passes

max_workers = 2  # no more than 2 concurrent processes
# NOTE(review): `commands` is not defined in this snippet - presumably an
# iterable of shell command strings supplied by the caller; confirm.
# SECURITY: shell=True hands each string to the shell; only pass trusted
# commands.
# The generator is lazy, so a process is only launched when it is pulled.
processes = (Popen(cmd, shell=True) for cmd in commands)
running_processes = list(islice(processes, max_workers))  # start new processes
while running_processes:
    for i, process in enumerate(running_processes):
        if process.poll() is not None:  # the process has finished
            running_processes[i] = next(processes, None)  # start new process
            if running_processes[i] is None:  # no new processes
                # The list changed size: restart the scan instead of
                # continuing to iterate over it.
                del running_processes[i]
                break
    # Pause briefly so the polling loop does not spin at full CPU.
    time.sleep(0.05)
## Realtime subprocess output
def run_command(command):
    """Run ``command`` and echo its standard output line by line as it arrives.

    Args:
        command: command line as one string; it is split with
            ``shlex.split`` and executed without a shell.

    Returns:
        The exit code of the process.
    """
    import shlex
    import subprocess

    process = subprocess.Popen(
        shlex.split(command),
        stdout=subprocess.PIPE,
        universal_newlines=True,  # text mode: lines arrive as str, not bytes
    )
    # Leaving the `with` block closes the pipe and waits for the process, so
    # returncode is set afterwards.
    with process:
        # Iteration ends at end-of-file, i.e. once the child closed stdout.
        for output in process.stdout:
            print(output.strip())
    return process.returncode
#### Pool: map_async().get() vs imap() vs imap_unordered() ####
from multiprocessing import Pool
import time
def foo(x):
    """Sleep for ``x`` seconds, then return ``x + 2`` (demo workload)."""
    time.sleep(x)
    delayed_value = x + 2
    return delayed_value
# The guard keeps worker processes from re-running this demo when they import
# the main module (required with the "spawn" start method).
if __name__ == '__main__':
    start = time.time()
    # The context manager shuts the pool down once the results are consumed.
    with Pool() as pool:
        # imap() yields results in input order, each as soon as it is ready.
        for x in pool.imap(foo, [1, 5, 3]):
            print("{} (Time elapsed: {}s)".format(x, int(time.time() - start)))
## concurrent basic example
from concurrent.futures import ProcessPoolExecutor
from time import sleep
def return_after_5_secs(message, delay=5):
    """Return ``message`` unchanged after sleeping for ``delay`` seconds.

    Args:
        message: value handed back to the caller.
        delay: seconds to sleep first; defaults to 5.
    """
    sleep(delay)
    return message
if __name__ == '__main__':
    # The context manager shuts the executor down when the block is left.
    with ProcessPoolExecutor(3) as pool:
        future = pool.submit(return_after_5_secs, "hello")
        print(future.done())  # checked right after submitting
        sleep(5)
        print(future.done())  # checked again after waiting 5 seconds
        # result() blocks until the task has finished, whatever done() said.
        print("Result: " + future.result())
# concurrent.futures: Exit immediately after KeyboardInterrupt
import time
import concurrent.futures.thread
from concurrent.futures import ThreadPoolExecutor, as_completed
# Demo: leave a ThreadPoolExecutor block right after Ctrl-C instead of
# waiting for the submitted task.
with ThreadPoolExecutor(1) as executor:
    fs = [executor.submit(time.sleep, 10)]
    try:
        for f in as_completed(fs):
            f.result()
    except KeyboardInterrupt:
        # The KeyboardInterrupt is handled here and not re-raised.
        # NOTE(review): both lines below clear private attributes of
        # concurrent.futures - presumably so that neither the executor's
        # shutdown nor the interpreter-exit hook has worker threads left to
        # join. This relies on CPython internals; confirm on the target
        # Python version.
        executor._threads.clear()
        concurrent.futures.thread._threads_queues.clear()
# bar characters
'░▒▓█'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment