EXPLAIN Statement in Confluent Cloud for Apache Flink¶
Confluent Cloud for Apache Flink® enables viewing and analyzing the query plans of Flink SQL statements.
Syntax¶
EXPLAIN { <query_statement> | <insert_statement> | <statement_set> | CREATE TABLE ... AS SELECT ... }
<statement_set>:
STATEMENT SET
BEGIN
  -- one or more INSERT INTO statements
  { INSERT INTO <select_statement>; }+
END;
Description¶
The EXPLAIN statement provides detailed information about how Flink executes a specified query or INSERT statement. EXPLAIN shows:
- The optimized physical execution plan
- Details about the changelog mode per operator, when the mode is not append-only
- Upsert keys and primary keys where applicable
- Table source and sink details
This information is valuable for understanding query performance, optimizing complex queries, and debugging unexpected results.
Example queries¶
Basic query analysis¶
This example analyzes a query finding users who clicked but never placed an order:
EXPLAIN
SELECT c.*
FROM `examples`.`marketplace`.`clicks` c
LEFT JOIN (
  SELECT DISTINCT customer_id
  FROM `examples`.`marketplace`.`orders`
) o ON c.user_id = o.customer_id
WHERE o.customer_id IS NULL;
The output shows the physical plan and operator details:
== Physical Plan ==
StreamPhysicalSink [11]
+- StreamPhysicalCalc [10]
   +- StreamPhysicalJoin [9]
      +- StreamPhysicalExchange [3]
      :  +- StreamPhysicalCalc [2]
      :     +- StreamPhysicalTableSourceScan [1]
      +- StreamPhysicalExchange [8]
         +- StreamPhysicalGroupAggregate [7]
            +- StreamPhysicalExchange [6]
               +- StreamPhysicalCalc [5]
                  +- StreamPhysicalTableSourceScan [4]

== Physical Details ==
[1] StreamPhysicalTableSourceScan
    Table: `examples`.`marketplace`.`clicks`
    State size: low

[4] StreamPhysicalTableSourceScan
    Table: `examples`.`marketplace`.`orders`
    State size: low

[7] StreamPhysicalGroupAggregate
    Changelog mode: retract
    Upsert key: (customer_id)
    State size: medium

[8] StreamPhysicalExchange
    Changelog mode: retract
    Upsert key: (customer_id)

[9] StreamPhysicalJoin
    Changelog mode: retract
    State size: medium

[10] StreamPhysicalCalc
    Changelog mode: retract

[11] StreamPhysicalSink
    Table: Foreground
    Changelog mode: retract
    State size: low
Note that `Table: Foreground` on the `[11] StreamPhysicalSink` operator indicates that this is a preview execution plan. For more accurate optimization analysis, test queries using either the final target table or a CREATE TABLE AS statement, which determines the optimal primary key and changelog mode for your specific use case.
Creating tables¶
This example shows creating a new table from a query:
EXPLAIN
CREATE TABLE clicks_without_orders AS
SELECT c.*
FROM `examples`.`marketplace`.`clicks` c
LEFT JOIN (
  SELECT DISTINCT customer_id
  FROM `examples`.`marketplace`.`orders`
) o ON c.user_id = o.customer_id
WHERE o.customer_id IS NULL;
The output includes sink information for the new table:
== Physical Plan ==
StreamPhysicalSink [11]
+- StreamPhysicalCalc [10]
   +- StreamPhysicalJoin [9]
      +- StreamPhysicalExchange [3]
      :  +- StreamPhysicalCalc [2]
      :     +- StreamPhysicalTableSourceScan [1]
      +- StreamPhysicalExchange [8]
         +- StreamPhysicalGroupAggregate [7]
            +- StreamPhysicalExchange [6]
               +- StreamPhysicalCalc [5]
                  +- StreamPhysicalTableSourceScan [4]

== Physical Details ==
[1] StreamPhysicalTableSourceScan
    Table: `examples`.`marketplace`.`clicks`
    State size: low

[4] StreamPhysicalTableSourceScan
    Table: `examples`.`marketplace`.`orders`
    State size: low

[7] StreamPhysicalGroupAggregate
    Changelog mode: retract
    Upsert key: (customer_id)
    State size: medium

[8] StreamPhysicalExchange
    Changelog mode: retract
    Upsert key: (customer_id)

[9] StreamPhysicalJoin
    Changelog mode: retract
    State size: medium

[10] StreamPhysicalCalc
    Changelog mode: retract

[11] StreamPhysicalSink
    Table: `catalog`.`database`.`clicks_without_orders`
    Changelog mode: retract
    State size: low
Inserting values¶
This example shows inserting static values:
EXPLAIN
INSERT INTO orders VALUES
(1, 1001, '2023-02-24', 50.0),
(2, 1002, '2023-02-25', 60.0),
(3, 1003, '2023-02-26', 70.0);
The output shows a simple insertion plan:
== Physical Plan ==
StreamPhysicalSink [6]
+- StreamPhysicalUnion [5]
   +- StreamPhysicalCalc [2]
   :  +- StreamPhysicalValues [1]
   +- StreamPhysicalCalc [3]
   :  +- (reused) [1]
   +- StreamPhysicalCalc [4]
      +- (reused) [1]

== Physical Details ==
[1] StreamPhysicalValues
    State size: low

[6] StreamPhysicalSink
    Table: `catalog`.`database`.`orders`
    State size: low
Multiple operations¶
This example demonstrates operation reuse across multiple inserts:
EXPLAIN STATEMENT SET
BEGIN
  INSERT INTO low_orders SELECT * FROM `orders` WHERE price < 100;
  INSERT INTO high_orders SELECT * FROM `orders` WHERE price > 100;
END;
The output shows table scan reuse:
== Physical Plan ==
StreamPhysicalSink [3]
+- StreamPhysicalCalc [2]
   +- StreamPhysicalTableSourceScan [1]

StreamPhysicalSink [5]
+- StreamPhysicalCalc [4]
   +- (reused) [1]

== Physical Details ==
[1] StreamPhysicalTableSourceScan
    Table: `examples`.`marketplace`.`orders`
    State size: low

[3] StreamPhysicalSink
    Table: `catalog`.`database`.`low_orders`
    State size: low

[5] StreamPhysicalSink
    Table: `catalog`.`database`.`high_orders`
    State size: low
Window functions¶
This example shows window functions and self-joins:
EXPLAIN
WITH windowed_customers AS (
  SELECT * FROM TABLE(
    TUMBLE(TABLE `examples`.`marketplace`.`customers`, DESCRIPTOR($rowtime), INTERVAL '1' MINUTE)
  )
)
SELECT
  c1.window_start,
  c1.city,
  COUNT(DISTINCT c1.customer_id) as unique_customers,
  COUNT(c2.customer_id) as total_connections
FROM
  windowed_customers c1
  JOIN windowed_customers c2
    ON c1.city = c2.city
    AND c1.customer_id < c2.customer_id
    AND c1.window_start = c2.window_start
GROUP BY
  c1.window_start,
  c1.city
HAVING
  COUNT(DISTINCT c1.customer_id) > 5;
The output shows the complex processing required for windowed aggregations:
== Physical Plan ==
StreamPhysicalSink [14]
+- StreamPhysicalCalc [13]
   +- StreamPhysicalGroupAggregate [12]
      +- StreamPhysicalExchange [11]
         +- StreamPhysicalCalc [10]
            +- StreamPhysicalJoin [9]
               +- StreamPhysicalExchange [8]
               :  +- StreamPhysicalCalc [7]
               :     +- StreamPhysicalWindowTableFunction [6]
               :        +- StreamPhysicalCalc [5]
               :           +- StreamPhysicalChangelogNormalize [4]
               :              +- StreamPhysicalExchange [3]
               :                 +- StreamPhysicalCalc [2]
               :                    +- StreamPhysicalTableSourceScan [1]
               +- (reused) [8]

== Physical Details ==
[1] StreamPhysicalTableSourceScan
    Table: `examples`.`marketplace`.`customers`
    Primary key: (customer_id)
    Changelog mode: upsert
    Upsert key: (customer_id)
    State size: low

[2] StreamPhysicalCalc
    Changelog mode: upsert
    Upsert key: (customer_id)

[3] StreamPhysicalExchange
    Changelog mode: upsert
    Upsert key: (customer_id)

[4] StreamPhysicalChangelogNormalize
    Changelog mode: retract
    Upsert key: (customer_id)
    State size: medium

[5] StreamPhysicalCalc
    Changelog mode: retract
    Upsert key: (customer_id)

[6] StreamPhysicalWindowTableFunction
    Changelog mode: retract
    State size: low

[7] StreamPhysicalCalc
    Changelog mode: retract

[8] StreamPhysicalExchange
    Changelog mode: retract

[9] StreamPhysicalJoin
    Changelog mode: retract
    State size: medium

[10] StreamPhysicalCalc
    Changelog mode: retract

[11] StreamPhysicalExchange
    Changelog mode: retract

[12] StreamPhysicalGroupAggregate
    Changelog mode: retract
    Upsert key: (window_start, city)
    State size: medium

[13] StreamPhysicalCalc
    Changelog mode: retract
    Upsert key: (window_start, city)

[14] StreamPhysicalSink
    Table: Foreground
    Changelog mode: retract
    Upsert key: (window_start, city)
    State size: low
Understanding the output¶
Reading physical plans¶
The physical plan shows how Flink executes your query. Each operation is numbered and indented to show its position in the execution flow. Indentation indicates data flow, with each operator passing results to its parent.
Changelog modes¶
Changelog modes show how operators handle data modifications:
- When no changelog mode appears, the operator uses “append” mode (insert-only).
- “upsert” mode enables inserts and updates using an upsert key.
- “retract” mode supports inserts, updates, and deletes.
Operators change changelog modes when different update patterns are needed, such as when moving from streaming reads to aggregations.
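For example, an aggregation over an append-only source produces updating results, so operators downstream of the aggregate no longer run in append mode. Running EXPLAIN on a simple query like the following, which reuses the `examples`.`marketplace`.`orders` table from the earlier examples, surfaces that mode change in the Physical Details section:
EXPLAIN
SELECT customer_id, COUNT(*) AS order_count
FROM `examples`.`marketplace`.`orders`
GROUP BY customer_id;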
Data movement¶
The physical details section shows how data moves between operators. Watch for:
- Exchange operators indicating data redistribution
- Changes in upsert keys showing where data must be reshuffled
- Operator reuse marked by “(reused)” references
State size¶
Each operator in the physical plan can include a “State size” property indicating its memory requirements during execution:
- LOW: Minimal state maintenance, typically efficient memory usage
- MEDIUM: Moderate state requirements, may need attention with high cardinality
- HIGH: Significant state maintenance that requires careful management
When operators show HIGH state size, you should configure a state TTL to prevent unbounded state growth. Without a TTL, these operators can accumulate unlimited state over time, potentially leading to resource exhaustion and the statement ending up in a DEGRADED state. For example, the following statement sets a TTL of 12 hours:
SET 'sql.state-ttl' = '12 hours';
For MEDIUM state size, consider TTL settings if your data has high cardinality or frequent updates per key.
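As an illustrative sketch only, a MEDIUM-state aggregation keyed on a high-cardinality column might warrant a shorter TTL than the 12-hour example above; the value shown here is arbitrary and should be tuned to how long updates for a key can keep arriving:
SET 'sql.state-ttl' = '4 hours';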
Physical operators¶
Below is a reference of common operators you may see in EXPLAIN output, along with examples of SQL that typically produces them.
Basic operations¶
- StreamPhysicalTableSourceScan
Reads data from a source table. The foundation of any query reading from a table.
SELECT * FROM orders;
- StreamPhysicalCalc
Performs row-level computations and filtering. Appears when using WHERE clauses or expressions in SELECT.
SELECT amount * 1.1 as amount_with_tax FROM orders WHERE status = 'completed';
- StreamPhysicalValues
Generates literal row values. Commonly seen with INSERT statements.
INSERT INTO orders VALUES (1, 'pending', 100);
- StreamPhysicalSink
Writes results to a destination. Present in any INSERT or when displaying query results. Supports two modes of operation:
- Append-only: Each record is treated as a new event, which displays as State size: Low.
- Upsert-materialize: Maintains state to handle updates and deletes based on key fields, which displays as State size: High.
INSERT INTO order_summaries SELECT status, COUNT(*) FROM orders GROUP BY status;
Aggregation operations¶
- StreamPhysicalGroupAggregate
Performs grouping and aggregation. Created by GROUP BY clauses.
SELECT customer_id, SUM(price) FROM orders GROUP BY customer_id;
- StreamPhysicalLocalWindowAggregate and StreamPhysicalGlobalWindowAggregate
These operators implement Flink’s two-phase aggregation strategy for distributed stream processing. They work together to compute aggregations efficiently across multiple parallel instances while maintaining exactly-once processing semantics.
The LocalGroupAggregate performs initial aggregation within each parallel task, maintaining partial results in its state. The GlobalGroupAggregate then combines these partial results to produce final aggregations. This two-phase approach appears in both regular GROUP BY operations and windowed aggregations.
For window operations, these operators appear as StreamPhysicalLocalWindowAggregate and StreamPhysicalGlobalWindowAggregate. Here’s an example that triggers their use:
SELECT window_start, window_end, SUM(price) as total_price
FROM TABLE(
  TUMBLE(TABLE orders, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
Join operations¶
- StreamPhysicalJoin
Performs standard stream-to-stream joins.
SELECT o.*, c.name FROM orders o JOIN customers c ON o.customer_id = c.id;
- StreamPhysicalTemporalJoin
Joins streams using temporal (time-versioned) semantics.
SELECT orders.*, customers.*
FROM orders
LEFT JOIN customers FOR SYSTEM_TIME AS OF orders.`$rowtime`
  ON orders.customer_id = customers.customer_id;
- StreamPhysicalIntervalJoin
Joins streams within a time interval.
SELECT *
FROM orders o, clicks c
WHERE o.customer_id = c.user_id
  AND o.`$rowtime` BETWEEN c.`$rowtime` - INTERVAL '1' MINUTE AND c.`$rowtime`;
- StreamPhysicalWindowJoin
Joins streams within defined windows.
SELECT *
FROM (
  SELECT * FROM TABLE(TUMBLE(TABLE clicks, DESCRIPTOR($rowtime), INTERVAL '5' MINUTES))
) c
JOIN (
  SELECT * FROM TABLE(TUMBLE(TABLE orders, DESCRIPTOR($rowtime), INTERVAL '5' MINUTES))
) o
  ON c.user_id = o.customer_id
  AND c.window_start = o.window_start
  AND c.window_end = o.window_end;
Ordering and ranking¶
- StreamPhysicalRank
Computes the smallest or largest values (Top-N queries).
SELECT product_id, price
FROM (
  SELECT *, ROW_NUMBER() OVER (PARTITION BY product_id ORDER BY price DESC) AS row_num
  FROM orders
)
WHERE row_num <= 5;
- StreamPhysicalLimit
Limits the number of returned rows.
SELECT * FROM orders LIMIT 10;
- StreamPhysicalSortLimit
Combines sorting with row limiting.
SELECT * FROM orders ORDER BY $rowtime LIMIT 10;
- StreamPhysicalWindowRank
Computes the smallest or largest values within window boundaries (Window Top-N queries).
SELECT * FROM (
  SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) as rownum
  FROM (
    SELECT window_start, window_end, customer_id, SUM(price) as price, COUNT(*) as cnt
    FROM TABLE(
      TUMBLE(TABLE `examples`.`marketplace`.`orders`, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES))
    GROUP BY window_start, window_end, customer_id
  )
)
WHERE rownum <= 3;
Data movement and distribution¶
- StreamPhysicalExchange
Redistributes/exchanges data between parallel instances. For example, when you write a query with a GROUP BY clause, Flink might use a HASH exchange to ensure all records with the same key are processed by the same task:
-- Appears in plans with GROUP BY on a different key than the source distribution
SELECT customer_id, COUNT(*) FROM orders GROUP BY customer_id;
- StreamPhysicalUnion
Combines results from multiple queries.
SELECT * FROM european_orders UNION ALL SELECT * FROM american_orders;
- StreamPhysicalExpand
Generates multiple rows from a single row for CUBE, ROLLUP, and GROUPING SETS.
SELECT department, brand, COUNT(*) as product_count, COUNT(DISTINCT vendor) as vendor_count
FROM products
GROUP BY CUBE(department, brand)
HAVING COUNT(*) > 1;
Specialized operations¶
- StreamPhysicalChangelogNormalize
Converts upsert-based changelog streams (based on primary key) into retract-based streams (with explicit +/- records) to support correct aggregation results in streaming queries.
-- Appears when processing versioned data, like a table that uses upsert semantics
SELECT COUNT(*) as cnt FROM products;
- StreamPhysicalWindowTableFunction
Applies windowing operations as table functions.
SELECT * FROM TABLE( TUMBLE(TABLE orders, DESCRIPTOR($rowtime), INTERVAL '1' HOUR) );
- StreamPhysicalCorrelate
Handles correlated subqueries and table function calls.
EXPLAIN SELECT product_id, product_name, tag
FROM (
  VALUES
    (1, 'Laptop', ARRAY['electronics', 'computers']),
    (2, 'Phone', ARRAY['electronics', 'mobile'])
) AS products (product_id, product_name, tags)
CROSS JOIN UNNEST(tags) AS t (tag);
- StreamPhysicalMatch
Executes pattern-matching operations using MATCH_RECOGNIZE.
SELECT *
FROM orders
  MATCH_RECOGNIZE (
    PARTITION BY customer_id
    ORDER BY $rowtime
    MEASURES COUNT(*) as order_count
    PATTERN (A B+)
    DEFINE
      A as price > 100,
      B as price <= 100
  );
Optimizing query performance¶
Minimizing data movement¶
Data shuffling impacts performance. When examining EXPLAIN output:
- Look for exchange operators and upsert key changes.
- Consider keeping compatible partitioning keys through your query.
- Watch for opportunities to reduce data redistribution.
Pay special attention to data skew when designing your queries. If a particular key value appears much more frequently than others, it can lead to uneven processing where a single parallel instance becomes overwhelmed handling that key’s data. Consider strategies like adding additional dimensions to your keys or pre-aggregating hot keys to distribute the workload more evenly.
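As a rough sketch of the pre-aggregation idea, the following query first aggregates per (customer_id, bucket) so that rows for a hot customer are spread across parallel instances, and then merges the partial results. The bucket expression assumes a numeric `order_id` column on an `orders` table, and the bucket count of 16 is arbitrary:
SELECT customer_id, SUM(bucket_total) AS total_spent
FROM (
  SELECT
    customer_id,
    MOD(order_id, 16) AS bucket,   -- artificial key dimension to spread a hot customer across tasks
    SUM(price) AS bucket_total
  FROM orders
  GROUP BY customer_id, MOD(order_id, 16)
)
GROUP BY customer_id;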
Using operator reuse¶
Flink automatically reuses operators when possible. In EXPLAIN output:
- Look for “(reused)” references showing optimization.
- Consider restructuring queries to enable more reuse.
- Verify that similar operations share scan results.
Optimizing sink configuration¶
When working with sinks in upsert mode, it’s crucial to align your primary and upsert keys for optimal performance (see the sketch after this list):
- Whenever possible, configure the primary key to be identical to the upsert key.
- Having different primary and upsert keys in upsert mode can lead to significant performance degradation.
- If you must use different keys, carefully evaluate the performance impact and consider restructuring your query to align these keys.
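For example, here is a minimal sketch of an aligned configuration, assuming a hypothetical `order_counts` sink table and omitting any table options such as distribution. The sink’s primary key matches the upsert key produced by the GROUP BY, so the sink should not need extra state to reconcile keys:
-- Hypothetical sink whose primary key matches the query's upsert key (customer_id).
CREATE TABLE order_counts (
  customer_id INT NOT NULL,
  order_count BIGINT,
  PRIMARY KEY (customer_id) NOT ENFORCED
);

-- The GROUP BY key and the sink's primary key are the same column.
INSERT INTO order_counts
SELECT customer_id, COUNT(*) AS order_count
FROM `examples`.`marketplace`.`orders`
GROUP BY customer_id;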