Extension
For a short summary of an Extension, see Reventless Components Overview.
This component follows the Reventless Component Structure Pattern, using separate files for interface definitions (Extension.res), builder logic (Extension_Builder.res), and runtime operations (Extension_Operations.res).
Overview
An Extension enables a Plugin to consume events from and send commands to another Plugin's ExtensionPoint. It acts as the consumer side of cross-Plugin communication, translating external events into internal commands and optionally forwarding internal events back to the ExtensionPoint.
Purpose and Responsibilities
- Event Consumption: Receives events from remote ExtensionPoints
- Command Generation: Transforms incoming events into commands for local Aggregates
- Event Forwarding: Optionally forwards local Aggregate events back to ExtensionPoints
- ReadModel Updates: Can forward events to local ReadModels for projection
- Cross-Plugin Integration: Enables loose coupling between Plugins
Extension vs ExtensionPoint
| Aspect | ExtensionPoint | Extension |
|---|---|---|
| Role | Provider (publishes events, receives commands) | Consumer (receives events, sends commands) |
| Location | In the Plugin that owns the data | In the Plugin that needs the data |
| Events | Publishes events to Extensions | Receives events from ExtensionPoints |
| Commands | Receives commands from Extensions | Sends commands to ExtensionPoints |
| Infrastructure | Has CommandTopic and EventTopic | No dedicated infrastructure |
Extension Mappings
ExtensionMappings define how external ExtensionPoint events are translated to internal commands and how internal events are forwarded back.
Mapping Spec
An Extension uses the same Spec as the ExtensionPoint it connects to:
module type Spec = {
let name: string // ExtensionPoint name: "PluginName.ExtensionPointName"
@schema
type command // Commands that can be sent to the ExtensionPoint
@schema
type event // Events received from the ExtensionPoint
@schema
type directive // Directives (side effects) — mirrored from the ExtensionPoint
}
Mapping Implementation
module type Impl = {
module ExtensionPoint: Spec // The ExtensionPoint spec being consumed
module Aggregate: Aggregate.Spec // The local Aggregate spec
// Map incoming ExtensionPoint events to local Aggregate commands
let mapIncomingEvent: mapIncomingEvent<
ExtensionPoint.event,
Aggregate.command,
ExtensionPoint.command,
ExtensionPoint.directive,
>
// Map outgoing Aggregate events to ExtensionPoint commands (optional)
let mapOutgoingEvent: option<
mapOutgoingEvent<Aggregate.event, ExtensionPoint.command, ExtensionPoint.directive>,
>
}
Mapping Functions
mapIncomingEvent
Maps events received from the ExtensionPoint to commands for local Aggregates:
type mapIncomingEvent<'epEvent, 'aggCommand, 'epCommand, 'directive> = (
string, // Event ID
'epEvent, // Incoming ExtensionPoint event
Message.meta, // Message metadata
PluginExtensionPointSpec.pluginDefinition, // Plugin definition for routing
QueryEngine.operations, // Query engine for lookups
) => array<incomingCommandAction<'aggCommand, 'epCommand, 'directive>>
mapOutgoingEvent
Maps local Aggregate events to commands for the ExtensionPoint:
type mapOutgoingEvent<'aggEvent, 'epCommand, 'directive> = (
string, // Event ID
'aggEvent, // Local Aggregate event
Message.meta, // Message metadata
PluginExtensionPointSpec.pluginDefinition, // Plugin definition for routing
) => array<outgoingCommandAction<'epCommand, 'directive>>
Incoming Command Actions
When mapping incoming events, you can return these actions:
type incomingCommandAction<'aggregateCommand, 'extensionPointCommand, 'directive> =
| PublishAggregateCommand(id, 'aggregateCommand) // Send command to local Aggregate
| PublishAggregateCommandAsync(promise<(id, 'aggregateCommand)>)
| PublishAggregateCommandsAsync(promise<array<(id, 'aggregateCommand)>>)
| PublishStateChangeSliceCommand('aggregateCommand) // Send command to local StateChangeSlice
| PublishStateChangeSliceCommandAsync(promise<'aggregateCommand>)
| PublishStateChangeSliceCommandsAsync(promise<array<'aggregateCommand>>)
| PublishExtensionPointCommand(id, 'extensionPointCommand) // Send command back to ExtensionPoint
| ForwardCommand(forwardCommand) // Forward to another ExtensionPoint
| HandleDirective(Reventless.Handler.handler<'directive>, 'directive) // Run a directive side-effect
Use PublishAggregateCommand(id, …) when the Delegate is an Aggregate — the
id is the aggregate's identity. Use PublishStateChangeSliceCommand(…) when
the Delegate is a StateChangeSlice — no id is needed because the framework
derives the FIFO grouping id from the command's @partitionTag (or
@compositePartitionTag) field.
Outgoing Command Actions
When mapping outgoing events, you can return these actions:
type outgoingCommandAction<'extensionPointCommand, 'directive> =
| PublishExtensionPointCommand(id, 'extensionPointCommand) // Send command to ExtensionPoint
| ForwardCommand(forwardCommand) // Forward to another ExtensionPoint
| HandleDirective(Reventless.Handler.handler<'directive>, 'directive) // Run a directive side-effect
Forward Command
For routing commands to specific ExtensionPoints:
type forwardCommand = {
extensionPointName: string, // Target ExtensionPoint name
id: string, // Entity ID
commandJson: Js.Json.t, // Serialized command
}
Directives (side effects)
Both mapIncomingEvent and mapOutgoingEvent can return a
HandleDirective(handler, directive) action to perform a side effect outside
the event-sourced flow (call an external API, send a notification). The directive
type is mirrored from the ExtensionPoint. Unlike an ExtensionPoint directive
handler, an Extension's is a bare function — no scheduler, no query-engine
argument. That is a wiring difference (the extension runs on the consumer's
EventCollector path, which the framework doesn't wire a scheduler into), not a
statement about who owns the code — extension mappings are app code just like
ExtensionPoint mappings:
// Reventless.Handler.handler
type handler<'directive> = 'directive => promise<unit>
let handleDirective: Reventless.Handler.handler<ExtensionPoint.directive> = directive =>
switch directive {
| NotifyDownstream({url}) => ExternalAPI.notify(url)
}
let mapIncomingEvent = (_id, event, _meta, _pluginDef, _queryEngine) =>
switch event {
| CustomerCreated({customerId, email}) => [
PublishStateChangeSliceCommand(SyncCustomer({customerId, email})),
HandleDirective(handleDirective, NotifyDownstream({url: "https://…"})),
]
}
If you need a schedule you cannot create it here (extensions are never handed a scheduler) — route to a local command or declare the directive on the ExtensionPoint side instead. See the Directives concept guide for the full comparison.
Example Extension
Scenario
Plugin B (Order Plugin) needs to react to customer events from Plugin A (Customer Plugin):
ExtensionPoint Spec (in the provider plugin's spec package)
The Extension references the provider's *_ExtensionPoint spec. It lives in the
provider plugin's spec package and is annotated with @@reventless.spec:
// In CustomerSpec — the stable public API from Customer to Order.
@@reventless.spec
@schema
type command = unit // read-only: no inbound commands
@schema
type event =
| CustomerCreated({customerId: string, name: string, email: string})
| CustomerUpdated({customerId: string, name: string, email: string})
| CustomerDeleted({customerId: string})
// Side effects an Extension subscribing to this EP can fire from
// `mapIncomingEvent`. Defaults to `unit`. For a typed example with two
// constructors (`EmitOrderRecordedTelemetry` / `EmitOrderCancelledTelemetry`)
// fired from a subscriber alongside its state-change commands, see
// `examples/online-shop-hybrid/ordering-spec/src/Orders_ExtensionPoint.res`
// and `examples/online-shop-hybrid/catalog/src/Extension/Orders_Extension.res`.
@schema
type directive = unit
Extension Mapping
An Extension is a single file <Name>_Extension.res annotated with
@@reventless.extension. It exposes a module Mapping that references the
remote ExtensionPoint spec and the local Delegate (an Aggregate or
StateChangeSlice). The PPX injects the action constructors (PublishAggregateCommand,
PublishStateChangeSliceCommand, etc.) — never open them manually.
// Order's extension subscribing to Customer's CustomerExtensionPoint.
@@reventless.extension
module Mapping = {
module ExtensionPoint = CustomerSpec.CustomerExtensionPoint
module Delegate = Order
open ExtensionPoint
open Order
// Map incoming events from the Customer ExtensionPoint to Order commands
let mapIncomingEvent = (_id, event, _meta, _pluginDef, _queryEngine) =>
switch event {
| CustomerCreated({customerId, name, email}) => [
PublishAggregateCommand(customerId, Order.CreateCustomerProfile({customerId, name, email})),
]
| CustomerUpdated({customerId, name, email}) => [
PublishAggregateCommand(customerId, Order.UpdateCustomerProfile({name, email})),
]
| CustomerDeleted({customerId}) => [
PublishAggregateCommand(customerId, Order.DeactivateCustomer),
]
}
// Optionally map outgoing Order events back to the Customer ExtensionPoint
let mapOutgoingEvent = None
}
Targeting a StateChangeSlice instead of an Aggregate
When the local Delegate is a StateChangeSlice (the DCB write side) rather than
an Aggregate, use PublishStateChangeSliceCommand(command) — no id is needed
because the framework derives the FIFO grouping id from the command's
@partitionTag (or @compositePartitionTag) field:
@@reventless.extension
module Mapping = {
module ExtensionPoint = OrderingSpec.Orders_ExtensionPoint
module Delegate = RecordProductDemand // a StateChangeSlice
open ExtensionPoint
open RecordProductDemand
let mapIncomingEvent = (_id, event, _meta, _pluginDef, _queryEngine) =>
switch event {
| ItemOrdered({productId, orderId}) => [
PublishStateChangeSliceCommand(RecordDemand({productId, orderId})),
]
| ItemOrderCancelled({productId, orderId}) => [
PublishStateChangeSliceCommand(RevokeDemand({productId, orderId})),
]
}
let mapOutgoingEvent = None
}
Runtime Behavior
Incoming Event Flow
When an event arrives from an ExtensionPoint:
Outgoing Event Flow
When a local Aggregate emits an event that should be forwarded:
Component Outputs
An Extension produces the following outputs:
type outputs = {
name: string, // Full extension name: "ExtensionPointName.MappingName"
extensionPointName: string, // Name of the consumed ExtensionPoint
aggregateNames: array<string>, // Names of Aggregates with outgoing mappings
}
Integration with Plugin
Extensions are wired automatically by the generated Plugin.res. You never
hand-write the plugin composition root — the plugin generator scans the
Extension/ folder and emits the wiring. For reference, the generated form is:
module Make = (Platform: ReventlessInfra.Platform.T) => {
// Extensions
module Customer_Extension = Platform.Extension.Make(Customer_Extension.Mapping)
// ... aggregates, read models, slices, etc.
let make = (~uiBundleUrl=?) =>
Platform.Plugin.make(
~name="Ordering",
~extensions=[module(Customer_Extension)],
~aggregates=[module(OrderAggregate)],
// ... other components
)
}
Platform.Extension.Make takes a single argument: the Mapping module from your
<Name>_Extension.res file.
Async Command Generation
For complex scenarios requiring async operations, return an async action
constructor from mapIncomingEvent:
let mapIncomingEvent = (_id, event, _meta, _pluginDef, queryEngine) =>
switch event {
| CustomerCreated({customerId, name, email}) =>
// Async command generation with query
[
PublishAggregateCommandAsync(
queryEngine.query("CustomerDefaults", customerId)->Promise.thenResolve(defaults => (
customerId,
Order.CreateCustomerProfile({customerId, name, email, defaultShipping: defaults.shippingMethod}),
)),
),
]
| _ => []
}
Best Practices
Mapping Design
- Single Responsibility: Each mapping should handle one Aggregate's concerns
- Idempotency: Design mappings to handle duplicate events gracefully
- Error Handling: Handle missing or invalid data in events
- Selective Mapping: Only map events that are relevant to your Plugin
Cross-Plugin Communication
- Loose Coupling: Don't assume internal details of the source Plugin
- Event Filtering: Filter events early to reduce processing
- Async Operations: Use async actions for complex transformations
- Query Sparingly: Minimize QueryEngine usage in mappings
Testing
- Unit Test Mappings: Test mapping functions in isolation
- Integration Tests: Test full event flow between Plugins
- Edge Cases: Test handling of missing or malformed events
Pulumi
Extensions don't deploy dedicated infrastructure. They are registered with the Plugin and their handlers are connected to the Plugin's EventCollector.
The Extension's name follows the pattern: {ExtensionPointName}.{MappingName} and is registered in the Plugin's extension list.
Related Components
- Plugin - Container for Extensions
- ExtensionPoint - The interface being consumed
- Aggregate - Target of generated commands
- ReadModel - Can receive forwarded events
- EventCollector - Delivers events to Extension handlers