PostgreSQL CDC Source V2 (Debezium) Connector for Confluent Cloud¶
The fully-managed PostgreSQL Change Data Capture (CDC) Source V2 (Debezium) connector for Confluent Cloud can obtain a snapshot of the existing data in a PostgreSQL database and then monitor and record all subsequent row-level changes to that data. The connector supports Avro, JSON Schema, Protobuf, or JSON (schemaless) output data formats. All of the events for each table are recorded in a separate Apache Kafka® topic. The events can then be easily consumed by applications and services.
Note
- This Quick Start is for version 2 of this connector. For the earlier version of this connector, see PostgreSQL CDC Source Connector (Debezium) [Legacy] for Confluent Cloud. If migrating from V1 to V2, see Moving from V1 to V2.
- This Quick Start is for the fully-managed cloud connector. If you are installing the connector locally for Confluent Platform, see Debezium PostgreSQL CDC Source Connector for Confluent Platform.
- See the Debezium docs for more information.
V2 Improvements¶
Note the following improvements made to the V2 connector.
- Added support for PostgreSQL 15.
- Added support for PostgreSQL 16. Note: The connector does not support logical replication from standby servers.
- Supports columns of type bytea[] (an array of bytea values).
- Filtered publications are updated automatically when updating the table capture list.
- Can stop or pause an in-progress incremental snapshot. Can resume the incremental snapshot if it was previously paused.
- Supports regular expressions to specify table names for incremental snapshots.
- Supports SQL-based predicates to control the subset of records to be included in the incremental snapshot.
- Supports specifying a single column as a surrogate key for performing incremental snapshots.
- Can perform ad-hoc blocking snapshots.
- Indices that rely on hidden or auto-generated columns, or on columns wrapped in database functions, are no longer considered primary key alternatives for tables that do not have a primary key defined.
- Configuration options to specify how topic and schema names should be adjusted for compatibility.
Features¶
The PostgreSQL CDC Source V2 (Debezium) connector provides the following features:
- Topics created automatically: The connector automatically creates Kafka topics using the naming convention: <topic.prefix>.<schemaName>.<tableName>. The topics are created with the properties topic.creation.default.partitions=1 and topic.creation.default.replication.factor=3. For more information, see Maximum message size.
- Logical decoding plugins supported: pgoutput. The default used is pgoutput.
- Database authentication: Uses password authentication.
- SSL support: Supports SSL encryption.
- Client-side field level encryption (CSFLE) support: The connector supports CSFLE for sensitive data. For more information about CSFLE setup, see the connector configuration.
- Tables included and Tables excluded: Sets whether a table is or is not monitored for changes. By default, the connector monitors every non-system table.
- Tombstones on delete: Configures whether a tombstone event should be generated after a delete event. Default is true.
- Output data formats: The connector supports Avro, JSON Schema, Protobuf, or JSON (schemaless) output Kafka record value format. It supports Avro, JSON Schema, Protobuf, JSON (schemaless), and String output record key format. Schema Registry must be enabled to use a Schema Registry-based format (for example, Avro, JSON_SR (JSON Schema), or Protobuf). See Schema Registry Enabled Environments for additional information.
- Tasks per connector: Organizations can run multiple connectors with a limit of one task per connector ("tasks.max": "1").
- Incremental snapshot: Supports incremental snapshotting via signaling. Note that the connector automatically adds the signal table to the publication only if publication.autocreate.mode is set to filtered or all_tables. You must add it manually if the mode is set to disabled (see the SQL sketch at the end of this section).
- Offset management capabilities: Supports offset management. For more information, see Manage custom offsets.
For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Managed and Custom Connectors section.
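The database-side setup for signaling is not shown above, so the following is a minimal SQL sketch. It assumes a signal table named public.debezium_signal (any name works, as long as it matches the Signal data collection / signal.data.collection setting), the default publication name dbz_publication, and the example table public.passengers used later on this page; the three-column layout follows the common Debezium signaling convention.
-- Create a signal table (conventional Debezium layout: id, type, data).
CREATE TABLE public.debezium_signal (
  id   VARCHAR(42) PRIMARY KEY,
  type VARCHAR(32) NOT NULL,
  data VARCHAR(2048)
);

-- Only needed when publication.autocreate.mode is disabled: add the signal table
-- to the publication manually, because the connector will not do it for you.
ALTER PUBLICATION dbz_publication ADD TABLE public.debezium_signal;

-- Trigger an ad-hoc incremental snapshot of one table by inserting a signal row.
INSERT INTO public.debezium_signal (id, type, data)
VALUES ('adhoc-1', 'execute-snapshot',
        '{"data-collections": ["public.passengers"], "type": "incremental"}');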
Supported database versions¶
The PostgreSQL CDC Source V2 (Debezium) connector is compatible with the following PostgreSQL versions: 10, 11, 12, 13, 14, 15, 16.
Limitations¶
Be sure to review the following information.
- For connector limitations, see PostgreSQL CDC Source V2 (Debezium) Connector limitations.
- If you plan to use one or more Single Message Transforms (SMTs), see SMT Limitations.
- If you plan to use Confluent Cloud Schema Registry, see Schema Registry Enabled Environments.
Maximum message size¶
This connector creates topics automatically. When it creates topics, the internal connector configuration property max.message.bytes is set to the following:
- Basic cluster: 8 MB
- Standard cluster: 8 MB
- Enterprise cluster: 8 MB
- Dedicated cluster: 20 MB
For more information about Confluent Cloud clusters, see Kafka Cluster Types in Confluent Cloud.
Log retention during snapshot¶
When launched, the CDC connector creates a snapshot of the existing data in
the database to capture the nominated tables. To do this, the connector executes a
“SELECT *”
statement. Completing the snapshot can take a while if one or more of
the nominated tables is very large.
During the snapshot process, the replication slot is not advanced. This ensures that the database server retains the WAL segments that are needed for replication once the snapshot process completes. If one or more tables are very large, the snapshot process can take a long time to complete. In situations with a high rate of change, the PostgreSQL disk space consumed by WAL files can keep increasing, potentially exhausting the disk space on the database server and leading to database operation failures or a server shutdown.
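To watch how much WAL a replication slot is holding back during a long snapshot, you can run a query like the following. This is a monitoring sketch only; it assumes you can run ad-hoc queries against the source database.
-- Approximate WAL retained for each replication slot: the distance between the
-- current WAL write position and the slot's restart_lsn.
SELECT slot_name,
       active,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots;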
Manage CSFLE¶
In general, database connectors can automatically create topics and corresponding schemas to match tables created on the database side. This capability, however, is not supported with CSFLE, leading to source connector failures. To prevent issues, Confluent recommends specifying an allowlist of tables when enabling CSFLE.
Manage custom offsets¶
You can manage the offsets for this connector. Offsets provide information on the point in the system from which the connector is accessing data. For more information, see Manage Offsets for Fully-Managed Connectors in Confluent Cloud.
To manage offsets:
- Manage offsets using Confluent Cloud APIs. For more information, see Cluster API reference.
To get the current offset, make a GET
request that specifies the environment, Kafka cluster, and connector name.
GET /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets
Host: https://api.confluent.cloud
Response:
Successful calls return HTTP 200
with a JSON payload that describes the offset.
{
"id": "lcc-example123",
"name": "{connector_name}",
"offsets": [
{
"partition": {
"server": "server_01"
},
"offset": {
"lsn": 9943856034248,
"lsn_proc": 9943856034248,
"lsn_commit": 9943856034156,
"messageType": "INSERT",
"transaction_id": null,
"txId": 1142586,
"ts_usec": 1714023032677090
}
}
],
"metadata": {
"observed_at": "2024-03-28T17:57:48.139635200Z"
}
}
Responses include the following information:
- The position of the latest offset.
- The observed time of the offset in the metadata portion of the payload. The observed_at time indicates a snapshot in time for when the API retrieved the offset. A running connector is always updating its offsets. Use observed_at to get a sense for the gap between real time and the time at which the request was made. By default, offsets are observed every minute. Calling GET repeatedly fetches more recently observed offsets.
- Information about the connector.
To update the offset, make a POST
request that specifies the environment, Kafka cluster, and connector
name. Include a JSON payload that specifies the new offset and a patch type.
POST /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets/request
Host: https://api.confluent.cloud
{
"type": "PATCH",
"offsets": [
{
"partition": {
"server": "server_01"
},
"offset": {
"lsn": 9943855924248,
"lsn_commit": 9943855924192,
"lsn_proc": 9943855924248
}
}
]
}
Considerations:
- You can only make one offset change at a time for a given connector.
- This is an asynchronous request. To check the status of this request, you must use the check offset status API. For more information, see Get the status of an offset request.
- For source connectors, the connector attempts to read from the position defined by the requested offsets.
Response:
Successful calls return HTTP 202 Accepted
with a JSON payload that describes the offset.
{
"id": "lcc-example123",
"name": "{connector_name}",
"offsets": [
{
"partition": {
"server": "server_01"
},
"offset": {
"lsn": 9943855924248,
"lsn_commit": 9943855924192,
"lsn_proc": 9943855924248
}
}
],
"requested_at": "2024-03-28T17:58:45.606796307Z",
"type": "PATCH"
}
Responses include the following information:
- The requested position of the offsets in the source.
- The time of the request to update the offset.
- Information about the connector.
To delete the offset, make a POST
request that specifies the environment, Kafka cluster, and connector
name. Include a JSON payload that specifies the delete type.
POST /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets/request
Host: https://api.confluent.cloud
{
"type": "DELETE"
}
Considerations:
- Delete requests delete the offset for the provided partition and reset it to the base state. A delete request is equivalent to starting with a freshly created connector.
- This is an asynchronous request. To check the status of this request, you must use the check offset status API. For more information, see Get the status of an offset request.
- Do not issue delete and patch requests at the same time.
- For source connectors, the connector attempts to read from the position defined in the base state.
Response:
Successful calls return HTTP 202 Accepted
with a JSON payload that describes the result.
{
"id": "lcc-example123",
"name": "{connector_name}",
"offsets": [],
"requested_at": "2024-03-28T17:59:45.606796307Z",
"type": "DELETE"
}
Responses include the following information:
- Empty offsets.
- The time of the request to delete the offset.
- Information about Kafka cluster and connector.
- The type of request.
To get the status of a previous offset request, make a GET
request that specifies the environment, Kafka cluster, and connector
name.
GET /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets/request/status
Host: https://api.confluent.cloud
Considerations:
- The status endpoint always shows the status of the most recent PATCH/DELETE operation.
Response:
Successful calls return HTTP 200
with a JSON payload that describes the result. The following is an example
of an applied patch.
{
"request": {
"id": "lcc-example123",
"name": "{connector_name}",
"offsets": [
{
"partition": {
"server": "server_01"
},
"offset": {
"lsn": 9943855924248,
"lsn_commit": 9943855924192,
"lsn_proc": 9943855924248
}
}
],
"requested_at": "2024-03-28T17:58:45.606796307Z",
"type": "PATCH"
},
"status": {
"phase": "APPLIED",
"message": "The Connect framework-managed offsets for this connector have been altered successfully. However, if this connector manages offsets externally, they will need to be manually altered in the system that the connector uses."
},
"previous_offsets": [
{
"partition": {
"server": "server_01"
},
"offset": {
"lsn": 9943856034248,
"lsn_proc": 9943856034248,
"lsn_commit": 9943856034156,
"messageType": "INSERT",
"transaction_id": null,
"txId": 1142586,
"ts_usec": 1714023032677090
}
}
],
"applied_at": "2024-03-28T17:58:48.079141883Z"
}
Responses include the following information:
- The original request, including the time it was made.
- The status of the request: applied, pending, or failed.
- The time you issued the status request.
- The previous offsets. These are the offsets that the connector last updated prior to updating the offsets. Use these offsets to restore the state of your connector if a patch update causes your connector to fail, or to return a connector to its previous state after rolling back.
JSON payload¶
The table below offers a description of the unique fields in the JSON payload for managing offsets of the PostgreSQL Change Data Capture (CDC) Source V2 (Debezium) connector.
| Field | Definition | Required/Optional |
|---|---|---|
| server | The logical name of the server, specified by the configuration topic.prefix. | Required |
| lsn_commit | The Log Sequence Number (LSN) is a 64-bit number that identifies the position of Write Ahead Log (WAL) records. | Required |
| lsn_proc | The LSN of the last processed event. When consuming from the replication slot, commits and their corresponding commit LSNs are totally ordered. While LSNs of change events within a transaction maintain order, LSN ordering is not preserved across transactions. Hence, to establish totally ordered positions, both the commit LSN and the LSN of the last processed event are required. | Required |
| lsn | lsn is the same as lsn_proc. | Required |
| messageType | The operation associated with the event for which the offset is committed. This is added to identify BEGIN and COMMIT operations. In PostgreSQL, a DML event can have the same LSN as its preceding transactional event (BEGIN/COMMIT), so skipping messages based only on LSNs can cause data loss. To prevent data loss, events with the last stored LSNs are reprocessed if they happen to be transactional events. Must be specified when provide.transaction.metadata is enabled and the target lsn_proc belongs to a transactional event. | Optional |
| transaction_id | If the configuration provide.transaction.metadata is set to false, transaction_id is a null value. If provide.transaction.metadata is set to true, the value for transaction_id corresponds to the transaction ID of the event for which the offset is committed. | Optional |
| transaction_data_collection_order_<SCHEMA>.<TABLE> | If the configuration provide.transaction.metadata is set to false, this field is not included in the response. If provide.transaction.metadata is set to true and the committed event corresponds to a table being captured, the <SCHEMA> and <TABLE> components of the field name contain the schema and table name for the event for which the offset is committed. The value of this field is a long representing the number of events processed for that table in that transaction. | Optional |
| txId | ID of the transaction to which the event corresponds. | Optional |
| ts_usec | Time in microseconds for the COMMIT operation of the transaction that generated the corresponding event. | Optional |
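The lsn, lsn_proc, and lsn_commit values in the payload are numeric LSNs, while PostgreSQL reports LSNs in hexadecimal file/offset form. The query below is a sketch of one way to compare an offset against the database; it assumes the default slot name debezium and that the numeric form corresponds to the distance from LSN 0/0.
-- Convert the slot's confirmed_flush_lsn from hexadecimal form to the numeric
-- form used in the offset payload.
SELECT confirmed_flush_lsn,
       pg_wal_lsn_diff(confirmed_flush_lsn, '0/0') AS lsn_as_number
FROM pg_replication_slots
WHERE slot_name = 'debezium';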
Migrate connectors¶
Considerations:
- The configurations of the self-managed connector must match the configurations of the fully-managed connector.
- The self-managed connector must be operating in streaming mode. If the self-managed connector is still in the process of making a snapshot, you can either create a new connector on Confluent Cloud, which starts the snapshot process from the beginning, or wait for the snapshot process to complete and then follow the migration guidance.
Quick Start¶
Use this quick start to get up and running with the Confluent Cloud PostgreSQL CDC Source V2 (Debezium) connector. The quick start provides the basics of selecting the connector and configuring it to obtain a snapshot of the existing data in a PostgreSQL database and then monitoring and recording all subsequent row-level changes.
- Prerequisites
Authorized access to a Confluent Cloud cluster on Amazon Web Services (AWS), Microsoft Azure (Azure), or Google Cloud.
The Confluent CLI installed and configured for the cluster. See Install the Confluent CLI.
Schema Registry must be enabled to use a Schema Registry-based format (for example, Avro, JSON_SR (JSON Schema), or Protobuf). See Schema Registry Enabled Environments for additional information.
The PostgreSQL database must be configured for CDC. For details, see PostgreSQL in the Cloud.
Public access may be required for your database. See Manage Networking for Confluent Cloud Connectors for details. The following example shows the AWS Management Console when setting up a PostgreSQL database.
A parameter group with the property rds.logical_replication=1 is required. An example is shown below. Once created, you must reboot the database. You can verify the resulting database settings with the SQL sketch after these prerequisites.
For networking considerations, see Networking and DNS. To use a set of public egress IP addresses, see Public Egress IP Addresses for Confluent Cloud Connectors. The following example shows the AWS Management Console when setting up security group rules for the VPC.
Note
See your specific cloud platform documentation for how to configure security rules for your VPC.
- Kafka cluster credentials. The following lists the different ways you can provide credentials.
- Enter an existing service account resource ID.
- Create a Confluent Cloud service account for the connector. Make sure to review the ACL entries required in the service account documentation. Some connectors have specific ACL requirements.
- Create a Confluent Cloud API key and secret. To create a key and secret, you can use confluent api-key create or you can autogenerate the API key and secret directly in the Cloud Console when setting up the connector.
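Before creating the connector, you can confirm the database prerequisites with a short SQL check. This is a sketch only; it assumes you connect with psql as the database user the connector will use.
-- wal_level must report 'logical' for logical decoding (on Amazon RDS this is
-- the effect of rds.logical_replication=1 after a reboot).
SHOW wal_level;

-- The connector's database user needs replication privileges (or the
-- rds_replication role on Amazon RDS).
SELECT rolname, rolreplication FROM pg_roles WHERE rolname = current_user;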
Using the Confluent Cloud Console¶
Step 1: Launch your Confluent Cloud cluster¶
See the Quick Start for Confluent Cloud for installation instructions.
Step 2: Add a connector¶
In the left navigation menu, click Connectors. If you already have connectors in your cluster, click + Add connector.
Step 3: Select your connector¶
Click the Postgres CDC Source V2 (Debezium) connector card.
Step 4: Enter the connector details¶
Note
- Make sure you have all your prerequisites completed.
At the Add Postgres CDC Source V2 (Debezium) Connector screen, complete the following:
- Select the way you want to provide Kafka Cluster credentials. You can
choose one of the following options:
- My account: This setting allows your connector to globally access everything that you have access to. With a user account, the connector uses an API key and secret to access the Kafka cluster. This option is not recommended for production.
- Service account: This setting limits the access for your connector by using a service account. This option is recommended for production.
- Use an existing API key: This setting allows you to specify an API key and a secret pair. You can use an existing pair or create a new one. This method is not recommended for production environments.
- Click Continue.
- Add the following database connection details:
- Database hostname: IP address or hostname of the PostgreSQL database server.
- Database port: Port number of the PostgreSQL database server.
- Database username: The name of the PostgreSQL database user that has the required authorization.
- Database password: Password of the PostgreSQL database user that has the required authorization.
- Database name: The name of the PostgreSQL database from which to stream the changes.
- SSL mode: Whether to use an encrypted connection to the PostgreSQL server. Possible settings are: disable, prefer, and require.
  - prefer (default): attempts to use a secure (encrypted) connection first and, failing that, an unencrypted connection.
  - disable: uses an unencrypted connection.
  - require: uses a secure (encrypted) connection, and fails if one cannot be established.
- Click Continue.
Add the following details:
- (Optional) Enable Client-Side Field Level Encryption for data encryption. Specify a Service Account to access the Schema Registry and associated encryption rules or keys with that schema. For more information on CSFLE setup, see Manage CSFLE for connectors.
- Select the output record value format (data going to the Kafka topic): AVRO, JSON, JSON_SR (JSON Schema), or PROTOBUF. Schema Registry must be enabled to use a Schema Registry-based format (for example, Avro, JSON Schema, or Protobuf). For additional information, see Schema Registry Enabled Environments.
- Topic prefix: Provides a namespace (logical server name) for the particular PostgreSQL database server or cluster in which Debezium is capturing changes. This logical name forms a namespace and is used in all the names of the Kafka topics and the Kafka Connect schema names.
- Slot name: The name of the PostgreSQL logical decoding slot that is created for streaming changes from a particular plug-in and for a particular database/schema. Defaults to debezium.
- Publication name: The name of the PostgreSQL publication created for streaming changes when using the standard logical decoding plugin (pgoutput). Defaults to dbz_publication.
- Snapshot mode: Specifies the criteria for running a snapshot when the connector starts. Possible settings are: initial, initial_only, and never.
  - initial (default): The connector performs a snapshot only when no offsets have been recorded for the logical server name.
  - initial_only: The connector performs an initial snapshot and then stops, without processing any subsequent changes.
  - never: The connector never performs snapshots. When a connector is configured this way, its behavior when it starts is as follows. If there is a previously stored LSN in the Kafka offsets topic, the connector continues streaming changes from that position. If no LSN has been stored, the connector starts streaming changes from the starting position available in the replication slot. The never snapshot mode is useful only when you know all data of interest is still reflected in the WAL.
- Tables included: Enter a comma-separated list of fully-qualified table identifiers for the connector to monitor. By default, the connector monitors all non-system tables. A fully-qualified table name is in the form schemaName.tableName. This property cannot be used with the property Tables excluded.
- Tables excluded: Enter a comma-separated list of fully-qualified table identifiers for the connector to ignore. A fully-qualified table name is in the form schemaName.tableName. This property cannot be used with the property Tables included.
Show advanced configurations
Schema context: Select a schema context to use for this connector, if using a schema-based data format. This property defaults to the Default context, which configures the connector to use the default schema set up for Schema Registry in your Confluent Cloud environment. A schema context allows you to use separate schemas (like schema sub-registries) tied to topics in different Kafka clusters that share the same Schema Registry environment. For example, if you select a non-default context, a Source connector uses only that schema context to register a schema and a Sink connector uses only that schema context to read from. For more information about setting up a schema context, see What are schema contexts and when should you use them?.
JSON output decimal format: Specify the JSON/JSON_SR serialization format for Connect DECIMAL logical type values with two allowed literals: BASE64 to serialize DECIMAL logical types as base64 encoded binary data, and NUMERIC to serialize Connect DECIMAL logical type values in JSON/JSON_SR as a number representing the decimal value.
After-state only: Controls whether the generated Kafka record should contain only the state of the row after the event occurred. Defaults to false.
Tombstones on delete: Configure whether a tombstone event should be generated after a delete event. The default is true.
Publication auto-create mode: Applies only when streaming changes by using the pgoutput plug-in. Possible settings are all_tables, disabled, and filtered.
- all_tables: If a publication exists, the connector uses it. If a publication does not exist, the connector creates a publication for all tables in the database for which the connector is capturing changes. For the connector to create a publication, it must access the database through a database user account that has permission to create publications and perform replications. You can create the publication using the following SQL command: CREATE PUBLICATION <publication_name> FOR ALL TABLES.
- disabled: The connector does not attempt to create a publication. A database administrator or the user configured to perform replications must have created the publication before running the connector. If the connector cannot find the publication, the connector throws an exception and stops.
- filtered: If a publication exists, the connector uses it. If no publication exists, the connector creates a new publication for tables that match the current filter configuration as specified by the Tables included and Tables excluded connector configuration properties. For example: CREATE PUBLICATION <publication_name> FOR TABLE <tbl1, tbl2, tbl3>. If the publication exists, the connector updates the publication for tables that match the current filter configuration. For example: ALTER PUBLICATION <publication_name> SET TABLE <tbl1, tbl2, tbl3>. For the connector to alter a publication, it must access the database through a database user account that has ownership of the publication and the tables it is capturing.
Note
If the existing regex patterns in Tables included or Tables excluded match the fully qualified name of a newly created table, the connector will miss events from this new table until the publication is manually altered to include it. To avoid missing events, it is recommended to alter the publication before adding data to newly created tables.
When configuring multiple connectors to capture different sets of tables from the database using a filtered configuration, do not use the same publication for all connectors. If multiple connectors use the same publication, the latest connector may alter the publication based on its capture list, potentially causing an incorrect publication configuration for the older connectors.
Signal data collection: Fully-qualified name of the data collection that is used to send signals to the connector. Use the following format to specify the fully-qualified collection name: schemaName.tableName. These signals can be used to perform incremental snapshotting.
Columns excluded: An optional, comma-separated list of regular expressions that match the fully-qualified names of columns to exclude from change event record values. Fully-qualified names for columns are of the form schemaName.tableName.columnName.
Event processing failure handling mode: Specifies how the connector should react to exceptions during processing of events. Possible settings are: fail, skip, and warn.
- fail (default): propagates the exception, indicates the offset of the problematic event, and causes the connector to stop.
- skip: skips the problematic event and continues processing.
- warn: logs the offset of the problematic event, skips that event, and continues processing.
Schema name adjustment mode: Specifies how schema names should be adjusted for compatibility with the message converter used by the connector. Possible settings are: none, avro, and avro_unicode.
- none (default): does not apply any adjustment.
- avro: replaces the characters that cannot be used in the Avro type name with underscore.
- avro_unicode: replaces the underscore or characters that cannot be used in the Avro type name with corresponding unicode like _uxxxx. Note: _ is an escape sequence like backslash in Java.
Field name adjustment mode: Specifies how field names should be adjusted for compatibility with the message converter used by the connector. Possible settings are: none, avro, and avro_unicode.
- none (default): does not apply any adjustment.
- avro: replaces the characters that cannot be used in the Avro type name with underscore.
- avro_unicode: replaces the underscore or characters that cannot be used in the Avro type name with corresponding unicode like _uxxxx. Note: _ is an escape sequence like backslash in Java.
Heartbeat interval (ms): Controls how frequently the connector sends heartbeat messages to a Kafka topic. The behavior of default value 0 is that the connector does not send heartbeat messages. Heartbeat messages are useful for monitoring whether the connector is receiving change events from the database. Heartbeat messages might help decrease the number of change events that need to be re-sent when a connector restarts. To send heartbeat messages, set this property to a positive integer, which indicates the number of milliseconds between heartbeat messages.
Decimal handling mode: Specifies how the connector should handle values for DECIMAL and NUMERIC columns. Possible settings are: precise, double, and string.
- precise (default): represents values by using java.math.BigDecimal to represent values in binary form in change events.
- double: represents values by using double values, which might result in a loss of precision but which is easier to use.
- string: encodes values as formatted strings, which are easy to consume but semantic information about the real type is lost.
Time precision mode: Time, date, and timestamps can be represented with different modes of precision:
- adaptive (default): captures the time and timestamp values exactly as in the database using either millisecond, microsecond, or nanosecond precision values based on the database column’s type.
- adaptive_time_microseconds: captures the date, datetime and timestamp values exactly as in the database using either millisecond, microsecond, or nanosecond precision values based on the database column’s type. An exception is TIME type fields, which are always captured as microseconds.
- connect: always represents time and timestamp values by using Kafka Connect’s built-in representations for Time, Date, and Timestamp, which use millisecond precision regardless of the database columns’ precision.
Transforms and Predicates: For details, see the Single Message Transforms (SMT) documentation. For additional information about the Debezium SMTs ExtractNewRecordState and EventRouter (Debezium), see Debezium transformations.
For all property values and definitions, see Configuration Properties.
Click Continue.
Based on the number of topic partitions you select, you will be provided with a recommended number of tasks.
- This connector supports a single task only.
- Click Continue.
Verify the connection details by previewing the running configuration.
After you’ve validated that the properties are configured to your satisfaction, click Launch.
The status for the connector should go from Provisioning to Running.
Step 5: Check the Kafka topic¶
After the connector is running, verify that messages are populating your Kafka topic.
For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Managed and Custom Connectors section.
Using the Confluent CLI¶
Complete the following steps to set up and run the connector using the Confluent CLI.
Note
Make sure you have all your prerequisites completed.
Step 1: List the available connectors¶
Enter the following command to list available connectors:
confluent connect plugin list
Step 2: List the connector configuration properties¶
Enter the following command to show the connector configuration properties:
confluent connect plugin describe <connector-plugin-name>
The command output shows the required and optional configuration properties.
Step 3: Create the connector configuration file¶
Create a JSON file that contains the connector configuration properties. The following example shows the required connector properties.
{
"connector.class": "PostgresCdcSourceV2",
"name": "PostgresCdcSourceV2Connector_0",
"kafka.auth.mode": "KAFKA_API_KEY",
"kafka.api.key": "****************",
"kafka.api.secret": "****************************************************************",
"database.hostname": "debezium-1.<host-id>.us-east-2.rds.amazonaws.com",
"database.port": "5432",
"database.user": "postgres",
"database.password": "**************",
"database.dbname": "postgres",
"topic.prefix": "cdc",
"slot.name": "dbz_slot",
"publication.name": "dbz_publication",
"table.include.list":"public.passengers",
"output.data.format": "JSON",
"tasks.max": "1"
}
Note the following property definitions:
- "connector.class": Identifies the connector plugin name.
- "name": Sets a name for your new connector.
- "kafka.auth.mode": Identifies the connector authentication mode you want to use. There are two options: SERVICE_ACCOUNT or KAFKA_API_KEY (the default). To use an API key and secret, specify the configuration properties kafka.api.key and kafka.api.secret, as shown in the example configuration (above). To use a service account, specify the Resource ID in the property kafka.service.account.id=<service-account-resource-ID>. To list the available service account resource IDs, use the following command:
confluent iam service-account list
For example:
confluent iam service-account list
   Id     | Resource ID |       Name        |    Description
+---------+-------------+-------------------+-------------------
   123456 | sa-l1r23m   | sa-1              | Service account 1
   789101 | sa-l4d56p   | sa-2              | Service account 2
- "database.hostname": IP address or hostname of the PostgreSQL database server.
- "database.port": Port number of the PostgreSQL database server.
- "database.user": The name of the PostgreSQL database user that has the required authorization.
- "database.password": Password of the PostgreSQL database user that has the required authorization.
- "database.dbname": The name of the PostgreSQL database from which to stream the changes.
- "topic.prefix": Provides a namespace for the particular database server/cluster that the connector is capturing changes from.
- "slot.name": The name of the PostgreSQL logical decoding slot that is created for streaming changes from a particular plug-in for a particular database/schema. The slot name can contain only lower-case letters, numbers, and the underscore character.
- "publication.name": The name of the PostgreSQL publication created for streaming changes when using the standard logical decoding plugin (pgoutput).
- "table.include.list": An optional, comma-separated list of fully-qualified table identifiers for tables whose changes you want to capture. By default, the connector monitors all non-system tables. A fully-qualified table name is in the form schemaName.tableName. This property cannot be used with the property table.exclude.list.
- "output.data.format": Sets the output record format (data coming from the connector). Valid entries are AVRO, JSON_SR, PROTOBUF, or JSON. You must have Confluent Cloud Schema Registry configured if using a schema-based record format (for example, Avro, JSON_SR (JSON Schema), or Protobuf).
- "tasks.max": Enter the number of tasks in use by the connector. Organizations can run multiple connectors with a limit of one task per connector (that is, "tasks.max": "1").
Note
(Optional) To enable CSFLE for data encryption, specify the following properties:
- csfle.enabled: Flag to indicate whether the connector honors CSFLE rules.
- sr.service.account.id: A Service Account to access the Schema Registry and associated encryption rules or keys with that schema.
For more information on CSFLE setup, see Manage CSFLE for connectors.
Single Message Transforms: See the Single Message Transforms (SMT) documentation for details about adding SMTs using the CLI. For additional information about the Debezium SMTs ExtractNewRecordState and EventRouter (Debezium), see Debezium transformations.
See Configuration Properties for all property values and definitions.
Step 4: Load the properties file and create the connector¶
Enter the following command to load the configuration and start the connector:
confluent connect cluster create --config-file <file-name>.json
For example:
confluent connect cluster create --config-file postgresql-cdc-source-v2.json
Example output:
Created connector PostgresCdcSourceV2Connector_0 lcc-ix4dl
Step 5: Check the connector status¶
Enter the following command to check the connector status:
confluent connect cluster list
Example output:
ID | Name | Status | Type
+-----------+--------------------------------+---------+-------+
lcc-ix4dl | PostgresCdcSourceV2Connector_0 | RUNNING | source
Step 6: Check the Kafka topic¶
After the connector is running, verify that messages are populating your Kafka topic.
For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Managed and Custom Connectors section.
After-state only output limitation¶
When a connector is configured with the property after.state.only
set to
false
, you expect to see the previous values of all columns under before
in the record.
However, depending on the REPLICA IDENTITY setting
of the corresponding table, the before
field will be set to null or show a
subset of the columns. If PROTOBUF is used, the record may not contain the before
field at all. The following example shows this issue and provides a corrective
action to take.
{
"before": null,
"after": {
"id": 5,
"name": "Allen William Henry",
"sex": "male",
"age": 25,
"sibsp": 0,
"parch": 0,
"created_at": "2024-01-17T11:30:40.831461Z"
},
"source": {
"version": "2.4.2.Final",
"connector": "postgresql",
"name": "test",
"ts_ms": 1705471663123,
"snapshot": "false",
"db": "postgres",
"sequence": "[null,\"8736500352768\"]",
"schema": "public",
"table": "passengers",
"txId": 572,
"lsn": 8736500352768,
"xmin": null
},
"op": "u",
"ts_ms": 1705471663501,
"transaction": null
}
For an updated record to contain the previous (before
) values of all columns
in the row, you need to modify the passengers
table by running ALTER TABLE
passengers REPLICA IDENTITY FULL
. After you make this change in the PostgreSQL
database, and records are updated, you should see records similar to the
following sample.
{
"before": {
"id": 8,
"name": "Gosta Leonard",
"sex": "male",
"age": 2,
"sibsp": 3,
"parch": 1,
"created_at": "2024-01-17T11:30:55.955056Z"
},
"after": {
"id": 8,
"name": "Gosta Leonard",
"sex": "male",
"age": 25,
"sibsp": 3,
"parch": 1,
"created_at": "2024-01-17T11:30:55.955056Z"
},
"source": {
"version": "2.4.2.Final",
"connector": "postgresql",
"name": "test",
"ts_ms": 1705471953456,
"snapshot": "false",
"db": "postgres",
"sequence": "[\"8736433249408\",\"8736500352768\"]",
"schema": "public",
"table": "passengers",
"txId": 581,
"lsn": 8736500482568,
"xmin": null
},
"op": "u",
"ts_ms": 1705471953986,
"transaction": null
}
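The following SQL sketch shows the corrective action for the passengers table from the example above, plus a way to check the current setting before changing it.
-- Check the current replica identity ('d' = default/primary key, 'f' = full,
-- 'n' = nothing, 'i' = index).
SELECT relname, relreplident
FROM pg_class
WHERE relname = 'passengers';

-- Emit the complete before image for updates and deletes on this table.
ALTER TABLE public.passengers REPLICA IDENTITY FULL;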
Publication modifications¶
In version 2, when the connector is configured with Publication auto-create mode
as filtered
and
a publication already exists, the connector will automatically alter the publication to include the
configured tables each time it restarts. This means that if you update the Tables included
or
Tables excluded
connector configuration properties to include new tables, the publication will
be automatically updated by the connector.
In version 1, the connector does not alter an existing publication. To add new tables to the capture list,
you must manually alter the publication to include the new tables before updating the Tables included
or Tables excluded
configuration.
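To confirm which tables a publication currently includes, before or after changing the capture list, you can inspect pg_publication_tables. A small sketch, assuming the default publication name dbz_publication:
-- List the tables currently included in the publication.
SELECT schemaname, tablename
FROM pg_publication_tables
WHERE pubname = 'dbz_publication';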
Upgrading PostgreSQL Database¶
When you upgrade the PostgreSQL database that Debezium uses, you must take specific steps to protect against data loss and to ensure that Debezium continues to operate. In general, Debezium is resilient to interruptions caused by network failures and other outages. For example, when a database server that a connector monitors stops or crashes, after the connector re-establishes communication with the PostgreSQL server, it continues to read from the last position recorded by the log sequence number (LSN) offset. The connector retrieves information about the last recorded offset from the Connect offsets topic, and queries the configured PostgreSQL replication slot for a LSN with the same value.
For the connector to start and to capture change events from a PostgreSQL database, a replication slot must be present. However, as part of the upgrade process, replication slots are removed, and the original slots are not restored after the upgrade completes. As a result, when the connector restarts and requests the last known offset from the replication slot, PostgreSQL cannot return the information.
You can create a new replication slot, but you must do more than create a new slot to guard against data loss. A new replication slot can provide the LSNs only for changes that occur after you create the slot; it cannot provide the offsets for events that occurred before the upgrade. When the connector restarts, it first requests the last known offset from the Connect offsets topic. It then sends a request to the replication slot to return information for the offset retrieved from the offsets topic. But the new replication slot cannot provide the information that the connector needs to resume streaming from the expected position. The connector then skips any existing change events in the log, and only resumes streaming from the most recent position in the log. This can lead to silent data loss: the connector emits no records for the skipped events, and it does not provide any information to indicate that events were skipped.
For guidance about how to perform a PostgreSQL database upgrade so that Debezium can continue to capture events while minimizing the risk of data loss, see the following procedure:
Temporarily stop applications that write to the database, or put them into a read-only mode.
Back up the database.
Temporarily disable write access to the database.
Provide the connector with enough time to capture all event records that are written to the replication slot. To verify that the connector has finished consuming entries from the replication slot, check the value of confirmed_flush_lsn of the slot, which should remain constant after all changes have been consumed (see the SQL sketch after this procedure). This step ensures that all change events that occurred before the downtime are accounted for, and that they are saved to Kafka.
Note the current connector configurations and then delete the connector.
As a PostgreSQL administrator, drop the replication slot on the primary database server.
Perform the upgrade using an approved PostgreSQL upgrade procedure, such as
pg_upgrade
, orpg_dump
andpg_restore
.Verify that the publication that defines the tables for Debezium to capture is still present after the upgrade. If the publication is not available, you can either connect to the database as a PostgreSQL administrator to create a new publication or let the new connector create it on startup.
Note
The publication can be created with the same name as the previously configured publication; however, it is not mandatory to do so.
As a PostgreSQL administrator, you can either create the logical replication slot on the database or let the connector create it on startup. The slot must be created before enabling writes to the database. Otherwise, Debezium cannot capture the changes, resulting in data loss.
Note
The replication slot can be created with the same name as the previously configured replication slot; however, it is not mandatory to do so.
Launch a new connector with the same configurations as the previous connector, but set snapshot.mode to never. Additionally, update publication.name and slot.name if the publication name and replication slot name have changed.
Note
If you were unable to verify that Debezium finished reading all database changes in step 4, you can configure the connector to perform a new snapshot by setting snapshot.mode=initial.
Verify that the new replication slot is available.
Restore write access to the database and restart any applications that write to the database.
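The following SQL sketch collects the database-side commands referenced in the procedure above: checking the slot before the upgrade, dropping it, and re-creating the publication and slot afterwards. It assumes the default names debezium and dbz_publication and the example table public.passengers; substitute your own slot.name, publication.name, and table list.
-- Before the upgrade: confirm the connector has drained the slot
-- (confirmed_flush_lsn stops advancing once all changes are consumed).
SELECT slot_name, confirmed_flush_lsn
FROM pg_replication_slots
WHERE slot_name = 'debezium';

-- Drop the replication slot (fails if a connection is still using it).
SELECT pg_drop_replication_slot('debezium');

-- After the upgrade: re-create the publication and the logical replication slot
-- before re-enabling writes, if you do not want the connector to create them.
CREATE PUBLICATION dbz_publication FOR TABLE public.passengers;
SELECT pg_create_logical_replication_slot('debezium', 'pgoutput');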
Moving from V1 to V2¶
Version 2 of this connector supports new features and has breaking changes that are not backward compatible with version 1 of the connector. To understand these changes and to plan for moving to version 2, see Backward Incompatible Changes in Debezium CDC V2 Connectors.
Given the backward-incompatible changes between version 1 and 2 of the CDC connectors, version 2 is being provided in a new set of CDC connectors on Confluent Cloud. You can provision either version 1 or version 2. However, note that eventually version 1 will be deprecated and no longer supported.
Before exploring your options for moving from version 1 to 2, be sure to make the required changes documented in Backward Incompatible Changes in Debezium CDC V2 Connectors. To get the offset in the following section, use the Confluent Cloud APIs. For more information, see Cluster API reference, Manage custom offsets, and Manage Offsets for Fully-Managed Connectors in Confluent Cloud.
To move from version 1 to 2 (v1 to v2)
Use the following steps to migrate to version 2. Implement and validate any connector changes in a pre-production environment before promoting to production.
Pause the v1 connector.
Get the offset for the v1 connector.
Create the v2 connector using the offset from the previous step.
confluent connect cluster create [flags]
For example:
Create a configuration file with connector configs and offsets.
{ "name": "(connector-name)", "config": { ... // connector specific configuration }, "offsets": [ { "partition": { ... // connector specific configuration }, "offset": { ... // connector specific configuration } } ] }
Create a connector in the current or specified Kafka cluster context.
confluent connect cluster create --config-file config.json
For connectors that maintain a schema history topic, you must configure the schema history topic name in v2 to match the schema history topic name from the v1 connector.
Delete the v1 connector.
For more information, see Manage Offsets for Fully-Managed Connectors in Confluent Cloud.
Configuration Properties¶
Use the following configuration properties with the fully-managed connector. For self-managed connector property definitions and other details, see the connector docs in Self-managed connectors for Confluent Platform.
How should we connect to your data?¶
name
Sets a name for your connector.
- Type: string
- Valid Values: A string at most 64 characters long
- Importance: high
Kafka Cluster credentials¶
kafka.auth.mode
Kafka Authentication mode. It can be one of KAFKA_API_KEY or SERVICE_ACCOUNT. It defaults to KAFKA_API_KEY mode.
- Type: string
- Default: KAFKA_API_KEY
- Valid Values: KAFKA_API_KEY, SERVICE_ACCOUNT
- Importance: high
kafka.api.key
Kafka API Key. Required when kafka.auth.mode==KAFKA_API_KEY.
- Type: password
- Importance: high
kafka.service.account.id
The Service Account that will be used to generate the API keys to communicate with Kafka Cluster.
- Type: string
- Importance: high
kafka.api.secret
Secret associated with Kafka API key. Required when kafka.auth.mode==KAFKA_API_KEY.
- Type: password
- Importance: high
How should we connect to your database?¶
database.hostname
IP address or hostname of the PostgreSQL database server.
- Type: string
- Importance: high
database.port
Port number of the PostgreSQL database server.
- Type: int
- Valid Values: [0,…,65535]
- Importance: high
database.user
The name of the PostgreSQL database user that has the required authorization.
- Type: string
- Importance: high
database.password
Password of the PostgreSQL database user that has the required authorization.
- Type: password
- Importance: high
database.dbname
The name of the PostgreSQL database from which to stream the changes.
- Type: string
- Importance: high
database.sslmode
Whether to use an encrypted connection to the PostgreSQL server. Possible settings are: disable, prefer, and require.
disable uses an unencrypted connection.
prefer attempts to use a secure (encrypted) connection first and, failing that, an unencrypted connection.
require uses a secure (encrypted) connection, and fails if one cannot be established.
- Type: string
- Default: prefer
- Importance: high
Output messages¶
output.data.format
Sets the output Kafka record value format. Valid entries are AVRO, JSON_SR, PROTOBUF, or JSON. Note that you need to have Confluent Cloud Schema Registry configured if using a schema-based message format like AVRO, JSON_SR, or PROTOBUF.
- Type: string
- Default: JSON
- Importance: high
output.key.format
Sets the output Kafka record key format. Valid entries are AVRO, JSON_SR, PROTOBUF, STRING or JSON. Note that you need to have Confluent Cloud Schema Registry configured if using a schema-based message format like AVRO, JSON_SR, or PROTOBUF.
- Type: string
- Default: JSON
- Valid Values: AVRO, JSON, JSON_SR, PROTOBUF, STRING
- Importance: high
json.output.decimal.format
Specify the JSON/JSON_SR serialization format for Connect DECIMAL logical type values with two allowed literals:
BASE64 to serialize DECIMAL logical types as base64 encoded binary data and
NUMERIC to serialize Connect DECIMAL logical type values in JSON/JSON_SR as a number representing the decimal value.
- Type: string
- Default: BASE64
- Importance: low
after.state.only
Controls whether the generated Kafka record should contain only the state of the row after the event occurred.
- Type: boolean
- Default: false
- Importance: low
tombstones.on.delete
Controls whether a tombstone event should be generated after a delete event.
true - a delete operation is represented by a delete event and a subsequent tombstone event.
false - only a delete event is emitted.
After a source record is deleted, emitting the tombstone event (the default behavior) allows Kafka to completely delete all events that pertain to the key of the deleted row in case log compaction is enabled for the topic.
- Type: boolean
- Default: true
- Importance: medium
How should we name your topic(s)?¶
topic.prefix
Topic prefix that provides a namespace (logical server name) for the particular PostgreSQL database server or cluster in which Debezium is capturing changes. The prefix should be unique across all other connectors, since it is used as a topic name prefix for all Kafka topics that receive records from this connector. Only alphanumeric characters, hyphens, dots and underscores must be used. The connector automatically creates Kafka topics using the naming convention: <topic.prefix>.<schemaName>.<tableName>.
- Type: string
- Importance: high
Database config¶
slot.name
The name of the PostgreSQL logical decoding slot that was created for streaming changes from a particular plug-in for a particular database/schema. The server uses this slot to stream events to the Debezium connector that you are configuring. Slot names must conform to PostgreSQL replication slot naming rules, which state: “Each replication slot has a name, which can contain lower-case letters, numbers, and the underscore character.”
- Type: string
- Default: debezium
- Valid Values: Must match the regex
^[a-z0-9_]+$
- Importance: medium
publication.name
The name of the PostgreSQL publication created for streaming changes when using pgoutput. Based on the value of publication.autocreate.mode, the publication is created at start-up if it does not already exist and it includes all tables. Debezium then applies its own include/exclude list filtering, if configured, to limit the publication to change events for the specific tables of interest. The connector user must have superuser permissions to create this publication, so it is usually preferable to create the publication before starting the connector for the first time. If the publication already exists, either for all tables or configured with a subset of tables, Debezium uses the publication as it is defined.
- Type: string
- Default: dbz_publication
- Valid Values: Must match the regex
^[^\s\"\'\`]+$
- Importance: medium
publication.autocreate.mode
Applies only when streaming changes by using the pgoutput plug-in. Possible settings are all_tables, disabled, and filtered.
all_tables - If a publication exists, the connector uses it. If a publication does not exist, the connector creates a publication for all tables in the database for which the connector is capturing changes. For the connector to create a publication it must access the database through a database user account that has permission to create publications and perform replications. You can create the publication using following SQL command: CREATE PUBLICATION <publication_name> FOR ALL TABLES;.
disabled - The connector does not attempt to create a publication. A database administrator or the user configured to perform replications must have created the publication before running the connector. If the connector cannot find the publication, the connector throws an exception and stops.
filtered - If a publication exists, the connector uses it. If no publication exists, the connector creates a new publication for tables that match the current filter configuration as specified by the table.include.list, and table.exclude.list connector configuration properties. For example: CREATE PUBLICATION <publication_name> FOR TABLE <tbl1, tbl2, tbl3>. If the publication exists, the connector updates the publication for tables that match the current filter configuration. For example: ALTER PUBLICATION <publication_name> SET TABLE <tbl1, tbl2, tbl3>. For the connector to alter a publication it must access the database through a database user account that has ownership of the publication and the tables it is capturing.
Note:
If the existing regex patterns in table.include.list, and table.exclude.list match the fully qualified name of a newly created table, the connector will miss events from this new table until the publication is manually altered to include it. To avoid missing events, it is recommended to alter the publication before adding data to newly created tables.
When configuring multiple connectors to capture different sets of tables from the database using a filtered configuration, do not use the same publication for all connectors. If multiple connectors use the same publication, the latest connector may alter the publication based on its capture list, potentially causing an incorrect publication configuration for the older connectors.
- Type: string
- Default: all_tables
- Valid Values: all_tables, disabled, filtered
- Importance: medium
signal.data.collection
Fully-qualified name of the data collection that needs to be used to send signals to the connector. Use the schemaName.tableName format to specify the fully-qualified collection name. Note that the connector automatically adds the signal table to the publication only if publication.autocreate.mode is set to filtered or all_tables. You will need to add it manually if the mode is set to disabled.
- Type: string
- Importance: medium
Connector config¶
snapshot.mode
Specifies the criteria for running a snapshot upon startup of the connector. Possible settings are: initial, never, and initial_only.
initial - The connector performs a snapshot only when no offsets have been recorded for the logical server name.
never - The connector never performs snapshots. When a connector is configured this way, its behavior when it starts is as follows. If there is a previously stored LSN in the Kafka offsets topic, the connector continues streaming changes from that position. If no LSN has been stored, the connector starts streaming changes from the starting position available in the replication slot. The never snapshot mode is useful only when you know all data of interest is still reflected in the WAL.
initial_only - The connector performs an initial snapshot and then stops, without processing any subsequent changes.
- Type: string
- Default: initial
- Valid Values: initial, initial_only, never
- Importance: medium
table.include.list
An optional, comma-separated list of regular expressions that match fully-qualified table identifiers for tables whose changes you want to capture. When this property is set, the connector captures changes only from the specified tables. Each identifier is of the form schemaName.tableName. By default, the connector captures changes in every non-system table in each schema whose changes are being captured.
To match the name of a table, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire identifier for the table; it does not match substrings that might be present in a table name.
If you include this property in the configuration, do not also set the table.exclude.list property.
- Type: list
- Importance: medium
table.exclude.list
An optional, comma-separated list of regular expressions that match fully-qualified table identifiers for tables whose changes you do not want to capture. Each identifier is of the form schemaName.tableName. When this property is set, the connector captures changes from every table that you do not specify.
To match the name of a table, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire identifier for the table; it does not match substrings that might be present in a table name.
If you include this property in the configuration, do not set the table.include.list property.
- Type: list
- Importance: medium
column.exclude.list
An optional, comma-separated list of regular expressions that match the fully-qualified names of columns to exclude from change event record values. Fully-qualified names for columns are of the form schemaName.tableName.columnName.
To match the name of a column, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire name string of the column; it does not match substrings that might be present in a column name.
- Type: list
- Importance: medium
event.processing.failure.handling.mode
Specifies how the connector should react to exceptions during processing of events. Possible settings are: fail, skip, and warn.
fail propagates the exception, indicates the offset of the problematic event, and causes the connector to stop.
warn logs the offset of the problematic event, skips that event, and continues processing.
skip skips the problematic event and continues processing.
- Type: string
- Default: fail
- Valid Values: fail, skip, warn
- Importance: low
schema.name.adjustment.mode
Specifies how schema names should be adjusted for compatibility with the message converter used by the connector. Possible settings are: none, avro, and avro_unicode.
none does not apply any adjustment.
avro replaces the characters that cannot be used in the Avro type name with underscore.
avro_unicode replaces the underscore or characters that cannot be used in the Avro type name with corresponding unicode like _uxxxx. Note: _ is an escape sequence like backslash in Java.
- Type: string
- Default: none
- Valid Values: avro, avro_unicode, none
- Importance: medium
field.name.adjustment.mode
Specifies how field names should be adjusted for compatibility with the message converter used by the connector. Possible settings are: none, avro, and avro_unicode.
none does not apply any adjustment.
avro replaces the characters that cannot be used in the Avro type name with underscore.
avro_unicode replaces the underscore or characters that cannot be used in the Avro type name with corresponding unicode like _uxxxx. Note: _ is an escape sequence like backslash in Java.
- Type: string
- Default: none
- Valid Values: avro, avro_unicode, none
- Importance: medium
heartbeat.interval.ms
Controls how frequently the connector sends heartbeat messages to a Kafka topic. The behavior of default value 0 is that the connector does not send heartbeat messages. Heartbeat messages are useful for monitoring whether the connector is receiving change events from the database. Heartbeat messages might help decrease the number of change events that need to be re-sent when a connector restarts. To send heartbeat messages, set this property to a positive integer, which indicates the number of milliseconds between heartbeat messages.
- Type: int
- Default: 0
- Valid Values: [0,…]
- Importance: low
Schema Config¶
schema.context.name
Add a schema context name. A schema context represents an independent scope in Schema Registry. It is a separate sub-schema tied to topics in different Kafka clusters that share the same Schema Registry instance. If not used, the connector uses the default schema configured for Schema Registry in your Confluent Cloud environment.
- Type: string
- Default: default
- Importance: medium
value.converter.ignore.default.for.nullables
Specifies whether the default value should be ignored for nullable fields. If set to true, the default value is ignored for nullable fields. If set to false, the default value is used for nullable fields.
- Type: boolean
- Default: false
- Importance: medium
key.converter.reference.subject.name.strategy
Set the subject reference name strategy for key. Valid entries are DefaultReferenceSubjectNameStrategy or QualifiedReferenceSubjectNameStrategy. Note that the subject reference name strategy can be selected only for PROTOBUF format with the default strategy being DefaultReferenceSubjectNameStrategy.
- Type: string
- Default: DefaultReferenceSubjectNameStrategy
- Importance: high
value.converter.reference.subject.name.strategy
Set the subject reference name strategy for value. Valid entries are DefaultReferenceSubjectNameStrategy or QualifiedReferenceSubjectNameStrategy. Note that the subject reference name strategy can be selected only for PROTOBUF format with the default strategy being DefaultReferenceSubjectNameStrategy.
- Type: string
- Default: DefaultReferenceSubjectNameStrategy
- Importance: high
How should we handle data types?¶
decimal.handling.mode
Specifies how the connector should handle values for DECIMAL and NUMERIC columns. Possible settings are: precise, double, and string.
precise represents values by using java.math.BigDecimal to represent values in binary form in change events. double represents values by using double values, which might result in a loss of precision but which is easier to use. string encodes values as formatted strings, which are easy to consume but semantic information about the real type is lost.
- Type: string
- Default: precise
- Valid Values: double, precise, string
- Importance: medium
time.precision.mode
Time, date, and timestamps can be represented with different kinds of precisions:
adaptive captures the time and timestamp values exactly as in the database using either millisecond, microsecond, or nanosecond precision values based on the database column’s type.
adaptive_time_microseconds captures the date, datetime and timestamp values exactly as in the database using either millisecond, microsecond, or nanosecond precision values based on the database column’s type. An exception is TIME type fields, which are always captured as microseconds.
connect always represents time and timestamp values by using Kafka Connect’s built-in representations for Time, Date, and Timestamp, which use millisecond precision regardless of the database columns’ precision.
- Type: string
- Default: adaptive
- Valid Values: adaptive, adaptive_time_microseconds, connect
- Importance: medium
Number of tasks for this connector¶
tasks.max
Maximum number of tasks for the connector.
- Type: int
- Valid Values: [1,…,1]
- Importance: high
Next Steps¶
For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud ksqlDB, see the Cloud ETL Demo. This example also shows how to use Confluent CLI to manage your resources in Confluent Cloud.