Configure Metadata Service (MDS) in Confluent Platform

The Confluent Platform Metadata Service (MDS) manages a variety of metadata about your Confluent Platform installation. Specifically, the MDS:

  • Hosts the cluster registry that enables you to keep track of which clusters you have installed.
  • Serves as the system of record for cross-cluster authorization data (including RBAC and centralized ACLs), and can be used for token-based authentication.
  • Provides a convenient way to manage audit log configurations across multiple clusters.
  • Can be used to authenticate data (note that client authentication is not supported).

You can set up the MDS internally within a Kafka cluster that serves other functions, and manage permissions in the same way that a database stores permissions for users logging into the database itself. You can also use the MDS to store user data. For the Kafka cluster hosting MDS, you must configure MDS on each Kafka broker, and you should synchronize these configurations across nodes.

You can also set up MDS on a dedicated Kafka cluster, servicing multiple worker Kafka clusters in such a way that security information is isolated away from client data. In the case of role-based access control (RBAC), the MDS offers a single, centralized configuration context that, after it is set up for a cluster, saves administrators from the complex and time-consuming task of defining and assigning roles for each resource on an individual basis. The MDS can enforce the rules for RBAC, centralized audit logs, centralized ACLs, and the cluster registry on its host Kafka cluster and across multiple secondary clusters (such as Kafka, Connect, and Schema Registry). So you can use a single Kafka cluster hosting MDS to manage and secure multiple secondary Kafka, Connect, Schema Registry, and ksqlDB clusters.

The MDS listens for commands using HTTP on the default port 8090. MDS maintains a local cache of authorization data that is persisted to an internal Kafka topic named _confluent-metadata-auth.

When MDS runs on a Kafka broker, you can optionally integrate it with LDAP to provide authentication and refreshable bearer tokens for impersonation. Note that impersonation is restricted to Confluent components. For details about configuring LDAP integration with RBAC, see Configure LDAP Authentication.

Prerequisites

  • You must download self-managed Confluent Platform for your environment.

  • If your environment already includes, or will include, Active Directory (LDAP service), it must be configured as well. The configurations on this page are based on Microsoft Active Directory (AD). You must update these configurations to match your LDAP service. Nested LDAP groups are not supported.

  • Brokers running MDS must be configured with a separate listener for inter-broker communication. To access this listener, a user must be configured with ZooKeeper-based ACLs (not centralized ACLs) for authorization. If required, you can configure these users as super.users, but they cannot rely on access to resources using role-based or group-based access. The broker user must be configured as a super user or granted access using ACLs.

    Tip

    To avoid broker listener or inter-broker communication issues after you have migrated from ZooKeeper-based ACLs to centralized ACLs, you may want to include broker users in the list of super.users to ensure access.

  • Brokers will accept requests on the inter-broker listener port before the metadata for RBAC authorization has been initialized. However, requests on other ports are only accepted after the required metadata has been initialized, including any available LDAP metadata. Broker initialization only completes after all relevant metadata has been obtained and cached. When starting multiple brokers in an MDS cluster with a replication factor of 3 (default) for a metadata topic, at least three brokers must also be started simultaneously to enable initialization to complete on the brokers. Note that there is a timeout/retry limitation for this initialization, which you can specify in confluent.authorizer.init.timeout.ms. For details, refer to Configure Confluent Server Authorizer in Confluent Platform.

  • REST Proxy services that integrate with AD/LDAP using MDS will use the user login name as the user principal for authorization decisions. By default, this is also the principal used by brokers for users authenticating using SASL/GSSAPI (Kerberos). If your broker configuration overrides principal.builder.class or sasl.kerberos.principal.to.local.rules to create a different principal, the user principal used by brokers may be different from the principal used by other Confluent Platform components. In this case, you should configure ACLs and role bindings for your customized principal for broker resources.

Important

The user ID specified in group role bindings is case-sensitive, and must match the case specified in the AD record. Also note that when logging in as a super user, the login ID is also case-sensitive and must match the case specified for the user ID in role bindings.

Create a PEM key pair

You must create a PEM (Privacy-Enhanced Mail) key pair for use by the token service. This key pair is added to your server.properties file in the next section.

Important

MDS only loads the PKCS#1 PEM key format, which can be recognized by the presence of the RSA keyword in the header and footer of the key. The openssl genrsa command writes PKCS#1 by default in OpenSSL 1.1.1 and earlier. OpenSSL 3.0 and later write PKCS#8 by default, so pass the -traditional option to genrsa to get PKCS#1 output.

  1. Create a PEM key.

    Note

    The PEM key length depends on the encryption method you are using (AES-256 or RSA-4096), and the bits should be based on the needs/requirements of the method being used.

    This example creates a 2048-bit RSA private key and stores it in the file <path-to-tokenKeypair.pem> (replace the path in <> to reflect your setup). The mkdir command creates the parent directory if it does not already exist.

    mkdir -p "$(dirname <path-to-tokenKeypair.pem>)" && openssl genrsa -out <path-to-tokenKeypair.pem> 2048
    
  2. Extract the public key.

    openssl rsa -in <path-to-tokenKeypair.pem> -outform PEM -pubout -out <path-to-public.pem>
    

Note

Only use OpenSSL to create the PEM key files. Do not use Keytool; it is not a valid option and will result in an error during startup.
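
Because the key format is visible in the PEM header, it can be checked before MDS starts. The following is a minimal sketch, not part of Confluent Platform: the function name pem_key_format and the label table are illustrative assumptions, and the check only inspects the BEGIN line, so it does not validate the key itself.

```python
import re

# Map PEM header labels to the private-key encodings they indicate.
PEM_LABELS = {
    "RSA PRIVATE KEY": "PKCS#1",
    "PRIVATE KEY": "PKCS#8 (unencrypted)",
    "ENCRYPTED PRIVATE KEY": "PKCS#8 (encrypted)",
}


def pem_key_format(pem_text):
    """Classify a PEM private key by the label in its BEGIN line."""
    match = re.search(r"-----BEGIN ([A-Z0-9 ]+)-----", pem_text)
    if match is None:
        return "not PEM"
    label = match.group(1)
    return PEM_LABELS.get(label, "other (" + label + ")")


# Example: a key written by "openssl genrsa" in PKCS#1 format.
sample = "-----BEGIN RSA PRIVATE KEY-----\n...\n-----END RSA PRIVATE KEY-----\n"
print(pem_key_format(sample))
```

To check a real file, read its contents and pass them in, for example pem_key_format(open(path).read()), where path is the location of your key pair file.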

Create a PKCS#8 key

Note that MDS supports an unencrypted PKCS#8 key, but this section discusses using encrypted PKCS#8 keys.

A private key can be encrypted using any of the supported algorithms. When the token endpoint is configured for MDS, it loads the keypair at startup time. Loading an encrypted keypair involves reading the encrypted keypair file, reading the passphrase, and decrypting the keypair with the provided passphrase.

When MDS tries to decrypt the key with the given passphrase, the configured security provider is used to run the decryption logic. A few encryption algorithms are supported by the default security provider present with Java. For keypairs encrypted with such algorithms, no further configuration is needed.

For encryption algorithms that require a specific security provider, the security provider must be configured for the Kafka process inside which MDS is running. For example, if the keypair is encrypted with AES-256, a compatible security provider, such as the Bouncy Castle FIPS security provider, must be configured. For the Bouncy Castle FIPS provider, you must specify the following configuration option in the Kafka server.properties file:

security.providers=io.confluent.kafka.security.fips.provider.BcFipsProviderCreator

The Bouncy Castle FIPS provider supports two types of keystore formats: PKCS12 and BCFKS.

Note that when the Bouncy Castle FIPS provider is configured on an SSL-enabled Kafka cluster, additional steps are required to configure the provider.

Default provider

To generate a PKCS#8-format encrypted keypair that works with the default security provider, run the following openssl commands. The first command generates a private key in PKCS#8 format. The second command prompts for a password.

$ openssl genpkey -algorithm RSA -out private_key.pem
$ openssl pkcs8 -topk8 -in private_key.pem \
    -out pkcs8KeypairEncrypted_PBE-SHA1-3DES.pem -v1 PBE-SHA1-3DES

Bouncy Castle FIPS provider

To generate a PKCS#8-format encrypted keypair that works with the Bouncy Castle FIPS provider, run the following openssl commands. The first command generates a private key in PKCS#8 format. The second command prompts for a password.

$ openssl genpkey -algorithm RSA -out private_key.pem
$ openssl pkcs8 -topk8 -in private_key.pem \
    -out pvtkey-pkcs8-aes256.pem -v2 aes256

The following is a sample configuration for the Bouncy Castle FIPS provider:

advertised.listeners=INTERNAL://REDACTED.us-west-2.compute.internal:9092,BROKER://REDACTED.us-west-2.compute.internal:9091,CUSTOM://REDACTED.us-west-2.compute.amazonaws.com:9093,TOKEN://REDACTED.us-west-2.compute.amazonaws.com:9094
authorizer.class.name=io.confluent.kafka.security.authorizer.ConfluentServerAuthorizer
broker.id=1
confluent.ansible.managed=true
confluent.authorizer.access.rule.providers=CONFLUENT,ZK_ACL
confluent.balancer.topic.replication.factor=3
confluent.basic.auth.credentials.source=USER_INFO
confluent.basic.auth.user.info=schema-registry:password
confluent.license.topic=_confluent-command
confluent.license.topic.replication.factor=3
confluent.metadata.server.advertised.listeners=https://REDACTED.us-west-2.compute.internal:8090
confluent.metadata.server.authentication.method=BEARER
confluent.metadata.server.listeners=https://0.0.0.0:8090
confluent.metadata.server.ssl.key.password=REDACTED
confluent.metadata.server.ssl.keystore.location=/var/ssl/private/kafka_broker.keystore_BCFKS.bcfks
confluent.metadata.server.ssl.keystore.password=REDACTED
confluent.metadata.server.ssl.truststore.location=/var/ssl/private/kafka_broker.truststore_BCFKS.bcfks
confluent.metadata.server.ssl.truststore.password=REDACTED
confluent.metadata.server.ssl.truststore.type=BCFKS
confluent.metadata.server.ssl.keystore.type=BCFKS
confluent.metadata.server.token.key.path=/var/ssl/private/encrypted_aes256_tokenKeypair.pem
confluent.metadata.server.token.key.passphrase=REDACTED
security.providers=io.confluent.kafka.security.fips.provider.BcFipsProviderCreator
#confluent.metadata.server.security.providers=io.confluent.kafka.security.fips.provider.BcFipsProviderCreator
confluent.metadata.server.token.max.lifetime.ms=3600000
confluent.metadata.server.token.signature.algorithm=RS256
confluent.metadata.topic.replication.factor=3
confluent.metrics.reporter.bootstrap.servers=REDACTED.us-west-2.compute.internal:9091,REDACTED.us-west-2.compute.internal:9091,REDACTED.us-west-2.compute.internal:9091
confluent.metrics.reporter.security.protocol=SSL
confluent.metrics.reporter.ssl.key.password=REDACTED
confluent.metrics.reporter.ssl.keystore.location=/var/ssl/private/kafka_broker.keystore_BCFKS.bcfks
confluent.metrics.reporter.ssl.keystore.password=REDACTED
confluent.metrics.reporter.ssl.truststore.location=/var/ssl/private/kafka_broker.truststore_BCFKS.bcfks
confluent.metrics.reporter.ssl.truststore.password=REDACTED
confluent.metrics.reporter.ssl.keystore.type=BCFKS
confluent.metrics.reporter.ssl.truststore.type=BCFKS
confluent.metrics.reporter.topic.replicas=3
confluent.schema.registry.url=https://REDACTED.us-west-2.compute.internal:8081
confluent.security.event.logger.exporter.kafka.topic.replicas=3
confluent.ssl.key.password=REDACTED
confluent.ssl.keystore.location=/var/ssl/private/kafka_broker.keystore_BCFKS.bcfks
confluent.ssl.keystore.password=REDACTED
confluent.ssl.truststore.location=/var/ssl/private/kafka_broker.truststore_BCFKS.bcfks
confluent.ssl.truststore.password=REDACTED
confluent.support.customer.id=anonymous
confluent.support.metrics.enable=true
group.initial.rebalance.delay.ms=3000
inter.broker.listener.name=BROKER
kafka.rest.bootstrap.servers=REDACTED.us-west-2.compute.internal:9092,REDACTED.us-west-2.compute.internal:9092,REDACTED.us-west-2.compute.internal:9092
kafka.rest.client.security.protocol=SASL_SSL
kafka.rest.client.ssl.key.password=REDACTED
kafka.rest.client.ssl.keystore.location=/var/ssl/private/kafka_broker.keystore_BCFKS.bcfks
kafka.rest.client.ssl.keystore.password=REDACTED
kafka.rest.client.ssl.truststore.location=/var/ssl/private/kafka_broker.truststore_BCFKS.bcfks
kafka.rest.client.ssl.truststore.password=REDACTED
kafka.rest.client.ssl.keystore.type=BCFKS
kafka.rest.client.ssl.truststore.type=BCFKS
kafka.rest.confluent.metadata.basic.auth.user.info=pkcs8-7-5-x-74-test-cluster-main.kafka_erp:Confluent1!
kafka.rest.confluent.metadata.bootstrap.server.urls=https://REDACTED.us-west-2.compute.internal:8090,https://REDACTED.us-west-2.compute.internal:8090,https://REDACTED.us-west-2.compute.internal:8090
kafka.rest.confluent.metadata.http.auth.credentials.provider=BASIC
kafka.rest.confluent.metadata.ssl.truststore.location=/var/ssl/private/kafka_broker.truststore_BCFKS.bcfks
kafka.rest.confluent.metadata.ssl.truststore.password=REDACTED
kafka.rest.enable=true
kafka.rest.kafka.rest.resource.extension.class=io.confluent.kafkarest.security.KafkaRestSecurityResourceExtension
kafka.rest.public.key.path=/var/ssl/private/public.pem
kafka.rest.rest.servlet.initializor.classes=io.confluent.common.security.jetty.initializer.InstallBearerOrBasicSecurityHandler
ldap.com.sun.jndi.ldap.read.timeout=3000
ldap.group.member.attribute.pattern=uid=(.*),OU=rbac,DC=confluent,DC=io
ldap.group.name.attribute=cn
ldap.group.search.base=OU=rbac,DC=confluent,DC=io
ldap.java.naming.factory.initial=com.sun.jndi.ldap.LdapCtxFactory
ldap.java.naming.provider.url=ldap://ip-10-0-242-18.us-west-2.compute.internal:389
ldap.java.naming.security.authentication=simple
ldap.java.naming.security.credentials=Confluent1!
ldap.java.naming.security.principal=uid=mds,OU=rbac,DC=confluent,DC=io
ldap.user.memberof.attribute.pattern=cn=(.*),OU=rbac,DC=confluent,DC=io
ldap.user.name.attribute=uid
ldap.user.object.class=account
ldap.user.search.base=OU=rbac,DC=confluent,DC=io
listener.name.broker.ssl.client.auth=required
listener.name.broker.ssl.key.password=REDACTED
listener.name.broker.ssl.keystore.location=/var/ssl/private/kafka_broker.keystore_BCFKS.bcfks
listener.name.broker.ssl.keystore.password=REDACTED
listener.name.broker.ssl.truststore.location=/var/ssl/private/kafka_broker.truststore_BCFKS.bcfks
listener.name.broker.ssl.truststore.password=REDACTED
listener.name.custom.ssl.client.auth=required
listener.name.custom.ssl.key.password=REDACTED
listener.name.custom.ssl.keystore.location=/var/ssl/private/kafka_broker.keystore_BCFKS.bcfks
listener.name.custom.ssl.keystore.password=REDACTED
listener.name.custom.ssl.truststore.location=/var/ssl/private/kafka_broker.truststore_BCFKS.bcfks
listener.name.custom.ssl.truststore.password=REDACTED
listener.name.internal.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required publicKeyPath="/var/ssl/private/public.pem";
listener.name.internal.oauthbearer.sasl.login.callback.handler.class=io.confluent.kafka.server.plugins.auth.token.TokenBearerServerLoginCallbackHandler
listener.name.internal.oauthbearer.sasl.server.callback.handler.class=io.confluent.kafka.server.plugins.auth.token.TokenBearerValidatorCallbackHandler
listener.name.internal.principal.builder.class=io.confluent.kafka.security.authenticator.OAuthKafkaPrincipalBuilder
listener.name.internal.sasl.enabled.mechanisms=OAUTHBEARER
listener.name.internal.ssl.client.auth=required
listener.name.internal.ssl.key.password=REDACTED
listener.name.internal.ssl.keystore.location=/var/ssl/private/kafka_broker.keystore.pk12
listener.name.internal.ssl.keystore.password=REDACTED
listener.name.internal.ssl.truststore.location=/var/ssl/private/kafka_broker.truststore.pk12
listener.name.internal.ssl.truststore.password=REDACTED
listener.name.token.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required publicKeyPath="/var/ssl/private/public.pem";
listener.name.token.oauthbearer.sasl.login.callback.handler.class=io.confluent.kafka.server.plugins.auth.token.TokenBearerServerLoginCallbackHandler
listener.name.token.oauthbearer.sasl.server.callback.handler.class=io.confluent.kafka.server.plugins.auth.token.TokenBearerValidatorCallbackHandler
listener.name.token.principal.builder.class=io.confluent.kafka.security.authenticator.OAuthKafkaPrincipalBuilder
listener.name.token.sasl.enabled.mechanisms=OAUTHBEARER
listener.name.token.ssl.client.auth=required
listener.name.token.ssl.key.password=REDACTED
listener.name.token.ssl.keystore.location=/var/ssl/private/kafka_broker.keystore_BCFKS.bcfks
listener.name.token.ssl.keystore.password=REDACTED
listener.name.token.ssl.truststore.location=/var/ssl/private/kafka_broker.truststore_BCFKS.bcfks
listener.name.token.ssl.truststore.password=REDACTED
listener.security.protocol.map=INTERNAL:SASL_SSL,BROKER:SSL,CUSTOM:SSL,TOKEN:SASL_SSL
listeners=INTERNAL://:9092,BROKER://:9091,CUSTOM://:9093,TOKEN://:9094
log.dirs=/var/lib/kafka/data
log.retention.check.interval.ms=300000
log.retention.hours=168
log.segment.bytes=1073741824
metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter
num.io.threads=16
num.network.threads=8
num.partitions=1
num.recovery.threads.per.data.dir=2
offsets.topic.replication.factor=3
sasl.enabled.mechanisms=OAUTHBEARER
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
ssl.key.password=REDACTED
ssl.keystore.location=/var/ssl/private/kafka_broker.keystore_BCFKS.bcfks
ssl.keystore.password=REDACTED
ssl.truststore.location=/var/ssl/private/kafka_broker.truststore_BCFKS.bcfks
ssl.truststore.password=REDACTED
super.users=User:mds;User:C=US,ST=Ca,L=PaloAlto,O=CONFLUENT,OU=TEST,CN=kafka_broker
transaction.state.log.min.isr=2
transaction.state.log.replication.factor=3
zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
zookeeper.connect=REDACTED.us-west-2.compute.internal:2182,REDACTED.us-west-2.compute.internal:2182,REDACTED.us-west-2.compute.internal:2182
zookeeper.connection.timeout.ms=18000
zookeeper.ssl.client.enable=true
zookeeper.ssl.keystore.location=/var/ssl/private/kafka_broker.keystore.pk12
zookeeper.ssl.keystore.password=REDACTED
zookeeper.ssl.truststore.location=/var/ssl/private/kafka_broker.truststore.pk12
zookeeper.ssl.truststore.password=REDACTED
ssl.keystore.type=BCFKS
ssl.truststore.type=BCFKS
listener.name.internal.ssl.keystore.type=PKCS12
listener.name.internal.ssl.truststore.type=PKCS12
listener.name.broker.ssl.keystore.type=BCFKS
listener.name.broker.ssl.truststore.type=BCFKS
listener.name.custom.ssl.keystore.type=BCFKS
listener.name.custom.ssl.truststore.type=BCFKS
listener.name.token.ssl.keystore.type=BCFKS
listener.name.token.ssl.truststore.type=BCFKS
zookeeper.ssl.keystore.type=PKCS12
zookeeper.ssl.truststore.type=PKCS12
confluent.ssl.keystore.type=BCFKS
confluent.ssl.truststore.type=BCFKS
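
Since the Bouncy Castle FIPS provider supports only the PKCS12 and BCFKS keystore formats, a configuration like the sample above can be scanned for store types outside that set. The following is a rough sketch under stated assumptions: parse_properties is a simplified, hypothetical parser (no line continuations or escapes), and unsupported_store_types is an illustrative helper, not a Confluent tool.

```python
FIPS_STORE_TYPES = {"PKCS12", "BCFKS"}


def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def unsupported_store_types(props):
    """Return (key, value) pairs whose store type is not PKCS12 or BCFKS."""
    suffixes = (".keystore.type", ".truststore.type")
    return [
        (key, value)
        for key, value in sorted(props.items())
        if key.endswith(suffixes) and value.upper() not in FIPS_STORE_TYPES
    ]


example = """
ssl.keystore.type=BCFKS
zookeeper.ssl.truststore.type=PKCS12
listener.name.custom.ssl.keystore.type=JKS
"""
print(unsupported_store_types(parse_properties(example)))
```

The check only looks at properties that are explicitly set, so a store location with no matching type property is not reported.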

Troubleshoot

If you encounter configuration issues, review the following common errors and solutions.

Pad block corrupted / error finalizing cipher

Most likely, the specified password is incorrect. Verify that the password is correct and try again. Here’s an example stacktrace:

[2023-05-22 20:22:49,321] ERROR Unable to load token keyPair from config token.key.path with path:/Users/ragrawal/testkeys/tokenKeyPairPkcs8EncV2Des3BlahPass.pem (io.confluent.tokenapi.jwt.JwsProvider)
java.io.IOException: Unable to decrypt private key:
    at io.confluent.common.security.util.PemUtils.loadKeyPair(PemUtils.java:98)
    at io.confluent.tokenapi.jwt.JwsProvider.tryLoadKeyPair(JwsProvider.java:206)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
    at java.base/java.util.Collections$2.tryAdvance(Collections.java:4853)
    at java.base/java.util.Collections$2.forEachRemaining(Collections.java:4861)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
    at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
    at io.confluent.tokenapi.jwt.JwsProvider.loadKeys(JwsProvider.java:141)
    at io.confluent.tokenapi.jwt.JwsProvider.configure(JwsProvider.java:82)
    at io.confluent.tokenapi.jwt.JwtProvider.configure(JwtProvider.java:50)
    at io.confluent.rbacapi.app.RbacApiApplication.setupResources(RbacApiApplication.java:354)
    at io.confluent.rest.Application.configureHandler(Application.java:301)
    at io.confluent.rest.ApplicationServer.doStart(ApplicationServer.java:201)
    at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:73)
    at io.confluent.http.server.KafkaHttpServerImpl.doStart(KafkaHttpServerImpl.java:111)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.bouncycastle.pkcs.PKCSException: unable to read encrypted data: Error finalising cipher
    at org.bouncycastle.pkcs.PKCS8EncryptedPrivateKeyInfo.decryptPrivateKeyInfo(Unknown Source)
    at io.confluent.common.security.util.PemUtils.loadKeyPair(PemUtils.java:95)
    ... 18 more
Caused by: org.bouncycastle.crypto.InvalidCipherTextException: Error finalising cipher
    at org.bouncycastle.jcajce.io.CipherInputStream.finaliseCipher(Unknown Source)
    at org.bouncycastle.jcajce.io.CipherInputStream.nextChunk(Unknown Source)
    at org.bouncycastle.jcajce.io.CipherInputStream.read(Unknown Source)
    at org.bouncycastle.util.io.Streams.pipeAll(Unknown Source)
    at org.bouncycastle.util.io.Streams.readAll(Unknown Source)
    ... 20 more
Caused by: javax.crypto.BadPaddingException: Error finalising cipher data: pad block corrupted
    at org.bouncycastle.jcajce.provider.BaseCipher.engineDoFinal(Unknown Source)
    at java.base/javax.crypto.Cipher.doFinal(Cipher.java:2089)
    ... 25 more

Unsupported PEM object type: class org.bouncycastle.pkcs.PKCS8EncryptedPrivateKeyInfo

This happens when an encrypted key is provided, but no passphrase is provided or an empty string is provided as passphrase.

Here’s an example stacktrace:

[2023-05-22 20:19:28,316] ERROR Unable to load token keyPair from config token.key.path with path:/Users/ragrawal/testkeys/tokenKeyPairPkcs8EncV2Des3BlahPass.pem (io.confluent.tokenapi.jwt.JwsProvider)
java.io.IOException: Unsupported PEM object type: class org.bouncycastle.pkcs.PKCS8EncryptedPrivateKeyInfo
    at io.confluent.common.security.util.PemUtils.loadKeyPair(PemUtils.java:70)
    at io.confluent.tokenapi.jwt.JwsProvider.tryLoadKeyPair(JwsProvider.java:209)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
    at java.base/java.util.Collections$2.tryAdvance(Collections.java:4853)
    at java.base/java.util.Collections$2.forEachRemaining(Collections.java:4861)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
    at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
    at io.confluent.tokenapi.jwt.JwsProvider.loadKeys(JwsProvider.java:141)
    at io.confluent.tokenapi.jwt.JwsProvider.configure(JwsProvider.java:82)
    at io.confluent.tokenapi.jwt.JwtProvider.configure(JwtProvider.java:50)
    at io.confluent.rbacapi.app.RbacApiApplication.setupResources(RbacApiApplication.java:354)
    at io.confluent.rest.Application.configureHandler(Application.java:301)
    at io.confluent.rest.ApplicationServer.doStart(ApplicationServer.java:201)
    at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:73)
    at io.confluent.http.server.KafkaHttpServerImpl.doStart(KafkaHttpServerImpl.java:111)
    at java.base/java.lang.Thread.run(Thread.java:833)
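
The second error (an encrypted key with a missing or empty passphrase) can be caught before startup by comparing the key file header with the configured passphrase. The sketch below is illustrative only: the property name confluent.metadata.server.token.key.passphrase is taken from the sample configuration earlier on this page, while parse_properties and check_token_key are hypothetical helpers with a simplified parser. It cannot detect a wrong passphrase, which only surfaces as the pad block corrupted error at decryption time.

```python
PASSPHRASE_KEY = "confluent.metadata.server.token.key.passphrase"


def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def check_token_key(props, pem_text):
    """Return a list of problems found for the token key pair settings."""
    problems = []
    encrypted = "-----BEGIN ENCRYPTED PRIVATE KEY-----" in pem_text
    passphrase = props.get(PASSPHRASE_KEY, "")
    if encrypted and not passphrase:
        problems.append("key is encrypted but no passphrase is configured")
    return problems


pem = "-----BEGIN ENCRYPTED PRIVATE KEY-----\n...\n-----END ENCRYPTED PRIVATE KEY-----\n"
print(check_token_key(parse_properties(PASSPHRASE_KEY + "=\n"), pem))
```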

Configure a primary Kafka cluster to host the MDS and role binding

Tip

You can store passwords and other configuration data securely using the Confluent CLI confluent secret commands. For more information refer to Manage Secrets in Confluent Platform.

Note

You must complete and run the MDS configuration steps described here on all available brokers in your cluster. The configuration example here is for a cluster with a standalone broker. To view an example of a multiple broker (inter-broker) configuration, refer to Configure mTLS Authentication and RBAC for Kafka Brokers.

The following sections describe how to configure a primary Kafka cluster to host the MDS:

If you encounter issues configuring token authentication, refer to Token authentication.

Important

When all sections of the MDS configuration are complete, Start Confluent Platform.

Configure the Confluent Server Authorizer

An authorizer is a server plugin used by Apache Kafka® to authorize operations. More specifically, an authorizer controls whether or not to authorize an operation based on the principal and the resource being accessed.

The Confluent Server Authorizer supports proprietary role-based access control (RBAC) authorization for LDAP-based users and groups, as well as the setting of ACLs. Confluent Server Authorizer also supports pluggable authorization and group providers, enabling ACLs, and RBAC providers to be loaded at runtime.

Add the following configuration for Confluent Authorizer to your Kafka properties file (/etc/kafka/server.properties). Any content in brackets (<>) must be customized for your environment.

############################# Confluent Server Authorizer Settings  #############################
authorizer.class.name=io.confluent.kafka.security.authorizer.ConfluentServerAuthorizer

# Specify a list of Access Rule Providers to retain ACLs that have already been enabled and to enable RBAC
confluent.authorizer.access.rule.providers=KRAFT_ACL,CONFLUENT
# Specify when bootstrapping Confluent Platform and to assign a SystemAdmin
super.users=<User:admin;User:mds>

The following sections describe the configuration properties used to specify the Confluent Server Authorizer settings.

authorizer.class.name

Defines the authorizer to use and serves as an entry point for Confluent proprietary functionality. In this MDS configuration, it turns “on” the Confluent Server Authorizer.

confluent.authorizer.access.rule.providers

List of access rule providers that are enabled. The Kafka Authorizer uses access rule providers to determine what rules should be used in access decisions. Supported access rule providers are:

  • KRAFT_ACL: Uses ACLs stored in the metadata topic to generate a set of access rule objects.
  • CONFLUENT: Uses RBAC role bindings and Centralized ACLs stored in Kafka to generate a set of access rule objects.
  • ZK_ACL: Default. Uses ACLs stored in ZooKeeper to generate a set of access rule objects.

KRAFT_ACL or ZK_ACL can be used, depending upon which mode the cluster is running in.
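
A value for confluent.authorizer.access.rule.providers can be sanity-checked against the list above. This is a minimal sketch, not a Confluent utility: the function name is hypothetical, the supported set is copied from the three providers listed here, and flagging KRAFT_ACL together with ZK_ACL is an assumption based on the statement that the choice depends on the cluster mode.

```python
SUPPORTED_PROVIDERS = {"KRAFT_ACL", "CONFLUENT", "ZK_ACL"}


def check_access_rule_providers(value):
    """Return a list of problems for a comma-separated provider list."""
    providers = [p.strip() for p in value.split(",") if p.strip()]
    problems = []
    for provider in providers:
        if provider not in SUPPORTED_PROVIDERS:
            problems.append("unsupported provider: " + provider)
    # Assumption: a cluster runs in one mode, so only one ACL store applies.
    if "KRAFT_ACL" in providers and "ZK_ACL" in providers:
        problems.append("both KRAFT_ACL and ZK_ACL listed; use the one for your cluster mode")
    return problems


print(check_access_rule_providers("KRAFT_ACL,CONFLUENT"))
```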

super.users

In the MDS configuration, use the super.users attribute to define users who have full access to all resources within a Metadata Service (MDS) cluster. The primary use of super.users is to bootstrap Confluent Platform and assign a SystemAdmin. On MDS clusters, a super user can create role bindings for all other clusters. Permissions granted through super.users apply only to the broker where the attribute is specified, and not to other brokers, clusters, or Confluent Platform components. No authorization is enforced on users defined as super.users, so we strongly recommend that you limit this attribute to a small number of users (for example, one or two users who are responsible for bootstrapping).
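
The super.users value is a semicolon-separated list, which matters because a principal can be a certificate DN that itself contains commas. The following sketch splits such a value and flags long lists. The helper names and the max_users threshold are hypothetical; the threshold only mirrors the recommendation above to keep the list to a couple of users.

```python
def parse_super_users(value):
    """Split a super.users value on semicolons and drop empty entries."""
    return [entry.strip() for entry in value.split(";") if entry.strip()]


def check_super_users(value, max_users=2):
    """Return a list of problems; max_users is an illustrative threshold."""
    users = parse_super_users(value)
    problems = ["missing 'User:' prefix: " + u for u in users if not u.startswith("User:")]
    if len(users) > max_users:
        problems.append("more than %d super users configured" % max_users)
    return problems


value = "User:mds;User:C=US,ST=Ca,L=PaloAlto,O=CONFLUENT,OU=TEST,CN=kafka_broker"
print(parse_super_users(value))
print(check_super_users(value))
```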

Note

Bootstrapping Confluent Platform means that when you bring up a cluster for the very first time, you are linking to or relying on another server or cluster that already includes the correct host and other configuration details needed to start up, rather than having to enter all of the startup attributes every time you start up a new cluster. Bootstrapping saves time and resources, while providing a reliable cluster startup experience.

Configure the LDAP identity provider

This configuration shows the LDAP context to identify LDAP users and groups to the MDS. The baseline LDAP configuration procedure for MDS is shown followed by detailed descriptions of the essential configuration options.

  1. Ensure you have this information available before you begin.

    • The hostname (LDAP server URL, for example, LDAPSERVER.EXAMPLE.COM), port (for example, 389), and any other security mechanisms (such as TLS)
    • The full DN (distinguished name) of LDAP users
    • If you have a complex LDAP directory tree, consider developing search filters for your configuration. These filters help MDS to trim LDAP search results.
  2. Edit your Kafka properties file (/etc/kafka/server.properties).

  3. Add the following baseline configuration for your identity provider (LDAP).

    ############################# Identity Provider Settings (LDAP) #############################
    # Search groups for group-based authorization.
    ldap.group.name.attribute=<sAMAccountName>
    ldap.group.object.class=group
    ldap.group.member.attribute=member
    ldap.group.member.attribute.pattern=CN=(.*),DC=rbac,DC=confluent,DC=io
    ldap.group.search.base=CN=Users,DC=rbac,DC=confluent,DC=io
    # Limit the scope of searches to subtrees off of base
    ldap.user.search.scope=2
    # Enable filters to limit search to only those groups needed
    ldap.group.search.filter=(|(CN=<specific group>)(CN=<specific group>))

    # Kafka authenticates to the directory service with the bind user.
    ldap.java.naming.provider.url=ldap://<hostname>:389
    ldap.java.naming.security.authentication=simple
    ldap.java.naming.security.credentials=<password>
    ldap.java.naming.security.principal=<mds-user-DN>

    # Locate users. Make sure that these attributes and object classes match what is in your directory service.
    ldap.user.name.attribute=<sAMAccountName>
    ldap.user.object.class=user
    ldap.user.search.base=<user-search-base-DN>
    
  4. Adjust the configuration details for your environment, particularly the content in brackets (<>).

    Pay special attention to the following as you work:

    • Nested LDAP groups are not supported.
    • If you enable LDAP authentication for Kafka clients by adding the LDAP callback handler (not shown in this configuration):
      • Specify ldap.user.password.attribute only if your LDAP server does not support simple bind.
      • If you define ldap.user.password.attribute, the LDAP callback handler (io.confluent.security.auth.provider.ldap.LdapAuthenticateCallbackHandler) performs the user search, the LDAP server returns the password to Kafka, and Kafka performs the password check.
      • The LDAP server will return the user’s hashed password, so Kafka cannot authenticate the user unless the user’s properties file also uses the hashed password.
  5. Save and close the property file.

  6. After configuring LDAP, but before configuring MDS, connect to and query your LDAP server to verify your LDAP connection information.

    It is recommended that you use an LDAP tool to do this (for example, JXplorer).

  7. When all sections of the MDS configuration are complete and your LDAP connection is verified, Start Confluent Platform.

The following sections provide details about the baseline LDAP configuration options for user and group-based authorization. For more details about LDAP configuration, see Configure LDAP Group-Based Authorization for MDS and Configure LDAP Authentication.

ldap.group.name.attribute

Contains the name of the group in a group entry obtained using an LDAP search. You can specify a regex pattern to extract the group name used in ACLs from this attribute by configuring ldap.group.name.attribute.pattern. The <sAMAccountName> is specific to Microsoft Active Directory and is the login name used to support clients and servers running various versions of Windows OS. Modify the value used if your LDAP configuration differs. The default for this configuration option is cn (common name).

ldap.group.object.class

Specifies the LDAP object class value that defines groups in the directory service. Specify group to search groups for group-based authorization. Note that a group has many applications, but is essentially a list of zero or more digital identities.

ldap.group.member.attribute

The name of the attribute that contains the members of the group in a group entry obtained using an LDAP search. The default is member. You can specify a regular expression (regex) pattern to extract the user principals from this attribute by configuring ldap.group.member.attribute.pattern.

ldap.group.member.attribute.pattern

A Java regular expression pattern that extracts the user principals of group members from group member entries obtained from the LDAP attribute specified using ldap.group.member.attribute. By default the full value of the attribute is used.
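To see what the sample pattern extracts, the following Python sketch applies it to two member values. It assumes that the whole attribute value must match the pattern and that the first capture group becomes the user principal; check this against your Confluent Platform version. The member DNs and the function name extract_principal are hypothetical. For this particular pattern, Python's re module behaves the same as a Java regular expression.

```python
import re

# The pattern from the sample configuration.
MEMBER_PATTERN = re.compile(r"CN=(.*),DC=rbac,DC=confluent,DC=io")


def extract_principal(member_value, pattern=MEMBER_PATTERN):
    """Return the first capture group if the whole value matches, else None.

    Assumption: full-match semantics and capture group 1, as described in the lead-in.
    """
    match = pattern.fullmatch(member_value)
    return match.group(1) if match else None


# Hypothetical member values, for illustration only.
print(extract_principal("CN=alice,DC=rbac,DC=confluent,DC=io"))         # alice
print(extract_principal("CN=alice,OU=eng,DC=rbac,DC=confluent,DC=io"))  # alice,OU=eng
print(extract_principal("CN=bob,DC=other,DC=example,DC=com"))           # None
```

The second call shows that (.*) is greedy and also matches commas, so a member entry that sits in an intermediate container yields a principal that includes the container. If your directory nests users this way, a narrower group such as ([^,]+) may be closer to what you want.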

ldap.group.search.base

Specifies the LDAP search base for group-based search. Group searches start from the DN specified here. The default is ou=groups.

ldap.group.search.scope

The LDAP search scope for group-based search. A value of 2 extends the search to all subtrees under the specified base, which is often too large a space to search and can result in timeouts. When you specify 2, you should also specify ldap.group.search.filter. The default value is 1.

ldap.group.search.filter

The LDAP search filter for group-based search. Enables filters to limit search to only those groups needed. It is recommended that you list all the groups that will be used for searching. This is typically required because the LDAP trees in large organizations tend to be so large that trying to search it all results in timeouts. For instance, after you add a scope of 2 in ldap.group.search.scope to search all subtrees, you need to narrow the groups that are included in the search. You can include any number of groups in this search filter.
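A filter that lists many groups is easy to mistype by hand. The following Python sketch builds the OR filter shown in the sample configuration from a list of group names. The helper names and the group names are hypothetical; the escaping follows the LDAP search filter syntax in RFC 4515.

```python
def escape_filter_value(value):
    """Escape characters that are special in LDAP search filters (RFC 4515)."""
    replacements = {"\\": r"\5c", "*": r"\2a", "(": r"\28", ")": r"\29", "\0": r"\00"}
    return "".join(replacements.get(ch, ch) for ch in value)


def build_group_filter(group_names):
    """Build an OR filter that matches any of the given group CNs."""
    if not group_names:
        raise ValueError("at least one group name is required")
    clauses = "".join(f"(CN={escape_filter_value(name)})" for name in group_names)
    return f"(|{clauses})"


# Hypothetical group names, for illustration only.
print(build_group_filter(["KafkaDevelopers", "KafkaOperators"]))
# (|(CN=KafkaDevelopers)(CN=KafkaOperators))
```

The printed string can be used as the value of ldap.group.search.filter.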

The following sections provide details about the baseline LDAP configuration options that Kafka uses to authenticate to the directory service with the bind user.

ldap.java.naming.provider.url

This option defines the URL to use for connections to the LDAP server. The default hostname is localhost; the default port is 389. You must specify this option for the MDS configuration. To configure multiple LDAP servers for high availability, you can also supply a space-separated list of LDAP URLs.

ldap.java.naming.security.authentication

If password authentication is enabled on your LDAP server, you can configure the user principal and password so that brokers can authenticate with the LDAP server using simple authentication. If you do not want to authenticate with the LDAP server, specify none. The recommended value to get your MDS configuration up and running is simple, which is a PLAINTEXT authentication protocol and provides no security. For production instances, you should specify a more secure SASL method supported by your LDAP server (such as SASL_GSSAPI).

ldap.java.naming.security.credentials

Specifies the security credentials (password) of the principal performing the LDAP search.

ldap.java.naming.security.principal

Specifies the security principal, which is the distinguished name (DN) of the LDAP user performing the LDAP search. In this configuration, specify the MDS user by its DN. A DN is a sequence of relative distinguished names (RDNs) connected by commas.

The following sections provide details for the options used to locate users. Make sure that these attributes and object classes match what is in your directory service.

ldap.user.name.attribute

This attribute identifies the user principal in a user entry obtained using an LDAP search. You can specify a regex pattern to extract the user principal from this attribute by configuring ldap.user.name.attribute.pattern. The <sAMAccountName> is specific to Active Directory and is the login name used to support clients and servers running various versions of Windows OS. Modify this configuration if your LDAP configuration differs. The default for this option is cn (common name).

ldap.user.object.class

Specifies the LDAP object class value that defines users in the directory service. Specify user to search for user-based authorization.

ldap.user.search.base

Use to specify the LDAP search base for a user-based search. The default value is ou=users.

Configure an MDS node

This section describes both how to configure an MDS node and the configuration options you can use to create a key pair for MDS and configure MDS on the broker. You must update paths to the key files to match your setup.

Note

The Confluent Server supports the servlet applications REST Proxy and MDS.

  • If MDS is configured, then its listener configuration takes precedence over the REST Proxy listener configurations.
  • If MDS is disabled, then the REST Proxy listener configurations take precedence.

How to configure

  1. Review the MDS server settings below and note the option settings you need for your environment.

  2. Edit your Kafka properties file, /etc/kafka/server.properties.

  3. Add the following sample configuration for MDS server settings.

    ######################### SAMPLE MDS Server Settings #############################
    # Bind Metadata Service HTTP service to a port.
    confluent.metadata.server.listeners=http://<host>:8090
    # The key pair used to sign the token (when you issue a token)
    confluent.metadata.server.token.key.path=<path-to-token-key-pair.pem>
    # Supported authentication methods
    confluent.metadata.server.authentication.method=BEARER
    
  4. Change the sample settings for use in your environment.

    Make sure to customize the elements in brackets (<>).

  5. When you have completed all the sections of the MDS configuration, Start Confluent Platform.

MDS server settings

The following sections describe the configuration options.

confluent.metadata.server.kraft.controller.enabled

If your environment uses RBAC and your KRaft controller nodes have the process.roles=controller or process.roles=broker,controller roles, then set this option to true. By default, this setting is false.

confluent.metadata.server.listeners

Use to bind the HTTP (or HTTPS) service to a port. The default port number is 8090.

This option is not required for controllers.

confluent.metadata.server.token.key.path

The location of the PEM-encoded public/private key pair to be used for signing and verifying tokens. The key pair is used to sign tokens when the token service issues a token, and to verify tokens when the token service authenticates a token. The key pair is generated using the RSA algorithm.

The token service supports:

  • PKCS#8 PEM-encoded private keys
  • RS256 signatures

confluent.metadata.server.token.key.passphrase

The passphrase for the private key. Specify this option if the private key is encrypted. If the private key is not encrypted, do not specify this option. If this option is specified, the value must not be an empty string.

confluent.metadata.server.authentication.method

Use to specify that you support token authentication on the MDS side. Use BEARER to indicate that bearer token authentication is enabled for the configuration.

Configure the token listener

MDS can issue a token in exchange for a user name and password. MDS can also accept a token from a user or client and authenticate that principal by the token, so the token can be used for subsequent authentication after the initial login.

The tokens are used to authenticate to the Kafka-configured OAUTHBEARER listener. This section shows how to enable and configure the MDS token service.

  • Be sure to use the same public keys across components and brokers.
  • Do not use a unique key for each MDS service or component connecting to OAUTH.
  • Be aware that after setting up MDS, the OAUTH listener does not display specific errors in relation to the MDS decryption keys (tokenKeypair.pem and public.pem) when they are different.
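
The user name and password exchange described at the start of this section can be sketched from the client side as follows. This Python example only builds the HTTP request and does not send it. The endpoint path /security/1.0/authenticate, the host and port, and the credentials are assumptions for illustration; confirm the path against the MDS API reference for your release.

```python
import base64
import urllib.request


def build_token_request(mds_url, username, password):
    """Build (but do not send) a GET request that asks MDS for a bearer token."""
    credentials = f"{username}:{password}".encode("utf-8")
    request = urllib.request.Request(
        mds_url.rstrip("/") + "/security/1.0/authenticate",  # assumed endpoint path
        method="GET",
    )
    # HTTP Basic authentication carries the user name and password.
    request.add_header(
        "Authorization", "Basic " + base64.b64encode(credentials).decode("ascii")
    )
    request.add_header("Accept", "application/json")
    return request


# Hypothetical MDS address and credentials.
request = build_token_request("http://localhost:8090", "alice", "alice-secret")
print(request.full_url)  # http://localhost:8090/security/1.0/authenticate
```

Sending the request with urllib.request.urlopen(request) requires a running MDS. Because Basic authentication exposes the credentials on an unencrypted connection, an HTTPS listener is preferable outside of test environments.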

Add the following configuration for token listener settings to your Kafka properties file (/etc/kafka/server.properties). Any content in brackets (<>) must be customized for your environment.

############################# Token Listener Settings #############################
# Add named listener TOKEN to existing listeners and advertised.listeners
advertised.listeners=<advertised.listeners>,TOKEN://localhost:9092
listeners=<listeners>,TOKEN://:9092
# Set SASL callback handler for handling tokens on login. This is essentially a noop if not used for inter-broker communication.
listener.name.token.oauthbearer.sasl.login.callback.handler.class=io.confluent.kafka.server.plugins.auth.token.TokenBearerServerLoginCallbackHandler
# Set SASL callback handler for verifying authentication token signatures
listener.name.token.oauthbearer.sasl.server.callback.handler.class=io.confluent.kafka.server.plugins.auth.token.TokenBearerValidatorCallbackHandler
# Configure the public key used to verify RBAC Metadata Service signatures
# Note: username, password and metadataServerUrls must be set if used for inter-broker communication
listener.name.token.oauthbearer.sasl.jaas.config= \
   org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
   publicKeyPath="<path-to-public.pem>";
# Add protocol mapping for newly-added named listener TOKEN
listener.security.protocol.map=<listener.map>,TOKEN:SASL_PLAINTEXT
listener.name.token.sasl.enabled.mechanisms=OAUTHBEARER

Important

When all sections of the MDS configuration are complete, Start Confluent Platform.

The following sections describe the configuration options used to specify the token listener settings.

advertised.listeners

Comma-separated list of URIs and listener names for other brokers and clients to use. In IaaS environments, this may need to be different from the interface to which the broker binds. If this is not set, the value for listeners will be used.

listeners

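Comma-separated list of URIs and listener names on which the broker listens. Each listener must include the listener name and the port; the hostname can be left empty to bind to the default interface (as in TOKEN://:9092 in the sample configuration).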

listener.name.token.oauthbearer.sasl.login.callback.handler.class

Use to configure and enable Kafka API calls (for example, produce and consume) so that they can understand and authenticate MDS JSON Web Tokens and passed user name and password pairs. For details, refer to Use SASL/OAUTHBEARER Authentication between Confluent Server Brokers and Kafka Clients in Confluent Platform.

listener.name.token.oauthbearer.sasl.server.callback.handler.class

Use to configure and enable Kafka API calls (for example, produce and consume) so that they can understand and authenticate MDS JSON Web Tokens and passed user name and password pairs. For details, refer to Use SASL/OAUTHBEARER Authentication between Confluent Server Brokers and Kafka Clients in Confluent Platform.

listener.name.token.oauthbearer.sasl.jaas.config

Used by listener.name.token.oauthbearer.sasl.login.callback.handler.class to verify incoming JSON Web Tokens (JWT).

listener.security.protocol.map

This Kafka broker configuration option maps listener names to security protocols as key/value pairs. It must be defined for the same security protocol to be usable on more than one port or IP. For example, internal and external traffic can be separated even if TLS is required for both: define listeners named INTERNAL and EXTERNAL and set this property to INTERNAL:SSL,EXTERNAL:SSL. As shown, the key and value are separated by a colon and map entries are separated by commas. Each listener name should appear only once in the map. Different security (TLS and SASL) settings can be configured for each listener by adding a normalized prefix (the listener name in lowercase) to the configuration name.
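
The map format and the per-listener prefix can be illustrated with a short Python sketch. It parses a map value, rejects a listener name that appears twice, and derives the lowercase configuration prefix for a listener. The function names are hypothetical, and the sketch is not how the broker itself parses the option.

```python
def parse_protocol_map(value):
    """Parse 'NAME:PROTOCOL,...' into a dict, rejecting duplicate listener names."""
    mapping = {}
    for entry in value.split(","):
        name, _, protocol = entry.strip().partition(":")
        if not name or not protocol:
            raise ValueError(f"malformed entry: {entry!r}")
        if name in mapping:
            raise ValueError(f"listener {name!r} appears more than once")
        mapping[name] = protocol
    return mapping


def listener_prefix(listener_name):
    """Return the per-listener configuration prefix (listener name lowercased)."""
    return f"listener.name.{listener_name.lower()}."


protocols = parse_protocol_map("INTERNAL:SSL,EXTERNAL:SSL,TOKEN:SASL_PLAINTEXT")
print(protocols["TOKEN"])                                    # SASL_PLAINTEXT
print(listener_prefix("TOKEN") + "sasl.enabled.mechanisms")  # listener.name.token.sasl.enabled.mechanisms
```

The second line of output matches the listener.name.token.sasl.enabled.mechanisms option in the sample token listener settings, which is the TOKEN listener's instance of sasl.enabled.mechanisms.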

listener.name.token.sasl.enabled.mechanisms

A comma-separated list of SASL mechanisms enabled on the TOKEN listener.

Important

For custom Kafka clients (Java or librdkafka), do not use the RBAC token listener for external client communications. Dependencies required for this authentication are not included in the Kafka client libraries. With RBAC enabled, token services are intended for internal communication between Confluent Platform components only (for example, it is valid for a Schema Registry licensed client), and not for long-running service principals or client authentication. For long-lived or client use cases, use one of the supported authentication methods, such as SASL or mTLS (mutual TLS). For details, see Authentication Methods Overview.

Important

When all sections of the MDS configuration are complete, Start Confluent Platform.

Full primary Kafka cluster MDS configuration

This example shows the full configuration for the primary Kafka cluster that is hosting MDS and role binding:

############################# Confluent Server Authorizer Settings  #############################
authorizer.class.name=io.confluent.kafka.security.authorizer.ConfluentServerAuthorizer

# Specify a list of Access Rule Providers to retain ACLs that have already been enabled and to enable RBAC
confluent.authorizer.access.rule.providers=KRAFT_ACL,ZK_ACL,CONFLUENT

# Specify when bootstrapping Confluent Platform and to assign a SystemAdmin
super.users=<User:admin;User:mds>

############################# Identity Provider Settings (LDAP) #############################
# Search groups for group-based authorization.
ldap.group.name.attribute=<sAMAccountName>
ldap.group.object.class=group
ldap.group.member.attribute=member
ldap.group.member.attribute.pattern=CN=(.*),DC=rbac,DC=confluent,DC=io
ldap.group.search.base=CN=Users,DC=rbac,DC=confluent,DC=io
#Limit the scope of searches to subtrees off of base
ldap.user.search.scope=2
#Enable filters to limit search to only those groups needed
ldap.group.search.filter=(|(CN=<specific group>)(CN=<specific group>))

# Kafka authenticates to the directory service with the bind user.
ldap.java.naming.provider.url=ldap://<hostname>:389
ldap.java.naming.security.authentication=simple
ldap.java.naming.security.credentials=<password>
ldap.java.naming.security.principal=<mds-user-DN>

# Locate users. Make sure that these attributes and object classes match what is in your directory service.
ldap.user.name.attribute=<sAMAccountName>
ldap.user.object.class=user
ldap.user.search.base=<user-search-base-DN>

############################# MDS Server Settings #############################
# Bind Metadata Service HTTP service to port 8090.
confluent.metadata.server.listeners=http://0.0.0.0:8090
# The key pair used to sign the token (when you issue a token)
confluent.metadata.server.token.key.path=<path-to-token-key-pair.pem>
# Supported authentication methods
confluent.metadata.server.authentication.method=BEARER

############################# Token Listener Settings #############################
# Add named listener TOKEN to existing listeners and advertised.listeners
advertised.listeners=<advertised.listeners>,TOKEN://localhost:9092
listeners=<listeners>,TOKEN://:9092
# Set SASL callback handler for handling tokens on login. This is essentially a noop if not used for inter-broker communication.
listener.name.token.oauthbearer.sasl.login.callback.handler.class=io.confluent.kafka.server.plugins.auth.token.TokenBearerServerLoginCallbackHandler
# Set SASL callback handler for verifying authentication token signatures
listener.name.token.oauthbearer.sasl.server.callback.handler.class=io.confluent.kafka.server.plugins.auth.token.TokenBearerValidatorCallbackHandler
# Configure the public key used to verify RBAC Metadata Service signatures
# Note: username, password and metadataServerUrls must be set if used for inter-broker communication
listener.name.token.oauthbearer.sasl.jaas.config= \
   org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
   publicKeyPath="<path-to-public.pem>";
# Add protocol mapping for newly-added named listener TOKEN
listener.security.protocol.map=<listener.map>,TOKEN:SASL_PLAINTEXT
listener.name.token.sasl.enabled.mechanisms=OAUTHBEARER
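
Because every value in brackets (<>) in this configuration must be customized, it can be useful to check a properties file for placeholders that were left in place before you start the broker. The following Python sketch is one way to do that. The function name and the shortened sample text are hypothetical, and the check treats any <...> run on a non-comment line as a placeholder.

```python
import re

# Matches a bracketed placeholder such as <hostname> or <path-to-public.pem>.
PLACEHOLDER = re.compile(r"<[^<>\n]+>")


def find_placeholders(properties_text):
    """Return (line_number, placeholder) pairs found on non-comment lines."""
    found = []
    for number, line in enumerate(properties_text.splitlines(), start=1):
        if line.lstrip().startswith("#"):
            continue  # skip comment lines
        for match in PLACEHOLDER.finditer(line):
            found.append((number, match.group(0)))
    return found


# A shortened, hypothetical excerpt of a server.properties file.
sample = """\
# Bind Metadata Service HTTP service to port 8090.
confluent.metadata.server.listeners=http://0.0.0.0:8090
confluent.metadata.server.token.key.path=<path-to-token-key-pair.pem>
ldap.java.naming.security.principal=<mds-user-DN>
"""
for number, placeholder in find_placeholders(sample):
    print(f"line {number}: {placeholder}")
# line 3: <path-to-token-key-pair.pem>
# line 4: <mds-user-DN>
```

To check a real file, pass the result of reading /etc/kafka/server.properties to find_placeholders instead of the sample string.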

Troubleshooting MDS configuration issues

The following sections provide guidance to help you troubleshoot issues you may encounter in your MDS configuration.

Token authentication

Token login and authentication to MDS using OAUTHBEARER can fail without any obvious exceptions or errors appearing in either server.log or metadata-service.log. Such failures can occur because the public and private key files are corrupt, or because the token was generated using a different pair of public and private key files.

To troubleshoot token authentication:

  1. Delete both tokenKeypair.pem and public.pem from the folders configured in server.properties and regenerate them. The newly-generated key files will be placed in:

    confluent.metadata.server.token.key.path=<path-to-token-key-pair.pem>
    
  2. After regenerating tokenKeypair.pem and public.pem, restart the broker server where MDS is running.

  3. On the client machine, delete the local CLI cache (~/.confluent/config.json), which cached the token for the super user after the super user logged in.

  4. Log in again using the CLI.

MDS REST client configurations

If a component client (such as Schema Registry, ksqlDB, Confluent Control Center, or Connect) that is configured to communicate with MDS uses an incorrect username or password, it can retry authentication endlessly. The result is a continuous stream of exceptions in your REST client exception log, which can degrade performance. For example:

[2021-01-25 05:11:58,330] ERROR [pool-17-thread-1] Error while refreshing active metadata server urls, retrying (io.confluent.security.auth.client.rest.RestClient)
io.confluent.security.auth.client.rest.exceptions.RestClientException: Unauthorized; error code: 401
   at io.confluent.security.auth.client.rest.RestClient$HTTPRequestSender.lambda$submit$0(RestClient.java:353)
   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   at java.lang.Thread.run(Thread.java:748)

To control the number of authentication retry attempts, include the following options in your REST client MDS configuration:

  • confluent.metadata.server.urls.max.retries
  • confluent.metadata.server.urls.fail.on.401

For details about these configuration options, refer to REST client configurations.

Configure a secondary Kafka cluster managed by the MDS of the primary Kafka cluster

The following sections describe how to configure a secondary Kafka cluster managed by the MDS of the primary Kafka cluster.

Important

When all sections of the MDS configuration are complete, Start Confluent Platform.

Configure the Confluent Server Authorizer

See Configure the Confluent Server Authorizer for details about the Confluent Server Authorizer and the settings recommended here.

Add the following MDS configuration to your Kafka properties file (/etc/kafka/server.properties). Any content in brackets (<>) must be customized for your environment.

############################# Confluent Server Authorizer Settings  #############################
authorizer.class.name=io.confluent.kafka.security.authorizer.ConfluentServerAuthorizer
confluent.authorizer.access.rule.providers=KRAFT_ACL,ZK_ACL,CONFLUENT

Important

When all sections of the MDS configuration are complete, Start Confluent Platform.

Configure the MDS

This section configures Kafka so that it can talk to the MDS cluster and “consume” the role bindings. In the configuration example below, SASL_PLAINTEXT/PLAIN is used, but you should use whatever security mechanism is required by the Kafka broker running MDS. Also, you should specify the port exposed by the other broker. For more information, see Metadata Service Configuration Settings.

############################# MDS Settings #############################
confluent.metadata.bootstrap.servers=<kafka-with-mds-hostname-1>:<port>,<kafka-with-mds-hostname-2>:<port>,...
confluent.metadata.security.protocol=SASL_PLAINTEXT
confluent.metadata.sasl.mechanism=PLAIN
confluent.metadata.sasl.jaas.config= \
   org.apache.kafka.common.security.plain.PlainLoginModule required \
   username="<broker-username>" \
   password="<broker-password>";

Important

When all sections of the MDS configuration are complete, Start Confluent Platform.

The following sections describe the configuration options used to specify the MDS settings.

confluent.metadata.bootstrap.servers

The Kafka hostname and port of the cluster hosting MDS. Used to determine how a broker connects to the MDS. For the default KRaft mode, include the hostname and port for MDS controller node configurations only. This option is required for managed brokers, and optional for MDS brokers, which default to the inter-broker values. An example value looks like ec2-22-222-22-222.compute-1.amazonaws.com:9092.

confluent.metadata.security.protocol

Defines the security protocol to be used when connecting to an external MDS, which gives managed brokers the ability to consume RBAC data.

confluent.metadata.sasl.mechanism

Defines the SASL mechanism to be used when connecting to an external MDS, which gives managed brokers the ability to consume RBAC data.

confluent.metadata.sasl.jaas.config

Defines the JAAS configuration for managed clusters to connect to and consume the role binding data so that they can locally enforce RBAC on direct Kafka API calls to themselves.

Configure the token listener

This section of the MDS configuration enables the listener with the OAUTHBEARER SASL mechanism, which is used for impersonation. For more information, see Configure Clients for SASL/OAUTHBEARER authentication in Confluent Platform. For details about token listener settings in an MDS configuration, see Configure the token listener.

Any differences between the token listener settings used here and those described above in Configure the token listener are detailed below.

############################# Token-based Listener Settings #############################
listeners=<listeners>,TOKEN://:9092
advertised.listeners=<advertised.listeners>,TOKEN://<hostname>:9092
listener.security.protocol.map=<listener.map>,TOKEN:SASL_PLAINTEXT
listener.name.token.oauthbearer.sasl.jaas.config= \
   org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
   publicKeyPath="<path-to-public.pem>";
listener.name.token.oauthbearer.sasl.login.callback.handler.class=io.confluent.kafka.server.plugins.auth.token.TokenBearerServerLoginCallbackHandler
listener.name.token.oauthbearer.sasl.server.callback.handler.class=io.confluent.kafka.server.plugins.auth.token.TokenBearerValidatorCallbackHandler

listener.name.token.oauthbearer.sasl.jaas.config

The oauthbearer.sasl.jaas.config of the defined LISTENER_NAME. Here, TOKEN is the name of the listener. In other words, listener.name.token.oauthbearer.sasl.jaas.config is an instance of listener.name.<LISTENER_NAME>.oauthbearer.sasl.jaas.config.

Important

When all sections of the MDS configuration are complete, Start Confluent Platform.

Full secondary Kafka cluster MDS configuration

This example shows the full configuration for the secondary Kafka cluster that is managed by the MDS of the primary Kafka cluster:

############################# Confluent Server Authorizer Settings  #############################
authorizer.class.name=io.confluent.kafka.security.authorizer.ConfluentServerAuthorizer
confluent.authorizer.access.rule.providers=KRAFT_ACL,ZK_ACL,CONFLUENT

############################# MDS Settings #############################
confluent.metadata.bootstrap.servers=<kafka-with-mds-host-1>:<port>,<kafka-with-mds-host-2>:<port>,...
confluent.metadata.security.protocol=SASL_PLAINTEXT
confluent.metadata.sasl.mechanism=PLAIN
confluent.metadata.sasl.jaas.config= \
   org.apache.kafka.common.security.plain.PlainLoginModule required \
   username="<broker-username>" \
   password="<broker-password>";

############################# Token-based Listener Settings #############################
listeners=<listeners>,TOKEN://:9092
advertised.listeners=<advertised.listeners>,TOKEN://<hostname>:9092
listener.security.protocol.map=<listener.map>,TOKEN:SASL_PLAINTEXT
listener.name.token.oauthbearer.sasl.jaas.config= \
   org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
   publicKeyPath="<path-to-public.pem>";
listener.name.token.oauthbearer.sasl.login.callback.handler.class=io.confluent.kafka.server.plugins.auth.token.TokenBearerServerLoginCallbackHandler
listener.name.token.oauthbearer.sasl.server.callback.handler.class=io.confluent.kafka.server.plugins.auth.token.TokenBearerValidatorCallbackHandler