Last active
March 17, 2020 15:47
-
-
Save bijoythomas/1d1544e01b1703c07709523e47fcd30b to your computer and use it in GitHub Desktop.
Ramda like functions for Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import functools
import re
from datetime import datetime

from fn import F, recur
from funcy import mapcat, walk_values, merge as funcymerge, group_by
# basic combinators | |
I = F(lambda x: x) # identity | |
K = F(lambda x: F(lambda y: x)) # constant | |
T = F(lambda x: F(lambda f: f(x))) # thrush | |
A = F(lambda f: F(lambda x: f(x))) # apply | |
S = F(lambda f: F(lambda g: F(lambda x: f(x)(g(x))))) # S combinator | |
def _partition_(pred, seq): | |
satisfy = [] | |
remaining = [] | |
for elem in seq: | |
if pred(elem): | |
satisfy.append(elem) | |
else: | |
remaining.append(elem) | |
return satisfy, remaining | |
append = F(lambda seq: F(lambda elem: concat(seq)([elem]))) | |
partition = F(lambda pred: F(lambda seq: _partition_(pred, seq))) | |
complement = F(lambda fn: F(lambda x: not fn(x))) | |
is_empty = F(lambda seq: len(seq) == 0) | |
is_not_empty = complement(is_empty) | |
head = F(lambda seq: seq[0] if len(seq) > 0 else None) | |
tail = F(lambda seq: seq[1:]) | |
init = F(lambda seq: seq[:-1]) | |
last = F(lambda seq: seq[-1]) | |
keys = F(lambda obj: obj.keys()) | |
values = F(lambda obj: obj.values()) | |
pick = F(lambda ks: F(lambda obj: {k: v for k, v in obj.items() if k in ks})) | |
omit = F(lambda ks: F(lambda obj: {k: v for k, v in obj.items() if k not in ks})) | |
get = F(lambda key: F(lambda obj: obj[key])) | |
get_or = F(lambda default: F(lambda key: F(lambda obj: obj.get(key, default)))) | |
chain = F(lambda fn: F(mapcat, fn)) | |
map = F(lambda fn: | |
F(lambda seq: foldl(lambda acc: lambda v: append(acc)(fn(v)))([])(seq) | |
) | |
) | |
filter = F(lambda native_filter: F(lambda pred: F(native_filter, pred)))(filter) | |
reject = F(lambda pred: F(lambda seq: filter(complement(pred))(seq))) | |
map_values = F(lambda fn: F(walk_values, fn)) | |
groupby = F(lambda fn: F(group_by, fn)) | |
merge = F(lambda obj: F(funcymerge, obj)) | |
merge_all = F(lambda seq: foldl(merge)({})(seq)) | |
inc = F(lambda n: n + 1) | |
dict_of = F(lambda key: F(lambda value: {key: value})) | |
is_odd = F(lambda n: n % 2 != 0) | |
to_lower = F(lambda string: string.lower()) | |
to_upper = F(lambda string: string.upper()) | |
equals = F(lambda a: F(lambda b: a == b)) | |
equals_by = F(lambda fn: F(lambda a: F(lambda b: fn(a) == fn(b)))) | |
zip = F(lambda native_zip: F(lambda seq1: F(native_zip, seq1)))(zip) | |
split = F(lambda sep: F(lambda s: s.split(sep))) | |
trim = F(lambda s: s.strip()) | |
def __reduce_left__(fn, initial, seq): | |
tmp = initial | |
for s in seq: | |
tmp = fn(tmp)(s) | |
return tmp | |
def __fold_left__(fn, initial, seq): | |
return __reduce_left__(fn, initial, seq) | |
# the below code is logically correct but can cause stack overflows | |
# because of recursion | |
# if is_empty(seq): | |
# return initial | |
# | |
# return fn(__fold_left__(fn, initial, seq[:-1])) (seq[-1]) | |
def __fold_right__(fn, initial, seq): | |
if is_empty(seq): | |
return initial | |
return fn(seq[0])(__fold_right__(fn, initial, seq[1:])) | |
f_reverse = F(lambda seq: seq[::-1]) | |
foldl = F(lambda fn: F(lambda initial: F(lambda seq: __fold_left__(fn, initial, seq)))) | |
foldr = F(lambda fn: F(lambda initial: F(lambda seq: __fold_right__(fn, initial, seq)))) | |
re_split = F(lambda regex: F(lambda s: re.split(regex, s))) | |
concat = F(lambda seq1: F(lambda seq2: seq1 + seq2)) | |
replace = F(lambda old: F(lambda new: F(lambda seq: seq.replace(old, new)))) | |
length = F(lambda seq: len(seq)) | |
prop_eq = F(lambda k: F(lambda v: F(lambda obj: k in obj and obj[k] == v))) | |
ascend = F( | |
lambda fn: F( | |
lambda a: F( | |
lambda b: fn(a) - fn(b)))) | |
descend = F( | |
lambda fn: F( | |
lambda a: F( | |
lambda b: fn(b) - fn(a)))) | |
pathEq = F( | |
lambda arr: F( | |
lambda v: F( | |
lambda obj: obj[arr[0]] == v if len(arr) == 1 else pathEq(tail(arr))(v)(obj[head(arr)])))) | |
gets = F(lambda ks: F(lambda obj: foldl(lambda acc: lambda k: concat(acc)([obj[k]]))([])(ks))) | |
join = F(lambda sep: F(lambda seq: sep.join(seq))) | |
find = F(lambda fn: F(lambda seq: next((i for i in seq if fn(i)), None))) | |
lens_prop = F(lambda k: F(lambda fn: F(lambda obj: merge(omit([k])(obj))({k: fn((obj[k]))})))) | |
lens_index = F(lambda idx: F(lambda f: F(lambda seq: seq[:idx] + [f(seq[idx])] + seq[idx+1:]))) | |
over = F(lambda lens: F(lambda fn: F(lambda obj: lens(fn)(obj)))) | |
zlfill = F(lambda n: F(lambda s: s.zfill(n))) | |
tap = F(lambda fn: F(lambda v: (fn(v) or True) and v)) | |
ap = S | |
sort = F(lambda comparator: F(lambda seq: sorted(seq, cmp=comparator))) | |
sort_by = F(lambda ordinal_fn: F(lambda seq: sorted(seq, key=ordinal_fn))) | |
flip = F(lambda f: F(lambda b: F(lambda a: f(a)(b)))) | |
fapply = F(lambda fn: F(lambda seq: foldl(lambda f: lambda e: f(e))(fn)(seq))) | |
def __for_each__(fn, seq): | |
for elem in seq: | |
fn(elem) | |
return seq | |
for_each = F(lambda fn: F(lambda seq: __for_each__(fn, seq))) | |
o = F(lambda f: F(lambda g: F(lambda x: f(g(x))))) | |
@recur.tco | |
def __aperture__(acc, seq, n): | |
if is_empty(seq): | |
return False, acc | |
if is_empty(acc): | |
return True, ([seq[0:n]], seq[n:], n) | |
rem = last(acc)[-(n-1):] | |
acc.append(rem + [head(seq)]) | |
return True, (acc, tail(seq), n) | |
aperture = F(lambda n: F(lambda seq: __aperture__([], seq, n))) | |
to_pairs = F(lambda obj: [[k, v] for k, v in obj.items()]) | |
from_pairs = F(lambda seq: {k: v for k, v in seq}) | |
multi_group_by = F(lambda f: F(lambda g: F(lambda seq: (from_pairs << map(over(lens_index(1))(groupby(g))) << to_pairs << groupby(f))(seq)))) | |
contains = F(lambda s: F(lambda seq: s in seq)) | |
_or_ = F(lambda bool1: F(lambda bool2: bool1 or bool2)) | |
_and_ = F(lambda bool1: F(lambda bool2: bool1 and bool2)) | |
f_any = F(lambda pred: F(lambda seq: (foldl(_or_)(False) << map(pred))(seq))) | |
f_all = F(lambda pred: F(lambda seq: (foldl(_and_)(True) << map(pred))(seq))) | |
evolve = F(lambda t: | |
F(lambda obj: | |
foldl(lambda acc: lambda pair: | |
merge(acc)(over(lens_prop(pair[0]))(pair[1])(acc) if pair[0] in acc else {}))(obj)(to_pairs(t)))) | |
repeat = F(lambda n: F(lambda s: [s for i in range(1, n+1)])) | |
intersperse = F(lambda s: F(lambda seq: foldl(lambda acc: lambda e: concat(acc)([e, s]))([])(seq)[:-1])) | |
intersperse_with = F(lambda f: F(lambda seq: concat(foldl(lambda acc: lambda pair: concat(acc)([pair[0], f(pair[0])(pair[1])]))([])(aperture(2)(seq)))([seq[-1]]))) | |
add = F(lambda a: F(lambda b: a + b)) | |
subtract = F(lambda a: F(lambda b: a - b)) | |
multiply = F(lambda a: F(lambda b: a * b)) | |
pluck = F(lambda k: F(lambda seq: map(get(k))(seq))) | |
compose = F(lambda *fns: F(lambda arg: foldr(A)(arg)(list(fns)))) | |
def __split_when__(pred, seq): | |
matched = False | |
for idx, elem in enumerate(seq): | |
if pred(elem): | |
matched = True | |
break | |
return [seq[:idx], seq[idx:]] if matched else [seq, []] | |
split_when = F(lambda pred: F(lambda seq: __split_when__(pred, seq))) | |
between = F(lambda start: F(lambda end: F(lambda val: start <= val <= end))) | |
any_pass = F(lambda preds: F(lambda val: foldl(lambda acc: lambda pred: acc or pred(val))(False)(preds))) | |
all_pass = F(lambda preds: F(lambda val: foldl(lambda acc: lambda pred: acc and pred(val))(True)(preds))) | |
if_else = F(lambda cond: F(lambda on_true: F(lambda on_false: F(lambda v: on_true(v) if cond(v) else on_false(v))))) | |
zip_with = F(lambda f: F(lambda seq1: F(lambda seq2: map(fapply(f))(zip(seq1)(seq2))))) | |
converge = F(lambda f: F(lambda fns: F(lambda x: fapply(f)(map(T(x))(fns))))) | |
use_with = F(lambda f: F(lambda fns: F(lambda *vals: fapply(f)(zip_with(A)(fns)(list(vals)))))) | |
def elapsed(fn): | |
def __helper__(*args): | |
start = datetime.now() | |
ret = apply(fn, args) | |
end = datetime.now() | |
print('Time taken for ' + fn.__name__ + ' => (hh:mm:ss.ms) {}'.format(end - start)) | |
return ret | |
return __helper__ | |
sum = F(lambda seq: foldl(add)(0)(seq)) | |
unapply = F(lambda fn: F(lambda *args: fn(list(args)))) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment