Asynchronous processing is a common pattern in modern applications. It allows you to offload long-running tasks and events to a background job, so the user does not need to wait. In .NET applications, a background service is available to handle these tasks. This technique can be used on the AWS cloud platform to build a scalable and reliable background processor.
In this take, you will learn how to build a background processor using AWS Fargate and Amazon Simple Queue Service (SQS). Fargate is a serverless compute engine for containers that allows you to run Docker containers in the cloud. You will use the AWS Cloud Development Kit (CDK) to define the infrastructure as code. You will build a Docker image for the background processor and deploy it to AWS Fargate via the CDK stack. You will also test the processor by sending messages to an SQS queue.
The sample code for this article is available on GitHub at net-aws-asyncprocessor. I highly recommend you clone the repository and follow along, because it will help you understand the concepts better. The repository contains the source code for the background processor and the CDK stack that deploys it to AWS. Because there are many steps involved, it is easier to follow along by running the code than by building everything from scratch.
I will assume you have some experience with .NET and AWS. You should have the .NET SDK installed on your machine and an AWS account. You should also have the AWS CLI and CDK tools installed on your machine. If you don’t have these tools installed, you can follow the instructions in the AWS CDK documentation.
Tools and Technologies
The following tools and technologies will be used in this article:

- The .NET 8 SDK
- The AWS CDK CLI
- The AWS CLI
- Docker
To validate that these are installed, you can run the following commands:
```bash
dotnet --version
cdk --version
aws --version
docker --version
```
If any of these commands fail, you need to install that tool. Installation instructions are available on each tool's website and are beyond the scope of this article.
Processor Docker Image
The background processor is a .NET console application that listens to an SQS queue for messages. When a message is received, it processes the message and deletes it from the queue.
The `BackgroundService` base class makes it seamless to create an asynchronous background task. The processor can run continuously, handle any crashes, and restart gracefully. You inherit from the `BackgroundService` class and override the `ExecuteAsync` method to implement the processing logic. A cancellation token is passed to the method to handle graceful shutdowns.
```csharp
public class WorkerProcessor(
    ILogger<WorkerProcessor> _logger,
    IAmazonSQS _sqsClient,
    IWorkerHandler _handler,
    IOptions<QueueConfiguration> config) : BackgroundService
{
    private readonly string _sqsUrl = config.Value.SqsUrl;

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("WorkerProcessor is starting.");

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                var receiveMessageRequest = new ReceiveMessageRequest
                {
                    QueueUrl = _sqsUrl,
                    // only ten messages at a time
                    MaxNumberOfMessages = 10,
                    // poll for messages every 20 seconds
                    WaitTimeSeconds = 20
                };

                var response = await _sqsClient.ReceiveMessageAsync(
                    receiveMessageRequest,
                    stoppingToken);

                foreach (var message in response.Messages)
                {
                    // handle the message one at a time and await
                    await _handler.ProcessMessage(message, stoppingToken);
                }
            }
            catch (TaskCanceledException)
            {
                _logger.LogInformation("WorkerProcessor is stopping.");
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, ex.Message);
            }
        }
    }
}
```
The `WorkerProcessor` class listens to an SQS queue, receiving at most ten messages per request, and uses long polling so each receive call waits up to 20 seconds for messages to arrive. When a message is received, it is passed to the `IWorkerHandler` interface for processing. If an error occurs, the message is not deleted, so it returns to the queue for another attempt. When cancellation is requested, the token causes a `TaskCanceledException`, which is caught to stop the processor gracefully.
The `IWorkerHandler` simply processes a single message and deletes it from the queue when it is successfully processed. SQS automatically retries messages that are not deleted from the queue via the `VisibilityTimeout` setting. When a message is received, it is hidden from other consumers for a specified time because it is in flight. If the message is not deleted within that timeout, it becomes visible again for other consumers to process. After a certain number of retries, the message is moved to a dead-letter queue for further investigation.
This is what the general logic looks like as a flowchart.
The code to process a message is as follows:
```csharp
public interface IWorkerHandler
{
    Task ProcessMessage(Message message, CancellationToken cancellationToken);
}

public class WorkerHandler(
    ILogger<WorkerHandler> _logger,
    IAmazonSQS _sqsClient,
    IOptions<QueueConfiguration> config) : IWorkerHandler
{
    private readonly string _sqsUrl = config.Value.SqsUrl;

    public async Task ProcessMessage(Message message, CancellationToken cancellationToken)
    {
        try
        {
            _logger.LogInformation("Processing message {MessageId}", message.MessageId);

            // Simulate processing time
            await Task.Delay(2000, cancellationToken);

            _logger.LogInformation("Body: {Body}", message.Body);
            _logger.LogInformation("Message {MessageId} processed", message.MessageId);

            await _sqsClient.DeleteMessageAsync(_sqsUrl, message.ReceiptHandle, cancellationToken);
        }
        catch (TaskCanceledException)
        {
            throw;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, ex.Message);
        }
    }
}
```
Because SQS handles retries and dead-letter queues automatically, you don’t have to worry about message processing failures. Any exceptions thrown in the message processing logic are caught and logged, and the message is left in the queue for retry. If the message is processed successfully, it is deleted from the queue.
The `TaskCanceledException` from the cancellation token is rethrown so the processor can stop gracefully. The processor stops when the cancellation token is signaled, which happens when the application is stopped or when the processor is scaled down.
The CDK sets an environment variable with the SQS queue name for the processor to use inside the Docker container. To configure the processor, you simply read the `QUEUE_NAME` environment variable; combined with the account and region, this gives you the SQS URL.
```csharp
public static class DependencyResolution
{
    public static IServiceCollection AddAsyncProcessor(this IServiceCollection services)
    {
        // Bind queue settings from constants and the QUEUE_NAME environment variable
        services.Configure<QueueConfiguration>(queue =>
        {
            queue.Region = ApplicationConstants.Region;
            queue.Account = ApplicationConstants.Account;
            queue.Name = Environment.GetEnvironmentVariable("QUEUE_NAME")
                ?? throw new ArgumentNullException(nameof(queue.Name));
        });

        // Register the background worker, the message handler, and the SQS client
        services.AddHostedService<WorkerProcessor>();
        services.AddSingleton<IWorkerHandler, WorkerHandler>();
        services.AddSingleton<IAmazonSQS, AmazonSQSClient>();

        return services;
    }
}
```
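The `QueueConfiguration` class itself is not shown above. A minimal sketch that is consistent with how it is used here (Region, Account, and Name set during registration, and an `SqsUrl` read by the worker) might look like the following; the version in the sample repository may differ:

```csharp
public class QueueConfiguration
{
    public string Region { get; set; } = string.Empty;
    public string Account { get; set; } = string.Empty;
    public string Name { get; set; } = string.Empty;

    // Standard SQS URL format: https://sqs.{region}.amazonaws.com/{account}/{queue-name}
    public string SqsUrl => $"https://sqs.{Region}.amazonaws.com/{Account}/{Name}";
}
```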
In the sample code, the account number and region live in the `ApplicationConstants` class. To make this Docker image run in Fargate, be sure to set your own account and region there.
```csharp
public static class ApplicationConstants
{
    public const string Account = "123456789012";
    public const string Region = "us-east-1";
}
```
To test the processor locally, simply run `dotnet run` in the `src/AsyncProcessor` directory. Be sure to create a queue in your AWS account and set the `QUEUE_NAME` environment variable to the queue name.
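Running the processor assumes it is hosted in a .NET generic host. A minimal `Program.cs` along these lines is enough; this is a sketch, and the bootstrap in the sample repository may differ slightly:

```csharp
using Microsoft.Extensions.Hosting;

var builder = Host.CreateApplicationBuilder(args);

// Registers WorkerProcessor, WorkerHandler, the SQS client, and the queue configuration.
builder.Services.AddAsyncProcessor();

await builder.Build().RunAsync();
```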
Lastly, build the Docker image for the processor by running `docker build -t asyncprocessor .` in the root directory. This builds the Docker image with the processor code and its dependencies.
The Dockerfile is as follows:
```dockerfile
# Runtime image used for the final stage
FROM mcr.microsoft.com/dotnet/runtime:8.0 AS base

# Build stage: restore and publish with the SDK image
FROM mcr.microsoft.com/dotnet/sdk:8.0 AS build
WORKDIR /src
COPY ["src/AsyncProcessor/AsyncProcessor.csproj", "src/AsyncProcessor/AsyncProcessor.csproj"]
RUN dotnet restore "src/AsyncProcessor/AsyncProcessor.csproj"
COPY ["src/AsyncProcessor", "src/AsyncProcessor"]
RUN dotnet publish "src/AsyncProcessor/AsyncProcessor.csproj" -c Release -o /src/publish --runtime linux-x64 --self-contained false

# Final stage: copy the published output onto the runtime image
FROM base AS final
WORKDIR /app
COPY --from=build /src/publish .
ENTRYPOINT ["dotnet", "AsyncProcessor.dll"]
```
Fargate runs the Docker image in a Linux container. The image uses a multi-stage build: the SDK image restores and publishes the app, and the final stage copies the published output onto the smaller runtime image. This keeps the image small and efficient to run in Fargate.
The processor code remains unit testable because everything is nicely decoupled. I will omit the unit tests for brevity, but you can find them in the sample code.
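To give a flavor of what such a test could look like, here is a sketch that verifies the handler deletes a message after processing it. This is not the repository's actual test; it assumes xUnit, Moq, and the `QueueConfiguration` shape sketched earlier:

```csharp
using System.Threading;
using System.Threading.Tasks;
using Amazon.SQS;
using Amazon.SQS.Model;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using Moq;
using Xunit;

public class WorkerHandlerTests
{
    [Fact]
    public async Task ProcessMessage_DeletesMessage_WhenProcessingSucceeds()
    {
        // Arrange: a fake SQS client and a configuration pointing at a dummy queue.
        var sqsMock = new Mock<IAmazonSQS>();
        var config = Options.Create(new QueueConfiguration
        {
            Region = "us-east-1",
            Account = "123456789012",
            Name = "async-processor-queue"
        });
        var handler = new WorkerHandler(
            NullLogger<WorkerHandler>.Instance, sqsMock.Object, config);

        var message = new Message
        {
            MessageId = "1",
            Body = "Hello, World!",
            ReceiptHandle = "receipt-handle"
        };

        // Act
        await handler.ProcessMessage(message, CancellationToken.None);

        // Assert: the message was deleted after successful processing.
        sqsMock.Verify(
            s => s.DeleteMessageAsync(
                It.IsAny<string>(), "receipt-handle", It.IsAny<CancellationToken>()),
            Times.Once);
    }
}
```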
CDK Stack
The CDK stack defines the infrastructure for the background processor so it can be deployed to AWS. The stack creates an SQS queue, an ECS cluster, and a Fargate service that runs the processor as a task. The stack is defined in the `src/AsyncProcessor.Cdk` directory.
Luckily, the CDK has constructs for all the AWS services you need to deploy. This keeps the code simple and easy to understand. The CDK stack is defined in the `ServiceStack` class.
```csharp
public class ServiceStack : Stack
{
    public ServiceStack(
        Construct scope,
        string id,
        IStackProps? props = null) : base(scope, id, props)
    {
        // Network and cluster for the Fargate tasks
        var vpc = new Vpc(this, "vpc");

        var cluster = new Cluster(this, "cluster", new ClusterProps
        {
            Vpc = vpc
        });

        // Creates the SQS queue, builds the Docker image, and wires up auto scaling
        _ = new QueueProcessingFargateService(
            this, "queue-processing-service",
            new QueueProcessingFargateServiceProps
            {
                Cluster = cluster,
                Image = ContainerImage.FromAsset("."),
                LogDriver = new AwsLogDriver(new AwsLogDriverProps
                {
                    StreamPrefix = "async-processor-service",
                    LogRetention = RetentionDays.ONE_DAY
                })
            });
    }
}
```
This seems small, but it does a lot. The `QueueProcessingFargateService` construct creates an ECS service that runs the processor as a task in Fargate. It also auto scales the processor based on the number of messages in the queue via CloudWatch alarms. When CPU or memory usage is high, the service scales up by spinning up more tasks to handle the load; when the load is low, it scales down by spinning down tasks to save costs. This is the beauty of Fargate and ECS, and it all comes from the CDK stack.
The default scaling settings are fairly reasonable. You can adjust the settings in the `QueueProcessingFargateServiceProps` class. These are the properties available:
- Cpu: The number of CPU units to allocate to the task. Default is 256.
- MemoryLimitMiB: The amount of memory (in MiB) to allocate to the task. Default is 512.
- Cooldown: The time to wait before scaling again. Default is 300 seconds.
- MinScalingCapacity: The minimum number of tasks to run; must be at least 1. Default is 1.
- MaxScalingCapacity: The maximum number of tasks to run. Default is 2.
- CpuTargetUtilizationPercent: The target CPU utilization percentage to maintain. Default is 50.
These properties can be set directly in the props class.
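For example, a tuned version of the construct from the stack above could set some of these directly. The values here are purely illustrative, and the exact property names can vary between CDK versions, so check the construct's documentation:

```csharp
_ = new QueueProcessingFargateService(
    this, "queue-processing-service",
    new QueueProcessingFargateServiceProps
    {
        Cluster = cluster,
        Image = ContainerImage.FromAsset("."),
        // Give each task more headroom and allow a wider scaling range.
        Cpu = 512,
        MemoryLimitMiB = 1024,
        MinScalingCapacity = 1,
        MaxScalingCapacity = 5
    });
```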
The SQS auto scaling is based on the number of messages in the queue. The default is 100 messages. When the number of messages in the queue is above the threshold, the service scales up. This setting cannot be adjusted in the construct class directly.
A dead-letter queue is automatically created for the main SQS queue. This is where messages are moved when they fail to process after a certain number of retries. The default is five retries. You can adjust the number of retries in the `QueueProcessingFargateServiceProps` class.
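For instance, the retry count maps to the `MaxReceiveCount` property on the same props class; a small sketch of raising it is shown below (confirm the property against your CDK version):

```csharp
new QueueProcessingFargateServiceProps
{
    Cluster = cluster,
    Image = ContainerImage.FromAsset("."),
    // Move a message to the dead-letter queue only after ten failed receives.
    MaxReceiveCount = 10
};
```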
The Docker image is built from the current directory because that is where the Dockerfile is located. The CDK deploy builds the Docker image and pushes it to an ECR repository automatically via the `ContainerImage.FromAsset()` method. An alternative is to build the Docker image manually and push it to ECR yourself. That is more work, but it is useful when you want to test the image locally before deploying it to AWS.
The log driver is set to CloudWatch Logs with a retention of one day. The default is to never delete logs. You can adjust the retention period in the `AwsLogDriverProps` class.
The `Cluster` construct creates an ECS cluster in the VPC; this is where the Fargate service runs the processor tasks. Because the tasks run on Fargate, there are no instances for you to manage: the service scales horizontally by running more tasks, or vertically by giving each task more CPU and memory.
Be sure to set the region and account in the main `Program.cs` file. This is used to set the environment settings for the CDK stack.
```csharp
_ = new ServiceStack(app, "async-processor-stack", new StackProps
{
    Env = new Amazon.CDK.Environment
    {
        Account = "123456789012", // your account
        Region = "us-east-1"      // your region
    }
});
```
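For context, that instantiation sits inside a standard CDK app entry point. A minimal version looks roughly like this (a sketch; the repository's `Program.cs` may differ):

```csharp
using Amazon.CDK;

var app = new App();

// The ServiceStack instantiation shown above goes here.
_ = new ServiceStack(app, "async-processor-stack", new StackProps { /* ... as above ... */ });

// Emits the CloudFormation template that cdk deploy works from.
app.Synth();
```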
The `QueueProcessingFargateService` construct automatically creates the SQS queue, so there is no need to create one separately. The construct is opinionated and pairs a single SQS queue with each Fargate service, because that queue's depth is what drives the scaling.
Deploy the CDK Stack
With the CDK stack defined, you can deploy it to AWS using the CDK CLI. The CLI synthesizes a CloudFormation template from the CDK code and deploys it. The CDK CLI should be installed as a global tool on your machine.
To deploy the stack, you need to run the following commands:
```bash
cdk bootstrap
cdk deploy
```
You only need to bootstrap once to set up the CDK environment in your AWS account; any subsequent deploys reuse the existing environment. The `cdk deploy` command deploys the stack to AWS, and the CDK CLI will prompt you to confirm the deployment.
The whole deployment process takes a few minutes to complete. If this takes longer, you may have an issue with your AWS account or the CDK environment. Be sure to check the logs for any errors.
Test the Processor via SQS
Once the stack is deployed, make note of the SQS URL in the AWS console. You will need this to send messages to the queue.
To send messages to the queue, you can use the AWS CLI. The following command sends a message to the queue:
```bash
aws sqs send-message \
  --queue-url https://sqs.us-east-1.amazonaws.com/123456789012/async-processor-queue \
  --message-body "Hello, World!"
```
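If you prefer to send a test message from .NET instead of the CLI, a small sketch using the SQS SDK client looks like this (the queue URL is the one from your own deployment):

```csharp
using Amazon.SQS;

var sqs = new AmazonSQSClient();

// Queue URL is illustrative; replace it with the URL from your deployment.
var response = await sqs.SendMessageAsync(
    "https://sqs.us-east-1.amazonaws.com/123456789012/async-processor-queue",
    "Hello, World!");

Console.WriteLine($"Sent message {response.MessageId}");
```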
You can now track the message in the processor via CloudWatch logs. The message will be processed and deleted from the queue. If an error occurs, the message will be left in the queue for retry and eventually be moved to the dead-letter queue.
Be sure to clean up the resources after you are done testing. You can do this by running the following command:
```bash
cdk destroy
```
Conclusion
In this article, you learned how to build a background processor using AWS Fargate and SQS. You used the AWS CDK to define the infrastructure as code, built a Docker image for the processor, and deployed it to AWS Fargate via the CDK. Lastly, you tested the processor by sending messages to an SQS queue. All this while keeping the processor scalable and reliable with minimal code.