Alpha Version: You are viewing the ALPHA documentation. This is an experimental version and may contain breaking changes.

CommandTopic

For a short summary of CommandTopic, see Reventless Components Overview.

Framework Implementation

This component follows the Reventless Component Structure Pattern, using separate files for interface definitions (CommandTopic.res), builder logic (CommandTopic_Builder.res), adapter interface (CommandTopic_Adapter.res), runtime operations (CommandTopic_Operations.res), and callback handlers (CommandTopic_Callback.res).

Overview

d2 diagram

The CommandTopic is the message queue component that delivers commands to Aggregates with strict ordering guarantees and reliable delivery. It ensures commands are processed exactly once per aggregate instance, in the order they were sent.

Purpose and Responsibilities

  • Responsibility: Deliver commands to Aggregates with typed results; support synchronous execution (with immediate CommandResult feedback) and async fire-and-forget (with FIFO ordering guarantees)
  • In: Commands from API (via CommandGenerator), EventMapper, Extensions, or ExtensionPoints
  • Out: Commands to Aggregate command handlers; CommandResult outcomes back to callers

Component Spec

The CommandTopic requires a spec defining the Aggregate's id type and command type:

module type Spec = {
  @schema
  type command
}

Take the following spec for a Customer aggregate as an example:

Customer.res
@@reventless.spec

@schema
type command =
  | Register({email: string, address: string})
  | UpdateEmail({email: string})
  | UpdateAddress({address: string})
  | Deactivate

This spec is used to create a type-safe CommandTopic for the Customer aggregate.

Usage Pattern

CommandTopics are typically created as part of an Aggregate component and used internally by the framework for command delivery.

Creating a CommandTopic

To create a CommandTopic module you have to provide the spec and a channel adapter:

Customer_Aggregate.res
module CustomerCommandTopic = Reventless.CommandTopic_Builder.Make(
  Customer,
  CommandTopicChannel.SQS_Sync, // default: synchronous result
)

let commandTopic = CustomerCommandTopic.make(
  ~name="Customer",
  ~opts=pulumiOptions,
)

The channel adapter controls whether the mutation returns an immediate CommandResult or a CommandPending response:

| Channel | Queue type | Mutation return | Use when |
| --- | --- | --- | --- |
| CommandTopicChannel.SQS_Sync | Standard SQS | CommandAccepted or CommandRejected | Default (user-facing CRUD) |
| CommandTopicChannel.SQS_Async | FIFO SQS | CommandPending | High-contention or internal automation |

Sync vs async

App developers don't pick the channel by hand. The plugin generator selects sync (Make) or async (MakeAsync) per component based on the spec file:

  • Default: sync. Aggregate and StateChangeSlice spec files without any flag get Platform.Aggregate.Make / Platform.StateChangeSlice.Make, wired to SQS_Sync. The AppSync Lambda dispatches the command inline (via publishJsonsAndWait → runInlineAndCollect) and the mutation resolves to CommandAccepted / CommandRejected.
  • Opt-in: async. Add @@reventless.async at the top of the spec file. The generator emits MakeAsync, wires it to SQS_Async (FIFO), and the AppSync Lambda fire-and-forgets to the queue and returns CommandPending. The actual command handler runs later via the SQS event source.
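
The selection rule in the two bullets above can be pictured with a small TypeScript model. It is only an illustration, not the generator's actual code: `selectWiring` and the `Wiring` type are hypothetical names, and the flag check assumes `@@reventless.async` sits on a line of its own.

```typescript
// Hypothetical model of the sync/async selection rule; NOT the real plugin generator.
type Wiring = {
  builder: "Make" | "MakeAsync";
  channel: "SQS_Sync" | "SQS_Async";
  mutationReturns: string[];
};

// Assumption: the opt-in flag appears on its own line in the spec file.
function selectWiring(specSource: string): Wiring {
  const isAsync = specSource
    .split("\n")
    .some((line) => line.trim() === "@@reventless.async");
  return isAsync
    ? { builder: "MakeAsync", channel: "SQS_Async", mutationReturns: ["CommandPending"] }
    : { builder: "Make", channel: "SQS_Sync", mutationReturns: ["CommandAccepted", "CommandRejected"] };
}

// A spec without the flag stays on the sync default.
console.log(selectWiring("@@reventless.spec\n\n@schema\ntype command = | Deactivate").builder);
// A spec that opts in is wired to the FIFO channel.
console.log(selectWiring("@@reventless.async\n@@reventless.spec").channel);
```

The point of the model is that the choice is made per spec file, so two components in the same plugin can end up on different channels.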

The Lambda layout follows the same split: async aggregates land in AllAggregatesAsyncCmdHandler (separate from the default AllAggregatesCmdHandler), and async StateChangeSlices share a per-plugin <Plugin>DcbAsyncCmdHandler Lambda (separate from the default <Plugin>DcbCmdHandler). Async Lambdas are provisioned only when at least one component opts in, so sync-only setups pay no extra Lambda cost.
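
As a rough illustration of that layout, the following TypeScript sketch maps components to handler Lambdas. Only the Lambda names come from the text; the `Component` type, `lambdaFor`, and `provisionedLambdas` are hypothetical, and treating sync Lambdas as "provisioned on first use" as well is a simplifying assumption.

```typescript
// Hypothetical model of the command-handler Lambda layout; not framework code.
type Component = {
  kind: "Aggregate" | "StateChangeSlice";
  plugin: string; // name of the plugin that owns the component
  async: boolean; // true when the spec file carries @@reventless.async
};

// Picks the handler Lambda a component's commands are routed to.
function lambdaFor(c: Component): string {
  if (c.kind === "Aggregate") {
    return c.async ? "AllAggregatesAsyncCmdHandler" : "AllAggregatesCmdHandler";
  }
  return c.async ? `${c.plugin}DcbAsyncCmdHandler` : `${c.plugin}DcbCmdHandler`;
}

// Assumption: a Lambda exists only if at least one component maps to it.
function provisionedLambdas(components: Component[]): string[] {
  return components
    .map(lambdaFor)
    .filter((name, index, all) => all.indexOf(name) === index) // de-duplicate
    .sort();
}

console.log(
  provisionedLambdas([
    { kind: "Aggregate", plugin: "Shop", async: false },
    { kind: "StateChangeSlice", plugin: "Shop", async: true },
  ]),
);
```

With a sync-only component list, no name containing `Async` appears in the result, which mirrors the "no extra Lambda cost" statement.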

Each of the four command-handler Lambdas (sync / async × aggregates / state-changes) can be tuned independently via Platform.MakeWithConfig's commandHandlerConfig record — memory, timeout, reserved concurrency, SQS batch size, ephemeral /tmp, log retention, and extra env vars. See the Tuning the command-handler Lambdas section in the AWS Lambda deployment guide.

CommandTopic Operations

The CommandTopic provides operations for publishing and handling commands:

Publish a Single Command

The publish operation sends a single typed command to the topic:

type publish<'id, 'command> = (
  Message.command'<'id, 'command>
) => promise<unit>

Usage:

await commandTopic.publish({id, meta, command})

Publish Multiple Commands

The publishJsons operation sends multiple JSON commands efficiently:

type publishJsons = (
  array<Reventless.Message.commandJson>
) => promise<unit>

This is used internally for batch command publishing (e.g. from EventMappers). Each command is a record with the following structure (as defined in the reventless-spec package):

type commandJson = {
  id: string,
  meta: meta,
  commandJson: Js.Json.t,
  delay?: int,
}
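
To make the record shape concrete, here is a TypeScript mirror of it together with a small batch builder. This is a sketch under assumptions: `CommandJson`, `toBatch`, and the payload encoding are hypothetical, the real type lives in reventless-spec, and the unit of `delay` is not stated in this page.

```typescript
// Illustrative mirror of the commandJson record; generic over the meta type
// so that no meta fields have to be guessed here.
type CommandJson<M> = {
  id: string;
  meta: M;
  commandJson: unknown; // stands in for Js.Json.t
  delay?: number; // optional, like `delay?: int`
};

// Hypothetical helper: one batch entry per aggregate id, same payload for all.
// metaFor is called per entry so every message can carry its own msgId.
function toBatch<M>(
  ids: string[],
  metaFor: (id: string) => M,
  payload: unknown,
  delay?: number,
): CommandJson<M>[] {
  return ids.map((id) => {
    const entry: CommandJson<M> = { id, meta: metaFor(id), commandJson: payload };
    if (delay !== undefined) entry.delay = delay; // leave the field out when unset
    return entry;
  });
}

// Placeholder payload; the actual JSON produced by @schema is not shown in this page.
const batch = toBatch(["c-1", "c-2"], (id) => ({ msgId: `msg-${id}` }), { tag: "Deactivate" });
console.log(batch.length);
```

An array built this way corresponds to the single argument that publishJsons expects.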

Runtime Behavior

Command Publishing Flow

d2 diagram

Command Handling Flow

d2 diagram

Integration with Aggregate

The CommandTopic is the delivery mechanism between command sources and Aggregates:

d2 diagram

Flow:

  1. Command sources publish commands to the CommandTopic queue
  2. Lambda is triggered when messages arrive
  3. Handler decodes commands and calls the Aggregate's command handler
  4. Aggregate processes commands and appends events to EventLog

CommandResult

Every GraphQL mutation returns a CommandResult union. The variant depends on which channel the CommandTopic is configured with:

union CommandResult = CommandAccepted | CommandRejected | CommandPending

type CommandAccepted {
  msgId: ID!
  entityId: ID # id of the created/modified entity (absent for extension point commands)
  eventCount: Int! # number of events appended; 0 for idempotent no-ops
}

type CommandRejected {
  msgId: ID!
  errorCode: String! # Spec.error variant name, e.g. "AlreadyExists"
  errorDetail: String # full serialized Spec.error JSON (for debugging)
}

type CommandPending {
  msgId: ID! # use msgId to subscribe for the eventual result
}

  • CommandAccepted: the command was valid, business rules passed, and events were committed (SQS_Sync). entityId lets the client navigate directly to the affected entity; eventCount is 0 for idempotent no-ops.
  • CommandRejected: decide returned Error; a business rule was violated and state is unchanged (SQS_Sync).
  • CommandPending: the command was queued fire-and-forget; the result is not yet known (SQS_Async).

Client example:

mutation RegisterCustomer($id: ID!, $email: String!, $address: String!) {
  registerCustomer(id: $id, email: $email, address: $address) {
    __typename
    ... on CommandAccepted { msgId entityId eventCount }
    ... on CommandRejected { msgId errorCode errorDetail }
    ... on CommandPending { msgId }
  }
}
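
On the client side, the `__typename` field selected above is what distinguishes the three variants. The TypeScript sketch below shows one way to branch on it. The `CommandResult` type is a hand-written mirror of the schema and `describeResult` is a hypothetical helper, not part of any Reventless client library.

```typescript
// Hand-written mirror of the CommandResult union from the schema above.
type CommandResult =
  | { __typename: "CommandAccepted"; msgId: string; entityId?: string | null; eventCount: number }
  | { __typename: "CommandRejected"; msgId: string; errorCode: string; errorDetail?: string | null }
  | { __typename: "CommandPending"; msgId: string };

// Hypothetical helper: turns a mutation result into a short status message.
function describeResult(result: CommandResult): string {
  switch (result.__typename) {
    case "CommandAccepted":
      // eventCount is 0 for idempotent no-ops
      return result.eventCount === 0
        ? `accepted ${result.msgId}: nothing changed`
        : `accepted ${result.msgId}: ${result.eventCount} event(s) appended`;
    case "CommandRejected":
      return `rejected ${result.msgId}: ${result.errorCode}`;
    case "CommandPending":
      // the outcome arrives later; msgId is the handle for following up
      return `pending ${result.msgId}`;
  }
}

console.log(
  describeResult({ __typename: "CommandRejected", msgId: "m-1", errorCode: "AlreadyExists" }),
);
```

Modelling the result as a discriminated union lets the compiler flag any variant the client forgets to handle.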

Command Structure

All commands include metadata for traceability:

type command'<'id, 'command> = {
  id: 'id, // Aggregate instance id
  meta: meta, // Metadata
  command: 'command, // The actual command
}

type meta = {
  service: service, // service name that created event or is addressed by command
  time: string, // when message was created
  ip: string, // IP of service that created message
  user: string, // user name that initiated message (if any)
  msgId: string, // unique message id
  correlationId: string, // id of message that caused this message
}

This metadata enables:

  • Command tracing across the system
  • Debugging command flows
  • Auditing who initiated commands
  • Causality tracking for event sourcing
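
Causality tracking, for instance, amounts to following correlationId from one message to the message that caused it. The TypeScript sketch below illustrates the idea. `TracedMeta` and `causalChain` are hypothetical names, and it assumes a root message carries a correlationId that matches no known msgId (an empty string in the example).

```typescript
// Only the two meta fields needed for tracing are modelled here.
type TracedMeta = { msgId: string; correlationId: string };

// Hypothetical helper: walks from one message back to the root cause.
// Returns msgIds ordered from the starting message to the oldest known cause.
function causalChain(all: TracedMeta[], startMsgId: string): string[] {
  const byId: { [msgId: string]: TracedMeta | undefined } = Object.create(null);
  for (const m of all) byId[m.msgId] = m;

  const chain: string[] = [];
  let current = byId[startMsgId];
  // Stop at an unknown cause; the indexOf check guards against cycles.
  while (current !== undefined && chain.indexOf(current.msgId) === -1) {
    chain.push(current.msgId);
    current = byId[current.correlationId];
  }
  return chain;
}

const messages: TracedMeta[] = [
  { msgId: "cmd-1", correlationId: "" }, // root (assumed convention)
  { msgId: "evt-1", correlationId: "cmd-1" },
  { msgId: "cmd-2", correlationId: "evt-1" },
];
console.log(causalChain(messages, "cmd-2"));
```

The same walk works for debugging and auditing, since each hop also exposes the user and service fields of the message that caused the next one.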

Related Components

  • Aggregate - Consumes commands from CommandTopic
  • CommandGenerator - Publishes commands to CommandTopic from API
  • EventMapper - Publishes commands to CommandTopic based on events
  • Extension - Publishes commands to ExtensionPoint CommandTopics
  • EventTopic - Similar pattern for event distribution

AWS Implementation

For detailed implementation, see CommandTopic AWS Adapter Documentation.