Alpha Version: You are viewing the ALPHA documentation. This is an experimental version and may contain breaking changes.
Skip to main content

Aggregate

For a short summary of an Aggregate, see Reventless Components Overview.

Framework Implementation

This component follows the Reventless Component Structure Pattern, using separate files for interface definitions (Aggregate.res), builder logic (Aggregate_Builder.res), and runtime callbacks (Aggregate_Callback.res).

d2 diagram

An Aggregate's business logic is defined by it's Spec, Behavior, Config and Event Mappings.

Commands are requests for change, which may be accepted (or not). Several Components can act as a Command Source: Command Generator, Task, Event Mapper, Extension, and Extension Point. Commands will never be stored. Accepted Commands result in any number of Events. Events are factual statements of the past, which cannot change. (Only new events may be created.) Events will be persisted in the Event Log. An Event Log is an "append-only" storage. In an event-sourced system Events are the single source of truth. (note: any system based on Reventless is an event-sourced system!)

Aggregate Spec

An Aggregate Spec defines the id, name, command and event types of an Aggregate in a declarataive manner. The Spec is used at any place, where a programmatic interaction with the aggregate is desired. (Aggregate Behavior, EventMapper, ReadModel Projections, Extensionpoint Mappings, Extension Mappings)

Example

Customer.res
@@reventless.spec

@schema
type command =
| Register({email: string, address: string})
| UpdateEmail({email: string})
| UpdateAddress({address: string})
| Deactivate

@schema
type event =
| Registered({email: string, address: string})
| EmailUpdated({email: string})
| AddressUpdated({address: string})
| Deactivated

@schema
type error =
| CustomerAlreadyRegistered
| CustomerNotFound
| CustomerAlreadyDeactivated

The @@reventless.spec annotation auto-injects let name (derived from filename) and other boilerplate. For information about @schema see Schema annotation.

Id

See Id.

name

A name is a string which must be unique in the scope of Aggregate names in one plugin and should describe the aggregate aptly. The name will also be used "behind the scenes" to e.g. route payload to the right mappings etc.

command

The command type declares the possible inputs of the aggregate.
There are no explicit constraints for the command type (developer can choose whichever type is best suited — provided the @schema annotation is applied for serialization), but usually variants are the ideal choice.

tip

Command Variant Constructors should be formulated as imperative.

event

The event type declares the possible results of the aggregate.
There are no explicit constraints for the event type (developer can choose whichever type is best suited — provided the @schema annotation is applied for serialization), but usually variants are the ideal choice.

tip

Event Variant Constructors should be formulated in past tense.

error

The error type declares possible unrecoverable errors of the aggregate. The semantic may be chosen by the developer, but usually variants are the ideal choice.

Behavior

The aggregate specific business logic is implemented in a behavior module.

It defines:

  • the initial state and how to evolve it from historic events.
  • how to decide on commands based on the current state.

Example

Customer_Behavior.res
@@reventless.behavior

@schema
type state =
| NotCreated
| Active({email: string, address: string})
| Deactivated

let initialState = NotCreated

// (state, event) => state
let evolve = (state, event) =>
switch (state, event) {
| (NotCreated, Registered({email, address})) => Active({email, address})
| (Active(_), Registered({email, address})) => Active({email, address})
| (Active(s), EmailUpdated({email})) => Active({...s, email})
| (Active(s), AddressUpdated({address})) => Active({...s, address})
| (Active(_), Customer.Deactivated) => Deactivated
| (Deactivated, _) => state
| (NotCreated, _) => state
}

// (state, command) => result<array<event>, error>
let decide = (state, command) =>
switch (state, command) {
| (NotCreated, Register({email, address})) => Ok([Registered({email, address})])
| (NotCreated, UpdateEmail(_)) => Error(CustomerNotFound)
| (NotCreated, UpdateAddress(_)) => Error(CustomerNotFound)
| (NotCreated, Deactivate) => Error(CustomerNotFound)
| (Active(_), Register(_)) => Error(CustomerAlreadyRegistered)
| (Active(s), UpdateEmail({email})) if email == s.email => Ok([]) // idempotent
| (Active(_), UpdateEmail({email})) => Ok([EmailUpdated({email})])
| (Active(s), UpdateAddress({address})) if address == s.address => Ok([])
| (Active(_), UpdateAddress({address})) => Ok([AddressUpdated({address})])
| (Active(_), Deactivate) => Ok([Customer.Deactivated])
| (Deactivated, Register(_)) => Error(CustomerAlreadyDeactivated)
| (Deactivated, UpdateEmail(_)) => Error(CustomerAlreadyDeactivated)
| (Deactivated, UpdateAddress(_)) => Error(CustomerAlreadyDeactivated)
| (Deactivated, Deactivate) => Ok([]) // idempotent
}

Spec

This is a module alias to the Aggregate Spec to be used.

state

Defines the type of the state, which will be calculated by the evolve function based on historic events, starting from initialState. You can choose whatever type suites your needs. Very often this will be a record.

initialState

The initialState value is the starting state before any events have been applied. This represents the state of a "not yet created" aggregate instance.

evolve

The evolve function calculates the next state based on the current state and the next event. It combines the former init and apply functions into a single function, with initialState providing the starting value.

decide

The decide function takes the current state and a command, and returns result<array<event>, error>. It combines the former create and execute functions into a single function. Return Ok([...events]) for accepted commands and Error(error) for rejected commands.

Event-count cap

A single command may produce at most 100 events. This is the DynamoDB TransactWriteItems hard limit; the AWS adapter rejects oversized commands up front with a clear error. Larger fan-outs must be split across multiple commands.

Cost model (DynamoDB adapter)

A command that produces:

EventsDynamoDB callWrite cost per event
1PutItem1× WCU
2 – 100TransactWriteItems2× WCU
> 100rejected up front

Multi-event commands always commit atomically: either every event is durable, or none are. The 2× WCU on the 2–100 band is the price DynamoDB charges for transactional atomicity.

Rejection contract

When decide returns Error(error):

  • Producers using a synchronous channel (in-memory, CommandTopicChannel_SQS_Sync) receive Rejected({errorCode, errorDetail}) from the CommandResult. errorCode is the variant tag of Spec.errorSchema (e.g. "AlreadyExists"); errorDetail carries the JSON-stringified payload — None for payload-less variants.
  • Async producers (CommandTopicChannel_SQS_FIFO) see the SQS message removed from the queue: domain rejections are deterministic, so retry would not help.
  • Within a batch of N commands for the same aggregate, a rejected command does not cancel the surviving ones — they still run, produce events, and report Accepted.
  • A successful decide that returns Ok([]) (an idempotent no-op) reports Accepted({eventCount: 0}) — distinct from Rejected.

Call Sequence

d2 diagram

EventMappings

Each Aggregate can specify EventMappings from one or more source Aggregates. EventMappings live in a sibling file <Entity>_Mappings.res annotated with @@reventless.mappings. The PPX injects module Target = <Entity>, module type Mapping, let counter = None, let moduleUrl, and open Reventless.EventMapping (so action constructors like Publish are in scope unqualified). You write only the per-source mapping modules and the mappings array:

Customer_Mappings.res
@@reventless.mappings

module CustomerMapping = {
module Source = Customer

let map = (customerId, event, _queryEngine) =>
switch event {
| Customer.Created(customer) => [
Publish(customerId, Customer.ChangeAddress(customer.address ++ " Suffix")),
]
| _ => []
}
}

let mappings: array<module(Mapping)> = [module(CustomerMapping)]

Mapping module

For each source Aggregate a separate Mapping module is defined.

Source

Module alias to the source Aggregate Spec.

map

Mapping function from source Event to target Command. Action constructors (Publish, PublishDelayed, PublishAsync, Count, …) are in scope because @@reventless.mappings injects open Reventless.EventMapping.

mappings

This array has to include all mappings for this Aggregate.

counter

@@reventless.mappings injects let counter = None. Override it with let counter = Some(module(MyCounter: Counter.T)) to enable a counter for multi-event coordination.

See Counter component for further details.

Wiring (generated)

You never wire the Aggregate by hand. The plugin generator scans the Aggregate/ folder and emits the wiring into the generated src/Plugin.res using the three-arg factory Platform.Aggregate.Make(Spec, Behavior, Mappings):

src/Plugin.res (generated — do not edit)
// With event mappings:
module CustomerAggregate = Platform.Aggregate.Make(Customer, Customer_Behavior, Customer_Mappings)

// Without event mappings, the third slot is NoEventMappings:
module CategoryAggregate = Platform.Aggregate.Make(
Category,
Category_Behavior,
ReventlessInfra.NoEventMappings.Make(Category),
)

Sync vs async command dispatch

By default, aggregate mutations are dispatched synchronously: the AppSync resolver invokes the aggregate's Lambda, the command runs inline, and the mutation resolves to CommandAccepted or CommandRejected. The generated src/Plugin.res emits Platform.Aggregate.Make(...).

For aggregates that should publish-and-forget (high contention, long-running handlers, callers polling Subscription.onX for the outcome), add the @@reventless.async attribute to the spec file:

HighContentionAggregate.res
@@reventless.spec
@@reventless.async

@schema
type command = ...

The plugin generator then emits Platform.Aggregate.MakeAsync(...) instead, routing the aggregate to a FIFO-backed Lambda that returns CommandPending immediately. The async path uses an AllAggregatesAsyncCmdHandler Lambda, separate from the default AllAggregatesCmdHandler Lambda — but both are only provisioned when at least one aggregate of that flavor exists. Sync-only setups (the default) pay no extra Lambda cost.

See CommandTopic for the channel-level details of how each flavor is wired.

Pulumi

The aggregate's Pulumi root component is named in this pattern: Spec.name and has a type of reventless:Aggregate.

Identity and RequestContext

The authenticated user's Identity is available via RequestContext during command processing. The identity.userId is persisted as meta.user on every event produced by the aggregate. The full identity (groups, claims, provider) is transient and available only for the duration of the request.

Application code can use RequestContext.identity for authorization decisions in the decide function by accessing it through the Effect service:

Effect.serviceWith(RequestContext.tag, ctx => {
let identity = ctx.identity
if identity->Identity.hasGroup("admin") {
// authorize admin action
}
})
  • EventLog - Stores events generated by the Aggregate
  • CommandTopic - Delivers commands to the Aggregate
  • EventTopic - Distributes events from the Aggregate's EventLog
  • CommandGenerator - Generates commands for the Aggregate from API mutations
  • EventMapper - Maps events from other Aggregates to commands for this Aggregate
  • ReadModel - Consumes events from the Aggregate to build read models
  • Extension - Can send commands to the Aggregate via ExtensionPoints
  • ExtensionPoint - Can receive commands from Extensions for the Aggregate
  • Counter - Used by EventMappers for deduplication when mapping to this Aggregate