Implement a Custom KMS Driver on Confluent Platform

The client-side field level encryption (CSFLE) functionality makes use of a library called Google Tink, a “secure-by-default” encryption library. In a CSFLE implementation, a key management service (KMS) securely stores and manages the encryption keys. The KMS is external to the database and provides a layer of security by separating key management from database management. When creating a custom KMS driver, do not encrypt fields that are used for filtering or sorting unless you have a compelling reason to do so.

To use a custom KMS with the CSFLE functionality in the Java client, three interfaces need to be implemented:

  • io.confluent.kafka.schemaregistry.encryption.tink.KmsDriver
  • com.google.crypto.tink.KmsClient
  • com.google.crypto.tink.Aead

KMS driver

An example of a custom KmsDriver appears below. The getKeyUrlPrefix method should return a custom prefix, distinct from aws-kms://, azure-kms://, gcp-kms://, hcvault://, and local-kms://.

public class CustomKmsDriver implements KmsDriver {

  public CustomKmsDriver() {
  }

  @Override
  public String getKeyUrlPrefix() {
    return "custom-kms://";
  }

  @Override
  public KmsClient newKmsClient(Map<String, ?> configs, Optional<String> kekUrl)
      throws GeneralSecurityException {
    return new CustomKmsClient(configs, kekUrl);
  }
}

The getAead method should return a custom Aead implementation.

public final class CustomKmsClient implements KmsClient {

  public static final String PREFIX = "custom-kms://";

  private String keyUri;
  private Aead aead;

  public CustomKmsClient(Map<String, ?> configs, Optional<String> kekUrl)
      throws GeneralSecurityException {
    String uri = kekUrl.orElse(PREFIX);
    if (!uri.toLowerCase(Locale.US).startsWith(PREFIX)) {
      throw new IllegalArgumentException("key URI must start with " + PREFIX);
    }
    this.keyUri = uri;
    this.aead = new CustomKmsAead(configs, uri);
  }

  /**
   * @return true either if this client is a generic one and uri starts with
   *     {@link CustomKmsClient#PREFIX}, or the client is a specific one that is bound to the key
   *     identified by {@code uri}.
   */
  @Override
  public boolean doesSupport(String uri) {
    if (this.keyUri != null && this.keyUri.equals(uri)) {
      return true;
    }
    return this.keyUri == null && uri.toLowerCase(Locale.US).startsWith(PREFIX);
  }

  /**
   * Loads credentials from a properties file.
   *
   * @throws GeneralSecurityException if the client initialization fails
   */
  @Override
  public KmsClient withCredentials(String credentialPath) throws GeneralSecurityException {
    return this;
  }

  /**
   * Loads default credentials.
   *
   * @throws GeneralSecurityException if the client initialization fails
   */
  @Override
  public KmsClient withDefaultCredentials() throws GeneralSecurityException {
    return this;
  }

  @Override
  public Aead getAead(String uri) throws GeneralSecurityException {
    if (this.keyUri != null && !this.keyUri.equals(uri)) {
      throw new GeneralSecurityException(
          String.format(
              "this client is bound to %s, cannot load keys bound to %s", this.keyUri, uri));
    }

    return aead;
  }
}

Custom Aead example

An example of a custom Aead appears below. You must implement the encrypt and decrypt methods to ensure data is encrypted and decrypted.

public final class CustomKmsAead implements Aead {

  public CustomKmsAead(Map<String, ?> configs, String keyUrl) {
    // use configs and keyUrl to create a KMS client to the custom KMS
  }

  @Override
  public byte[] encrypt(final byte[] plaintext, final byte[] associatedData)
      throws GeneralSecurityException {
    try {
      // use the KMS client to encrypt the plaintext
    } catch (Exception e) {
      throw new GeneralSecurityException("encryption failed", e);
    }
  }

  @Override
  public byte[] decrypt(final byte[] ciphertext, final byte[] associatedData)
      throws GeneralSecurityException {
    try {
      // use the KMS client to decrypt the ciphertext
    } catch (Exception e) {
      throw new GeneralSecurityException("decryption failed", e);
    }
  }
}

The final step is to create a provider configuration file named io.confluent.kafka.schemaregistry.encryption.tink.KmsDriver in the resource directory META-INF/services. The contents of the file should contain the fully qualified name of the custom KmsDriver, such as the following:

com.mycompany.CustomKmsDriver

This tells the service loader what class to instantiate when the KmsDriver interface is requested.