CommandTopic
For a short summary of CommandTopic, see Reventless Components Overview.
This component follows the Reventless Component Structure Pattern, using separate files for interface definitions (CommandTopic.res), builder logic (CommandTopic_Builder.res), adapter interface (CommandTopic_Adapter.res), runtime operations (CommandTopic_Operations.res), and callback handlers (CommandTopic_Callback.res).
Overview
The CommandTopic is the message queue component that delivers commands to Aggregates with strict ordering guarantees and reliable delivery. It ensures commands are processed exactly once per aggregate instance, in the order they were sent.
Purpose and Responsibilities
- Responsibility: Deliver commands to Aggregates with typed results; support synchronous execution (with immediate
CommandResultfeedback) and async fire-and-forget (with FIFO ordering guarantees) - In: Commands from API (via CommandGenerator), EventMapper, Extensions, or ExtensionPoints
- Out: Commands to Aggregate command handlers;
CommandResultoutcomes back to callers
Component Spec
The CommandTopic requires a spec defining the Aggregate's id type and command type:
module type Spec = {
@schema
type command
}
Take the following spec for a Customer aggregate as an example:
@@reventless.spec
@schema
type command =
| Register({email: string, address: string})
| UpdateEmail({email: string})
| UpdateAddress({address: string})
| Deactivate
This spec is used to create a type-safe CommandTopic for the Customer aggregate.
Usage Pattern
CommandTopics are typically created as part of an Aggregate component and used internally by the framework for command delivery.
Creating a CommandTopic
To create a CommandTopic module you have to provide the spec and a channel adapter:
module CustomerCommandTopic = Reventless.CommandTopic_Builder.Make(
Customer,
CommandTopicChannel.SQS_Sync, // default — synchronous result
)
let commandTopic = CustomerCommandTopic.make(
~name="Customer",
~opts=pulumiOptions,
)
The channel adapter controls whether the mutation returns an immediate CommandResult or a CommandPending response:
| Channel | Queue type | Mutation return | Use when |
|---|---|---|---|
CommandTopicChannel.SQS_Sync | Standard SQS | CommandAccepted or CommandRejected | Default — user-facing CRUD |
CommandTopicChannel.SQS_Async | FIFO SQS | CommandPending | High-contention or internal automation |
Sync vs async
App developers don't pick the channel by hand. The plugin generator selects sync (Make) or async (MakeAsync) per component based on the spec file:
- Default — sync. Aggregate and StateChangeSlice spec files without any flag get
Platform.Aggregate.Make/Platform.StateChangeSlice.Make, wired toSQS_Sync. The AppSync Lambda dispatches the command inline (viapublishJsonsAndWait→runInlineAndCollect) and the mutation resolves toCommandAccepted/CommandRejected. - Opt-in — async. Add
@@reventless.asyncat the top of the spec file. The generator emitsMakeAsync, wires toSQS_Async(FIFO), and the AppSync Lambda fire-and-forgets to the queue and returnsCommandPending. The actual command handler runs later via the SQS event source.
The Lambda layout follows: async aggregates land in AllAggregatesAsyncCmdHandler (separate from the default AllAggregatesCmdHandler); async StateChangeSlices share a per-plugin <Plugin>DcbAsyncCmdHandler Lambda (separate from the default <Plugin>DcbCmdHandler). Async Lambdas are only provisioned when at least one component opts in — sync-only setups pay no extra Lambda cost.
Each of the four command-handler Lambdas (sync / async × aggregates / state-changes) can be tuned independently via Platform.MakeWithConfig's commandHandlerConfig record — memory, timeout, reserved concurrency, SQS batch size, ephemeral /tmp, log retention, and extra env vars. See the Tuning the command-handler Lambdas section in the AWS Lambda deployment guide.
CommandTopic Operations
The CommandTopic provides operations for publishing and handling commands:
Publish a Single Command
The publish operation sends a single typed command to the topic:
type publish<'id, 'command> = (
Message.command'<'id, 'command>
) => promise<unit>
Usage:
await commandTopic.publish({id, meta, command})
Publish Multiple Commands
The publishJsons operation sends multiple JSON commands efficiently:
type publishJsons = (
array<Reventless.Message.commandJson>
) => promise<unit>
This is used internally for batch command publishing (e.g. from EventMappers). Each command is a record with the following structure (as defined in the reventless-spec package):
type commandJson = {
id: string,
meta: meta,
commandJson: Js.Json.t,
delay?: int,
}
Runtime Behavior
Command Publishing Flow
Command Handling Flow
Integration with Aggregate
The CommandTopic is the delivery mechanism between command sources and Aggregates:
Flow:
- Command sources publish commands to the CommandTopic queue
- Lambda is triggered when messages arrive
- Handler decodes commands and calls the Aggregate's command handler
- Aggregate processes commands and appends events to EventLog
CommandResult
Every GraphQL mutation returns a CommandResult union. The variant depends on which channel the CommandTopic is configured with:
union CommandResult = CommandAccepted | CommandRejected | CommandPending
type CommandAccepted {
msgId: ID!
entityId: ID # id of the created/modified entity (absent for extension point commands)
eventCount: Int! # number of events appended; 0 for idempotent no-ops
}
type CommandRejected {
msgId: ID!
errorCode: String! # Spec.error variant name, e.g. "AlreadyExists"
errorDetail: String # full serialized Spec.error JSON (for debugging)
}
type CommandPending {
msgId: ID! # use msgId to subscribe for the eventual result
}
CommandAccepted— command was valid, business rules passed, events committed (SQS_Sync).entityIdlets the client navigate directly to the affected entity;eventCountis 0 for idempotent no-ops.CommandRejected—decidereturnedError— business rule violated; state unchanged (SQS_Sync)CommandPending— command queued fire-and-forget; result not yet known (SQS_Async)
Client example:
mutation RegisterCustomer($id: ID!, $email: String!, $address: String!) {
registerCustomer(id: $id, email: $email, address: $address) {
__typename
... on CommandAccepted { msgId entityId eventCount }
... on CommandRejected { msgId errorCode errorDetail }
... on CommandPending { msgId }
}
}
Command Structure
All commands include metadata for traceability:
type command'<'id, 'command> = {
id: 'id, // Aggregate instance id
meta: meta, // Metadata
command: 'command, // The actual command
}
type meta = {
service: service, // service name that created event or is addressed by command
time: string, // when message was created
ip: string, // IP of service that created message
user: string, // user name that initiated message (if any)
msgId: string, // unique message id
correlationId: string, // id of message that caused this message
}
This metadata enables:
- Command tracing across the system
- Debugging command flows
- Auditing who initiated commands
- Causality tracking for event sourcing
Related Components
- Aggregate - Consumes commands from CommandTopic
- CommandGenerator - Publishes commands to CommandTopic from API
- EventMapper - Publishes commands to CommandTopic based on events
- Extension - Publishes commands to ExtensionPoint CommandTopics
- EventTopic - Similar pattern for event distribution
AWS Implementation
For detailed implementation, see CommandTopic AWS Adapter Documentation.