StateChangeSlice
For a short summary of StateChangeSlice, see Reventless Components Overview.
This component follows the Reventless Component Structure Pattern, using separate files for interface definitions (StateChangeSlice.res), builder logic (StateChangeSlice_Builder.res), and callback/handler logic (StateChangeSlice_Callback.res).
Overview
The StateChangeSlice is a DCB (Dynamic Consistency Boundary) component that processes commands against a shared event-sourced state. It implements a decision model pattern where commands are evaluated against accumulated events to produce new events or errors.
Purpose and Responsibilities
- Responsibility: Process commands using a state (decision model) built from event history; append new events to the shared DcbEventLog; handle optimistic concurrency conflicts
- In: Commands from CommandTopic (routed by command type)
- Out: Events to DcbEventLog (via append operation)
- Key Feature: Multiple slices can coexist in a single plugin, each handling different command types but sharing the same event log
Relationship with DCB
StateChangeSlice is a core component of the DCB architecture:
Component Spec
The StateChangeSlice component requires a spec that defines its name, command type, error type, and decision logic:
module type Spec = {
let name: string
// Local subset of events this slice reads to rebuild its decision state.
// Declared locally — there is no shared DcbEventLogSpec module.
@schema
type consumedEvent
@schema
type command
@schema
type error
// Events this slice emits from `decide`.
@schema
type event
type state
let initialState: state
let evolve: (state, consumedEvent) => state
let decide: (state, command) => result<array<event>, error>
}
Spec Fields Explained
| Field | Type | Description |
|---|---|---|
name | string | Unique identifier for this slice |
consumedEvent | @schema type | Local subset of events this slice reads to build its decision state |
command | @schema type | Command type using @schema ppx for auto-generated schema |
error | @schema type | Error type for command processing failures |
event | @schema type | Events this slice emits from decide |
state | type | The state type built from accumulated events |
initialState | state | Starting state for new aggregates/entities |
evolve | (state, event) => state | Fold function to accumulate events into state |
decide | (state, command) => result<events, error> | Business logic to produce events from command |
Runtime Behavior
Command Processing Flow
Optimistic Concurrency Control
StateChangeSlice implements optimistic concurrency to handle concurrent command processing:
// The callback reads the current head position
let readResult = await dcbEventLog.read(~query)
// Uses the position as a condition for append
let condition: DcbTag.appendCondition = {
query,
after: ?readResult.headPosition,
}
// If another process appended between read and write, retry
switch await dcbEventLog.append(newEvents, ~condition) {
| Ok(position) => // Success
| Error(err) =>
if retries > 0 {
// Retry with fresh read
await attempt(~retries=retries - 1)
} else {
// Exhausted retries
Error("conflict: retries exhausted")
}
}
Key points:
- Reads event log state before processing
- Records
headPosition(sequence position of last event) - Uses conditional append: only succeeds if no events were added after
headPosition - Retries up to 3 times on conflict
- Provides detailed logging for debugging
Automatic Query Construction
The query is built automatically from the command schema via DcbTag.buildQueryFromCommand:
- Scalar tagged fields (e.g.,
itemId: stringauto-tagged by PPX) — all tags go into a single AND clause (single-entity query) - Tagged array fields (e.g.,
productId: array<string>auto-tagged on elements) — each element becomes its own OR clause (cross-entity query)
No configuration is needed — the schema determines the query mode automatically.
When a variant has multiple *Id fields, use @partitionTag on the field that should be the partition key, or @compositePartitionTag on multiple fields to form a composite key joined in declaration order — see PPX annotations.
Error Handling
Error Types
StateChangeSlice defines error types for business logic failures:
@schema
type error =
| ItemNotFound
| ItemAlreadyExists
| InsufficientStock(int) // With payload
| ValidationError(string)
Error Processing
Errors are:
- Returned from the
decidefunction - Logged with full context (slice name, command, error details)
- Converted to JSON using the error schema
- Passed back to the caller via CommandTopic reference
| Error(error) =>
let errorJson = error->S.reverseConvertToJsonOrThrow(Spec.errorSchema)->JSON.stringify
Logger.error(~loc=__LOC__, `StateChangeSlice(${Spec.name}): decide error`, errorJson)
Error(errorJson)
Conflict Resolution
When concurrent modifications cause conflicts:
| Error(err) =>
if retries > 0 {
Logger.info(~loc=__LOC__, `StateChangeSlice(${Spec.name}): conflict, retrying`, err)
await attempt(~retries=retries - 1)
} else {
Logger.error(
~loc=__LOC__,
`StateChangeSlice(${Spec.name}): conflict, retries exhausted`,
err,
)
Error("conflict: retries exhausted")
}
Pulumi Outputs
type outputs = {
resources: array<Reventless.Adapter.resource>,
}
The StateChangeSlice reuses resources from the shared DcbEventLog:
- DynamoDB table for event storage
- SNS topic for event publishing
- Related IAM roles and policies
Related Components
- DcbEventLog - Shared event log for DCB slices
- CommandTopic - Command routing and filtering
- Plugin - Hosts DCB slices and creates shared infrastructure
- EventCollector - Consumes events from DcbEventLog
- ReadModel - Builds read models from DcbEventLog events
- Usage Guide - How to use StateChangeSlice in your application
AWS Implementation
For detailed implementation with AWS services (DynamoDB for storage, SNS for publishing, SQS for commands), see StateChangeSlice AWS Adapter Documentation (reuses EventLog infrastructure).