Skip to content

Instantly share code, notes, and snippets.

@cretz
Last active August 13, 2024 14:21
Show Gist options
  • Save cretz/42112506fb97eaaf3ba6b1469eb0d0df to your computer and use it in GitHub Desktop.
Save cretz/42112506fb97eaaf3ba6b1469eb0d0df to your computer and use it in GitHub Desktop.
Demonstration of Temporal payload codec and payload converter applying to specific object
# GH gists don't allow empty

Putting all these files in my_folder and running python -m my_folder.main gives:

Called activity with MySpecialObject(foo='some-foo-val')
Called activity with some-other-val

And when reviewing the history, you can see the special encoding/object:

      "input": {
        "payloads": [
          {
            "metadata": {
              "encoding": "dGV4dC9teS1wYXlsb2FkLWVuY29kaW5n"
            },
            "data": "c29tZS1mb28tdmFs"
          }
        ]
      }

These values base64-decode to the encoding text/my-payload-encoding and the data some-foo-val. The history entry for the non-special encoding/object looks like this:

      "input": {
        "payloads": [
          {
            "metadata": {
              "encoding": "anNvbi9wbGFpbg=="
            },
            "data": "InNvbWUtb3RoZXItdmFsIg=="
          }
        ]
      }

These values base64-decode to the encoding json/plain and the data "some-other-val".

import asyncio
import dataclasses
from uuid import uuid4
from temporalio.client import Client
from temporalio.converter import DataConverter
from temporalio.worker import Worker
from .shared import (MyCodec, MyPayloadConverter, my_activity_other_obj,
my_activity_special_obj)
from .workflow import MyWorkflow
async def main():
    """Run ``MyWorkflow`` once against the Temporal server at localhost:7233.

    The client is created with a data converter derived from the default one,
    with ``MyCodec`` as payload codec and ``MyPayloadConverter`` as payload
    converter class. A worker hosting the workflow and both activities is
    kept running while the workflow executes.
    """
    # Start from the default data converter and swap in our custom pieces
    custom_converter = dataclasses.replace(
        DataConverter.default,
        payload_codec=MyCodec(),
        payload_converter_class=MyPayloadConverter,
    )
    client = await Client.connect(
        "localhost:7233",
        data_converter=custom_converter,
    )

    # Random suffix so each run polls its own task queue
    task_queue = f"my-task-queue-{uuid4()}"
    worker = Worker(
        client,
        task_queue=task_queue,
        workflows=[MyWorkflow],
        activities=[my_activity_special_obj, my_activity_other_obj],
    )

    # The worker only runs for the duration of this ``async with`` body
    async with worker:
        await client.execute_workflow(
            MyWorkflow.run,
            id=f"my-workflow-{uuid4()}",
            task_queue=task_queue,
        )
# Script entry point: drive the async ``main`` only when executed directly
# (e.g. via ``python -m``), not when this module is imported.
if __name__ == "__main__":
    asyncio.run(main())
from dataclasses import dataclass
from typing import Any, Iterable, List, Optional, Type
from temporalio import activity
from temporalio.api.common.v1 import Payload
from temporalio.converter import (CompositePayloadConverter,
DefaultPayloadConverter,
EncodingPayloadConverter, PayloadCodec)
@dataclass
class MySpecialObject:
    """Value type that receives its own payload encoding in this demo.

    ``MyEncodingPayloadConverter`` is the converter in this file that
    recognises instances of this class.
    """

    # The single field; its UTF-8 bytes become the payload data
    foo: str
# Encoding metadata value (bytes) that MyCodec puts on the payloads it wraps
my_codec_encoding = b"binary/my-codec-encoding"

# Encoding used for MySpecialObject payloads: once as str (the converter's
# ``encoding`` property) and once as bytes (the payload metadata value)
my_payload_encoding_str = "text/my-payload-encoding"
my_payload_encoding = my_payload_encoding_str.encode()
class MyCodec(PayloadCodec):
    """Payload codec that only touches payloads using ``my_payload_encoding``.

    ``encode`` wraps such payloads in a new payload tagged with
    ``my_codec_encoding``; ``decode`` reverses that. Every payload with a
    different encoding is passed through unchanged in both directions.
    """

    # Marker placed in front of the serialized original payload. Kept in one
    # place so encode and decode cannot drift apart.
    _DATA_PREFIX = b"my-encoding: "

    async def encode(self, payloads: Iterable[Payload]) -> List[Payload]:
        """Wrap every payload whose encoding is ``my_payload_encoding``.

        Args:
            payloads: Payloads to inspect, in order.

        Returns:
            A list of the same length and order. Matching payloads are
            replaced by a payload whose data is the prefix followed by the
            serialized original; all others are returned as-is.
        """
        res: List[Payload] = []
        for payload in payloads:
            # We only want to encode payloads that are using our expected
            # payload encoding
            if payload.metadata.get("encoding") != my_payload_encoding:
                res.append(payload)
                continue
            res.append(
                Payload(
                    metadata={"encoding": my_codec_encoding},
                    data=self._DATA_PREFIX + payload.SerializeToString(),
                )
            )
        return res

    async def decode(self, payloads: Iterable[Payload]) -> List[Payload]:
        """Unwrap every payload whose encoding is ``my_codec_encoding``.

        Args:
            payloads: Payloads to inspect, in order.

        Returns:
            A list of the same length and order. Matching payloads are
            replaced by the payload parsed from their data (after the
            prefix); all others are returned as-is.

        Raises:
            AssertionError: If a payload tagged with ``my_codec_encoding``
                does not start with the expected prefix.
        """
        res: List[Payload] = []
        for payload in payloads:
            # We only want to decode payloads that are using our expected codec
            # encoding
            if payload.metadata.get("encoding") != my_codec_encoding:
                res.append(payload)
                continue
            # Raised explicitly instead of using an ``assert`` statement so
            # the check still runs under ``python -O``; the exception type is
            # the same one the assert statement produced.
            if not payload.data.startswith(self._DATA_PREFIX):
                raise AssertionError(
                    "payload with codec encoding is missing the expected data prefix"
                )
            decoded = Payload()
            # Slice rather than bytes.removeprefix so this also runs on
            # Python versions before 3.9; the prefix was verified just above.
            decoded.ParseFromString(payload.data[len(self._DATA_PREFIX):])
            res.append(decoded)
        return res
class MyEncodingPayloadConverter(EncodingPayloadConverter):
    """Encoding payload converter that handles ``MySpecialObject`` only."""

    @property
    def encoding(self) -> str:
        """Return the encoding name as a str (``my_payload_encoding_str``)."""
        return my_payload_encoding_str

    def to_payload(self, value: Any) -> Optional[Payload]:
        """Convert a ``MySpecialObject`` into a payload.

        Returns:
            A payload whose data is the UTF-8 encoded ``foo`` field and whose
            encoding metadata is ``my_payload_encoding``, or ``None`` when
            ``value`` is not a ``MySpecialObject``.
        """
        if isinstance(value, MySpecialObject):
            raw = value.foo.encode()
            return Payload(
                metadata={"encoding": my_payload_encoding},
                data=raw,
            )
        # Anything else is not ours to handle
        return None

    def from_payload(self, payload: Payload, type_hint: Optional[Type] = None) -> Any:
        """Rebuild a ``MySpecialObject`` from a payload produced by ``to_payload``.

        Both the payload's encoding metadata and the requested type hint are
        asserted to be the ones this converter deals with.
        """
        # Sanity checks: right encoding, and the caller asked for our type
        encoding = payload.metadata.get("encoding")
        assert encoding == my_payload_encoding
        assert type_hint == MySpecialObject
        foo = payload.data.decode()
        return MySpecialObject(foo)
class MyPayloadConverter(CompositePayloadConverter):
    """Composite converter: our special converter followed by the defaults."""

    def __init__(self) -> None:
        """Register ``MyEncodingPayloadConverter`` ahead of the default converters."""
        # Our converter is listed first, then every default encoding converter
        converters = (
            MyEncodingPayloadConverter(),
            *DefaultPayloadConverter.default_encoding_payload_converters,
        )
        super().__init__(*converters)
@activity.defn
async def my_activity_special_obj(obj: MySpecialObject) -> None:
    """Print the ``MySpecialObject`` this activity was called with."""
    message = f"Called activity with {obj}"
    print(message)
@activity.defn
async def my_activity_other_obj(obj: str) -> None:
    """Print the string this activity was called with."""
    message = f"Called activity with {obj}"
    print(message)
from datetime import timedelta
from temporalio import workflow
with workflow.unsafe.imports_passed_through():
from .shared import (MySpecialObject, my_activity_other_obj,
my_activity_special_obj)
@workflow.defn
class MyWorkflow:
    """Workflow that runs the two demo activities one after the other."""

    @workflow.run
    async def run(self) -> None:
        """Execute the special-object activity, then the plain-string activity.

        Each activity is awaited before the next one starts and is given a
        five second start-to-close timeout.
        """
        # Same timeout for both activity calls
        activity_timeout = timedelta(seconds=5)

        # First call: argument is a MySpecialObject
        special_arg = MySpecialObject(foo="some-foo-val")
        await workflow.execute_activity(
            my_activity_special_obj,
            special_arg,
            start_to_close_timeout=activity_timeout,
        )

        # Second call: argument is an ordinary string
        other_arg = "some-other-val"
        await workflow.execute_activity(
            my_activity_other_obj,
            other_arg,
            start_to_close_timeout=activity_timeout,
        )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment