Kafka Connect MessageTimestampRouter SMT for Confluent Platform¶
The following provides usage information for the Confluent Single Message Transformation (SMT) io.confluent.connect.transforms.MessageTimestampRouter
.
Description¶
Update the record’s topic field as a function of the original topic value and the record’s timestamp field.
This is useful for sink connectors, because the topic field often determines the equivalent entity name in the destination system (for example, a database table or search index name). This SMT extracts the timestamp from the message value’s specified field, which is especially useful for log data in which the timestamp is stored as a field in the message. The message value must be a Map instance (Structs are not currently supported). See Kafka Connect TimestampRouter SMT for Confluent Platform to specify a basic topic pattern and timestamp format.
Installation¶
This transformation is developed by Confluent and does not ship by default with Apache Kafka® or Confluent Platform. You can install this transformation using the confluent connect plugin install command:
confluent connect plugin install confluentinc/connect-transforms:latest
Example¶
The following example extracts a field named timestamp
, time
, or ts
from the message value, in the order specified by the message.timestamp.keys
configuration. This timestamp value is originally in the format specified by
message.timestamp.format
. It adds a topic prefix and appends the timestamp
of the format specified by topic.timestamp.format
to the message topic.
"transforms": "MessageTimestampRouter",
"transforms.MessageTimestampRouter.type": "io.confluent.connect.transforms.MessageTimestampRouter",
"transforms.MessageTimestampRouter.topic.format": "foo-${topic}-${timestamp}",
"transforms.MessageTimestampRouter.message.timestamp.format": "yyyy-MM-dd",
"transforms.MessageTimestampRouter.topic.timestamp.format": "yyyy.MM.dd",
"transforms.MessageTimestampRouter.message.timestamp.keys": "timestamp,time,ts"
Message value: {"time":"2019-08-06"}
Topic (before): bar
Topic (after): foo-bar-2019.08.06
Tip
For additional examples, see Message Timestamp Router for managed connectors.
Properties¶
Name | Description | Type | Default | Valid Values | Importance |
---|---|---|---|---|---|
topic.format |
Format string which can contain ${topic} and ${timestamp} as placeholders for the topic and timestamp, respectively. |
string | ${topic}-${timestamp} |
high | |
message.timestamp.format |
Format string for the message’s timestamp that is compatible with java.time.format.DateTimeFormatter . For additional details, see DateTimeFormatter. If no configuration or an empty string is provided, defaults to the format string for timestamp of ISO8601 standard, with mandatory date and optional time. |
string | “” | low | |
topic.timestamp.format |
Format string for the topic’s timestamp that is compatible with java.time.format.DateTimeFormatter . For additional details, see DateTimeFormatter. |
string | yyyy.MM.dd | high | |
message.timestamp.keys |
Comma-separated list of field names to look up the timestamp in the message value, in the order the names are listed. The timestamp is taken from the first found field. | string | high |
Predicates¶
Transformations can be configured with predicates so that the transformation is applied only to records which satisfy a condition. You can use predicates in a transformation chain and, when combined with the Kafka Connect Filter (Kafka) SMT for Confluent Platform, predicates can conditionally filter out specific records. For details and examples, see Predicates.