EventCollector → SQS FIFO + DynamoDB
EventCollector → SQS FIFO
The EventCollector adapter provides event consumption capabilities using SQS FIFO queues subscribed to SNS topics, enabling ReadModels and other components to receive and process events with ordering guarantees.
Queue Configuration
The EventCollector creates an SQS FIFO queue that subscribes to one or more EventTopic SNS topics:
let queue = PulumiAws.SQS.Queue.make(
~name,
~args={
PulumiAws.SQS.Queue.fifoQueue: true->Pulumi.Input.make,
deduplicationScope: MessageGroup,
fifoThroughputLimit: PerMessageGroupId,
contentBasedDeduplication: true->Pulumi.Input.make,
visibilityTimeoutSeconds: 30->Pulumi.Input.make,
redrivePolicy: Util_DeadLetterQueue.fifoQueue.arn
->Pulumi.Output.apply(dlqArn =>
PulumiAws.SQS.Queue.RedrivePolicy.make(~deadLetterTargetArn=dlqArn, ~maxReceiveCount=5)
)
->Pulumi.Output.asInput,
sqsManagedSseEnabled: false->Pulumi.Input.make,
tags: AWS.Tags.make(~name, Reventless.EventCollector.componentType),
},
~opts,
)
Configuration details:
fifoQueue: true- Enables FIFO ordering to preserve event order from EventTopicscontentBasedDeduplication: true- Prevents duplicate event processing using message body hashvisibilityTimeoutSeconds: 30- Events become invisible for 30 seconds while being processed- Prevents other consumers from processing the same event concurrently
- Shorter timeout than CommandTopic since event processing is typically faster
deduplicationScope: MessageGroup- Deduplication applied per message group (per aggregate)fifoThroughputLimit: PerMessageGroupId- High throughput mode per message group- Enables up to 3,000 messages per second per aggregate instance
redrivePolicy- Failed events move to dead letter queue after 5 retries- Shares the common FIFO dead letter queue (
Util_DeadLetterQueue.fifoQueue)
- Shares the common FIFO dead letter queue (
Event Ordering
FIFO Guarantees
Events are ordered by message group (aggregate ID):
- Events for the same aggregate are processed in strict order
- Events for different aggregates can be processed in parallel
- No reordering within a message group
// These events are guaranteed to be processed in order
// Event 1: Customer-1 Created
// Event 2: Customer-1 AddressChanged
// Event 3: Customer-1 Deleted
// This event can be processed in parallel
// Event 4: Customer-2 Created
Deduplication
Content-based deduplication prevents duplicate processing:
- 5-minute deduplication window
- SHA-256 hash of event content
- Per message group deduplication
Error Handling
Automatic Retries
Failed events are automatically retried:
- Event processing fails
- Message visibility timeout expires (30 seconds)
- Event becomes visible in queue again
- Lambda is triggered again
- Process repeats up to 5 times (maxReceiveCount)
Dead Letter Queue
After 5 failed attempts, events move to the Dead Letter Queue:
- Shared FIFO DLQ for all EventCollectors
- Failed events can be inspected manually
- Events can be reprocessed after fixing issues
- Prevents blocking of subsequent events
Partial Failure Handling
The EventCollector handles batch processing failures:
// Process batch of events
await handleEvents(events)
// Delete successfully processed SQS messages
// DynamoDB stream position is automatically checkpointed
Performance Considerations
Throughput
- Per aggregate: Up to 3,000 events/second
- System-wide: Limited only by number of distinct aggregates
- Batching: Multiple events processed per Lambda invocation
Latency
- SNS → SQS: ~100-200ms from publish to processing
- DynamoDB Streams: ~50-100ms from write to processing
- Factors: Queue backlog, Lambda cold starts, event complexity
Cost Optimization
- Batch processing: Process multiple events per Lambda invocation
- Visibility timeout: Set appropriately to avoid premature retries
- DLQ monitoring: Clean up failed events to avoid storage costs
- Right-size Lambda: Match memory/timeout to processing needs
Pulumi
The EventCollector component creates these infrastructure resources:
type outputs = {
name: string, // Component name
resources: array<resource>, // SQS queue resources
}
SNS Topic Subscription
The EventCollector automatically subscribes to EventTopic SNS topics during deployment:
let eventTopicResources =
eventTopics
->Js.Dict.values
->Array.map(outputs => outputs.resources->Array.getUnsafe(0))
Subscription flow:
- EventTopic publishes - Events are published to SNS FIFO topic by EventLog or Aggregate
- SNS delivers - SNS pushes events to all subscribed SQS FIFO queues
- SQS queues - EventCollector's SQS queue receives events with ordering preserved
- Lambda polls - Lambda continuously polls the queue and invokes handlers
Key benefits of SNS → SQS pattern:
- Decoupling - EventTopics don't need to know about EventCollectors
- Buffering - SQS queues buffer events during processing delays or Lambda cold starts
- Parallel consumers - Multiple EventCollectors can subscribe to the same EventTopic
- Retry isolation - Failed events in one EventCollector don't affect others
- FIFO preservation - Ordering is maintained end-to-end from EventTopic through SQS to Lambda
Multi-Source Support
The EventCollector can consume events from two sources:
SNS → SQS Pattern
Events flow from EventTopic (SNS) to EventCollector (SQS FIFO):
Benefits:
- Decoupling - EventTopics don't need to know about EventCollectors
- Buffering - SQS queues buffer events during processing delays
- Parallel consumers - Multiple EventCollectors can subscribe to same EventTopic
- Retry isolation - Failed events in one EventCollector don't affect others
- FIFO preservation - Ordering maintained end-to-end
DynamoDB Streams Pattern
Events flow directly from EventLog DynamoDB table via streams:
Benefits:
- Lower latency - Direct stream from source, no intermediate topic
- Simpler architecture - Fewer components in the path
- Automatic checkpointing - Lambda manages stream position
- Change data capture - Access to both old and new item images
Handle Channel Event
let handleDynamoDbOrSqsEvent = (queue, handleEvents) => async (
event: PulumiAws.Lambda.CallbackFunction.event,
_,
) => {
let records = event.records
// Parse events from either SQS or DynamoDB Stream records
let jsons = records->Array.filterMap(record =>
switch record.eventSource {
| "aws:sqs" =>
record->PulumiAws.SQS.Queue.asRecord->Util.SQS_Runtime.parseSqsRecord
| "aws:dynamodb" =>
switch record
->PulumiAws.DynamoDb.Stream.asRecord
->Util.DynamoDbStream_Runtime.parseDynamoDbStreamRecordEvent {
| NewImage(_, newImage)
| NewAndOldImage(_, newImage, _) =>
Some(newImage)
| _ =>
Js.log(__MODULE__ ++ ".handleChannelEvent: no NewImage included in Stream event !")
None
}
| eventSource =>
Js.log2(__MODULE__ ++ ".handleChannelEvent: ignoring record from eventSource:", eventSource)
None
}
)
// Process events
switch await handleEvents(jsons) {
| _ =>
// Delete successfully processed SQS messages
let entries =
records
->Array.filter(record =>
switch record.eventSource {
| "aws:sqs" => true
| _ => false
}
)
->Array.mapWithIndex((record, idx): AwsSdk.SQS.DeleteMessageBatchCommand.deleteMessageBatchEntry => {
id: idx->string_of_int,
receiptHandle: (record->PulumiAws.SQS.Queue.asRecord).receiptHandle,
})
switch entries {
| [] => ()
| entries => await Util.SQS_Runtime.deleteMessages(entries, queue)
}
}
}
Processing flow:
- Detect event source - Inspect
record.eventSourceto determine if the event came from SQS or DynamoDB - Parse events - Use appropriate parser for each source:
- SQS events: Extract JSON message body from
record.body - DynamoDB events: Extract
NewImagefrom stream record (new or updated item)
- SQS events: Extract JSON message body from
- Handle events - Pass all parsed events to the
handleEventscallback function - Acknowledge SQS messages - Delete successfully processed SQS messages from the queue
- DynamoDB stream handling - No acknowledgment needed; stream position is managed by Lambda
Key features:
- Multi-source support - Single handler processes events from both SQS and DynamoDB Streams
- Source-specific parsing - Different parsing logic for each event source
- Selective acknowledgment - Only deletes SQS messages (DynamoDB stream position is automatic)
- Error resilience - Parse errors are logged but don't crash the handler
- Batch processing - All events in the Lambda invocation are processed together
DynamoDB Stream Event Types
When consuming from DynamoDB Streams, the EventCollector handles different event types:
NewImage- New item inserted into EventLog tableNewAndOldImage- Item updated in EventLog table (both old and new versions)OldImage- Item removed from EventLog table (ignored by EventCollector)KeysOnly- Only key attributes included (ignored by EventCollector)
The EventCollector only processes NewImage and NewAndOldImage events, extracting the new item data for event handling.
Enqueue Event Operation
The EventCollector also provides an enqueueEvent function for programmatically adding events to its queue:
let enqueueEvent = queue
->Util_SQS.toRuntimeQueueOutput
->Pulumi.Output.apply(runtimeQueue =>
EventCollectorChannel_SQS_Runtime.enqueueEvent(runtimeQueue, ...)
)
Runtime implementation:
let enqueueEvent = (queue: Util_SQS_Runtime.runtimeQueue, delay, _id, messageBody) => {
let queueName = queue.name
Js.log4(__MODULE__ ++ ".enqueueEvent:", delay, messageBody, queueName)
queue->Util_SQS_Runtime.sendMessage(~delay, messageBody)
}
Key features:
- Programmatic enqueueing - Allows components to directly enqueue events without publishing to SNS
- Delay support - Events can be delayed before becoming visible in the queue
- Direct queue access - Bypasses SNS topic, useful for testing or internal event routing
When to Use DynamoDB Streams vs SQS Subscription
Use SQS subscription (SNS → SQS) when:
- Consuming events from EventTopics that may have multiple publishers
- Need buffering and retry isolation from other EventCollectors
- Want to decouple EventCollectors from event sources
- Prefer SNS fan-out pattern for multiple subscribers
- Need to subscribe/unsubscribe dynamically
Use DynamoDB Stream subscription when:
- Consuming events directly from EventLog table
- Need lowest possible latency (no SNS hop)
- Want guaranteed delivery of all events (stream is source of truth)
- Processing events in near real-time as they're written to EventLog
- Building projections that must stay synchronized with EventLog
Deploy-time to Runtime Flow
The EventCollector adapter follows the standard deploy-time/runtime pattern:
let make = (~name, ~eventTopics, ~opts) => {
// 1. Deploy-time: Create SQS FIFO queue
let queue = PulumiAws.SQS.Queue.make(~name, ~args={...}, ~opts)
// 2. Subscribe to EventTopics (SNS subscriptions created automatically)
let eventTopicResources =
eventTopics
->Js.Dict.values
->Array.map(outputs => outputs.resources->Array.getUnsafe(0))
{
parts: {queue: queue},
resources: eventTopicResources->Array.concat([queue->Util_SQS.toResource]),
// 3. Runtime: Enqueue events programmatically
enqueueEvent: queue
->Util_SQS.toRuntimeQueueOutput
->Pulumi.Output.apply(runtimeQueue =>
EventCollectorChannel_SQS_Runtime.enqueueEvent(runtimeQueue, ...)
),
// 4. Runtime: Handle incoming events from SQS or DynamoDB
handleChannelEvent: handleEvents =>
queue
->Util_SQS.toRuntimeQueueOutput
->Pulumi.Output.apply(runtimeQueue =>
runtimeQueue->(EventCollectorChannel_SQS_Runtime.handleDynamoDbOrSqsEvent(handleEvents, ...))
),
}
}
Flow steps:
- Create SQS queue - Pulumi provisions the SQS FIFO queue resource
- Subscribe to topics - SNS subscriptions are created for each EventTopic
- Extract metadata -
toRuntimeQueueOutputconverts the queue to runtime metadata (URL, name, ARN) - Bind runtime functions -
Pulumi.Output.applybinds handler functions to queue metadata - Lambda execution - Runtime functions execute in Lambda, polling SQS and processing events