Skip to content

Instantly share code, notes, and snippets.

@PaulSchweizer
Created August 6, 2019 11:07
Show Gist options
  • Save PaulSchweizer/0256b942406a9e765b2a3858b09cf8c4 to your computer and use it in GitHub Desktop.
Save PaulSchweizer/0256b942406a9e765b2a3858b09cf8c4 to your computer and use it in GitHub Desktop.
Pyblish with Flowpipe
"""Convert a pyblish context with collected instances to a flowpipe graph:
+----------------------------+ +----------------------------+ +-----------------------------+
| ValidatorA Instance 0 | | ExtractorA Instance 0 | | IntegratorA Instance 0 |
|----------------------------| |----------------------------| |-----------------------------|
% contexts | % contexts | % contexts |
o contexts.0<{"results"> | +--->o contexts.0<> | +--->o contexts.0<> |
o instance<{"data": {> | |--->o contexts.1<> | |--->o instance<> |
o plugin<"Validator> | |--->o instance<> | | o plugin<"Integrato> |
o step<"validate"> | | o plugin<"Extractor> | | o step<"integrate> |
| context o-----+ o step<"extract"> | | | context o
| instance o | | context o-----+ | instance o
+----------------------------+ | | instance o-----| +-----------------------------+
+----------------------------+ | +----------------------------+ | +-----------------------------+
| ValidatorA Instance 1 | | +----------------------------+ | | IntegratorA Instance 1 |
|----------------------------| | | ExtractorA Instance 1 | | |-----------------------------|
% contexts | | |----------------------------| | % contexts |
o contexts.0<{"results"> | | % contexts | |--->o contexts.0<> |
o instance<{"data": {> | |--->o contexts.0<> | |--->o instance<> |
o plugin<"Validator> | |--->o contexts.1<> | | o plugin<"Integrato> |
o step<"validate"> | |--->o instance<> | | o step<"integrate> |
| context o-----| o plugin<"Extractor> | | | context o
| instance o | o step<"extract"> | | | instance o
+----------------------------+ | | context o-----| +-----------------------------+
+----------------------------+ | | instance o-----| +-----------------------------+
| ValidatorB Instance 0 | | +----------------------------+ | | IntegratorB Instance 0 |
|----------------------------| | | |-----------------------------|
% contexts | | | % contexts |
o contexts.0<{"results"> | | +--->o contexts.0<> |
o instance<{"data": {> | | +--->o instance<> |
o plugin<"Validator> | | | o plugin<"Integrato> |
o step<"validate"> | | | o step<"integrate> |
| context o-----+ | | context o
| instance o-----+ | | instance o
+----------------------------+ | | +-----------------------------+
+----------------------------+ | | +-----------------------------+
| ValidatorB Instance 1 | | | | IntegratorB Instance 1 |
|----------------------------| | | |-----------------------------|
% contexts | | | % contexts |
o contexts.0<{"results"> | | +--->o contexts.0<> |
o instance<{"data": {> | | +--->o instance<> |
o plugin<"Validator> | | o plugin<"Integrato> |
o step<"validate"> | | o step<"integrate> |
| context o-----+ | context o
| instance o-----+ | instance o
+----------------------------+ +-----------------------------+
-- Evaluate -----------------------------
Instance 0 - ValidatorA PASSED
Instance 1 - ValidatorA ERROR
Instance 0 - ValidatorB PASSED
Instance 1 - ValidatorB PASSED
Instance 0 - ExtractorA
Instance 0 - IntegratorA
Instance 0 - IntegratorB
"""
import os
import pyblish.api
import pyblish.lib
import pyblish.util
from flowpipe.graph import Graph
from flowpipe.node import Node
# -----------------------------------------------------------------------------
# PYBLISH
# Some demo pyblish plugins
# -----------------------------------------------------------------------------
class Collector(pyblish.api.Collector):
    """Demo collector that adds two instances of family "my-family"."""

    def process(self, context):
        """Create "Instance 0" and "Instance 1" on the given context."""
        instance_names = ["Instance {0}".format(index) for index in range(2)]
        for instance_name in instance_names:
            context.create_instance(instance_name, family="my-family")
class ValidatorA(pyblish.api.Validator):
    """Demo validator that fails for every instance whose name ends in "1"."""

    # NOTE(review): pyblish plugins are usually filtered through a
    # ``families`` attribute; confirm that ``family`` has the intended effect.
    family = ["my-family"]

    def process(self, instance):
        """Raise for instances named "...1", otherwise report success.

        Raises:
            Exception: if ``instance.name`` ends with "1".
        """
        if instance.name.endswith("1"):
            raise Exception("{0} - ValidatorA ERROR".format(instance.name))
        # A single pre-formatted argument prints identically whether print is
        # the Python 2 statement or the Python 3 function.
        print("{0} - ValidatorA PASSED".format(instance))
class ValidatorB(pyblish.api.Validator):
    """Demo validator that always passes."""

    # NOTE(review): pyblish plugins are usually filtered through a
    # ``families`` attribute; confirm that ``family`` has the intended effect.
    family = ["my-family"]

    def process(self, instance):
        """Report that the instance passed validation."""
        # A single pre-formatted argument prints identically whether print is
        # the Python 2 statement or the Python 3 function.
        print("{0} - ValidatorB PASSED".format(instance))
class ExtractorA(pyblish.api.Extractor):
    """Demo extractor that only reports the instance it was run on."""

    # NOTE(review): pyblish plugins are usually filtered through a
    # ``families`` attribute; confirm that ``family`` has the intended effect.
    family = ["my-family"]

    def process(self, instance):
        """Print the instance followed by the plugin name."""
        # A single pre-formatted argument prints identically whether print is
        # the Python 2 statement or the Python 3 function.
        print("{0} - ExtractorA".format(instance))
class IntegratorA(pyblish.api.Integrator):
    """Demo integrator that only reports the instance it was run on."""

    # NOTE(review): pyblish plugins are usually filtered through a
    # ``families`` attribute; confirm that ``family`` has the intended effect.
    family = ["my-family"]

    def process(self, instance):
        """Print the instance followed by the plugin name."""
        # A single pre-formatted argument prints identically whether print is
        # the Python 2 statement or the Python 3 function.
        print("{0} - IntegratorA".format(instance))
class IntegratorB(pyblish.api.Integrator):
    """Demo integrator that only reports the instance it was run on."""

    # NOTE(review): pyblish plugins are usually filtered through a
    # ``families`` attribute; confirm that ``family`` has the intended effect.
    family = ["my-family"]

    def process(self, instance):
        """Print the instance followed by the plugin name."""
        # A single pre-formatted argument prints identically whether print is
        # the Python 2 statement or the Python 3 function.
        print("{0} - IntegratorB".format(instance))
# -----------------------------------------------------------------------------
# FLOWPIPE NODE
# -----------------------------------------------------------------------------
@Node(outputs=["context", "instance"])
def PyblishNode(contexts, instance, plugin, step):
    """Wrap the given pyblish instance plugin into a flowpipe node.

    Args:
        contexts: Mapping whose values are serialized contexts (see
            ``json_serialize_context``) or None. The values are merged into
            one fresh pyblish context.
        instance: Serialized instance (see ``json_serialize_instance``) or
            None.
        plugin: ``__name__`` of the discovered pyblish plugin class to run.
        step: One of "validate", "extract" or "integrate". Any other value
            runs no plugin and the merged context is passed through as is.

    Returns:
        dict: ``{"context": ..., "instance": ...}`` holding the serialized
        context and instance after the plugin ran. Both values are None when
        an earlier result for this instance was unsuccessful (bypass).

    Raises:
        IndexError: if no discovered plugin is named ``plugin``.
    """
    all_plugins = pyblish.api.discover()
    live_context = json_deserialize_context(contexts.values(), all_plugins)
    live_instance = json_deserialize_instance(instance, live_context)

    # Bypassing if there are errors in the results for this instance
    for result in live_context.data.get("results", []):
        if live_instance == result["instance"] and not result["success"]:
            return {
                "context": None,
                "instance": None
            }

    matching_plugins = [p for p in all_plugins if p.__name__ == plugin]
    if not matching_plugins:
        # Same exception type as the bare ``[0]`` lookup would raise, but
        # with a message that names the missing plugin.
        raise IndexError(
            "No discovered pyblish plugin is named {0!r}".format(plugin))
    plugin_class = matching_plugins[0]

    runners = {
        "validate": pyblish.util.validate,
        "extract": pyblish.util.extract,
        "integrate": pyblish.util.integrate,
    }
    run_step = runners.get(step)
    if run_step is not None:
        run_step(live_context, [plugin_class], None)

    return {
        "context": json_serialize_context(live_context),
        "instance": json_serialize_instance(live_instance)
    }
# -----------------------------------------------------------------------------
# CONVERT PYBLISH TO FLOWPIPE GRAPH
# -----------------------------------------------------------------------------
def _add_stage_nodes(graph, instance, plugins, step, upstream_nodes,
                     context_data):
    """Create one PyblishNode per plugin for ``step`` and wire its inputs.

    Each new node is connected to every node in ``upstream_nodes``: the
    upstream "context" output feeds ``contexts[<index>]`` and the upstream
    "instance" output feeds "instance". When there is no upstream node, the
    inputs are seeded from ``context_data`` and the serialized ``instance``.

    Returns:
        list: The nodes created for this stage, in plugin order.
    """
    stage_nodes = []
    for plugin in plugins:
        node = PyblishNode(
            graph=graph, name="{plugin} {name}".format(
                plugin=plugin.__name__, name=instance.data["name"]),
            step=step)
        node.inputs["plugin"].value = plugin.__name__
        if upstream_nodes:
            for index, upstream in enumerate(upstream_nodes):
                upstream.outputs["context"].connect(
                    node.inputs["contexts"][str(index)])
                upstream.outputs["instance"].connect(
                    node.inputs["instance"])
        else:
            node.inputs["contexts"]["0"].value = context_data
            node.inputs["instance"].value = json_serialize_instance(instance)
        stage_nodes.append(node)
    return stage_nodes


def convert_pyblish_to_flowpipe_graph(context):
    """Convert the found instances into an equivalent flowpipe graph.

    For every instance in ``context`` one node is created per matching
    validator, extractor and integrator plugin. Every node of a stage is
    connected to all nodes of the closest preceding stage that has nodes.
    The first stage that has nodes is seeded with the serialized context and
    instance, so an instance without validators (or without extractors)
    still gets its data passed on to the later stages.

    Args:
        context: A pyblish context that has already been collected.

    Returns:
        The flowpipe ``Graph``, or None if the context holds no instances.
    """
    instances = list(context)
    if not instances:
        return

    all_plugins = pyblish.api.discover()
    context_data = json_serialize_context(context)
    graph = Graph(name="Pyblish with Flowpipe")

    stages = (
        ("validate", pyblish.api.ValidatorOrder),
        ("extract", pyblish.api.ExtractorOrder),
        ("integrate", pyblish.api.IntegratorOrder),
    )

    for instance in instances:
        plugins = pyblish.api.plugins_by_instance(all_plugins, instance)
        upstream_nodes = []
        for step, base_order in stages:
            stage_plugins = [p for p in plugins if pyblish.lib.inrange(
                number=p.order, base=base_order)]
            stage_nodes = _add_stage_nodes(
                graph, instance, stage_plugins, step, upstream_nodes,
                context_data)
            # Keep the previous upstream when this stage is empty so the
            # next stage is still connected to something.
            if stage_nodes:
                upstream_nodes = stage_nodes

    return graph
# -----------------------------------------------------------------------------
# PYBLISH SERIALIZATION UTILITIES
# -----------------------------------------------------------------------------
def json_serialize_context(context):
    """Turn the results stored on the context into a plain dict.

    Only ``context.data["results"]`` is read. The returned dict has a
    "results" key only when there is at least one result; otherwise it is
    empty.
    """

    def _pack(result):
        # The serialized instance is stored under "instance" and "asset".
        packed_instance = json_serialize_instance(result["instance"])
        return {
            "instance": packed_instance,
            "records": result["records"],
            "success": result["success"],
            "plugin": result["plugin"].__name__,
            "action": result["action"],
            "progress": result["progress"],
            "duration": result["duration"],
            "error": None if result["error"] is None else str(result["error"]),
            "asset": packed_instance,
        }

    packed_results = [
        _pack(result) for result in context.data.get("results", [])
    ]
    if not packed_results:
        return {}
    return {"results": packed_results}
def json_deserialize_context(contexts, plugins):
    """Rebuild one pyblish context from several serialized contexts.

    Args:
        contexts: Iterable of dicts as produced by
            ``json_serialize_context``; None entries are skipped.
        plugins: Plugin classes used to resolve the plugin names stored in
            the serialized results.

    Returns:
        A new ``pyblish.api.Context`` holding the results of all inputs.
    """

    def _plugin_named(name):
        # Indexing an empty list raises IndexError for unknown names.
        return [p for p in plugins if p.__name__ == name][0]

    restored = pyblish.api.Context()
    for payload in contexts:
        if payload is None:
            continue
        for entry in payload.get("results", []):
            restored.data.setdefault("results", [])
            restored_instance = json_deserialize_instance(
                entry["instance"], restored)
            restored.data["results"].append({
                "instance": restored_instance,
                "records": entry["records"],
                "success": entry["success"],
                "plugin": _plugin_named(entry["plugin"]),
                "action": entry["action"],
                "progress": entry["progress"],
                "duration": entry["duration"],
                "error": entry["error"],
                "asset": restored_instance
            })
    return restored
def json_serialize_instance(instance):
    """Return ``{"id": ..., "data": ...}`` for the instance, None for None.

    The "data" value is the instance's own ``data`` object, not a copy.
    """
    if instance is not None:
        return {"id": instance.id, "data": instance.data}
    return None
def json_deserialize_instance(json_data, context):
    """Return the instance in ``context`` that matches the serialized data.

    An instance already in the context with the same id is reused. Otherwise
    a new one is created from the serialized "data" and given the stored id.
    None is returned for None input.
    """
    if json_data is None:
        return None

    wanted_id = json_data["id"]
    for existing in context:
        if existing.id == wanted_id:
            return existing

    created = context.create_instance(**json_data["data"])
    # Overwrite the id so the instance matches its serialized counterpart.
    created._id = wanted_id
    return created
# -----------------------------------------------------------------------------
# EXECUTE
# -----------------------------------------------------------------------------
if __name__ == "__main__":
    # Start from a clean registry so only the plugins in this file are found.
    pyblish.api.deregister_all_paths()
    pyblish.api.deregister_all_plugins()
    # abspath: ``__file__`` can be a bare file name when the script is run
    # from its own directory, which would make dirname() an empty string.
    pyblish.api.register_plugin_path(
        os.path.dirname(os.path.abspath(__file__)))

    context = pyblish.util.collect()
    graph = convert_pyblish_to_flowpipe_graph(context)

    # Single-argument print calls behave the same on Python 2 and Python 3.
    print(graph)
    if graph is None:
        # The conversion returns None when no instances were collected.
        print("No instances collected, nothing to evaluate.")
    else:
        print("\n-- Evaluate -----------------------------\n")
        graph.evaluate()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment