Create a Simple .NET Application to Consume an Amazon SQS Queue
This is a multipart series on Interapplication Messaging with .NET and AWS Simple Queue Service (SQS). Check out all the tutorials in the series.
Simple Interapplication Messaging with .NET and AWS Simple Queue Service
Create an AWS SQS Queue from the Console
Creating a Simple .NET AWS SQS Message Producer Application
Create a Simple .NET Application to Consume an AWS SQS Queue
In the last post, we focused on creating a .NET console application that creates a message and sends it to an Amazon SQS queue. In this post, we will turn our sights to creating an application that consumes messages from an Amazon SQS queue.
The Solution
In this post we will create a simple .NET console application that consumes messages from an Amazon SQS queue.
Remember, for any example solution from AWS with .NET, we focus on the code that exemplifies the problem we are trying to solve. We don’t include logging, input validation, exception handling, etc., and we embed the configuration data within classes instead of using environment variables, configuration files, key/value stores and the like. These items should not be skipped for proper solutions.
Prerequisites
To complete this solution you will need the .NET CLI, which is included in the .NET SDK. In addition, you will need to create an AWS IAM user with programmatic access and the appropriate permissions. After creating your IAM user, record the access key and secret for later use. You can omit these steps if you have already completed them in the previous post. If you have not completed the exercise in the first tutorial in this series, you will also need to create an SQS queue and record the queue URL.
Warning: some AWS services may have fees associated with them.
Developing The .NET Amazon SQS Consumer
First, we’ll create the AWS SQS Consumer by entering the following command in the console.
$ dotnet new console --name SQSConsumer
After creating the console application, we need to navigate to the directory of that app.
$ cd SQSConsumer
Now, let’s add a package reference to the latest .NET SQS SDK.
$ dotnet add package AWSSDK.SQS
OK, now that we have our application and a reference to the SQS SDK, let’s create a class for our SQS Consumer.
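using System;
using System.Threading;
using System.Threading.Tasks;
using Amazon;
using Amazon.Runtime;
using Amazon.SQS;
using Amazon.SQS.Model;
namespace SQSConsumer
{
    public class SQSMessageConsumer
    {
        public SQSMessageConsumer()
        {
        }
    }
}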
NOTE: The method of storing and using credentials below is used for simplicity. Following one of these other methods to store your AWS credentials with .NET is recommended.
Now that we have our class, let’s flesh out some fields to hold our data.
private string _accessKey = "<AWS ACCESS KEY>";
private string _secret = "<AWS SECRET>";
private string _queueUrl = "<AWS SQS QUEUE URL>";
private string _awsregion = "<AWS REGION>";
private int _messageWaitTimeSeconds = 20; // long-poll wait, in seconds
private int _maxNumberOfMessages = 10;    // messages requested per receive call
private int _delay = 0;                   // pause between polls, in milliseconds
private AmazonSQSClient _sqsClient;
private bool _isPolling;
private CancellationTokenSource _source;
private CancellationToken _token;         // passed to the SDK calls so they can be cancelled
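As noted earlier, embedding configuration in the class is a shortcut. One possible alternative, shown here only as a minimal sketch, is to read the values from environment variables. The ConsumerSettings class and the variable names used with it (for example SQS_QUEUE_URL) are hypothetical and are not part of the original solution.

```csharp
using System;

// Hypothetical helper (not part of the original tutorial): reads consumer
// settings from environment variables instead of hard-coded fields.
public static class ConsumerSettings
{
    // Returns the variable's value, or throws if it is missing or blank.
    public static string Require(string name)
    {
        var value = Environment.GetEnvironmentVariable(name);
        if (string.IsNullOrWhiteSpace(value))
        {
            throw new InvalidOperationException(
                "Missing required environment variable: " + name);
        }
        return value;
    }

    // Returns the variable parsed as an int, or the fallback when it is
    // unset or not a valid integer.
    public static int IntOrDefault(string name, int fallback)
    {
        var value = Environment.GetEnvironmentVariable(name);
        return int.TryParse(value, out int parsed) ? parsed : fallback;
    }
}
```

With a helper like this, a field could be initialized as private string _queueUrl = ConsumerSettings.Require("SQS_QUEUE_URL"); so that a missing value fails at startup rather than on the first request.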
With the fields complete, let’s configure the class in the constructor. We set the _sqsClient field to a new instance of AmazonSQSClient and we wire up a new event handler to the Console.CancelKeyPress event. More about that later.
public SQSMessageConsumer()
{
    BasicAWSCredentials basicCredentials = new BasicAWSCredentials(_accessKey, _secret);
    RegionEndpoint region = RegionEndpoint.GetBySystemName(_awsregion);
    _sqsClient = new AmazonSQSClient(basicCredentials, region);
    Console.CancelKeyPress += new ConsoleCancelEventHandler(CancelKeyPressHandler);
}
With the fields initialized and the class configured, we are now ready to consume items from the SQS queue. Let’s encapsulate the polling logic in a method named Listen.
In the Listen method, we first set the _isPolling field to true and then iterate through a while loop until _isPolling is set to false. We’ll touch on the _isPolling field a bit more later on. The Listen method hands off most of the heavy lifting to the FetchFromQueue method. The _delay field, passed to Thread.Sleep, pauses the loop between iterations (the value is in milliseconds), which gives us some control over how chatty this solution is. Note the catch and finally blocks: the catch block catches any TaskCanceledException, and the finally block disposes of the CancellationTokenSource. We’ll see how all that fits together a little later.
public async Task Listen()
{
    _isPolling = true;
    int i = 0;
    try
    {
        _source = new CancellationTokenSource();
        _token = _source.Token;
        while (_isPolling)
        {
            i++;
            Console.Write(i + ": ");
            await FetchFromQueue();
            Thread.Sleep(_delay);
        }
    }
    catch (TaskCanceledException ex)
    {
        Console.WriteLine("Application Terminated: " + ex.Message);
    }
    finally
    {
        _source.Dispose();
    }
}
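The loop above pauses with Thread.Sleep, which blocks the thread and does not react to cancellation. As a variation, and not the approach this solution uses, the pause could be awaited with Task.Delay and the same cancellation token so that a cancel request also cuts the pause short. The PollingLoop class and its RunAsync method below are hypothetical names for a minimal, SDK-free sketch of that loop shape.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not part of the original tutorial): a polling loop
// whose pause between iterations honors the cancellation token.
public static class PollingLoop
{
    // Calls pollOnce repeatedly until the token is cancelled and returns
    // the number of iterations that completed.
    public static async Task<int> RunAsync(
        Func<CancellationToken, Task> pollOnce,
        TimeSpan delay,
        CancellationToken token)
    {
        int iterations = 0;
        try
        {
            while (!token.IsCancellationRequested)
            {
                await pollOnce(token);
                iterations++;
                // Unlike Thread.Sleep, this ends early when the token is cancelled.
                await Task.Delay(delay, token);
            }
        }
        catch (OperationCanceledException)
        {
            // Expected on cancellation; TaskCanceledException derives from this type.
        }
        return iterations;
    }
}
```

In this consumer, pollOnce would wrap the call to FetchFromQueue, and the token would come from the CancellationTokenSource that the Ctrl+C handler cancels.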
Let’s take a look at the FetchFromQueue method. Here we abstract the SQS SDK calls. First we need to new up a ReceiveMessageRequest object. From there, we configure the ReceiveMessageRequest object by setting the QueueUrl, WaitTimeSeconds and MaxNumberOfMessages properties. The QueueUrl property specifies the queue that we are going to poll for data. The MaxNumberOfMessages property specifies how large a batch we would like to pull out of the queue in one request (SQS allows up to 10). Finally, WaitTimeSeconds is how long the request waits for a message to arrive in the queue before it returns (SQS allows up to 20 seconds).
Next, we call ReceiveMessageAsync on the AmazonSQSClient object, _sqsClient, that we created in the constructor. For this call we pass in the ReceiveMessageRequest object as well as the cancellation token, _token.
We then wait for the call to return and inspect the ReceiveMessageResponse that was returned. If there are any messages from the SQS queue, we loop through them and output the message text to the console. After the message text is output to the console, the message is deleted from the SQS queue by calling DeleteMessageAsync.
private async Task FetchFromQueue()
{
    ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest();
    receiveMessageRequest.QueueUrl = _queueUrl;
    receiveMessageRequest.MaxNumberOfMessages = _maxNumberOfMessages;
    receiveMessageRequest.WaitTimeSeconds = _messageWaitTimeSeconds;
    ReceiveMessageResponse receiveMessageResponse =
        await _sqsClient.ReceiveMessageAsync(receiveMessageRequest, _token);
    // Guard against a null list as well as an empty one.
    if (receiveMessageResponse.Messages != null && receiveMessageResponse.Messages.Count != 0)
    {
        for (int i = 0; i < receiveMessageResponse.Messages.Count; i++)
        {
            string messageBody = receiveMessageResponse.Messages[i].Body;
            Console.WriteLine("Message Received: " + messageBody);
            // Delete only after the message has been handled.
            await DeleteMessageAsync(receiveMessageResponse.Messages[i].ReceiptHandle);
        }
    }
    else
    {
        Console.WriteLine("No Messages to process");
    }
}
In the DeleteMessageAsync method we abstract the SQS SDK calls that delete the message from SQS. We first need to instantiate and configure a DeleteMessageRequest object. As with the ReceiveMessageRequest object, we need to set the QueueUrl property. In addition, we need to set the ReceiptHandle property. The ReceiptHandle is associated with the specific receive call that returned the message, rather than with the message itself. Finally, we call DeleteMessageAsync on the client, passing in the DeleteMessageRequest object.
private async Task DeleteMessageAsync(string receiptHandle)
{
    DeleteMessageRequest deleteMessageRequest = new DeleteMessageRequest();
    deleteMessageRequest.QueueUrl = _queueUrl;
    deleteMessageRequest.ReceiptHandle = receiptHandle;
    DeleteMessageResponse response =
        await _sqsClient.DeleteMessageAsync(deleteMessageRequest);
}
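Because each receive call can return several messages, deleting them one at a time costs one request per message. As an optional variation, and not what this solution does, the receipt handles could be collected and removed with a single DeleteMessageBatchAsync call from the same SDK. The BatchDeleter class and its method names below are hypothetical; the sketch assumes the AWSSDK.SQS package added earlier and at most 10 handles per call.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Amazon.SQS;
using Amazon.SQS.Model;

// Hypothetical helper (not part of the original tutorial): deletes several
// messages in one request instead of one request per message.
public static class BatchDeleter
{
    // Builds one batch entry per receipt handle. The Id only needs to be
    // unique within the batch, so the list index is used.
    public static List<DeleteMessageBatchRequestEntry> BuildEntries(
        IReadOnlyList<string> receiptHandles)
    {
        var entries = new List<DeleteMessageBatchRequestEntry>();
        for (int i = 0; i < receiptHandles.Count; i++)
        {
            entries.Add(new DeleteMessageBatchRequestEntry
            {
                Id = i.ToString(),
                ReceiptHandle = receiptHandles[i]
            });
        }
        return entries;
    }

    // Sends the batch and returns how many entries SQS reported as failed.
    public static async Task<int> DeleteAsync(
        IAmazonSQS sqsClient,
        string queueUrl,
        IReadOnlyList<string> receiptHandles,
        CancellationToken token)
    {
        var request = new DeleteMessageBatchRequest
        {
            QueueUrl = queueUrl,
            Entries = BuildEntries(receiptHandles)
        };
        DeleteMessageBatchResponse response =
            await sqsClient.DeleteMessageBatchAsync(request, token);
        // The Failed list may be null when nothing failed, so treat null as zero.
        return response.Failed?.Count ?? 0;
    }
}
```

A batch call can succeed overall while individual entries fail, so a nonzero return value is worth logging; any messages that were not deleted become visible in the queue again after their visibility timeout.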
You may remember that in the Listen method we are iterating through a while loop and the loop will continue to run until _isPolling is set to false. We also wired up a new event handler for the Console.CancelKeyPress event.
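In the handler, we want to cancel any in-flight SDK request and stop the loop. This allows the user to safely end the application by pressing Ctrl+C. Setting args.Cancel to true keeps the process from being terminated immediately, so the Listen method gets a chance to exit cleanly. Let’s take a look at the handler.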
protected void CancelKeyPressHandler(object sender, ConsoleCancelEventArgs args)
{
    args.Cancel = true;
    _source.Cancel();
    _isPolling = false;
}
The final step is to instantiate an SQSMessageConsumer object and call the Listen method within Program.cs.
using System.Threading.Tasks;
namespace SQSConsumer
{
    class Program
    {
        static async Task Main(string[] args)
        {
            SQSMessageConsumer sqsConsumer = new SQSMessageConsumer();
            await sqsConsumer.Listen();
        }
    }
}
That’s it. You have completed developing a simple .NET application to consume an Amazon SQS queue.
Continue on to the next tutorial in this series: Simple Interapplication Messaging with .NET and AWS Simple Queue Service.