.NET Client for Apache Kafka

Confluent develops and maintains confluent-kafka-dotnet, a .NET library that provides a high-level producer, consumer and AdminClient compatible with all Apache Kafka® brokers version 0.8 and later, Confluent Cloud and Confluent Platform. You can find a changelog of release updates in the GitHub client repo.

Note

.NET Client Installation

confluent-kafka-dotnet is made available via NuGet. It’s a binding to the C client librdkafka, which is provided automatically via the dependent librdkafka.redist package for a number of popular platforms - linux-x64, osx-arm64 (Apple Silicon), osx-x64, win-x64, and win-x86.

confluent-kafka-dotnet is compatible with the .NET Framework >= v4.6.2, .NET Core >= v1.0 and .NET Standard >= v1.3. Mono is not officially supported.

In addition to the Confluent.Kafka package, we provide the Confluent.SchemaRegistry, Confluent.SchemaRegistry.Serdes.Protobuf, Confluent.SchemaRegistry.Serdes.Json and Confluent.SchemaRegistry.Serdes.Avro packages for integration with Confluent Schema Registry.

.NET Client Example Code

For a step-by-step tutorial using the .NET client including code samples for the producer and consumer see this guide.

There are also a number of examples demonstrating various aspects of the client in the client github repo and an additional example here.

Producer

To create a .NET Producer, first construct an instance of the ProducerConfig class, then pass this into the ProducerBuilder’s constructor:

using Confluent.Kafka;
using System.Net;

...

var config = new ProducerConfig
{
    BootstrapServers = "host1:9092",
    ...
};

using (var producer = new ProducerBuilder<Null, string>(config).Build())
{
    ...
}

To write messages to Kafka, you can use the ProduceAsync method:

var result = await producer.ProduceAsync("weblog", new Message<Null, string> { Value="a log message" });

The Task returned by ProduceAsync will not complete until the success (or otherwise) of the request is known. As such, await``ing the ``ProduceAsync call will result in very low throughput except in highly concurrent applications such as an ASP.NET request handlers. In that case, near concurrent requests will be automatically batched together for efficiency by the client behind the scenes.

For high throughput processing, you can utilize Tasks.WhenAll to await a block of ProduceAsync requests simultaneously. Alternatively, you can use the Produce method:

public static void handler(DeliveryReport<Null, string>)
{
    ...
}

public static process(...)
{
    ...
    producer.Produce(
        "my-topic", new Message<Null, string> { Value = "hello world" }, handler);
}

The Produce method is also asynchronous, in that it never blocks. Message delivery information is made available out-of-band via the (optional) delivery report handler on a background thread. The Produce method maps more directly to the underlying librdkafka produce API, so comes with a bit less overhead than ProduceAsync. ProduceAsync is still very performant though - capable of producing hundreds of thousands of messages per second on typical hardware.

The .NET client is mature and fully featured. For in depth information on its capability, refer to the librdkafka documentation. One feature worth calling out is support for idempotent produce - simply set the EnableIdempotence configuration property to true for exactly once, in order delivery guarantees. Another is support for the transactional producer API. For example of how to use transactions from .NET, refer to the ExactlyOnce example in the GitHub repo.

Consumer

Initialization

To create a .NET Consumer, first construct an instance of the ConsumerConfig class, then pass this into the ConsumerBuilder’s constructor:

using System.Collections.Generic;
using Confluent.Kafka;

...

var config = new ConsumerConfig
{
    BootstrapServers = "host1:9092,host2:9092",
    GroupId = "foo",
    AutoOffsetReset = AutoOffsetReset.Earliest
};

using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
    ...
}

The GroupId property is mandatory and specifies which consumer group the consumer is a member of. The AutoOffsetReset property specifies what offset the consumer should start reading from ONLY in the event there are no committed offsets for a partition, or the committed offset is invalid (perhaps due to log truncation).

The Consume Loop

A typical Kafka consumer application is centered around a consume loop, which repeatedly calls the Consume method to retrieve records one-by-one that have been efficiently pre-fetched by the consumer in background threads. Before entering the consume loop, you’ll typically use the Subscribe method to join a group. The consumer instance will then be assigned to fetch from an exclusive subset of the partitions managed by the group.

...

using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
    consumer.Subscribe(topics);

    while (!cancelled)
    {
        var consumeResult = consumer.Consume(cancellationToken);

        // handle consumed message.
        ...
    }

    consumer.Close();
}

Note that disposing the consumer instance after you are finished using it (achieved with the using block in the above example) will ensure that active sockets are closed and internal state is cleaned up. In order to leave the group cleanly - i.e. commit final offsets and trigger a group rebalance which ensures that any partitions owned by the consumer are re-assigned to other members in the group in a timely fashion - you additionally need to call the Close method prior to disposing.

Auto Offset Commit

By default, the .NET Consumer will commit offsets automatically. This is done periodically by a background thread at an interval specified by the AutoCommitIntervalMs config property. An offset becomes eligible to be committed immediately prior to being delivered to the application via the Consume method.

This strategy introduces the potential for messages to be missed in the case of application failure because the application may terminate before it finishes processing a particular message, whilst the offset corresponding to that message may be successfully committed to Kafka by the background thread.

Furthermore, this strategy may also introduce duplicate processing in the case of application failure since offsets are only committed periodically.

Synchronous Commits

The C# client allows you to commit offsets explicitly via the Commit method. In the following example, a synchronous commit is triggered every commitPeriod messages:

var config = new ConsumerConfig
{
    ...
    // Disable auto-committing of offsets.
    EnableAutoCommit = false
}

...

while (!cancelled)
{
    var consumeResult = consumer.Consume(cancellationToken);

    // process message here.

    if (consumeResult.Offset % commitPeriod == 0)
    {
        try
        {
            consumer.Commit(consumeResult);
        }
        catch (KafkaException e)
        {
            Console.WriteLine($"Commit error: {e.Error.Reason}");
        }
    }
}

For processing failures that result in an application crash, the Commit method provides “at least once” delivery semantics. In this scenario, the offset corresponding to a message is only committed after the message has been successfully processed. For errors where there is no application crash, the application will call the Seek() method and move backward to the failed offset.

If you reverse the order of the processing and commit, as well as commit before every message (not just periodically), you will get “at most once” delivery semantics.

Note

You should generally avoid blocking network calls (including synchronous use of Commit) because of the ramifications for throughput.

Store Offsets

The auto offset commit capability in the .NET Client is actually quite flexible. As outlined above, by default, the offsets to be committed to Kafka are updated immediately prior to the Consume method that delivers messages to the application. However, you can prevent this from happening by setting the EnableAutoOffsetStore config property to false. You can then use the StoreOffset method to specify the offsets you would like to be committed by the background thread. You can call this as many times as you like, but the offset commit request will only sent to the group coordinator periodically with the most recently specified offset(s) for the owned partitions. This approach is preferred over the synchronous commit approach outlined in the previous section.

The below example uses this approach to achieve at least once delivery semantics without blocking the main processing loop:

var config = new ConsumerConfig
{
    ...
    EnableAutoCommit = true // (the default)
    EnableAutoOffsetStore = false
}

...

while (!cancelled)
{
    var consumeResult = consumer.Consume(cancellationToken);

    // process message here.

    consumer.StoreOffset(consumeResult);
}

Committing During A Rebalance

A consumer that is subscribed to a group is only permitted to commit offsets for the subset of partitions that it owns. By default, a group rebalance is handled automatically by the consumer and happens autonomously behind the scenes. These semantics introduce a potential race condition where you may try to commit or store an offset, but meanwhile a rebalance has caused the consumer to lose ownership of the partition(s) you are trying to commit the offset for. If this happens, the Commit or StoreOffset call will throw a KafkaException with ErrorCode equal to Local_State (“Erroneous state”). In the context of at-least once processing, you can interpret this error as a warning that the message will be processed again by another consumer in the group (the new owner) and ignore it.

Alternatively, you can specify a (possibly empty) partitions revoked handler. This will effectively avoid the race condition because rebalance handlers are called as a side effect of a call to Consume on the application thread - the rebalance is effectively blocked whilst you process the consumed message and store or commit offsets.

...

using (var consumer = new ConsumerBuilder<Ignore, string>(config)
    .SetPartitionsRevokedHandler((c, partitions) =>
    {
        ...
    })
    .Build())
{
    ...
}

API Documentation

Click here to view the .NET Client API documentation.