Asynchronous Handlers in Brighter


May 2, 2025 - 09:29

Introduction

In distributed systems, I/O operations like HTTP requests, database queries, and message streaming are foundational to workflows. While .NET’s async/await pattern efficiently manages these operations, integrating asynchronous handlers with Brighter requires explicit configuration. This article explores how to enable async request handlers in Brighter and explains the architectural rationale behind its design, rooted in the Reactor and Proactor patterns.

The Problem: Async Handlers Without Configuration

With Brighter's SqsSubscription or KafkaSubscription, registering a RequestHandlerAsync without the right configuration fails when the pipeline is built:

Paramore.Brighter.ConfigurationException: Error when building pipeline, see inner Exception for details
 ---> System.InvalidCastException: Unable to cast object of type 'GreetingHandler' to type 'Paramore.Brighter.IHandleRequests'.

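This occurs because Brighter defaults to the Reactor pattern (synchronous I/O) and cannot infer whether your handler requires the Proactor pattern (asynchronous I/O). The synchronous pipeline tries to cast the handler to IHandleRequests, but a RequestHandlerAsync implements IHandleRequestsAsync instead, so the cast fails.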
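The failure mode can be reproduced in isolation. The sketch below is only an illustration: ISyncHandler, IAsyncHandler, AsyncGreetingHandler, and PipelineSketch are hypothetical stand-ins, not Brighter types. It shows why a pipeline that assumes one handler contract cannot accept a handler that implements only the other.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-ins for the two handler contracts; these are not Brighter's real types.
public interface ISyncHandler { void Handle(string message); }
public interface IAsyncHandler { Task HandleAsync(string message); }

// A handler that implements only the asynchronous contract.
public sealed class AsyncGreetingHandler : IAsyncHandler
{
    public Task HandleAsync(string message) => Task.CompletedTask;
}

public static class PipelineSketch
{
    // A synchronous pipeline builder that assumes every handler is an ISyncHandler.
    public static bool TryBuildSyncPipeline(object handler, out string error)
    {
        try
        {
            _ = (ISyncHandler)handler; // throws for handlers that are async-only
            error = string.Empty;
            return true;
        }
        catch (InvalidCastException ex)
        {
            error = ex.GetType().Name;
            return false;
        }
    }
}
```

Calling PipelineSketch.TryBuildSyncPipeline(new AsyncGreetingHandler(), out var error) returns false, because the two interfaces are unrelated and the runtime cannot convert one into the other.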

Solution: Enable Async Processing

To resolve this, explicitly set isAsync: true in your subscription configuration:

.AddServiceActivator(opt =>
{
    opt.Subscriptions = [
        new SqsSubscription<Greeting>(
            new SubscriptionName("greeting-subscription"), // Optional
            new ChannelName("greeting-queue"), // SQS queue name
            new RoutingKey("greeting.topic".ToValidSNSTopicName()), // SNS Topic Name
            bufferSize: 2,
            isAsync: true) // Enable async processing
    ];
    opt.ChannelFactory = new ChannelFactory(connection);
})

This allows you to use RequestHandlerAsync for non-blocking I/O:

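using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Paramore.Brighter;

public class GreetingHandler(ILogger<GreetingHandler> logger) : RequestHandlerAsync<Greeting>
{
    // async is required because the body awaits; the token parameter matches the base signature.
    public override async Task<Greeting> HandleAsync(Greeting command, CancellationToken cancellationToken = default)
    {
        logger.LogInformation("Processing {Name}", command.Name);
        await Task.Delay(1_000, cancellationToken);  // Simulate async I/O (e.g., HTTP call)
        logger.LogInformation("Hello {Name}", command.Name);
        return await base.HandleAsync(command, cancellationToken); // Pass to the next handler in the pipeline
    }
}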

The Message Pump: Core of Brighter's Processing

Brighter uses a single-threaded message pump to ensure message ordering and avoid race conditions. The pump follows three steps:

  1. GetMessage: Read from the queue.
  2. Translate Message: Deserialize into a .NET type.
  3. Dispatch Message: Route to the appropriate handler.
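The three steps can be sketched as a plain loop. This is a minimal illustration, not Brighter's pump: MessagePumpSketch is a hypothetical class, an in-memory queue of JSON strings stands in for the broker, and a delegate stands in for the handler pipeline. The Greeting record here is also defined only for the sketch.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

public sealed record Greeting(string Name);

public sealed class MessagePumpSketch
{
    private readonly Queue<string> _queue;       // stands in for the broker channel
    private readonly Action<Greeting> _handler;  // stands in for the handler pipeline

    public MessagePumpSketch(IEnumerable<string> rawMessages, Action<Greeting> handler)
    {
        _queue = new Queue<string>(rawMessages);
        _handler = handler;
    }

    // Runs on the caller's thread until the queue is empty.
    public void Run()
    {
        while (_queue.TryDequeue(out var raw))                         // 1. GetMessage
        {
            var greeting = JsonSerializer.Deserialize<Greeting>(raw);  // 2. Translate Message
            if (greeting is null) continue;                            // skip a literal "null" payload
            _handler(greeting);                                        // 3. Dispatch Message
        }
    }
}
```

Because a single loop reads, translates, and dispatches, the handler sees messages in exactly the order they were queued.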

Why Single-Threaded Pumps?

Alternative approaches, like BlockingCollection or thread-per-message, introduce critical flaws:

Approach              Issues
Multithreaded Pump    Risks reordering messages, violating FIFO guarantees.
Thread Pool           Exhausts threads under load, causing bottlenecks with semaphores.

Brighter's solution is a single-threaded message pump, ensuring in-order processing while avoiding thread contention.
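The reordering risk is easy to see in a small experiment. The sketch below is an illustration only: OrderingSketch and its methods are hypothetical, it uses one task per message rather than one thread per message, and Task.Delay stands in for handler work of varying length.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class OrderingSketch
{
    // Each message carries its own simulated processing time in milliseconds.
    public static async Task<List<int>> ProcessConcurrentlyAsync(
        IReadOnlyList<(int Id, int WorkMs)> messages)
    {
        var completed = new ConcurrentQueue<int>();
        // One task per message: completion order depends on WorkMs, not on arrival order.
        var tasks = messages.Select(async m =>
        {
            await Task.Delay(m.WorkMs);
            completed.Enqueue(m.Id);
        });
        await Task.WhenAll(tasks);
        return completed.ToList();
    }

    public static async Task<List<int>> ProcessSequentiallyAsync(
        IReadOnlyList<(int Id, int WorkMs)> messages)
    {
        var completed = new List<int>();
        // One message at a time: the next starts only after the previous one finishes.
        foreach (var m in messages)
        {
            await Task.Delay(m.WorkMs);
            completed.Add(m.Id);
        }
        return completed;
    }
}
```

With a slow first message and a fast second one, the concurrent version will typically report the second message as finished first, while the sequential version always completes them in arrival order.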

Reactor vs. Proactor Patterns

Brighter's message pump can operate in two modes, determined by the isAsync flag:

Reactor Pattern (isAsync: false)

  • Synchronous I/O: Processes messages sequentially on a single thread.
  • Predictable Performance: Avoids thread pool overhead and context switching.
  • Limitation: Blocks on I/O, reducing throughput for long-running operations.

Proactor Pattern (isAsync: true)

  • Asynchronous I/O: Uses async/await to avoid blocking, enabling higher throughput.
  • Thread Pool Integration: Leverages .NET’s SynchronizationContext so that continuations after an await resume on the pump's own thread, preserving message order while the I/O itself completes without blocking.
  • Trade-off: Slightly higher overhead due to async state management.
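To make the role of SynchronizationContext concrete, here is a minimal sketch of a context that queues continuations and runs them on one thread. It is an assumption-laden illustration, not Brighter's implementation: SingleThreadContextSketch and ProactorDemo are hypothetical names, and the sketch does not handle work posted after the handler has already completed.

```csharp
#nullable enable
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Queues continuations so that they run on the thread that calls RunUntilComplete.
public sealed class SingleThreadContextSketch : SynchronizationContext
{
    // Many threads may post, but only the pump thread drains this queue.
    private readonly BlockingCollection<(SendOrPostCallback Callback, object? State)> _work = new();

    public override void Post(SendOrPostCallback d, object? state) => _work.Add((d, state));

    // Runs one async handler to completion, executing its continuations on the calling thread.
    public static void RunUntilComplete(Func<Task> handler)
    {
        var previous = Current;
        var context = new SingleThreadContextSketch();
        SetSynchronizationContext(context);
        try
        {
            var task = handler();
            // When the handler finishes, stop accepting work so the loop below ends.
            task.ContinueWith(_ => context._work.CompleteAdding(), TaskScheduler.Default);
            foreach (var (callback, state) in context._work.GetConsumingEnumerable())
                callback(state);
            task.GetAwaiter().GetResult(); // surface any exception thrown by the handler
        }
        finally
        {
            SetSynchronizationContext(previous);
        }
    }
}

public static class ProactorDemo
{
    // Records the thread id before and after an await inside the handler.
    public static List<int> Run()
    {
        var threadIds = new List<int>();
        SingleThreadContextSketch.RunUntilComplete(async () =>
        {
            threadIds.Add(Environment.CurrentManagedThreadId);
            await Task.Delay(10); // simulated async I/O
            threadIds.Add(Environment.CurrentManagedThreadId);
        });
        return threadIds;
    }
}
```

ProactorDemo.Run() returns two identical thread ids: the await releases the thread while the delay is pending, and the continuation is then posted back to the same thread instead of a thread pool worker.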

Why Brighter Requires isAsync

Brighter cannot auto-detect whether your handler uses synchronous or asynchronous I/O, and the choice has consequences that justify making it explicit:

  1. Resource Allocation: The Reactor/Proactor choice impacts thread management and memory usage.
  2. Deadlock Prevention: Async handlers require a dedicated pipeline to avoid thread pool starvation.
  3. Performance Guarantees: Explicit configuration ensures optimal throughput and latency for your workload.

Conclusion

Brighter's isAsync flag is a deliberate design choice that balances performance and scalability:

  • Avoid Runtime Errors: Explicitly declare async handlers to prevent the ConfigurationException (with its inner InvalidCastException) raised when the pipeline is built.
  • Preserve Order: Single-threaded pumps ensure messages are processed sequentially, even with async I/O.

By aligning with established patterns, Brighter delivers thread-safe, efficient messaging for distributed systems.
