Last active
August 10, 2018 06:43
-
-
Save hylowaker/fe7abc2862b4f803b3409ab45a68af4a to your computer and use it in GitHub Desktop.
쓸데없는 함수조각
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
## | |
def combinations_simple(n: int, r: int):
    """Return the binomial coefficient C(n, r) using exact integer arithmetic.

    The r falling factors of n (n, n-1, ..., n-r+1) are multiplied into a
    numerator and r! into a denominator; a single floor division at the end
    gives the result.  Requires ``n >= r >= 0`` (checked with ``assert``).
    """
    assert n >= r >= 0
    numer = 1
    denom = 1
    for step in range(r):
        numer *= n - step
        denom *= r - step
    return numer // denom
## numpy.argmax | |
def argmax_simple(iterable):
    """Return the position of the largest element of *iterable*.

    When several elements tie for the maximum, the position of the first one
    is returned.  An empty iterable makes ``max`` raise ``ValueError``.
    """
    def value_of(pair):
        # pair is (position, value); compare on the value only
        return pair[1]

    best_position, _ = max(enumerate(iterable), key=value_of)
    return best_position
## | |
def rank_simple(vector):
    """Return the indices of *vector* ordered so the values they point at ascend.

    The sort is stable, so equal values keep their original relative order.
    """
    order = list(range(len(vector)))
    order.sort(key=lambda index: vector[index])
    return order
# scipy.stats.rankdata | |
def rankdata(vector, method='average'):
    """Assign 1-based ranks to the values of *vector*, resolving ties by *method*.

    Args:
        vector: Indexable sequence of mutually comparable values.
        method: How equal values are ranked:
            ``'average'`` - mean of the ranks the tied values span (a float),
            ``'max'``     - highest rank of the tie group,
            ``'min'``     - lowest rank of the tie group.

    Returns:
        A list the same length as *vector*; element ``k`` is the rank of
        ``vector[k]``.

    Raises:
        NameError: if *method* is not one of the above (only detected when
            *vector* is non-empty, because the check happens per tie group).
    """
    size = len(vector)
    order = rank_simple(vector)
    ordered_values = [vector[index] for index in order]
    ranks = [0] * size

    group_start = 0  # sorted position where the current run of equal values began
    for pos in range(size):
        group_ends_here = (
            pos == size - 1 or ordered_values[pos] != ordered_values[pos + 1]
        )
        if not group_ends_here:
            continue

        group_size = pos - group_start + 1
        if method == 'average':
            # mean of the 0-based sorted positions in the group, shifted to 1-based
            rank = sum(range(group_start, pos + 1)) / group_size + 1
        elif method == 'max':
            rank = pos + 1
        elif method == 'min':
            rank = pos + 1 - group_size + 1
        else:
            raise NameError('Unsupported method')

        for member in range(group_start, pos + 1):
            ranks[order[member]] = rank
        group_start = pos + 1
    return ranks
## numpy.ndarray.flatten | |
def flatten_nested_list(items, inplace=False, seqtypes=(list, tuple)):
    """Flatten arbitrarily nested sequences found inside the list *items*.

    Args:
        items: A list whose elements may themselves be instances of
            *seqtypes*, nested to any depth.
        inplace: When true, *items* itself is flattened and ``None`` is
            returned.  Otherwise a deep copy is flattened and returned and
            *items* is left untouched.
        seqtypes: Types that are treated as containers to expand.

    Returns:
        The flattened copy, or ``None`` when *inplace* is true.
    """
    if not inplace:
        from copy import deepcopy
        # deepcopy is a function of the copy module; lists have no
        # .deepcopy() method.
        items = deepcopy(items)

    i = 0
    # len(items) is re-read on every pass because splicing changes the length;
    # a range() computed up front would miss elements pushed past the
    # original length.
    while i < len(items):
        if isinstance(items[i], seqtypes):
            # Replace the container by its children and look at index i
            # again: the first child may itself be a container.
            items[i:i+1] = items[i]
        else:
            i += 1

    return None if inplace else items
## | |
def is_all_elements_identical(iterable):
    """Return True when every element of *iterable* equals the first one.

    Works on any iterable (not only indexable sequences).  An empty iterable
    has no differing elements, so it yields True.
    """
    iterator = iter(iterable)
    missing = object()  # private sentinel: None may be a legitimate element
    first_elem = next(iterator, missing)
    if first_elem is missing:
        return True
    # "identical" means no element differs from the first one
    return not any(x != first_elem for x in iterator)
## | |
def chain_iterables(*iterables):
    """Yield every element of each given iterable, one iterable after another."""
    for source in iterables:
        for element in source:
            yield element
## | |
def findall_overlap(text: str, sub: str):
    """Yield the start index of every occurrence of *sub* in *text*.

    Occurrences may overlap: the search resumes one character after the
    previous hit rather than after its end.
    """
    position = text.find(sub)
    while position != -1:
        yield position
        position = text.find(sub, position + 1)
# | |
def count_overlap_simple(text: str, sub: str):
    """Count the occurrences of *sub* in *text*, overlapping ones included.

    Every index of *text* is tested as a possible start of *sub*.
    """
    total = 0
    for start in range(len(text)):
        if text.startswith(sub, start):
            total += 1
    return total
# can also use regular expression... | |
## | |
def reverse_complement(seq: str):
    """Return the reverse complement of the DNA sequence *seq*.

    Each base is swapped for its partner (A<->T, G<->C) and the result is
    reversed.  Only the upper-case letters A, T, G and C are accepted; this is
    checked with ``assert``, so an ``AssertionError`` signals an invalid base.
    """
    # Every base must be valid, but the sequence need not contain all four of
    # them -- hence a subset test rather than set equality.
    assert set(seq) <= {"A", "T", "G", "C"}, "sequence may only contain A, T, G and C"
    return seq.translate(str.maketrans("ATGC", "TACG"))[::-1]
## | |
def read_forward(handle):
    """Skip whitespace-only lines and return the first line with content.

    Returns the empty string when the end of *handle* is reached before any
    such line is found.
    """
    line = handle.readline()
    # An empty string means EOF; a line that strips to nothing is blank.
    while line and not line.strip():
        line = handle.readline()
    return line
## | |
def read_fasta(handle):
    """Iterate over a FASTA stream, yielding ``(label, sequence)`` tuples.

    Args:
        handle: Iterable of text lines (e.g. an open file).

    Yields:
        One tuple per ``>`` header.  *label* is the first whitespace-delimited
        token after ``>`` (the rest of the header is ignored) and *sequence*
        is the concatenation of the stripped lines that follow it.

    Blank lines and lines starting with ``;`` are skipped.  Sequence lines
    that appear before the first header are discarded.  A header with no
    identifier (a bare ``>``) produces a record whose label is ``''``.
    """
    label = None
    seq_frags = []
    for line in handle:
        if line.isspace() or line.startswith(';'):
            continue
        if line.startswith('>'):
            if label is not None:
                yield (label, ''.join(seq_frags))
            # Keep only the identifier; the description is ignored.  A bare
            # ">" header has no tokens at all, so fall back to an empty label
            # instead of indexing into an empty list.
            header_fields = line[1:].split(maxsplit=1)
            label = header_fields[0] if header_fields else ''
            seq_frags = []
        else:
            seq_frags.append(line.strip())
    # Same test as inside the loop, so a record with an empty label is not
    # silently dropped at end of input.
    if label is not None:
        yield (label, ''.join(seq_frags))
## hierarchical clustering to Newick (.nwk) tree
from scipy.cluster import hierarchy | |
def getNewick(node, newick, parentdist, leaf_names):
    """Build the Newick text for the subtree rooted at *node*.

    The string is assembled back to front: *newick* is the text that must
    follow this subtree, and the return value is this subtree's text with
    *newick* appended.

    Args:
        node: Tree node providing ``is_leaf()``, ``get_left()``,
            ``get_right()``, ``dist`` and ``id``.
        newick: Already-built suffix; pass ``""`` for the root call.
        parentdist: ``dist`` of the parent node; the branch length written
            out is ``parentdist - node.dist`` with two decimals.
        leaf_names: Indexable mapping from a leaf's ``id`` to its label.

    Returns:
        The Newick string; for the root call it ends with ``");"``.
    """
    if node.is_leaf():
        return "%s:%.2f%s" % (leaf_names[node.id], parentdist - node.dist, newick)

    # Text that closes this clade: the root (empty suffix) just terminates
    # the tree, any inner node also records its branch length.
    if len(newick) > 0:
        closing = "):%.2f%s" % (parentdist - node.dist, newick)
    else:
        closing = ");"

    left_text = getNewick(node.get_left(), closing, node.dist, leaf_names)
    children_text = getNewick(node.get_right(), ",%s" % (left_text), node.dist, leaf_names)
    return "(%s" % (children_text)
# Usage sketch.  Z and leaf_names are not defined in this snippet --
# presumably Z is a scipy linkage matrix and leaf_names maps leaf ids to
# labels (TODO confirm against the caller).
# The second positional argument of to_tree is rd; False asks for the root
# node only.
tree = hierarchy.to_tree(Z,False)
# Root call: empty suffix, and the root's own dist as the parent distance.
# NOTE: the Newick string is the return value and is not stored here.
getNewick(tree, "", tree.dist, leaf_names)
## Read .csv file from URL into Python 3.x | |
# csv.Error: iterator should return strings, not bytes (did you open the file in text mode?) | |
import csv | |
import urllib.request | |
import codecs | |
url = "ftp://ftp.ncbi.nlm.nih.gov/pub/pmc/file_list.csv"
# urlopen() yields bytes while csv.reader needs str, so the stream is decoded
# lazily line by line.  The with-block closes the connection even when reading
# or parsing fails part-way through.
with urllib.request.urlopen(url) as ftpstream:
    csvfile = csv.reader(codecs.iterdecode(ftpstream, 'utf-8'))
    for line in csvfile:
        print(line)  # do something with line
## Spinner (Loading indicator) | |
import sys | |
import time | |
import threading | |
import signal | |
class Spinner:
    """Console activity indicator drawn on stdout by a background thread.

    ``start()`` launches a thread that repeatedly writes one frame of
    ``|/-\\``, waits ``delay`` seconds and erases it with a backspace;
    ``stop()`` ends the animation and waits for that thread to finish.

    Creating an instance installs a SIGINT handler that stops the spinner
    and then raises ``KeyboardInterrupt``.
    """

    busy = False    # True while the animation thread should keep running
    delay = 0.1     # seconds each frame stays on screen
    _thread = None  # the running animation thread, if any

    @staticmethod
    def spinning_cursor():
        """Yield the frames ``|``, ``/``, ``-`` and backslash forever, in order."""
        while True:
            for cursor in '|/-\\':
                yield cursor

    def __init__(self, delay=None):
        """Create a spinner; a truthy, float-convertible *delay* overrides the default."""
        self.spinner_generator = self.spinning_cursor()
        if delay and float(delay):
            self.delay = delay

        # The first parameter is named signum so that it does not shadow the
        # signal module inside the handler.
        def _handle_keyboard_interrupt(signum, frame):
            self.stop()
            raise KeyboardInterrupt

        signal.signal(signal.SIGINT, _handle_keyboard_interrupt)

    def spinner_task(self):
        """Thread body: draw and erase frames until ``busy`` becomes false."""
        while self.busy:
            sys.stdout.write(next(self.spinner_generator))
            sys.stdout.flush()
            time.sleep(self.delay)
            sys.stdout.write('\b')
            sys.stdout.flush()

    def start(self):
        """Begin animating; does nothing if the spinner is already running."""
        if self._thread is not None and self._thread.is_alive():
            # A second thread would share spinner_generator with the first.
            return
        self.busy = True
        self._thread = threading.Thread(target=self.spinner_task)
        self._thread.start()

    def stop(self):
        """Stop animating and wait until the last frame has been erased."""
        self.busy = False
        worker, self._thread = self._thread, None
        # Joining (instead of sleeping for one delay) guarantees the thread
        # has written its final backspace before stop() returns.  A thread
        # cannot join itself, hence the current_thread() guard.
        if worker is not None and worker is not threading.current_thread():
            worker.join()
spinner = Spinner()
spinner.start()
try:
    # ... some long-running operations
    # time.sleep(3)
    pass
finally:
    # Always stop: if the work above raises, the animation thread would
    # otherwise keep running.
    spinner.stop()
#### lazy import | |
import sys | |
import importlib.util | |
def lazy(fullname):
    """Return module *fullname*, deferring its execution until first use.

    If the module is already in ``sys.modules`` that object is returned.
    Otherwise a module wrapped by ``importlib.util.LazyLoader`` is created
    and registered in ``sys.modules``; its code only runs when an attribute
    is first accessed.

    Args:
        fullname: Absolute dotted module name.

    Returns:
        The (possibly not yet executed) module object.

    Raises:
        ImportError: if no module named *fullname* can be found.
    """
    try:
        return sys.modules[fullname]
    except KeyError:
        pass

    spec = importlib.util.find_spec(fullname)
    if spec is None:
        raise ImportError("No module named %r" % fullname, name=fullname)

    loader = importlib.util.LazyLoader(spec.loader)
    # The spec must carry the lazy loader before the module is created from it.
    spec.loader = loader
    module = importlib.util.module_from_spec(spec)
    # Register the module first so later imports (and the eventual real load)
    # see this very object.
    sys.modules[fullname] = module
    loader.exec_module(module)
    return module
# Usage: bind the modules returned by lazy() to ordinary names.
os = lazy("os")
# NOTE(review): "myown" looks like a placeholder for a user-supplied module
# that exposes test() -- confirm before running.
myown = lazy("myown")
print(os.name)
myown.test()
#### multi subprocess without multiprocess/thread module #### | |
from subprocess import Popen | |
from itertools import islice | |
import time

max_workers = 2  # no more than 2 concurrent processes
# Generator expression: a process is only launched when the generator is
# advanced, which is what limits the concurrency.
# NOTE(review): shell=True hands each string to the shell -- `commands` must
# come from a trusted source.
processes = (Popen(cmd, shell=True) for cmd in commands)
running_processes = list(islice(processes, max_workers))  # start new processes
while running_processes:
    for i, process in enumerate(running_processes):
        if process.poll() is not None:  # the process has finished
            running_processes[i] = next(processes, None)  # start new process
            if running_processes[i] is None:  # no new processes
                del running_processes[i]
                # the list changed size under enumerate(); rescan from the top
                break
    # Pause briefly between scans so polling does not spin a CPU core.
    time.sleep(0.05)
## Realtime subprocess output | |
def run_command(command):
    """Run *command* and echo its standard output line by line as it arrives.

    Args:
        command: Command line as a single string; it is split with
            ``shlex.split`` and executed without a shell.

    Returns:
        The exit status of the process.
    """
    import shlex
    import subprocess

    # universal_newlines=True makes stdout a text stream, so end of output is
    # the empty *string*; with a bytes stream the comparison against '' would
    # never succeed.
    with subprocess.Popen(shlex.split(command), stdout=subprocess.PIPE,
                          universal_newlines=True) as process:
        for output in iter(process.stdout.readline, ''):
            print(output.strip())
    # Leaving the with-block has waited for the process, so returncode is set.
    return process.returncode
#### Pool: map_async().get() vs imap() vs imap_unordered() #### | |
from multiprocessing import Pool | |
import time | |
def foo(x):
    """Block for *x* seconds, then return ``x + 2``."""
    time.sleep(x)
    result = x + 2
    return result
# The __main__ guard is required wherever worker processes are spawned by
# re-importing this module; without it each worker would create its own Pool.
if __name__ == '__main__':
    start = time.time()
    # The with-block releases the worker processes once the loop is done.
    with Pool() as pool:
        # imap() hands results back in the order of the inputs.
        for x in pool.imap(foo, [1, 5, 3]):
            print("{} (Time elapsed: {}s)".format(x, int(time.time() - start)))
## concurrent basic example | |
from concurrent.futures import ProcessPoolExecutor | |
from time import sleep | |
def return_after_5_secs(message, delay=5):
    """Sleep for *delay* seconds (5 by default), then return *message* unchanged.

    Args:
        message: Value handed back to the caller.
        delay: Seconds to sleep before returning.
    """
    sleep(delay)
    return message
if __name__ == '__main__':
    # The with-block shuts the executor down and frees its worker processes.
    with ProcessPoolExecutor(3) as pool:
        # ("hello") is just the string "hello", not a tuple; pass it plainly.
        future = pool.submit(return_after_5_secs, "hello")
        print(future.done())  # checked right after submission
        sleep(5)
        print(future.done())  # checked again after waiting as long as the task sleeps
        print("Result: " + future.result())  # result() blocks until the task is done
# concurrent.futures: Exit immediately after KeyboardInterrupt | |
import time | |
import concurrent.futures.thread | |
from concurrent.futures import ThreadPoolExecutor, as_completed | |
# Submit one long sleep to a single-worker pool and wait for it; on Ctrl-C
# the executor's thread bookkeeping is emptied.
# NOTE(review): _threads and _threads_queues are private CPython internals.
# Presumably clearing them leaves the executor shutdown and the interpreter
# exit hook with no worker threads to wait for -- verify against the Python
# version in use.
with ThreadPoolExecutor(1) as executor:
    fs = [executor.submit(time.sleep, 10)]
    try:
        for f in as_completed(fs):
            f.result()
    except KeyboardInterrupt:
        # The KeyboardInterrupt is handled here and not re-raised.
        executor._threads.clear()
        concurrent.futures.thread._threads_queues.clear()
# bar characters: the Unicode block elements light shade, medium shade,
# dark shade and full block (presumably kept as a palette for text
# progress bars)
'░▒▓█'
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment