Last active
January 3, 2025 02:51
-
-
Save ask/ea3e665f8a6f7568fa61323315fec39a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from enum import Enum | |
import faust | |
from faust import cli | |
from faust import web | |
class State(Enum):
    """State of a shipment."""

    ISSUED = 'ISSUED'
    CREATED = 'CREATED'
    STARTED = 'STARTED'
    FINISHED = 'FINISHED'
    ERROR = 'ERROR'

    def is_final(self) -> bool:
        """Return True when the shipment is completed.

        A shipment counts as completed either because it finished or
        because it ended in an error.
        """
        # FINAL_STATES is defined right after the class (it needs the enum
        # members to exist) and is looked up when this method is called.
        return self in FINAL_STATES


#: Set of final states.
FINAL_STATES = frozenset({State.FINISHED, State.ERROR})
# This is how shipments are stored in `shipments_table`.
# It is also the model used when a new shipment is created
# through the web view.
class Shipment(faust.Record):
    """A shipment record: id, name and current state.

    Equality and hashing are based on ``(id, name)`` only, so the same
    shipment in two different states is a single member of a set.
    """

    id: str
    name: str
    state: State = State.ISSUED

    def __eq__(self, other: object) -> bool:
        """Compare shipments by ``(id, name)``, ignoring the state.

        If the state were used for identity, a SetTable would contain
        duplicates for every state, so we define custom
        ``__eq__``/``__hash__`` methods.
        """
        if not isinstance(other, Shipment):
            # Let Python try the reflected comparison instead of failing
            # with AttributeError on objects lacking ``id``/``name``.
            return NotImplemented
        return (self.id, self.name) == (other.id, other.name)

    def __hash__(self) -> int:
        """Hash on the same ``(id, name)`` pair used by ``__eq__``."""
        return hash((self.id, self.name))

    async def mark_as_active(self):
        """Add shipment to set of active shipments."""
        await shipment_sets.manager.add(key='active', member=self)

    async def mark_as_inactive(self):
        """Remove shipment from set of active shipments."""
        await shipment_sets.manager.discard(key='active', member=self)
class ShipmentUpdate(faust.Record):
    """Message asking for a shipment to be moved into a new state."""

    # Id of the shipment that should change state.
    shipment_id: str
    # State the shipment should transition to.
    new_state: State
# The Faust application all tables, agents and views below hang off.
app = faust.App(
    'shipments',
    topic_partitions=4,
)

# Key/value table: shipment id -> Shipment record.
shipments_table = app.Table(
    'shipment',
    value_type=Shipment,
)

# Table of sets; the 'active' key holds the currently active shipments.
shipment_sets = app.SetTable(
    'shipments',
    start_manager=True,
)

# Blueprint collecting the shipment HTTP views defined below.
views = web.Blueprint('shipments')
# HTTP endpoint requesting that a shipment moves to a new state.
# URL: http://localhost:6066/shipment/{id}/update/{status}/
#
#   $ curl -X PUT http://localhost:6066/shipment/69174feb/update/FINISHED/
@views.route('/{shipment_id}/update/{status}/')
class ShipmentUpdateView(web.View):
    """View that queues a state transition for one shipment."""

    async def put(self,
                  request: web.Request,
                  shipment_id: str,
                  status: str) -> web.Response:
        """Send a ShipmentUpdate to the ``update_shipment`` agent.

        Raises ``self.NotFound`` when ``status`` is not a valid State value.
        """
        try:
            new_state = State(status)
        except ValueError:
            raise self.NotFound(f'Unknown state: {status!r}')

        # The update is keyed by shipment id and processed asynchronously
        # by the agent; this handler only enqueues it.
        message = ShipmentUpdate(shipment_id=shipment_id, new_state=new_state)
        await update_shipment.send(key=shipment_id, value=message)
        return self.json({'status': 'OK'})
# HTTP endpoint to show shipment status.
# Go to http://localhost:6066/shipment/{id}/
#
#   $ curl -X GET http://localhost:6066/shipment/69174feb/
@views.route('/{shipment_id}/')
class ShipmentDetailView(web.View):
    """View returning a single shipment from ``shipments_table``."""

    # NOTE(review): table_route presumably forwards the request to the
    # worker owning the key's partition -- confirm against the Faust docs.
    @app.table_route(table=shipments_table, match_info='shipment_id')
    async def get(self,
                  request: web.Request,
                  shipment_id: str) -> web.Response:
        """Return the shipment stored under ``shipment_id`` as JSON.

        Raises ``self.NotFound`` when no such shipment is in the table,
        instead of letting the KeyError surface as a server error.
        """
        try:
            shipment = shipments_table[shipment_id]
        except KeyError:
            raise self.NotFound(
                f'Unknown shipment: {shipment_id!r}') from None
        return self.json(shipment)
@views.route('/')
class ShipmentListView(web.View):
    """HTTP endpoint to list active shipments and to create new ones.

    Go to http://localhost:6066/shipment/

    .. sourcecode:: console

        $ curl -X GET http://localhost:6066/shipment/

    Do a POST on this endpoint to create a new shipment:

    .. sourcecode:: console

        $ curl -X POST http://localhost:6066/shipment/ -d '{"name": "foo"}'
        {"status": "success",
         "shipment_id": "4391ce93-b1ea-403a-8e44-a511451f8722"}
    """

    @app.table_route(table=shipment_sets, exact_key='active')
    async def get(self, request: web.Request) -> web.Response:
        """Return the members of the 'active' shipment set as JSON."""
        active_shipments = set(shipment_sets['active'])
        return self.json(active_shipments)

    async def post(self, request: web.Request) -> web.Response:
        """Create a shipment from the JSON body and queue it for starting.

        The body's ``name`` field becomes the shipment name; the id is
        freshly generated and returned in the response.
        """
        payload = await request.json()
        shipment_name = payload.get('name')
        shipment_id = faust.uuid()
        new_shipment = Shipment(id=shipment_id, name=shipment_name)
        await start_shipment.send(key=shipment_id, value=new_shipment)
        return self.json({'status': 'success', 'shipment_id': shipment_id})
# Mount the shipment views under the /shipment/ URL prefix.
app.web.blueprints.add(
    '/shipment/',
    views,
)
@app.agent(value_type=Shipment)
async def start_shipment(shipments: faust.Stream[Shipment]) -> None:
    """Store each incoming shipment as STARTED and mark it active."""
    async for shipment in shipments:
        print(f'NEW SHIPMENT: {shipment!r}')
        # The shipment now transitions to the STARTED state; derive() gives
        # a new record, the incoming one is left untouched.
        started = shipment.derive(state=State.STARTED)
        shipments_table[shipment.id] = started
        await shipment.mark_as_active()
@app.agent(value_type=ShipmentUpdate)
async def update_shipment(updates: faust.Stream[ShipmentUpdate]) -> None:
    """Transition shipments to the state requested by each update.

    Updates that reference a shipment id not present in
    ``shipments_table`` are reported and skipped, so a single bad request
    cannot crash the agent with a KeyError.
    """
    async for update in updates:
        shipment_id = update.shipment_id
        new_state = State(update.new_state)

        try:
            shipment = shipments_table[shipment_id]
        except KeyError:
            print(f'Shipment {shipment_id} not found, ignoring update')
            continue

        # Note how this operation is immutable!
        # This is stored in a log so you will have a versioned
        # history of shipment state.
        new_shipment = shipment.derive(state=new_state)
        print(f'Shipment {shipment_id} {shipment.state} -> {new_state}')

        # Save the shipment in the table (K/V store) with the new state.
        shipments_table[shipment.id] = new_shipment

        if new_state.is_final():
            # Shipment equality ignores the state, so discarding via the
            # old record removes the shipment from the active set.
            await shipment.mark_as_inactive()
# Run the command:
#   $ python shipments.py finish_shipment <shipment_id>
# to finish a shipment from the command-line.
@app.command(
    cli.option('--final-state', default=State.FINISHED.value),
    cli.argument('shipment_id'))
async def finish_shipment(self: cli.AppCommand, shipment_id: str,
                          final_state: str) -> None:
    """Send an update moving a shipment into a final state.

    Arguments:
        shipment_id: Id of the shipment to update.
        final_state: Value of the State to transition to
            (the ``--final-state`` option, FINISHED by default).

    Raises:
        UsageError: If the shipment id is empty or ``final_state`` is not
            a valid State value.
    """
    # The original check was ``assert id``, which tested the builtin ``id``
    # function (always truthy) and therefore never fired.
    if not shipment_id:
        raise self.UsageError('please provide a shipment id')
    try:
        new_state = State(final_state)
    except ValueError:
        raise self.UsageError(f'Unknown state: {final_state!r}') from None
    await update_shipment.send(
        key=shipment_id,
        value=ShipmentUpdate(
            shipment_id=shipment_id,
            new_state=new_state,
        ),
    )
if __name__ == '__main__':
    # Run as a script: hand control to the app's command-line entry point.
    app.main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment