Snowflake Source Connector for Confluent Cloud¶
The fully-managed Snowflake Source connector for Confluent Cloud can capture a snapshot of the existing data in specified Snowflake tables and then monitor and record all subsequent row-level changes to that data. The connector supports AVRO, JSON Schema, and PROTOBUF output data formats. All of the events for each table are recorded in a separate Apache Kafka® topic. The events can then be easily consumed by applications and services. Note that deleted records are not captured.
Note
- If you require private networking for fully-managed connectors, make sure to set up the proper networking beforehand. For more information, see Manage Networking for Confluent Cloud Connectors.
Features¶
The Snowflake Source connector provides the following features:
Topics created automatically: The connector automatically creates Kafka topics using the naming convention: <topic.prefix><database.schema.tableName>. The topics are created with the properties: topic.creation.default.partitions=1 and topic.creation.default.replication.factor=3.
Modes: Set one of the following modes for updating a table each time it is polled:
- bulk: Performs a bulk load of all eligible tables each time it is polled.
- incrementing: Uses a strictly incrementing column on each table to detect only new rows. Only rows with a non-null value in the incrementing column are captured.
- timestamp: Uses timestamp column(s) to detect new and modified rows. When you specify multiple timestamp columns, the connector uses the COALESCE SQL function to determine the effective timestamp for a row. This assumes that the effective timestamp is updated with each write and that its values are monotonically incrementing, but not necessarily unique. Only rows with a non-null effective timestamp are captured.
- timestamp+incrementing: Uses two columns: a timestamp column that detects new and modified rows, and a strictly incrementing column that provides a globally unique ID for updates, so each row can be assigned a unique stream offset. Only rows with both a non-null effective timestamp and a non-null incrementing column value are captured.
Database authentication: The connector supports private key (with or without passphrase) authentication.
Data formats: The connector supports AVRO, JSON Schema, and PROTOBUF output data. Schema Registry must be enabled to use these formats. For more information, see Schema Registry Enabled Environments.
Offset management capabilities: Supports offset management. For more information, see Manage custom offsets.
Client-side field level encryption (CSFLE) support: The connector supports CSFLE for sensitive data. For more information about CSFLE setup, see connector configuration.
For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect Usage Examples section.
Limitations¶
Be sure to review the following information.
- For connector limitations, see Snowflake Source Connector limitations.
- If you plan to use one or more Single Message Transforms (SMTs), see SMT Limitations.
- If you plan to use Confluent Cloud Schema Registry, see Schema Registry Enabled Environments.
Manage custom offsets¶
You can manage the offsets for this connector. Offsets provide information on the point in the system from which the connector is accessing data. For more information, see Manage Offsets for Fully-Managed Connectors in Confluent Cloud.
To manage offsets:
- Manage offsets using Confluent Cloud APIs. For more information, see Cluster API reference.
Mode-wise offset structure¶
Each table has an individual entry in the offsets topic, with the table name stored in the key of the offset message. The structure of the offset value depends on the mode.
Incrementing mode: The connector fetches rows where the value of the incrementing column is strictly greater than the value stored in the offset.
Key
["SfConn-202",{"table":"TEST_DB.PUBLIC.TEST_TS_MODE"}]
Value
{
"incrementing": 2
}
The value includes the following information:
incrementing: Specifies the value of the incrementing column up to which the connector has completed reading.
Timestamp mode:
Key
["SfConn-203",{"table":"TEST_DB.PUBLIC.TEST_TS_MODE_1"}]
Value
{
"timestamp_nanos": 463000000,
"timestamp": 1741253351000
}
The value includes the following information:
timestamp: Represents the number of milliseconds since January 1, 1970, 00:00:00 UTC.
timestamp_nanos: Represents the fractional seconds component of a Timestamp object.
Timestamp+incrementing mode: This mode combines the previous two modes, so its offset combines the fields of both.
Key
["SfConn-203",{"table":"TEST_DB.PUBLIC.TEST_TS_MODE_1"}]
Value
{
"timestamp_nanos": 352462000,
"incrementing": 2,
"timestamp": 1713218242000
}
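The following Python sketch models how a stored offset could select rows in each mode. It is an illustrative model, not the connector's actual query. The row layout, the `select_new_rows` helper, the column names, the strictly-greater comparison for timestamps, and the tie-breaking rule for timestamp+incrementing mode are all assumptions. Only the strictly-greater comparison for the incrementing column and the use of COALESCE over timestamp columns come from the text above.

```python
def effective_timestamp(row, ts_columns):
    """Mimic SQL COALESCE: return the first non-null timestamp column value."""
    for col in ts_columns:
        if row.get(col) is not None:
            return row[col]
    return None


def select_new_rows(rows, mode, offset, inc_column="ID", ts_columns=("UPDATED_AT",)):
    """Return the rows that lie beyond the stored offset (hypothetical model)."""
    selected = []
    for row in rows:
        inc = row.get(inc_column)
        ts = effective_timestamp(row, ts_columns)
        if mode == "incrementing":
            keep = inc is not None and inc > offset["incrementing"]
        elif mode == "timestamp":
            keep = ts is not None and ts > offset["timestamp"]
        elif mode == "timestamp+incrementing":
            # Assumed rule: later timestamp, or same timestamp with a larger ID.
            keep = (
                ts is not None
                and inc is not None
                and (ts, inc) > (offset["timestamp"], offset["incrementing"])
            )
        else:
            raise ValueError(f"mode {mode!r} does not use offsets")
        if keep:
            selected.append(row)
    return selected


rows = [
    {"ID": 1, "UPDATED_AT": 100},
    {"ID": 2, "UPDATED_AT": 200},
    {"ID": 3, "UPDATED_AT": None},  # null timestamp: skipped in timestamp modes
    {"ID": 4, "UPDATED_AT": 200},
]
new_by_id = select_new_rows(rows, "incrementing", {"incrementing": 2})
new_by_both = select_new_rows(
    rows, "timestamp+incrementing", {"timestamp": 200, "incrementing": 2}
)
```

Timestamps are plain integers here to keep the comparison logic visible.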
Mode-wise offset guidance¶
Bulk mode¶
Bulk mode has no offsets because the connector reloads all eligible tables in full on every poll.
Timestamp mode¶
To get the current offset, make a GET request that specifies the environment, Kafka cluster, and connector name.
GET /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets
Host: https://api.confluent.cloud
Response:
Successful calls return HTTP 200 with a JSON payload that describes the offset.
{
"id": "lcc-example123",
"name": "{connector_name}",
"offsets": [
{
"partition": {
"table": "{table_name}"
},
"offset": {
"timestamp": 1741577430000,
"timestamp_nanos": 273000000
}
}
],
"metadata": {
"observed_at": "2025-03-10T04:00:31.754227738Z"
}
}
Responses include the following information:
- The position of the latest offset.
- The observed time of the offset in the metadata portion of the payload. The observed_at time indicates a snapshot in time for when the API retrieved the offset. A running connector is always updating its offsets. Use observed_at to get a sense for the gap between real time and the time at which the request was made. By default, offsets are observed every minute. Calling GET repeatedly will fetch more recently observed offsets.
- Information about the connector.
- In these examples, the curly braces around connector_name indicate a replaceable value.
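As a minimal sketch, the GET request above could be prepared in Python as follows. The helper names, the example IDs, and the use of HTTP Basic authentication with a Cloud API key and secret are assumptions for illustration. Check the Confluent Cloud API reference for the authentication your organization uses.

```python
import base64
import urllib.parse
import urllib.request

API_HOST = "https://api.confluent.cloud"


def offsets_url(environment_id, kafka_cluster_id, connector_name):
    """Build the offsets endpoint URL shown in the request above."""
    name = urllib.parse.quote(connector_name, safe="")
    return (
        f"{API_HOST}/connect/v1/environments/{environment_id}"
        f"/clusters/{kafka_cluster_id}/connectors/{name}/offsets"
    )


def build_get_offsets_request(environment_id, kafka_cluster_id, connector_name,
                              api_key, api_secret):
    """Prepare (but do not send) the GET request for the current offsets."""
    # Assumption: Basic auth with a Cloud API key and secret.
    token = base64.b64encode(f"{api_key}:{api_secret}".encode()).decode()
    return urllib.request.Request(
        offsets_url(environment_id, kafka_cluster_id, connector_name),
        method="GET",
        headers={"Authorization": f"Basic {token}"},
    )


# Hypothetical identifiers, for illustration only.
req = build_get_offsets_request("env-abc123", "lkc-abc123", "My Connector",
                                "key", "secret")
```

To send the request, pass `req` to `urllib.request.urlopen` and parse the response body with `json.load`.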
To update the offset, make a POST request that specifies the environment, Kafka cluster, and connector name. Include a JSON payload that specifies the new offset and a patch type.
POST /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets/request
Host: https://api.confluent.cloud
{
"type": "PATCH",
"offsets": [
{
"partition": {
"table": "{table_name}"
},
"offset": {
"timestamp": 1741577430000,
"timestamp_nanos": 273000000
}
}
]
}
Considerations:
- You can only make one offset change at a time for a given connector.
- This is an asynchronous request. To check the status of this request, you must use the check offset status API. For more information, see Get the status of an offset request.
- For source connectors, the connector attempts to read from the position defined by the requested offsets.
Response:
Successful calls return HTTP 202 Accepted with a JSON payload that describes the offset.
{
"id": "lcc-example123",
"name": "{connector_name}",
"offsets": [
{
"partition": {
"table": "{table_name}"
},
"offset": {
"timestamp": 1741577430000,
"timestamp_nanos": 273000000
}
}
],
"requested_at": "2025-03-10T04:06:30.786486181Z",
"type": "PATCH"
}
Responses include the following information:
- The requested position of the offsets in the source.
- The time of the request to update the offset.
- Information about the connector.
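The PATCH request body shown above can be assembled programmatically. The following Python sketch builds the payload for timestamp mode and wraps it in a POST request. The `build_patch_request` helper and the example endpoint IDs are hypothetical, and the request still needs whatever Authorization header your API credentials require.

```python
import json
import urllib.request


def build_patch_request(offsets_endpoint, table_name, timestamp_ms, timestamp_nanos):
    """Prepare (but do not send) a PATCH offset request for timestamp mode."""
    payload = {
        "type": "PATCH",
        "offsets": [
            {
                "partition": {"table": table_name},
                "offset": {
                    "timestamp": timestamp_ms,
                    "timestamp_nanos": timestamp_nanos,
                },
            }
        ],
    }
    return urllib.request.Request(
        f"{offsets_endpoint}/request",
        data=json.dumps(payload).encode("utf-8"),
        method="POST",
        headers={"Content-Type": "application/json"},
    )


# Hypothetical endpoint values, for illustration only.
endpoint = ("https://api.confluent.cloud/connect/v1/environments/env-abc123"
            "/clusters/lkc-abc123/connectors/my-connector/offsets")
patch_req = build_patch_request(endpoint, "TEST_DB.PUBLIC.TEST_TS_MODE_1",
                                1741577430000, 273000000)
```

Because the request is asynchronous, a 202 response only confirms that the change was accepted, not that it was applied.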
To delete the offset, make a POST request that specifies the environment, Kafka cluster, and connector name. Include a JSON payload that specifies the delete type.
POST /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets/request
Host: https://api.confluent.cloud
{
"type": "DELETE"
}
Considerations:
- Delete requests delete the offset for the provided partition and reset it to the base state. The effect is the same as creating a new connector.
- This is an asynchronous request. To check the status of this request, you must use the check offset status API. For more information, see Get the status of an offset request.
- Do not issue delete and patch requests at the same time.
- For source connectors, the connector attempts to read from the position defined in the base state.
Response:
Successful calls return HTTP 202 Accepted with a JSON payload that describes the result.
{
"id": "lcc-example123",
"name": "{connector_name}",
"offsets": [],
"requested_at": "2025-03-10T04:11:14.263641766Z",
"type": "DELETE"
}
Responses include the following information:
- Empty offsets.
- The time of the request to delete the offset.
- Information about Kafka cluster and connector.
- The type of request.
To get the status of a previous offset request, make a GET request that specifies the environment, Kafka cluster, and connector name.
GET /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets/request/status
Host: https://api.confluent.cloud
Considerations:
- The status endpoint always shows the status of the most recent PATCH/DELETE operation.
Response:
Successful calls return HTTP 200 with a JSON payload that describes the result. The following is an example of an applied patch.
{
"request": {
"id": "lcc-example123",
"name": "{connector_name}",
"offsets": [
{
"partition": {
"table": "{table_name}"
},
"offset": {
"timestamp": 1741577430000,
"timestamp_nanos": 273000000
}
}
],
"requested_at": "2025-03-10T04:06:30.786486181Z",
"type": "PATCH"
},
"status": {
"phase": "APPLIED",
"message": "The Connect framework-managed offsets for this connector have been altered successfully. However, if this connector manages offsets externally, they will need to be manually altered in the system that the connector uses."
},
"previous_offsets": [
{
"partition": {
"table": "{table_name}"
},
"offset": {
"timestamp": 1741577430000,
"timestamp_nanos": 273000000
}
}
],
"applied_at": "2025-03-10T04:06:32.413138178Z"
}
Responses include the following information:
- The original request, including the time it was made.
- The status of the request: applied, pending, or failed.
- The time you issued the status request.
- The previous offsets. These are the offsets that the connector last updated prior to updating the offsets. Use these to try to restore the state of your connector if a patch update causes your connector to fail or to return a connector to its previous state after rolling back.
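Because PATCH and DELETE requests are asynchronous, a client typically polls the status endpoint until the request settles. The following Python sketch shows one way to do that. The `wait_for_offset_request` helper is hypothetical, the `fetch_status` callable stands in for whatever code performs the GET request and parses its JSON, and the exact phase strings other than `APPLIED` (here `PENDING` and `FAILED`) are assumptions based on the list above.

```python
import time


def wait_for_offset_request(fetch_status, timeout_s=60.0, interval_s=2.0,
                            sleep=time.sleep):
    """Poll the status endpoint until the latest offset request settles.

    fetch_status: callable returning the parsed JSON status response.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        body = fetch_status()
        phase = body["status"]["phase"]
        if phase == "APPLIED":
            return body
        if phase == "FAILED":  # assumed phase name
            raise RuntimeError(body["status"].get("message", "offset request failed"))
        if time.monotonic() >= deadline:
            raise TimeoutError(f"offset request still {phase} after {timeout_s}s")
        sleep(interval_s)


# Simulated responses, for illustration only.
responses = iter([
    {"status": {"phase": "PENDING"}},
    {"status": {"phase": "APPLIED"}, "previous_offsets": []},
])
result = wait_for_offset_request(lambda: next(responses), sleep=lambda s: None)
```

Keeping the `previous_offsets` from the returned body makes it possible to restore the earlier position if the new offsets cause problems.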
Incrementing mode and timestamp+incrementing mode¶
For incrementing and timestamp+incrementing modes, the API request payload and response match those for timestamp mode. Simply replace the offset: {} block with the offsets shown for the respective mode in the sections above.
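As a rough illustration, the offset block for each mode can be produced by a small helper that also rejects missing or unexpected fields. The `offset_block` function and `REQUIRED_FIELDS` table are hypothetical names. The field sets follow the offset structures described on this page.

```python
# Fields that make up the "offset" block in each mode (bulk mode has none).
REQUIRED_FIELDS = {
    "incrementing": ("incrementing",),
    "timestamp": ("timestamp", "timestamp_nanos"),
    "timestamp+incrementing": ("timestamp", "timestamp_nanos", "incrementing"),
}


def offset_block(mode, **values):
    """Return the "offset" object for a PATCH request in the given mode."""
    if mode not in REQUIRED_FIELDS:
        raise ValueError(f"mode {mode!r} has no offsets to manage")
    required = REQUIRED_FIELDS[mode]
    missing = [f for f in required if values.get(f) is None]
    extra = [f for f in values if f not in required]
    if missing or extra:
        raise ValueError(f"missing fields: {missing}, unexpected fields: {extra}")
    return {f: values[f] for f in required}


inc_offset = offset_block("incrementing", incrementing=2)
combined_offset = offset_block(
    "timestamp+incrementing",
    timestamp=1713218242000,
    timestamp_nanos=352462000,
    incrementing=2,
)
```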
Offset caveats¶
Note the following important considerations for managing offsets:
Timestamp mode: If you need to change the fractional seconds value while resetting offsets, change it in timestamp_nanos.
Schema evolution: Confluent does not recommend updating offsets to a point in time before any DDL changes to the tables, as this may cause inconsistent results. If you move the offsets back to a point before a schema change that was made to the targeted tables while the connector was running, you will not retrieve exactly the same records as before. The records will reflect the current schema of the table.
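When resetting a timestamp offset to a specific point in time, it can help to derive both fields from a single datetime value. The Python sketch below does this under a loudly stated assumption: as the examples on this page suggest, `timestamp` is taken to hold whole seconds expressed in milliseconds and `timestamp_nanos` the fractional-second part in nanoseconds. The `to_offset` and `from_offset` helpers are hypothetical, so verify the result against an offset returned by the GET request before relying on it.

```python
import calendar
from datetime import datetime, timedelta, timezone


def to_offset(dt):
    """Convert a timezone-aware datetime to a timestamp-mode offset block."""
    if dt.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    whole_seconds = calendar.timegm(dt.utctimetuple())
    return {
        # Assumption: whole seconds, expressed in milliseconds.
        "timestamp": whole_seconds * 1000,
        # Assumption: fractional seconds, expressed in nanoseconds.
        "timestamp_nanos": dt.microsecond * 1000,
    }


def from_offset(offset):
    """Convert an offset block back to a UTC datetime (microsecond precision)."""
    base = datetime.fromtimestamp(offset["timestamp"] // 1000, tz=timezone.utc)
    return base + timedelta(microseconds=offset["timestamp_nanos"] // 1000)


moment = datetime(2025, 3, 10, 3, 30, 30, 273000, tzinfo=timezone.utc)
offset = to_offset(moment)
```

Python datetimes carry microsecond precision, so any nanosecond digits beyond that are lost in the round trip.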
JSON payload¶
The following table describes the fields that are unique to the JSON payload for managing offsets of the Snowflake Source connector:
Field | Definition | Required/Optional |
---|---|---|
incrementing | Specifies the value of the incrementing column up to which the connector has processed. The connector gets only values greater than the value in this field. Available only in the following modes: incrementing, timestamp+incrementing. | Required |
table | The name of the table. Available in the following modes: incrementing, timestamp, timestamp+incrementing. | Required |
timestamp | The number of milliseconds since January 1, 1970, 00:00:00 UTC. Available only in the following modes: timestamp, timestamp+incrementing. | Required |
timestamp_nanos | Fractional seconds component of the Timestamp object. Available only in the following modes: timestamp, timestamp+incrementing. | Required |
Generate a Snowflake key pair¶
Before you create the connector, you need to generate a key pair. Snowflake authentication requires 2048-bit (minimum) RSA. You add the public key to a Snowflake user account. You add the private key to the connector configuration (when completing the Quick Start instructions).
Quick Start¶
Use this quick start to get up and running with the Confluent Cloud Snowflake Source connector. The quick start provides the basics of selecting the connector and configuring it to obtain a snapshot of the existing data in specified Snowflake tables and then monitoring and recording all subsequent row-level changes.
- Prerequisites
- Authorized access to a Confluent Cloud cluster on Amazon Web Services (AWS), Microsoft Azure (Azure), or Google Cloud.
- The Confluent CLI installed and configured for the cluster. See Install the Confluent CLI.
- The connector automatically creates Kafka topics using the naming convention: <topic.prefix><database.schema.tableName>. The topics are created with the properties: topic.creation.default.partitions=1 and topic.creation.default.replication.factor=3. If you want to create topics with specific settings, create the topics before running this connector.
- Schema Registry must be enabled. For more information, see Schema Registry Enabled Environments.
- Make sure your connector can reach your service. Consider the following before running the connector:
- Depending on the service environment, certain network access limitations may exist. See Manage Networking for Confluent Cloud Connectors for details.
- To use a set of public egress IP addresses, see Public Egress IP Addresses for Confluent Cloud Connectors. For additional fully-managed connector networking details, see Networking and DNS.
- Clients from Azure Virtual Networks are not allowed to access the server by default. Check that your Azure Virtual Network is correctly configured and that Allow access to Azure Services is enabled.
- See your specific cloud platform documentation for how to configure security rules for your VPC.
- Kafka cluster credentials. The following lists the different ways you can provide credentials.
- Enter an existing service account resource ID.
- Create a Confluent Cloud service account for the connector. Make sure to review the ACL entries required in the service account documentation. Some connectors have specific ACL requirements.
- Create a Confluent Cloud API key and secret. To create a key and secret, you can use confluent api-key create or you can autogenerate the API key and secret directly in the Cloud Console when setting up the connector.
Using the Confluent Cloud Console¶
Step 1: Launch your Confluent Cloud cluster¶
See the Quick Start for Confluent Cloud for installation instructions.
Step 2: Add a connector¶
In the left navigation menu, click Connectors. If you already have connectors in your cluster, click + Add connector.
Step 4: Enter the connector details¶
Note
- Make sure you have all your prerequisites completed.
- An asterisk ( * ) designates a required entry.
At the Snowflake Source Connector screen, complete the following:
- Select the way you want to provide Kafka Cluster credentials. You can choose one of the following options:
- My account: This setting allows your connector to globally access everything that you have access to. With a user account, the connector uses an API key and secret to access the Kafka cluster. This option is not recommended for production.
- Service account: This setting limits the access for your connector by using a service account. This option is recommended for production.
- Use an existing API key: This setting allows you to specify an API key and a secret pair. You can use an existing pair or create a new one. This method is not recommended for production environments.
Note
Freight clusters support only service accounts for Kafka authentication.
- Click Continue.
- Add the following Snowflake instance connection details:
- Snowflake Connection URL: The Snowflake connection URL. Use the format <org_name>_<account_name>.snowflakecomputing.com or, if your account is in the AWS US West (Oregon) region, use <locator>.snowflakecomputing.com.
- Snowflake User: Specify the user for the Snowflake instance.
- Credentials Source: Source of the credentials for authentication. Use one of the following supported values:
- PRIVATE_KEY: Authenticate using the private key for the Snowflake user. Enter only the part of the key between --BEGIN RSA PRIVATE KEY-- and --END RSA PRIVATE KEY--.
- PRIVATE_KEY_PASSPHRASE: Authenticate using both the private key and passphrase of the encrypted private key.
- Click Continue.
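Because only the part of the key between the BEGIN and END markers is entered, a small helper can extract it from a PEM file. The following Python sketch is one way to do this. The `strip_pem_markers` name is hypothetical, and joining the body into a single line is an assumption rather than a documented requirement.

```python
def strip_pem_markers(pem_text):
    """Return the base64 body of a PEM key without its BEGIN/END marker lines."""
    body_lines = []
    for line in pem_text.splitlines():
        line = line.strip()
        # Marker lines start with dashes; base64 body lines never do.
        if line and not line.startswith("-"):
            body_lines.append(line)
    # Assumption: the body is supplied as one continuous string.
    return "".join(body_lines)


# Dummy key material, for illustration only.
sample_pem = (
    "-----BEGIN RSA PRIVATE KEY-----\n"
    "MIIEexample\n"
    "ABCD1234\n"
    "-----END RSA PRIVATE KEY-----\n"
)
key_body = strip_pem_markers(sample_pem)
```

Treat the extracted value as a secret: avoid printing it or committing it to source control.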
Configure the following:
Output messages
- Output record value format: Select the Output Kafka record value format (data going to the Kafka topic): AVRO, JSON_SR, or PROTOBUF. Schema Registry must be enabled to use a Schema Registry-based format (for example, AVRO, JSON Schema, or PROTOBUF). For more information, see Schema Registry Enabled Environments.
How should we name your topic(s)?
- Topic Prefix: Enter a logical name to be prepended to table names to generate the Apache Kafka® topic name, where the connector publishes data.
Connector details
- Mode: Set one of the following modes for updating a table each time it is polled:
- bulk: Performs a bulk load of all eligible tables each time it is polled.
- incrementing: Uses a strictly incrementing column on each table to detect only new rows. Only rows with a non-null value in the incrementing column are captured.
- Table to incrementing column mappings: Enter a comma-separated list of table regex to incrementing column mappings. The expected format is regex1:col2,regex2:col1. Regexes are matched against the fully-qualified table names. Identifier names are case-sensitive. Every table included for capture should match exactly one of the provided mappings. An example of valid input is COMPANY.EMPLOYEES.SALARY.*:EMP_ID, COMPANY.FINANCE.ACCOUNTS.*:ID
- timestamp: Uses timestamp column(s) to detect new and modified rows. When you specify multiple timestamp columns, the connector uses the COALESCE SQL function to determine the effective timestamp for a row. This assumes that the effective timestamp is updated with each write and that its values are monotonically incrementing, but not necessarily unique. Only rows with a non-null effective timestamp are captured.
- Table to timestamp columns mappings: Enter a comma-separated list of table regex to timestamp column mappings. The timestamp columns supplied must be of the TIMESTAMP_NTZ type. When you specify multiple timestamp columns, the connector uses the COALESCE SQL function to determine the effective timestamp for a row. The expected format is regex1:[col1|col2],regex2:[col3]. Regexes are matched against the fully-qualified table names. Identifier names are case-sensitive. Every table included for capture should match exactly one of the provided mappings. An example of valid input is COMPANY.EMPLOYEES.SALARY.*:[UPDATED_AT|MODIFIED_AT], COMPANY.FINANCE.ACCOUNTS.*:[CHANGED_AT]
- timestamp+incrementing: Uses two columns: a timestamp column that detects new and modified rows, and a strictly incrementing column that provides a globally unique ID for updates, so each row can be assigned a unique stream offset. Only rows with both a non-null effective timestamp and a non-null incrementing column value are captured.
- Tables Included: Enter a comma-separated list of regular expressions to match the fully-qualified names of tables you want to copy. For example: DB_A.PUBLIC.CUSTOMER.*,DB_B.PUBLIC.CUSTOMER.*,...
- Tables Excluded: Enter a comma-separated list of regular expressions to match the fully-qualified names of tables for the connector to ignore. For example: DB_A.PUBLIC.CUSTOMER.*,DB_B.PUBLIC.CUSTOMER.*,... Note that this property applies only to tables filtered by the table.include.list field.
- Database Timezone: Specify the timezone to interpret the values for timestamp types that don’t include timezone information. This should be set to the timezone of the Snowflake account.
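Since every captured table should match exactly one mapping, it can be useful to check a mapping string before launching the connector. The Python sketch below parses the mapping formats described above and resolves the columns for a table. The helper names are hypothetical, and the use of full-string regex matching is an assumption about how the connector compares patterns with table names.

```python
import re


def parse_mappings(spec):
    """Parse 'regex1:[col1|col2],regex2:col3' into (pattern, columns) pairs."""
    mappings = []
    for entry in spec.split(","):
        # The column part follows the last colon in each entry.
        pattern, _, columns = entry.strip().rpartition(":")
        mappings.append((re.compile(pattern), columns.strip("[]").split("|")))
    return mappings


def columns_for(table, mappings):
    """Return the mapped columns, requiring exactly one matching pattern."""
    # Assumption: a pattern must match the whole fully-qualified table name.
    matches = [cols for pattern, cols in mappings if pattern.fullmatch(table)]
    if len(matches) != 1:
        raise ValueError(f"{table} matches {len(matches)} mappings, expected 1")
    return matches[0]


spec = ("COMPANY.EMPLOYEES.SALARY.*:[UPDATED_AT|MODIFIED_AT], "
        "COMPANY.FINANCE.ACCOUNTS.*:[CHANGED_AT]")
mappings = parse_mappings(spec)
salary_columns = columns_for("COMPANY.EMPLOYEES.SALARY_2024", mappings)
```

Splitting on commas is a simplification that would break on a regex containing a comma, so treat this as a pre-flight check rather than a full parser.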
Data encryption
- (Optional) Enable Client-Side Field Level Encryption: Enable CSFLE for data encryption. Specify a Schema Registry Service Account to access the Schema Registry and associated encryption rules or keys with that schema. For more information on CSFLE setup, see Manage CSFLE for connectors.
Show advanced configurations
Schema context: Select a schema context to use for this connector, if using a schema-based data format. This property defaults to the Default context, which configures the connector to use the default schema set up for Schema Registry in your Confluent Cloud environment. A schema context allows you to use separate schemas (like schema sub-registries) tied to topics in different Kafka clusters that share the same Schema Registry environment. For example, if you select a non-default context, a Source connector uses only that schema context to register a schema and a Sink connector uses only that schema context to read from. For more information about setting up a schema context, see What are schema contexts and when should you use them?.
Auto-restart policy
Enable Connector Auto-restart: Control the auto-restart behavior of the connector and its task in the event of user-actionable errors. Defaults to true, enabling the connector to automatically restart in case of user-actionable errors. Set this property to false to disable auto-restart for failed connectors. In such cases, you would need to manually restart the connector.
Connector Details
Table Types: By default, the connector detects only tables of type TABLE in the source database. Use this configuration to provide a comma-separated list of table types to extract.
Timestamp granularity for timestamp columns: Define the granularity of the Timestamp column. Defaults to NANOS_LONG.
- CONNECT_LOGICAL: Represents timestamp values using Kafka Connect’s built-in representations. This approach may lead to precision loss, as it only supports millisecond precision.
- NANOS_LONG: Represents timestamp values as nanoseconds since the epoch.
- NANOS_STRING: Represents timestamp values as nanoseconds since the epoch, encoded as strings.
Poll Interval (ms): Enter the frequency in milliseconds (ms) for polling new data in each table. Defaults to 5000 ms (5 seconds).
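To make the difference between the timestamp granularity options concrete, the following Python sketch shows how one timestamp could look under each setting. The `represent` helper is hypothetical, and rendering CONNECT_LOGICAL as epoch milliseconds is a simplification of Kafka Connect's logical timestamp type.

```python
NANOS_PER_SECOND = 1_000_000_000


def represent(epoch_seconds, nanos, granularity):
    """Show how one timestamp value looks under each granularity option."""
    total_nanos = epoch_seconds * NANOS_PER_SECOND + nanos
    if granularity == "NANOS_LONG":
        return total_nanos
    if granularity == "NANOS_STRING":
        return str(total_nanos)
    if granularity == "CONNECT_LOGICAL":
        # Millisecond precision only: sub-millisecond digits are dropped.
        return total_nanos // 1_000_000
    raise ValueError(f"unknown granularity: {granularity}")


as_long = represent(1741577430, 273456789, "NANOS_LONG")
as_string = represent(1741577430, 273456789, "NANOS_STRING")
as_millis = represent(1741577430, 273456789, "CONNECT_LOGICAL")
```

In this example the last six digits of the fractional part are lost under CONNECT_LOGICAL, which is the precision loss the option description refers to.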
Schema Configuration
Value Subject Name Strategy: Determines how to construct the subject name under which the value schema is registered with Schema Registry. Defaults to TopicNameStrategy. Set this property to RecordNameStrategy to derive the subject name from the record name. Set this property to TopicRecordNameStrategy to derive the subject name from the topic and record name. For more information, see Subject name strategy.
Value Converter Reference Subject Name Strategy: Determines how to construct the subject name for value. The default is DefaultReferenceSubjectNameStrategy, where the reference name is used as the subject. This configuration applies to the PROTOBUF format only.
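The three value subject name strategies differ only in how they combine the topic name and the record name. The short Python sketch below illustrates the resulting subject names. The `value_subject` helper and the sample topic and record names are hypothetical.

```python
def value_subject(strategy, topic, record_name):
    """Return the Schema Registry subject for a value schema."""
    if strategy == "TopicNameStrategy":
        return f"{topic}-value"
    if strategy == "RecordNameStrategy":
        return record_name
    if strategy == "TopicRecordNameStrategy":
        return f"{topic}-{record_name}"
    raise ValueError(f"unknown strategy: {strategy}")


# Hypothetical topic and record names, for illustration only.
topic = "cli.TEST_DB.PUBLIC.TEST_TS_MODE"
record = "com.example.TestRecord"
subjects = {
    name: value_subject(name, topic, record)
    for name in ("TopicNameStrategy", "RecordNameStrategy", "TopicRecordNameStrategy")
}
```

With TopicNameStrategy each topic has one value schema lineage, while the record-based strategies allow several record types to share a topic.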
Processing position
Set offsets: Click Set offsets to define a specific offset for this connector to begin processing data from. For more information on managing offsets, see Manage offsets.
For all property values and definitions, see Configuration Properties.
Click Continue.
Based on the number of topic partitions you select, you will be provided with a recommended number of tasks.
- To change the number of recommended tasks, enter the number of tasks for the connector to use in the Tasks field.
- Click Continue.
Verify the connection details by previewing the running configuration.
Tip
For information about previewing your connector output, see Data Previews for Confluent Cloud Connectors.
{
  "config": {
    "kafka.auth.mode": "SERVICE_ACCOUNT",
    "kafka.service.account.id": "sa-dev123",
    "schema.context.name": "default",
    "value.subject.name.strategy": "TopicNameStrategy",
    "value.converter.reference.subject.name.strategy": "DefaultReferenceSubjectNameStrategy",
    "connector.class": "SnowflakeSource",
    "name": "SnowflakeSourceConnector_0",
    "connection.url": "cflt.snowflakecomputing.com",
    "connection.user": "<user-name>",
    "connection.credentials.source": "PRIVATE_KEY",
    "connection.private.key": "<Snowflake Private Key>",
    "topic.prefix": "cli.",
    "mode": "timestamp",
    "timestamp.columns.mapping": "TEST_DB.PUBLIC.TEST_TS_MODE.*:[UPDATED_AT]",
    "db.timezone": "UTC",
    "table.types": "TABLE",
    "timestamp.granularity": "NANOS_LONG",
    "poll.interval.ms": "5000",
    "output.data.format": "AVRO",
    "tasks.max": "1",
    "auto.restart.on.user.error": "true"
  }
}
After you’ve validated that the properties are configured to your satisfaction, click Launch.
The status for the connector should go from Provisioning to Running.
Step 5: Check the Kafka topic¶
After the connector is running, verify that messages are populating your Kafka topic.
For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect Usage Examples section.
Using the Confluent CLI¶
Complete the following steps to set up and run the connector using the Confluent CLI.
Note
Make sure you have all your prerequisites completed.
Step 1: List the available connectors¶
Enter the following command to list available connectors:
confluent connect plugin list
Step 2: List the connector configuration properties¶
Enter the following command to show the connector configuration properties:
confluent connect plugin describe <connector-plugin-name>
The command output shows the required and optional configuration properties.
Step 3: Create the connector configuration file¶
Create a JSON file that contains the connector configuration properties. The following example shows the required connector properties.
{
  "name": "SnowflakeSource_cli",
  "connector.class": "SnowflakeSource",
  "kafka.auth.mode": "KAFKA_API_KEY",
  "kafka.api.key": "<my-kafka-api-key>",
  "kafka.api.secret": "<my-kafka-api-secret>",
  "table.include.list": "TEST_DB.PUBLIC.TEST_TS_MODE.*",
  "table.exclude.list": "TEST_DB.PUBLIC.TEST_TS_MODE_.*",
  "db.timezone": "America/Los_Angeles",
  "mode": "timestamp",
  "timestamp.columns.mapping": "TEST_DB.PUBLIC.TEST_TS_MODE.*:[UPDATED_AT]",
  "topic.prefix": "cli.",
  "connection.url": "locator.snowflakecomputing.com",
  "connection.user": "<user-name>",
  "connection.credentials.source": "PRIVATE_KEY",
  "connection.private.key": "<Snowflake Private Key>",
  "tasks.max": "1",
  "output.data.format": "AVRO"
}
Note the following property definitions:
"name": Sets a name for your new connector.
"connector.class": Identifies the connector plugin name.
"kafka.auth.mode": Identifies the connector authentication mode you want to use. There are two options: SERVICE_ACCOUNT or KAFKA_API_KEY (the default). To use an API key and secret, specify the configuration properties kafka.api.key and kafka.api.secret, as shown in the example configuration (above). To use a service account, specify the Resource ID in the property kafka.service.account.id=<service-account-resource-ID>. To list the available service account resource IDs, use the following command:
confluent iam service-account list
For example:
confluent iam service-account list
   Id     | Resource ID |       Name        |    Description
+---------+-------------+-------------------+-------------------
   123456 | sa-l1r23m   | sa-1              | Service account 1
   789101 | sa-l4d56p   | sa-2              | Service account 2
"topic.prefix": Enter a topic prefix. The connector automatically creates Kafka topics using the naming convention: <topic.prefix><database.schema.tableName>. The topics are created with the properties: topic.creation.default.partitions=1 and topic.creation.default.replication.factor=3. If you want to create topics with specific settings, create the topics before running this connector. If you are configuring granular access using a service account, you must set up ACLs for the topic prefix.
"output.data.format": Sets the output Kafka record value format (data coming from the connector). Valid entries are AVRO, JSON_SR, or PROTOBUF. You must have Confluent Cloud Schema Registry configured if using a schema-based message format.
"db.timezone": Specify the timezone to interpret the values for timestamp types that don’t include timezone information. This should be set to the timezone of the Snowflake account. For more information, see this list of database timezones.
Single Message Transforms: See the Single Message Transforms (SMT) documentation for details about adding SMTs using the CLI.
See Configuration Properties for all properties and definitions.
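Before creating the connector, a quick local check can catch missing credential properties in the configuration file. The following Python sketch applies the kafka.auth.mode rules described above. The `missing_auth_properties` helper is hypothetical and covers only the Kafka credential properties, not the full set of required settings.

```python
def missing_auth_properties(config):
    """Return the Kafka credential properties the chosen auth mode still needs."""
    mode = config.get("kafka.auth.mode", "KAFKA_API_KEY")  # documented default
    if mode == "KAFKA_API_KEY":
        required = ("kafka.api.key", "kafka.api.secret")
    elif mode == "SERVICE_ACCOUNT":
        required = ("kafka.service.account.id",)
    else:
        raise ValueError(f"unsupported kafka.auth.mode: {mode}")
    return [key for key in required if not config.get(key)]


# Partial configurations, for illustration only.
api_key_config = {
    "kafka.auth.mode": "KAFKA_API_KEY",
    "kafka.api.key": "<my-kafka-api-key>",
}
service_account_config = {
    "kafka.auth.mode": "SERVICE_ACCOUNT",
    "kafka.service.account.id": "sa-l1r23m",
}
missing = missing_auth_properties(api_key_config)
```

In practice the configuration would be loaded from the JSON file with `json.load` and the connector created only when the returned list is empty.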
Step 4: Load the properties file and create the connector¶
Enter the following command to load the configuration and start the connector:
confluent connect cluster create --config-file <file-name>.json
For example:
confluent connect cluster create --config-file snowflake-source.json
Example output:
Created connector SnowflakeSource_0 lcc-ix4dl
Step 5: Check the connector status¶
Enter the following command to check the connector status:
confluent connect cluster list
Example output:
ID | Name | Status | Type
+-----------+-------------------------+---------+-------+
lcc-ix4dl | SnowflakeSource_0 | RUNNING | source
Step 6: Check the Kafka topic.¶
After the connector is running, verify that messages are populating your Kafka topic.
For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect Usage Examples section.
Configuration Properties¶
Use the following configuration properties with the fully-managed connector. For self-managed connector property definitions and other details, see the connector docs in Self-managed connectors for Confluent Platform.
Kafka Cluster credentials¶
kafka.auth.mode
Kafka Authentication mode. It can be one of KAFKA_API_KEY or SERVICE_ACCOUNT. It defaults to KAFKA_API_KEY mode.
- Type: string
- Default: KAFKA_API_KEY
- Valid Values: KAFKA_API_KEY, SERVICE_ACCOUNT
- Importance: high
kafka.api.key
Kafka API Key. Required when kafka.auth.mode==KAFKA_API_KEY.
- Type: password
- Importance: high
kafka.service.account.id
The Service Account that will be used to generate the API keys to communicate with Kafka Cluster.
- Type: string
- Importance: high
kafka.api.secret
Secret associated with Kafka API key. Required when kafka.auth.mode==KAFKA_API_KEY.
- Type: password
- Importance: high
Schema Config¶
schema.context.name
Add a schema context name. A schema context represents an independent scope in Schema Registry. It is a separate sub-schema tied to topics in different Kafka clusters that share the same Schema Registry instance. If not used, the connector uses the default schema configured for Schema Registry in your Confluent Cloud environment.
- Type: string
- Default: default
- Importance: medium
value.subject.name.strategy
Determines how to construct the subject name under which the value schema is registered with Schema Registry.
- Type: string
- Default: TopicNameStrategy
- Valid Values: RecordNameStrategy, TopicNameStrategy, TopicRecordNameStrategy
- Importance: medium
value.converter.reference.subject.name.strategy
Set the subject reference name strategy for value. Valid entries are DefaultReferenceSubjectNameStrategy or QualifiedReferenceSubjectNameStrategy. Note that the subject reference name strategy can be selected only for PROTOBUF format with the default strategy being DefaultReferenceSubjectNameStrategy.
- Type: string
- Default: DefaultReferenceSubjectNameStrategy
- Importance: medium
How should we connect to your data?¶
name
Sets a name for your connector.
- Type: string
- Valid Values: A string at most 64 characters long
- Importance: high
How should we connect to your Snowflake Instance?¶
connection.url
Snowflake connection URL. Supported formats are <org_name>_<account_name>.snowflakecomputing.com or, if the account is located in the AWS US West (Oregon) region, <locator>.snowflakecomputing.com.
- Type: string
- Importance: high
connection.user
User for authenticating to Snowflake.
- Type: string
- Importance: high
connection.credentials.source
The source of the credentials to use for authentication. Supported values are: PRIVATE_KEY: Use the private key to authenticate. PRIVATE_KEY_PASSPHRASE: Use the private key and passphrase to authenticate.
- Type: string
- Default: PRIVATE_KEY
- Importance: high
connection.private.key
Private key for Snowflake user.
- Type: password
- Importance: high
connection.private.key.passphrase
Passphrase of the encrypted private key.
- Type: password
- Importance: high
How should we name your topic(s)?¶
topic.prefix
Prefix to prepend to table names to generate the name of the Kafka topic to publish data to.
- Type: string
- Importance: high
Connector Details¶
mode
Mode represents the criteria by which a table is polled each time. Options include: BULK: performs a bulk load of all eligible tables each time it is polled. TIMESTAMP: uses timestamp column(s) to detect new and modified rows. When multiple timestamp columns are specified, the COALESCE SQL function is used to find the effective timestamp for a row. This assumes that the effective timestamp is updated with each write and that its values are monotonically incrementing, but not necessarily unique. Only rows with a non-null effective timestamp are captured. INCREMENTING: uses a strictly incrementing column on each table to detect only new rows. Only rows with a non-null value in the incrementing column are captured. TIMESTAMP AND INCREMENTING: uses two columns, a timestamp column that detects new and modified rows and a strictly incrementing column that provides a globally unique ID for updates, so each row can be assigned a unique stream offset. Only rows with a non-null effective timestamp and a non-null incrementing column value are captured.
- Type: string
- Importance: medium
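The TIMESTAMP AND INCREMENTING criteria can be pictured as a comparison against a stored (timestamp, ID) offset. The sketch below applies that comparison to in-memory rows; it is an illustration of the idea only, not the connector's actual query, and the function name `rows_after_offset` is hypothetical.

```python
from datetime import datetime

def rows_after_offset(rows, last_ts, last_id):
    """Select (timestamp, id) rows newer than the stored offset, ordered for replay."""
    eligible = [
        (ts, row_id)
        for ts, row_id in rows
        # Rows with a null timestamp or null incrementing value are never captured.
        if ts is not None and row_id is not None
        # Tuple comparison: a later timestamp, or the same timestamp with a higher ID.
        and (ts, row_id) > (last_ts, last_id)
    ]
    return sorted(eligible)
```

Because the ID breaks ties between rows that share a timestamp, each row maps to a distinct offset.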
table.include.list
A comma-separated list of regular expressions that match the fully-qualified names of tables to be copied. Identifier names are case sensitive. For example, table.include.list: "DB_A.PUBLIC.CUSTOMER.*,DB_B.PUBLIC.CUSTOMER.*,".
- Type: list
- Importance: medium
table.exclude.list
A comma-separated list of regular expressions that match the fully-qualified names of tables not to be copied. This applies only to tables that pass the include list. Identifier names are case sensitive. For example, table.exclude.list: "DB_A.PUBLIC.CUSTOMER.*,DB_B.PUBLIC.CUSTOMER.*,".
- Type: list
- Importance: medium
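The interaction between the include and exclude lists can be sketched as a two-step filter. This is a minimal illustration using Python's `re` module; it assumes each expression must match the whole fully-qualified name, which the property descriptions do not state, and the function name `select_tables` and the table names in the usage note are hypothetical.

```python
import re

def select_tables(tables, include, exclude=""):
    """Keep tables matching an include regex, then drop those matching an exclude regex."""
    def patterns(value):
        # Split on commas and ignore empty entries such as a trailing comma.
        return [re.compile(p.strip()) for p in value.split(",") if p.strip()]

    inc, exc = patterns(include), patterns(exclude)
    selected = []
    for name in tables:
        included = any(p.fullmatch(name) for p in inc)   # matching is case sensitive
        excluded = any(p.fullmatch(name) for p in exc)
        if included and not excluded:
            selected.append(name)
    return selected
```

For instance, with an include list of `DB_A.PUBLIC.CUSTOMER.*` and an exclude list of `.*_TMP`, a table named `DB_A.PUBLIC.CUSTOMER_EU` is kept while `DB_A.PUBLIC.CUSTOMER_TMP` is dropped.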
timestamp.columns.mapping
A comma-separated list of table regex to timestamp column mappings. Timestamp columns must be of type TIMESTAMP_NTZ. When multiple timestamp columns are specified, the COALESCE SQL function is used to find the effective timestamp for a row. The expected format is regex1:[col1|col2],regex2:[col3]. Regexes are matched against the fully-qualified table names. Identifier names are case sensitive. Every table included for capture should match exactly one of the provided mappings. An example of valid input is COMPANY.EMPLOYEES.SALARY.*:[UPDATED_AT|MODIFIED_AT], COMPANY.FINANCE.ACCOUNTS.*:[CHANGED_AT].
- Type: list
- Importance: medium
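To make the mapping format concrete, the sketch below parses a timestamp.columns.mapping value and mimics COALESCE over the mapped columns. It is an illustration under assumptions, not the connector's parser: the names `parse_timestamp_mapping` and `effective_timestamp` are hypothetical, and table regexes are assumed not to contain commas or square brackets.

```python
import re

# One entry looks like  <table regex>:[COL_A|COL_B]  followed by a comma or end of input.
ENTRY = re.compile(r"\s*(?P<table>[^,\[\]]+?)\s*:\s*\[(?P<cols>[^\]]+)\]\s*(?:,|$)")

def parse_timestamp_mapping(value):
    """Parse 'regex1:[col1|col2],regex2:[col3]' into {regex: [columns]}."""
    mapping = {}
    pos = 0
    while pos < len(value):
        m = ENTRY.match(value, pos)
        if m is None:
            raise ValueError(f"malformed mapping near position {pos}")
        mapping[m.group("table")] = [c.strip() for c in m.group("cols").split("|")]
        pos = m.end()
    return mapping

def effective_timestamp(row, columns):
    """Mimic SQL COALESCE: the first non-null value among the columns, else None."""
    return next((row[c] for c in columns if row.get(c) is not None), None)
```

A row whose mapped columns are all null has no effective timestamp, which corresponds to the rows that are not captured.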
incrementing.column.mapping
A comma-separated list of table regex to incrementing column mappings. The expected format is regex1:col2,regex2:col1. Regexes are matched against the fully-qualified table names. Identifier names are case sensitive. Every table included for capture should match exactly one of the provided mappings. An example of valid input is COMPANY.EMPLOYEES.SALARY*:EMP_ID,COMPANY.FINANCE.ACCOUNTS.*:ID.
- Type: list
- Importance: medium
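The requirement that every captured table match exactly one mapping can be checked ahead of time. The following sketch resolves the incrementing column for a table and fails when zero or several mappings match; the function name `mapping_for_table` is hypothetical, and whole-name matching with Python's `re` module is an assumption.

```python
import re

def mapping_for_table(table, mapping_value):
    """Return the incrementing column for a table, requiring exactly one matching regex."""
    matches = []
    for entry in mapping_value.split(","):
        # The column name follows the last colon of each entry.
        pattern, _, column = entry.strip().rpartition(":")
        if re.fullmatch(pattern, table):
            matches.append(column)
    if len(matches) != 1:
        raise ValueError(f"{table} matches {len(matches)} mappings, expected exactly 1")
    return matches[0]
```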
db.timezone
Timezone to use when interpreting values of timestamp types that do not carry timezone information. This should be set to the timezone of the Snowflake account.
- Type: string
- Importance: medium
timestamp.initial
Epoch timestamp in milliseconds that sets the start timestamp from which rows are captured. The value -1 sets the start timestamp to the current time; in this case, older data is not fetched. If not specified, the start timestamp is treated as the epoch start time, so all data is fetched. Once the connector has successfully recorded a source offset, this property has no effect, even if it is later changed to a different value.
- Type: long
- Importance: medium
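The three cases for timestamp.initial can be summarized in a small helper. This is a sketch of the documented behavior for the first run only (before any source offset is recorded); the function name `start_timestamp_ms` and the `now_ms` parameter are hypothetical.

```python
import time

def start_timestamp_ms(timestamp_initial=None, now_ms=None):
    """Resolve the starting point (epoch milliseconds) for the first poll."""
    if timestamp_initial is None:
        return 0  # not specified: start from the epoch, so all data is fetched
    if timestamp_initial == -1:
        # -1: start from the current time, so older data is skipped
        return int(time.time() * 1000) if now_ms is None else now_ms
    return timestamp_initial  # explicit epoch timestamp in milliseconds
```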
table.types
By default, the connector only detects tables with type TABLE. This property accepts a comma-separated list of table types to extract.
- Type: list
- Default: TABLE
- Importance: medium
timestamp.granularity
Defines the granularity of the timestamp column. CONNECT_LOGICAL: represents timestamp values using Kafka Connect built-in representations. This may lead to loss of precision because it supports only millisecond precision. NANOS_LONG: represents timestamp values as nanoseconds since the epoch. Avoid this if any eligible timestamp column contains timestamps greater than 2262-04-11 23:47:16.854775807 GMT, because the nanoseconds-since-epoch value does not fit in the range of a long and would lead to erroneous values. Use NANOS_STRING in such cases. NANOS_STRING: represents timestamp values as nanoseconds since the epoch, as a string.
- Type: string
- Default: NANOS_STRING
- Importance: low
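The trade-offs between the three granularities follow from simple arithmetic on nanoseconds since the epoch. The sketch below is illustrative only and is not the connector's code; `encode` and `nanos_since_epoch` are hypothetical names, and Python's `datetime` carries only microsecond precision, so the last three nanosecond digits are always zero here.

```python
from datetime import datetime, timedelta, timezone

INT64_MAX = 2**63 - 1
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def nanos_since_epoch(ts):
    """Exact integer nanoseconds since the epoch for a timezone-aware datetime."""
    delta = ts - EPOCH
    return (delta.days * 86_400 + delta.seconds) * 1_000_000_000 + delta.microseconds * 1_000

def encode(ts, granularity):
    """Represent a timestamp the way each granularity option describes."""
    nanos = nanos_since_epoch(ts)
    if granularity == "NANOS_STRING":
        return str(nanos)                 # no range limit
    if granularity == "NANOS_LONG":
        if nanos > INT64_MAX:
            raise OverflowError("value does not fit in a signed 64-bit long")
        return nanos
    if granularity == "CONNECT_LOGICAL":
        return nanos // 1_000_000         # milliseconds; finer digits are lost
    raise ValueError(f"unknown granularity: {granularity}")

# Last whole second that still fits in a signed 64-bit count of nanoseconds.
last_second = EPOCH + timedelta(seconds=INT64_MAX // 1_000_000_000)
```

Here `last_second` evaluates to 2262-04-11 23:47:16 UTC, which is where the NANOS_LONG limit comes from.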
poll.interval.ms
Frequency in ms to poll for new data in each table.
- Type: int
- Default: 5000 (5 seconds)
- Valid Values: [100,…]
- Importance: low
Output messages¶
output.data.format
Sets the output Kafka record value format. Valid entries are AVRO, JSON_SR, PROTOBUF. Note that you need to have Confluent Cloud Schema Registry configured if using a schema-based message format like AVRO, JSON_SR, and PROTOBUF.
- Type: string
- Default: AVRO
- Importance: high
Number of tasks for this connector¶
tasks.max
Maximum number of tasks for the connector.
- Type: int
- Valid Values: [1,…]
- Importance: high
Supported Data Types¶
The connector creates change events for table changes. Each change event mirrors the table’s schema, with a field for every column value. The data type of each table column determines how the connector represents the column values in the corresponding change event fields.
For certain data types, such as the TIMESTAMP_NTZ data type, you can customize how the connector maps them by modifying the default configuration settings. This allows more control over handling various data types, ensuring that the change events reflect the desired format and meet specific requirements.
Numeric data types¶
The following table describes how the connector maps numeric types.
Snowflake data type | Connect type | Notes |
---|---|---|
NUMBER, DECIMAL, DEC, NUMERIC | BYTES | |
INT, INTEGER, BIGINT, SMALLINT, TINYINT, BYTEINT | BYTES | |
FLOAT, FLOAT4, FLOAT8, DOUBLE, DOUBLE PRECISION, REAL | FLOAT64 | |
Note
You should specify in advance the precision and scale you expect for the values when creating a column. For example, NUMBER(3) limits it to only three-digit integers. This practice can help enforce a certain degree of integrity on the values entered into that column.
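A consumer that receives a fixed-point value as BYTES needs to turn it back into a number. The sketch below assumes the bytes follow Kafka Connect's Decimal logical type (the unscaled integer in big-endian two's complement, with the scale carried in the schema); whether this connector uses that encoding is an assumption, and `decode_connect_decimal` is a hypothetical helper name.

```python
from decimal import Decimal

def decode_connect_decimal(raw, scale):
    """Decode big-endian two's-complement unscaled bytes into a Decimal."""
    unscaled = int.from_bytes(raw, byteorder="big", signed=True)
    # Shift the decimal point left by `scale` digits.
    return Decimal(unscaled).scaleb(-scale)
```

For example, the two bytes 0x30 0x39 (the integer 12345) with a scale of 2 decode to 123.45.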
String and binary data types¶
The following table describes how the connector maps string and binary types.
Snowflake data type | Connect type |
---|---|
VARCHAR, STRING, TEXT, NVARCHAR, NVARCHAR2, CHAR VARYING, NCHAR VARYING | STRING |
CHAR, CHARACTER, NCHAR | STRING |
BINARY | BYTES |
Logical data types¶
The following table describes how the connector maps logical types.
Snowflake data type | Connect type |
---|---|
BOOLEAN | BOOLEAN |
Date and time data types¶
The following table describes how the connector maps date and time types.
Snowflake data type | Connect type | Notes |
---|---|---|
DATE | INT32 | Represents the number of days since the epoch. |
TIME (P) | INT64 | Represents the number of nanoseconds past midnight, and does not include timezone information. |
TIMESTAMP_NTZ (P), DATETIME (P) | INT64 or STRING | The Kafka Connect type is chosen according to the timestamp.granularity configuration property. |
TIMESTAMP_LTZ (P), TIMESTAMP_TZ (P) | STRING | io.confluent.connect.snowflake.data.ZonedTimestamp: a string representation of a timestamp with timezone information. |
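On the consumer side, the DATE and TIME representations described in the table can be converted back into calendar values. The following is a minimal sketch; `decode_date` and `decode_time` are hypothetical helper names, and because Python's `time` type stops at microseconds, the leftover nanoseconds are returned separately.

```python
from datetime import date, time, timedelta

def decode_date(days_since_epoch):
    """Convert an INT32 day count since 1970-01-01 into a date."""
    return date(1970, 1, 1) + timedelta(days=days_since_epoch)

def decode_time(nanos_past_midnight):
    """Convert an INT64 nanosecond count into (time, leftover nanoseconds)."""
    seconds, frac_nanos = divmod(nanos_past_midnight, 1_000_000_000)
    hours, rem = divmod(seconds, 3600)
    minutes, secs = divmod(rem, 60)
    return time(hours, minutes, secs, frac_nanos // 1_000), frac_nanos % 1_000
```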
Semi-structured data types¶
The following table describes how the connector maps semi-structured types.
Snowflake data type | Connect type |
---|---|
VARIANT | STRING |
OBJECT | STRING |
ARRAY | STRING |
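Because semi-structured values arrive as STRING, a consumer typically parses them before use. The sketch below assumes the string holds JSON text, which the table does not state explicitly; `parse_semi_structured` is a hypothetical helper name.

```python
import json

def parse_semi_structured(value):
    """Parse a VARIANT, OBJECT, or ARRAY value delivered as a string; pass through nulls."""
    return None if value is None else json.loads(value)
```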
Next Steps¶
For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud ksqlDB, see the Cloud ETL Demo. This example also shows how to use Confluent CLI to manage your resources in Confluent Cloud.