Skip to content

Instantly share code, notes, and snippets.

@seanchatmangpt
Last active October 3, 2024 07:15
Show Gist options
  • Save seanchatmangpt/5ff08ac3c30adc0902ff7306bbf4cb7a to your computer and use it in GitHub Desktop.
Save seanchatmangpt/5ff08ac3c30adc0902ff7306bbf4cb7a to your computer and use it in GitHub Desktop.
Actor System and Actor
"""
The Actor Module for Reactive Domain-Driven Design (RDDDY) Framework
---------------------------------------------------------------------
This module implements the core Actor abstraction within the RDDDY framework, providing a robust foundation for building reactive, domain-driven systems that are scalable, maintainable, and capable of handling complex, concurrent interactions. The Actor model encapsulates both state and behavior, allowing for asynchronous message passing as the primary means of communication between actors, thus fostering loose coupling and enhanced system resilience.
### Overview
Actors are the fundamental units of computation in the RDDDY framework. Each actor possesses a unique identity, a mailbox for message queuing, and a set of behaviors to handle incoming messages. The Actor module facilitates the creation, supervision, and coordination of actors within an ActorSystem, ensuring that messages are delivered and processed in a manner consistent with the system's domain-driven design principles.
### Multiline Calculus for Actor Behavior Specification
The operational semantics of actors within the RDDDY framework are formalized through a rigorous multiline calculus, which outlines the preconditions, postconditions, and invariants that govern actor behavior and interaction. This calculus serves as a contract, ensuring that actors behave correctly and predictably within their designated domain contexts.
#### Actor State Transition
Given an actor \(A\) with state \(s\) and a message \(m\), the state transition is defined as:
Precondition ( Pre Pre): 𝑠 ∈ 𝑆 s∈S and 𝑚 ∈ 𝑀 m∈M
Transition: 𝑇 ( 𝑠 , 𝑚 ) → 𝑠 ′ T(s,m)→s ′
Postcondition ( Post Post): 𝑠 ′ ∈ 𝑆 ′ s ′ ∈S ′
#### Message Handling
For a message \(m\) handled by the actor, leading to a state modification:
Precondition ( Pre Pre): 𝑠 ∈ 𝑆 s∈S and 𝑚 ∈ 𝑀 m∈M
Handling: 𝐻 ( 𝑚 , 𝑠 ) → 𝑠 ′ ′ H(m,s)→s ′′
Postcondition ( Post Post): 𝑠 ′ ′ ∈ 𝑆 ′ ′ s ′′ ∈S ′′
#### Invariant Maintenance
Ensuring system invariants \(I\) across transitions:
Invariant: 𝐼 ( 𝑆 ) ∧ 𝑇 ( 𝑠 , 𝑚 ) → 𝑠 ′ ⇒ 𝐼 ( 𝑆 ′ ) I(S)∧T(s,m)→s ′ ⇒I(S ′ )
#### Domain-Specific Assertions
Linking actor state transitions to domain logic:
Precondition ( Pre Pre): Δ ( 𝑠 ) Δ(s) and 𝛿 ( 𝑚 ) δ(m)
Domain Logic: 𝐷 ( 𝑠 , 𝑚 ) → Δ ( 𝑠 ′ ) D(s,m)→Δ(s ′ )
Postcondition ( Post Post): Δ ′ ( 𝑠 ′ ) Δ ′ (s ′ )
### Purpose
This calculus not only specifies the expected behavior of actors in response to messages but also integrates domain-specific knowledge, ensuring that actors operate in alignment with the broader domain-driven objectives of the system. By adhering to these specifications, the Actor module provides a reliable and expressive framework for developing systems that are both technically sound and closely aligned with domain requirements.
### Usage
Developers should implement actor behaviors in accordance with the outlined calculus, ensuring that each actor's implementation respects the preconditions, postconditions, and domain-specific assertions relevant to their system's domain logic. This approach facilitates the development of systems that are not only functionally correct but also domain-compliant, thereby enhancing the value and applicability of the RDDDY framework in real-world scenarios.
"""
import asyncio
from loguru import logger
from reactivex.scheduler.eventloop import AsyncIOScheduler
from typing import TYPE_CHECKING, Callable
import reactivex as rx
from reactivex import operators as ops
from rdddy.messages import *
if TYPE_CHECKING:
from rdddy.actor_system import ActorSystem
class Actor:
"""
Represents an actor within the RDDDY framework.
Actors are fundamental units of computation in the RDDDY framework, encapsulating both state and behavior.
They communicate asynchronously through message passing, promoting loose coupling and system resilience.
Args:
actor_system (ActorSystem): The ActorSystem to which the actor belongs.
actor_id (int, optional): The unique identifier of the actor. Defaults to None.
Attributes:
actor_system (ActorSystem): The ActorSystem to which the actor belongs.
actor_id (int): The unique identifier of the actor.
mailbox (Subject): A subject for message queuing.
handlers (dict): A mapping of message types to corresponding handler methods.
Methods:
start(scheduler): Starts the actor's mailbox processing loop.
on_next(message): Callback function for processing incoming messages.
on_error(error): Callback function for handling errors in the actor's mailbox.
on_completed(): Callback function when the actor's mailbox stream completes.
receive(message): Processes an incoming message.
send(recipient_id, message): Sends a message to another actor.
publish(message): Publishes a message to the actor system.
map_handlers(): Maps message types to corresponding handler methods.
"""
def __init__(self, actor_system: "ActorSystem", actor_id=None):
self.actor_system = actor_system
self.actor_id = actor_id or id(self)
self.mailbox = rx.subject.Subject()
self.handlers = self.map_handlers()
async def start(self, scheduler: AsyncIOScheduler):
"""
Initiates the processing loop for the actor's mailbox, ensuring asynchronous message handling.
Preconditions (Pre):
- The actor's mailbox must be initialized.
- A valid scheduler must be provided.
Transition (T):
- Initiates the processing loop for the actor's mailbox, enabling asynchronous message handling.
Postconditions (Post):
- The actor's mailbox processing loop has started successfully.
Args:
scheduler: An asynchronous scheduler used to control the execution of tasks.
"""
self.mailbox.pipe(ops.observe_on(scheduler)).subscribe(
on_next=self.on_next, # Synchronous wrapper for async handler
on_error=self.on_error,
on_completed=self.on_completed,
)
logger.info(f"Actor {self.actor_id} started")
def on_next(self, message: Message):
"""
Handles the next incoming message in the actor's mailbox.
Preconditions (Pre):
- The incoming message must be a valid instance of the Message class.
Transition (T):
- Processes the incoming message asynchronously.
Postconditions (Post):
- The incoming message has been processed by the actor.
Args:
message (Message): The incoming message to be processed.
"""
# Schedule the async handler as a new task
logger.debug(f"Actor {self.actor_id} received message: {message}")
asyncio.create_task(self.receive(message))
def on_error(self, error):
"""
Handles errors that occur in the actor's mailbox processing.
Preconditions (Pre):
- None
Transition (T):
- Handles the error generated during mailbox processing.
Postconditions (Post):
- The error has been handled, and appropriate action has been taken.
Args:
error: The error object representing the error that occurred.
"""
logger.error(f"Error in actor {self.actor_id} mailbox: {error}")
def on_completed(self):
"""
Handles the completion of the actor's mailbox stream.
Preconditions (Pre):
- None
Transition (T):
- Handles the completion event of the actor's mailbox stream.
Postconditions (Post):
- The actor's mailbox stream has completed, and appropriate action has been taken.
"""
logger.debug(f"Actor {self.actor_id} mailbox stream completed")
async def receive(self, message: Message):
"""
Processes an incoming message received by the actor.
Preconditions (Pre):
- The incoming message must be a valid instance of the Message class.
Transition (T):
- Processes the incoming message asynchronously, invoking the appropriate handler method.
Postconditions (Post):
- The incoming message has been successfully processed by the actor.
Args:
message (Message): The incoming message to be processed.
"""
try:
handler = self.handlers.get(type(message))
if handler:
await handler(message)
except Exception as e:
error_message = f"Error in actor {self.actor_id} processing message: {e}"
# Broadcast an error event through the actor system
await self.publish(Event(content=error_message))
logger.error(error_message)
async def publish(self, message: Message):
"""
Publishes a message to the actor system for distribution.
Preconditions (Pre):
- The message must be a valid instance of the Message class.
Transition (T):
- Publishes the message to the actor system for distribution.
Postconditions (Post):
- The message has been successfully published to the actor system.
Args:
message (Message): The message to be published.
"""
if message.actor_id == -1:
message.actor_id = self.actor_id
await self.actor_system.publish(message)
def map_handlers(self) -> dict[Type[Message], Callable]:
"""
Maps message types to corresponding handler methods.
Preconditions (Pre):
- None
Transition (T):
- Iterates through the methods of the actor instance and identifies callable methods with annotations.
- Maps message types to corresponding handler methods based on method annotations.
Postconditions (Post):
- A dictionary containing message types as keys and corresponding handler methods as values has been generated.
"""
handlers = {}
for name, method in inspect.getmembers(self):
if callable(method) and hasattr(method, "__annotations__"):
annotations = method.__annotations__
for arg in annotations.values():
try:
if issubclass(arg, Message):
handlers[arg] = method
except TypeError:
pass
del handlers[Message]
return handlers
"""
Actor System Module Documentation
This module, actor_system.py, implements the ActorSystem class within the Reactive Domain-Driven Design (RDDDY) framework. It serves as the orchestrator for actor lifecycle management, message passing, and system-wide coordination, ensuring that the principles of the Actor model are adhered to in a domain-driven context.
Overview:
The ActorSystem is responsible for creating, managing, and terminating actors, facilitating asynchronous message passing between them, and maintaining system invariants. It is designed to operate seamlessly within an asynchronous programming environment, leveraging Python's asyncio library to handle concurrent operations efficiently.
ActorSystem Multiline Calculus Notation (AMCN):
The behavior and operations within the ActorSystem are rigorously defined by the ActorSystem Multiline Calculus Notation (AMCN), ensuring a formalized approach to actor management and message dissemination. The AMCN outlines the preconditions, actions, and postconditions for each operation within the system, integrating domain-specific assertions to align computational processes with the system's domain logic.
1. Actor Lifecycle Management
Actor Creation (𝐴𝑐𝑟𝑒𝑎𝑡𝑒Acreate​):
- Precondition (Pre Pre): ¬∃𝑎∈𝐴∣𝑎.𝑖𝑑=𝑖𝑑𝑛𝑒𝑤 ¬∃a∈A∣a.id=id new​
- Action: 𝑐𝑟𝑒𝑎𝑡𝑒𝐴𝑐𝑡𝑜𝑟(𝑖𝑑𝑛𝑒𝑤,𝑇𝑦𝑝𝑒)→𝑎𝑛𝑒𝑤 createActor(id new​,Type)→a new​
- Postcondition (Post Post): ∃𝑎∈𝐴∣𝑎.𝑖𝑑=𝑖𝑑𝑛𝑒𝑤∧𝑎.𝑡𝑦𝑝𝑒=𝑇𝑦𝑝𝑒 ∃a∈A∣a.id=id new​∧a.type=Type
2. Message Dispatching
Direct Message Sending (𝑀𝑠𝑒𝑛𝑑Msend​):
- Precondition (Pre Pre): ∃𝑎𝑠𝑒𝑛𝑑𝑒𝑟,𝑎𝑟𝑒𝑐𝑖𝑝𝑖𝑒𝑛𝑡∈𝐴 ∃a sender​,a recipient​∈A
- Action: 𝑠𝑒𝑛𝑑𝑀𝑒𝑠𝑠𝑎𝑔𝑒(𝑎𝑠𝑒𝑛𝑑𝑒𝑟,𝑎𝑟𝑒𝑐𝑖𝑝𝑖𝑒𝑛𝑡,𝑚) sendMessage(a sender​,a recipient​,m)
- Postcondition (Post Post): 𝑚∈𝑎𝑟𝑒𝑐𝑖𝑝𝑖𝑒𝑛𝑡.𝑚𝑎𝑖𝑙𝑏𝑜𝑥 m∈a recipient​.mailbox
Broadcast Messaging (𝑀𝑏𝑟𝑜𝑎𝑑𝑐𝑎𝑠𝑡𝑀broadcast​):
- Precondition (Pre Pre): ∃𝑎𝑠𝑒𝑛𝑑𝑒𝑟∈𝐴 ∃a sender​∈A
- Action: 𝑏𝑟𝑜𝑎𝑑𝑐𝑎𝑠𝑡𝑀𝑒𝑠𝑠𝑎𝑔𝑒(𝑎𝑠𝑒𝑛𝑑𝑒𝑟,𝑚) broadcastMessage(a sender​,m)
- Postcondition (Post Post): ∀𝑎∈𝐴∖{𝑎𝑠𝑒𝑛𝑑𝑒𝑟},𝑚∈𝑎.𝑚𝑎𝑖𝑙𝑏𝑜𝑥 ∀a∈A∖{a sender​},m∈a.mailbox
3. System Invariants and Domain Assertions
Invariant Preservation (𝐼𝑝𝑟𝑒𝑠𝑒𝑟𝑣𝑒Ipreserve​):
- Invariant (𝐼 I): Φ(𝐴,𝑀) Φ(A,M)
- Upon Action (𝑎𝑐𝑡𝑖𝑜𝑛action): 𝑎𝑐𝑡𝑖𝑜𝑛(𝐴,𝑀)→𝐴′,𝑀′ action(A,M)→A′,M′
- Preservation (𝐼′I′): Φ(𝐴′,𝑀′) Φ(A′,M′)
Domain-Specific Logic Integration (𝐷𝑖𝑛𝑡𝑒𝑔𝑟𝑎𝑡𝑒Dintegrate​):
- Precondition (Pre 𝐷 Pre D​): Δ(𝑠𝑔𝑙𝑜𝑏𝑎𝑙) Δ(s global​)
- Action and Domain Logic: 𝑝𝑒𝑟𝑓𝑜𝑟𝑚𝐴𝑐𝑡𝑖𝑜𝑛𝑊𝑖𝑡ℎ𝐷𝑜𝑚𝑎𝑖𝑛𝐿𝑜𝑔𝑖𝑐(𝑎,𝑚,Δ)→𝑠𝑔𝑙𝑜𝑏𝑎𝑙′,Δ′ performActionWithDomainLogic(a,m,Δ)→s global′​,Δ′
- Postcondition (Post 𝐷 Post D​): Δ′(𝑠𝑔𝑙𝑜𝑏𝑎𝑙′) Δ′(s global′​)
Implementation Details:
The ActorSystem is implemented with a focus on modularity, scalability, and ease of use. It provides a high-level API for actor management and message passing, abstracting away the complexities of asynchronous programming and actor coordination. Developers can leverage the ActorSystem to build complex, responsive applications that are both computationally correct and domain-compliant.
Usage:
To use the ActorSystem, instantiate it within your application and use its methods to create actors and manage message passing. The system integrates seamlessly with the asyncio event loop, making it straightforward to incorporate into existing asynchronous applications.
The actor_system.py module, guided by the AMCN, provides a robust foundation for developing actor-based systems within the RDDDY framework, ensuring that applications are built with a solid architectural foundation that promotes maintainability, scalability, and domain-driven design principles.
"""
from asyncio import Future
import asyncio
from typing import TYPE_CHECKING, TypeVar
from loguru import logger
import reactivex as rx
from reactivex import operators as ops
from reactivex.scheduler.eventloop import AsyncIOScheduler
from rdddy.messages import Message
if TYPE_CHECKING:
from rdddy.actor import Actor
T = TypeVar("T", bound="Actor")
class ActorSystem:
"""
Orchestrates actor lifecycle management, message passing, and system-wide coordination within the RDDDY framework.
The ActorSystem class provides functionalities for creating, managing, and terminating actors, facilitating asynchronous message passing between them, and maintaining system invariants.
Attributes:
actors (dict): A dictionary containing actor IDs as keys and corresponding actor instances as values.
loop (asyncio.AbstractEventLoop): The asyncio event loop used for asynchronous operations.
scheduler (AsyncIOScheduler): An asynchronous scheduler for controlling task execution.
event_stream (Subject): A subject for publishing events within the actor system.
Methods:
actor_of(actor_class, **kwargs): Creates a new actor instance and starts its mailbox processing loop.
actors_of(actor_classes, **kwargs): Creates multiple actor instances of different types.
publish(message): Publishes a message to the actor system for distribution.
remove_actor(actor_id): Removes an actor from the actor system.
send(actor_id, message): Sends a message to a specific actor within the system.
wait_for_event(event_type): Waits for a specific event type to occur within the system.
Implementation Details:
The ActorSystem class implements actor management and message passing functionalities, abstracting away the complexities of asynchronous programming and actor coordination. It integrates seamlessly with the asyncio event loop, ensuring efficient concurrent operations.
Usage:
Instantiate an ActorSystem object within your application to manage actors and coordinate message passing. Use its methods to create actors, send messages, and wait for specific events within the system.
"""
def __init__(self, loop: asyncio.AbstractEventLoop = None) -> None:
"""
Initializes the ActorSystem.
Args:
loop (asyncio.AbstractEventLoop, optional): The asyncio event loop to be used for asynchronous operations.
If not provided, the default event loop will be used.
Attributes:
actors (dict): A dictionary containing actor IDs as keys and corresponding actor instances as values.
loop (asyncio.AbstractEventLoop): The asyncio event loop used for asynchronous operations.
scheduler (AsyncIOScheduler): An asynchronous scheduler for controlling task execution.
event_stream (Subject): A subject for publishing events within the actor system.
"""
self.actors: dict[int, Actor] = {}
self.loop = loop if loop is not None else asyncio.get_event_loop()
self.scheduler = AsyncIOScheduler(loop=self.loop)
self.event_stream = rx.subject.Subject()
async def actor_of(self, actor_class, **kwargs) -> T:
"""
Creates a new actor instance and starts its mailbox processing loop.
T = TypeVar("T", bound="Actor")
Preconditions (Pre):
- None
Transition (T):
- Creates a new instance of the specified actor class.
- Initializes the actor's mailbox.
- Starts the processing loop for the actor's mailbox, enabling asynchronous message handling.
Postconditions (Post):
- A new actor instance has been created and started successfully.
Args:
actor_class: The class of the actor to be created.
**kwargs: Additional keyword arguments to be passed to the actor constructor.
Returns:
T: The created actor instance.
"""
actor = actor_class(self, **kwargs)
self.actors[actor.actor_id] = actor
await actor.start(self.scheduler)
return actor
async def actors_of(self, actor_classes, **kwargs) -> list[T]:
"""
Creates multiple actor instances of different types and starts their mailbox processing loops.
T = TypeVar("T", bound="Actor")
Preconditions (Pre):
- None
Transition (T):
- Creates new instances of the specified actor classes.
- Initializes the mailboxes for each actor.
- Starts the processing loop for each actor's mailbox, enabling asynchronous message handling.
Postconditions (Post):
- Multiple actor instances have been created and started successfully.
Args:
actor_classes (List[Type]): A list of actor classes to be instantiated.
**kwargs: Additional keyword arguments to be passed to the actor constructors.
Returns:
List[T]: A list containing the created actor instances.
"""
actors = []
for actor_class in actor_classes:
actor = await self.actor_of(actor_class, **kwargs)
actors.append(actor)
return actors
async def publish(self, message: "Message"):
"""
Publishes a message to the actor system for distribution.
Preconditions (Pre):
- None
Transition (T):
- Emits the message to the event stream of the actor system.
- Sends the message to each actor within the system for processing.
Postconditions (Post):
- The message has been published to the actor system and processed by relevant actors.
- If the message is an instance of the base Message class, an error is raised.
Args:
message (Message): The message to be published to the actor system.
Raises:
ValueError: If the base Message class is used directly.
"""
if type(message) is Message:
raise ValueError(
"The base Message class should not be used directly. Please use a subclass of Message."
)
self.event_stream.on_next(message)
actors = list(self.actors.values())
for actor in actors:
await self.send(actor.actor_id, message)
async def remove_actor(self, actor_id):
"""
Removes an actor from the actor system.
Preconditions (Pre):
- The actor ID must exist in the actor system.
Transition (T):
- Removes the actor with the specified ID from the actor system.
Postconditions (Post):
- The actor has been successfully removed from the actor system.
Args:
actor_id: The ID of the actor to be removed.
"""
actor = self.actors.pop(actor_id, None)
if actor:
logger.debug(f"Removing actor {actor_id}")
else:
logger.debug(f"Actor {actor_id} not found for removal")
logger.debug(f"Current actors count: {len(self.actors)}")
async def send(self, actor_id: int, message: "Message"):
"""
Sends a message to a specific actor within the actor system.
Preconditions (Pre):
- The actor ID must exist in the actor system.
- The message must be an instance of the Message class.
Transition (T):
- Delivers the message to the specified actor's mailbox for processing.
Postconditions (Post):
- The message has been successfully sent to the specified actor for processing.
Args:
actor_id (int): The ID of the target actor.
message (Message): The message to be sent to the target actor.
"""
logger.debug(f"Sending message {message} to actor {actor_id}")
actor = self.actors.get(actor_id)
if actor:
actor.mailbox.on_next(message)
await asyncio.sleep(0)
else:
logger.debug(f"Actor {actor_id} not found.")
async def wait_for_message(self, message_type: type) -> Future["Message"]:
"""
Waits for a message of a specific type to be published to the actor system.
Preconditions (Pre):
- None
Transition (T):
- Subscribes to the event stream of the actor system.
- Waits until a message of the specified type is published.
Postconditions (Post):
- A message of the specified type has been received from the actor system.
Args:
message_type (type): The type of message to wait for.
Returns:
asyncio.Future: A future object representing the awaited message.
"""
loop = asyncio.get_event_loop()
future = loop.create_future()
def on_next(msg):
if isinstance(msg, message_type):
future.set_result(msg)
subscription.dispose()
subscription = self.event_stream.pipe(
ops.filter(lambda msg: isinstance(msg, message_type))
).subscribe(on_next)
return await future
def __getitem__(self, actor_id) -> T:
"""
Retrieves an actor by its ID from the actor system.
Preconditions (Pre):
- The actor ID must exist in the actor system.
Transition (T):
- Retrieves the actor with the specified ID from the actor system.
Postconditions (Post):
- The actor with the specified ID has been successfully retrieved from the actor system.
Args:
actor_id: The ID of the actor to retrieve.
Returns:
Actor: The actor object corresponding to the specified ID.
"""
return self.actors.get(actor_id)
import asyncio
async def main():
print("main")
# await
if __name__ == "__main__":
asyncio.run(main())
import ast
import logging
import inspect
from typing import Type, TypeVar
from dspy import Assert, Module, ChainOfThought, Signature, InputField, OutputField
from pydantic import BaseModel, ValidationError
logger = logging.getLogger(__name__)
logger.setLevel(logging.ERROR)
def eval_dict_str(dict_str: str) -> dict:
"""Safely convert str to dict"""
return ast.literal_eval(dict_str)
class PromptToPydanticInstanceSignature(Signature):
"""
Synthesize the prompt into the kwargs fit the model.
Do not duplicate the field descriptions
"""
root_pydantic_model_class_name = InputField(
desc="The class name of the pydantic model to receive the kwargs"
)
pydantic_model_definitions = InputField(
desc="Pydantic model class definitions as a string"
)
prompt = InputField(
desc="The prompt to be synthesized into data. Do not duplicate descriptions"
)
root_model_kwargs_dict = OutputField(
prefix="kwargs_dict: dict = ",
desc="Generate a Python dictionary as a string with minimized whitespace that only contains json valid values.",
)
class PromptToPydanticInstanceErrorSignature(Signature):
"""Synthesize the prompt into the kwargs fit the model"""
error = InputField(desc="Error message to fix the kwargs")
root_pydantic_model_class_name = InputField(
desc="The class name of the pydantic model to receive the kwargs"
)
pydantic_model_definitions = InputField(
desc="Pydantic model class definitions as a string"
)
prompt = InputField(desc="The prompt to be synthesized into data")
root_model_kwargs_dict = OutputField(
prefix="kwargs_dict = ",
desc="Generate a Python dictionary as a string with minimized whitespace that only contains json valid values.",
)
T = TypeVar("T", bound=BaseModel)
class GenPydanticInstance(Module):
"""
A module for generating and validating Pydantic model instances based on prompts.
Usage:
To use this module, instantiate the GenPydanticInstance class with the desired
root Pydantic model and optional child models. Then, call the `forward` method
with a prompt to generate Pydantic model instances based on the provided prompt.
"""
def __init__(
self,
root_model: Type[T],
child_models: list[Type[BaseModel]] = None,
generate_sig=PromptToPydanticInstanceSignature,
correct_generate_sig=PromptToPydanticInstanceErrorSignature,
):
super().__init__()
if not issubclass(root_model, BaseModel):
raise TypeError("root_model must inherit from pydantic.BaseModel")
self.models = [root_model] # Always include root_model in models list
if child_models:
# Validate that each child_model inherits from BaseModel
for model in child_models:
if not issubclass(model, BaseModel):
raise TypeError(
"All child_models must inherit from pydantic.BaseModel"
)
self.models.extend(child_models)
self.output_key = "root_model_kwargs_dict"
self.root_model = root_model
# Concatenate source code of models for use in generation/correction logic
self.model_sources = "\n".join(
[inspect.getsource(model) for model in self.models]
)
# Initialize DSPy ChainOfThought modules for generation and correction
self.generate = ChainOfThought(generate_sig)
self.correct_generate = ChainOfThought(correct_generate_sig)
self.validation_error = None
def validate_root_model(self, output: str) -> bool:
"""Validates whether the generated output conforms to the root Pydantic model."""
try:
model_inst = self.root_model.model_validate(eval_dict_str(output))
return isinstance(model_inst, self.root_model)
except (ValidationError, ValueError, TypeError, SyntaxError) as error:
self.validation_error = error
logger.debug(f"Validation error: {error}")
return False
def validate_output(self, output) -> T:
"""Validates the generated output and returns an instance of the root Pydantic model if successful."""
Assert(
self.validate_root_model(output),
f"""You need to create a kwargs dict for {self.root_model.__name__}\n
Validation error:\n{self.validation_error}""",
)
return self.root_model.model_validate(eval_dict_str(output))
def forward(self, prompt) -> T:
"""
Takes a prompt as input and generates a Python dictionary that represents an instance of the
root Pydantic model. It also handles error correction and validation.
"""
output = self.generate(
prompt=prompt,
root_pydantic_model_class_name=self.root_model.__name__,
pydantic_model_definitions=self.model_sources,
)
output = output[self.output_key]
try:
return self.validate_output(output)
except (AssertionError, ValueError, TypeError) as error:
logger.error(f"Error {str(error)}\nOutput:\n{output}")
# Correction attempt
corrected_output = self.correct_generate(
prompt=prompt,
root_pydantic_model_class_name=self.root_model.__name__,
pydantic_model_definitions=self.model_sources,
error=str(error),
)[self.output_key]
return self.validate_output(corrected_output)
def __call__(self, *args, **kwargs):
return self.forward(kwargs.get("prompt"))
def main():
import dspy
from rdddy.messages import EventStormModel, Event, Command, Query
lm = dspy.OpenAI(max_tokens=3000, model="gpt-4")
dspy.settings.configure(lm=lm)
prompt = """Automated Hygen template full stack system for NextJS.
Express
Express.js is arguably the most popular web framework for Node.js
A typical app structure for express celebrates the notion of routes and handlers, while views and data are left for interpretation (probably because the rise of microservices and client-side apps).
So an app structure may look like this:
app/
routes.js
handlers/
health.js
shazam.js
While routes.js glues everything together:
// ... some code ...
const health = require('./handlers/health')
const shazam = require('./handlers/shazam')
app.get('/health', health)
app.post('/shazam', shazam)
module.exports = app
Unlike React Native, you could dynamically load modules here. However, there's still a need for judgement when constructing the routes (app.get/post part).
Using hygen let's see how we could build something like this:
$ hygen route new --method post --name auth
Since we've been through a few templates as with previous use cases, let's jump straight to the interesting part, the inject part.
So let's say our generator is structured like this:
_templates/
route/
new/
handler.ejs.t
inject_handler.ejs.t
Then inject_handler looks like this:
---
inject: true
to: app/routes.js
skip_if: <%= name %>
before: "module.exports = app"
---
app.<%= method %>('/<%= name %>', <%= name %>)
Note how we're anchoring this inject to before: "module.exports = app". If in previous occasions we appended content to a given line, we're now prepending it.
"""
model_module = GenPydanticInstance(root_model=EventStormModel, child_models=[Event, Command, Query])
model_inst = model_module(prompt=prompt)
print(model_inst)
if __name__ == '__main__':
main()
import asyncio
import dspy
from pydantic import Extra, Field
from loguru import logger
from rdddy.actor_system import ActorSystem
from rdddy.actor import Actor
from rdddy.generators.gen_pydantic_instance import GenPydanticInstance
from rdddy.messages import *
class GenerateDashboardEvent(Event):
page_name: str
widget_type: str
class HygenCLIArgs(BaseModel):
"""The arguments for the Hygen CLI"""
name: str = Field(...,
description="The name associated with the template generation, often used as a filename or identifier.")
route: str = Field(None, description="The route or path where the generated file(s) should be located.")
description: str = Field(None, description="The description of the page")
items: str = Field(None,
description="A string representing items or configurations specific to the generation process.")
class Config:
extra = "allow" # Allows for additional fields beyond those explicitly defined.
class HygenTemplateModel(BaseModel):
generator: str = Field(..., description="Specifies the Hygen generator to be used for code generation.")
action: str = Field(..., description="Defines the action that the generator should perform.")
cli_args: HygenCLIArgs = Field(...,
description="Additional key-value pairs providing specific arguments for the template generation process.")
class Config:
extra = "allow" # Permits additional fields, offering flexibility for extending the model as needed.
class DashboardGeneratorActor(Actor):
def __init__(self, actor_system: ActorSystem, actor_id=None):
super().__init__(actor_system, actor_id=actor_id)
# Retrieve NextJS project root:
self.nextjs_root = "/Users/candacechatman/dev/nextjs-page"
# self.nextjs_root = os.getenv("NEXTJS_PROJECT_ROOT")
if not self.nextjs_root:
# Handle this error - is this an event to the ActorSystem, a log?
print("Error: NEXTJS_PROJECT_ROOT environment variable not found.")
return
async def handle_generate_dashboard_event(self, event: GenerateDashboardEvent):
# ... Logic will come later ...
print(f"Received generation request: {event}")
try:
module = GenPydanticInstance(root_model=HygenTemplateModel, child_models=[HygenCLIArgs])
hygen_inst = module.forward("I need a hygen template to create an about component, use the page generator. and the 'new' action. the route is about")
output = await run_hygen_async(hygen_inst, self.nextjs_root)
logger.debug(output)
except Exception as e:
logger.error(e)
async def run_hygen_async(template_params: HygenTemplateModel, cwd: str = None, overwrite: bool = True): # Use the new model
"""Executes a Hygen command asynchronously... (Same docstring as earlier) """
hygen_command = ["hygen", template_params.generator, template_params.action]
for key, value in template_params.cli_args.model_dump().items(): # Iterate over kwargs
hygen_command.extend(["--"+key, str(value)])
logger.debug(f"Running hygen command: {hygen_command}")
try:
process = await asyncio.create_subprocess_exec(
*hygen_command,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=cwd
)
response = b'yes' if overwrite else b'no'
process.stdin.write(response)
await process.stdin.drain()
process.stdin.close()
stdout, stderr = await process.communicate()
if process.returncode != 0 or "Error" in stdout.decode(): # Check for non-zero exit code from Hygen
raise ChildProcessError(f"Hygen failed with error:\n{stderr.decode()}{stdout.decode()}")
else:
return stdout.decode()
except ChildProcessError as e:
raise e
async def main():
lm = dspy.OpenAI(max_tokens=500)
dspy.settings.configure(lm=lm)
actor_system = ActorSystem()
generator_actor = await actor_system.actor_of(DashboardGeneratorActor)
await actor_system.publish(GenerateDashboardEvent(page_name="summary", widget_type="line_chart"))
await asyncio.sleep(60)
if __name__ == '__main__':
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment