Alpha Version: You are viewing the ALPHA documentation. This is an experimental version and may contain breaking changes.
Skip to main content

DCB (Dynamic Consistency Boundary)

The Plugin component supports an optional DCB (Dynamic Consistency Boundary) event log shared across multiple state change slices. All slices in a plugin read from and write to the same event log, with optimistic concurrency control enforced per command.

Command Flow

d2 diagram

One SQS FIFO queue per plugin receives all commands. The filteringHandler in the Lambda routes each message by its TAG field to whichever state change slices handle that command type. Slices that don't handle a command type are never called.

Module Types

Plugin.DcbSpec

Bundles the shared event type with the list of state change slices. The with type dcbEvent = event constraint ensures compile-time type safety — all slices must use the same event type as the DcbEventLog.

module type DcbSpec = {
@schema
type event

let stateChangeSlices: array<module(StateChangeSlice.T with type dcbEvent = event)>
let stateViewSlices: array<module(StateViewSlice.T with type dcbEvent = event)>
}

Note: there is no plugin-wide type command. Each slice declares its own commandSchema (auto-generated by the @schema ppx on type command).

StateChangeSlice.Spec

module type Spec = {
let name: string

module DcbEventLogSpec: DcbEventLog.Spec // shared event log spec

@schema
type command // ppx generates commandSchema: S.t<command>

@schema
type error

type decisionModel
let initialDecisionModel: decisionModel

let reduce: (decisionModel, DcbEventLogSpec.event) => decisionModel
let decide: (decisionModel, command) => result<array<DcbEventLogSpec.event>, error>
}

StateViewSlice.Spec

module type Spec = {
let name: string

module DcbEventLogSpec: DcbEventLog.Spec // shared event log spec

@schema
type event // event type from DcbEventLog

@schema
type state // state type for the read model

// Projection function: transforms events into projection actions
let project: (option<state>, DcbEventLogSpec.event) => array<Reventless.Projection.Spec.action<string, state>>
}

StateChangeSlice.T

module type T = {
type dcbEvent
module Spec: Spec

let make: (
~dcbEventLog: DcbEventLog.component<DcbEventLog.operations<dcbEvent>>,
~publishJsons: Pulumi.Output.t<CommandTopic.publishJsons>,
~opts: Pulumi.ComponentResource.options=?,
) => component
}

StateViewSlice.T

module type T = {
type dcbEvent
module Spec: Spec

let make: (
~dcbEventLog: DcbEventLog.component<DcbEventLog.operations<dcbEvent>>,
~queryDb: QueryDb.component<QueryDb.operations>,
~opts: Pulumi.ComponentResource.options=?,
) => component
}

CommandTopic.T (relevant additions)

module type T = {
// ...existing fields...

// Register a JSON handler in the global registry under each command type name
let registerHandler: (
~commandTopic: component,
~schema: S.t<unknown>,
~handler: jsonCommandsHandler,
~typeNames: array<string>,
) => unit

// Returns the routing handler output for Lambda runtime connection
let makeFilteringHandler: (
component,
) => Pulumi.Output.t<Runtime.eventHandler<callbackEvent, 'context, unit>>

// ...make, connect, makeHandler...
}

Usage

1. Define the shared event log spec

module MyDcbEventLogSpec = {
@schema
type event =
| ItemCreated({itemId: @s.matches(DcbTag.string) string, name: string})
| ItemRenamed({itemId: @s.matches(DcbTag.string) string, newName: string})
}

Fields marked @s.matches(DcbTag.string) become DCB tags — the event log is queried by these values to build the decision model.

2. Define state change slice specs

Each spec references the shared event log spec and defines its own command type:

module CreateItemSpec = {
let name = "CreateItem"

module DcbEventLogSpec = MyDcbEventLogSpec

@schema
type command = CreateItem({itemId: @s.matches(DcbTag.string) string, name: string})

@schema
type error = ItemAlreadyExists

type decisionModel = {exists: bool}
let initialDecisionModel = {exists: false}

let reduce = (model, event) =>
switch event {
| MyDcbEventLogSpec.ItemCreated(_) => {exists: true}
| _ => model
}

let decide = (model, command) =>
switch command {
| CreateItem({itemId, name}) =>
if model.exists {
Error(ItemAlreadyExists)
} else {
Ok([MyDcbEventLogSpec.ItemCreated({itemId, name})])
}
}

}

module RenameItemSpec = {
let name = "RenameItem"
module DcbEventLogSpec = MyDcbEventLogSpec

@schema
type command = RenameItem({itemId: @s.matches(DcbTag.string) string, newName: string})

@schema
type error = ItemNotFound

type decisionModel = {exists: bool, currentName: option<string>}
let initialDecisionModel = {exists: false, currentName: None}

let reduce = (model, event) =>
switch event {
| MyDcbEventLogSpec.ItemCreated({name}) => {exists: true, currentName: Some(name)}
| MyDcbEventLogSpec.ItemRenamed({newName}) => {...model, currentName: Some(newName)}
| _ => model
}

let decide = (model, command) =>
switch command {
| RenameItem({itemId, newName}) =>
if !model.exists {
Error(ItemNotFound)
} else {
Ok([MyDcbEventLogSpec.ItemRenamed({itemId, newName})])
}
}

}

3. Build state change slices

StateChangeSlice_Builder.Make takes only the spec. The shared CommandTopic and DcbEventLog are injected by the plugin at deploy time.

module CreateItemSlice = StateChangeSlice_Builder.Make(CreateItemSpec)
module RenameItemSlice = StateChangeSlice_Builder.Make(RenameItemSpec)

4. Define the DcbSpec

module MyDcbSpec = {
@schema
type event = MyDcbEventLogSpec.event

let stateChangeSlices = [
module(CreateItemSlice: StateChangeSlice.T with type dcbEvent = event),
module(RenameItemSlice: StateChangeSlice.T with type dcbEvent = event),
]
}

5. Create the plugin

// Inside the plugin's Make functor:
let make = () =>
Platform.Plugin.make(
~name="MyPlugin",
~heartbeatInterval=300,
~stateChangeSlices=[module(CreateItemSlice), module(RenameItemSlice)],
~stateViewSlices=[module(ItemViewSlice)],
)

Plugin Outputs

type outputs = {
// ...existing outputs...
dcbEventLog: Pulumi.Output.t<option<DcbEventLog.outputs>>,
stateChangeSlices: Pulumi.Output.t<dict<StateChangeSlice.outputs>>,
stateViewSlices: Pulumi.Output.t<dict<StateViewSlice.outputs>>,
}
  • dcbEventLog: Some(outputs) when any DCB slice array is non-empty, None otherwise
  • stateChangeSlices: keyed by Spec.name, contains resources for each slice
  • stateViewSlices: keyed by Spec.name, contains QueryDb outputs for each slice

Architecture

Deploy-Time Setup (Plugin_Builder.Make)

When DCB is configured, Plugin_Builder.Make.construct does the following:

  1. Creates one DcbEventLog using DcbSpec.event and the plugin name
  2. Creates one DcbCommandTopic — typed as command = JSON.t (accepts all JSON)
  3. Constructs each StateChangeSlice — passes both shared resources; each slice registers its JSON handler in the global registry
  4. Constructs each StateViewSlice — creates QueryDb and EventCollector for each slice, projects events to read model
  5. Calls DcbCommandTopic.makeFilteringHandler — wires filteringHandler to the SQS channel
  6. Calls PluginRuntimeBuilder.forDcbCommandTopic — creates the Lambda and connects it to SQS
// Inside Plugin_Builder.Make.construct, when dcbSpec = Some(module(DcbSpec):

module DcbEventLog = DcbEventLog_Builder.Make(DcbEventLogSpec, DcbEventLogStorage, DcbEventTopicPublisher)
let dcbEventLog = DcbEventLog.make(~name=name, ~opts) // plugin name becomes the service identifier

module DcbCommandTopic = CommandTopic_Builder.Make(DcbCommandTopicSpec, DcbCommandTopicChannel)
let dcbCommandTopic = DcbCommandTopic.make(~name=`${name}StateChanges`, ~opts)

// Extract publishJsons from the command topic before passing to each slice
// (avoids a type mismatch between DcbCommandTopic.operations and StateChangeSlice.operations)
let publishJsons =
dcbCommandTopic->Component.operations->Pulumi.Output.apply(ops => ops.publishJsons)

// Each StateChangeSlice.make call registers its handler in the global registry
DcbSpec.stateChangeSlices->Array.map(module(Slice: StateChangeSlice.T with type dcbEvent = DcbSpec.event) => {
Slice.make(~dcbEventLog, ~publishJsons, ~opts)
})

// Each StateViewSlice.make creates a QueryDb and subscribes to DcbEventLog events
DcbSpec.stateViewSlices->Array.map(module(ViewSlice: StateViewSlice.T with type dcbEvent = DcbSpec.event) => {
let queryDb = QueryDb_Builder.Make(~name=ViewSlice.Spec.name, ~opts)
ViewSlice.make(~dcbEventLog, ~queryDb, ~opts)
})

// Capture the forDcbCommandTopic call in a closure while DcbCommandTopic is in scope,
// so the local module type does not need to escape the switch arm
let dcbHandler = DcbCommandTopic.makeFilteringHandler(dcbCommandTopic)
let dcbRuntimeSetup = () =>
dcbCommandTopic->PluginRuntimeBuilder.forDcbCommandTopic(~handler=dcbHandler, ~connect=dcbConnectFn)

// In pureOutputs->Pulumi.Output.apply:
dcbRuntimeSetup()

Schema-Based Handler Registration

StateChangeSlice_Builder.Make.construct registers each slice's handler:

dcbEventLog
->Component.operations
->Pulumi.Output.apply(dcbEventLogOps => {
let jsonHandler = makeJsonHandler(dcbEventLogOps) // decodes JSON → Spec.command, calls Callback
CommandTopic.registerHandler(
~schema=commandSchema, // S.t<unknown> cast from Spec.commandSchema
~handler=jsonHandler,
~typeNames=commandTypeNames, // e.g. ["CreateItem"]
)
})

CommandTopic.registerHandler populates globalRegistry — a module-level Dict.t<array<handlerEntry>> keyed by command type name (e.g. "CreateItem", "RenameItem").

Filtering Handler

filteringHandler is defined at module level in CommandTopic_Builder.Make. It is the actual Lambda callback:

let filteringHandler: jsonCommandsHandler = async jsonItems => {
let allResults = []
jsonItems->Array.map(async ({reference, command: json}) => {
let typeName = extractTypeNameFromJson(json) // reads json["TAG"]
let handlers = CommandTopic.getHandlers(typeName) // reads globalRegistry
handlers->Array.map(async ({handler}) => {
let results = await handler([{reference, command: json}])
allResults->Array.pushMany(results)
})
})
allResults
}

CommandTopic.extractTypeNamesFromSchema reads the sury schema to extract variant names from the TAG const fields — supporting both single-variant (Object) and multi-variant (Union) command types.

StateChangeSlice_Callback: Decision Logic

StateChangeSlice_Callback.Make(Spec) produces a module whose handleCommands takes dcbEventLog as an explicit runtime parameter (rather than capturing it via a functor argument). This allows Callback to be created at module level in StateChangeSlice_Builder.Make, where the type system can properly unify Callback.Spec.command with the outer Spec.command.

handleCommands(dcbEventLog, topicItems) processes each command:

  1. Builds the query automatically using DcbTag.buildQueryFromCommand(~eventTypes=queryEventTypes, ~schema=Spec.commandSchema, ~value=command) where queryEventTypes is derived at module init from DcbTag.extractEventTypes(Spec.DcbEventLogSpec.eventSchema). The query mode is determined by schema introspection:
    • Scalar tags only (e.g., itemId: @s.matches(DcbTag.string) string) → single AND clause (standard single-entity query)
    • Tagged array fields (e.g., productId: array<@s.matches(DcbTag.string) string>) → per-element OR clauses (cross-entity query for commands referencing multiple entities)
  2. Queries the event log: dcbEventLog.readStream(~query)
  3. Reduces events into a decision model: fold with Spec.reduce starting from Spec.initialDecisionModel
  4. Calls Spec.decide(decisionModel, command) to produce new events or an error
  5. Appends with optimistic concurrency: dcbEventLog.append(newEvents, ~condition={query, after: headPosition})
  6. Retries up to 3 times on append conflict (position changed between read and write)

For an end-to-end walkthrough of how the query is built from a command and how that condition becomes the per-tag consistency fences enforced atomically on DynamoDB — with worked AddProduct / PlaceOrder / RecordProductDemand examples — see DCB Consistency Checks.

StateViewSlice_Callback: Projection Logic

StateViewSlice_Callback.Make(Spec) produces a module whose handleEvents function processes events from the DcbEventLog and projects them into the QueryDb read model.

handleEvents(dcbEventLog, queryDb, topicItems) processes each event:

  1. Decodes each JSON event using Spec.eventSchema
  2. Retrieves current state from QueryDb: queryDb.get(~key=viewKey)
  3. Applies the projection function: Spec.project(currentState, event) which returns an array of projection actions
  4. Uses Reventless.Projection.Spec.handleActions to process the actions and update state
  5. Writes the updated state back to QueryDb: queryDb.put(~key=viewKey, ~value=updatedState)

The projection pattern allows StateViewSlice to maintain materialized views of the event log state, providing optimized read access to application data.

Plugin_Builder.Make Functor Parameters

The functor requires three DCB-specific adapters alongside the standard ones:

module Make = (
Spec: Spec,
RuntimeEnvironment: Runtime.Environment,
EventCollectorChannel: EventCollector_Adapter.Channel ...,
QueryEngineAdapter: QueryDb_Adapter.QueryEngineAdapter,
CorePluginExtensionPointRemoteChannel: CommandTopic_Adapter.RemoteChannel,
HeartbeatRunner: Heartbeat_Adapter.Runner ...,
PluginRuntimeBuilder: PluginRuntime_Builder.T ...,
DcbEventLogStorage: DcbEventLog_Adapter.Storage, // e.g. DynamoDB adapter
DcbEventTopicPublisher: EventTopic_Adapter.Publisher, // e.g. SNS adapter
DcbCommandTopicChannel: CommandTopic_Adapter.Channel, // e.g. SQS FIFO adapter
): Plugin.T

These are always required as functor parameters even if a specific plugin instance doesn't use DCB (DCB slice arrays are optional at the make call site).

Design Decisions

Global Registry

CommandTopic.globalRegistry is a module-level mutable Dict.t<array<handlerEntry>>. State change slices populate it inside Pulumi.Output.apply callbacks during deploy time.

When Pulumi creates the DCB command topic Lambda using aws.lambda.CallbackFunction, it serializes the filteringHandler closure and its entire module graph, including the populated globalRegistry. The serialized Lambda bundle therefore contains the registered handlers with all their captured state (DcbEventLog DynamoDB table ARN etc.).

Limitation: all registerHandler calls must resolve (i.e., their Pulumi.Output.apply callbacks must run) before Pulumi serializes the Lambda. This is expected to happen in practice since all outputs in the plugin resolve during the same pulumi up execution, but there is no explicit ordering guarantee in the current implementation.

No Obj.magic

The DCB implementation contains no unsafe type casts. Two problems that originally required Obj.magic were resolved structurally:

1. Callback.Spec.command unification in StateChangeSlice_Builder

When Callback = StateChangeSlice_Callback.Make(Spec, Ops) was created inside makeJsonHandler (a function), the type checker treated Callback.Spec.command as a fresh nominal type distinct from the outer Spec.command, even though they are identical at runtime.

Fix: StateChangeSlice_Callback.Make now takes only Spec as a functor parameter. The dcbEventLog operations are passed as a regular runtime argument to handleCommands. Callback is therefore created at module level in StateChangeSlice_Builder.Make, where the type system correctly unifies Callback.Spec.command with Spec.command:

module Callback = StateChangeSlice_Callback.Make(Spec)  // module level — types unify

let makeJsonHandler = (dcbEventLogOps) => {
let handler: CommandTopic.jsonCommandsHandler = async items => {
let decodedItems = items->Array.filterMap(...)
await Callback.handleCommands(dcbEventLogOps, decodedItems) // no cast needed
}
handler
}

2. Type mismatch between DcbCommandTopic.operations and StateChangeSlice.operations in Plugin_Builder

DcbCommandTopic.operations = {publish, publishJsons} did not match StateChangeSlice.T.make's expected CommandTopic.component<{publishJsons}>, even though only publishJsons is used by slices.

Fix: StateChangeSlice.T.make now accepts ~publishJsons: Pulumi.Output.t<CommandTopic.publishJsons> directly instead of the whole command topic component. Plugin_Builder extracts publishJsons before the slice loop:

let publishJsons =
dcbCommandTopic->Component.operations->Pulumi.Output.apply(ops => ops.publishJsons)
Slice.make(~dcbEventLog, ~publishJsons, ~opts)

3. DcbCommandTopic.component type escaping the switch arm in Plugin_Builder

DcbCommandTopic is a locally-defined module inside a switch arm. Storing the component value in the return tuple caused its local type to escape the arm's scope.

Fix: the forDcbCommandTopic call is wrapped in a unit => unit closure while DcbCommandTopic is still in scope. The closure is stored in the tuple instead of the component:

let dcbRuntimeSetup = () =>
dcbCommandTopic->PluginRuntimeBuilder.forDcbCommandTopic(~handler=dcbHandler, ~connect=dcbConnectFn)

// stored as: Some(dcbRuntimeSetup)
// used later as: dcbRuntimeOpt->Option.forEach(setup => setup())

DcbSpec Retains type event

The original plan proposed removing Plugin.DcbSpec entirely and passing stateChangeSlices directly. DcbSpec is kept because the plugin builder needs the event type to instantiate DcbEventLog_Builder.Make(DcbEventLogSpec, ...) — without it, there is no compile-time constraint ensuring all slices share the same event type.

The type command field was removed from DcbSpec since each slice now provides its own commandSchema.

Open Issues

No Explicit Lambda Serialization Ordering

As noted above, there is no explicit Pulumi dependency ensuring that globalRegistry is fully populated before the DCB command topic Lambda is serialized. In practice this works because all outputs resolve synchronously within pulumi up, but it is fragile and could break if the execution order changes.

A more robust alternative would be to store handlers on the DcbCommandTopic component itself (e.g. via a JS property) rather than in a module-level global, so the populated state is always local to the component being serialized.

Event Type Filtering Covers the Full Event Schema

queryEventTypes is derived automatically from DcbEventLogSpec.eventSchema via DcbTag.extractEventTypes at module initialisation time. This means a slice always queries all event types in the shared event log, not just those it handles in reduce. The reduce function's catch-all branch (| _ => model) discards irrelevant events correctly.

A future optimisation could narrow the query to only the event types referenced in reduce, but this would require compile-time introspection of the pattern match, which is not currently supported.

No Multi-Command-Type Support in extractTypeNamesFromSchema

CommandTopic.extractTypeNamesFromSchema handles Union (multiple variants) and Object (single variant). It does not handle payload-less variants (string schemata) — these variants are silently ignored and would never be routed to a handler. Slices whose command type includes a payload-less variant (e.g. | NoOp) should be aware that NoOp commands will not be dispatched by the filtering handler (they will fall through with no result).

Aggregates Intentionally Use makeHandler

Aggregates still create their own CommandTopic per aggregate with their own Lambda. Although the schema-based registerHandler API was added to CommandTopic.T, Aggregate_Builder intentionally continues to use makeHandler with a strongly-typed commandsHandler. This is a deliberate architectural choice, not an oversight:

  1. Type safety: makeHandler accepts a commandsHandler<Message.command'<Spec.Id.t, Spec.command>> — the command type is fully resolved at compile time. No JSON decode step is needed at the handler boundary; the framework can pass a typed value directly. registerHandler (used by DCB slices) takes a jsonCommandsHandler and must decode each command from JSON at runtime, which adds a failure surface.

  2. Isolation: Each aggregate gets its own SQS FIFO queue and its own Lambda. A crash or overload in one aggregate's Lambda cannot affect another aggregate's command processing. DCB slices intentionally share one Lambda (because they share one DcbEventLog), so registerHandler with a global dispatch table is needed there. Aggregates have no such sharing requirement.

  3. Simplicity: registerHandler depends on a global Dict that must be populated before the Lambda serialization completes. A single functor application in the wrong place can silently leave a handler unregistered. makeHandler has no global state — it returns a handler directly from the builder, making the wiring explicit and easy to reason about.

  4. Architecture fit: Each aggregate owns a separate event log and a separate command topic. There is no shared resource that would justify collapsing multiple aggregates into a single Lambda. The cost (complexity, global registry risk) would outweigh any benefit.

In short: DCB slices use registerHandler because they must share a Lambda to share a DcbEventLog. Aggregates use makeHandler because they can have dedicated Lambdas, and the simpler, type-safe path is strictly better for them.