EXPLAIN Statement in Confluent Cloud for Apache Flink

Confluent Cloud for Apache Flink® enables viewing and analyzing the query plans of Flink SQL statements.

Syntax

EXPLAIN { <query_statement> | <insert_statement> | <statement_set> | CREATE TABLE ... AS SELECT ... }

<statement_set>:
STATEMENT SET
BEGIN
  -- one or more INSERT INTO statements
  { INSERT INTO <select_statement>; }+
END;

Description

The EXPLAIN statement provides detailed information about how Flink executes a specified query or INSERT statement. EXPLAIN shows:

  • The optimized physical execution plan
  • Details about the changelog mode of each operator, when the mode is not append-only
  • Upsert keys and primary keys, where applicable
  • Table source and sink details

This information is valuable for understanding query performance, optimizing complex queries, and debugging unexpected results.

Example queries

Basic query analysis

This example analyzes a query finding users who clicked but never placed an order:

EXPLAIN
SELECT c.*
FROM `examples`.`marketplace`.`clicks` c
LEFT JOIN (
  SELECT DISTINCT customer_id
  FROM `examples`.`marketplace`.`orders`
) o ON c.user_id = o.customer_id
WHERE o.customer_id IS NULL;

The output shows the physical plan and operator details:

== Physical Plan ==

StreamPhysicalSink [11]
  +- StreamPhysicalCalc [10]
    +- StreamPhysicalJoin [9]
      +- StreamPhysicalExchange [3]
      :  +- StreamPhysicalCalc [2]
      :    +- StreamPhysicalTableSourceScan [1]
      +- StreamPhysicalExchange [8]
        +- StreamPhysicalGroupAggregate [7]
          +- StreamPhysicalExchange [6]
            +- StreamPhysicalCalc [5]
              +- StreamPhysicalTableSourceScan [4]

== Physical Details ==

[1] StreamPhysicalTableSourceScan
Table: `examples`.`marketplace`.`clicks`
State size: low

[4] StreamPhysicalTableSourceScan
Table: `examples`.`marketplace`.`orders`
State size: low

[7] StreamPhysicalGroupAggregate
Changelog mode: retract
Upsert key: (customer_id)
State size: medium

[8] StreamPhysicalExchange
Changelog mode: retract
Upsert key: (customer_id)

[9] StreamPhysicalJoin
Changelog mode: retract
State size: medium

[10] StreamPhysicalCalc
Changelog mode: retract

[11] StreamPhysicalSink
Table: Foreground
Changelog mode: retract
State size: low

Note that Table: Foreground for the [11] StreamPhysicalSink operator indicates that this is a preview execution plan. For more accurate optimization analysis, test queries by writing to the final target table or by using a CREATE TABLE AS statement, which determines the optimal primary key and changelog mode for your specific use case.

Creating tables

This example shows creating a new table from a query:

EXPLAIN
CREATE TABLE clicks_without_orders AS
SELECT c.*
FROM `examples`.`marketplace`.`clicks` c
LEFT JOIN (
  SELECT DISTINCT customer_id
  FROM `examples`.`marketplace`.`orders`
) o ON c.user_id = o.customer_id
WHERE o.customer_id IS NULL;

The output includes sink information for the new table:

== Physical Plan ==

StreamPhysicalSink [11]
  +- StreamPhysicalCalc [10]
    +- StreamPhysicalJoin [9]
      +- StreamPhysicalExchange [3]
      :  +- StreamPhysicalCalc [2]
      :    +- StreamPhysicalTableSourceScan [1]
      +- StreamPhysicalExchange [8]
        +- StreamPhysicalGroupAggregate [7]
          +- StreamPhysicalExchange [6]
            +- StreamPhysicalCalc [5]
              +- StreamPhysicalTableSourceScan [4]

== Physical Details ==

[1] StreamPhysicalTableSourceScan
Table: `examples`.`marketplace`.`clicks`
State size: low

[4] StreamPhysicalTableSourceScan
Table: `examples`.`marketplace`.`orders`
State size: low

[7] StreamPhysicalGroupAggregate
Changelog mode: retract
Upsert key: (customer_id)
State size: medium

[8] StreamPhysicalExchange
Changelog mode: retract
Upsert key: (customer_id)

[9] StreamPhysicalJoin
Changelog mode: retract
State size: medium

[10] StreamPhysicalCalc
Changelog mode: retract

[11] StreamPhysicalSink
Table: `catalog`.`database`.`clicks_without_orders`
Changelog mode: retract
State size: low

Inserting values

This example shows inserting static values:

EXPLAIN
INSERT INTO orders VALUES
  (1, 1001, '2023-02-24', 50.0),
  (2, 1002, '2023-02-25', 60.0),
  (3, 1003, '2023-02-26', 70.0);

The output shows a simple insertion plan:

== Physical Plan ==

StreamPhysicalSink [6]
  +- StreamPhysicalUnion [5]
    +- StreamPhysicalCalc [2]
    :  +- StreamPhysicalValues [1]
    +- StreamPhysicalCalc [3]
    :  +- (reused) [1]
    +- StreamPhysicalCalc [4]
      +- (reused) [1]

== Physical Details ==

[1] StreamPhysicalValues
State size: low

[6] StreamPhysicalSink
Table: `catalog`.`database`.`orders`
State size: low

Multiple operations

This example demonstrates operation reuse across multiple inserts:

EXPLAIN STATEMENT SET
BEGIN
  INSERT INTO low_orders SELECT * FROM `orders` WHERE price < 100;
  INSERT INTO high_orders SELECT * FROM `orders` WHERE price > 100;
END;

The output shows table scan reuse:

== Physical Plan ==

StreamPhysicalSink [3]
  +- StreamPhysicalCalc [2]
    +- StreamPhysicalTableSourceScan [1]

StreamPhysicalSink [5]
  +- StreamPhysicalCalc [4]
    +- (reused) [1]

== Physical Details ==

[1] StreamPhysicalTableSourceScan
Table: `examples`.`marketplace`.`orders`
State size: low

[3] StreamPhysicalSink
Table: `catalog`.`database`.`low_orders`
State size: low

[5] StreamPhysicalSink
Table: `catalog`.`database`.`high_orders`
State size: low

Window functions

This example shows window functions and self-joins:

EXPLAIN
WITH windowed_customers AS (
  SELECT * FROM TABLE(
    TUMBLE(TABLE `examples`.`marketplace`.`customers`, DESCRIPTOR($rowtime), INTERVAL '1' MINUTE)
  )
)
SELECT
    c1.window_start,
    c1.city,
    COUNT(DISTINCT c1.customer_id) as unique_customers,
    COUNT(c2.customer_id) as total_connections
FROM
    windowed_customers c1
    JOIN windowed_customers c2
    ON c1.city = c2.city
    AND c1.customer_id < c2.customer_id
    AND c1.window_start = c2.window_start
GROUP BY
    c1.window_start,
    c1.city
HAVING
    COUNT(DISTINCT c1.customer_id) > 5;

The output shows the complex processing required for windowed aggregations:

== Physical Plan ==

StreamPhysicalSink [14]
  +- StreamPhysicalCalc [13]
    +- StreamPhysicalGroupAggregate [12]
      +- StreamPhysicalExchange [11]
        +- StreamPhysicalCalc [10]
          +- StreamPhysicalJoin [9]
            +- StreamPhysicalExchange [8]
            :  +- StreamPhysicalCalc [7]
            :    +- StreamPhysicalWindowTableFunction [6]
            :      +- StreamPhysicalCalc [5]
            :        +- StreamPhysicalChangelogNormalize [4]
            :          +- StreamPhysicalExchange [3]
            :            +- StreamPhysicalCalc [2]
            :              +- StreamPhysicalTableSourceScan [1]
            +- (reused) [8]

== Physical Details ==

[1] StreamPhysicalTableSourceScan
Table: `examples`.`marketplace`.`customers`
Primary key: (customer_id)
Changelog mode: upsert
Upsert key: (customer_id)
State size: low

[2] StreamPhysicalCalc
Changelog mode: upsert
Upsert key: (customer_id)

[3] StreamPhysicalExchange
Changelog mode: upsert
Upsert key: (customer_id)

[4] StreamPhysicalChangelogNormalize
Changelog mode: retract
Upsert key: (customer_id)
State size: medium

[5] StreamPhysicalCalc
Changelog mode: retract
Upsert key: (customer_id)

[6] StreamPhysicalWindowTableFunction
Changelog mode: retract
State size: low

[7] StreamPhysicalCalc
Changelog mode: retract

[8] StreamPhysicalExchange
Changelog mode: retract

[9] StreamPhysicalJoin
Changelog mode: retract
State size: medium

[10] StreamPhysicalCalc
Changelog mode: retract

[11] StreamPhysicalExchange
Changelog mode: retract

[12] StreamPhysicalGroupAggregate
Changelog mode: retract
Upsert key: (window_start, city)
State size: medium

[13] StreamPhysicalCalc
Changelog mode: retract
Upsert key: (window_start, city)

[14] StreamPhysicalSink
Table: Foreground
Changelog mode: retract
Upsert key: (window_start, city)
State size: low

Understanding the output

Reading physical plans

The physical plan shows how Flink executes your query. Each operation is numbered and indented to show its position in the execution flow. Indentation indicates data flow, with each operator passing results to its parent.

Changelog modes

Changelog modes show how operators handle data modifications:

  • When no changelog mode appears, the operator uses “append” mode (insert-only).
  • “upsert” mode enables inserts and updates using an upsert key.
  • “retract” mode supports inserts, updates, and deletes.

Operators change changelog modes when different update patterns are needed, such as when moving from streaming reads to aggregations.
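
For example, the following query (a minimal sketch that reuses the `examples`.`marketplace`.`orders` table from the earlier examples) typically shows the table scan in the default append mode, while the aggregate and the operators downstream of it switch to retract mode in the EXPLAIN output:

EXPLAIN
SELECT customer_id, COUNT(*) AS order_count
FROM `examples`.`marketplace`.`orders`
GROUP BY customer_id;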

Data movement

The physical details section shows how data moves between operators. Watch for:

  • Exchange operators indicating data redistribution
  • Changes in upsert keys showing where data must be reshuffled
  • Operator reuse marked by “(reused)” references

State size

Each operator in the physical plan includes a “State size” property indicating how much state the operator must maintain during execution:

  • LOW: Minimal state maintenance, typically efficient memory usage
  • MEDIUM: Moderate state requirements, may need attention with high cardinality
  • HIGH: Significant state maintenance that requires careful management

When operators show HIGH state size, you should configure a state TTL to prevent unbounded state growth. Without TTL configuration, these operators can accumulate unlimited state over time, potentially leading to resource exhaustion and the statement ending up in a DEGRADED state.

SET 'sql.state-ttl' = '12 hours';

For MEDIUM state size, consider TTL settings if your data has high cardinality or frequent updates per key.

Physical operators

Below is a reference of common operators you may see in EXPLAIN output, along with SQL examples that typically produce them.

Basic operations

StreamPhysicalTableSourceScan

Reads data from a source table. The foundation of any query reading from a table.

SELECT * FROM orders;
StreamPhysicalCalc

Performs row-level computations and filtering. Appears when using WHERE clauses or expressions in SELECT.

SELECT amount * 1.1 as amount_with_tax
FROM orders
WHERE status = 'completed';
StreamPhysicalValues

Generates literal row values. Commonly seen with INSERT statements.

INSERT INTO orders VALUES (1, 'pending', 100);
StreamPhysicalSink

Writes results to a destination. Present in any INSERT or when displaying query results. Supports two modes of operation:

  • Append-only: Each record is treated as a new event, which displays as State size: Low.
  • Upsert-materialize: Maintains state to handle updates/deletes based on key fields, which displays as State size: High.

INSERT INTO order_summaries
SELECT status, COUNT(*)
FROM orders
GROUP BY status;

Aggregation operations

StreamPhysicalGroupAggregate

Performs grouping and aggregation. Created by GROUP BY clauses.

SELECT customer_id, SUM(price)
FROM orders
GROUP BY customer_id;
StreamPhysicalLocalWindowAggregate and StreamPhysicalGlobalWindowAggregate

These operators implement Flink’s two-phase aggregation strategy for distributed stream processing. They work together to compute aggregations efficiently across multiple parallel instances while maintaining exactly-once processing semantics.

The local aggregate performs initial aggregation within each parallel task, maintaining partial results in its state. The global aggregate then combines these partial results to produce the final aggregation. This two-phase approach appears in both regular GROUP BY operations (as StreamPhysicalLocalGroupAggregate and StreamPhysicalGlobalGroupAggregate) and windowed aggregations.

For window operations, these operators appear as StreamPhysicalLocalWindowAggregate and StreamPhysicalGlobalWindowAggregate. Here’s an example that triggers their use:

SELECT window_start, window_end, SUM(price) as total_price
   FROM TABLE(
       TUMBLE(TABLE orders, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES))
   GROUP BY window_start, window_end;

Join operations

StreamPhysicalJoin

Performs standard stream-to-stream joins.

SELECT o.*, c.name
FROM orders o
JOIN customers c ON o.customer_id = c.id;
StreamPhysicalTemporalJoin

Joins streams using temporal (time-versioned) semantics.

SELECT
     orders.*,
     customers.*
FROM orders
LEFT JOIN customers FOR SYSTEM_TIME AS OF orders.`$rowtime`
ON orders.customer_id = customers.customer_id;
StreamPhysicalIntervalJoin

Joins streams within a time interval.

SELECT *
FROM orders o, clicks c
WHERE o.customer_id = c.user_id
AND o.`$rowtime` BETWEEN c.`$rowtime` - INTERVAL '1' MINUTE AND c.`$rowtime`;
StreamPhysicalWindowJoin

Joins streams within defined windows.

SELECT *
FROM (
    SELECT * FROM TABLE(TUMBLE(TABLE clicks, DESCRIPTOR($rowtime), INTERVAL '5' MINUTES))
) c
JOIN (
    SELECT * FROM TABLE(TUMBLE(TABLE orders, DESCRIPTOR($rowtime), INTERVAL '5' MINUTES))
) o
ON c.user_id = o.customer_id
    AND c.window_start = o.window_start
    AND c.window_end = o.window_end;

Ordering and ranking

StreamPhysicalRank

Computes the smallest or largest values (Top-N queries).

SELECT product_id, price
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY product_id ORDER BY price DESC) AS row_num
  FROM orders)
WHERE row_num <= 5;
StreamPhysicalLimit

Limits the number of returned rows.

SELECT * FROM orders LIMIT 10;
StreamPhysicalSortLimit

Combines sorting with row limiting.

SELECT * FROM orders ORDER BY $rowtime LIMIT 10;
StreamPhysicalWindowRank

Computes the smallest or largest values within window boundaries (Window Top-N queries).

SELECT *
  FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) as rownum
    FROM (
      SELECT window_start, window_end, customer_id, SUM(price) as price, COUNT(*) as cnt
      FROM TABLE(
        TUMBLE(TABLE `examples`.`marketplace`.`orders`, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES))
      GROUP BY window_start, window_end, customer_id
    )
  ) WHERE rownum <= 3;

Data movement and distribution

StreamPhysicalExchange

Redistributes/exchanges data between parallel instances. For example, when you write a query with a GROUP BY clause, Flink might use a HASH exchange to ensure all records with the same key are processed by the same task:

-- Appears in plans with GROUP BY on a different key than the source distribution
SELECT customer_id, COUNT(*)
   FROM orders
   GROUP BY customer_id;
StreamPhysicalUnion

Combines results from multiple queries.

SELECT * FROM european_orders
UNION ALL
SELECT * FROM american_orders;
StreamPhysicalExpand

Generates multiple rows from a single row for CUBE, ROLLUP, and GROUPING SETS.

SELECT
    department,
    brand,
    COUNT(*) as product_count,
    COUNT(DISTINCT vendor) as vendor_count
FROM products
GROUP BY CUBE(department, brand)
HAVING COUNT(*) > 1;

Specialized operations

StreamPhysicalChangelogNormalize

Converts upsert-based changelog streams (based on primary key) into retract-based streams (with explicit +/- records) to support correct aggregation results in streaming queries.

-- Appears when processing versioned data, like a table that uses upsert semantics
SELECT COUNT(*) as cnt
FROM products;
StreamPhysicalWindowTableFunction

Applies windowing operations as table functions.

SELECT * FROM TABLE(
     TUMBLE(TABLE orders, DESCRIPTOR($rowtime), INTERVAL '1' HOUR)
   );
StreamPhysicalCorrelate

Handles correlated subqueries and table function calls.

EXPLAIN
SELECT
    product_id,
    product_name,
    tag
FROM (
    VALUES
        (1, 'Laptop', ARRAY['electronics', 'computers']),
        (2, 'Phone', ARRAY['electronics', 'mobile'])
) AS products (product_id, product_name, tags)
CROSS JOIN UNNEST(tags) AS t (tag);
StreamPhysicalMatch

Executes pattern-matching operations using MATCH_RECOGNIZE.

SELECT *
   FROM orders
   MATCH_RECOGNIZE (
     PARTITION BY customer_id
     ORDER BY $rowtime
     MEASURES
       COUNT(*) as order_count
     PATTERN (A B+)
     DEFINE
       A as price > 100,
       B as price <= 100
   );

Optimizing query performance

Minimizing data movement

Data shuffling impacts performance. When examining EXPLAIN output:

  • Look for exchange operators and upsert key changes.
  • Consider keeping compatible partitioning keys through your query.
  • Watch for opportunities to reduce data redistribution.

Pay special attention to data skew when designing your queries. If a particular key value appears much more frequently than others, it can lead to uneven processing where a single parallel instance becomes overwhelmed handling that key’s data. Consider strategies like adding additional dimensions to your keys or pre-aggregating hot keys to distribute the workload more evenly.
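
As a sketch of the pre-aggregation idea, the following query is hypothetical and assumes an orders table with an integer order_id column; it adds a bucket dimension so that partial sums for a hot customer_id are spread across parallel instances before a final aggregation combines them:

-- Two-level aggregation: partial sums per (customer_id, bucket),
-- then a final sum per customer_id.
SELECT customer_id, SUM(partial_total) AS total_price
FROM (
  SELECT
    customer_id,
    MOD(order_id, 8) AS bucket,  -- illustrative salting expression
    SUM(price) AS partial_total
  FROM orders
  GROUP BY customer_id, MOD(order_id, 8)
) AS partials
GROUP BY customer_id;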

Using operator reuse

Flink automatically reuses operators when possible. In EXPLAIN output:

  • Look for “(reused)” references showing optimization.
  • Consider restructuring queries to enable more reuse.
  • Verify that similar operations share scan results.

Optimizing sink configuration

When working with sinks in upsert mode, it’s crucial to align your primary and upsert keys for optimal performance:

  • Whenever possible, configure the primary key to be identical to the upsert key, as shown in the example after this list.
  • Having different primary and upsert keys in upsert mode can lead to significant performance degradation.
  • If you must use different keys, carefully evaluate the performance impact and consider restructuring your query to align these keys.
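
For example, a sink whose primary key matches the upsert key produced by the query avoids extra upsert materialization in the sink. The following sketch uses an illustrative order_totals table (its name and column types are assumptions) together with the orders table from the earlier examples:

-- The PRIMARY KEY matches the GROUP BY key, so the sink's primary key
-- and the query's upsert key align.
CREATE TABLE order_totals (
  customer_id INT,
  total_price DOUBLE,
  PRIMARY KEY (customer_id) NOT ENFORCED
);

EXPLAIN
INSERT INTO order_totals
SELECT customer_id, SUM(price) AS total_price
FROM `examples`.`marketplace`.`orders`
GROUP BY customer_id;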