Aggregate
For a short summary of an Aggregate, see Reventless Components Overview.
This component follows the Reventless Component Structure Pattern, using separate files for interface definitions (Aggregate.res), builder logic (Aggregate_Builder.res), and runtime callbacks (Aggregate_Callback.res).
An Aggregate's business logic is defined by its Spec, Behavior, Config and Event Mappings.
Commands are requests for change, which may be accepted or rejected. Several Components can act as a Command Source: Command Generator, Task, Event Mapper, Extension, and Extension Point. Commands are never stored. An accepted Command results in any number of Events. Events are factual statements about the past and cannot change; only new Events may be created. Events are persisted in the Event Log, which is an append-only store. In an event-sourced system, Events are the single source of truth. (Note: any system based on Reventless is an event-sourced system.)
Aggregate Spec
An Aggregate Spec defines the id, name, command, event and error types of an Aggregate in a declarative manner. The Spec is used wherever programmatic interaction with the aggregate is needed (Aggregate Behavior, EventMapper, ReadModel Projections, ExtensionPoint Mappings, Extension Mappings).
Example
@@reventless.spec
@schema
type command =
  | Register({email: string, address: string})
  | UpdateEmail({email: string})
  | UpdateAddress({address: string})
  | Deactivate
@schema
type event =
  | Registered({email: string, address: string})
  | EmailUpdated({email: string})
  | AddressUpdated({address: string})
  | Deactivated
@schema
type error =
  | CustomerAlreadyRegistered
  | CustomerNotFound
  | CustomerAlreadyDeactivated
The @@reventless.spec annotation auto-injects let name (derived from filename) and other boilerplate. For information about @schema see Schema annotation.
Id
See Id.
name
A name is a string that must be unique among the Aggregate names of one plugin and should describe the aggregate aptly. The name is also used internally, e.g. to route payloads to the right mappings.
command
The command type declares the possible inputs of the aggregate.
There are no explicit constraints on the command type: the developer can choose whichever type is best suited, provided the @schema annotation is applied for serialization. Usually, variants are the ideal choice.
Command variant constructors should be phrased in the imperative, e.g. Register or UpdateEmail.
event
The event type declares the possible results of the aggregate.
There are no explicit constraints on the event type: the developer can choose whichever type is best suited, provided the @schema annotation is applied for serialization. Usually, variants are the ideal choice.
Event variant constructors should be phrased in the past tense, e.g. Registered or EmailUpdated.
error
The error type declares the possible unrecoverable errors of the aggregate. Its semantics are up to the developer, but variants are usually the ideal choice.
Behavior
The aggregate-specific business logic is implemented in a behavior module.
It defines:
- the initial state and how to evolve it from historic events.
- how to decide on commands based on the current state.
Example
@@reventless.behavior
@schema
type state =
  | NotCreated
  | Active({email: string, address: string})
  | Deactivated
let initialState = NotCreated
// (state, event) => state
let evolve = (state, event) =>
  switch (state, event) {
  | (NotCreated, Registered({email, address})) => Active({email, address})
  | (Active(_), Registered({email, address})) => Active({email, address})
  | (Active(s), EmailUpdated({email})) => Active({...s, email})
  | (Active(s), AddressUpdated({address})) => Active({...s, address})
  | (Active(_), Customer.Deactivated) => Deactivated
  | (Deactivated, _) => state
  | (NotCreated, _) => state
  }
// (state, command) => result<array<event>, error>
let decide = (state, command) =>
  switch (state, command) {
  | (NotCreated, Register({email, address})) => Ok([Registered({email, address})])
  | (NotCreated, UpdateEmail(_)) => Error(CustomerNotFound)
  | (NotCreated, UpdateAddress(_)) => Error(CustomerNotFound)
  | (NotCreated, Deactivate) => Error(CustomerNotFound)
  | (Active(_), Register(_)) => Error(CustomerAlreadyRegistered)
  | (Active(s), UpdateEmail({email})) if email == s.email => Ok([]) // idempotent
  | (Active(_), UpdateEmail({email})) => Ok([EmailUpdated({email})])
  | (Active(s), UpdateAddress({address})) if address == s.address => Ok([]) // idempotent
  | (Active(_), UpdateAddress({address})) => Ok([AddressUpdated({address})])
  | (Active(_), Deactivate) => Ok([Customer.Deactivated])
  | (Deactivated, Register(_)) => Error(CustomerAlreadyDeactivated)
  | (Deactivated, UpdateEmail(_)) => Error(CustomerAlreadyDeactivated)
  | (Deactivated, UpdateAddress(_)) => Error(CustomerAlreadyDeactivated)
  | (Deactivated, Deactivate) => Ok([]) // idempotent
  }
Spec
This is a module alias to the Aggregate Spec to be used.
state
Defines the type of the state, which will be calculated by the evolve function based on historic events, starting from initialState.
You can choose whatever type suits your needs. Very often this will be a record.
initialState
The initialState value is the starting state before any events have been applied. This represents the state of a "not yet created" aggregate instance.
evolve
The evolve function calculates the next state based on the current state and the next event. It combines the former init and apply functions into a single function, with initialState providing the starting value.
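Rebuilding the current state from history amounts to a left fold of evolve over the stored events, starting at initialState. The following is a minimal, self-contained sketch of that idea. The types are simplified stand-ins for a real Spec and Behavior, and the replay helper is hypothetical (not part of Reventless); the state constructor is called Inactive here only to avoid a name clash with the event constructor.

```rescript
// Hypothetical, simplified types for illustration only.
type event =
  | Registered({email: string})
  | EmailUpdated({email: string})
  | Deactivated

type state =
  | NotCreated
  | Active({email: string})
  | Inactive

let initialState = NotCreated

let evolve = (state, event) =>
  switch (state, event) {
  | (NotCreated, Registered({email})) => Active({email: email})
  | (Active(_), EmailUpdated({email})) => Active({email: email})
  | (Active(_), Deactivated) => Inactive
  | _ => state // events that do not apply leave the state unchanged
  }

// Hypothetical helper: replaying the history is a left fold over evolve.
let replay = (events: array<event>) => Belt.Array.reduce(events, initialState, evolve)

let current = replay([
  Registered({email: "a@example.com"}),
  EmailUpdated({email: "b@example.com"}),
])
// current is Active({email: "b@example.com"})
```

Because evolve is a pure function, replaying the same events always yields the same state.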
decide
The decide function takes the current state and a command, and returns result<array<event>, error>. It combines the former create and execute functions into a single function. Return Ok([...events]) for accepted commands and Error(error) for rejected commands.
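To illustrate how decide and evolve fit together, the sketch below decides on a command and then folds the resulting events into the state. It is an illustration under simplified, hypothetical types; the handle function is an assumption made for this example and does not describe how the Reventless runtime is implemented.

```rescript
// Hypothetical, simplified types for illustration only.
type command = Register({email: string}) | Deactivate
type event = Registered({email: string}) | Deactivated
type error = AlreadyRegistered | NotFound
type state = NotCreated | Active({email: string}) | Inactive

let evolve = (state, event) =>
  switch (state, event) {
  | (NotCreated, Registered({email})) => Active({email: email})
  | (Active(_), Deactivated) => Inactive
  | _ => state
  }

let decide = (state, command) =>
  switch (state, command) {
  | (NotCreated, Register({email})) => Ok([Registered({email: email})])
  | (NotCreated, Deactivate) => Error(NotFound)
  | (Active(_), Register(_)) => Error(AlreadyRegistered)
  | (Active(_), Deactivate) => Ok([Deactivated])
  | (Inactive, Register(_)) => Error(AlreadyRegistered)
  | (Inactive, Deactivate) => Ok([]) // idempotent no-op
  }

// Hypothetical helper: decide first, then apply the accepted events to the state.
let handle = (state, command) =>
  switch decide(state, command) {
  | Ok(events) => Ok((events, Belt.Array.reduce(events, state, evolve)))
  | Error(error) => Error(error)
  }
```

A rejected command leaves the state untouched, and an accepted command with Ok([]) returns the state it started with.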
A single command may produce at most 100 events. This is the DynamoDB TransactWriteItems hard limit; the AWS adapter rejects oversized commands up front with a clear error. Larger fan-outs must be split across multiple commands.
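One way to stay below the limit is to split the work on the producer side before issuing commands. The sketch below shows only the batching step; chunk and maxEventsPerCommand are hypothetical names chosen for this example, and it assumes each item leads to exactly one event.

```rescript
// Assumption for this sketch: one item results in one event.
let maxEventsPerCommand = 100

// Splits items into consecutive batches of at most `size` elements.
// `size` must be positive.
let chunk = (items: array<'a>, size: int): array<array<'a>> => {
  let count = Belt.Array.length(items)
  let batchCount = (count + size - 1) / size // ceiling division
  Belt.Array.makeBy(batchCount, i => Belt.Array.slice(items, ~offset=i * size, ~len=size))
}

// 250 items yield three batches with 100, 100 and 50 items.
let batches = chunk(Belt.Array.range(1, 250), maxEventsPerCommand)
```

Each batch can then be sent as its own command. Atomicity holds per command only, not across the batches.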
Cost model (DynamoDB adapter)
A command that produces:
| Events | DynamoDB call | Write cost per event |
|---|---|---|
| 1 | PutItem | 1× WCU |
| 2 – 100 | TransactWriteItems | 2× WCU |
| > 100 | rejected up front | — |
Multi-event commands always commit atomically: either every event is durable, or none are. The 2× WCU on the 2–100 band is the price DynamoDB charges for transactional atomicity.
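The table can be read as a small function from event count to relative write cost. The sketch below expresses the cost in multiples of the per-event WCU. relativeWriteCost is a hypothetical helper, and the zero-event case is an assumption (no events, nothing written) that the table does not cover.

```rescript
// Relative write cost of one command, in multiples of the per-event WCU.
// Returns None when the command would be rejected up front (> 100 events).
let relativeWriteCost = (eventCount: int): option<int> =>
  if eventCount <= 0 {
    Some(0) // assumption: a command without events writes nothing
  } else if eventCount == 1 {
    Some(1) // PutItem: 1x per event
  } else if eventCount <= 100 {
    Some(eventCount * 2) // TransactWriteItems: 2x per event
  } else {
    None
  }
```

For example, a command producing 10 events costs 20 units, twice what 10 single-event commands would cost, in exchange for atomicity.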
Rejection contract
When decide returns Error(error):
- Producers using a synchronous channel (in-memory, CommandTopicChannel_SQS_Sync) receive Rejected({errorCode, errorDetail}) from the CommandResult. errorCode is the variant tag of Spec.errorSchema (e.g. "AlreadyExists"); errorDetail carries the JSON-stringified payload, or None for payload-less variants.
- Async producers (CommandTopicChannel_SQS_FIFO) see the SQS message removed from the queue: domain rejections are deterministic, so a retry would not help.
- Within a batch of N commands for the same aggregate, a rejected command does not cancel the surviving ones: they still run, produce events, and report Accepted.
- A successful decide that returns Ok([]) (an idempotent no-op) reports Accepted({eventCount: 0}), which is distinct from Rejected.
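On the producer side, the three outcomes of the contract above can be told apart with a single pattern match. The commandResult type below is a local, hypothetical mirror of the shapes named in the list, not the library's actual definition, and describe is an illustrative helper.

```rescript
// Hypothetical local mirror of the result shapes described above.
type commandResult =
  | Accepted({eventCount: int})
  | Rejected({errorCode: string, errorDetail: option<string>})

// Turns a result into a short log line, keeping no-ops and rejections distinct.
let describe = result =>
  switch result {
  | Accepted({eventCount: 0}) => "accepted (no-op)"
  | Accepted({eventCount}) => "accepted, " ++ Belt.Int.toString(eventCount) ++ " event(s)"
  | Rejected({errorCode, errorDetail: None}) => "rejected: " ++ errorCode
  | Rejected({errorCode, errorDetail: Some(detail)}) =>
    "rejected: " ++ errorCode ++ " " ++ detail
  }
```

Matching the zero-event case first keeps an idempotent no-op from being reported as a failure.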
Call Sequence
EventMappings
Each Aggregate can specify EventMappings from one or more source Aggregates.
EventMappings live in a sibling file <Entity>_Mappings.res annotated with
@@reventless.mappings. The PPX injects module Target = <Entity>,
module type Mapping, let counter = None, let moduleUrl, and
open Reventless.EventMapping (so action constructors like Publish are in scope
unqualified). You write only the per-source mapping modules and the mappings
array:
@@reventless.mappings
module CustomerMapping = {
  module Source = Customer
  // Maps each source event to zero or more actions on the target aggregate.
  let map = (customerId, event, _queryEngine) =>
    switch event {
    | Customer.Registered({address}) => [
        Publish(customerId, Customer.UpdateAddress({address: address ++ " Suffix"})),
      ]
    | _ => []
    }
}
let mappings: array<module(Mapping)> = [module(CustomerMapping)]
Mapping module
For each source Aggregate a separate Mapping module is defined.
Source
Module alias to the source Aggregate Spec.
map
Mapping function from source Event to target Command. Action constructors
(Publish, PublishDelayed, PublishAsync, Count, …) are in scope because
@@reventless.mappings injects open Reventless.EventMapping.
mappings
This array has to include all mappings for this Aggregate.
counter
@@reventless.mappings injects let counter = None. Override it with
let counter = Some(module(MyCounter: Counter.T)) to enable a counter for
multi-event coordination.
See Counter component for further details.
Wiring (generated)
You never wire the Aggregate by hand. The plugin generator scans the
Aggregate/ folder and emits the wiring into the generated src/Plugin.res
using the three-arg factory Platform.Aggregate.Make(Spec, Behavior, Mappings):
// With event mappings:
module CustomerAggregate = Platform.Aggregate.Make(Customer, Customer_Behavior, Customer_Mappings)
// Without event mappings, the third slot is NoEventMappings:
module CategoryAggregate = Platform.Aggregate.Make(
  Category,
  Category_Behavior,
  ReventlessInfra.NoEventMappings.Make(Category),
)
Sync vs async command dispatch
By default, aggregate mutations are dispatched synchronously: the AppSync resolver invokes the aggregate's Lambda, the command runs inline, and the mutation resolves to CommandAccepted or CommandRejected. The generated src/Plugin.res emits Platform.Aggregate.Make(...).
For aggregates that should publish-and-forget (high contention, long-running handlers, callers polling Subscription.onX for the outcome), add the @@reventless.async attribute to the spec file:
@@reventless.spec
@@reventless.async
@schema
type command = ...
The plugin generator then emits Platform.Aggregate.MakeAsync(...) instead, routing the aggregate to a FIFO-backed Lambda that returns CommandPending immediately. The async path uses an AllAggregatesAsyncCmdHandler Lambda, separate from the default AllAggregatesCmdHandler Lambda — but both are only provisioned when at least one aggregate of that flavor exists. Sync-only setups (the default) pay no extra Lambda cost.
See CommandTopic for the channel-level details of how each flavor is wired.
Pulumi
The aggregate's Pulumi root component uses Spec.name as its name and has the type reventless:Aggregate.
Identity and RequestContext
The authenticated user's Identity is available via RequestContext during command processing. The identity.userId is persisted as meta.user on every event produced by the aggregate. The full identity (groups, claims, provider) is transient and available only for the duration of the request.
Application code can use RequestContext.identity for authorization decisions in the decide function by accessing it through the Effect service:
Effect.serviceWith(RequestContext.tag, ctx => {
  let identity = ctx.identity
  if identity->Identity.hasGroup("admin") {
    // authorize admin action
  }
})
Related Components
- EventLog - Stores events generated by the Aggregate
- CommandTopic - Delivers commands to the Aggregate
- EventTopic - Distributes events from the Aggregate's EventLog
- CommandGenerator - Generates commands for the Aggregate from API mutations
- EventMapper - Maps events from other Aggregates to commands for this Aggregate
- ReadModel - Consumes events from the Aggregate to build read models
- Extension - Can send commands to the Aggregate via ExtensionPoints
- ExtensionPoint - Can receive commands from Extensions for the Aggregate
- Counter - Used by EventMappers for deduplication when mapping to this Aggregate