KRaft Configuration for Confluent Platform¶
This document covers hardware recommendations, configuration, debugging tools, and monitoring options for running Apache Kafka® in KRaft (pronounced craft) mode.
Note that as of Confluent Platform 7.5, ZooKeeper is deprecated for new deployments. Confluent recommends KRaft mode for new deployments.
Hardware and JVM requirements¶
A production KRaft server can cover a wide variety of use cases. In general, you should run KRaft on a server with similar specifications to a server running ZooKeeper. In summary, for production, this is:
- Minimum of 4 GB of RAM
- Dedicated CPU core should be considered when the server is shared
- An SSD disk at least 64 GB in size is highly recommended
- JVM heap size of at least 1 GB is recommended
Currently, it is recommended that you run at least three (3) KRaft controllers in production.
For more details, see Hardware.
Configuration options¶
Consider that a KRaft controller is also a Kafka broker processing event records that contain metadata related to the Kafka cluster. This means that in most cases, if you set properties on brokers, you should apply the same property settings to your KRaft controllers.
There are some settings that must be included for a cluster to run in KRaft mode, and are unique per server, but there are other settings that you configure for a controller because you also have also set those properties for brokers in the cluster.
For a full list of configuration properties, see Kafka Broker and Controller Configuration Reference for Confluent Platform.
Settings for KRaft mode are listed in the following sections with links to the configuration reference for those properties.
Required settings¶
These entries must be included for each server (controllers and brokers) running in KRaft mode.
- process.roles
When you operate Apache Kafka® in KRaft mode, you must set the
process.roles
property. This property specifies whether the server acts as a controller, broker, or both, although currently both is not supported for production workloads. In KRaft mode, specific Kafka servers are selected to be controllers, storing metadata for the cluster in the metadata log, and other servers are selected to be brokers. The servers selected to be controllers will participate in the metadata quorum. Each controller is either an active or a hot standby for the current active controller.In a production environment, the controller quorum will be deployed on multiple nodes. This is called an ensemble. An ensemble is a set of 2n + 1 controllers where n is any number greater than 0. The odd number of controllers allows the controller quorum to perform majority elections for leadership. At any given time, there can be up to n failed servers in an ensemble and cluster will keep quorum. For example, with three controllers, the cluster can tolerate one controller failure. If at any time, quorum is lost, the cluster will go down. For production, you should have typically have 3 or 5 controllers, but at least 3. For more information, see Hardware.
- Type: string
- Default:
- Importance: required for KRaft mode
process.roles
can have the following values:Value Result Not set The server is assumed to be in ZooKeeper mode broker
The server operates only as a broker. controller
The server operates in isolated mode as a controller only. broker,controller
The server operates in combined mode, where it is both a broker and a controller. Combined mode is considered an early access feature and Confluent does not currently support combined mode for production workloads. However, combined mode can be used for local testing. For an example of combined mode, see the confluent-local Docker image. - node.id
The unique identifier for this server. Each node ID must be unique across all the brokers and controllers in a particular cluster. No two servers can have the same node ID regardless of their
process.roles
value. This identifier replacesbroker.id
, which is used when operating in ZooKeeper mode.- Type: int
- Default:
- Importance: required for KRaft mode
- controller.quorum.voters
A comma-separated list of quorum voters. All of the servers (controllers and brokers) in a Kafka cluster discover the quorum voters using this property, and you must identify all of the controllers by including them in the list you provide for the property.
Each controller is identified with their ID, host and port information in the format of
{id}@{host}:{port}
. Multiple entries are separated by commas and might look like the following:controller.quorum.voters=1@host1:port1,2@host2:port2,3@host3:port3
The node ID supplied in the
controller.quorum.voters
property must match the corresponding ID on the controller servers. For example, on controller1,node.id
must be set to1
. If a server is a broker only, its node ID should not appear in thecontroller.quorum.voters
list.- Type: string
- Default:
- Importance: required for KRaft mode
- controller.listener.names
A comma-separated list of
listener_name
entries for listeners used by the controller. On a node withprocess.roles=broker
, only the first listener in the list will be used by the broker. ZooKeeper-based brokers should not set this value. For KRaft controllers in isolated or combined mode, the node will listen as a KRaft controller on all listeners that are listed for this property, and each must appear in thelisteners
property. They shouldn’t appear in theadvertised.listeners
property, which is used in ZooKeeper mode.- Type: string
- Default: null
- Importance: required for KRaft mode.
Inter-broker listeners¶
Listeners are an important part of your configuration. In addition to controller.listener.names
described in the
previous section, you should configure how KRaft controllers will communicate with brokers. This can be
done with the security.inter.broker.protocol
property or the inter.broker.listener.name
property, but not both.
If inter.broker.listener.name
is set then it will be used as a key for lookup
in the listener.security.protocol.map
property to yield a security protocol,
otherwise security.inter.broker.protocol
will be used. The default for security.inter.broker.protocol
is PLAINTEXT
,
which is what will be used for communication with brokers if neither property is explicitly set.
Note that controllers do not listen at the inter.broker.listener.name
value, but this property defines
a listener that the brokers create, and controllers must specify in their security protocol and
configuration so it can communicate with the brokers.
Following are descriptions of these properties:
- inter.broker.listener.name
The listener name that is used for inter-broker communication. If this is not set, inter-broker communication is defined by the
security.inter.broker.protocol
property. Set one of these, but not both, or an error will occur. This property must be set on KRaft brokers, but note that you must also set this property for KRaft controllers because controllers sometimes need to talk to Kafka brokers in Confluent Platform.The inter-broker listener name for a controller node must not appear in
controller.listener.names
property, and this applies regardless of whether the node is a controller in isolated or combined mode. Following is an example configuration file for a KRaft controller that shows how to configure this property:process.roles=controller node.id=100 controller.quorum.voters=100@node1:9093,101@node2:9093,102@node3:9093 controller.listener.names=CONTROLLER listeners=CONTROLLER://:9093 inter.broker.listener.name=BROKER listener.security.protocol.map=CONTROLLER:SSL,BROKER:SSL # Define the controller's listener and how we will use it. listener.name.controller.ssl.keystore.location=/some/keystore/path listener.name.controller.ssl.truststore.location=/some/truststore/path # etc... # Define how we will use the broker's listener. # No keystore needed since the controller isn't listening here; only need a truststore. listener.name.broker.ssl.truststore.location=/some/truststore/path # etc...
- listener.security.protocol.map
- The security protocol to use for inter-broker communication specified by the
inter.broker.listener.name
property. The security protocol to use for the declared listener names. Note that this includes controller-to-broker communication with the listener identified by theinter.broker.listener.name
property for the controller. - security.inter.broker.protocol
- Security protocol used to communicate between brokers. Set this property or
inter.broker.listener.name
, but not both.
Other listeners and logs¶
Following are additional properties you should be familiar with.
- listeners
A comma-separated list of addresses where the socket server listens.
For controllers in isolated mode: Only controller listeners are allowed in this list when
process.roles=controller
, and this listener should be consistent withcontroller.quorum.voters
value. If not configured, the host name will be equal to the value ofjava.net.InetAddress.getCanonicalHostName()
with thePLAINTEXT
listener name, and port9092
.For controllers in combined mode, you should list the controller listeners as well as the broker listeners. For brokers: see listeners.
- Type: string with the format
listener_name://host_name:port
- Default: If not configured, the host name will be equal to the value of
java.net.InetAddress.getCanonicalHostName()
, withPLAINTEXT
listener name, and port9092
. Example:listeners=PLAINTEXT://your.host.name:9092
- Importance: high
- Type: string with the format
- metadata.log.dir
Use to specify where the metadata log for clusters in KRaft mode is placed after storage is formatted as described in Generate and format IDs. If not set, the metadata log is placed in the first log directory specified in the
log-dirs
property described below.- Type: string
- Default: null
- Importance: high
- log.dirs
If
metadata.log.dir
is not specified, the KRaft metadata log is placed in the first log directory specified by this property after storage is formatted as described in Generate and format IDs.- Type: string
- Default: null
- Importance: high
Controller configuration example¶
You can find the example KRaft configuration files in /etc/kafka/kraft/
.
You will see three different example files in this folder after you install Confluent Platform:
broker.properties
- An example of the settings to use when the server is a broker only.controller.properties
- An example of the settings to use when the server is a controller only.server.properties
- An example of the settings to use when the server is both a broker and a controller. This configuration is not supported for production use.
Following is an example excerpt from a properties file for a controller on a system with three controllers.
############################# Server Basics #############################
# The role of this server. Setting this puts us in KRaft mode.
process.roles=controller
# The node id associated with this instance's roles.
node.id=1
# The connect string for the controller quorum.
controller.quorum.voters=1@controller1.example.com:9093,2@controller2.example.com:9093,3@controller3.example.com:9093
############################# Socket Server Settings #############################
# The address the socket server listens on.
# Note that only the controller listeners are allowed here when `process.roles=controller`, and this listener should be consistent with `controller.quorum.voters` value.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=CONTROLLER://controller1.example.com:9093
# A comma-separated list of the names of the listeners used by the controller.
# This is required if running in KRaft mode.
controller.listener.names=CONTROLLER
# How to communicate with brokers.
inter.broker.listener.name=BROKER
# Maps listener names to security protocols, the default is for them to be the same.
listener.security.protocol.map=CONTROLLER:SSL,BROKER:SSL
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kraft-controller-logs
# ... # Additional property settings to match broker settings.
Other properties¶
In most cases, if you have a property set on your brokers, the KRaft controller should have the same property setting. The following list provides an example of some of the settings you might have for a broker running in ZooKeeper mode, which also should be present for a KRaft controller. This is not an exhaustive list.
auto.create.topics.enable
compression.type
confluent.metrics.reporter.bootstrap.servers
confluent.license.topic.replication.factor
confluent.metadata.topic.replication.factor
default.replication.factor
delete.topic.enable
message.max.bytes
metrics.reporters
min.insync.replicas
num.partitions
offsets.retention.minutes
offsets.topic.replication.factor
transaction.state.log.replication.factor
transaction.state.log.min.isr
unclean.leader.election.enable
Settings for other Kafka and Confluent Platform components¶
When you use KRaft instead of ZooKeeper, you must use current, non-deprecated, configurations settings. The settings to use are described in the following table.
Feature | Allowed with ZooKeeper | Required with KRaft |
---|---|---|
Clients and services | zookeeper.connect=zookeeper:2181 |
bootstrap.servers=broker:9092 |
Schema Registry | kafkastore.connection.url=zookeeper:2181 |
kafkastore.bootstrap.servers=broker:9092 |
Administrative tools | kafka-topics --zookeeper zookeeper:2181 (deprecated) |
|
Retrieve Kafka cluster ID | zookeeper-shell zookeeper:2181 get/cluster/id |
From the command line, use kafka-metadata-quorum (See kafka-metadata-quorum)
or confluent cluster describe --url ,
or view metadata.properties .
or http://broker:8090 --output json |
Enable Confluent Metrics Reporter¶
You must enable the Metrics Reporter on each broker and controller in KRaft mode to see broker metrics in Confluent Control Center. Uncomment the following lines in the properties file for each broker and controller.
metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter
confluent.metrics.reporter.bootstrap.servers=localhost:9092
For more information, see Enabling Metrics Reporter.
Configure Confluent Control Center¶
The configuration settings for Confluent Control Center running in KRaft mode or ZooKeeper mode are mostly the same. However, for Confluent Control Center to function in KRaft mode, you must enable Confluent Metrics Reporter, which is disabled by default, for brokers and KRaft controllers. For details on how to enable it for standalone installations, see Enable Confluent Metrics Reporter.
For general Confluent Control Center configuration, see General settings.
Generate and format IDs¶
Before you start Kafka, you must use the kafka-storage tool with the random-uuid
command
to generate a cluster ID for each new cluster. You only need one cluster ID, which you will use to format
each node in the cluster.
bin/kafka-storage random-uuid
This results in output like the following:
q1Sh-9_ISia_zwGINzRvyQ
Then use the cluster ID to format storage for each node in the cluster with the kafka-storage
tool that is provided with Confluent Platform,
and the format
command like the following example, specifying the properties file for a controller.
bin/kafka-storage format -t q1Sh-9_ISia_zwGINzRvyQ -c etc/kafka/kraft/controller.properties
Previously, Kafka would format blank storage directories automatically and generate a new cluster ID automatically.
One reason for the change is that auto-formatting can sometimes obscure an
error condition. This is particularly important for the metadata log maintained by the controller and broker servers.
If a majority of the controllers were able to start with an empty log directory, a leader might be able to be elected with
missing committed data. To configure the log directory, either set metadata.log.dir
or log.dirs
. For more
information, see Inter-broker listeners.
Configure SCRAM¶
To configure SCRAM for brokers in a Kafka cluster running in KRaft mode, you must create the credentials
before your brokers are up and running. You then use the --add-scram
option with the kafka-storage
tool.
For more information, see SASL for KRaft-based clusters.
Tools for debugging KRaft mode¶
Kafka provides tools to help you debug a cluster running in KRaft-mode.
Describe runtime status¶
You can describe the runtime state of the cluster metadata partition using the kafka-metadata-quorum tool
and specify either a Kafka broker with the --bootstrap-server
option or a
KRaft controller with the --bootstrap-controller
option.
For example, the following command specifies a broker and displays a summary of the metadata quorum:
bin/kafka-metadata-quorum --bootstrap-server host1:9092 describe --status
Output might look like the following:
ClusterId: fMCL8kv1SWm87L_Md-I2hg
LeaderId: 3002
LeaderEpoch: 2
HighWatermark: 10
MaxFollowerLag: 0
MaxFollowerLagTimeMs: -1
CurrentVoters: [3000,3001,3002]
CurrentObservers: [0,1,2]
You can specify a controller with the --bootstrap-controller
option. This is useful when
the brokers are not accessible.
bin/kafka-metadata-quorum --bootstrap-controller host1:9093 describe --status
Debug log segments¶
The kafka-dump-log tool tool can be used to debug the log segments and snapshots for the cluster metadata directory. The tool will scan the provided files and decode the metadata records. For example, the following command decodes and prints the records in the first log segment:
bin/kafka-dump-log --cluster-metadata-decoder --files tmp/kraft-controller-logs/_cluster_metadata-0/00000000000000023946.log
Inspect the metadata partition¶
The kafka-metadata-shell tool tool can be used to interactively inspect the metadata cluster. The following example shows how to open the shell.
kafka-metadata-shell --directory tmp/kraft-controller-logs/_cluster-metadata-0/
The shell will load, and after you are in the shell, you can explore the contents of the metadata log and then exit.
Loading...
[ Kafka Metadata Shell ]
>> ls
brokers configs features linkIds links shell topicIds topics
>> ls /topics
test
>> cat /topics/test/0/data
{
"partitionId" : 0,
"topicId" : "5zoAlv-xEh9xRANKXt1Lbg",
"replicas" : [ 1 ],
"isr" : [ 1 ],
"removingReplicas" : null,
"addingReplicas" : null,
"leader" : 1,
"leaderEpoch" : 0,
"partitionEpoch" : 0
}
>> exit
Monitor KRaft¶
Following are some JMX metrics to monitor on the controller and broker when operating in KRaft mode. Some of the metrics depend on the setting for process.roles.
For more broker metrics, see Broker metrics. For more information, see KRaft monitoring.
KRaft quorum monitoring metrics¶
The following table lists KRaft controller quorum metrics.
Important
There is currently an issue with Confluent Control Center incorrectly reporting that a KRaft cluster is connected with ZooKeeper if multiple controllers fail or stop in a multi-controller cluster, but one KRaft controller is still running.
kafka.server:type=raft-metrics MBean name |
Description |
---|---|
append-records-rate |
The average number of records appended per second by the leader of the raft quorum. |
commit-latency-avg |
The average time in milliseconds to commit an entry in the raft log. |
commit-latency-max |
The maximum time in milliseconds to commit an entry in the raft log. |
current-epoch |
The current quorum epoch. |
current-leader |
The current quorum leader’s id; -1 indicates unknown. |
current-state |
The current state of this member; possible values are leader, candidate, voted, follower, unattached, observer. |
current-vote |
The current voted leader’s id; -1 indicates not voted for anyone. |
election-latency-avg |
The average time in milliseconds spent on electing a new leader. |
election-latency-max |
The maximum time in milliseconds spent on electing a new leader. |
fetch-records-rate |
The average number of records fetched from the leader of the raft quorum. |
high-watermark |
The high watermark maintained on this member; -1 if it is unknown. |
log-end-offset |
The current raft log end offset. |
number-unknown-voter-connections |
Number of unknown voters whose connection information is not cached. This value of this metric is always 0. |
poll-idle-ratio-avg |
The average fraction of time the client’s poll() is idle as opposed to waiting for the user code to process records. |
Other quorum metrics:
MBean | Description |
---|---|
kafka.server:type=MetadataLoader,name=CurrentMetadataVersion |
Outputs the feature level of the current metadata version. |
kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount |
The total number of times that a KRaft snapshot has been loaded since the process was started. |
kafka.server:type=SnapshotEmitter,name=LatestSnapshotGeneratedBytes |
The total size in bytes of the latest snapshot that the node has generated. If a snapshot has not been generated yet, this is the size of the latest snapshot that was loaded. If no snapshots have been generated or loaded, this is 0. |
kafka.server:type=SnapshotEmitter,name=LatestSnapshotGeneratedAgeMs |
The interval in milliseconds since the latest snapshot was generated. If no snapshot has been generated yet, this is the approximate time delta since the process was started. |
Controller metrics¶
With KRaft, Kafka added a new controller quorum to the cluster instead of the cluster being controlled by ZooKeeper. These controllers must be able to commit records for Kafka to be available so you need to monitor their health.
For the full list of KRaft metrics, see KRaft broker metrics and KRaft Quorum metrics.
kafka.controller:type=KafkaController MBean name |
Description |
---|---|
ActiveBrokerCount |
When using KRaft, the number of registered and unfenced brokers as observed by this controller. When using ZooKeeper, this value is the number of brokers known to the controller. |
ActiveControllerCount |
The number of active controllers on this node. Valid values are ‘0’ or ‘1’. Alert if the aggregated sum across all brokers in the cluster is anything other than 1 because there should be exactly one controller per cluster. |
FencedBrokerCount |
When using KRaft, the number of registered but fenced brokers as observed by this controller. When using ZooKeeper, this value is always 0. |
GlobalPartitionCount |
The number of all partitions in the cluster as observed by this controller. |
GlobalTopicCount |
The number of all topics in the cluster as observed by this controller. |
LastAppliedRecordLagMs |
Reports the difference between the local time and the append time of the last applied record batch. For active controllers the value of this lag is always zero. |
LastAppliedRecordOffset |
The offset of the last record that was applied by the controller to the cluster metadata partition. For the active controller this may include uncommitted records. For the inactive controller this always includes committed records only. |
LastAppliedRecordTimestamp |
The timestamp of the last record that was applied by the controller to the cluster metadata partition. |
LastCommittedRecordOffset |
The active controller reports the offset of the last committed offset it consumed.
Inactive controllers will always report the same value as LastAppliedRecordOffset . You can monitor the last committed offsets to see that they are advancing.
You can also use these metrics to check that all of the brokers and controllers are at a similar offset. |
LastAppliedRecordTimestamp |
The timestamp of the last record that was applied by the controller to the cluster metadata partition. |
MetadataErrorCount |
The number of times this controller node has encountered an error during metadata log processing. |
NewActiveControllerCount |
Counts the number of times this node has seen a new controller elected. A transition to the “no leader” state is not counted here. If the same controller as before becomes active, that still counts. |
EventQueueOperationsStartedCount |
The total number of controller event queue operations that were started. This count includes deferred operations. |
EventQueueOperationsTimedOutCount |
The total number of controller event queue operations that timed out before they could be performed. |
OfflinePartitionsCount |
The number of offline topic partitions (non-internal) as observed by this controller. |
PreferredReplicaImbalanceCount |
The count of topic partitions for which the leader is not the preferred leader. |
TimedOutBrokerHeartbeatCount |
The number of broker heartbeats that timed out on this controller since the process was started. Note that only active controllers handle heartbeats, so only they will see increases in this metric. |
ZkWriteDeltaTimeMs |
The number of milliseconds the KRaft controller took writing a delta into ZooKeeper. |
ZkWriteSnapshotTimeMs |
The number of milliseconds the KRaft controller took reconciling a snapshot into ZooKeeper. |
ZkWriteBehindLag |
The amount of lag in records that ZooKeeper is behind relative to the highest committed record in the metadata log. This metric will only be reported by the active KRaft controller. |
ControllerEventManager metrics:
kafka.controller:type=ControllerEventManager MBean name |
Description |
---|---|
EventQueueProcessingTimeMs |
A histogram of the time in milliseconds that requests spent being processed in the controller event queue. |
EventQueueTimeMs |
A histogram of the time in milliseconds that requests spent waiting in the controller event queue. |
KRaft Broker metrics¶
kafka.server:type=broker-metadata-metrics MBean name |
Description |
---|---|
last-applied-record-offset |
The offset of the last record from the cluster metadata partition that was applied by the broker. |
last-applied-record-timestamp |
The timestamp of the last record from the cluster metadata partition that was applied by the broker. |
last-applied-record-lag-ms |
The difference between now and the timestamp of the last record from the cluster metadata partition that was applied by the broker. |
metadata-load-error-count |
The number of errors encountered by the BrokerMetadataListener while loading the
metadata log and generating a new metadata delta based on it. |
metadata-apply-error-count |
The number of errors encountered by the BrokerMetadataPublisher
while applying a new metadata imaged based on the latest metadata delta. |