Messages in Reventless
Messages are the fundamental communication mechanism in the reventless event-sourcing framework. They enable decoupled, scalable systems by providing structured ways to express intent (commands) and facts (events) while maintaining traceability and correlation across distributed services.
Introduction & Conceptual Overview
What are Messages in Event-Sourcing?
In event-sourcing architecture, messages serve as the primary means of communication between different parts of the system. They represent either:
- Commands: Intent to change state ("I want to create a customer")
- Events: Immutable facts about what happened ("Customer was created")
Messages in reventless carry not just the payload data, but also essential metadata for distributed tracing, correlation, and system observability.
How Messages Enable Decoupled, Scalable Systems
Messages provide several key benefits:
- Temporal Decoupling: Producers and consumers don't need to be online simultaneously
- Spatial Decoupling: Services don't need to know about each other's locations
- Synchronization Decoupling: Operations can be asynchronous
- Failure Isolation: One service's failure doesn't cascade to others
The Journey from Command to Event
The typical message flow in reventless follows this pattern:
- Client sends a command' to express intent
- Aggregate processes the command and produces event' instances
- EventLog persists the events for durability
- EventTopic publishes events to interested subscribers
- ReadModels consume events to update their projections
Message Types Overview
Reventless defines several core message types:
- meta: Metadata for tracing and correlation
- context: Command processing context
- event'<'id, 'event>: Structured event with metadata
- command'<'id, 'command>: Structured command with metadata
- commandJson: Serialized command for transport
Core Message Types
Meta Type (meta)
The meta type provides envelope metadata for every message in the system. It is shared by both event' and command' envelopes — fields are kept generic so they apply equally to commands and events.
type meta = {
  service: service, // producer service name
  time: string, // ISO-8601 producer timestamp
  ip?: string, // producer IP; absent when unknown (e.g. serverless)
  user?: string, // acting user; absent for system-initiated messages
  msgId: string, // unique id of this message
  correlationId: string, // root id of the correlation chain (defaults to msgId)
  causationId?: string, // id of the *direct* parent that caused this message
  traceparent?: string, // W3C Trace Context header, opaque pass-through
  schemaVersion?: string, // schema version of this message's payload variant
  headers?: dict<string>, // extensible bag (tenantId, feature flags, etc.)
}
Field semantics:
- service / time / msgId are always present.
- correlationId is always present; it defaults to msgId when this message is the chain root.
- causationId identifies the direct parent (the immediately preceding message in the chain) and is absent at the chain root. Together with correlationId this gives you both "the whole chain root" and "my parent", which is enough to reconstruct the full causation graph.
- traceparent is stored under that exact lowercase spelling because it is the literal W3C Trace Context HTTP header name, used verbatim by OpenTelemetry SDKs and AWS X-Ray. Adapters do zero case-translation between the HTTP header and the meta field.
- ip / user are option<string>: absent means "unknown" / "system". There are no "" / "unknown" sentinel values to special-case.
- headers is an extensible string-keyed bag for cross-cutting context (tenant id, feature flags, request-scoped state); absent when empty. Read it with meta.headers->Option.getOr(Dict.make()).
Building meta:
- Message.generateMeta(~service, ~ip=?, ~user=?, ~causationId=?, ~traceparent=?, ~correlationId=?, ~schemaVersion=?, ~headers=?): root meta for messages starting a new chain.
- Message.deriveMeta(~parent, ~service=?): child meta for messages emitted in response to a triggering message. Sets causationId = parent.msgId, inherits correlationId (so the chain root id stays stable), inherits ip / user / traceparent / schemaVersion / headers, and produces a fresh msgId + time. The framework uses this in Aggregate_Callback, StateChangeSlice_Callback, EventMapper_Callback, and Extension_Operations.forwardCommand.
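The root/child rules above can be illustrated with a small self-contained sketch. It does not call the framework: the meta record is redeclared locally, and rootMeta, childMeta, nextId, and the explicit ~time argument are hypothetical names chosen to keep the example deterministic. It assumes ReScript 11 or later with the Core standard library (Dict, Option, Int).

```rescript
// Local stand-in for the meta record described above; not the library's own module.
type meta = {
  service: string,
  time: string,
  ip?: string,
  user?: string,
  msgId: string,
  correlationId: string,
  causationId?: string,
  traceparent?: string,
  schemaVersion?: string,
  headers?: Dict.t<string>,
}

// Hypothetical id source: a counter keeps the example deterministic.
let counter = ref(0)
let nextId = () => {
  counter := counter.contents + 1
  "msg-" ++ Int.toString(counter.contents)
}

// Root meta: correlationId defaults to the fresh msgId, causationId stays absent.
let rootMeta = (~service, ~user=?, ~time): meta => {
  let msgId = nextId()
  {service, time, user: ?user, msgId, correlationId: msgId}
}

// Child meta: fresh msgId and time, causationId set to the direct parent,
// everything else (correlationId, ip, user, ...) copied from the parent.
let childMeta = (~parent: meta, ~service=?, ~time): meta => {
  ...parent,
  service: service->Option.getOr(parent.service),
  time,
  msgId: nextId(),
  causationId: parent.msgId,
}

let command = rootMeta(~service="OrderService", ~user="jane.smith", ~time="2024-08-13T10:30:00.000Z")
let created = childMeta(~parent=command, ~time="2024-08-13T10:30:01.000Z")
let reserve = childMeta(~parent=created, ~service="InventoryService", ~time="2024-08-13T10:30:02.000Z")
```

In this sketch all three messages share command's msgId as correlationId, while each causationId points one step back. That is the property the framework's deriveMeta is described as preserving.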
Purpose:
- Distributed Tracing: Track messages across service boundaries; traceparent hands off to OpenTelemetry / X-Ray.
- Causation Graph: causationId (direct parent) + correlationId (chain root) reconstruct the full message graph.
- Audit Trails: Maintain a complete history of who did what when.
- Multi-tenant Routing: headers.tenantId is the conventional slot for tenant context.
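As a small illustration of the headers convention, the sketch below reads a tenant id from an optional headers bag. The helper name tenantOf and the sample values are hypothetical; the sketch assumes ReScript 11 or later with the Core standard library.

```rescript
// Hypothetical helper: look up the tenant id in an optional headers bag.
// A missing bag is treated as an empty one, so there is only one lookup path.
let tenantOf = (headers: option<Dict.t<string>>): option<string> =>
  headers->Option.getOr(Dict.make())->Dict.get("tenantId")

let sampleHeaders = Dict.fromArray([("tenantId", "acme"), ("featureFlag", "beta")])

let known = tenantOf(Some(sampleHeaders)) // Some("acme")
let missing = tenantOf(None) // None
```

With a real meta value the call would be tenantOf(meta.headers), since accessing an optional record field yields an option.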
Context Type (context)
The context type provides processing context to command handlers.
type context = {
  id: string,
  meta: meta,
}
Purpose:
- Provide aggregate ID and metadata to command handlers
- Enable aggregates to access correlation information
- Support audit and tracing within aggregate processing
Usage Patterns:
- Passed to aggregate command handlers
- Contains the target aggregate ID
- Carries forward the original message metadata
Event' Type (event'<'id, 'event>)
The event' type represents structured events with full metadata.
type event'<'id, 'event> = {
  id: 'id, // aggregate or entity identifier
  meta: meta, // message metadata
  event: 'event, // the actual event payload
}
Purpose:
- Represent immutable facts about what happened in the system
- Carry complete context for event processing
- Enable event replay and projection building
Usage Patterns:
- Produced by aggregates after successful command processing
- Stored in event logs for persistence
- Published to event topics for consumption by read models
- Used in event replay scenarios
Command' Type (command'<'id, 'command>)
The command' type represents structured commands with metadata.
type command'<'id, 'command> = {
  id: 'id, // target aggregate identifier
  meta: meta, // message metadata
  command: 'command, // the actual command payload
}
Purpose:
- Express intent to change system state
- Carry routing information (target aggregate ID)
- Maintain traceability from command to resulting events
Usage Patterns:
- Created by clients or other services
- Routed to appropriate aggregates based on ID
- Validated and processed by command handlers
- Generate events upon successful processing
CommandJson Type (commandJson)
The commandJson type represents serialized commands for transport.
type commandJson = {
  id: string, // target aggregate identifier as string
  meta: meta, // message metadata
  commandJson: Js.Json.t, // serialized command payload
  delay?: int, // optional delay in milliseconds
}
Purpose:
- Enable command transport over message queues
- Support delayed command execution
- Provide schema-agnostic command serialization
Usage Patterns:
- Used in message queue implementations
- Supports command scheduling with delay
- Enables cross-service command routing
Message Flow Patterns
Command-to-Event Flow
The fundamental flow pattern in reventless shows how commands transform into events:
This flow demonstrates:
- Client creates and sends a command'
- CommandTopic routes the command to the appropriate aggregate
- Aggregate processes the command and produces event' instances
- EventLog persists events for durability and replay
- EventTopic publishes events to subscribers
- ReadModel consumes events to update projections
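The steps above can be sketched entirely in memory. This is a minimal illustration under stated assumptions, not the framework's implementation: the types are local stand-ins, CommandTopic routing is reduced to a direct function call, and decide, dispatch, eventLog, subscribers, customerNames, and the "-e1" id suffix are hypothetical. It assumes ReScript 11 or later with the Core standard library.

```rescript
// Local stand-ins for the message envelopes (meta reduced to the id fields).
type meta = {msgId: string, correlationId: string, causationId?: string}
type event'<'id, 'event> = {id: 'id, meta: meta, event: 'event}
type command'<'id, 'command> = {id: 'id, meta: meta, command: 'command}

type customerCommand = Create({name: string})
type customerEvent = Created({name: string})

// In-memory stand-ins for EventLog, EventTopic subscribers, and a ReadModel.
let eventLog: array<event'<string, customerEvent>> = []
let subscribers: array<event'<string, customerEvent> => unit> = []
let customerNames: Dict.t<string> = Dict.make()

// ReadModel: project Created events into an id -> name view.
subscribers->Array.push((e: event'<string, customerEvent>) =>
  switch e.event {
  | Created({name}) => customerNames->Dict.set(e.id, name)
  }
)

// Aggregate: decide which events a command produces (Create needs no prior state).
let decide = (cmd: command'<string, customerCommand>): array<event'<string, customerEvent>> =>
  switch cmd.command {
  | Create({name}) => [
      {
        id: cmd.id,
        meta: {
          msgId: cmd.meta.msgId ++ "-e1", // hypothetical id scheme
          correlationId: cmd.meta.correlationId,
          causationId: cmd.meta.msgId,
        },
        event: Created({name: name}),
      },
    ]
  }

// Dispatch: run the aggregate, persist each event, then publish it.
let dispatch = (cmd: command'<string, customerCommand>) =>
  decide(cmd)->Array.forEach(e => {
    eventLog->Array.push(e)
    subscribers->Array.forEach(notify => notify(e))
  })

dispatch({
  id: "customer-123",
  meta: {msgId: "msg-001", correlationId: "msg-001"},
  command: Create({name: "John Doe"}),
})
```

After the dispatch call the log holds one event and the projection maps "customer-123" to "John Doe". Persisting before publishing mirrors the order of the list above.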
Message Correlation
Message correlation enables tracing related messages through the system:
Key Principles:
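- Original commands set correlationId equal to their msgId and carry no causationId
- Resulting events inherit the original correlationId and set causationId to the msgId of the command that produced them
- New commands triggered by events also inherit the original correlationId; they point at the triggering event through causationId, not correlationId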
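To show how these ids let a consumer rebuild a chain, the sketch below walks causationId pointers from a message back to its root. The link record, chainOf, and the sample ids are hypothetical and only carry the three id fields. The walk assumes the data contains no causation cycles, and it targets ReScript 11 or later with the Core standard library.

```rescript
// Only the three ids matter here, so this local record omits the other meta fields.
type link = {
  msgId: string,
  correlationId: string,
  causationId?: string,
}

// Walk causationId pointers from a message back to its chain root.
// Returns msgIds ordered root-first; stops early if a parent is not in `all`.
let chainOf = (all: array<link>, start: link): array<string> => {
  let byId = Dict.fromArray(all->Array.map(m => (m.msgId, m)))
  let rec walk = (current: link, acc: list<string>): list<string> => {
    let acc = list{current.msgId, ...acc}
    switch current.causationId->Option.flatMap(id => byId->Dict.get(id)) {
    | Some(parent) => walk(parent, acc)
    | None => acc
    }
  }
  walk(start, list{})->List.toArray
}

let root = {msgId: "msg-001", correlationId: "msg-001"}
let created = {msgId: "msg-002", correlationId: "msg-001", causationId: "msg-001"}
let reserve = {msgId: "msg-003", correlationId: "msg-001", causationId: "msg-002"}

let chain = chainOf([root, created, reserve], reserve)
// chain is ["msg-001", "msg-002", "msg-003"]
```

Filtering by correlationId first would narrow the candidate set to one chain before the walk, which is why both ids are useful together.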
Message Transformation
Messages undergo various transformations as they flow through the system:
Transformation Types:
- Serialization: command' to commandJson for transport
- Encoding/Decoding: Type-safe conversion to/from JSON
- Schema Evolution: Handling version changes in message formats
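The serialization step can be sketched as follows. This is an illustration only: the types are local stand-ins, toCommandJson and encodeCustomerCommand are hypothetical, the encoder is written by hand where the framework would take a schema, and the wire shape with a "type" field is an assumption made for the example. It assumes ReScript 11 or later with the Core standard library.

```rescript
// Local stand-ins for the envelopes (meta reduced to a few fields).
type meta = {service: string, msgId: string, correlationId: string}
type command'<'id, 'command> = {id: 'id, meta: meta, command: 'command}
type commandJson = {id: string, meta: meta, commandJson: JSON.t, delay?: int}

type customerCommand = Create({name: string, email: string})

// Hand-written encoder; the "type" field is an assumed wire format.
let encodeCustomerCommand = (command: customerCommand): JSON.t =>
  switch command {
  | Create({name, email}) =>
    JSON.Encode.object(
      Dict.fromArray([
        ("type", JSON.Encode.string("Create")),
        ("name", JSON.Encode.string(name)),
        ("email", JSON.Encode.string(email)),
      ]),
    )
  }

// Typed command' in, transport-ready commandJson out; meta passes through untouched.
let toCommandJson = (
  ~idToString: 'id => string,
  ~encode: 'command => JSON.t,
  ~delay=?,
  cmd: command'<'id, 'command>,
): commandJson => {
  id: idToString(cmd.id),
  meta: cmd.meta,
  commandJson: encode(cmd.command),
  delay: ?delay,
}

let typed: command'<string, customerCommand> = {
  id: "customer-123",
  meta: {service: "CustomerService", msgId: "msg-001", correlationId: "msg-001"},
  command: Create({name: "John Doe", email: "john@example.com"}),
}

let wire = toCommandJson(~idToString=id => id, ~encode=encodeCustomerCommand, ~delay=5000, typed)
```

Keeping meta outside the serialized payload means transports can read routing and tracing fields without knowing the command's schema.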
Practical Examples
Creating and Sending Commands
// Example: Creating a customer creation command
let createCustomerCommand = {
  id: "customer-123",
  meta: Message.generateMeta(~service="CustomerService", ~user="john.doe"),
  command: Customer.Create({
    name: "John Doe",
    email: "john@example.com",
    address: "123 Main St",
  }),
}
// The generateMeta function creates proper metadata
let meta = Message.generateMeta(~service="OrderService", ~user="jane.smith")
// Results in:
// {
//   service: "OrderService",
//   time: "2024-08-13T10:30:00.000Z",
//   user: "jane.smith",
//   msgId: "uuid-generated-id",
//   correlationId: "uuid-generated-id" // same as msgId for original commands
// }
// ip is absent because none was supplied; there is no "" placeholder.
Processing Events
// Example: Event handler processing customer events
let handleCustomerEvent = (event': Message.event'<string, CustomerEvent.t>) => {
  // Log the event for debugging
  Js.log3("Processing event:", event'.meta.msgId, event'.event)
  switch event'.event {
  | CustomerCreated(data) =>
    // Update read model with new customer
    CustomerView.create(event'.id, {
      name: data.name,
      email: data.email,
      createdAt: event'.meta.time,
      createdBy: event'.meta.user,
    })
  | CustomerUpdated(data) =>
    // Handle customer update
    CustomerView.update(event'.id, {
      name: data.name,
      email: data.email,
      updatedAt: event'.meta.time,
      updatedBy: event'.meta.user,
    })
  | CustomerDeleted(_) =>
    // Handle customer deletion
    CustomerView.delete(event'.id)
  }
}
Message Correlation Example
// Original command that starts a workflow
let originalCommand = {
  id: "order-456",
  meta: {
    service: "OrderService",
    time: "2024-08-13T10:30:00.000Z",
    ip: "192.168.1.100",
    user: "customer@example.com",
    msgId: "msg-001",
    correlationId: "msg-001", // Same as msgId for original commands; no causationId at the root
  },
  command: Order.Create({
    customerId: "customer-123",
    items: [{productId: "prod-1", quantity: 2}],
    totalAmount: 99.98,
  }),
}
// Resulting event maintains correlation
let resultingEvent = {
  id: "order-456",
  meta: {
    service: "OrderService",
    time: "2024-08-13T10:30:01.000Z",
    ip: "192.168.1.100",
    user: "customer@example.com",
    msgId: "msg-002",
    correlationId: "msg-001", // Links back to original command (chain root)
    causationId: "msg-001", // Direct parent: the command that produced this event
  },
  event: Order.Created({
    customerId: "customer-123",
    items: [{productId: "prod-1", quantity: 2}],
    totalAmount: 99.98,
    status: "Pending",
  }),
}
// Subsequent command triggered by the event
let followupCommand = {
  id: "inventory-prod-1",
  meta: {
    service: "InventoryService",
    time: "2024-08-13T10:30:02.000Z",
    ip: "192.168.1.100", // Inherited from the triggering event
    user: "customer@example.com", // Inherited from the triggering event
    msgId: "msg-003",
    correlationId: "msg-001", // Still the chain root, not the triggering event
    causationId: "msg-002", // Direct parent: the triggering event
  },
  command: Inventory.Reserve({
    productId: "prod-1",
    quantity: 2,
    orderId: "order-456",
  }),
}
Message Utility Functions
Message Creation
generateMeta()
Creates message metadata with proper defaults and unique identifiers.
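let generateMeta: (
  ~service: string,
  ~ip: string=?,
  ~user: string=?,
  ~causationId: string=?,
  ~traceparent: string=?,
  ~correlationId: string=?,
  ~schemaVersion: string=?,
  ~headers: dict<string>=?,
) => meta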
Usage:
let meta = Message.generateMeta(~service="CustomerService", ~user="john.doe")
// Generates unique msgId and sets correlationId to the same value
uuid()
Generates unique identifiers for messages and entities.
let uuid: unit => string
nowAsISOString()
Creates ISO timestamp strings for message timing.
let nowAsISOString: unit => string
// Returns: "2024-08-13T10:30:00.000Z"
Message Encoding/Decoding
encode() / decode()
Schema-based serialization for type-safe message handling.
let encode: ('a, S.t<'a>) => Js.Json.t
let decode: (Js.Json.t, S.t<'a>) => 'a
encodeEvent'() / decodeEvent'()
Specialized encoding/decoding for event messages.
let encodeEvent': (event'<'id, 'event>, S.t<'id>, S.t<'event>) => Js.Json.t
let decodeEvent': (Js.Json.t, S.t<'id>, S.t<'event>) => event'<'id, 'event>
encodeCommand'() / decodeCommand'()
Specialized encoding/decoding for command messages.
let encodeCommand': (command'<'id, 'command>, S.t<'id>, S.t<'command>) => Js.Json.t
let decodeCommand': (Js.Json.t, S.t<'id>, S.t<'command>) => command'<'id, 'command>
Message Analysis
serviceNameOfMsg()
Extracts the service name from a message JSON.
let serviceNameOfMsg: Js.Json.t => option<string>
Usage:
let serviceName = Message.serviceNameOfMsg(messageJson)
// Returns: Some("CustomerService") or None if parsing fails
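For intuition, an extraction like this can be written as a chain of optional lookups. The sketch below is not the framework's implementation: serviceNameOf is a hypothetical name, and it assumes the message JSON carries a top-level "meta" object with a string "service" field, mirroring the record types above. It targets ReScript 11 or later with the Core standard library.

```rescript
// Hypothetical re-implementation: every step yields None instead of raising
// when the JSON does not have the assumed shape.
let serviceNameOf = (json: JSON.t): option<string> =>
  json
  ->JSON.Decode.object
  ->Option.flatMap(msg => msg->Dict.get("meta"))
  ->Option.flatMap(meta => JSON.Decode.object(meta))
  ->Option.flatMap(meta => meta->Dict.get("service"))
  ->Option.flatMap(service => JSON.Decode.string(service))

let sampleMessage = JSON.Encode.object(
  Dict.fromArray([
    ("id", JSON.Encode.string("customer-123")),
    ("meta", JSON.Encode.object(Dict.fromArray([("service", JSON.Encode.string("CustomerService"))]))),
  ]),
)

let found = serviceNameOf(sampleMessage) // Some("CustomerService")
let notFound = serviceNameOf(JSON.Encode.string("not a message")) // None
```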
eventNameOfEvent'Json()
Gets the event type name from an event message JSON.
let eventNameOfEvent'Json: Js.Json.t => string
variantNameOfJson()
Extracts variant information from JSON representations.
let variantNameOfJson: Js.Json.t => string
Advanced Patterns
Message Splitting and Combining
splitMessage()
Decomposes messages into type and payload components.
let splitMessage: Js.Json.t => (string, Dict.t<Js.Json.t>)
Usage:
let (messageType, payload) = Message.splitMessage(messageJson)
// Returns: ("CustomerCreated", {name: "John", email: "john@example.com"})
combineMessage()
Reconstructs messages from type and data components.
let combineMessage: (string, Dict.t<Js.Json.t>) => Js.Json.t
Use Cases:
- Message transformation and routing
- Protocol adaptation between services
- Message filtering and enrichment
Error Handling Patterns
Exception Types
exception InvalidEvent(Js.Json.t)
exception InvalidCommand(Js.Json.t)
Error Propagation
When message processing fails, errors should:
- Preserve the original message for debugging
- Maintain correlation information
- Generate appropriate error events
- Support retry mechanisms
Dead Letter Queue Patterns
// Example error handling in message processing
let processMessage = (messageJson) => {
  try {
    let command = Message.decodeCommand'(messageJson, idSchema, commandSchema)
    // Process command...
  } catch {
  | Message.InvalidCommand(json) =>
    // Send to dead letter queue with correlation info
    DeadLetterQueue.send(~reason="Invalid command format", ~originalMessage=json)
  | exn =>
    // Log error and potentially retry
    Js.log2("Message processing failed:", exn)
    RetryQueue.schedule(~message=messageJson, ~delay=5000)
  }
}
Message Versioning
Schema Evolution Strategies
- Additive Changes: New optional fields can be added safely
- Field Renaming: Use schema transformations to map old to new names
- Breaking Changes: Require version-specific handlers
Backward Compatibility
// Example: Handling multiple versions of customer events
// V1 and V2 are illustrative modules, one per schema version, so the two
// CustomerCreated constructors can carry different payloads.
let handleCustomerEventV1 = (event: V1.t): V2.t => {
  // Handle old format
  switch event {
  | V1.CustomerCreated({name, email}) =>
    // Convert to new format with default values
    V2.CustomerCreated({name, email, address: None, phone: None})
  }
}
let handleCustomerEventV2 = (event: V2.t): V2.t => {
  // Handle new format directly
  event
}
Migration Patterns
- Use event upcasting to transform old events to new formats
- Maintain multiple schema versions during transition periods
- Implement gradual rollout strategies for schema changes
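Upcasting can also be done on the stored JSON payload before decoding. The sketch below is a minimal example under stated assumptions: upcastCustomerCreated is hypothetical, versions are the strings "1" and "2", a missing schemaVersion is treated as the oldest format, and the new address and phone fields default to null. It targets ReScript 11 or later with the Core standard library.

```rescript
// Hypothetical upcaster for a stored CustomerCreated payload:
// version "1" has name and email; version "2" adds address and phone.
let upcastCustomerCreated = (~schemaVersion: option<string>, payload: Dict.t<JSON.t>) =>
  switch schemaVersion {
  | Some("2") => payload // already in the current format
  | _ =>
    // Copy first so the stored payload is never mutated.
    let upgraded = Dict.copy(payload)
    upgraded->Dict.set("address", JSON.Encode.null)
    upgraded->Dict.set("phone", JSON.Encode.null)
    upgraded
  }

let stored = Dict.fromArray([
  ("name", JSON.Encode.string("John")),
  ("email", JSON.Encode.string("john@example.com")),
])

let current = upcastCustomerCreated(~schemaVersion=None, stored)
```

Because the upcaster returns a copy, replaying the same stored event twice gives the same result, and a single decoder for the newest version can handle both old and new events.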
Reference Section
Type Definitions
From reventless-spec/src/Message.res
type service = string
type meta = {
  service: service,
  time: string,
  ip?: string,
  user?: string,
  msgId: string,
  correlationId: string,
  causationId?: string,
  traceparent?: string,
  schemaVersion?: string,
  headers?: dict<string>,
}
type context = {
  id: string,
  meta: meta,
}
type event'<'id, 'event> = {
  id: 'id,
  meta: meta,
  event: 'event,
}
type command'<'id, 'command> = {
  id: 'id,
  meta: meta,
  command: 'command,
}
type commandJson = {
  id: string,
  meta: meta,
  commandJson: Js.Json.t,
  delay?: int,
}
type statusChange = {
  at: string,
  by: string,
}
From reventless/src/Message.res
module type Service = {
  module Id: Reventless.Id.T
  type id = Id.t
  type command
  type event
  type error
  let name: string
}
type handler<'msg> = 'msg => Js.Promise.t<unit>
type commandHandler<'id, 'command> = command'<'id, 'command> => Js.Promise.t<unit>
type commandsHandler<'id, 'command> = ('id, array<command'<'id, 'command>>) => Js.Promise.t<unit>
type eventsHandler<'id, 'event> = ('id, array<event'<'id, 'event>>) => Js.Promise.t<unit>
type errorHandler<'error, 'command, 'event> = ('error, 'command, context) => array<'event>
exception InvalidEvent(Js.Json.t)
exception InvalidCommand(Js.Json.t)
Function Reference
Message Creation
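- generateMeta(~service, ~ip=?, ~user=?, ~causationId=?, ~traceparent=?, ~correlationId=?, ~schemaVersion=?, ~headers=?) - Generate root message metadata
- deriveMeta(~parent, ~service=?) - Derive child metadata from a triggering message
- uuid() - Generate unique identifier
- nowAsISOString() - Current timestamp as ISO string
- now() - Current timestamp as float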
Encoding/Decoding
- encode(value, schema) - Encode value to JSON using schema
- decode(json, schema) - Decode JSON to value using schema
- encodeEvent'(event', idSchema, eventSchema) - Encode event message
- decodeEvent'(json, idSchema, eventSchema) - Decode event message
- encodeCommand'(command', idSchema, commandSchema) - Encode command message
- decodeCommand'(json, idSchema, commandSchema) - Decode command message
Message Analysis
- serviceNameOfMsg(json) - Extract service name from message
- eventNameOfEvent'Json(json) - Get event name from event JSON
- variantNameOfJson(json) - Extract variant name from JSON
- idOfEvent'Json(json) - Extract ID from event JSON
- idMetaEventOfEvent'Json(json) - Extract ID, meta, and event from JSON
Message Transformation
- splitMessage(json) - Split message into type and payload
- combineMessage(type, data) - Combine type and data into message
- commandJsonOfCommand'(~idToString, ~commandSchema, command') - Convert command' to commandJson
- toMessageBody(commandJson) - Convert commandJson to message body string
Utility Functions
- decomposeMeta(meta) - Decompose meta into key-value pairs
- composeMeta(dict) - Compose meta from dictionary
- composeEventJson'(id, meta, eventJson) - Compose event JSON
- log(value, str) - Log value with message and return value
Common Patterns
Command Creation Pattern
let createCommand = (~id, ~service, ~user, ~commandData) => {
  id,
  meta: Message.generateMeta(~service, ~user),
  command: commandData,
}
Event Processing Pattern
let processEvents = (events: array<event'<'id, 'event>>) => {
  events->Array.forEach(event' => {
    Js.log2("Processing event:", event'.meta.msgId)
    // Handle event based on type
    handleEvent(event')
  })
}
Correlation Tracking Pattern
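// originalMsgId is the msgId of the chain root, so every message in the
// chain ends up with the same correlationId.
let trackCorrelation = (originalMsgId, newMessage) => {
  {
    ...newMessage,
    meta: {
      ...newMessage.meta,
      correlationId: originalMsgId,
    },
  }
}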
Error Handling Pattern
let safeMessageProcessing = (messageJson, processor) => {
  try {
    processor(messageJson)
  } catch {
  | Message.InvalidEvent(json) =>
    Js.log2("Invalid event:", json)
    Error("Invalid event format")
  | Message.InvalidCommand(json) =>
    Js.log2("Invalid command:", json)
    Error("Invalid command format")
  | exn =>
    Js.log2("Processing error:", exn)
    Error("Processing failed")
  }
}
This comprehensive documentation provides both conceptual understanding for newcomers and detailed reference material for experienced developers. The message system forms the backbone of reventless's event-sourcing architecture, enabling reliable, traceable, and scalable distributed systems.