SideEffectHandler
For a short summary of SideEffectHandler, see Reventless Components Overview.
This component follows the Reventless Component Structure Pattern, using separate files for interface definitions (SideEffectHandler.res), builder logic (SideEffectHandler_Builder.res), and runtime callbacks (SideEffectHandler_Callback.res).
Overview
The SideEffectHandler enables event-driven integration with external systems by executing side effects in response to domain events. Unlike EventMapper (which generates commands), SideEffectHandler performs actions that don't affect aggregate state—such as sending emails, calling external APIs, or triggering webhooks.
Purpose and Responsibilities
- Responsibility: Listen to events from source aggregates; execute side effects (external API calls, notifications, etc.); provide scheduling capabilities for delayed/recurring side effects; integrate with external systems without affecting domain state
- In: Events from source EventTopics (via EventCollector)
- Out: External API calls, notifications, webhooks, scheduled tasks
SideEffect Spec
Each side effect is defined by implementing the SideEffect.T module type:
module type Source = {
let name: string
@schema
type event
}
module type T = {
module Source: Source
let execute: (
Source.Id.t, // Source aggregate ID
Message.meta, // Event metadata
Source.event, // The event
QueryEngine.operations, // Query operations
) => promise<unit>
}
Usage Pattern
Defining a SideEffect
@@reventless.spec
module Source = Order
let execute = async (orderId, _meta, event, _queryEngine) =>
switch event {
| Order.Placed({customerId}) =>
await EmailService.sendOrderConfirmation(
~email=customerId,
~orderId=orderId->Order.Id.toString,
)
| _ => ()
}
The Source module defines which aggregate's events to subscribe to — typically module Source = Order directly reuses the aggregate module. @@reventless.spec handles wiring; execute receives the decoded event and runs the side effect.
Registering SideEffects
// Collect all side effects for the application
let sideEffects: SideEffectHandler.sideEffects = [
module(Customer_EmailNotification),
module(Order_WebhookNotification),
module(Invoice_PDFGenerator),
]
Creating the SideEffectHandler
let sideEffectHandler = SideEffectHandler.make(
~name="NotificationHandler",
~sideEffects=SideEffects.sideEffects,
~allEventTopics,
~allCommandTopics,
~queryEngine,
~scheduler,
~opts=pulumiOptions,
)
Runtime Behavior
Event Processing Sequence
Event Matching
The SideEffectHandler matches events to side effects based on the source aggregate name:
// For each incoming event:
// 1. Extract event metadata
let eventMeta = event.meta
// 2. Find matching side effect by source name
let sideEffect = sideEffects->Array.find(
(module(SideEffect)) => SideEffect.Source.name == eventMeta.service
)
// 3. Execute if found
switch sideEffect {
| Some(sideEffect) =>
await SideEffect.execute(id, meta, event, queryEngine)
| None => () // No matching side effect
}
Integration Points
With EventCollector
The SideEffectHandler uses an EventCollector to subscribe to source EventTopics:
With Scheduler
The SideEffectHandler provides scheduling operations for delayed or recurring side effects:
type operations = {
enqueueEvent: EventCollector.enqueueEvent,
createSchedule: Schedule.create,
deleteSchedule: Schedule.delete,
}
Schedule Types
type rate =
| Single(year, month, day, hour, minute) // One-time execution
| Minutes(int) // Every N minutes
| Hours(int) // Every N hours
| Days(int) // Every N days
| Daily(hour, minute) // Daily at specific time
| Weekdays(hour, minute) // Weekdays at specific time
| WeekdaysAndSaturday(hour, minute) // Weekdays + Saturday
Creating Schedules
// Schedule a daily report
await sideEffectHandler.operations.createSchedule({
name: "daily-report",
rate: Daily(9, 0), // 9:00 AM daily
payload: "generate-daily-report",
})
// Schedule a one-time reminder
await sideEffectHandler.operations.createSchedule({
name: "reminder-123",
rate: Single(2026, 1, 30, 14, 0), // Jan 30, 2026 at 2:00 PM
payload: "send-reminder-123",
})
// Delete a schedule
await sideEffectHandler.operations.deleteSchedule("reminder-123")
Common Patterns
Email Notifications
@@reventless.spec
module Source = Order
let execute = async (orderId, _meta, event, _queryEngine) =>
switch event {
| Order.Placed({customerId}) =>
await EmailService.sendOrderConfirmation(
~email=customerId,
~orderId=orderId->Order.Id.toString,
)
| _ => ()
}
Webhook Integration
module Source = Order
let execute = async (orderId, meta, event, _queryEngine) =>
switch event {
| Order.Created(orderData) =>
await Webhook.post(
~url="https://partner.example.com/orders",
~payload={
"event": "order.created",
"orderId": orderId->Order.Id.toString,
"data": orderData,
"timestamp": meta.time,
},
)
| Order.Completed(completionData) =>
await Webhook.post(
~url="https://partner.example.com/orders",
~payload={
"event": "order.completed",
"orderId": orderId->Order.Id.toString,
"data": completionData,
"timestamp": meta.time,
},
)
| _ => ()
}
External API Integration
module Source = Payment
let execute = async (paymentId, meta, event, queryEngine) =>
switch event {
| Payment.Authorized({amount, customerId}) => {
let customer = await queryEngine.get(
~table="CustomerReadModel",
~id=customerId,
)
switch customer {
| Some({stripeCustomerId}) =>
let _ = await Stripe.charges.create({
amount: amount,
currency: "usd",
customer: stripeCustomerId,
description: `Payment ${paymentId->Payment.Id.toString}`,
})
| None =>
Js.log(`No Stripe customer for ${customerId}`)
}
}
| Payment.Refunded({amount, reason}) => {
let payment = await queryEngine.get(
~table="PaymentReadModel",
~id=paymentId->Payment.Id.toString,
)
switch payment {
| Some({stripeChargeId}) =>
let _ = await Stripe.refunds.create({
charge: stripeChargeId,
amount: amount,
reason: reason,
})
| None => ()
}
}
| _ => ()
}
PDF Generation
module Source = Invoice
let execute = async (invoiceId, meta, event, queryEngine) =>
switch event {
| Invoice.Generated({items, total, customerId}) => {
let customer = await queryEngine.get(
~table="CustomerReadModel",
~id=customerId,
)
switch customer {
| Some({name, address, email}) => {
// Generate PDF
let pdf = await PDFService.generate({
template: "invoice",
data: {
invoiceId: invoiceId->Invoice.Id.toString,
customerName: name,
customerAddress: address,
items: items,
total: total,
date: meta.time,
},
})
// Store PDF
await S3.putObject({
bucket: "invoices",
key: `${invoiceId->Invoice.Id.toString}.pdf`,
body: pdf,
})
// Send email with PDF
await EmailService.sendWithAttachment({
to: email,
subject: `Invoice #${invoiceId->Invoice.Id.toString}`,
body: "Please find your invoice attached.",
attachment: pdf,
})
}
| None => ()
}
}
| _ => ()
}
Scheduled Side Effects
module Source = Appointment
let execute = async (appointmentId, meta, event, queryEngine) =>
switch event {
| Appointment.Scheduled({date, customerId}) => {
// Schedule reminder 24 hours before
let reminderDate = date->Date.addDays(-1)
await scheduler.createSchedule({
name: `reminder-${appointmentId->Appointment.Id.toString}`,
rate: Single(
reminderDate->Date.getFullYear,
reminderDate->Date.getMonth,
reminderDate->Date.getDate,
9, 0 // 9:00 AM
),
payload: Js.Json.stringify({
"type": "appointment-reminder",
"appointmentId": appointmentId->Appointment.Id.toString,
"customerId": customerId,
}),
})
}
| Appointment.Cancelled => {
// Delete the scheduled reminder
await scheduler.deleteSchedule(
`reminder-${appointmentId->Appointment.Id.toString}`
)
}
| _ => ()
}
Error Handling
The SideEffectHandler includes comprehensive error handling:
Execution Errors:
- Side effect execution failures → logged, event processing continues
- External API errors → caught and logged
- Decoding errors → logged with context
Recovery:
- Failed events remain in EventCollector queue for retry
- Side effects should be idempotent when possible
- Use event metadata (msgId) for deduplication
// Error handling in execute function
let execute = async (id, meta, event, queryEngine) => {
try {
await performSideEffect(id, event)
} catch {
| err =>
// Log error but don't throw - allows other events to process
Js.log2("SideEffect error:", err)
// Optionally: send to error tracking service
await ErrorTracking.capture(err, {
sideEffect: "CustomerEmail",
eventId: meta.msgId,
aggregateId: id->Customer.Id.toString,
})
}
}
Pulumi
The SideEffectHandler component creates these infrastructure resources:
type outputs = {
name: string,
eventCollector: EventCollector.outputs,
}
type operations = {
enqueueEvent: EventCollector.enqueueEvent,
createSchedule: Schedule.create,
deleteSchedule: Schedule.delete,
}
Resource Naming:
- Component type:
reventless:SideEffectHandler - Resource name pattern:
{name}SideEffectHandler
Configuration:
memorySize- Lambda memory allocation (default: 2048 MB)timeout- Lambda timeout (default: 180 seconds)targets- Optional list of CommandTopic targets for permissions
Best Practices
Make Side Effects Idempotent
// ✅ Good: Check before sending
let execute = async (orderId, meta, event, queryEngine) =>
switch event {
| Order.Shipped({trackingNumber}) => {
// Check if notification already sent
let notification = await queryEngine.get(
~table="NotificationLog",
~id=meta.msgId,
)
switch notification {
| Some(_) =>
Js.log("Notification already sent, skipping")
| None => {
await sendNotification(...)
// Record that notification was sent
await queryEngine.save(
~table="NotificationLog",
~id=meta.msgId,
~data={sentAt: Date.now()},
)
}
}
}
| _ => ()
}
Handle External Service Failures Gracefully
// ✅ Good: Graceful degradation
let execute = async (id, meta, event, queryEngine) =>
switch event {
| Order.Created(data) => {
// Try primary service
try {
await PrimaryEmailService.send(...)
} catch {
| _ => {
// Fall back to secondary
try {
await SecondaryEmailService.send(...)
} catch {
| err =>
// Log but don't fail the entire handler
Js.log2("All email services failed:", err)
}
}
}
}
| _ => ()
}
Use QueryEngine for Context
// ✅ Good: Enrich side effect with read model data
let execute = async (orderId, meta, event, queryEngine) =>
switch event {
| Order.Shipped(_) => {
// Get full order details from read model
let order = await queryEngine.get(
~table="OrderReadModel",
~id=orderId->Order.Id.toString,
)
// Get customer details
let customer = await queryEngine.get(
~table="CustomerReadModel",
~id=order.customerId,
)
// Now have all context for rich notification
await sendShippingNotification(order, customer)
}
| _ => ()
}
Keep Side Effects Focused
// ✅ Good: One responsibility per side effect
module Order_EmailNotification = {
// Only handles email notifications
}
module Order_WebhookNotification = {
// Only handles webhooks
}
module Order_AnalyticsTracking = {
// Only handles analytics
}
// Register all separately
let sideEffects = [
module(Order_EmailNotification),
module(Order_WebhookNotification),
module(Order_AnalyticsTracking),
]
Related Components
- EventCollector - Subscribes to source EventTopics
- EventTopic - Source of events for side effects
- EventMapper - Alternative for generating commands (not side effects)
- Scheduler - Provides scheduling capabilities
- QueryDb - Read model access via QueryEngine
AWS Implementation
For detailed AWS implementation, see SideEffectHandler AWS adapter documentation (TBD).