Aggregate-Extension Connection — Framework Implementation
This page covers how the routing between aggregates and extensions is implemented inside the framework. For the application developer's view of how to use this mechanism, see the App Guide.
Overview
Extension points and extensions allow one aggregate's events to trigger commands on another aggregate. The framework implements this as a two-layer routing mechanism:
- Deploy time:
ExtensionPoint.makeandExtension.makewire up the SQS/SNS subscriptions and Lambda functions - Runtime:
EventMapper_Callbackdecodes the incoming event and produces outgoing commands
Deploy-Time Wiring
ExtensionPoint_Builder.Make constructs:
- An EventTopic subscription — SNS/SQS connection from the source aggregate's event topic
- A Lambda function — runs the
EventMapper_Callbackat runtime
Extension_Builder.Make constructs:
- A CommandTopic connection — wires the output of the extension Lambda to the target aggregate's command topic
The connection is established via the connect functor parameter, which calls CommandTopic.connect with the Lambda's output ARN.
Runtime Dispatch
EventMapper_Callback.Make(Spec) produces a handler that:
- Receives a batch of events from the source aggregate's event topic
- Decodes each event using
Spec.eventSchema - Calls
Spec.map(event)to produce zero or more commands for the target aggregate - Encodes commands using
Spec.commandSchemaand publishes them to the target command topic
Module Types
ExtensionPoint.T
module type T = {
module Spec: ExtensionPoint.Spec
let make: (
~eventTopic: EventTopic.component<EventTopic.operations>,
~opts: Pulumi.ComponentResource.options=?,
) => component
}
Extension.T
module type T = {
module Spec: Extension.Spec
let make: (
~extensionPoint: ExtensionPoint.component<ExtensionPoint.operations>,
~commandTopic: CommandTopic.component<CommandTopic.operations>,
~opts: Pulumi.ComponentResource.options=?,
) => component
}
Adapter Interfaces
Extension points use EventCollector_Adapter.Channel for the SNS-to-SQS subscription and Runtime.Environment for the Lambda runtime. Extensions use CommandTopic_Adapter.Channel to publish commands.
These are the same adapter interfaces used by read models and aggregates respectively, keeping the cloud provider adapter surface minimal.