Alpha Version: You are viewing the ALPHA documentation. This is an experimental version and may contain breaking changes.
Skip to main content

EventMapper

For a short summary of EventMapper, see Reventless Components Overview.

Framework Implementation

This component follows the Reventless Component Structure Pattern, using separate files for interface definitions (EventMapper.res), builder logic (EventMapper_Builder.res), and runtime callbacks (EventMapper_Callback.res).

Overview

d2 diagram

The EventMapper enables event-driven command generation by mapping events from one or more source aggregates to commands for a target aggregate. This is a key component for implementing saga patterns, process managers, and reactive business logic across aggregate boundaries.

Purpose and Responsibilities

  • Responsibility: Listen to events from source aggregates; transform source events into target commands; optionally use Counter for deduplication/coordination; publish commands to target aggregate
  • In: Events from source EventTopics (via EventCollector)
  • Out: Commands to target CommandTopic

Event Mapping Spec

EventMappings live in an Aggregate sibling file <Entity>_Mappings.res annotated with @@reventless.mappings. The PPX infers the domain from the Aggregate/ folder (Reventless.EventMapping) and injects open Reventless.EventMapping, module Target = <Entity>, module M = Reventless.EventMapping.Mappings.Make(Target), module type Mapping = M.Mapping, let moduleUrl, and let counter = None. You only write the per-source mapping modules and the mappings array.

The structure the PPX constructs (you do not write this by hand):

module type Mappings = {
module Target: Reventless.Aggregate.Spec // Target aggregate (injected)
module type Mapping = Reventless.EventMapping.T with module Target := Target
let mappings: array<module(Mapping)> // Array of event mappings (you write this)
let counter: option<module(Counter.T)> // Optional counter for coordination (defaults to None)
}

EventMapping Interface

Each mapping defines how to transform events from one source aggregate:

module type EventMapping = {
module Source: Spec // Source aggregate spec
module Target: Spec // Target aggregate spec

let map: (
Source.Id.t, // Source aggregate ID
Source.event, // Source event
QueryEngine.operations, // Query operations
) => array<action<Target.Id.t, Target.command>>
}

Mapping Actions

The map function returns an array of actions that specify what to do with the event:

type action<'id, 'command> =
| Publish('id, 'command) // Publish command immediately
| PublishDelayed('id, 'command, int) // Publish command after delay (seconds)
| PublishAsync(promise<array<('id, 'command)>>) // Async command generation
| AddToCounterTarget(counterTarget) // Add counter target
| Count(counterId) // Increment counter
| CountMulti(counterId, int) // Increment counter by N

Usage Pattern

Basic Event Mapping Example

Here's a complete example of mapping Customer events to Order commands. The file is the <Entity>_Mappings.res sibling of the target Aggregate. @@reventless.mappings injects module Target, module type Mapping, let counter = None, and open Reventless.EventMapping (so action constructors like Publish are in scope — never qualify them with Reventless.EventMapping.):

Order/Aggregate/Order_Mappings.res
@@reventless.mappings

// Mapping from Customer events to Order commands.
module CustomerMapping = {
module Source = Customer

let map = (customerId, event, queryEngine) =>
switch event {
| Customer.Created({name, address}) => [
// When customer is created, create a welcome order
Publish(
Order.Id.make(),
Order.CreateWelcomeOrder({customerId, customerName: name, deliveryAddress: address}),
),
]
| Customer.AddressChanged(newAddress) =>
// When address changes, update pending orders via an async query first
let ordersPromise = queryEngine.query(
~table="PendingOrders",
~key="customerId",
~value=customerId->Customer.Id.toString,
)
[
PublishAsync(
ordersPromise->Promise.thenResolve(orders =>
orders->Array.map(order => (order.orderId, Order.UpdateDeliveryAddress(newAddress)))
),
),
]
| Customer.Deleted => [
Publish(
customerId->Customer.Id.toString->Order.Id.fromString,
Order.CancelAllForCustomer,
),
]
| _ => [] // Other events don't trigger order commands
}
}

let mappings: array<module(Mapping)> = [module(CustomerMapping)]

Event Mapping with Counter

For more complex scenarios requiring deduplication or coordination across multiple events, set let counter = Some(...) (this overrides the PPX-injected default of None):

Invoice/Aggregate/Invoice_Mappings.res
@@reventless.mappings

module OrderMapping = {
module Source = Order

let map = (orderId, event, _queryEngine) =>
switch event {
| Order.ItemAdded({itemId, quantity, price}) => [
// Add this item to the invoice counter
AddToCounterTarget({
counterId: orderId->Order.Id.toString,
target: {itemId, quantity, price},
}),
// Increment the counter
Count(orderId->Order.Id.toString),
]
| Order.Completed(_) => [
// Counter triggers invoice generation when the count matches
Count(orderId->Order.Id.toString),
]
| _ => []
}
}

module CounterMapping = {
module Source = Counter.Source

let map = (counterId, event, _queryEngine) =>
switch event {
| Counter.Source.Triggered({targets}) =>
// Counter triggered — all items collected, generate invoice
let items =
targets->Array.map(target => {itemId: target.itemId, quantity: target.quantity, price: target.price})
[Publish(counterId->Invoice.Id.fromString, Invoice.Generate({items: items}))]
| _ => []
}
}

let mappings: array<module(Mapping)> = [module(OrderMapping), module(CounterMapping)]

// Enable counter for coordination (overrides the default `let counter = None`)
let counter = Some(module(Invoice_Counter: Counter.T))

Runtime Behavior

Event Processing Sequence

d2 diagram

Integration Points

With EventCollector

The EventMapper uses an EventCollector to subscribe to source EventTopics:

d2 diagram

With Aggregate

EventMappers are defined as the <Entity>_Mappings.res sibling of an Aggregate. You never wire them by hand — the plugin generator passes the mappings module as the third argument to the Aggregate factory in the generated Plugin.res:

src/Plugin.res (generated — do not edit)
module Make = (Platform: ReventlessInfra.Platform.T) => {
// Aggregates
module OrderAggregate = Platform.Aggregate.Make(Order, Order_Behavior, Order_Mappings)

// ... other components
}

(An Aggregate with no event mappings is wired with ReventlessInfra.NoEventMappings.Make(Order) in that third slot instead.)

The framework automatically creates and wires the EventMapper as part of the Aggregate deployment.

Common Patterns

These per-source modules live inside an @@reventless.mappings file, so the action constructors (Publish, PublishAsync, PublishDelayed, …) are in scope unqualified.

Saga Pattern - Order Fulfillment

// Order aggregate triggers fulfillment saga
module InventoryMapping = {
module Source = Order

let map = (orderId, event, _queryEngine) =>
switch event {
| Order.Created({items}) =>
items->Array.map(item => Publish(item.inventoryId, Inventory.Reserve({orderId, quantity: item.quantity})))
| Order.Cancelled => [Publish(orderId->getInventoryId, Inventory.Release({orderId: orderId}))]
| _ => []
}
}

Process Manager - Multi-Step Workflow

// Coordinate payment after inventory reservation
module PaymentMapping = {
module Source = Inventory

let map = (_inventoryId, event, queryEngine) =>
switch event {
| Inventory.Reserved({orderId}) =>
// Check if all inventory is reserved
let checkPromise =
queryEngine.query(~table="OrderInventory", ~key="orderId", ~value=orderId)
->Promise.thenResolve(items =>
if items->allReserved {
[(orderId, Payment.Process({orderId: orderId}))] // All reserved, proceed with payment
} else {
[] // Still waiting for more reservations
}
)
[PublishAsync(checkPromise)]
| _ => []
}
}

Delayed Command Execution

// Send reminder email after 24 hours
module ReminderMapping = {
module Source = Order

let map = (orderId, event, _queryEngine) =>
switch event {
| Order.Created(_) => [PublishDelayed(orderId, Order.SendReminder, 24 * 60 * 60)] // 24 hours in seconds
| _ => []
}
}

Cross-Aggregate Consistency

// Keep customer aggregate in sync with orders
module CustomerSyncMapping = {
module Source = Order

let map = (_orderId, event, _queryEngine) =>
switch event {
| Order.Completed({customerId, totalAmount}) => [Publish(customerId, Customer.UpdateOrderTotal(totalAmount))]
| _ => []
}
}

Error Handling

The EventMapper includes comprehensive error handling:

Mapping Errors:

  • Invalid event JSON → logged, event skipped
  • Missing mapping for source → logged, event skipped
  • Decoding errors → logged with context, event skipped
  • Map function exceptions → caught, logged, event skipped

Publishing Errors:

  • Command publishing failures → retried by CommandTopic
  • Counter operation failures → automatic retry with exponential backoff

Recovery:

  • Failed events remain in EventCollector queue for retry
  • Poison messages can be configured to move to dead-letter queue
  • All errors logged with full context (source, event, error details)

Pulumi

The EventMapper component creates these infrastructure resources:

type outputs = {
name: string,
eventCollector: Pulumi.Output.t<EventCollector.outputs>,
counter?: Counter.outputs,
}

Resource Naming:

  • Component type: reventless:EventMapper
  • Resource name pattern: {targetAggregateName}EventMapper

Dependencies:

  • EventMapper depends on target Aggregate's CommandTopic
  • EventMapper depends on source Aggregates' EventTopics
  • EventMapper optionally depends on Counter component

Configuration:

  • memorySize - Lambda memory allocation (default: 2048 MB)
  • timeout - Lambda timeout (default: 180 seconds)

Best Practices

Keep Mappings Pure and Focused

// ❌ Bad: blocking/complex logic in a (pure) mapping
let map = (id, event, queryEngine) => {
// Don't do complex calculations or block on awaits here
let result = expensiveSyncCalc()
let data = queryEngine.query(...) // returns a promise — don't try to await it inline
[Publish(id, SomeCommand(result))]
}

// ✅ Good: Simple, focused transformation
let map = (id, event, _queryEngine) =>
switch event {
| SourceEvent(data) => [Publish(id, TargetCommand(data))]
| _ => []
}

Use QueryEngine for Read-Side Queries Only

// ✅ Good: Query read models for decision-making via PublishAsync
let map = (id, event, queryEngine) =>
switch event {
| OrderCreated({customerId}) =>
let customerPromise = queryEngine.query(~table="CustomerReadModel", ~key="id", ~value=customerId)
[
PublishAsync(
customerPromise->Promise.thenResolve(customer =>
if customer.vipStatus {
[(id, ApplyVipDiscount)]
} else {
[]
}
),
),
]
| _ => []
}

Handle Missing Mappings Gracefully

// Always include a default case
let map = (id, event, _queryEngine) =>
switch event {
| EventWeCareAbout(data) => [/* commands */]
| _ => [] // Explicitly ignore other events
}

Use Counter for Complex Coordination

When multiple events must be collected before taking action, use a Counter instead of trying to track state in the mapping function.

AWS Implementation

For detailed AWS implementation, see EventMapper AWS adapter documentation (TBD).