Confluent Cloud Metrics¶
Metrics in Confluent Cloud are available either through first-class integrations with third-party monitoring providers or by directly querying the Confluent Cloud Metrics API. Users who would like to monitor Confluent Cloud are encouraged to use an integration to reduce the operational burden of monitoring. The Confluent Cloud Metrics API supports a diverse set of querying patterns to support usage and performance analysis over time.
This page is meant to be instructional and to help you get started with using the metrics that Confluent Cloud provides. For more information on the Confluent Cloud Metrics API, see the API Reference.
Metrics Quick Start¶
- Prerequisites
- Access to Confluent Cloud
- Internet connectivity
Create an API key to authenticate to the Metrics API. For example:
confluent login
confluent environment use env-abc123
confluent kafka cluster use lkc-YYYYY
confluent api-key create --resource cloud
Note
You must use an API key resource-scoped for resource management to communicate with the Metrics API. Using an API key resource-scoped for a Kafka cluster causes an authentication error.
See also
For an example that shows how to monitor a Kafka client application and Confluent Cloud metrics, and that steps through various failure scenarios to show the metrics results, see Observability for Kafka Clients to Confluent Cloud.
Add the MetricsViewer role to a new service account¶
The MetricsViewer role provides service account access to the Metrics API for all clusters in an organization. This role also enables service accounts to import metrics into third-party metrics platforms.
To assign the MetricsViewer role to a new service account:
Run the following commands to add a role binding for MetricsViewer to a new service account. Remember to log in with the confluent login command first.
Create the service account:
confluent iam service-account create MetricsImporter --description "A test service account to import Confluent Cloud metrics into our monitoring system"
Your output should resemble:
+-------------+--------------------------------+
| ID          | sa-123abc                      |
| Name        | MetricsImporter                |
| Description | A test service account to      |
|             | import Confluent Cloud metrics |
|             | into our monitoring system     |
+-------------+--------------------------------+
Make note of the ID field.
Add the MetricsViewer role binding to the service account:
confluent iam rbac role-binding create --role MetricsViewer --principal User:sa-123abc
Your output should resemble:
+-----------+----------------+
| Principal | User:sa-123abc |
| Role      | MetricsViewer  |
+-----------+----------------+
List the role bindings to confirm that the MetricsViewer role binding was created:
confluent iam rbac role-binding list --principal User:sa-123abc
Your output should resemble:
    Principal    | Email |     Role      | Environment | ...
-----------------+-------+---------------+-------------+----
  User:sa-123abc |       | MetricsViewer |             |
List the existing service accounts:
confluent iam service-account list
Your output should resemble:
     ID     |              Name              |            Description
------------+--------------------------------+-----------------------------------
  sa-1a2b3c | test-account                   | for testing
  sa-112233 | ProactiveSupport.1614189731753 | SA for Proactive Support
  sa-aabbcc | KSQL.lksqlc-ab123              | SA for KSQL w/ ID lksqlc-ab123
            |                                | and Name ksqlDB_app_0
  ...
Create an API key and add it to the new service account:
confluent api-key create --resource cloud --service-account sa-123abc
Your output should resemble:
It may take a couple of minutes for the API key to be ready. Save the API key and secret. The secret is not retrievable later.

+---------+------------------------------------------------------------------+
| API Key | 1234567ABCDEFGHI                                                 |
| Secret  | ABCDEF123456.................................................... |
+---------+------------------------------------------------------------------+
Save the API key and secret in a secure location.
- In the administration menu (☰) in the upper-right corner of the Confluent Cloud user interface, click ADMINISTRATION > API keys.
- Click Add key.
- Click the Granular access tile to set the scope for the API key. Click Next.
- Click Create a new one, specify the service account name and, optionally, a description. Click Next.
- The API key and secret are generated for the service account. You will need this API key and secret to connect to the cluster, so be sure to store them securely. Click Save. The new service account with the API key and associated ACLs is created. When you return to the API access tab, you can view the newly created API key to confirm.
- Return to Accounts & access in the administration menu, and in the Accounts tab, click Service accounts to view your service accounts.
- Select the service account that you want to assign the MetricsViewer role to.
- In the service account’s details page, click Access.
- In the tree view, open the resource where you want the service account to have the MetricsViewer role.
- Click Add role assignment and select the MetricsViewer tile. Click Save.
When you return to Accounts & access, you can view the resources for the organization, and also see that the service account you created has the MetricsViewer role binding.
Integrate with third-party monitoring¶
Integrating directly with a third-party monitoring tool allows you to monitor Confluent Cloud alongside the rest of your applications.
Datadog¶
Datadog provides an integration where users can input a Confluent Cloud API key (resource-scoped for resource management) into the Datadog UI, select resources to monitor, and see metrics in minutes using an out-of-the-box dashboard. If you use Datadog, create your Confluent Cloud API key and follow the instructions from Datadog to get started. After configuring the integration, search the Datadog dashboards for “Confluent Cloud Overview,” the default Confluent Cloud dashboard at Datadog. Clone the default dashboard so that you can edit it to suit your needs.
Dynatrace¶
Dynatrace provides an extension where users can input a Confluent Cloud API key (resource-scoped for resource management) into the Dynatrace Monitoring Configuration, select resources to monitor, and see metrics in minutes in a prebuilt dashboard. If you use Dynatrace, create your Confluent Cloud API key (resource-scoped for resource management) and follow the instructions to get started.
Grafana Cloud¶
Grafana Labs provides an integration where users can input a Confluent Cloud API key (resource-scoped for resource management) into the Grafana Cloud UI, select resources to monitor, and see metrics in minutes using an out-of-the-box dashboard. If you use Grafana Cloud, create your Confluent Cloud API key (resource-scoped for resource management) and follow the instructions to get started.
Prometheus¶
Prometheus servers can scrape the Confluent Cloud Metrics API directly by making use of the export endpoint. This endpoint returns the single most recent data point for each metric, for each distinct combination of labels, in the Prometheus exposition or Open Metrics format.
For more information, see Export metric values.
New Relic OpenTelemetry¶
You can collect metrics about your Confluent Cloud-managed Kafka deployment with the New Relic OpenTelemetry collector. The collector is a component of OpenTelemetry that collects, processes, and exports telemetry data to New Relic, or any observability back-end. For more information, see Monitoring Confluent Cloud Kafka with OpenTelemetry Collector.
Discovery using the Metrics API¶
The following examples use HTTPie rather than cURL. HTTPie can be installed using most common software package managers by following its documentation.
The Confluent Cloud Metrics API provides endpoints for programmatic discovery of available resources and their metrics. This resource and metric metadata is represented by descriptor objects.
The discovery endpoints can be used to avoid hardcoding metric and resource names into client scripts.
Discover available resources¶
A resource represents the entity against which metrics are collected, for example, a Kafka cluster, a Kafka connector, or a ksqlDB application.
Get a description of the available resources by sending a GET request to the descriptors/resources endpoint of the API:
http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/descriptors/resources' --auth '<API_KEY>:<SECRET>'
This returns a JSON document describing the available resources to query and their labels.
Discover available metrics¶
Get a description of the available metrics by sending a GET request to the descriptors/metrics endpoint of the API:
http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/descriptors/metrics?resource_type=kafka' --auth '<API_KEY>:<SECRET>'
Note
The resource_type query parameter is required and specifies the type of resource for which to list metrics. The valid resource types can be determined using the /descriptors/resources endpoint.
This returns a JSON document describing the available metrics to query and their labels.
A human-readable list of the current metrics is available in the API Reference.
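The discovery endpoints make it possible to drive scripts from metadata rather than hardcoded names. A minimal Python sketch; the payload below is an abbreviated, hypothetical excerpt of a descriptors/metrics response (real responses carry additional fields per entry, such as type, unit, and description):

```python
import json

# Abbreviated, hypothetical excerpt of a descriptors/metrics response.
sample = json.loads("""
{
  "data": [
    {"name": "io.confluent.kafka.server/received_bytes",
     "labels": [{"key": "metric.topic"}]},
    {"name": "io.confluent.kafka.server/sent_bytes",
     "labels": [{"key": "metric.topic"}]}
  ]
}
""")

def metric_names(descriptors: dict) -> list:
    """Extract the queryable metric names from a descriptors document."""
    return [entry["name"] for entry in descriptors["data"]]

print(metric_names(sample))
```

A script built this way keeps working as metrics are added or renamed, since the names come from the API rather than from the script itself.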
Example Queries¶
The Confluent Cloud Metrics API has an expressive query language that lets users flexibly filter and group timeseries data. The example queries below are provided as templates. Additional examples can be found in the Cloud Console, which also uses the Confluent Cloud Metrics API.
Timestamps in metrics queries use UTC (Coordinated Universal Time). Use either UTC or an offset appropriate for your location.
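For example, the interval string for the previous full hour can be computed in UTC with a small helper (an illustrative Python snippet, not part of the API):

```python
from datetime import datetime, timedelta, timezone

def last_hour_interval(now=None):
    """Return an ISO-8601 interval string covering the previous full hour, in UTC."""
    now = now or datetime.now(timezone.utc)
    end = now.replace(minute=0, second=0, microsecond=0)  # truncate to the hour
    start = end - timedelta(hours=1)
    fmt = "%Y-%m-%dT%H:%M:%SZ"
    return f"{start.strftime(fmt)}/{end.strftime(fmt)}"

# A fixed reference time yields a one-hour window ending on the hour.
print(last_hour_interval(datetime(2019, 12, 19, 16, 30, tzinfo=timezone.utc)))
# 2019-12-19T15:00:00Z/2019-12-19T16:00:00Z
```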
Query for bytes produced to the cluster per minute grouped by topic¶
This query measures bytes produced (ingress). If you want to query bytes consumed (egress), see Query for bytes consumed from the cluster per minute grouped by topic.
Note that if you are using Cluster Linking, the received_bytes metric does not include the mirror-in bytes to the cluster. You can use the cluster_link_destination_response_bytes metric to query the mirror-in bytes instead.
Create a file named received_bytes_query.json using the following template. Be sure to change lkc-XXXXX and the timestamp values to match your needs.

{
  "aggregations": [{"metric": "io.confluent.kafka.server/received_bytes"}],
  "filter": {"field": "resource.kafka.id", "op": "EQ", "value": "lkc-XXXXX"},
  "granularity": "PT1M",
  "group_by": ["metric.topic"],
  "intervals": ["2019-12-19T11:00:00-05:00/2019-12-19T11:05:00-05:00"],
  "limit": 25
}
Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < received_bytes_query.json
Your output should resemble:
{
  "data": [
    {"timestamp": "2019-12-19T16:00:00Z", "metric.topic": "test-topic", "value": 72.0},
    {"timestamp": "2019-12-19T16:01:00Z", "metric.topic": "test-topic", "value": 139.0},
    {"timestamp": "2019-12-19T16:02:00Z", "metric.topic": "test-topic", "value": 232.0},
    {"timestamp": "2019-12-19T16:03:00Z", "metric.topic": "test-topic", "value": 0.0},
    {"timestamp": "2019-12-19T16:04:00Z", "metric.topic": "test-topic", "value": 0.0}
  ]
}
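Because the response groups per-minute values by metric.topic, a client can aggregate further locally. For example, summing bytes per topic from a response like the one above (an illustrative Python sketch using an abbreviated copy of the sample data):

```python
from collections import defaultdict

# Abbreviated response body from a received_bytes query.
response = {
    "data": [
        {"timestamp": "2019-12-19T16:00:00Z", "metric.topic": "test-topic", "value": 72.0},
        {"timestamp": "2019-12-19T16:01:00Z", "metric.topic": "test-topic", "value": 139.0},
        {"timestamp": "2019-12-19T16:02:00Z", "metric.topic": "test-topic", "value": 232.0},
    ]
}

def bytes_per_topic(resp: dict) -> dict:
    """Sum the per-minute byte values for each topic in a query response."""
    totals = defaultdict(float)
    for point in resp["data"]:
        totals[point["metric.topic"]] += point["value"]
    return dict(totals)

print(bytes_per_topic(response))  # {'test-topic': 443.0}
```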
Query for bytes consumed from the cluster per minute grouped by topic¶
This query measures bytes consumed (egress). If you want to query bytes produced (ingress), see Query for bytes produced to the cluster per minute grouped by topic.
Note that if you are using Cluster Linking, the sent_bytes metric also includes the mirror-out bytes from the cluster.
Create a file named sent_bytes_query.json using the following template. Be sure to change lkc-XXXXX and the timestamp values to match your needs.

{
  "aggregations": [{"metric": "io.confluent.kafka.server/sent_bytes"}],
  "filter": {"field": "resource.kafka.id", "op": "EQ", "value": "lkc-XXXXX"},
  "granularity": "PT1M",
  "group_by": ["metric.topic"],
  "intervals": ["2019-12-19T11:00:00-05:00/2019-12-19T11:05:00-05:00"],
  "limit": 25
}
Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < sent_bytes_query.json
Your output should resemble:
{
  "data": [
    {"timestamp": "2019-12-19T16:01:00Z", "metric.topic": "test-topic", "value": 0.0},
    {"timestamp": "2019-12-19T16:02:00Z", "metric.topic": "test-topic", "value": 157.0},
    {"timestamp": "2019-12-19T16:03:00Z", "metric.topic": "test-topic", "value": 371.0},
    {"timestamp": "2019-12-19T16:04:00Z", "metric.topic": "test-topic", "value": 0.0}
  ]
}
Note
If you haven’t produced data during the time window, the dataset is empty for a given topic. For more details on sent_bytes and received_bytes in Cluster Linking, see Cluster Linking Performance Limits.
Query for max retained bytes per hour over 2 hours for a cluster lkc-XXXXX¶
Create a file named cluster_retained_bytes_query.json using the following template. Be sure to change lkc-XXXXX and the timestamp values to match your needs:

{
  "aggregations": [{"metric": "io.confluent.kafka.server/retained_bytes"}],
  "filter": {"field": "resource.kafka.id", "op": "EQ", "value": "lkc-XXXXX"},
  "granularity": "PT1H",
  "intervals": ["2019-12-19T11:00:00-05:00/P0Y0M0DT2H0M0S"],
  "limit": 5
}
Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < cluster_retained_bytes_query.json
Your output should resemble:
{
  "data": [
    {"timestamp": "2019-12-19T16:00:00Z", "value": 507350.0},
    {"timestamp": "2019-12-19T17:00:00Z", "value": 507350.0}
  ]
}
Query for average consumer lag over the last hour grouped by topic and consumer group¶
Create a file named consumer_lag_max_hour.json using the following template. Be sure to change lkc-XXXXX, and note that the interval is the last hour with a 1-minute granularity.

{
  "aggregations": [{"metric": "io.confluent.kafka.server/consumer_lag_offsets"}],
  "filter": {"field": "resource.kafka.id", "op": "EQ", "value": "lkc-XXXXX"},
  "granularity": "PT1M",
  "group_by": ["metric.consumer_group_id", "metric.topic"],
  "intervals": ["PT1H/now"],
  "limit": 25
}
Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < consumer_lag_max_hour.json
Your output should resemble:
{
  "data": [
    {"metric.consumer_group_id": "group_1", "metric.topic": "test_topic_1", "timestamp": "2022-03-23T21:00:00Z", "value": 0.0},
    {"metric.consumer_group_id": "group_2", "metric.topic": "test_topic_2", "timestamp": "2022-03-23T21:00:00Z", "value": 6.0}
  ]
}
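A monitoring script might scan a response like this for consumer groups whose lag exceeds a threshold. An illustrative Python sketch using the sample values above (the threshold is arbitrary):

```python
# Sample response body from the consumer-lag query.
response = {
    "data": [
        {"metric.consumer_group_id": "group_1", "metric.topic": "test_topic_1",
         "timestamp": "2022-03-23T21:00:00Z", "value": 0.0},
        {"metric.consumer_group_id": "group_2", "metric.topic": "test_topic_2",
         "timestamp": "2022-03-23T21:00:00Z", "value": 6.0},
    ]
}

def lagging_groups(resp: dict, threshold: float = 1.0) -> list:
    """Return (group, topic, lag) tuples whose lag meets or exceeds the threshold."""
    return [
        (p["metric.consumer_group_id"], p["metric.topic"], p["value"])
        for p in resp["data"]
        if p["value"] >= threshold
    ]

print(lagging_groups(response))  # [('group_2', 'test_topic_2', 6.0)]
```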
Query for the number of streaming units used per hour for ksqlDB cluster lksqlc-XXXXX¶
Create a file named ksql_streaming_unit_count.json using the following template. Be sure to change lksqlc-XXXXX and the timestamp values to match your needs.

{
  "aggregations": [{"metric": "io.confluent.kafka.ksql/streaming_unit_count"}],
  "filter": {"field": "resource.ksql.id", "op": "EQ", "value": "lksqlc-XXXXX"},
  "granularity": "PT1H",
  "intervals": ["2021-02-24T10:00:00Z/2021-02-24T11:00:00Z"],
  "group_by": ["resource.ksql.id"]
}
Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < ksql_streaming_unit_count.json
Your output should resemble:
{ "data": [ { "resource.ksql.id": "lksqlc-XXXXX", "timestamp": "2021-02-24T10:00:00Z", "value": 4.0 } ] }
Query for the max % of storage used over all CSUs for a ksqlDB cluster lksqlc-XXXXX¶
Create a file named ksql_storage_utilization.json using the following template. Be sure to change lksqlc-XXXXX and the timestamp values to match your needs.

{
  "aggregations": [{"metric": "io.confluent.kafka.ksql/storage_utilization"}],
  "filter": {"field": "resource.ksql.id", "op": "EQ", "value": "lksqlc-xxxxx"},
  "granularity": "PT1M",
  "intervals": ["2021-02-24T10:00:00Z/2021-02-24T11:00:00Z"]
}
Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < ksql_storage_utilization.json
Your output should resemble:
{ "data": [ { "resource.ksql.id": "lksqlc-XXXXX", "timestamp": "2021-02-24T10:00:00Z", "value": 0.85 } ] }
Query for the bytes of ksqlDB storage used by a query on ksqlDB cluster lksqlc-XXXXX¶
Create a file named ksql_query_storage.json using the following template. Be sure to change lksqlc-XXXXX and the timestamp values to match your needs.

{
  "aggregations": [{"metric": "io.confluent.kafka.ksql/task_stored_bytes"}],
  "filter": {"field": "resource.ksql.id", "op": "EQ", "value": "lksqlc-xxxxx"},
  "granularity": "PT1M",
  "group_by": ["metric.query_id"],
  "intervals": ["2021-02-24T10:00:00Z/2021-02-24T11:00:00Z"]
}
Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < ksql_query_storage.json
Your output should resemble:
{ "data": [ { "resource.ksql.id": "lksqlc-XXXXX", "metric.query_id": "CTAS_PAGEVIEWS_2", "timestamp": "2021-02-24T10:00:00Z", "value": 7688174488 } ] }
Query for the bytes of storage used by a task on ksqlDB cluster lksqlc-XXXXX¶
Create a file named ksql_task_storage.json using the following template. Be sure to change lksqlc-XXXXX and the timestamp values to match your needs.

{
  "aggregations": [{"metric": "io.confluent.kafka.ksql/task_stored_bytes"}],
  "filter": {"field": "resource.ksql.id", "op": "EQ", "value": "lksqlc-xxxxx"},
  "granularity": "PT1M",
  "group_by": ["metric.query_id", "metric.task_id"],
  "intervals": ["2021-02-24T10:00:00Z/2021-02-24T11:00:00Z"]
}
Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < ksql_task_storage.json
Your output should resemble:
{ "data": [ { "resource.ksql.id": "lksqlc-XXXXX", "metric.task_id": "1_1", "metric.query_id": "CTAS_PAGEVIEWS_2", "timestamp": "2021-02-24T10:00:00Z", "value": 1079295760 } ] }
Query for the query saturation on ksqlDB cluster lksqlc-XXXXX¶
Create a file named ksql_query_saturation.json using the following template. Be sure to change lksqlc-XXXXX and the timestamp values to match your needs.

{
  "aggregations": [{"metric": "io.confluent.kafka.ksql/query_saturation"}],
  "filter": {"field": "resource.ksql.id", "op": "EQ", "value": "lksqlc-xxxxx"},
  "granularity": "PT1M",
  "intervals": ["2021-02-24T10:00:00Z/2021-02-24T11:00:00Z"]
}
Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < ksql_query_saturation.json
Your output should resemble:
{ "data": [ { "resource.ksql.id": "lksqlc-XXXXX", "timestamp": "2021-02-24T10:00:00Z", "value": 0.85 } ] }
Query for the total bytes consumed by ksqlDB cluster lksqlc-XXXXX¶
Create a file named ksql_bytes_consumed.json using the following template. Be sure to change lksqlc-XXXXX and the timestamp values to match your needs.

{
  "aggregations": [{"metric": "io.confluent.kafka.ksql/consumed_total_bytes"}],
  "filter": {"field": "resource.ksql.id", "op": "EQ", "value": "lksqlc-xxxxx"},
  "granularity": "PT1M",
  "intervals": ["2021-02-24T10:00:00Z/2021-02-24T11:00:00Z"]
}
Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < ksql_bytes_consumed.json
Your output should resemble:
{ "data": [ { "resource.ksql.id": "lksqlc-XXXXX", "timestamp": "2021-02-24T10:00:00Z", "value": 1024 } ] }
Query for the total bytes produced by ksqlDB cluster lksqlc-XXXXX¶
Create a file named ksql_bytes_produced.json using the following template. Be sure to change lksqlc-XXXXX and the timestamp values to match your needs.

{
  "aggregations": [{"metric": "io.confluent.kafka.ksql/produced_total_bytes"}],
  "filter": {"field": "resource.ksql.id", "op": "EQ", "value": "lksqlc-xxxxx"},
  "granularity": "PT1M",
  "intervals": ["2021-02-24T10:00:00Z/2021-02-24T11:00:00Z"]
}
Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < ksql_bytes_produced.json
Your output should resemble:
{ "data": [ { "resource.ksql.id": "lksqlc-XXXXX", "timestamp": "2021-02-24T10:00:00Z", "value": 1024 } ] }
Query for the total topic offsets processed by task on ksqlDB cluster lksqlc-XXXXX¶
Create a file named ksql_offsets_processed.json using the following template. Be sure to change lksqlc-XXXXX and the timestamp values to match your needs.

{
  "aggregations": [{"metric": "io.confluent.kafka.ksql/offsets_processed_total"}],
  "filter": {"field": "resource.ksql.id", "op": "EQ", "value": "lksqlc-xxxxx"},
  "granularity": "PT1M",
  "group_by": ["metric.query_id", "metric.task_id"],
  "intervals": ["2021-02-24T10:00:00Z/2021-02-24T11:00:00Z"]
}
Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < ksql_offsets_processed.json
Your output should resemble:
{ "data": [ { "resource.ksql.id": "lksqlc-XXXXX", "timestamp": "2021-02-24T10:00:00Z", "metric.task_id": "1_1", "metric.query_id": "CTAS_PAGEVIEWS_2", "value": 123 } ] }
Query for the total topic offsets processed by all tasks of a query on ksqlDB cluster lksqlc-XXXXX¶
Create a file named ksql_offsets_processed.json using the following template. Be sure to change lksqlc-XXXXX and the timestamp values to match your needs.

{
  "aggregations": [{"metric": "io.confluent.kafka.ksql/offsets_processed_total"}],
  "filter": {"field": "resource.ksql.id", "op": "EQ", "value": "lksqlc-xxxxx"},
  "granularity": "PT1M",
  "group_by": ["metric.query_id"],
  "intervals": ["2021-02-24T10:00:00Z/2021-02-24T11:00:00Z"]
}
Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < ksql_offsets_processed.json
Your output should resemble:
{ "data": [ { "resource.ksql.id": "lksqlc-XXXXX", "timestamp": "2021-02-24T10:00:00Z", "metric.query_id": "CTAS_PAGEVIEWS_2", "value": 123 } ] }
Query for the current committed offset lag by task on ksqlDB cluster lksqlc-XXXXX¶
Create a file named ksql_offset_lag.json using the following template. Be sure to change lksqlc-XXXXX and the timestamp values to match your needs.

{
  "aggregations": [{"metric": "io.confluent.kafka.ksql/committed_offset_lag"}],
  "filter": {"field": "resource.ksql.id", "op": "EQ", "value": "lksqlc-xxxxx"},
  "granularity": "PT1M",
  "group_by": ["metric.query_id", "metric.task_id"],
  "intervals": ["2021-02-24T10:00:00Z/2021-02-24T11:00:00Z"]
}
Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < ksql_offset_lag.json
Your output should resemble:
{ "data": [ { "resource.ksql.id": "lksqlc-XXXXX", "timestamp": "2021-02-24T10:00:00Z", "metric.task_id": "1_1", "metric.query_id": "CTAS_PAGEVIEWS_2", "value": 456 } ] }
Query for the current total committed offset lag for all tasks in a query on ksqlDB cluster lksqlc-XXXXX¶
Create a file named ksql_offset_lag.json using the following template. Be sure to change lksqlc-XXXXX and the timestamp values to match your needs.

{
  "aggregations": [{"metric": "io.confluent.kafka.ksql/committed_offset_lag"}],
  "filter": {"field": "resource.ksql.id", "op": "EQ", "value": "lksqlc-xxxxx"},
  "granularity": "PT1M",
  "group_by": ["metric.query_id"],
  "intervals": ["2021-02-24T10:00:00Z/2021-02-24T11:00:00Z"]
}
Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < ksql_offset_lag.json
Your output should resemble:
{ "data": [ { "resource.ksql.id": "lksqlc-XXXXX", "timestamp": "2021-02-24T10:00:00Z", "metric.query_id": "CTAS_PAGEVIEWS_2", "value": 456 } ] }
Query for the total number of processing errors by query on ksqlDB cluster lksqlc-XXXXX¶
Create a file named ksql_processing_errors.json using the following template. Be sure to change lksqlc-XXXXX and the timestamp values to match your needs.

{
  "aggregations": [{"metric": "io.confluent.kafka.ksql/processing_errors_total"}],
  "filter": {"field": "resource.ksql.id", "op": "EQ", "value": "lksqlc-xxxxx"},
  "granularity": "PT1M",
  "group_by": ["metric.query_id"],
  "intervals": ["2021-02-24T10:00:00Z/2021-02-24T11:00:00Z"]
}
Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < ksql_processing_errors.json
Your output should resemble:
{ "data": [ { "resource.ksql.id": "lksqlc-XXXXX", "timestamp": "2021-02-24T10:00:00Z", "metric.query_id": "CTAS_PAGEVIEWS_2", "value": 16 } ] }
Query for the total number of restarts due to failure by query on ksqlDB cluster lksqlc-XXXXX¶
Create a file named ksql_query_restarts.json using the following template. Be sure to change lksqlc-XXXXX and the timestamp values to match your needs.

{
  "aggregations": [{"metric": "io.confluent.kafka.ksql/query_restarts"}],
  "filter": {"field": "resource.ksql.id", "op": "EQ", "value": "lksqlc-xxxxx"},
  "granularity": "PT1M",
  "group_by": ["metric.query_id"],
  "intervals": ["2021-02-24T10:00:00Z/2021-02-24T11:00:00Z"]
}
Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < ksql_query_restarts.json
Your output should resemble:
{ "data": [ { "resource.ksql.id": "lksqlc-XXXXX", "timestamp": "2021-02-24T10:00:00Z", "metric.query_id": "CTAS_PAGEVIEWS_2", "value": 3 } ] }
Query for the number of schemas in the Schema Registry cluster lsrc-XXXXX¶
Create a file named schema_count.json using the following template. Be sure to change lsrc-XXXXX and the timestamp values to match your needs.

{
  "aggregations": [{"metric": "io.confluent.kafka.schema_registry/schema_count"}],
  "filter": {"field": "resource.schema_registry.id", "op": "EQ", "value": "lsrc-XXXXX"},
  "granularity": "PT1M",
  "intervals": ["2021-02-24T10:00:00Z/2021-02-24T10:01:00Z"],
  "group_by": ["resource.schema_registry.id"]
}
Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < schema_count.json
Your output should resemble:
{ "data": [ { "resource.schema_registry.id": "lsrc-XXXXX", "timestamp": "2021-02-24T10:00:00Z", "value": 1.0 } ] }
Query for the hourly number of records received by a sink connector lcc-XXXXX¶
Create a file named sink_connector_record_number.json using the following template. Be sure to change lcc-XXXXX and the timestamp values to match your needs.

{
  "aggregations": [{"metric": "io.confluent.kafka.connect/received_records"}],
  "filter": {"field": "resource.connector.id", "op": "EQ", "value": "lcc-XXXXX"},
  "granularity": "PT1H",
  "intervals": ["2021-02-24T10:00:00Z/2021-02-24T11:00:00Z"],
  "group_by": ["resource.connector.id"]
}
Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your Confluent Cloud credentials (the API key created with --resource cloud).

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < sink_connector_record_number.json
Your output should resemble:
{ "data": [ { "resource.connector.id": "lcc-XXXXX", "timestamp": "2021-02-24T10:00:00Z", "value": 26455991.0 } ] }
Query for the total free memory on a custom connector clcc-XXXXX¶
Create a file named custom_connector_free_memory.json using the following template. Be sure to change clcc-XXXXX and the timestamp values to match your needs.

{
  "aggregations": [{"metric": "io.confluent.system/memory_free_bytes"}],
  "filter": {"field": "resource.custom_connector.id", "op": "EQ", "value": "clcc-XXXXX"},
  "granularity": "PT1H",
  "intervals": ["2023-05-09T10:00:00Z/2023-05-09T15:00:00Z"],
  "group_by": ["resource.custom_connector.id"]
}
Submit the query as a POST using the following command.

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud-custom/query' --auth '<API_KEY>:<SECRET>' < custom_connector_free_memory.json
Your output should resemble:
{
  "data": [
    {"resource.custom_connector.id": "clcc-XXXXXX", "timestamp": "2023-05-09T10:00:00Z", "value": 125229329.06666666},
    {"resource.custom_connector.id": "clcc-XXXXXX", "timestamp": "2023-05-09T11:00:00Z", "value": 125193966.93333334},
    {"resource.custom_connector.id": "clcc-XXXXXX", "timestamp": "2023-05-09T12:00:00Z", "value": 125140241.06666666},
    {"resource.custom_connector.id": "clcc-XXXXXX", "timestamp": "2023-05-09T13:00:00Z", "value": 125099622.4},
    {"resource.custom_connector.id": "clcc-XXXXXX", "timestamp": "2023-05-09T14:00:00Z", "value": 124849493.33333333}
  ]
}
For Confluent Cloud UI metrics for custom connectors, see View metrics.
Query for the total percent CPU used by a custom connector clcc-XXXXX¶
Create a file named custom_connector_percent_cpu.json using the following template. Be sure to change clcc-XXXXX and the timestamp values to match your needs.

{
  "aggregations": [{"metric": "io.confluent.system/cpu_load_percent"}],
  "filter": {"field": "resource.custom_connector.id", "op": "EQ", "value": "clcc-XXXXX"},
  "granularity": "PT1H",
  "intervals": ["2023-05-09T10:00:00Z/2023-05-09T15:00:00Z"],
  "group_by": ["resource.custom_connector.id"]
}
Submit the query as a POST using the following command.

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud-custom/query' --auth '<API_KEY>:<SECRET>' < custom_connector_percent_cpu.json
Your output should resemble:
{
  "data": [
    {"resource.custom_connector.id": "clcc-XXXXX", "timestamp": "2023-05-09T10:00:00Z", "value": 0.021009808092643977},
    {"resource.custom_connector.id": "clcc-XXXXX", "timestamp": "2023-05-09T11:00:00Z", "value": 0.01990721858932965},
    {"resource.custom_connector.id": "clcc-XXXXX", "timestamp": "2023-05-09T12:00:00Z", "value": 0.020799848444189233},
    {"resource.custom_connector.id": "clcc-XXXXX", "timestamp": "2023-05-09T13:00:00Z", "value": 0.019948515028905416},
    {"resource.custom_connector.id": "clcc-XXXXX", "timestamp": "2023-05-09T14:00:00Z", "value": 0.020734587261390117}
  ]
}
For Confluent Cloud UI metrics for custom connectors, see View metrics.
Query for metrics for a specific Principal ID¶
The metric.principal_id label can be used to filter metrics by specific users or service accounts. Metrics such as io.confluent.kafka.server/active_connection_count and io.confluent.kafka.server/request_count support filtering by the metric.principal_id label. To see all metrics that currently support this label, see the API Reference.
Create a file named principal_query.json using the following template. Be sure to change lkc-XXXXX and the timestamp values to match your needs.

{
  "aggregations": [{"metric": "io.confluent.kafka.server/active_connection_count"}],
  "filter": {"field": "resource.kafka.id", "op": "EQ", "value": "lkc-XXXXX"},
  "granularity": "PT1H",
  "group_by": ["metric.principal_id"],
  "intervals": ["2022-01-01T00:00:00Z/PT1H"],
  "limit": 5
}
Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

```shell
http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < principal_query.json
```
Your output should resemble:
```json
{
  "data": [
    { "metric.principal_id": "sa-abc123", "timestamp": "2022-01-01T00:00:00Z", "value": 430.99999999997 },
    { "metric.principal_id": "u-def456", "timestamp": "2022-01-01T00:00:00Z", "value": 427.93333333332 },
    { "metric.principal_id": "u-abc123", "timestamp": "2022-01-01T00:00:00Z", "value": 333.19999999997 }
  ],
  "meta": {
    "pagination": {
      "next_page_token": "eyJ2ZXJzaW9uIjoiMSIsInJlcXVlc3RI",
      "page_size": 5
    }
  }
}
```
Note
Topics without reported metric values during the specified interval aren’t returned.
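When a response includes meta.pagination.next_page_token, as in the output above, more results are available. As a minimal sketch of a pagination loop (the page_token request field is an assumption here; check the API Reference for the exact mechanism), with the HTTP call injected as a callable so the loop itself is testable:

```python
def fetch_all(post_query, query):
    """Collect data points across pages by following next_page_token.

    `post_query` is any callable that submits the query body and returns
    the decoded JSON response. Passing the token back as a "page_token"
    field is an assumption -- verify against the API Reference.
    """
    points, token = [], None
    while True:
        body = dict(query)
        if token:
            body["page_token"] = token
        resp = post_query(body)
        points.extend(resp.get("data", []))
        token = resp.get("meta", {}).get("pagination", {}).get("next_page_token")
        if not token:
            return points

# Exercise the loop with a fake two-page "server".
pages = [
    {"data": [{"value": 1}], "meta": {"pagination": {"next_page_token": "t2"}}},
    {"data": [{"value": 2}], "meta": {"pagination": {}}},
]
calls = iter(pages)
all_points = fetch_all(lambda body: next(calls), {"limit": 5})
print(len(all_points))  # → 2
```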
Query for the total number of records a Flink SQL statement has received¶
Create a file named num_records_in.json using the following template. Be sure to change the compute pool ID (lfcp-XXXXXX), statement name (XXXXXXXX-XXXX-XXXX), and the timestamp values to match your needs.

```json
{
  "aggregations": [
    { "metric": "io.confluent.flink/num_records_in" }
  ],
  "filter": {
    "op": "AND",
    "filters": [
      { "field": "resource.flink_statement.name", "op": "EQ", "value": "XXXXXXXX-XXXX-XXXX" },
      { "field": "resource.compute_pool.id", "op": "EQ", "value": "lfcp-XXXXXX" }
    ]
  },
  "granularity": "PT1M",
  "intervals": [ "2023-10-23T16:30:00/2023-10-23T16:35:00" ],
  "limit": 5
}
```
Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

```shell
http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < num_records_in.json
```
Your output should resemble:
```json
{
  "data": [
    { "timestamp": "2023-10-23T16:30:00Z", "value": 115.0 },
    { "timestamp": "2023-10-23T16:31:00Z", "value": 116.0 },
    { "timestamp": "2023-10-23T16:32:00Z", "value": 116.0 },
    { "timestamp": "2023-10-23T16:33:00Z", "value": 131.0 },
    { "timestamp": "2023-10-23T16:34:00Z", "value": 127.0 }
  ]
}
```
Query for the total number of records a Flink SQL statement has emitted¶
Create a file named num_records_out.json using the following template. Be sure to change the compute pool ID (lfcp-XXXXXX), statement name (XXXXXXXX-XXXX-XXXX), and the timestamp values to match your needs.

```json
{
  "aggregations": [
    { "metric": "io.confluent.flink/num_records_out" }
  ],
  "filter": {
    "op": "AND",
    "filters": [
      { "field": "resource.flink_statement.name", "op": "EQ", "value": "XXXXXXXX-XXXX-XXXX" },
      { "field": "resource.compute_pool.id", "op": "EQ", "value": "lfcp-XXXXXX" }
    ]
  },
  "granularity": "PT1M",
  "intervals": [ "2023-10-23T16:30:00/2023-10-23T16:35:00" ],
  "limit": 5
}
```
Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

```shell
http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < num_records_out.json
```
Your output should resemble:
```json
{
  "data": [
    { "timestamp": "2023-10-23T16:30:00Z", "value": 115.0 },
    { "timestamp": "2023-10-23T16:31:00Z", "value": 116.0 },
    { "timestamp": "2023-10-23T16:32:00Z", "value": 116.0 },
    { "timestamp": "2023-10-23T16:33:00Z", "value": 131.0 },
    { "timestamp": "2023-10-23T16:34:00Z", "value": 127.0 }
  ]
}
```
Query for the backlog of a Flink SQL statement¶
This metric represents the total number of available records after the consumer offset in a Kafka partition for a Flink SQL statement, across all operators.
Create a file named pending_records.json using the following template. Be sure to change the compute pool ID (lfcp-XXXXXX), statement name (XXXXXXXX-XXXX-XXXX), and the timestamp values to match your needs.

```json
{
  "aggregations": [
    { "metric": "io.confluent.flink/pending_records" }
  ],
  "filter": {
    "op": "AND",
    "filters": [
      { "field": "resource.flink_statement.name", "op": "EQ", "value": "XXXXXXXX-XXXX-XXXX" },
      { "field": "resource.compute_pool.id", "op": "EQ", "value": "lfcp-XXXXXX" }
    ]
  },
  "granularity": "PT1M",
  "intervals": [ "2023-10-23T16:30:00/2023-10-23T16:35:00" ],
  "limit": 5
}
```
Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

```shell
http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < pending_records.json
```
Your output should resemble:
```json
{
  "data": [
    { "timestamp": "2023-10-23T16:30:00Z", "value": 0.0 },
    { "timestamp": "2023-10-23T16:31:00Z", "value": 0.0 },
    { "timestamp": "2023-10-23T16:32:00Z", "value": 0.0 },
    { "timestamp": "2023-10-23T16:33:00Z", "value": 0.0 },
    { "timestamp": "2023-10-23T16:34:00Z", "value": 0.0 }
  ]
}
```
Note
The above value may not always be zero. A non-zero value indicates some backlog associated with the Flink statement.
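A simple way to act on this metric is to flag the statement whenever any sample in the window is non-zero. A minimal sketch (the response literal is hypothetical, modeled on the output format above):

```python
import json

# Hypothetical pending_records response with a non-zero sample.
response = json.loads("""
{ "data": [
    { "timestamp": "2023-10-23T16:30:00Z", "value": 0.0 },
    { "timestamp": "2023-10-23T16:31:00Z", "value": 42.0 }
] }
""")

# The largest backlog seen in the window; a non-zero value means the
# statement is behind on at least one Kafka partition.
max_backlog = max(d["value"] for d in response["data"])
behind = max_backlog > 0
print(max_backlog, behind)  # → 42.0 True
```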
Query for the total number of records all Flink SQL statements leveraging a Flink compute pool have received¶
Create a file named num_records_in.json using the following template. Be sure to change the compute pool ID (lfcp-XXXXXX) and the timestamp values to match your needs.

```json
{
  "aggregations": [
    { "metric": "io.confluent.flink/num_records_in" }
  ],
  "filter": {
    "field": "resource.compute_pool.id",
    "op": "EQ",
    "value": "lfcp-XXXXXX"
  },
  "granularity": "PT1M",
  "intervals": [ "2023-10-25T16:30:00/2023-10-25T16:35:00" ],
  "limit": 5
}
```
Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

```shell
http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < num_records_in.json
```
Your output should resemble:
```json
{
  "data": [
    { "timestamp": "2023-10-25T16:30:00Z", "value": 236.0 },
    { "timestamp": "2023-10-25T16:31:00Z", "value": 228.0 },
    { "timestamp": "2023-10-25T16:32:00Z", "value": 240.0 },
    { "timestamp": "2023-10-25T16:33:00Z", "value": 230.0 },
    { "timestamp": "2023-10-25T16:34:00Z", "value": 252.0 }
  ]
}
```
Query for the total number of records all Flink SQL statements leveraging a Flink compute pool have emitted¶
Create a file named num_records_out.json using the following template. Be sure to change the compute pool ID (lfcp-XXXXXX) and the timestamp values to match your needs.

```json
{
  "aggregations": [
    { "metric": "io.confluent.flink/num_records_out" }
  ],
  "filter": {
    "field": "resource.compute_pool.id",
    "op": "EQ",
    "value": "lfcp-XXXXXX"
  },
  "granularity": "PT1M",
  "intervals": [ "2023-10-25T16:30:00/2023-10-25T16:35:00" ],
  "limit": 5
}
```
Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

```shell
http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < num_records_out.json
```
Your output should resemble:
```json
{
  "data": [
    { "timestamp": "2023-10-25T16:30:00Z", "value": 236.0 },
    { "timestamp": "2023-10-25T16:31:00Z", "value": 228.0 },
    { "timestamp": "2023-10-25T16:32:00Z", "value": 240.0 },
    { "timestamp": "2023-10-25T16:33:00Z", "value": 230.0 },
    { "timestamp": "2023-10-25T16:34:00Z", "value": 252.0 }
  ]
}
```
Query for the total backlog of all Flink SQL statements leveraging a Flink compute pool¶
This metric represents the total number of available records after the consumer offset in a Kafka partition for all Flink SQL statements leveraging a Flink compute pool, across all operators.
Create a file named pending_records.json using the following template. Be sure to change the compute pool ID (lfcp-XXXXXX) and the timestamp values to match your needs.

```json
{
  "aggregations": [
    { "metric": "io.confluent.flink/pending_records" }
  ],
  "filter": {
    "field": "resource.compute_pool.id",
    "op": "EQ",
    "value": "lfcp-XXXXXX"
  },
  "granularity": "PT1M",
  "intervals": [ "2023-10-25T16:30:00/2023-10-25T16:35:00" ],
  "limit": 5
}
```
Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

```shell
http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < pending_records.json
```
Your output should resemble:
```json
{
  "data": [
    { "timestamp": "2023-10-25T16:30:00Z", "value": 0.0 },
    { "timestamp": "2023-10-25T16:31:00Z", "value": 0.0 },
    { "timestamp": "2023-10-25T16:32:00Z", "value": 0.0 },
    { "timestamp": "2023-10-25T16:33:00Z", "value": 0.0 },
    { "timestamp": "2023-10-25T16:34:00Z", "value": 0.0 }
  ]
}
```
Note
The above value may not always be zero. A non-zero value indicates the combined backlog associated with the Flink statements leveraging the Flink compute pool in the query.
Query for the absolute number of CFUs at a given moment in a Flink compute pool¶
This metric represents the absolute number of CFUs in use by a Flink compute pool at a given moment, that is, the pool's current usage.
Create a file named current_cfus.json using the following template. Be sure to change the compute pool ID (lfcp-XXXXXX) and the timestamp values to match your needs.

```json
{
  "aggregations": [
    { "metric": "io.confluent.flink/compute_pool_utilization/current_cfus" }
  ],
  "filter": {
    "field": "resource.compute_pool.id",
    "op": "EQ",
    "value": "lfcp-XXXXXX"
  },
  "granularity": "PT1M",
  "intervals": [ "2024-05-15T14:00:00/2024-05-15T14:05:00" ],
  "limit": 5
}
```
Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

```shell
http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < current_cfus.json
```
Your output should resemble:
```json
{
  "data": [
    { "timestamp": "2024-05-15T14:00:00Z", "value": 3.0 },
    { "timestamp": "2024-05-15T14:01:00Z", "value": 3.0 },
    { "timestamp": "2024-05-15T14:02:00Z", "value": 3.0 },
    { "timestamp": "2024-05-15T14:03:00Z", "value": 3.0 },
    { "timestamp": "2024-05-15T14:04:00Z", "value": 3.0 }
  ]
}
```
Query for the maximum number of CFUs assigned to a Flink compute pool¶
This metric represents the maximum number of CFUs assigned to a Flink compute pool. While Flink statements are running, the compute pool autoscales up to this maximum.
Create a file named cfu_limit.json using the following template. Be sure to change the compute pool ID (lfcp-XXXXXX) and the timestamp values to match your needs.

```json
{
  "aggregations": [
    { "metric": "io.confluent.flink/compute_pool_utilization/cfu_limit" }
  ],
  "filter": {
    "field": "resource.compute_pool.id",
    "op": "EQ",
    "value": "lfcp-XXXXXX"
  },
  "granularity": "PT1M",
  "intervals": [ "2024-05-15T14:00:00/2024-05-15T14:05:00" ],
  "limit": 5
}
```
Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

```shell
http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < cfu_limit.json
```
Your output should resemble:
```json
{
  "data": [
    { "timestamp": "2024-05-15T14:00:00Z", "value": 10.0 },
    { "timestamp": "2024-05-15T14:01:00Z", "value": 10.0 },
    { "timestamp": "2024-05-15T14:02:00Z", "value": 10.0 },
    { "timestamp": "2024-05-15T14:03:00Z", "value": 10.0 },
    { "timestamp": "2024-05-15T14:04:00Z", "value": 10.0 }
  ]
}
```
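Combining the current_cfus and cfu_limit queries gives a pool-utilization percentage. A minimal sketch, joining hypothetical samples from the two queries by timestamp:

```python
# Hypothetical samples from the two queries above, keyed by timestamp.
current = {"2024-05-15T14:00:00Z": 3.0, "2024-05-15T14:01:00Z": 3.0}
limit = {"2024-05-15T14:00:00Z": 10.0, "2024-05-15T14:01:00Z": 10.0}

# Percent utilization per sample: current CFUs over the pool's CFU limit.
utilization = {ts: 100.0 * current[ts] / limit[ts]
               for ts in current if ts in limit}
print(utilization["2024-05-15T14:00:00Z"])  # → 30.0
```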
FAQ¶
Can the Metrics API be used to reconcile my bill?¶
No, the Metrics API is intended to provide information for the purposes of monitoring, troubleshooting, and capacity planning. It is not intended as an audit system for reconciling bills as the metrics do not include request overhead for the Kafka protocol at this time. For more details, see the billing documentation.
Why am I seeing empty data sets for topics that exist on queries other than for retained_bytes?¶
If there are only values of 0.0 in the time range queried, then the API returns an empty set. When there is non-zero data within the time range, time slices with values of 0.0 are returned.
Why didn’t retained_bytes decrease after I changed the retention policy for my topic?¶
The value of retained_bytes is calculated as the maximum over the interval for each data point returned. If data has been deleted during the current interval, you will not see the effect until the next time range window begins. For example, if you produced four GB of data per day over the last 30 days and queried for retained_bytes over the last three days with a one-day interval, the query would return values of 112 GB, 116 GB, 120 GB as a time series. If you then deleted all data in the topic and stopped producing data, the query would return the same values until the next day. When queried at the start of the next day, the same query would return 116 GB, 120 GB, 0 GB.
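The worked example above can be reproduced in a few lines: with 4 GB produced per day for 30 days, the maximum over each one-day window is simply the cumulative total at the end of that day.

```python
GB = 1  # work in GB units for clarity

# Cumulative retained bytes at the end of each day: 4 GB produced per day.
retained_by_day = {day: 4 * day * GB for day in range(1, 31)}

# retained_bytes is the maximum over each one-day interval, so a
# three-day query ending on day 30 returns the per-day maxima.
last_three = [retained_by_day[d] for d in (28, 29, 30)]
print(last_three)  # → [112, 116, 120]
```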
What are the supported granularity levels?¶
Data is stored at a granularity of one minute. However, the allowed granularity for a query is restricted by the size of the query’s interval.
For the currently supported granularity levels and query restrictions, see the API Reference.
What is the retention time of metrics in the Metrics API?¶
Metrics are retained for seven days.
Do not confuse retention time with the granularity levels and query intervals mentioned in the API Reference. Confluent uses granularity and intervals to validate requests, while retention time refers to the length of time Confluent stores data. The largest interval Confluent supports is seven days; a request with an interval greater than seven days returns no additional data because Confluent doesn’t retain data past seven days.
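Before issuing a query, it can be useful to check client-side that the requested interval falls inside the seven-day retention window. A minimal sketch (the helper name is hypothetical):

```python
from datetime import datetime, timedelta, timezone

RETENTION = timedelta(days=7)  # Metrics API retention window

def interval_is_queryable(start, end, now):
    """True if the interval [start, end) lies entirely within the
    seven-day retention window ending at `now`."""
    return start >= now - RETENTION and end <= now

now = datetime(2024, 5, 15, tzinfo=timezone.utc)
ok = interval_is_queryable(now - timedelta(days=2), now - timedelta(days=1), now)
too_old = interval_is_queryable(now - timedelta(days=9), now - timedelta(days=8), now)
print(ok, too_old)  # → True False
```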
How do I monitor consumer lag?¶
- Query for the average consumer lag by using the Metrics API.
- You can also monitor consumer lag in Confluent Cloud through client metrics, the Cloud Console UI, the Confluent CLI, and the Kafka Admin API.
How do I know if a given metric is in preview or generally available (GA)?¶
You can find each metric’s lifecycle stage (preview, generally available, and so on) in the response from the /descriptors/metrics endpoint. While a metric is in the preview stage, you may find breaking changes to labels without an API version change. This type of iteration is necessary for Confluent to stabilize changes and ensure the metric is suitable for most use cases.
What should I do if a query to Metrics API returns a timeout response (HTTP error code 504)?¶
If queries exceed the timeout (the maximum query time is 60 seconds), consider one or more of the following approaches:
- Reduce the time interval.
- Reduce the granularity of data returned.
- Break up the query on the client side to return fewer data points. For example, you can query for specific topics instead of all topics at once.
These approaches are especially important when querying for partition-level data over days-long intervals.
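One way to break up a query on the client side, as suggested above, is to split a long interval into smaller ISO-8601 sub-intervals and issue one request per chunk. A minimal sketch (the helper name and six-hour chunk size are illustrative choices, not API requirements):

```python
from datetime import datetime, timedelta, timezone

def split_interval(start, end, chunk=timedelta(hours=6)):
    """Break [start, end) into ISO-8601 sub-intervals no longer than
    `chunk`, suitable for issuing as separate Metrics API queries."""
    out, cursor = [], start
    while cursor < end:
        upper = min(cursor + chunk, end)
        out.append(f"{cursor.isoformat()}/{upper.isoformat()}")
        cursor = upper
    return out

start = datetime(2024, 5, 14, tzinfo=timezone.utc)
end = datetime(2024, 5, 15, tzinfo=timezone.utc)
chunks = split_interval(start, end)
print(len(chunks))  # → 4
```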
Why are my Confluent Cloud metrics displaying only 1hr/6hrs/24hrs worth of data?¶
This is a known limitation that occurs in some clusters with a partition count of more than 2,000. Work is ongoing to resolve this issue, but there is no fix at this time.
What should I do if a query returns a 5xx response code?¶
Confluent recommends that you retry these types of responses. Usually, a 5xx response indicates a transient server-side issue. Design your client implementations for querying the Metrics API to be resilient to this type of response for minutes-long periods.
How do I collect metrics for Confluent Cloud resources using Cloud Console?¶
1. From the Administration menu, select Metrics.
2. From Explore available metrics, select a Metric and a Resource. If there is data available for the metric you selected, the chart displays the data. You can select a new time interval to meet your needs.
3. To copy a cURL template of the query used to display the selected data, select Copy cURL template. A template of the cURL command is added to your clipboard. Paste the template into a command prompt (Windows) or terminal (Mac, Linux), then edit the template to add an API key and secret to authenticate the request.