SHOW Statements in Confluent Cloud for Apache Flink¶
Confluent Cloud for Apache Flink® enables listing available Flink catalogs, databases, and tables, which map to Confluent Cloud environments, Apache Kafka® clusters, and topics.
Confluent Cloud for Apache Flink supports these SHOW statements.
- SHOW CATALOGS
- SHOW CREATE MODEL
- SHOW CREATE TABLE
- SHOW CURRENT CATALOG
- SHOW CURRENT DATABASE
- SHOW DATABASES
- SHOW FUNCTIONS
- SHOW JOBS
- SHOW MODELS
- SHOW TABLES
SHOW CATALOGS¶
- Syntax
SHOW CATALOGS;
- Description
- Show all catalogs. Confluent Cloud for Apache Flink maps Flink catalogs to environments.
- Example
SHOW CATALOGS;
Your output should resemble:
+-------------------------+------------+
| catalog name            | catalog id |
+-------------------------+------------+
| my_environment          | env-12abcz |
| example-streams-env     | env-23xjoo |
| quickstart-env          | env-9wg8ny |
| default                 | env-t12345 |
+-------------------------+------------+
Run the USE CATALOG statement to set the current Flink catalog (Confluent Cloud environment).
USE CATALOG my_environment;
Your output should resemble:
+---------------------+----------------+
| Key                 | Value          |
+---------------------+----------------+
| sql.current-catalog | my_environment |
+---------------------+----------------+
SHOW CURRENT CATALOG¶
- Syntax
SHOW CURRENT CATALOG;
- Description
- Show the current catalog.
- Example
SHOW CURRENT CATALOG;
Your output should resemble:
+----------------------+
| current catalog name |
+----------------------+
| my_environment       |
+----------------------+
SHOW DATABASES¶
- Syntax
SHOW DATABASES;
- Description
- Show all databases in the current catalog. Confluent Cloud for Apache Flink maps Flink databases to Kafka clusters.
- Example
SHOW DATABASES;
Your output should resemble:
+---------------+-------------+
| database name | database id |
+---------------+-------------+
| cluster_0     | lkc-r289m7  |
+---------------+-------------+
Run the USE statement to set the current database (Kafka cluster).
USE cluster_0;
Your output should resemble:
+----------------------+-----------+
| Key                  | Value     |
+----------------------+-----------+
| sql.current-database | cluster_0 |
+----------------------+-----------+
SHOW CURRENT DATABASE¶
- Syntax
SHOW CURRENT DATABASE;
- Description
- Show the current database. Confluent Cloud for Apache Flink maps Flink databases to Kafka clusters.
- Example
SHOW CURRENT DATABASE;
Your output should resemble:
+-----------------------+
| current database name |
+-----------------------+
| cluster_0             |
+-----------------------+
SHOW TABLES¶
- Syntax
SHOW TABLES [ ( FROM | IN ) [catalog_name.]database_name ] [ [NOT] LIKE <sql_like_pattern> ];
- Description
Show all tables for the current database. You can filter the output of SHOW TABLES by using the LIKE clause with an optional matching pattern.

The optional LIKE clause shows all tables with names that match <sql_like_pattern>. The syntax of the SQL pattern in a LIKE clause is the same as in the MySQL dialect.

- % matches any number of characters, including zero characters. Use the backslash character to escape the % character: \% matches one % character.
- _ matches exactly one character. Use the backslash character to escape the _ character: \_ matches one _ character.
- Example
Create two tables, flights and orders, in the current database.

-- Create a flights table.
CREATE TABLE flights (
  flight_id STRING,
  origin STRING,
  destination STRING
);

-- Create an orders table.
CREATE TABLE orders (
  user_id BIGINT NOT NULL,
  product_id STRING,
  amount INT
);
Show all tables in the current database that are similar to the specified SQL pattern.
SHOW TABLES LIKE 'f%';
Your output should resemble:
+------------+
| table name |
+------------+
| flights    |
+------------+
Show all tables in the given database that are not similar to the specified SQL pattern.
SHOW TABLES NOT LIKE 'f%';
Your output should resemble:
+------------+
| table name |
+------------+
| orders     |
+------------+
Show all tables in the current database.
SHOW TABLES;
Your output should resemble:

+------------+
| table name |
+------------+
| flights    |
| orders     |
+------------+
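You can also use the underscore wildcard, which matches exactly one character. For example, the following pattern matches flights but not orders:

SHOW TABLES LIKE 'f_ights';

Your output should resemble:

+------------+
| table name |
+------------+
| flights    |
+------------+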
SHOW CREATE TABLE¶
- Syntax
SHOW CREATE TABLE [catalog_name.][db_name.]table_name;
- Description
- Show details about the specified table.
- Example
SHOW CREATE TABLE flights;
Your output should resemble:
+------------------------------------------------------------+
| SHOW CREATE TABLE                                           |
+------------------------------------------------------------+
| CREATE TABLE `my_environment`.`cluster_0`.`flights` (       |
|   `flight_id` VARCHAR(2147483647),                          |
|   `origin` VARCHAR(2147483647),                             |
|   `destination` VARCHAR(2147483647)                         |
| ) WITH (                                                    |
|   'changelog.mode' = 'append',                              |
|   'connector' = 'confluent',                                |
|   'kafka.cleanup-policy' = 'delete',                        |
|   'kafka.max-message-size' = '2097164 bytes',               |
|   'kafka.partitions' = '6',                                 |
|   'kafka.retention.size' = '0 bytes',                       |
|   'kafka.retention.time' = '604800000 ms',                  |
|   'scan.bounded.mode' = 'unbounded',                        |
|   'scan.startup.mode' = 'earliest-offset',                  |
|   'value.format' = 'avro-registry'                          |
| )                                                           |
+------------------------------------------------------------+
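The syntax also accepts optional catalog and database qualifiers, so you can inspect a table outside of the current database. For example, using the environment and cluster names from the previous examples:

SHOW CREATE TABLE `my_environment`.`cluster_0`.`flights`;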
Inferred Tables¶
Inferred tables are tables that have not been created with CREATE TABLE but are detected automatically by using information about existing topics and Schema Registry entries.
The following examples show the output of SHOW CREATE TABLE when it's called on the resulting inferred tables.
No key and no value in Schema Registry¶
SHOW CREATE TABLE returns:
CREATE TABLE `t_raw` (
`key` VARBINARY(2147483647),
`val` VARBINARY(2147483647)
) DISTRIBUTED BY HASH(`key`) INTO 2 BUCKETS
WITH (
'changelog.mode' = 'append',
'connector' = 'confluent',
'key.format' = 'raw',
'value.format' = 'raw'
...
);
- Properties
- Key and value formats are raw (binary format), so both columns are typed as BYTES.
- Following Kafka message semantics, both key and value support NULL as well, so the following statement is supported:
INSERT INTO t_raw (key, val) SELECT CAST(NULL AS BYTES), CAST(NULL AS BYTES);
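Non-NULL binary data can be written the same way. The following statement is a minimal sketch that assumes Flink's hexadecimal binary literals (X'...') for the BYTES columns:

-- Write hexadecimal byte strings into the raw key and value columns.
INSERT INTO t_raw (key, val) SELECT X'0102', X'0304';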
No key but record value in Schema Registry¶
Given the following value in Schema Registry:
{
"type": "record",
"name": "TestRecord",
"fields": [
{
"name": "i",
"type": "int"
},
{
"name": "s",
"type": "string"
}
]
}
SHOW CREATE TABLE returns:
CREATE TABLE `t_raw_key` (
`key` VARBINARY(2147483647),
`i` INT NOT NULL,
`s` VARCHAR(2147483647) NOT NULL
) DISTRIBUTED BY HASH(`key`) INTO 6 BUCKETS
WITH (
'changelog.mode' = 'append',
'connector' = 'confluent',
'key.format' = 'raw',
'value.format' = 'avro-registry'
...
)
- Properties
- Key format is raw (binary format) with BYTES.
- Following Kafka message semantics, the key supports NULL as well, so the following statement is supported:
INSERT INTO t_raw_key SELECT CAST(NULL AS BYTES), 12, 'Bob';
Atomic key and record value in Schema Registry¶
Given the following key and value in Schema Registry:
"int"
{
"type": "record",
"name": "TestRecord",
"fields": [
{
"name": "i",
"type": "int"
},
{
"name": "s",
"type": "string"
}
]
}
SHOW CREATE TABLE returns:
CREATE TABLE `t_atomic_key` (
`key` INT NOT NULL,
`i` INT NOT NULL,
`s` VARCHAR(2147483647) NOT NULL
) DISTRIBUTED BY HASH(`key`) INTO 2 BUCKETS
WITH (
'changelog.mode' = 'append',
'connector' = 'confluent',
'key.format' = 'avro-registry',
'value.format' = 'avro-registry'
...
)
- Properties
- Schema Registry defines the key data type as INT NOT NULL.
- The column name key is used as a default, because Schema Registry doesn't provide a column name.
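Unlike the raw-key examples, the key column is NOT NULL here, so an INSERT must provide a non-NULL integer key. A minimal sketch, following the pattern of the previous examples:

-- The first expression populates the Avro-encoded INT key.
INSERT INTO t_atomic_key SELECT 1, 12, 'Bob';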
Overlapping names in key/value, no key in Schema Registry¶
Given the following value in Schema Registry:
{
"type": "record",
"name": "TestRecord",
"fields": [
{
"name": "i",
"type": "int"
},
{
"name": "s",
"type": "string"
}
]
}
SHOW CREATE TABLE returns:
CREATE TABLE `t_raw_disjoint` (
`key_key` VARBINARY(2147483647),
`i` INT NOT NULL,
`key` VARCHAR(2147483647) NOT NULL
) DISTRIBUTED BY HASH(`key_key`) INTO 1 BUCKETS
WITH (
'changelog.mode' = 'append',
'connector' = 'confluent',
'key.fields-prefix' = 'key_',
'key.format' = 'raw',
'value.format' = 'avro-registry'
...
)
- Properties
- The Schema Registry value defines the columns i INT NOT NULL and key STRING.
- The column key BYTES is used as a default, because no key is in Schema Registry.
- Because key would collide with the value column of the same name, the key_ prefix is added to the key column.
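A query can address both the prefixed raw key column and the value column that kept the original name. A minimal sketch, assuming the inferred table above:

-- key_key is the raw Kafka message key; `key` is the STRING column from the value schema.
SELECT `key_key`, `i`, `key` FROM t_raw_disjoint;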
Record key and record value in Schema Registry¶
Given the following key and value in Schema Registry:
{
"type": "record",
"name": "TestRecord",
"fields": [
{
"name": "uid",
"type": "int"
}
]
}
{
"type": "record",
"name": "TestRecord",
"fields": [
{
"name": "name",
"type": "string"
},
{
"name": "zip_code",
"type": "string"
}
]
}
SHOW CREATE TABLE returns:
CREATE TABLE `t_sr_disjoint` (
`uid` INT NOT NULL,
`name` VARCHAR(2147483647) NOT NULL,
`zip_code` VARCHAR(2147483647) NOT NULL
) DISTRIBUTED BY HASH(`uid`) INTO 1 BUCKETS
WITH (
'changelog.mode' = 'append',
'connector' = 'confluent',
'value.format' = 'avro-registry'
...
)
- Properties
- Schema Registry defines columns for both key and value.
- The column names of key and value are disjoint sets and don’t overlap.
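All three columns are physical, so a write provides the key field uid and both value fields. A minimal sketch with hypothetical values:

-- uid populates the Avro-encoded key; name and zip_code populate the value.
INSERT INTO t_sr_disjoint VALUES (1, 'Alice', '94301');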
Record key and record value with overlap in Schema Registry¶
Given the following key and value in Schema Registry:
{
"type": "record",
"name": "TestRecord",
"fields": [
{
"name": "uid",
"type": "int"
}
]
}
{
"type": "record",
"name": "TestRecord",
"fields": [
{
"name": "uid",
"type": "int"
},
{
"name": "name",
"type": "string"
},
{
"name": "zip_code",
"type": "string"
}
]
}
SHOW CREATE TABLE returns:
CREATE TABLE `t_sr_joint` (
`uid` INT NOT NULL,
`name` VARCHAR(2147483647) NOT NULL,
`zip_code` VARCHAR(2147483647) NOT NULL
) DISTRIBUTED BY HASH(`uid`) INTO 1 BUCKETS
WITH (
'changelog.mode' = 'append',
'connector' = 'confluent',
'value.fields-include' = 'all',
'value.format' = 'avro-registry'
...
)
- Properties
- Schema Registry defines columns for both key and value.
- The column names of key and value overlap on uid, so 'value.fields-include' = 'all' is set, and the key columns are read from the value, because the key is fully contained in the value.
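On writes, this means the single uid column feeds both the Kafka message key and the value payload. A minimal sketch with hypothetical values:

-- With 'value.fields-include' = 'all', uid is written to both key and value.
INSERT INTO t_sr_joint VALUES (1, 'Alice', '10115');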
Inferred tables schema evolution¶
Schema Registry columns overlap with computed/metadata columns¶
Given the following value in Schema Registry:
{
"type": "record",
"name": "TestRecord",
"fields": [
{
"name": "uid",
"type": "int"
}
]
}
Evolve the table by adding metadata:
ALTER TABLE t_metadata_overlap ADD `timestamp` TIMESTAMP_LTZ(3) NOT NULL METADATA;
Evolve the schema in Schema Registry by adding an optional timestamp field:
{
"type": "record",
"name": "TestRecord",
"fields": [
{
"name": "uid",
"type": "int"
},
{
"name": "timestamp",
"type": ["null", "string"],
"default": null
}
]
}
SHOW CREATE TABLE shows:
CREATE TABLE `t_metadata_overlap` (
`key` VARBINARY(2147483647),
`uid` INT NOT NULL,
`timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL METADATA
) DISTRIBUTED BY HASH(`key`) INTO 6 BUCKETS
WITH (
...
)
- Properties
Schema Registry defines a physical column named timestamp, but Flink defines a metadata column with the same name.

In this case, metadata columns and computed columns take precedence, so Flink removes the physical column from the schema.

Because Flink advertises FULL_TRANSITIVE compatibility mode, queries still work, and the physical column is set to NULL in the payload:

INSERT INTO t_metadata_overlap
  SELECT CAST(NULL AS BYTES), 42, TO_TIMESTAMP_LTZ(0, 3);

SELECT * FROM t_metadata_overlap;
Evolve the table by renaming metadata:
ALTER TABLE t_metadata_overlap DROP `timestamp`;
ALTER TABLE t_metadata_overlap
ADD message_timestamp TIMESTAMP_LTZ(3) METADATA FROM 'timestamp';
SHOW CREATE TABLE shows:
CREATE TABLE `t_metadata_overlap` (
`key` VARBINARY(2147483647),
`uid` INT NOT NULL,
`timestamp` VARCHAR(2147483647),
`message_timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp'
) DISTRIBUTED BY HASH(`key`) INTO 6 BUCKETS
WITH (
...
)
- Properties
- Now, both the physical column and the metadata column show up, and both can be accessed for reading and writing.
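For example, the following query is a minimal sketch that reads the physical column and the renamed metadata column side by side:

SELECT uid, `timestamp`, message_timestamp FROM t_metadata_overlap;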
SHOW JOBS¶
- Syntax
SHOW JOBS;
- Description
- Show the status of all statements in the current catalog/environment.
- Example
SHOW JOBS;
Your output should resemble:
+----------------------------------+-----------+------------------+--------------+------------------+------------------+
| Name                             | Phase     | Statement        | Compute Pool | Creation Time    | Detail           |
+----------------------------------+-----------+------------------+--------------+------------------+------------------+
| 0fb72c57-8e3d-4614               | COMPLETED | CREATE TABLE ... | lfcp-8m03rm  | 2024-01-23 13... | Table 'flight... |
| 8567b0eb-fabd-4cb8               | COMPLETED | CREATE TABLE ... | lfcp-8m03rm  | 2024-01-23 13... | Table 'orders... |
| 4cd171ca-77db-48ce               | COMPLETED | SHOW TABLES L... | lfcp-8m03rm  | 2024-01-23 13... |                  |
| 291eb50b-965c-4a53               | COMPLETED | SHOW TABLES N... | lfcp-8m03rm  | 2024-01-23 13... |                  |
| 7a30e70a-36af-41f4               | COMPLETED | SHOW TABLES;     | lfcp-8m03rm  | 2024-01-23 13... |                  |
+----------------------------------+-----------+------------------+--------------+------------------+------------------+
SHOW FUNCTIONS¶
- Syntax
SHOW [USER] FUNCTIONS;
- Description
Show all functions in the current catalog and current database, including both system functions and user-defined functions.

The USER option shows only user-defined functions in the current catalog and current database.

Functions of internal modules are shown if your organization is in the allow-list, for example, OLTP functions.

For convenience, SHOW FUNCTIONS also shows functions with special syntax or keywords that don't follow a traditional function-style syntax, like FUNC(arg0), for example, || (string concatenation) or IS BETWEEN.
- Example
SHOW FUNCTIONS;
Your output should resemble:
+------------------------+
| function name          |
+------------------------+
| %                      |
| *                      |
| +                      |
| -                      |
| /                      |
| <                      |
| <=                     |
| <>                     |
| =                      |
| >                      |
| >=                     |
| ABS                    |
| ACOS                   |
| AND                    |
| ARRAY                  |
| ARRAY_CONTAINS         |
| ASCII                  |
| ASIN                   |
| ATAN                   |
| ATAN2                  |
| AVG                    |
...
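To list only user-defined functions, add the USER keyword. Assuming a hypothetical UDF named my_udf is registered in the current database, the output might resemble:

SHOW USER FUNCTIONS;

+---------------+
| function name |
+---------------+
| my_udf        |
+---------------+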
SHOW MODELS¶
- Syntax
SHOW MODELS [ ( FROM | IN ) [catalog_name.]database_name ] [ [NOT] LIKE <sql_like_pattern> ];
- Description
Show all AI models that are registered in the current Flink environment.
To register an AI model, run the CREATE MODEL statement.
- Example
SHOW MODELS;
Your output should resemble:
+----------------+
| Model Name |
+----------------+
| demo_model |
+----------------+
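As with SHOW TABLES, the optional LIKE clause filters models by a SQL pattern. For example:

SHOW MODELS LIKE 'demo%';

Your output should resemble:

+----------------+
| Model Name     |
+----------------+
| demo_model     |
+----------------+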
SHOW CREATE MODEL¶
- Syntax
SHOW CREATE MODEL <model-name>;
- Description
Show details about the specified AI inference model.
This command is useful for understanding the configuration and options that were set when the model was created with the CREATE MODEL statement.
- Example
For an example AWS Bedrock model named bedrock_embed, the following statement displays output that resembles the example shown.

SHOW CREATE MODEL bedrock_embed;

-- Example SHOW CREATE MODEL output:
CREATE MODEL `model-testing`.`virtual_topic_GCP`.`bedrock_embed`
INPUT (`text` VARCHAR(2147483647))
OUTPUT (`response` ARRAY<FLOAT>)
WITH (
  'BEDROCK.CONNECTION' = 'bedrock-connection-hao',
  'BEDROCK.INPUT_FORMAT' = 'AMAZON-TITAN-EMBED',
  'PROVIDER' = 'bedrock',
  'TASK' = 'text_generation'
);