Events

Introduction

Arvel's events provide a simple observer pattern, letting you subscribe and listen for events in your application. Events are immutable Pydantic models (Event subclasses BaseModel with frozen=True); listeners react to them. A single event can have multiple listeners that don't depend on each other, which is a great way to decouple side effects from the code that triggers them.

Registering the Provider

Events are opt-in. Add EventServiceProvider to bootstrap/providers.py. It binds the Event facade and wires the dispatcher to the container (so listeners can be resolved with dependency injection).

Defining Events

An event is a Pydantic model. Declare its payload as typed fields. Events are frozen (immutable). Each event class also registers itself automatically, which is what lets a queued listener deserialize the event later:

from arvel.events.event import Event


class OrderShipped(Event):
    order_id: int
    tracking_number: str
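To illustrate what "frozen" means in practice, here is a minimal sketch that runs without Arvel installed. It assumes a frozen stdlib dataclass as a stand-in for Arvel's Pydantic-based Event, so the exception type raised on mutation here (FrozenInstanceError) is not necessarily the one Arvel raises.

```python
from dataclasses import FrozenInstanceError, dataclass


# Stand-in for an Arvel event: a frozen dataclass, not a Pydantic model.
@dataclass(frozen=True)
class OrderShipped:
    order_id: int
    tracking_number: str


event = OrderShipped(order_id=1, tracking_number="1Z999")

try:
    event.order_id = 2  # assignment to a frozen instance is rejected
    mutated = True
except FrozenInstanceError:
    mutated = False
```

Because the payload cannot change after construction, every listener sees the same data regardless of the order in which listeners run.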

Defining Listeners

A listener subclasses Listener[E] for its event type and implements an async handle:

from arvel.events.listener import Listener
from app.events.order_shipped import OrderShipped


class SendShipmentNotification(Listener[OrderShipped]):
    async def handle(self, event: OrderShipped) -> None:
        # notify the customer
        ...

Because the dispatcher resolves listeners through the container, you can declare constructor dependencies and they'll be injected.
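The idea of constructor injection can be sketched with a toy resolver. Everything here is hypothetical and stdlib-only: Mailer, the resolve function, and the plain SendShipmentNotification class are illustrations of the general technique, not Arvel's container or its Listener base class.

```python
import inspect


class Mailer:
    """Hypothetical dependency; not part of Arvel."""

    def send(self, to: str, body: str) -> str:
        return f"to={to} body={body}"


class SendShipmentNotification:
    # The dependency is declared as a typed constructor parameter.
    def __init__(self, mailer: Mailer) -> None:
        self.mailer = mailer


def resolve(cls):
    """Toy container: build every annotated constructor argument, then the class."""
    params = inspect.signature(cls.__init__).parameters
    kwargs = {
        name: resolve(param.annotation)
        for name, param in params.items()
        if name != "self" and param.annotation is not inspect.Parameter.empty
    }
    return cls(**kwargs)


listener = resolve(SendShipmentNotification)
```

The listener never constructs its own Mailer, which makes it straightforward to substitute a test double when the class is instantiated by hand.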

Registering Listeners

Map events to listeners on the dispatcher with listen. Registration is idempotent (registering the same pair again has no further effect) and order-preserving: listeners fire in the order they were registered. Do this in a service provider's boot phase:

from arvel.events.dispatcher import EventDispatcher


def boot(self) -> None:
    dispatcher = self.app.make(EventDispatcher)
    dispatcher.listen(OrderShipped, SendShipmentNotification)

Note

There is no convention-based listener auto-discovery — register each event-to-listener mapping explicitly.
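As a rough illustration of what idempotent, order-preserving registration implies, consider the toy registry below. ToyDispatcher, listeners_for, and UpdateInventory are hypothetical names used only for this sketch; Arvel's EventDispatcher may store its mappings differently.

```python
from collections import defaultdict


class ToyDispatcher:
    """Illustrative only; not Arvel's EventDispatcher."""

    def __init__(self) -> None:
        self._listeners = defaultdict(list)

    def listen(self, event_type: type, listener_type: type) -> None:
        registered = self._listeners[event_type]
        if listener_type not in registered:  # idempotent: skip duplicates
            registered.append(listener_type)  # a list keeps registration order

    def listeners_for(self, event_type: type) -> list:
        return list(self._listeners[event_type])


class OrderShipped: ...
class SendShipmentNotification: ...
class UpdateInventory: ...


dispatcher = ToyDispatcher()
dispatcher.listen(OrderShipped, SendShipmentNotification)
dispatcher.listen(OrderShipped, UpdateInventory)
dispatcher.listen(OrderShipped, SendShipmentNotification)  # repeated: no effect

order = [cls.__name__ for cls in dispatcher.listeners_for(OrderShipped)]
```

Idempotence matters when a provider's boot phase might run more than once: a repeated registration does not cause a listener to fire twice.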

Dispatching Events

Dispatch an event instance through the Event facade. dispatch is a coroutine:

from arvel.facades.event import Event

await Event.dispatch(OrderShipped(order_id=1, tracking_number="1Z999"))

Every registered listener runs. If a listener raises, the error is logged and the remaining listeners still run, so one failing listener cannot prevent the others from handling the event.
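The failure-isolation behavior described above can be approximated with a loop that catches and logs each listener's exception. This is a stdlib-only sketch: the dispatch function, the dict payload, and both listeners are hypothetical, and Arvel's actual logging and error handling may differ.

```python
import asyncio
import logging

logger = logging.getLogger("toy.events")
calls = []


async def failing_listener(event: dict) -> None:
    raise RuntimeError("mail server unreachable")


async def audit_listener(event: dict) -> None:
    calls.append(f"audited order {event['order_id']}")


async def dispatch(event: dict, listeners) -> None:
    for listener in listeners:
        try:
            await listener(event)
        except Exception:
            # Log the traceback, then continue with the next listener.
            logger.exception("listener %s failed", listener.__name__)


asyncio.run(dispatch({"order_id": 1}, [failing_listener, audit_listener]))
```

The caller of dispatch never sees the RuntimeError, so code that must react to a listener failure needs to do so through logs or monitoring rather than a try/except around the dispatch call.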

Queued Listeners

For slow work (sending mail, calling external APIs), push the listener onto the queue by mixing in ShouldQueue. Instead of running inline, the dispatcher enqueues it through the Bus:

from arvel.events.listener import Listener
from arvel.events.should_queue import ShouldQueue
from app.events.order_shipped import OrderShipped


class SendShipmentNotification(Listener[OrderShipped], ShouldQueue):
    async def handle(self, event: OrderShipped) -> None:
        ...

Note

If the queue (Bus) isn't bound, queued listeners fall back to running inline so events still fire in development.
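The fallback can be pictured as a simple branch on whether a queue is available. In this sketch ToyBus, run_queued, and notify are hypothetical names, and the string payloads stand in for real events; it shows the general pattern only, not how Arvel checks its container binding.

```python
import asyncio

handled = []


class ToyBus:
    """Hypothetical queue that records jobs instead of running them."""

    def __init__(self) -> None:
        self.jobs = []

    async def dispatch(self, listener, event) -> None:
        self.jobs.append((listener, event))


async def notify(event) -> None:
    handled.append(event)


async def run_queued(listener, event, bus=None) -> None:
    if bus is None:
        await listener(event)  # no queue bound: run inline
    else:
        await bus.dispatch(listener, event)  # queue bound: defer the work


bus = ToyBus()
asyncio.run(run_queued(notify, "shipped:1"))       # handled immediately
asyncio.run(run_queued(notify, "shipped:2", bus))  # only enqueued
```

One consequence worth keeping in mind: with the inline fallback, a slow listener blocks the dispatch call in development even though it would be deferred once a queue is bound.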

Events & Broadcasting

An event that also implements the ShouldBroadcast contract is pushed to your broadcast driver after the synchronous listeners finish. See Broadcasting.

Testing

Event.fake() records dispatched events instead of running listeners, so you can assert they fired:

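from arvel.facades.event import Event

# ship_order, order, and OrderCancelled come from your application code.
with Event.fake():
    await ship_order(order)
    Event.assert_dispatched(OrderShipped)
    Event.assert_dispatched(OrderShipped, times=1)
    Event.assert_not_dispatched(OrderCancelled)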
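To make the record-instead-of-run idea concrete, here is a small stdlib sketch of a fake. ToyEvents, its count helper, and the listener_runs counter are hypothetical and exist only for illustration; the real facade exposes the assert_dispatched and assert_not_dispatched helpers shown above.

```python
import asyncio
from contextlib import contextmanager


class OrderShipped:
    pass


class OrderCancelled:
    pass


class ToyEvents:
    """Illustrative stand-in for the Event facade; not Arvel's code."""

    def __init__(self) -> None:
        self.listener_runs = 0
        self._recorded = None  # a list while faking, otherwise None

    async def dispatch(self, event) -> None:
        if self._recorded is not None:
            self._recorded.append(event)  # faking: record only
        else:
            self.listener_runs += 1  # stands in for running real listeners

    @contextmanager
    def fake(self):
        self._recorded = []
        try:
            yield
        finally:
            self._recorded = None  # leave fake mode when the block exits

    def count(self, event_type: type) -> int:
        return sum(isinstance(e, event_type) for e in self._recorded)


events = ToyEvents()

with events.fake():
    asyncio.run(events.dispatch(OrderShipped()))
    shipped = events.count(OrderShipped)
    cancelled = events.count(OrderCancelled)
```

In this sketch the recorded events are discarded when the with block exits, which is why the counts are read inside the block, mirroring where the assertions sit in the example above.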