Snowflake Source Connector for Confluent Cloud

The fully-managed Snowflake Source connector for Confluent Cloud can capture a snapshot of the existing data in specified Snowflake tables and then monitor and record all subsequent row-level changes to that data. The connector supports AVRO, JSON Schema, and PROTOBUF output data formats. All of the events for each table are recorded in a separate Apache Kafka® topic. The events can then be easily consumed by applications and services. Note that deleted records are not captured.

Features

The Snowflake Source connector provides the following features:

  • Topics created automatically: The connector automatically creates Kafka topics using the naming convention: <topic.prefix><database.schema.tableName>. The topics are created with the properties: topic.creation.default.partitions=1 and topic.creation.default.replication.factor=3.

  • Modes:

    Set one of the following modes for updating a table each time it is polled:

    • bulk: Performs a bulk load of all eligible tables each time they are polled.
    • incrementing: Uses a strictly incrementing column on each table to detect only new rows. Only rows with a non-null value in the incrementing column are captured.
    • timestamp: Uses timestamp column(s) to detect new and modified rows. When you specify multiple timestamp columns, the COALESCE SQL function is used to find the effective timestamp for a row. This assumes that the effective timestamp is updated with each write and that its values are monotonically incrementing, but not necessarily unique. Only rows with a non-null effective timestamp are captured.
    • timestamp+incrementing: Uses two columns: a timestamp column that detects new and modified rows, and a strictly incrementing column that provides a globally unique ID for updates, so each row can be assigned a unique stream offset. Only rows with a non-null effective timestamp value and a non-null incrementing column value are captured.
  • Database authentication: The connector supports private key (with or without passphrase) authentication.

  • Data formats: The connector supports AVRO, JSON Schema, and PROTOBUF output data. Schema Registry must be enabled to use these formats. For more information, see Schema Registry Enabled Environments.

  • Offset management capabilities: Supports offset management. For more information, see Manage custom offsets.

  • Client-side field level encryption (CSFLE) support: The connector supports CSFLE for sensitive data. For more information about CSFLE setup, see connector configuration.

For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect Usage Examples section.
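
The polling modes listed under Features can be pictured with a small in-memory sketch. This is a conceptual illustration only, not the connector's implementation: the Row fields, the select_rows helper, and the strict "greater than the last offset" comparisons are assumptions made for the example.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Row:
    id: Optional[int]           # hypothetical incrementing column
    updated_at: Optional[int]   # hypothetical timestamp column (epoch ms)
    modified_at: Optional[int]  # hypothetical second timestamp column


def effective_timestamp(row):
    """Mimic COALESCE(updated_at, modified_at): the first non-null value wins."""
    for value in (row.updated_at, row.modified_at):
        if value is not None:
            return value
    return None


def select_rows(rows, mode, last_ts=-1, last_id=-1):
    """Return the rows one poll would pick up, given the last recorded offset."""
    if mode == "bulk":
        return list(rows)  # no offsets: every row, every poll
    picked = []
    for row in rows:
        ts = effective_timestamp(row)
        if mode == "incrementing":
            keep = row.id is not None and row.id > last_id
        elif mode == "timestamp":
            keep = ts is not None and ts > last_ts
        elif mode == "timestamp+incrementing":
            # Assumed ordering: newer timestamp, or same timestamp with a larger ID.
            keep = (ts is not None and row.id is not None
                    and (ts > last_ts or (ts == last_ts and row.id > last_id)))
        else:
            raise ValueError(f"unknown mode: {mode}")
        if keep:
            picked.append(row)
    return picked
```

In this sketch, a row whose incrementing column or effective timestamp is null is never selected in the modes that depend on that column, which mirrors the non-null requirement stated for each mode.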

Limitations

Be sure to review the following information.

Manage custom offsets

You can manage the offsets for this connector. Offsets provide information on the point in the system from which the connector is accessing data. For more information, see Manage Offsets for Fully-Managed Connectors in Confluent Cloud.

To manage offsets:

Mode-wise offset structure

In bulk mode, since the entire table is queried during each poll, offsets are not used.

Mode-wise offset guidance

Bulk mode

Bulk mode has no offsets because the entire table is queried on each poll.

Timestamp mode

To get the current offset, make a GET request that specifies the environment, Kafka cluster, and connector name.

GET /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets
Host: https://api.confluent.cloud

Response:

Successful calls return HTTP 200 with a JSON payload that describes the offset.

{
    "id": "lcc-example123",
    "name": "{connector_name}",
    "offsets": [
        {
            "partition": {
              "table": "{table_name}"
            },
            "offset": {
              "timestamp": 1741577430000,
              "timestamp_nanos": 273000000
            }
        }
    ],
    "metadata": {
        "observed_at": "2025-03-10T04:00:31.754227738Z"
    }
}

Responses include the following information:

  • The position of latest offset.
  • The observed time of the offset in the metadata portion of the payload. The observed_at time indicates a snapshot in time for when the API retrieved the offset. A running connector is always updating its offsets. Use observed_at to get a sense for the gap between real time and the time at which the request was made. By default, offsets are observed every minute. Calling GET repeatedly will fetch more recently observed offsets.
  • Information about the connector.
  • In these examples, the curly braces around connector_name indicate a replaceable value.
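
The GET request and response described above could be scripted roughly as follows. This is a sketch under assumptions: it uses only the Python standard library, it assumes the API accepts HTTP Basic authentication with a Confluent Cloud API key and secret, and the helper names (offsets_url, fetch_offsets, summarize) are made up for the example.

```python
import base64
import json
import urllib.parse
import urllib.request

API_HOST = "https://api.confluent.cloud"


def offsets_url(environment_id, kafka_cluster_id, connector_name):
    """Build the offsets endpoint shown in the GET request above."""
    name = urllib.parse.quote(connector_name, safe="")
    return (f"{API_HOST}/connect/v1/environments/{environment_id}"
            f"/clusters/{kafka_cluster_id}/connectors/{name}/offsets")


def fetch_offsets(url, api_key, api_secret):
    """Send the GET request (assumes HTTP Basic auth with a Cloud API key)."""
    token = base64.b64encode(f"{api_key}:{api_secret}".encode()).decode()
    request = urllib.request.Request(url, headers={"Authorization": f"Basic {token}"})
    with urllib.request.urlopen(request) as response:
        return json.load(response)


def summarize(payload):
    """Map each table to its offset and return that with the observed_at time."""
    per_table = {entry["partition"]["table"]: entry["offset"]
                 for entry in payload["offsets"]}
    return per_table, payload["metadata"]["observed_at"]
```

Because offsets are observed periodically, comparing observed_at from summarize with the current time gives a rough idea of how stale the returned offsets are.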

Incrementing mode and timestamp+incrementing mode

For incrementing and timestamp+incrementing mode, the API request payload and response match those for timestamp mode. Simply replace the offset: {} block with the specified offsets from their respective sections above.

Offset caveats

Note the following important considerations for managing offsets:

  • Timestamp mode: If you need to change the fractional seconds value while resetting offsets, you should do this in timestamp_nanos.

  • Schema evolution: Confluent does not recommend updating offsets to a point in time before any DDL changes to the tables, as this may cause inconsistent results.

    If you move the offsets back to a point before the schema evolution of the targeted tables during the connector run, you will not retrieve the exact same records as before. The records will reflect the schema of the current table.

JSON payload

The table below offers a description of the unique fields in the JSON payload for managing offsets of the Snowflake Source connectors:

Field | Definition | Required/Optional
incrementing | Specifies the value of the incrementing column up to which the connector has processed. The connector gets only values greater than the value in this field. Available only in the following modes: incrementing, timestamp+incrementing. | Required
table | The name of the table. Available in the following modes: incrementing, timestamp, timestamp+incrementing. | Required
timestamp | The number of milliseconds since January 1, 1970, 00:00:00 GMT represented by the Timestamp object of the column value. Available only in the following modes: timestamp, timestamp+incrementing. | Required
timestamp_nanos | Fractional seconds component of the Timestamp object. Available only in the following modes: timestamp, timestamp+incrementing. | Required
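
To make the timestamp and timestamp_nanos fields more concrete, the following sketch converts between an offset block and a UTC datetime. It assumes, based on the sample payload and the caveat about fractional seconds, that the whole seconds come from timestamp and the fractional part comes from timestamp_nanos. The function names are hypothetical, and Python datetimes keep only microsecond precision.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def decode_offset(offset):
    """Turn {"timestamp": ms, "timestamp_nanos": ns} into a UTC datetime."""
    whole_seconds = offset["timestamp"] // 1000       # drop any ms fraction
    micros = offset["timestamp_nanos"] // 1000        # datetime stops at microseconds
    return EPOCH + timedelta(seconds=whole_seconds, microseconds=micros)


def encode_offset(moment):
    """Build an offset block from a timezone-aware datetime (assumed layout)."""
    delta = moment - EPOCH
    whole_seconds = delta.days * 86400 + delta.seconds
    return {
        "timestamp": whole_seconds * 1000,            # whole seconds, in ms
        "timestamp_nanos": delta.microseconds * 1000  # fractional seconds, in ns
    }
```

With the sample response shown earlier, decode_offset yields 2025-03-10 03:30:30.273 UTC, about half an hour before the observed_at time in that payload.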

Generate a Snowflake key pair

Before you create the connector, you need to generate a key pair. Snowflake authentication requires 2048-bit (minimum) RSA. You add the public key to a Snowflake user account. You add the private key to the connector configuration (when completing the Quick Start instructions).

Quick Start

Use this quick start to get up and running with the Confluent Cloud Snowflake Source connector. The quick start provides the basics of selecting the connector and configuring it to obtain a snapshot of the existing data in specified Snowflake tables and then monitoring and recording all subsequent row-level changes.

Prerequisites
  • Authorized access to a Confluent Cloud cluster on Amazon Web Services (AWS), Microsoft Azure (Azure), or Google Cloud.
  • The Confluent CLI installed and configured for the cluster. See Install the Confluent CLI.
  • The connector automatically creates Kafka topics using the naming convention: <topic.prefix><database.schema.tableName>. The topics are created with the properties: topic.creation.default.partitions=1 and topic.creation.default.replication.factor=3. If you want to create topics with specific settings, create the topics before running this connector.
  • Schema Registry must be enabled. For more information, see Schema Registry Enabled Environments.
  • Make sure your connector can reach your service. Consider the following before running the connector:
    • Depending on the service environment, certain network access limitations may exist. See Manage Networking for Confluent Cloud Connectors for details.
    • To use a set of public egress IP addresses, see Public Egress IP Addresses for Confluent Cloud Connectors. For additional fully-managed connector networking details, see Networking and DNS.
    • Clients from Azure Virtual Networks are not allowed to access the server by default. Check that your Azure Virtual Network is correctly configured and that Allow access to Azure Services is enabled.
    • See your specific cloud platform documentation for how to configure security rules for your VPC.
  • Kafka cluster credentials. The following lists the different ways you can provide credentials.
    • Enter an existing service account resource ID.
    • Create a Confluent Cloud service account for the connector. Make sure to review the ACL entries required in the service account documentation. Some connectors have specific ACL requirements.
    • Create a Confluent Cloud API key and secret. To create a key and secret, you can use confluent api-key create or you can autogenerate the API key and secret directly in the Cloud Console when setting up the connector.

Using the Confluent Cloud Console

Step 1: Launch your Confluent Cloud cluster

See the Quick Start for Confluent Cloud for installation instructions.

Step 2: Add a connector

In the left navigation menu, click Connectors. If you already have connectors in your cluster, click + Add connector.

Step 3: Select your connector

Click the Snowflake Source connector card.

Step 4: Enter the connector details

Note

  • Make sure you have all your prerequisites completed.
  • An asterisk ( * ) designates a required entry.

At the Snowflake Source Connector screen, complete the following:

  1. Select the way you want to provide Kafka Cluster credentials. You can choose one of the following options:
    • My account: This setting allows your connector to globally access everything that you have access to. With a user account, the connector uses an API key and secret to access the Kafka cluster. This option is not recommended for production.
    • Service account: This setting limits the access for your connector by using a service account. This option is recommended for production.
    • Use an existing API key: This setting allows you to specify an API key and a secret pair. You can use an existing pair or create a new one. This method is not recommended for production environments.

Note

Freight clusters support only service accounts for Kafka authentication.

  2. Click Continue.

Step 5: Check the Kafka topic

After the connector is running, verify that messages are populating your Kafka topic.

For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect Usage Examples section.

Using the Confluent CLI

Complete the following steps to set up and run the connector using the Confluent CLI.

Note

Make sure you have all your prerequisites completed.

Step 1: List the available connectors

Enter the following command to list available connectors:

confluent connect plugin list

Step 2: List the connector configuration properties

Enter the following command to show the connector configuration properties:

confluent connect plugin describe <connector-plugin-name>

The command output shows the required and optional configuration properties.

Step 3: Create the connector configuration file

Create a JSON file that contains the connector configuration properties. The following example shows the required connector properties.

{
   "name" : "SnowflakeSource_cli",
   "connector.class": "SnowflakeSource",
   "kafka.auth.mode": "KAFKA_API_KEY",
   "kafka.api.key": "<my-kafka-api-key>",
   "kafka.api.secret": "<my-kafka-api-secret>",
   "table.include.list": "TEST_DB.PUBLIC.TEST_TS_MODE.*",
   "table.exclude.list": "TEST_DB.PUBLIC.TEST_TS_MODE_.*",
   "db.timezone": "America/Los_Angeles",
   "mode": "timestamp",
   "timestamp.columns.mapping": "TEST_DB.PUBLIC.TEST_TS_MODE.*:[UPDATED_AT]",
   "topic.prefix": "cli.",
   "connection.url": "locator.snowflakecomputing.com",
   "connection.user": "<user-name>",
   "connection.credentials.source": "PRIVATE_KEY",
   "connection.private.key": "<Snowflake Private Key>",
   "tasks.max": "1",
   "output.data.format": "AVRO"
}

Note the following property definitions:

  • "name": Sets a name for your new connector.
  • "connector.class": Identifies the connector plugin name.
  • "kafka.auth.mode": Identifies the connector authentication mode you want to use. There are two options: SERVICE_ACCOUNT or KAFKA_API_KEY (the default). To use an API key and secret, specify the configuration properties kafka.api.key and kafka.api.secret, as shown in the example configuration (above). To use a service account, specify the Resource ID in the property kafka.service.account.id=<service-account-resource-ID>. To list the available service account resource IDs, use the following command:

    confluent iam service-account list
    

    For example:

    confluent iam service-account list
    
       Id     | Resource ID |       Name        |    Description
    +---------+-------------+-------------------+-------------------
       123456 | sa-l1r23m   | sa-1              | Service account 1
       789101 | sa-l4d56p   | sa-2              | Service account 2
    
  • "topic.prefix": Enter a topic prefix. The connector automatically creates Kafka topics using the naming convention: <topic.prefix><database.schema.tableName>. The topics are created with the properties: topic.creation.default.partitions=1 and topic.creation.default.replication.factor=3. If you want to create topics with specific settings, create the topics before running this connector. If you are configuring granular access using a service account, you must set up ACLs for the topic prefix.

  • "output.data.format": Sets the output Kafka record value format (data coming from the connector). Valid entries are AVRO, JSON_SR, or PROTOBUF. You must have Confluent Cloud Schema Registry configured if using a schema-based message format.

  • "db.timezone": Specify the timezone used to interpret values of timestamp types that don’t include timezone information. This should be set to the timezone of the Snowflake account. For more information, see this list of database timezones.

Single Message Transforms: See the Single Message Transforms (SMT) documentation for details about adding SMTs using the CLI.

See Configuration Properties for all properties and definitions.
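
If you prefer to generate the configuration file rather than edit it by hand, a sketch like the following keeps secrets out of the file template. The function names and the choice to pass secrets as arguments are assumptions for illustration. The property names and sample values are taken from the example configuration above.

```python
import json


def build_config(name, api_key, api_secret, snowflake_user, private_key):
    """Assemble the connector properties used in the timestamp-mode example."""
    return {
        "name": name,
        "connector.class": "SnowflakeSource",
        "kafka.auth.mode": "KAFKA_API_KEY",
        "kafka.api.key": api_key,
        "kafka.api.secret": api_secret,
        "table.include.list": "TEST_DB.PUBLIC.TEST_TS_MODE.*",
        "table.exclude.list": "TEST_DB.PUBLIC.TEST_TS_MODE_.*",
        "db.timezone": "America/Los_Angeles",
        "mode": "timestamp",
        "timestamp.columns.mapping": "TEST_DB.PUBLIC.TEST_TS_MODE.*:[UPDATED_AT]",
        "topic.prefix": "cli.",
        "connection.url": "locator.snowflakecomputing.com",
        "connection.user": snowflake_user,
        "connection.credentials.source": "PRIVATE_KEY",
        "connection.private.key": private_key,
        "tasks.max": "1",
        "output.data.format": "AVRO",
    }


def write_config(path, config):
    """Write the properties as JSON so the CLI can load the file."""
    with open(path, "w", encoding="utf-8") as handle:
        json.dump(config, handle, indent=2)
```

A caller might read the key, secret, and private key from environment variables or a secrets manager and then call write_config with a file name such as snowflake-source.json.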

Step 4: Load the properties file and create the connector

Enter the following command to load the configuration and start the connector:

confluent connect cluster create --config-file <file-name>.json

For example:

confluent connect cluster create --config-file snowflake-source.json

Example output:

Created connector SnowflakeSource_0 lcc-ix4dl

Step 5: Check the connector status

Enter the following command to check the connector status:

confluent connect cluster list

Example output:

ID          |            Name         | Status  |  Type
+-----------+-------------------------+---------+-------+
lcc-ix4dl   | SnowflakeSource_0       | RUNNING | source

Step 6: Check the Kafka topic

After the connector is running, verify that messages are populating your Kafka topic.

For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect Usage Examples section.

Configuration Properties

Use the following configuration properties with the fully-managed connector. For self-managed connector property definitions and other details, see the connector docs in Self-managed connectors for Confluent Platform.

Kafka Cluster credentials

kafka.auth.mode

Kafka Authentication mode. It can be one of KAFKA_API_KEY or SERVICE_ACCOUNT. It defaults to KAFKA_API_KEY mode.

  • Type: string
  • Default: KAFKA_API_KEY
  • Valid Values: KAFKA_API_KEY, SERVICE_ACCOUNT
  • Importance: high
kafka.api.key

Kafka API Key. Required when kafka.auth.mode==KAFKA_API_KEY.

  • Type: password
  • Importance: high
kafka.service.account.id

The Service Account that will be used to generate the API keys to communicate with Kafka Cluster.

  • Type: string
  • Importance: high
kafka.api.secret

Secret associated with Kafka API key. Required when kafka.auth.mode==KAFKA_API_KEY.

  • Type: password
  • Importance: high

Schema Config

schema.context.name

Add a schema context name. A schema context represents an independent scope in Schema Registry. It is a separate sub-schema tied to topics in different Kafka clusters that share the same Schema Registry instance. If not used, the connector uses the default schema configured for Schema Registry in your Confluent Cloud environment.

  • Type: string
  • Default: default
  • Importance: medium
value.subject.name.strategy

Determines how to construct the subject name under which the value schema is registered with Schema Registry.

  • Type: string
  • Default: TopicNameStrategy
  • Valid Values: RecordNameStrategy, TopicNameStrategy, TopicRecordNameStrategy
  • Importance: medium
value.converter.reference.subject.name.strategy

Set the subject reference name strategy for value. Valid entries are DefaultReferenceSubjectNameStrategy or QualifiedReferenceSubjectNameStrategy. Note that the subject reference name strategy can be selected only for PROTOBUF format with the default strategy being DefaultReferenceSubjectNameStrategy.

  • Type: string
  • Default: DefaultReferenceSubjectNameStrategy
  • Importance: medium

How should we connect to your data?

name

Sets a name for your connector.

  • Type: string
  • Valid Values: A string at most 64 characters long
  • Importance: high

How should we connect to your Snowflake Instance?

connection.url

Snowflake connection URL. Supported formats are <org_name>_<account_name>.snowflakecomputing.com or, if the account is located in the AWS US West (Oregon) region, <locator>.snowflakecomputing.com.

  • Type: string
  • Importance: high
connection.user

User to be used for authenticating to Snowflake.

  • Type: string
  • Importance: high
connection.credentials.source

The source of the credentials to use for authentication. Supported values are: PRIVATE_KEY: Use the private key to authenticate. PRIVATE_KEY_PASSPHRASE: Use the private key and passphrase to authenticate.

  • Type: string
  • Default: PRIVATE_KEY
  • Importance: high
connection.private.key

Private key for Snowflake user.

  • Type: password
  • Importance: high
connection.private.key.passphrase

Passphrase of the encrypted private key.

  • Type: password
  • Importance: high

How should we name your topic(s)?

topic.prefix

Prefix to prepend to table names to generate the name of the Kafka topic to publish data to.

  • Type: string
  • Importance: high
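
As a quick illustration of the naming convention <topic.prefix><database.schema.tableName>, the following sketch derives a topic name and checks it against Kafka's general topic-name rules (letters, digits, ".", "_", and "-", up to 249 characters). The helper names are hypothetical, and how the connector itself treats identifiers with other characters is not covered here.

```python
import re

# Kafka's legal topic-name characters and maximum length.
_LEGAL_TOPIC = re.compile(r"[A-Za-z0-9._-]{1,249}")


def topic_name(topic_prefix, database, schema, table):
    """Concatenate the prefix with the fully-qualified table name."""
    return f"{topic_prefix}{database}.{schema}.{table}"


def is_legal_topic(name):
    """Return True when the name uses only characters Kafka accepts."""
    return _LEGAL_TOPIC.fullmatch(name) is not None
```

For example, with the prefix cli. and the table TEST_DB.PUBLIC.TEST_TS_MODE, topic_name returns cli.TEST_DB.PUBLIC.TEST_TS_MODE, which is the name to use if you create the topic in advance or set up prefix ACLs.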

Connector Details

mode

Mode represents the criteria on which a table is polled each time. Options include:

  • BULK: Performs a bulk load of all the eligible tables each time they are polled.
  • TIMESTAMP: Uses timestamp column(s) to detect new and modified rows. When you specify multiple timestamp columns, the COALESCE SQL function is used to find the effective timestamp for a row. This assumes that the effective timestamp is updated with each write and that its values are monotonically incrementing, but not necessarily unique. Only rows with a non-null effective timestamp are captured.
  • INCREMENTING: Uses a strictly incrementing column on each table to detect only new rows. Only rows with a non-null value in the incrementing column are captured.
  • TIMESTAMP AND INCREMENTING: Uses two columns: a timestamp column that detects new and modified rows, and a strictly incrementing column that provides a globally unique ID for updates, so each row can be assigned a unique stream offset. Only rows with a non-null effective timestamp value and a non-null incrementing column value are captured.

  • Type: string
  • Importance: medium
table.include.list

A comma-separated list of regular expressions that match the fully-qualified names of tables to be copied. Identifier names are case sensitive. For example, table.include.list: "DB_A.PUBLIC.CUSTOMER.*,DB_B.PUBLIC.CUSTOMER.*,".

  • Type: list
  • Importance: medium
table.exclude.list

A comma-separated list of regular expressions that match the fully-qualified names of tables not to be copied. This only applies on the tables filtered using include list. Identifier names are case sensitive. For example, table.exclude.list: "DB_A.PUBLIC.CUSTOMER.*,DB_B.PUBLIC.CUSTOMER.*,".

  • Type: list
  • Importance: medium
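
The interaction between table.include.list and table.exclude.list can be sketched as a two-stage filter. This is an illustration, not the connector's code: it assumes each regular expression must match the whole fully-qualified name, and the captured_tables helper is hypothetical.

```python
import re


def split_patterns(value):
    """Split a comma-separated property value, ignoring empty entries."""
    return [item.strip() for item in value.split(",") if item.strip()]


def captured_tables(tables, include_list, exclude_list=""):
    """Keep tables matching an include regex and no exclude regex."""
    includes = [re.compile(p) for p in split_patterns(include_list)]
    excludes = [re.compile(p) for p in split_patterns(exclude_list)]
    selected = []
    for table in tables:
        if not any(p.fullmatch(table) for p in includes):
            continue  # never considered: not on the include list
        if any(p.fullmatch(table) for p in excludes):
            continue  # included, then filtered out by the exclude list
        selected.append(table)
    return selected
```

With the include pattern TEST_DB.PUBLIC.TEST_TS_MODE.* and the exclude pattern TEST_DB.PUBLIC.TEST_TS_MODE_.*, this sketch keeps TEST_DB.PUBLIC.TEST_TS_MODE and drops TEST_DB.PUBLIC.TEST_TS_MODE_OLD. A lowercase name is not matched because identifiers are case sensitive.
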
timestamp.columns.mapping

A comma-separated list of table regex to timestamp columns mappings. Timestamp columns supplied should strictly be of type TIMESTAMP_NTZ. On specifying multiple timestamp columns, COALESCE SQL function would be used to find out the effective timestamp for a row. Expected format is regex1:[col1|col2],regex2:[col3]. Regexes would be matched against the fully-qualified table names. Identifier names are case sensitive. Every table included for capture should match exactly one of the provided mappings. An example for a valid input would be COMPANY.EMPLOYEES.SALARY.*:[UPDATED_AT|MODIFIED_AT], COMPANY.FINANCE.ACCOUNTS.*:[CHANGED_AT].

  • Type: list
  • Importance: medium
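
The regex1:[col1|col2],regex2:[col3] format can be checked before you submit a configuration. The parser below is a rough sketch: it assumes table regexes contain no commas, colons, or square brackets, assumes full-name matching, and uses hypothetical helper names.

```python
import re

# One "regex:[col|col]" entry, followed by a comma or the end of the value.
_ENTRY = re.compile(r"\s*([^,\[\]:]+):\[([^\]]+)\]\s*(?:,|$)")


def parse_timestamp_mapping(value):
    """Parse the property value into (compiled table regex, column list) pairs."""
    mappings = []
    position = 0
    while position < len(value):
        match = _ENTRY.match(value, position)
        if match is None:
            raise ValueError(f"cannot parse mapping at offset {position}")
        columns = [column.strip() for column in match.group(2).split("|")]
        mappings.append((re.compile(match.group(1).strip()), columns))
        position = match.end()
    return mappings


def columns_for(table, mappings):
    """Return the timestamp columns for a table, requiring exactly one match."""
    hits = [columns for pattern, columns in mappings if pattern.fullmatch(table)]
    if len(hits) != 1:
        raise ValueError(f"{table} matches {len(hits)} mappings, expected 1")
    return hits[0]
```

When several columns are returned for a table, they correspond to the arguments of the COALESCE call that determines the effective timestamp.
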
incrementing.column.mapping

A comma-separated list of table regex to incrementing column mappings. Expected format is regex1:col2,regex2:col1. Regexes would be matched against the fully-qualified table names. Identifier names are case sensitive. Every table included for capture should match exactly one of the provided mappings. An example for a valid input would be COMPANY.EMPLOYEES.SALARY*:EMP_ID,COMPANY.FINANCE.ACCOUNTS.*:ID.

  • Type: list
  • Importance: medium
db.timezone

Timezone to be used when interpreting values of timestamp types that don’t include timezone information. This should be set to the timezone of the Snowflake account.

  • Type: string
  • Importance: medium
timestamp.initial

Epoch timestamp in milliseconds that provides the start timestamp from which to capture rows. The value -1 sets the start timestamp to the current time, in which case older data is not fetched. If not specified, the start timestamp is treated as the epoch start time, so all data is fetched. Once the connector has successfully recorded a source offset, this property has no effect even if it is later changed to a different value.

  • Type: long
  • Importance: medium
table.types

By default, the connector detects only tables with type TABLE. This config accepts a comma-separated list of table types to extract.

  • Type: list
  • Default: TABLE
  • Importance: medium
timestamp.granularity

Define the granularity of the Timestamp column.

  • CONNECT_LOGICAL: Represents timestamp values using Kafka Connect built-in representations. This may lead to loss of precision because it supports only millisecond precision.
  • NANOS_LONG: Represents timestamp values as nanos since epoch. Avoid this if any of the eligible timestamp columns contains timestamps greater than 2262-04-11 23:47:16.854775807 GMT, because the nanos since epoch value would not fit in the range for long and would lead to erroneous values. Use NANOS_STRING in such cases.
  • NANOS_STRING: Represents timestamp values as nanos since epoch in a string.

  • Type: string
  • Default: NANOS_STRING
  • Importance: low
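
The three granularities and the NANOS_LONG limit can be worked through with a little arithmetic. This sketch only illustrates the value ranges. The represent helper is hypothetical, and Python datetimes carry microsecond precision, so the last three nanosecond digits are always zero here.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
LONG_MAX = 2**63 - 1  # largest value of a signed 64-bit long


def nanos_since_epoch(moment):
    """Exact integer nanoseconds since the epoch for an aware datetime."""
    delta = moment - EPOCH
    micros = (delta.days * 86400 + delta.seconds) * 1_000_000 + delta.microseconds
    return micros * 1000


def represent(moment, granularity):
    """Show the value shape each timestamp.granularity setting implies."""
    nanos = nanos_since_epoch(moment)
    if granularity == "CONNECT_LOGICAL":
        return nanos // 1_000_000          # milliseconds: sub-ms digits are lost
    if granularity == "NANOS_LONG":
        if nanos > LONG_MAX:
            raise OverflowError("value does not fit in a signed 64-bit long")
        return nanos
    if granularity == "NANOS_STRING":
        return str(nanos)                  # no numeric range limit
    raise ValueError(f"unknown granularity: {granularity}")


def latest_nanos_long_moment():
    """Last instant (to the microsecond) that still fits in NANOS_LONG."""
    return EPOCH + timedelta(microseconds=LONG_MAX // 1000)
```

Calling latest_nanos_long_moment() returns 2262-04-11 23:47:16.854775 UTC, which is where the limit quoted for NANOS_LONG comes from.
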
poll.interval.ms

Frequency in ms to poll for new data in each table.

  • Type: int
  • Default: 5000 (5 seconds)
  • Valid Values: [100,…]
  • Importance: low

Output messages

output.data.format

Sets the output Kafka record value format. Valid entries are AVRO, JSON_SR, PROTOBUF. Note that you need to have Confluent Cloud Schema Registry configured if using a schema-based message format like AVRO, JSON_SR, and PROTOBUF.

  • Type: string
  • Default: AVRO
  • Importance: high

Number of tasks for this connector

tasks.max

Maximum number of tasks for the connector.

  • Type: int
  • Valid Values: [1,…]
  • Importance: high

Supported Data Types

The connector creates change events for table changes. Each change event mirrors the table’s schema, with a field for every column value. The data type of each table column determines how the connector represents the column values in the corresponding change event fields.

For certain data types, such as TIMESTAMP_NTZ data type, you can customize how the connector maps them by modifying the default configuration settings. This allows more control over handling various data types, ensuring that the change events reflect the desired format and meet specific requirements.

Numeric data types

The following table describes how the connector maps numeric types.

Snowflake data type | Connect type | Notes
NUMBER, DECIMAL, DEC, NUMERIC | BYTES | org.apache.kafka.connect.data.Decimal. The scale schema parameter contains an integer that represents how many digits the decimal point shifted.
INT, INTEGER, BIGINT, SMALLINT, TINYINT, BYTEINT | BYTES | org.apache.kafka.connect.data.Decimal. The scale schema parameter contains an integer that represents how many digits the decimal point shifted.
FLOAT, FLOAT4, FLOAT8, DOUBLE, DOUBLE PRECISION, REAL | FLOAT64 |

Note

You should specify in advance the precision and scale you expect for the values when creating a column. For example, NUMBER(3) limits it to only three-digit integers. This practice can help enforce a certain degree of integrity on the values entered into that column.
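
If a consumer receives the raw bytes of a Decimal field instead of a decoded number, the value can be rebuilt from the bytes and the scale parameter. The Kafka Connect Decimal logical type stores the unscaled value as a big-endian two's-complement integer. The function name below is hypothetical, and whether you see raw bytes at all depends on your deserializer.

```python
from decimal import Decimal


def decode_connect_decimal(raw: bytes, scale: int) -> Decimal:
    """Rebuild a decimal from its unscaled bytes and the schema's scale."""
    unscaled = int.from_bytes(raw, byteorder="big", signed=True)
    sign = 1 if unscaled < 0 else 0
    digits = tuple(int(digit) for digit in str(abs(unscaled)))
    # Building from (sign, digits, exponent) is exact at any precision.
    return Decimal((sign, digits, -scale))
```

For example, the bytes 0x30 0x39 (the integer 12345) with a scale of 2 decode to 123.45, and integer columns such as INT decode with a scale of 0.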

String and binary data types

The following table describes how the connector maps string and binary types.

Snowflake data type | Connect type
VARCHAR, STRING, TEXT, NVARCHAR, NVARCHAR2, CHAR VARYING, NCHAR VARYING | STRING
CHAR, CHARACTER, NCHAR | STRING
BINARY | BYTES

Logical data types

The following table describes how the connector maps logical types.

Snowflake data type | Connect type
BOOLEAN | BOOLEAN

Date and time data types

The following table describes how the connector maps date and time types.

Snowflake data type | Connect type | Notes
DATE | INT32 | org.apache.kafka.connect.data.Date. Represents the number of days since the epoch.
TIME (P) | INT64 | io.confluent.connect.snowflake.data.NanoTime. Represents the number of nanoseconds past midnight, and does not include timezone information.
TIMESTAMP_NTZ (P), DATETIME (P) | INT64 or STRING | The Kafka Connect type depends on the timestamp.granularity configuration property. For NANOS_STRING, the connector uses STRING. For NANOS_LONG, the connector uses INT64. For CONNECT_LOGICAL, the connector uses org.apache.kafka.connect.data.Timestamp.
TIMESTAMP_LTZ (P), TIMESTAMP_TZ (P) | STRING | io.confluent.connect.snowflake.data.ZonedTimestamp. A string representation of a timestamp with timezone information.
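
The integer encodings for DATE and TIME can be turned back into calendar values on the consumer side. This is a minimal sketch with hypothetical function names. It assumes the consumer sees the plain integers (days since the epoch and nanoseconds past midnight) and truncates to the microsecond precision that Python supports.

```python
from datetime import date, time, timedelta


def decode_date(days_since_epoch: int) -> date:
    """DATE: INT32 count of days since 1970-01-01."""
    return date(1970, 1, 1) + timedelta(days=days_since_epoch)


def decode_nano_time(nanos_past_midnight: int) -> time:
    """TIME: INT64 nanoseconds past midnight, with no timezone."""
    micros = nanos_past_midnight // 1000        # drop sub-microsecond digits
    seconds, microsecond = divmod(micros, 1_000_000)
    minutes, second = divmod(seconds, 60)
    hour, minute = divmod(minutes, 60)
    return time(hour, minute, second, microsecond)
```

For instance, a DATE value of 20157 corresponds to 2025-03-10, and a TIME value of 12630273000000 corresponds to 03:30:30.273.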

Semi-structured data types

The following table describes how the connector maps semi-structured types.

Snowflake data type | Connect type
VARIANT | STRING
OBJECT | STRING
ARRAY | STRING

Next Steps

For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud ksqlDB, see the Cloud ETL Demo. This example also shows how to use Confluent CLI to manage your resources in Confluent Cloud.
