Created
October 14, 2015 16:16
-
-
Save tp7/1e39044e1b660ef0a02c to your computer and use it in GitHub Desktop.
VSynth lutspa
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import numpy as np | |
import vapoursynth as vs | |
def bool_to_number(function): | |
def wrapped(*args): | |
value = function(*args) | |
return np.where(value, np.array(1.0), np.array(-1.0)) | |
return wrapped | |
def get_limits(dtype): | |
try: | |
info = np.iinfo(dtype) | |
except ValueError: | |
info = np.finfo(dtype) | |
return info.min, info.max | |
def bitwise_invert(a, intermediate_type): | |
limits = get_limits(intermediate_type) | |
return np.invert(a.clip(*limits).astype(intermediate_type)).astype("float") | |
def bitwise_or(a, b, intermediate_type): | |
limits = get_limits(intermediate_type) | |
return np.bitwise_or(a.clip(*limits).astype(intermediate_type), | |
b.clip(*limits).astype(intermediate_type)).astype("float") | |
def bitwise_and(a, b, intermediate_type): | |
limits = get_limits(intermediate_type) | |
return np.bitwise_and(a.clip(*limits).astype(intermediate_type), | |
b.clip(*limits).astype(intermediate_type)).astype("float") | |
def bitwise_xor(a, b, intermediate_type): | |
limits = get_limits(intermediate_type) | |
return np.bitwise_xor(a.clip(*limits).astype(intermediate_type), | |
b.clip(*limits).astype(intermediate_type)).astype("float") | |
def shift_left(a, b, intermediate_type): | |
limits = get_limits(intermediate_type) | |
int_a = a.clip(*limits).astype(intermediate_type) | |
return np.where(b >= 0, | |
int_a << b.clip(*limits).astype(intermediate_type), | |
int_a >> (-b).clip(*limits).astype(intermediate_type)).astype("float") | |
def shift_right(a, b, intermediate_type): | |
limits = get_limits(intermediate_type) | |
int_a = a.clip(*limits).astype(intermediate_type) | |
return np.where(b >= 0, | |
int_a >> b.clip(*limits).astype(intermediate_type), | |
int_a << (-b).clip(*limits).astype(intermediate_type)).astype("float") | |
unary_functions = { | |
"~u": lambda a: bitwise_invert(a, "uint64"), | |
"~s": lambda a: bitwise_invert(a, "int64"), | |
"cos": np.cos, | |
"sin": np.sin, | |
"tan": np.tan, | |
"log": np.log, | |
"exp": np.exp, | |
"abs": np.abs, | |
"atan": np.arctan, | |
"acos": np.arccos, | |
"asin": np.arcsin, | |
"round": np.round, | |
"ceil": np.ceil, | |
"floor": np.floor, | |
"trunc": np.trunc, | |
} | |
binary_functions = { | |
"+": np.add, | |
"-": np.subtract, | |
"*": np.multiply, | |
"/": lambda a, b: a / np.where(b == 0, b + 0.0000001, b), | |
"max": np.maximum, | |
"min": np.minimum, | |
">": bool_to_number(np.greater), | |
">=": bool_to_number(np.greater_equal), | |
"<": bool_to_number(np.less), | |
"<=": bool_to_number(np.less_equal), | |
"=": bool_to_number(np.equal), | |
"==": bool_to_number(np.equal), | |
"!=": bool_to_number(np.not_equal), | |
"^": np.power, | |
"%": np.mod, | |
"&": bool_to_number(lambda a, b: np.logical_and(a > 0, b > 0)), | |
"|": bool_to_number(lambda a, b: np.logical_or(a > 0, b > 0)), | |
"&!": bool_to_number(lambda a, b: np.logical_and(a > 0, b <= 0)), | |
"°": bool_to_number(lambda a, b: np.logical_xor(a > 0, b > 0)), | |
"@": bool_to_number(lambda a, b: np.logical_xor(a > 0, b > 0)), | |
"&u": lambda a, b: bitwise_and(a, b, "uint64"), | |
"|u": lambda a, b: bitwise_or(a, b, "uint64"), | |
"°u": lambda a, b: bitwise_xor(a, b, "uint64"), | |
"@u": lambda a, b: bitwise_xor(a, b, "uint64"), | |
"<<": lambda a, b: shift_left(a, b, "uint64"), | |
"<<u": lambda a, b: shift_left(a, b, "uint64"), | |
">>": lambda a, b: shift_right(a, b, "uint64"), | |
">>u": lambda a, b: shift_right(a, b, "uint64"), | |
"&s": lambda a, b: bitwise_and(a, b, "int64"), | |
"|s": lambda a, b: bitwise_or(a, b, "int64"), | |
"°s": lambda a, b: bitwise_xor(a, b, "int64"), | |
"@s": lambda a, b: bitwise_xor(a, b, "int64"), | |
"<<s": lambda a, b: shift_left(a, b, "int64"), | |
">>s": lambda a, b: shift_right(a, b, "int64"), | |
} | |
ternary_functions = { | |
"?": lambda condition, left, right: np.where(condition > 0, left, right), | |
"clip": lambda value, bottom, top: np.clip(value, bottom, top) | |
} | |
class Context(object): | |
def __init__(self, ops_string): | |
self._tokens = [] | |
for token in reversed(ops_string.split()): | |
try: | |
self._tokens.append(float(token)) | |
except ValueError: | |
self._tokens.append(token) | |
self._pos = 0 | |
def reset(self): | |
self._pos = 0 | |
def pop(self): | |
value = self._tokens[self._pos] | |
self._pos += 1 | |
return value | |
def evaluate(context, x, y): | |
op = context.pop() | |
if op == 'x': | |
return x | |
elif op == 'y': | |
return y | |
elif op == 'po': | |
return np.array(3.1415927) | |
elif isinstance(op, float): | |
return np.array(op) | |
elif op in unary_functions: | |
arg = evaluate(context, x, y) | |
return unary_functions[op](arg) | |
elif op in binary_functions: | |
arg2 = evaluate(context, x, y) | |
arg1 = evaluate(context, x, y) | |
return binary_functions[op](arg1, arg2) | |
elif op in ternary_functions: | |
arg3 = evaluate(context, x, y) | |
arg2 = evaluate(context, x, y) | |
arg1 = evaluate(context, x, y) | |
return ternary_functions[op](arg1, arg2, arg3) | |
else: | |
raise ValueError("Unexpected token") | |
def create_plane(expr_string, width, height, relative, biased, output_format): | |
def scale(value, maximum): | |
if relative: | |
return value * 1.0 / (maximum if biased or maximum < 2 else maximum - 1) | |
return value | |
context = Context(expr_string) | |
plane = np.empty((height, width), "float") | |
x_list = np.array([scale(x, width) for x in range(width)], dtype="float") | |
for y in range(height): | |
plane[y] = evaluate(context, x_list, np.array(scale(y, height))) | |
context.reset() | |
plane = plane.clip(*get_limits(output_format)).astype(output_format) | |
return plane | |
_planes_cache = {} | |
def lutspa(clip, mode="relative", relative=True, biased=True, expr="x", | |
y_expr=None, u_expr=None, v_expr=None, y=3, u=1, v=1, chroma=None): | |
relative, biased = { | |
"absolute": (False, False), | |
"relative inclusive": (True, False), | |
"relative closed": (True, False), | |
"relative exclusive": (True, True), | |
"relative opened": (True, True), | |
"relative": (True, True) | |
}.get(mode, (relative, biased)) | |
if chroma is not None: | |
u, v = { | |
"copy": (2, 2), | |
"process": (3,3), | |
"copy first": (2, 2) | |
}.get(chroma) or (-int(chroma), -int(chroma)) | |
colorspaces = { | |
(vs.YUV, vs.INTEGER, 1): "uint8", | |
(vs.YUV, vs.INTEGER, 2): "uint16", | |
(vs.YUV, vs.INTEGER, 4): "uint32", | |
(vs.YUV, vs.FLOAT, 4): "float32" | |
} | |
output_format = colorspaces.get((clip.format.color_family, | |
clip.format.sample_type, | |
clip.format.bytes_per_sample)) | |
if not output_format: | |
raise ValueError("Unsupported colorspace") | |
modes_lookup = (y, u, v) | |
expressions_lookup = (y_expr, u_expr, v_expr) | |
def lutspa_core(n, f): | |
frame = f.copy() | |
for plane in range(frame.format.num_planes): | |
mode = modes_lookup[plane] | |
write_array = np.array(frame.get_write_array(plane), copy=False) | |
if mode < 0: | |
write_array.fill(-mode) | |
elif mode == 3: | |
expression = expressions_lookup[plane] or expr | |
height, width = write_array.shape | |
key = (expression, width, height, output_format) | |
if key not in _planes_cache: | |
_planes_cache[key] = create_plane(expression, width, height, | |
relative, biased, output_format) | |
write_array[:] = _planes_cache[key] | |
return frame | |
return vs.get_core().std.ModifyFrame(clip, clip, lutspa_core) | |
core = vs.get_core() | |
blank = core.std.BlankClip(width=848, height=480, length=250, format=vs.YUV420P8) | |
out = lutspa(blank, mode="relative", y_expr="x 0.5 > x 255 * y 255 * &u x y max 255 * ?", | |
expr="x 255 * y 255 * &u 112 max 144 min", | |
chroma="process") | |
out.set_output() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment