Alpha Version: You are viewing the ALPHA documentation. This is an experimental version and may contain breaking changes.
Skip to main content

Aggregate-Based Implementation

The aggregate-based approach uses a separate event log per aggregate instance. Each entity in the domain is modelled as an aggregate with its own stream of events. Commands are processed by loading the aggregate's event history, running business logic, and appending new events to that aggregate's log.

This is the traditional event sourcing pattern and the starting point for most Reventless applications.


Plugin 1: Catalog

Manages the product catalogue — what is available for sale and how it is organized.

Aggregate: Product

A product listing with name, description, and price. Commands and events use short names scoped to the aggregate (Product.Add, Product.Added).

CommandsEvents
AddAdded
UpdateNameNameUpdated
UpdateDescriptionDescriptionUpdated
UpdatePricePriceUpdated

Aggregate: Category

A named grouping of products (e.g. "Books", "Electronics"). Product aggregates reference a categoryId.

CommandsEvents
AddAdded
RenameRenamed
ArchiveArchived

Aggregate: ProductDemand

Tracks per-product order demand. Driven entirely by events arriving from Ordering's Extension Point — no direct commands are sent by UI clients.

CommandsEvents
RecordRecorded
RevokeRevoked

Task: Import Products from CSV

A file-triggered Task that watches an S3 bucket for CSV uploads and publishes Product.Add commands for each row.

TriggerAction
S3 ObjectCreated on product-imports bucketParse file, publish Product.Add commands

Read Models

Read ModelSource AggregatesWhat It Tracks
ProductsProductAll product listings with current name, description, and price
CategoriesCategoryAll category names
ProductDemandsProduct + ProductDemandPer-product order count, initialized from Product.Added

Extension Point: Products_ExtensionPoint

Outbound API from Catalog to Ordering. Translates internal Product events into a stable public vocabulary.

EP EventTriggered By
ProductBecameAvailableProduct.Added
ProductPriceChangedProduct.PriceUpdated

Extension: Orders_Extension

Inbound subscription to Ordering's Orders_ExtensionPoint. Routes demand events to ProductDemand aggregate commands.

EP Event ReceivedCommand Dispatched
ItemOrderedProductDemand.Record
ItemOrderCancelledProductDemand.Revoke

Plugin 2: Ordering

Handles the purchase flow — who is buying and what they ordered.

Aggregate: Customer

A registered buyer with contact details and account status.

CommandsEvents
RegisterRegistered
UpdateEmailEmailUpdated
UpdateAddressAddressUpdated
DeactivateDeactivated

Aggregate: Order

A confirmed purchase referencing product IDs and a customerId. Clear, linear lifecycle.

CommandsEvents
PlacePlaced
ShipShipped
CancelCancelled

Event Mappings: Auto-Ship Order

When an Order.Placed event is emitted, an event mapping automatically issues an Order.Ship command for the same aggregate. Stateless fire-and-forget — no TODO list, no resolution tracking. The mappings live in Order_Mappings.res alongside the aggregate.

Source EventTarget Command
Order.PlacedOrder.Ship (same aggregate ID)

Side Effect: Send Order Confirmation Email

When an Order.Placed event is emitted, a side effect calls a (stubbed) email service. Fire-and-forget — no retry tracking, no TODO list. Hosted on the OrderNotifications Task.

Source EventSide Effect
Order.PlacedEmailService.sendOrderConfirmation

Aggregate: CatalogProduct

A shadow replica of Catalog product data, kept in sync via Catalog's Extension Point. Allows Ordering to validate product references without querying Catalog at command time.

CommandsEvents
SyncSynced
UpdatePricePriceUpdated

Extension Point: Orders_ExtensionPoint

Outbound API from Ordering to Catalog. Publishes order lifecycle events that Catalog's demand tracking subscribes to. A batch Order.Placed (multiple product IDs) is decomposed into one ItemOrdered per product.

EP EventTriggered By
ItemOrderedOrder.Placed (one per product)
ItemOrderCancelledOrder.Cancelled (one per product)

Extension: Products_Extension

Inbound subscription to Catalog's Products_ExtensionPoint. Routes product availability events to CatalogProduct aggregate commands.

EP Event ReceivedCommand Dispatched
ProductBecameAvailableCatalogProduct.Sync
ProductPriceChangedCatalogProduct.UpdatePrice

Cross-Plugin Integration

The two plugins communicate exclusively through Extension Points. Neither plugin imports the other's internal modules — only the shared EP specs are referenced.

Catalog                            Ordering
───────────────────────────────────────────────────────
Products_ExtensionPoint ──────► Products_Extension
(syncs CatalogProduct)

Orders_Extension ◄───────────── Orders_ExtensionPoint
(updates ProductDemand)

Implementation

The following walkthrough uses the Catalog Plugin from examples/online-shop-aggregates/catalog/ — the Product aggregate with its read model, the ProductDemand aggregate for demand tracking, the Products_ExtensionPoint, the Orders_Extension, and the generated Plugin that wires everything together.

Each component is split into a spec file (<Name>.res, annotated @@reventless.spec) and a body file (e.g. <Name>_Behavior.res, <Name>_Projections.res). The PPX auto-injects boilerplate — let name, module Id, let moduleUrl, the open/module Spec lines, projection mapping wrappers, read-model config, default authorization, and visibility — so the source you write contains only domain logic. The folder a file lives in (Aggregate/, ReadModel/, Task/, …) tells the PPX and the plugin generator what kind of component it is.

1. Aggregate Spec

The spec module defines the vocabulary for the aggregate: its commands, the events it can emit, and the errors it can return. It lives in Product/Aggregate/Product.res. The @@reventless.spec annotation derives let name = "Product" from the filename and injects module Id and let moduleUrl — you write only the types.

// Product/Aggregate/Product.res
@@reventless.spec

@schema
type command =
| Add({name: string, description: string, price: float})
| UpdateName({name: string})
| UpdateDescription({description: string})
| UpdatePrice({price: float})

@schema
type event =
| Added({name: string, description: string, price: float})
| NameUpdated({name: string})
| DescriptionUpdated({description: string})
| PriceUpdated({price: float})

@schema
type error =
| ProductAlreadyExists
| ProductNotFound

The @schema attribute generates JSON serialization automatically via the Sury PPX — no hand-written encoders or decoders needed. The aggregate ID is supplied by the framework at command time, so it is not part of the command or event payloads.

2. Behavior

The behavior module implements the aggregate's state machine. It lives next to the spec at Product/Aggregate/Product_Behavior.res and is annotated @@reventless.behavior, which auto-injects open Spec / module Spec = Product (derived from the filename) and let moduleUrl. It defines the state type and three functions that the framework calls at runtime:

ValueRole
initialStateThe starting state before any event is replayed (a "not yet created" instance)
evolveFolds the current state and an event into the next state; called once per historic event during replay
decideTakes the current state and a command, returns result<array<event>, error>Ok([...]) accepts, Error(err) rejects
// Product/Aggregate/Product_Behavior.res
@@reventless.behavior

@schema
type state =
| NotCreated
| Created({name: string, description: string, price: float})

let initialState = NotCreated

let evolve = (state, event) =>
switch (state, event) {
| (NotCreated, Added({name, description, price})) => Created({name, description, price})
| (Created(_), Added({name, description, price})) => Created({name, description, price})
| (Created(s), NameUpdated({name})) => Created({...s, name})
| (Created(s), DescriptionUpdated({description})) => Created({...s, description})
| (Created(s), PriceUpdated({price})) => Created({...s, price})
| (NotCreated, _) => state
}

let decide = (state, command) =>
switch (state, command) {
| (NotCreated, Add({name, description, price})) =>
Ok([Added({name, description, price})])
| (NotCreated, UpdateName(_)) => Error(ProductNotFound)
| (NotCreated, UpdateDescription(_)) => Error(ProductNotFound)
| (NotCreated, UpdatePrice(_)) => Error(ProductNotFound)
| (Created(_), Add(_)) => Error(ProductAlreadyExists)
| (Created(s), UpdateName({name})) if name == s.name => Ok([])
| (Created(_), UpdateName({name})) => Ok([NameUpdated({name: name})])
| (Created(s), UpdateDescription({description})) if description == s.description => Ok([])
| (Created(_), UpdateDescription({description})) =>
Ok([DescriptionUpdated({description: description})])
| (Created(s), UpdatePrice({price})) if price == s.price => Ok([])
| (Created(_), UpdatePrice({price})) => Ok([PriceUpdated({price: price})])
}

Update commands are idempotent: if the incoming value matches the current state decide returns Ok([]), producing no write.

3. Read Model

The read model is also split: a spec file Product/ReadModel/Products.res (the shape of the stored view) and a projections file Product/ReadModel/Products_Projections.res (how events maintain it). Inside a ReadModel/ folder, @@reventless.spec auto-injects let config and let subIdConfig = None for you.

// Product/ReadModel/Products.res
@@reventless.spec

@schema
type state = {
name: string,
description: string,
price: float,
}

The projections file is annotated @@reventless.mappings, which infers the domain from the ReadModel/ folder, brings Mapping, Set, Update, etc. into scope, and emits the module type Mapping wrapper. You write the per-source Mapping.Make modules and the let mappings array. Each mapping's project function returns a Set or Update action on the read model store:

// Product/ReadModel/Products_Projections.res
@@reventless.mappings

module ProductMapping = Mapping.Make(
Product,
Products,
{
open Product
let project = ({event, id, _}) =>
switch event {
| Added({name, description, price}) =>
Set(id, {Products.name: name, description, price})
| NameUpdated({name}) => Update(id, state => {...state, name})
| DescriptionUpdated({description}) =>
Update(id, state => {...state, description})
| PriceUpdated({price}) => Update(id, state => {...state, price})
}
},
)

let mappings: array<module(Mapping)> = [module(ProductMapping)]

A read model can consume events from multiple aggregates. The ProductDemands read model (ProductDemand/ReadModel/ProductDemands.res) combines events from both Product (to initialise the entry) and ProductDemand (to track the order count). Its projections file declares one Mapping.Make per source:

// ProductDemand/ReadModel/ProductDemands_Projections.res
@@reventless.mappings

module ProductMapping = Mapping.Make(
Product,
ProductDemands,
{
open Product
let project = ({event, id, _}) =>
switch event {
| Added({name}) =>
Set(id, {ProductDemands.name: name, orderCount: 0})
| _ => Ignore
}
},
)

module ProductDemandMapping = Mapping.Make(
ProductDemand,
ProductDemands,
{
open ProductDemand
let project = ({event, id, _}) =>
switch event {
| Recorded(_) =>
Update(id, (state: ProductDemands.state) => {...state, orderCount: state.orderCount + 1})
| Revoked(_) =>
Update(id, (state: ProductDemands.state) => {...state, orderCount: max(0, state.orderCount - 1)})
}
},
)

let mappings: array<module(Mapping)> = [module(ProductMapping), module(ProductDemandMapping)]

4. Extension Point

An Extension Point is the outbound API that Catalog publishes for other Plugins to subscribe to. It has two parts: a spec (the stable public contract, living in the catalog-spec package) and a mapping (the translation from internal events to EP events, in the catalog package).

The spec defines the stable public vocabulary — the events that Ordering will depend on. This is intentionally different from the internal Product event types so that internal refactoring does not break the cross-plugin contract. It lives in catalog-spec/src/Products_ExtensionPoint.res; in a *Spec namespace @@reventless.spec derives the dotted name "Catalog.Products" automatically:

// catalog-spec/src/Products_ExtensionPoint.res
@@reventless.spec

@schema
type command = unit // read-only: no inbound commands

@schema
type event =
| ProductBecameAvailable({productId: string, name: string, price: float})
| ProductPriceChanged({productId: string, price: float})

@schema
type directive = unit

The mapping (ExtensionPoint/Products_ExtensionPointMapping.res) translates internal Product aggregate events to the stable EP vocabulary. It is annotated @@reventless.spec, which brings PublishEvent into scope. The internal aggregate is referenced as module Delegate = Product. Only events that Ordering needs to observe are mapped — everything else is ignored:

// ExtensionPoint/Products_ExtensionPointMapping.res
@@reventless.spec

module ExtensionPoint = CatalogSpec.Products_ExtensionPoint
module Delegate = Product

let mapIncomingCommand = (_id, _command, _meta) => []

let mapOutgoingEvent = Some((id, event, _meta, _queryEngine) =>
switch event {
| Product.Added({name, price}) => [
PublishEvent(id, CatalogSpec.Products_ExtensionPoint.ProductBecameAvailable({productId: id, name, price})),
]
| Product.PriceUpdated({price}) => [
PublishEvent(id, CatalogSpec.Products_ExtensionPoint.ProductPriceChanged({productId: id, price})),
]
| _ => []
}
)

PublishEvent(id, event) routes the EP event to all Extensions that have subscribed to this Extension Point.

5. Extension

An Extension is the inbound subscription that Catalog registers to receive events from Ordering's Extension Point. Catalog decodes the incoming events through Ordering's spec package (ordering-spec) — it never depends on Ordering's implementation.

The extension file (Extension/Orders_Extension.res) is annotated @@reventless.extension, which brings PublishAggregateCommand into scope. It exposes a module Mapping that names the source Extension Point and the internal Delegate aggregate, then routes each incoming EP event to a command. Here, ItemOrdered triggers a Record command on the ProductDemand aggregate for the referenced product:

// Extension/Orders_Extension.res
@@reventless.extension

module Mapping = {
module ExtensionPoint = OrderingSpec.Orders_ExtensionPoint
module Delegate = ProductDemand

open ExtensionPoint
open Delegate
let mapIncomingEvent = (_id, event, _meta, _pluginDef, _queryEngine) =>
switch event {
| ItemOrdered({productId, orderId}) => [
PublishAggregateCommand(productId, Record({orderId: orderId})),
]
| ItemOrderCancelled({productId, orderId}) => [
PublishAggregateCommand(productId, Revoke({orderId: orderId})),
]
}

let mapOutgoingEvent = None
}

PublishAggregateCommand(id, command) dispatches the command to the aggregate identified by id — in this case the ProductDemand aggregate for the given productId. When the local target is a StateChangeSlice instead of an Aggregate, use PublishStateChangeSliceCommand(command); the framework derives the FIFO grouping id from the command's @partitionTag field, so no id argument is needed.

6. Plugin

The plugin wires all aggregates, read models, the Extension Point, the Extension, and the Task together using any Platform implementation. You do not write this file — it is generated at src/Plugin.res by generate-plugin, which scans src/ by folder name and emits the composition root before each build. It carries an "AUTO-GENERATED — do not edit" banner. Each component is wired with a two-argument Make (spec + body); aggregates take a third argument for their event mappings (NoEventMappings.Make(<Spec>) when there are none):

// src/Plugin.res — AUTO-GENERATED — do not edit. Run `npm run generate` to update.
module Make = (Platform: ReventlessInfra.Platform.T) => {
// Aggregates
module CategoryAggregate = Platform.Aggregate.Make(
Category,
Category_Behavior,
ReventlessInfra.NoEventMappings.Make(Category),
)
module ProductAggregate = Platform.Aggregate.Make(
Product,
Product_Behavior,
ReventlessInfra.NoEventMappings.Make(Product),
)
module ProductDemandAggregate = Platform.Aggregate.Make(
ProductDemand,
ProductDemand_Behavior,
ReventlessInfra.NoEventMappings.Make(ProductDemand),
)

// ReadModels
module CategoriesReadModel = Platform.ReadModel.Make(Categories, Categories_Projections)
module ProductDemandsReadModel = Platform.ReadModel.Make(ProductDemands, ProductDemands_Projections)
module ProductsReadModel = Platform.ReadModel.Make(Products, Products_Projections)

// Tasks
module ImportProductsTask = Platform.Task.Make(ImportProducts)

// ExtensionPoints
module Products_ExtensionPoint = Platform.ExtensionPoint.Make(Products_ExtensionPointMapping)

// Extensions
module Orders_Extension = Platform.Extension.Make(Orders_Extension.Mapping)

let make = (~uiBundleUrl=?) =>
Platform.Plugin.make(
~name="Catalog",
~heartbeatInterval=5,
~extensionPoints=[module(Products_ExtensionPoint)],
~extensions=[module(Orders_Extension)],
~aggregates=[module(CategoryAggregate), module(ProductAggregate), module(ProductDemandAggregate)],
~readModels=[module(CategoriesReadModel), module(ProductDemandsReadModel), module(ProductsReadModel)],
~tasks=[module(ImportProductsTask)],
// ...pluginStructure and AutoUI manifest omitted for brevity
)
}

The plugin is referenced from the platform assembly as CatalogPlugin.Plugin.Make(Platform) — note the .Plugin. segment (the namespace is CatalogPlugin, the generated module is Plugin). Swapping Platform is the only change needed to move from an in-memory test environment to a full AWS deployment:

// platform-local/src/Main.res
module Platform = ReventlessLocal.Platform.Make()

module Catalog = CatalogPlugin.Plugin.Make(Platform)
module Ordering = OrderingPlugin.Plugin.Make(Platform)

@val external processEnv: dict<string> = "process.env"

// Each plugin's `make` accepts an optional UI bundle URL; wrap it in a Maker
// module so the platform can call `make()` uniformly.
module CatalogMaker = {
let make = () => Catalog.make(~uiBundleUrl=?processEnv->Dict.get("CATALOG_UI_BUNDLE_URL"))
}
module OrderingMaker = {
let make = () => Ordering.make(~uiBundleUrl=?processEnv->Dict.get("ORDERING_UI_BUNDLE_URL"))
}

Platform.makePlatform(
~version=Reventless.PackageVersion.fromCwd(),
~plugins=[module(CatalogMaker), module(OrderingMaker)],
)

Platform.startServers()

7. Event Mappings

Event mappings route events from one aggregate to commands on the same or a different aggregate. They live in a <Entity>_Mappings.res sibling of the aggregate spec and replace NoEventMappings.Make(<Spec>) as the third argument to Platform.Aggregate.Make.

The file is annotated @@reventless.mappings, which (inside an Aggregate/ folder) infers the Reventless.EventMapping domain, brings Publish and the Mapping module type into scope, and wires the source/target. You write the per-source Mapping module — with a module Source and a map function — and the let mappings array.

Here, Order.Placed triggers an automatic Order.Ship command on the same aggregate:

// Order/Aggregate/Order_Mappings.res
@@reventless.mappings

module AutoShipMapping = {
module Source = Order

let map = (orderId, event, _queryEngine) =>
switch event {
| Order.Placed(_) => [Publish(orderId, Order.Ship)]
| _ => []
}
}

let mappings: array<module(Mapping)> = [module(AutoShipMapping)]

The plugin generator detects the Order_Mappings.res sibling and wires it as the third argument automatically:

module OrderAggregate = Platform.Aggregate.Make(
Order,
Order_Behavior,
Order_Mappings, // generated; replaces NoEventMappings.Make(Order)
)

8. Side Effect

A side effect executes imperative work (e.g. sending emails, calling APIs) when aggregate events are emitted. Side effects are fire-and-forget — they do not produce new events or commands.

A side effect module lives at <Entity>/SideEffect/<Entity>_<Name>.res, is annotated @@reventless.spec, and defines the Source it subscribes to plus an execute function:

// Order/SideEffect/Order_EmailNotification.res
@@reventless.spec

module Source = {
let name = Order.name
module Id = Order.Id
@schema type event = Order.event
}

let execute = async (orderId, _meta, event, _queryEngine) =>
switch event {
| Order.Placed({customerId}) =>
await EmailService.sendOrderConfirmation(
~email=customerId,
~orderId=orderId->Order.Id.toString,
)
| _ => ()
}

Side effects are hosted on a Task. The Task file (Task/OrderNotifications.res) is annotated @@reventless.task — which injects let name from the filename — and lists its side effects in the setup function:

// Task/OrderNotifications.res
@@reventless.task

let setup = (_queryEngine, _queryBucketName, _opts) => {
Task.sideEffects: [module(Order_EmailNotification): module(SideEffect.T)],
}

The plugin generator discovers the Task and wires it into ~tasks in the generated Plugin.res:

module OrderNotificationsTask = Platform.Task.Make(OrderNotifications)
// ...
~tasks=[module(OrderNotificationsTask)],

9. Task

A Task is a serverless handler triggered by S3 events. It can publish commands, manage schedules, and execute side effects. Like the side-effect host above, a Task file lives in Task/ and is annotated @@reventless.task (which injects let name from the filename).

Here, the ImportProducts Task watches an S3 bucket for CSV uploads and publishes Product.Add commands:

// Task/ImportProducts.res
@@reventless.task

let importCallback = (~eventName, ~key) => {
if eventName->String.includes("ObjectCreated") {
Console.log("[ImportProducts] Processing file: " ++ key)

let meta: Message.meta = {
service: "ImportProducts",
time: Date.now()->Float.toString,
ip: "",
user: "system",
msgId: key,
correlationId: key,
}

[
Task.PublishCommands(
"Product",
[
{
id: key,
meta,
commandJson: Product.Add({
name: "Imported Product",
description: "Imported from " ++ key,
price: 9.99,
})->Message.encode(Product.commandSchema),
},
],
),
]->Promise.resolve
} else {
[]->Promise.resolve
}
}

let setup = (_queryEngine, _queryBucketName, _opts) => {
Task.buckets: [
{
bucketName: "product-imports",
bucketMode: Task.Read,
callback: importCallback,
},
],
}

The plugin generator discovers Tasks under Task/ and wires them into ~tasks in the generated Plugin.res automatically.