.NET Client for Apache Kafka¶
Confluent develops and maintains confluent-kafka-dotnet, a .NET library that provides a high-level producer, consumer and AdminClient compatible with all Apache Kafka® brokers version 0.8 and later, Confluent Cloud and Confluent Platform. You can find a changelog of release updates in the GitHub client repo.
Note
- For a step-by-step guide on building a .NET client application for Kafka, see Getting Started with Apache Kafka and .NET.
- For a training course, see Apache Kafka for .NET Developers.
.NET Client Installation¶
confluent-kafka-dotnet is made available via NuGet. It’s a binding to the C client librdkafka, which is provided automatically via the dependent librdkafka.redist package for a number of popular platforms - linux-x64, osx-arm64 (Apple Silicon), osx-x64, win-x64, and win-x86.
confluent-kafka-dotnet is compatible with the .NET Framework >= v4.6.2, .NET Core >= v1.0 and .NET Standard >= v1.3. Mono is not officially supported.
In addition to the Confluent.Kafka package, we provide the Confluent.SchemaRegistry, Confluent.SchemaRegistry.Serdes.Protobuf, Confluent.SchemaRegistry.Serdes.Json and Confluent.SchemaRegistry.Serdes.Avro packages for integration with Confluent Schema Registry.
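For example, the core client and the Avro serdes can be added to a project as NuGet package references. This fragment is a sketch; the version wildcards are illustrative, and you should pin to the latest released versions:

```xml
<ItemGroup>
  <!-- Core client; pulls in the librdkafka.redist native binaries automatically. -->
  <PackageReference Include="Confluent.Kafka" Version="2.*" />
  <!-- Schema Registry client and Avro serializers (optional). -->
  <PackageReference Include="Confluent.SchemaRegistry.Serdes.Avro" Version="2.*" />
</ItemGroup>
```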
.NET Client Example Code¶
For a step-by-step tutorial using the .NET client, including code samples for the producer and consumer, see this guide.
There are also a number of examples demonstrating various aspects of the client in the client GitHub repo, and an additional example here.
Producer¶
To create a .NET producer, first construct an instance of the ProducerConfig class, then pass this into the ProducerBuilder's constructor:
using Confluent.Kafka;
using System.Net;

...

var config = new ProducerConfig
{
    BootstrapServers = "host1:9092",
    ...
};

using (var producer = new ProducerBuilder<Null, string>(config).Build())
{
    ...
}
To write messages to Kafka, you can use the ProduceAsync method:
var result = await producer.ProduceAsync("weblog", new Message<Null, string> { Value="a log message" });
The Task returned by ProduceAsync will not complete until the success (or otherwise) of the request is known. As such, awaiting each ProduceAsync call will result in very low throughput except in highly concurrent applications such as ASP.NET request handlers. In that case, near-concurrent requests will be automatically batched together for efficiency by the client behind the scenes.
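As a sketch (the topic name and console output are illustrative), the result returned by ProduceAsync can be inspected for delivery details, and a failed produce surfaces as a ProduceException:

```csharp
using Confluent.Kafka;

try
{
    var result = await producer.ProduceAsync(
        "weblog", new Message<Null, string> { Value = "a log message" });

    // Where the broker placed the message, and its persistence status.
    Console.WriteLine($"Delivered to {result.TopicPartitionOffset}, status: {result.Status}");
}
catch (ProduceException<Null, string> e)
{
    // e.DeliveryResult contains the message that failed to be delivered.
    Console.WriteLine($"Delivery failed: {e.Error.Reason}");
}
```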
For high-throughput processing, you can use Task.WhenAll to await a block of ProduceAsync requests simultaneously. Alternatively, you can use the Produce method:
public static void handler(DeliveryReport<Null, string> deliveryReport)
{
    ...
}

public static void process(...)
{
    ...
    producer.Produce(
        "my-topic", new Message<Null, string> { Value = "hello world" }, handler);
}
The Produce method is also asynchronous, in that it never blocks. Message delivery information is made available out-of-band via the (optional) delivery report handler on a background thread. The Produce method maps more directly to the underlying librdkafka produce API, so it comes with a bit less overhead than ProduceAsync. ProduceAsync is still very performant though - capable of producing hundreds of thousands of messages per second on typical hardware.
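A minimal sketch of the Task.WhenAll approach mentioned above (the topic name and batch size are illustrative):

```csharp
using System.Linq;
using System.Threading.Tasks;
using Confluent.Kafka;

// Start a batch of produce requests without awaiting each one,
// then await them all at once.
var tasks = Enumerable.Range(0, 1000)
    .Select(i => producer.ProduceAsync(
        "weblog", new Message<Null, string> { Value = $"log message {i}" }))
    .ToArray();

var results = await Task.WhenAll(tasks);
```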
The .NET client is mature and fully featured. For in-depth information on its capabilities, refer to the
librdkafka documentation.
One feature worth calling out is support for idempotent produce: simply set the EnableIdempotence configuration property to true for exactly-once, in-order delivery guarantees. Another is support for the transactional producer API. For an example of how to use transactions from .NET, refer to the ExactlyOnce example in the GitHub repo.
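Enabling idempotence is a one-line configuration change (the broker address here is illustrative):

```csharp
var config = new ProducerConfig
{
    BootstrapServers = "host1:9092",
    // Guarantees exactly-once, in-order delivery per partition,
    // at the cost of slightly reduced maximum throughput.
    EnableIdempotence = true
};
```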
Consumer¶
Initialization¶
To create a .NET consumer, first construct an instance of the ConsumerConfig class, then pass this into the ConsumerBuilder's constructor:
using System.Collections.Generic;
using Confluent.Kafka;

...

var config = new ConsumerConfig
{
    BootstrapServers = "host1:9092,host2:9092",
    GroupId = "foo",
    AutoOffsetReset = AutoOffsetReset.Earliest
};

using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
    ...
}
The GroupId property is mandatory and specifies which consumer group the consumer is a member of. The AutoOffsetReset property specifies what offset the consumer should start reading from only in the event there are no committed offsets for a partition, or the committed offset is invalid (perhaps due to log truncation).
The Consume Loop¶
A typical Kafka consumer application is centered around a consume loop, which repeatedly calls the Consume method to retrieve records one-by-one that have been efficiently pre-fetched by the consumer in background threads. Before entering the consume loop, you'll typically use the Subscribe method to join a group. The consumer instance will then be assigned to fetch from an exclusive subset of the partitions managed by the group.
...

using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
    consumer.Subscribe(topics);

    while (!cancelled)
    {
        var consumeResult = consumer.Consume(cancellationToken);
        // handle consumed message.
        ...
    }

    consumer.Close();
}
Note that disposing the consumer instance after you are finished using it (achieved with the using block in the above example) will ensure that active sockets are closed and internal state is cleaned up. In order to leave the group cleanly - i.e. commit final offsets and trigger a group rebalance, which ensures that any partitions owned by the consumer are re-assigned to other members in the group in a timely fashion - you additionally need to call the Close method prior to disposing.
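One common pattern, sketched below (the cancellation wiring and topic name are illustrative), is to cancel the consume loop from a Ctrl+C handler and call Close in a finally block before the using block disposes the consumer:

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
    e.Cancel = true; // prevent the process from terminating immediately.
    cts.Cancel();
};

using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
    consumer.Subscribe("weblog");
    try
    {
        while (true)
        {
            // Throws OperationCanceledException when cts is cancelled.
            var consumeResult = consumer.Consume(cts.Token);
            Console.WriteLine($"Consumed: {consumeResult.Message.Value}");
        }
    }
    catch (OperationCanceledException)
    {
        // Ctrl+C was pressed.
    }
    finally
    {
        // Commit final offsets and leave the group cleanly.
        consumer.Close();
    }
}
```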
Auto Offset Commit¶
By default, the .NET consumer will commit offsets automatically. This is done periodically by a background thread at an interval specified by the AutoCommitIntervalMs config property. An offset becomes eligible to be committed immediately prior to being delivered to the application via the Consume method.
This strategy introduces the potential for messages to be missed in the case of application failure because the application may terminate before it finishes processing a particular message, whilst the offset corresponding to that message may be successfully committed to Kafka by the background thread.
Furthermore, this strategy may also introduce duplicate processing in the case of application failure since offsets are only committed periodically.
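To shorten the window in which such misses or duplicates can occur, you can reduce the commit interval (the broker address and group name here are illustrative; 5000 ms is the default interval):

```csharp
var config = new ConsumerConfig
{
    BootstrapServers = "host1:9092",
    GroupId = "foo",
    // Commit offsets every second instead of every five seconds.
    AutoCommitIntervalMs = 1000
};
```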
Synchronous Commits¶
The C# client allows you to commit offsets explicitly via the Commit method. In the following example, a synchronous commit is triggered every commitPeriod messages:
var config = new ConsumerConfig
{
    ...
    // Disable auto-committing of offsets.
    EnableAutoCommit = false
};
...
while (!cancelled)
{
    var consumeResult = consumer.Consume(cancellationToken);

    // process message here.

    if (consumeResult.Offset % commitPeriod == 0)
    {
        try
        {
            consumer.Commit(consumeResult);
        }
        catch (KafkaException e)
        {
            Console.WriteLine($"Commit error: {e.Error.Reason}");
        }
    }
}
For processing failures that result in an application crash, the Commit method provides "at least once" delivery semantics: the offset corresponding to a message is only committed after the message has been successfully processed. For errors that do not crash the application, the application can call the Seek method and move backward to the failed offset to retry.
If you reverse the order of the processing and the commit, and also commit before every message (not just periodically), you will get "at most once" delivery semantics.
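A sketch of that at-most-once ordering (error handling omitted for brevity):

```csharp
while (!cancelled)
{
    var consumeResult = consumer.Consume(cancellationToken);

    // Commit before processing: if the application crashes mid-processing,
    // the message will NOT be redelivered (at-most-once semantics).
    consumer.Commit(consumeResult);

    // process message here.
}
```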
Note
You should generally avoid blocking network calls (including synchronous use of Commit) because of the ramifications for throughput.
Store Offsets¶
The auto offset commit capability in the .NET client is actually quite flexible. As outlined above, by default, the offsets to be committed to Kafka are updated immediately prior to the Consume method delivering messages to the application. However, you can prevent this from happening by setting the EnableAutoOffsetStore config property to false. You can then use the StoreOffset method to specify the offsets you would like to be committed by the background thread. You can call this as many times as you like, but the offset commit request will only be sent to the group coordinator periodically, with the most recently specified offset(s) for the owned partitions. This approach is preferred over the synchronous commit approach outlined in the previous section.
The below example uses this approach to achieve at least once delivery semantics without blocking the main processing loop:
var config = new ConsumerConfig
{
    ...
    EnableAutoCommit = true, // (the default)
    EnableAutoOffsetStore = false
};

...

while (!cancelled)
{
    var consumeResult = consumer.Consume(cancellationToken);
    // process message here.
    consumer.StoreOffset(consumeResult);
}
Committing During A Rebalance¶
A consumer that is subscribed to a group is only permitted to commit offsets for the subset of partitions that it owns. By default, a group rebalance is handled automatically by the consumer and happens autonomously behind the scenes. These semantics introduce a potential race condition where you may try to commit or store an offset, but meanwhile a rebalance has caused the consumer to lose ownership of the partition(s) you are trying to commit the offset for. If this happens, the Commit or StoreOffset call will throw a KafkaException with ErrorCode equal to Local_State ("Erroneous state"). In the context of at-least-once processing, you can interpret this error as a warning that the message will be processed again by another consumer in the group (the new owner), and ignore it.
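Under at-least-once processing, ignoring that error might look like the following sketch:

```csharp
try
{
    consumer.StoreOffset(consumeResult);
}
catch (KafkaException e) when (e.Error.Code == ErrorCode.Local_State)
{
    // A rebalance revoked this partition; the new owner will process
    // the message again, so it is safe to ignore this error.
}
```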
Alternatively, you can specify a (possibly empty) partitions-revoked handler. This effectively avoids the race condition, because rebalance handlers are called as a side effect of a call to Consume on the application thread - the rebalance is effectively blocked whilst you process the consumed message and store or commit offsets.
...

using (var consumer = new ConsumerBuilder<Ignore, string>(config)
    .SetPartitionsRevokedHandler((c, partitions) =>
    {
        ...
    })
    .Build())
{
    ...
}
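For example, a revoked handler that commits whatever offsets have been stored so far might look like the following sketch (assumes EnableAutoCommit is false and offsets are committed manually):

```csharp
using (var consumer = new ConsumerBuilder<Ignore, string>(config)
    .SetPartitionsRevokedHandler((c, partitions) =>
    {
        // Runs on the application thread as a side effect of Consume,
        // so no store/commit calls can race with the rebalance.
        c.Commit();
    })
    .Build())
{
    // consume loop as above.
}
```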
Suggested Resources¶
- Developer tutorial: Getting Started with Apache Kafka and .NET
- Free course: Apache Kafka 101
- Blog post: Designing the .NET API for Apache Kafka
- Blog post: Implement a Cross-Platform Apache Kafka Producer and Consumer with C# and .NET