Developing .NET Core Applications for Apache Kafka and Amazon MSK
This is a multipart series on Developing for Apache Kafka and Amazon MSK (Amazon Managed Streaming for Apache Kafka). Check out these other tutorials to get started with Apache Kafka.
How to Build an Apache Kafka Development Server
How to Build a Distributed Apache Kafka Development Cluster
Apache Kafka is an open-source, distributed platform for building high-performance, real-time streaming data pipelines and applications, as well as a platform for integrating messaging into your applications (think pub/sub). In the last few years Apache Kafka’s popularity has skyrocketed because its high throughput, scalability, and high availability help solve so many cloud computing problems.
In the past, Kafka was a bit of a bear to properly configure and deploy. Thankfully, Amazon came along and created a service that hides much of the complexity involved in building, configuring, and deploying a Kafka cluster: Amazon MSK. Meanwhile, Apache Kafka itself is now easier than ever to spin up in a development environment. So, we will start developing .NET constructs in a development environment and then take a look at migrating to MSK. The code will need few modifications along the way, because we will be using the native Apache Kafka APIs.
Before you start, it will help if you are familiar with Apache ZooKeeper and Apache Kafka brokers and their relationship. Also, we will not address securing Apache Kafka, as that is out of scope for this tutorial.
The Solution
In this example we will build a messaging solution. We will first build a message consumer that listens for records arriving in an Apache Kafka topic. We will then build a message producer that creates records for that topic. This solution will hardly flex the muscles of Apache Kafka, but it is a good introduction to how some of the different pieces of Apache Kafka fit together.
Remember, for any example solution from AWS with .NET, we focus on the code that exemplifies the problem we are trying to solve. We don’t include logging, input validation, exception handling, etc., and we embed the configuration data within classes instead of using environment variables, configuration files, key/value stores and the like. These items should not be skipped for proper solutions.
Prerequisites
To complete this solution you will need the .NET CLI, which is included in the .NET SDK.
Warning: some AWS services may have fees associated with them.
1. Make note of some properties from your environment.
Record the name of the topic that you will be producing to and consuming from.
Record the FQDN or IP address of the broker. If you are using a distributed environment, choose one of the brokers.
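If you need to double-check the topic name, recent versions of Apache Kafka ship with a kafka-topics tool that can list the topics on a broker. A quick sketch, assuming the broker address 192.168.1.42:9092 used throughout this tutorial:
$ bin/kafka-topics.sh --bootstrap-server 192.168.1.42:9092 --list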
2. Developing the .NET Apache Kafka Consumer.
Let’s start out by creating a .NET Core application for the consumer.
$ dotnet new console --name KafkaConsumer
While we are in a command line state of mind, let’s add a reference to the Confluent.Kafka package, which contains the Kafka API. First we must cd into the KafkaConsumer directory and then run the following dotnet command.
$ dotnet add package Confluent.Kafka --version 1.5.2
Next, we will create the class for our Kafka Consumer.
using System;
using System.Collections.Generic;
using System.Threading;
using Confluent.Kafka;

namespace KafkaConsumer
{
    public class KafkaMessageConsumer
    {
        public KafkaMessageConsumer()
        {
        }
    }
}
Now we can add the fields that will hold our configuration data.
private readonly string _bootstrapServers;
private readonly string _groupId;
private readonly AutoOffsetReset _autoOffsetReset;
private readonly List<String> _topics;
We’ll also add a few more fields: one for our looping control variable, one for the cancellation token source, and one to hold the consumer itself, all of which the Consume method below relies on.
private bool _isConsuming;
private readonly CancellationTokenSource _cts;
private IConsumer<String, String> _consumer;
In the constructor we will set the fields for our Kafka configuration. For this example, we will use a topic named “test-topic”. The _bootstrapServers field is a string that holds a comma-delimited list of our brokers (e.g., “host1:9092,host2:9092”); here we only list one. The _groupId is the unique identifier for the group of clients that we will use to consume the topic. In this case we only have one consumer, but we could have a fleet of consumers that read and process the records from this topic, all sharing the same group ID. It is important to note that whether you have one client or a fleet of clients, Apache Kafka will deliver each record to only one consumer within the group.
public KafkaMessageConsumer()
{
_bootstrapServers = "192.168.1.42:9092";
_groupId = "dotnet-kafka-client";
_autoOffsetReset = AutoOffsetReset.Earliest;
_topics = new List<String>() { "test-topic" };
_cts = new CancellationTokenSource();
Console.CancelKeyPress += new ConsoleCancelEventHandler(CancelKeyPressHandler);
}
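The constructor also wires Console.CancelKeyPress up to a CancelKeyPressHandler method that the original snippets don’t show. A minimal sketch, assuming all it needs to do is stop the loop and cancel the blocking Consume call, could look like this:
private void CancelKeyPressHandler(object sender, ConsoleCancelEventArgs args)
{
    // Keep the process alive long enough for the finally block
    // in Consume to close the consumer cleanly.
    args.Cancel = true;
    _isConsuming = false; // end the consume loop
    _cts.Cancel();        // unblock the pending _consumer.Consume(_cts.Token) call
}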
Finally, let’s take a look at the Consume method from top to bottom.
public void Consume()
{
    var config = new ConsumerConfig
    {
        BootstrapServers = _bootstrapServers,
        GroupId = _groupId,
        AutoOffsetReset = _autoOffsetReset
    };
    try
    {
        _consumer = new ConsumerBuilder<String, String>(config).Build();
        _consumer.Subscribe(_topics);
        _isConsuming = true;
        int i = 0;
        while (_isConsuming)
        {
            i++;
            Console.WriteLine(i + ": ");
            // Blocks until a record arrives or the token is canceled.
            ConsumeResult<String, String> consumeResult = _consumer.Consume(_cts.Token);
            Console.WriteLine(consumeResult.Message.Value);
        }
    }
    catch (OperationCanceledException ex)
    {
        Console.WriteLine("Application was ended: " + ex.Message);
    }
    catch (Exception ex)
    {
        Console.WriteLine("Application Crashed: " + ex.Message);
    }
    finally
    {
        if (_consumer != null)
        {
            // Close leaves the consumer group cleanly and commits final offsets.
            _consumer.Close();
            ((IDisposable)_consumer).Dispose();
        }
    }
}
Let’s take a closer look at the Consume method. First, we create the ConsumerConfig object, passing in the details about the Apache Kafka cluster that we built. Setting AutoOffsetReset to Earliest tells a consumer group with no committed offsets to start reading from the beginning of the topic rather than only picking up new records.
var config = new ConsumerConfig
{
    BootstrapServers = _bootstrapServers,
    GroupId = _groupId,
    AutoOffsetReset = _autoOffsetReset
};
We then set _consumer to an instance of IConsumer via the ConsumerBuilder and subscribe to our topics.
_consumer = new ConsumerBuilder<String, String>(config).Build();
_consumer.Subscribe(_topics);
Now the loop kicks off and as we iterate, we consume messages from our Apache Kafka installation. As messages come in from Apache Kafka we write the message to the console.
while (_isConsuming)
{
i++;
Console.WriteLine(i + ": ");
ConsumeResult<String, String> consumeResult = _consumer.Consume(_cts.Token);
Console.WriteLine(consumeResult.Message.Value);
}
To round out the KafkaConsumer, all we need to do is instantiate a KafkaMessageConsumer object and call the Consume method within Program.cs.
namespace KafkaConsumer
{
class Program
{
static void Main(string[] args)
{
var kafkaConsumer = new KafkaMessageConsumer();
kafkaConsumer.Consume();
}
}
}
Open a terminal in the KafkaConsumer directory/folder and run our Apache Kafka consumer app with the following command.
$ dotnet run
3. Developing the .NET Apache Kafka Producer.
Let’s start out by creating a .NET Core application for our Apache Kafka producer app.
$ dotnet new console --name KafkaProducer
Again, let’s add a reference to the Confluent.Kafka package, which contains the Kafka API. We’ll need to cd into the KafkaProducer directory and then run the following dotnet command.
$ dotnet add package Confluent.Kafka --version 1.5.2
Next, we create the class for our KafkaMessageProducer.
using System;
using Confluent.Kafka;

namespace KafkaProducer
{
    public class KafkaMessageProducer
    {
        public KafkaMessageProducer()
        {
        }
    }
}
Just like in the consumer, we will store the Apache Kafka configuration data in fields. We also need a field for the producer itself and one for how long to wait when flushing outstanding messages, both of which the Produce method below relies on (the ten-second flush window is an arbitrary default).
private readonly String _bootstrapServers;
private readonly String _topic;
private IProducer<String, String> _producer;
private readonly int _flushTime = 10; // seconds to wait during Flush; an assumed default
And, in the constructor, we will set the fields. For this example, we will use a topic named “test-topic”. The _bootstrapServers field is a string that holds a comma-delimited list of our brokers.
public KafkaMessageProducer()
{
_bootstrapServers = "192.168.1.42:9092";
_topic = "test-topic";
}
Let’s take a look at the Produce method in its entirety.
public void Produce()
{
    var config = new ProducerConfig
    {
        BootstrapServers = _bootstrapServers
    };
    try
    {
        _producer = new ProducerBuilder<String, String>(config).Build();
        // Produce is asynchronous; deliveryReportHandler is invoked once the
        // broker acknowledges (or rejects) the message.
        _producer.Produce(_topic, new Message<String, String>
        {
            Key = Guid.NewGuid().ToString(),
            Value = "New Message: " + DateTime.Now.ToString()
        }, deliveryReportHandler);
        // Block for up to _flushTime seconds so the message is actually
        // delivered before this short-lived application exits.
        _producer.Flush(TimeSpan.FromSeconds(_flushTime));
    }
    catch (Exception ex)
    {
        Console.WriteLine("Application Crashed: " + ex.Message);
    }
    finally
    {
        if (_producer != null)
        {
            ((IDisposable)_producer).Dispose();
        }
    }
}
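The Produce call hands each message a deliveryReportHandler callback that the original snippets don’t show. A minimal sketch, assuming we just want to log the outcome of each delivery to the console, might look like this:
private void deliveryReportHandler(DeliveryReport<String, String> deliveryReport)
{
    if (deliveryReport.Error.Code != ErrorCode.NoError)
    {
        Console.WriteLine("Delivery failed: " + deliveryReport.Error.Reason);
    }
    else
    {
        Console.WriteLine("Delivered to: " + deliveryReport.TopicPartitionOffset);
    }
}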
Now, let’s take a closer look at the Produce method. First, we create the ProducerConfig object, passing in the details about the Apache Kafka cluster.
var config = new ProducerConfig
{
    BootstrapServers = _bootstrapServers
};
We then instantiate an IProducer using the ProducerBuilder and call Produce on it to send the message to Apache Kafka. The final Flush call blocks until any outstanding messages have been delivered or the timeout expires; without it, this short-lived app could exit before the message ever leaves the producer’s internal queue.
_producer = new ProducerBuilder<String, String>(config).Build();
_producer.Produce(_topic, new Message<String, String>
{
    Key = Guid.NewGuid().ToString(),
    Value = "New Message: " + DateTime.Now.ToString()
}, deliveryReportHandler);
_producer.Flush(TimeSpan.FromSeconds(_flushTime));
And, the final piece is instantiating the KafkaMessageProducer object and calling the Produce method within Program.cs. Each time the KafkaProducer app is run, it will produce one message to the “test-topic” Apache Kafka topic.
namespace KafkaProducer
{
class Program
{
static void Main(string[] args)
{
var kafkaMessageProducer = new KafkaMessageProducer();
kafkaMessageProducer.Produce();
}
}
}
Open a terminal in the KafkaProducer directory/folder and run our Apache Kafka producer app with the following command.
$ dotnet run
Now, go over to your consumer and you should see the message that was just sent by the producer.
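If you would also like to verify from the broker side, the console consumer that ships with Apache Kafka can tail the topic (again assuming the broker address used throughout this tutorial):
$ bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.42:9092 --topic test-topic --from-beginning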
See the full example at GitHub.
That’s a Wrap!
That’s all there is to it. You have now set up an inter-application messaging solution using .NET and Apache Kafka. Remember, this is just the tip of the iceberg of everything that Apache Kafka can do.
Stay tuned for our next Apache Kafka tutorial featuring Amazon MSK (Amazon Managed Streaming for Apache Kafka).