Flink SQL REST API for Confluent Cloud for Apache Flink¶
Confluent Cloud for Apache Flink® provides a REST API for managing your Flink SQL statements and compute pools programmatically.
Use the REST API to manage these features:
For the complete Flink REST API reference, see:
In addition to the REST API, you can manage Flink statements and compute pools by using these Confluent tools:
Prerequisites¶
To manage Flink SQL statements by using the REST API, you must generate an API key that’s specific to the Flink environment. Also, you need Confluent Cloud account details, like your organization and environment identifiers.
- Flink API Key: Follow the steps in Generate a Flink API key.
- Organization ID: The identifier your organization, for example, “b0b421724-4586-4a07-b787-d0bb5aacbf87”.
- Environment ID: The identifier of the environment where your Flink SQL statements run, for example, “env-z3y2x1”.
- Cloud provider name: The name of the cloud provider where your cluster
runs, for example, “AWS”. To see the available providers, run the
confluent flink region list
command. - Cloud region: The name of the region where your cluster runs, for
example, “us-east-1”. To see the available regions, run the
confluent flink region list
command.
Depending on the request, you may need these details:
- Cloud API key: Some requests require a Confluent Cloud API key and secret, which are distinct from a Flink API key and secret. Follow the instructions here to create a new API key for Confluent Cloud, and on the https://confluent.cloud/settings/api-keys page, select the Cloud resource management tile for the API key’s resource scope.
- Principal ID: The identifier of your user account or a service account, for example, “u-aq1dr2” for a user account or “sa-23kgz4” for a service account.
- Compute pool ID: The identifier of the compute pool that runs your Flink SQL statements, for example, “lfcp-8m03rm”.
- Statement name: A unique name for a Flink SQL statement.
- SQL code: The code for a Flink SQL statement.
Rate limits¶
Requests to the Flink REST API are rate-limited per IP address.
- Concurrent connections: 100
- Requests per minute: 1000
- Requests per second: 50
Private networking endpoints¶
If you have enabled Flink private networking, the REST endpoints are different.
<!-- Without private network -->
https://flink.${CLOUD_REGION}.${CLOUD_PROVIDER}.confluent.cloud/
<!-- With private network -->
https://flink.${CLOUD_REGION}.${CLOUD_PROVIDER}.private.confluent.cloud/
For example, if you send a request to the us-east-1
AWS region without
a private network, the host is:
<!-- Without private network -->
https://flink.us-east-1.aws.confluent.cloud
With a private network, the host is:
<!-- Without private network -->
https://flink.us-east-1.aws.private.confluent.cloud
Generate a Flink API key¶
To access the REST API, you need an API key specifically for Flink. This key is distinct from the Confluent Cloud API key.
Before you create an API key for Flink access, decide whether you want to create long-running Flink SQL statements. If you need long-running Flink SQL statements, Confluent recommends using a service account and creating an API key for it. If you want to run only interactive queries or statements for a short time while developing queries, you can create an API key for your user account.
- Follow the steps in Generate an API Key for Access.
Run the following commands to save your API key and secret in environment variables.
export FLINK_API_KEY="<flink-api-key>"
export FLINK_API_SECRET="<flink-api-secret>"
The REST API uses basic authentication, which means that you provide a base64-encoded string made from your Flink API key and secret in the request header.
You can use the base64
command to encode the “key:secret” string. Be sure
to use the -n
option of the echo
command to prevent newlines from being
embedded in the encoded string. If you’re on Linux, be sure to use the -w 0
option of the base64
command, to prevent the string from being line-wrapped.
For convenience, save the encoded string in an environment variable:
export BASE64_FLINK_KEY_AND_SECRET=$(echo -n "${FLINK_API_KEY}:${FLINK_API_SECRET}" | base64 -w 0)
Manage statements¶
Using requests to the Flink REST API, you can perform these actions:
- Submit a statement
- Get a statement
- List statements
- Update metadata for a statement
- Delete a statement
Flink SQL statement schema¶
A statement has the following schema:
api_version: "sql/v1"
kind: "Statement"
organization_id: "" # Identifier of your Confluent Cloud organization
environment_id: "" # ID of your Confluent Cloud environment
name: "" # Primary identifier of the statement, must be unique within the environment, 100 max length, [a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*
metadata:
created_at: "" # Creation timestamp of this resource
updated_at: "" # Last updated timestamp of this resource
resource_version: "" # Generated by the system and updated whenever the statement is updated (including by the system). Opaque and should not be parsed.
self: "" # An absolute URL to this resource
uid: "" # uid is unique in time and space (i.e., even if the name is re-used)
spec:
compute_pool_id: "" # The ID of the compute pool the statement should run in. DNS Subdomain (RFC 1123) – 255 max len, [a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*
principal: "" # user or service account ID
properties: map[string]string # Optional. request/client properties
statement: "SELECT * from Orders;" # The raw SQL text
stopped: false # Boolean, specifying if the statement should be stopped
status:
phase: PENDING | RUNNING | COMPLETED | DELETING | FAILING | FAILED
detail: "" # Optional. Human-readable description of phase.
result_schema: "" # Optional. JSON object in TableSchema format; describes the data returned by the results serving API.
The statement name has a maximum length of 100 characters and must satisfy the following regular expression:
[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*
The underscore character (_
) is not supported.
Submit a statement¶
You can submit a Flink SQL statement by sending a POST request to the Statements endpoint.
Submitting a Flink SQL statement requires the following inputs:
export FLINK_API_KEY="<flink-api-key>"
export FLINK_API_SECRET="<flink-api-secret>"
export BASE64_FLINK_KEY_AND_SECRET=$(echo -n "${FLINK_API_KEY}:${FLINK_API_SECRET}" | base64 -w 0)
export STATEMENT_NAME="<statement-name>" # example: "user-filter"
export ORG_ID="<organization-id>" # example: "b0b21724-4586-4a07-b787-d0bb5aacbf87"
export ENV_ID="<environment-id>" # example: "env-z3y2x1"
export CLOUD_PROVIDER="<cloud-provider>" # example: "aws"
export CLOUD_REGION="<cloud-region>" # example: "us-east-1"
export COMPUTE_POOL_ID="<compute-pool-id>" # example: "lfcp-8m03rm"
export PRINCIPAL_ID="<principal-id>" # (optional) example: "sa-23kgz4" for a service account, or "u-aq1dr2" for a user account
export SQL_CODE="<sql-statement-text>" # example: "SELECT * FROM USERS;"
export JSON_DATA="<payload-string>"
The PRINCIPAL_ID parameter is optional. Confluent Cloud infers the principal from the provided Flink API key.
The following JSON shows an example payload:
{
"name": "${STATEMENT_NAME}",
"organization_id": "${ORG_ID}",
"environment_id": "${ENV_ID}",
"spec": {
"statement": "${SQL_CODE}",
"properties": {
"key1": "value1",
"key2": "value2"
},
"compute_pool_id": "${COMPUTE_POOL_ID}",
"principal": "${PRINCIPAL_ID}",
"stopped": false
}
}
Quotation mark characters in the JSON string must be escaped, so the payload string to send resembles the following:
export JSON_DATA="{
\"name\": \"${STATEMENT_NAME}\",
\"organization_id\": \"${ORG_ID}\",
\"environment_id\": \"${ENV_ID}\",
\"spec\": {
\"statement\": \"${SQL_CODE}\",
\"properties\": {
\"key1\": \"value1\",
\"key2\": \"value2\"
},
\"compute_pool_id\": \"${COMPUTE_POOL_ID}\",
\"principal\": \"${PRINCIPAL_ID}\",
\"stopped\": false
}
}"
The following command sends a POST request that submits a Flink SQL statement.
curl --request POST \
--url "https://flink.${CLOUD_REGION}.${CLOUD_PROVIDER}.confluent.cloud/sql/v1/organizations/${ORG_ID}/environments/${ENV_ID}/statements" \
--header "Authorization: Basic ${BASE64_FLINK_KEY_AND_SECRET}" \
--header 'content-type: application/json' \
--data "${JSON_DATA}"
Your output should resemble:
Response from a request to submit a SQL statement
{
"api_version": "sql/v1",
"environment_id": "env-z3y2x1",
"kind": "Statement",
"metadata": {
"created_at": "2023-12-16T17:12:08.914198Z",
"resource_version": "1",
"self": "https://flink.us-east-1.aws.confluent.cloud/sql/v1/organizations/b0b21724-4586-4a07-b787-d0bb5aacbf87/environments/env-z3y2x1/statements/demo-statement-1",
"uid": "0005dd7b-8a7e-4274-b97e-c21b134d98f0",
"updated_at": "2023-12-16T17:12:08.914198Z"
},
"name": "demo-statement-1",
"organization_id": "b0b21724-4586-4a07-b787-d0bb5aacbf87",
"spec": {
"compute_pool_id": "lfcp-8m03rm",
"principal": "u-aq1dr2",
"properties": null,
"statement": "select 1;",
"stopped": false
},
"status": {
"detail": "",
"phase": "PENDING"
}
}
Get a statement¶
Get the details about a Flink SQL statement by sending a GET request to the Statements endpoint.
Getting a Flink SQL statement requires the following inputs:
export FLINK_API_KEY="<flink-api-key>"
export FLINK_API_SECRET="<flink-api-secret>"
export BASE64_FLINK_KEY_AND_SECRET=$(echo -n "${FLINK_API_KEY}:${FLINK_API_SECRET}" | base64 -w 0)
export STATEMENT_NAME="<statement-name>" # example: "user-filter"
export ORG_ID="<organization-id>" # example: "b0b21724-4586-4a07-b787-d0bb5aacbf87"
export ENV_ID="<environment-id>" # example: "env-z3y2x1"
export CLOUD_PROVIDER="<cloud-provider>" # example: "aws"
export CLOUD_REGION="<cloud-region>" # example: "us-east-1"
The following command gets a Flink SQL statement’s details by its name.
Attempting to get a deleted statement returns 404
.
curl --request GET \
--url "https://flink.${CLOUD_REGION}.${CLOUD_PROVIDER}.confluent.cloud/sql/v1/organizations/${ORG_ID}/environments/${ENV_ID}/statements/${STATEMENT_NAME}" \
--header "Authorization: Basic ${BASE64_FLINK_KEY_AND_SECRET}"
Your output should resemble:
Response from a request to get a SQL statement
{
"api_version": "sql/v1",
"environment_id": "env-z3y2x1",
"kind": "Statement",
"metadata": {
"created_at": "2023-12-16T16:08:36.650591Z",
"resource_version": "13",
"self": "https://flink.us-east-1.aws.confluent.cloud/sql/v1/organizations/b0b21724-4586-4a07-b787-d0bb5aacbf87/environments/env-z3y2x1/statements/demo-statement-1",
"uid": "5387a4a4-02dd-4375-8db1-80bdd82ede96",
"updated_at": "2023-12-16T16:10:05.353298Z"
},
"name": "demo-statement-1",
"organization_id": "b0b21724-4586-4a07-b787-d0bb5aacbf87",
"spec": {
"compute_pool_id": "lfcp-8m03rm",
"principal": "u-aq1dr2",
"properties": null,
"statement": "select 1;",
"stopped": false
},
"status": {
"detail": "",
"phase": "COMPLETED",
"result_schema": {
"columns": [
{
"name": "EXPR$0",
"type": {
"nullable": false,
"type": "INTEGER"
}
}
]
}
}
}
Tip
Pipe the result through jq
to extract the code for the Flink SQL
statement:
curl --request GET \
--url "https://flink.${CLOUD_REGION}.${CLOUD_PROVIDER}.confluent.cloud/sql/v1/organizations/${ORG_ID}/environments/${ENV_ID}/statements/${STATEMENT_NAME}" \
--header "Authorization: Basic ${BASE64_FLINK_KEY_AND_SECRET}" \
| jq -r '.spec.statement'
Your output should resemble:
select 1;
List statements¶
List the statements in an environment by sending a GET request to the Statements endpoint.
- Request Query Parameters
spec.compute_pool_id
(optional): Fetch only the statements under this compute pool ID.page_token
(optional): Retrieve a page based on a previously received token (via themetadata.next
field ofStatementList
).page_size
(optional): Maximum number of items to return in a page.
Listing all Flink SQL statements requires the following inputs:
export FLINK_API_KEY="<flink-api-key>"
export FLINK_API_SECRET="<flink-api-secret>"
export BASE64_FLINK_KEY_AND_SECRET=$(echo -n "${FLINK_API_KEY}:${FLINK_API_SECRET}" | base64 -w 0)
export ORG_ID="<organization-id>" # example: "b0b21724-4586-4a07-b787-d0bb5aacbf87"
export ENV_ID="environment-id" # example: "env-z3y2x1"
export CLOUD_PROVIDER="<cloud-provider>" # example: "aws"
export CLOUD_REGION="<cloud-region>" # example: "us-east-1"
The following command returns details for all non-deleted Flink SQL statements under the scope of the environment (one or more compute pools) where you have permission to do a GET request.
curl --request GET \
--url "https://flink.${CLOUD_REGION}.${CLOUD_PROVIDER}.confluent.cloud/sql/v1/organizations/${ORG_ID}/environments/${ENV_ID}/statements" \
--header "Authorization: Basic ${BASE64_FLINK_KEY_AND_SECRET}"
Your output should resemble:
Response from a request to list the statements in an environment
{
"api_version": "sql/v1",
"data": [
{
"api_version": "sql/v1",
"environment_id": "env-z3y2x1",
"kind": "Statement",
"metadata": {
"created_at": "2023-12-16T16:08:36.650591Z",
"resource_version": "13",
"self": "https://flink.us-east-1.aws.confluent.cloud/sql/v1/organizations/b0b21724-4586-4a07-b787-d0bb5aacbf87/environments/env-z3y2x1/statements/demo-statement-1",
"uid": "5387a4a4-02dd-4375-8db1-80bdd82ede96",
"updated_at": "2023-12-16T16:10:05.353298Z"
},
"name": "demo-statement-1",
"organization_id": "b0b21724-4586-4a07-b787-d0bb5aacbf87",
"spec": {
"compute_pool_id": "lfcp-8m03rm",
"principal": "u-aq1dr2",
"properties": null,
"statement": "select 1;",
"stopped": false
},
"status": {
"detail": "",
"phase": "COMPLETED",
"result_schema": {
"columns": [
{
"name": "EXPR$0",
"type": {
"nullable": false,
"type": "INTEGER"
}
}
]
}
}
}
Update metadata for a statement¶
Update the metadata for a statement by sending a PUT request to the Statements endpoint.
- The statement’s code is immutable.
- You must specify a resource version in the payload metadata.
You can stop and resume a statement by setting stopped
in the spec
to
true
to stop the statement and false
to resume the statement.
Updating metadata for an existing Flink SQL statement requires the following inputs:
export FLINK_API_KEY="<flink-api-key>"
export FLINK_API_SECRET="<flink-api-secret>"
export BASE64_FLINK_KEY_AND_SECRET=$(echo -n "${FLINK_API_KEY}:${FLINK_API_SECRET}" | base64 -w 0)
export STATEMENT_NAME="<statement-name>" # example: "user-filter"
export ORG_ID="<organization-id>" # example: "b0b21724-4586-4a07-b787-d0bb5aacbf87"
export ENV_ID="<environment-id>" # example: "env-z3y2x1"
export CLOUD_PROVIDER="<cloud-provider>" # example: "aws"
export CLOUD_REGION="<cloud-region>" # example: "us-east-1"
export COMPUTE_POOL_ID="<compute-pool-id>" # example: "lfcp-8m03rm"
export PRINCIPAL_ID="<principal-id>" # (optional) example: "sa-23kgz4" for a service account, or "u-aq1dr2" for a user account
export SQL_CODE="<sql-statement-text>" # example: "SELECT * FROM USERS;"
export RESOURCE_VERSION="<version>" # example: "a3e", must be fetched from the latest version of the statement
export JSON_DATA="<payload-string>"
The PRINCIPAL_ID parameter is optional. Confluent Cloud infers the principal from the provided Flink API key.
The following JSON shows an example payload:
{
"name": "${STATEMENT_NAME}",
"organization_id": "${ORG_ID}",
"environment_id": "${ENV_ID}",
"spec": {
"statement": "${SQL_CODE}",
"properties": {
"key1": "value1",
"key2": "value2"
},
"compute_pool_id": "${COMPUTE_POOL_ID}",
"principal": "${PRINCIPAL_ID}",
"stopped": false
},
"metadata": {
"resource_version": "${RESOURCE_VERSION}"
}
}
Quotation mark characters in the JSON string must be escaped, so the payload string to send resembles the following:
export JSON_DATA="{
\"name\": \"${STATEMENT_NAME}\",
\"organization_id\": \"${ORG_ID}\",
\"environment_id\": \"${ENV_ID}\",
\"spec\": {
\"statement\": \"${SQL_CODE}\",
\"properties\": {
\"key1\": \"value1\",
\"key2\": \"value2\"
},
\"compute_pool_id\": \"${COMPUTE_POOL_ID}\",
\"principal\": \"${PRINCIPAL_ID}\",
\"stopped\": false
},
\"metadata\": {
\"resource_version\": \"${RESOURCE_VERSION}\"
}
}"
The following command sends a PUT request that updates metadata for an existing Flink SQL statement.
curl --request PUT \
--url "https://flink.${CLOUD_REGION}.${CLOUD_PROVIDER}.confluent.cloud/sql/v1/organizations/${ORG_ID}/environments/${ENV_ID}/statements/${STATEMENT_NAME}" \
--header "Authorization: Basic ${BASE64_FLINK_KEY_AND_SECRET}" \
--header 'content-type: application/json' \
--data "${JSON_DATA}"
Resource version is required in the PUT request and changes every time the statement is updated, by the system or by the user. It’s not possible to calculate the resource version ahead of time, so if the statement has changed since it was fetched, you must submit a GET request, reapply the modifications, and try the update again.
This means you must loop and retry on 409 errors. The following pseudo code shows the loop.
while true:
statement = getStatement()
# make modifications to the current statement
statement.spec.stopped = True
# send the update
response = updateStatement(statement)
# if a conflict, retry
if response.code == 409:
continue
elif response.code == 200:
return "success"
else:
return response.error()
Delete a statement¶
Delete a statement from the compute pool by sending a DELETE request to the Statements endpoint.
- Once a statement deleted, it can’t be undone.
- State is cleaned up by Confluent Cloud.
- When deletion is complete, the statement is no longer accessible.
Deleting a statement requires the following inputs:
export FLINK_API_KEY="<flink-api-key>"
export FLINK_API_SECRET="<flink-api-secret>"
export BASE64_FLINK_KEY_AND_SECRET=$(echo -n "${FLINK_API_KEY}:${FLINK_API_SECRET}" | base64 -w 0)
export STATEMENT_NAME="<statement-name>" # example: "user-filter"
export ORG_ID="<organization-id>" # example: "b0b21724-4586-4a07-b787-d0bb5aacbf87"
export ENV_ID="<environment-id>" # example: "env-z3y2x1"
export CLOUD_PROVIDER="<cloud-provider>" # example: "aws"
export CLOUD_REGION="<cloud-region>" # example: "us-east-1"
The following command deletes a statement in the specified organization and environment.
curl --request DELETE \
--url "https://flink.${CLOUD_REGION}.${CLOUD_PROVIDER}.confluent.cloud/sql/v1/organizations/${ORG_ID}/environments/${ENV_ID}/statements/${STATEMENT_NAME}" \
--header "Authorization: Basic ${BASE64_FLINK_KEY_AND_SECRET}"
Manage compute pools¶
Using requests to the Flink REST API, you can perform these actions:
- Create a Flink compute pool
- Read a Flink compute pool
- Update a Flink compute pool
- Delete a Flink compute pool
You must be authorized to create, update, delete (FlinkAdmin
) or use
(FlinkDeveloper
) a compute pool. For more information, see Grant Role-Based Access in Confluent Cloud for Apache Flink.
List Flink compute pools¶
List the compute pools in your environment by sending a GET request to the Compute Pools endpoint.
- This request uses your Cloud API key instead of the Flink API key.
Listing the compute pools in your environment requires the following inputs:
export CLOUD_API_KEY="<cloud-api-key>"
export CLOUD_API_SECRET="<cloud-api-secret>"
export BASE64_CLOUD_KEY_AND_SECRET=$(echo -n "${CLOUD_API_KEY}:${CLOUD_API_SECRET}" | base64 -w 0)
export ENV_ID="<environment-id>" # example: "env-z3y2x1"
Run the following command to list the compute pools in your environment.
curl --request GET \
--url "https://confluent.cloud/api/fcpm/v2/compute-pools?environment=${ENV_ID}&page_size=100" \
--header "Authorization: Basic ${BASE64_CLOUD_KEY_AND_SECRET}" \
| jq -r '.data[] | .spec.display_name, {id}'
Your output should resemble:
compute_pool_0
{
"id": "lfcp-j123kl"
}
compute_pool_2
{
"id": "lfcp-abc1de"
}
my-lfcp-01
{
"id": "lfcp-l2mn3o"
}
...
Find your compute pool in the list and save its ID in an environment variable.
export COMPUTE_POOL_ID="<your-compute-pool-id>"
Create a Flink compute pool¶
Create a compute pool in your environment by sending a POST request to the Compute Pools endpoint.
- This request uses your Cloud API key instead of the Flink API key.
Creating a compute pool requires the following inputs:
export COMPUTE_POOL_NAME="<compute-pool-name>" # human readable name, for example: "my-compute-pool"
export CLOUD_API_KEY="<cloud-api-key>"
export CLOUD_API_SECRET="<cloud-api-secret>"
export BASE64_CLOUD_KEY_AND_SECRET=$(echo -n "${CLOUD_API_KEY}:${CLOUD_API_SECRET}" | base64 -w 0)
export ENV_ID="<environment-id>" # example: "env-z3y2x1"
export CLOUD_PROVIDER="<cloud-provider>" # example: "aws"
export CLOUD_REGION="<cloud-region>" # example: "us-east-1"
export MAX_CFU="<max-cfu>" # example: 5
export JSON_DATA="<payload-string>"
The following JSON shows an example payload. The network
key is optional.
{
"spec": {
"display_name": "${COMPUTE_POOL_NAME}",
"cloud": "${CLOUD_PROVIDER}",
"region": "${CLOUD_REGION}",
"max_cfu": ${MAX_CFU},
"environment": {
"id": "${ENV_ID}"
},
"network": {
"id": "n-00000",
"environment": "string"
}
}
}
Quotation mark characters in the JSON string must be escaped, so the payload string to send resembles the following:
export JSON_DATA="{
\"spec\": {
\"display_name\": \"${COMPUTE_POOL_NAME}\",
\"cloud\": \"${CLOUD_PROVIDER}\",
\"region\": \"${CLOUD_REGION}\",
\"max_cfu\": ${MAX_CFU},
\"environment\": {
\"id\": \"${ENV_ID}\"
}
}
}"
The following command sends a POST request to create a compute pool.
curl --request POST \
--url https://api.confluent.cloud/fcpm/v2/compute-pools \
--header "Authorization: Basic ${BASE64_CLOUD_KEY_AND_SECRET}" \
--header 'content-type: application/json' \
--data "${JSON_DATA}"
Your output should resemble:
Response from a request to create a compute pool
{
"api_version": "fcpm/v2",
"id": "lfcp-6g7h8i",
"kind": "ComputePool",
"metadata": {
"created_at": "2024-02-27T22:44:27.18964Z",
"resource_name": "crn://confluent.cloud/organization=b0b21724-4586-4a07-b787-d0bb5aacbf87/environment=env-z3y2x1/flink-region=aws.us-east-1/compute-pool=lfcp-6g7h8i",
"self": "https://api.confluent.cloud/fcpm/v2/compute-pools/lfcp-6g7h8i",
"updated_at": "2024-02-27T22:44:27.18964Z"
},
"spec": {
"cloud": "AWS",
"display_name": "my-compute-pool",
"environment": {
"id": "env-z3y2x1",
"related": "https://api.confluent.cloud/fcpm/v2/compute-pools/lfcp-6g7h8i",
"resource_name": "crn://confluent.cloud/organization=b0b21724-4586-4a07-b787-d0bb5aacbf87/environment=env-z3y2x1"
},
"http_endpoint": "https://flink.us-east-1.aws.confluent.cloud/sql/v1/organizations/b0b21724-4586-4a07-b787-d0bb5aacbf87/environments/env-z3y2x1",
"max_cfu": 5,
"region": "us-east-1"
},
"status": {
"current_cfu": 0,
"phase": "PROVISIONING"
}
}
Read a Flink compute pool¶
Get the details about a compute pool in your environment by sending a GET request to the Compute Pools endpoint.
- This request uses your Cloud API key instead of the Flink API key.
Getting details about a compute pool requires the following inputs:
export COMPUTE_POOL_ID="<compute-pool-id>" # example: "lfcp-8m03rm"
export CLOUD_API_KEY="<cloud-api-key>"
export CLOUD_API_SECRET="<cloud-api-secret>"
export BASE64_CLOUD_KEY_AND_SECRET=$(echo -n "${CLOUD_API_KEY}:${CLOUD_API_SECRET}" | base64 -w 0)
export ENV_ID="<environment-id>" # example: "env-z3y2x1"
Run the following command to get details about the compute pool specified in the COMPUTE_POOL_ID environment variable.
curl --request GET \
--url "https://api.confluent.cloud/fcpm/v2/compute-pools/${COMPUTE_POOL_ID}?environment=${ENV_ID}" \
--header "Authorization: Basic ${BASE64_CLOUD_KEY_AND_SECRET}"
Your output should resemble:
Response from a request to read a compute pool
{
"api_version": "fcpm/v2",
"id": "lfcp-6g7h8i",
"kind": "ComputePool",
"metadata": {
"created_at": "2024-02-27T22:44:27.18964Z",
"resource_name": "crn://confluent.cloud/organization=b0b21724-4586-4a07-b787-d0bb5aacbf87/environment=env-z3y2x1/flink-region=aws.us-east-1/compute-pool=lfcp-6g7h8i",
"self": "https://api.confluent.cloud/fcpm/v2/compute-pools/lfcp-6g7h8i",
"updated_at": "2024-02-27T22:44:27.18964Z"
},
"spec": {
"cloud": "AWS",
"display_name": "my-compute-pool",
"environment": {
"id": "env-z3y2x1",
"related": "https://api.confluent.cloud/fcpm/v2/compute-pools/lfcp-6g7h8i",
"resource_name": "crn://confluent.cloud/organization=b0b21724-4586-4a07-b787-d0bb5aacbf87/environment=env-z3y2x1"
},
"http_endpoint": "https://flink.us-east-1.aws.confluent.cloud/sql/v1/organizations/b0b21724-4586-4a07-b787-d0bb5aacbf87/environments/env-z3y2x1",
"max_cfu": 5,
"region": "us-east-1"
},
"status": {
"current_cfu": 0,
"phase": "PROVISIONED"
}
}
Update a Flink compute pool¶
Update a compute pool in your environment by sending a PATCH request to the Compute Pools endpoint.
- You can update the name of the compute pool, its environment, and the MAX_CFUs setting.
- This request uses your Cloud API key instead of the Flink API key.
Updating a compute pool requires the following inputs:
export COMPUTE_POOL_ID="<compute-pool-id>" # example: "lfcp-8m03rm"
export CLOUD_API_KEY="<cloud-api-key>"
export CLOUD_API_SECRET="<cloud-api-secret>"
export BASE64_CLOUD_KEY_AND_SECRET=$(echo -n "${CLOUD_API_KEY}:${CLOUD_API_SECRET}" | base64 -w 0)
export ENV_ID="<environment-id>" # example: "env-z3y2x1"
export MAX_CFU="<max-cfu>" # example: 5
export JSON_DATA="<payload-string>"
The following JSON shows an example payload. The network
key is optional.
{
"spec": {
"display_name": "${COMPUTE_POOL_NAME}",
"max_cfu": ${MAX_CFU},
"environment": {
"id": "${ENV_ID}"
}
}
}
Quotation mark characters in the JSON string must be escaped, so the payload string to send resembles the following:
export JSON_DATA="{
\"spec\": {
\"display_name\": \"${COMPUTE_POOL_NAME}\",
\"max_cfu\": ${MAX_CFU},
\"environment\": {
\"id\": \"${ENV_ID}\"
}
}
}"
Run the following command to update the compute pool specified in the COMPUTE_POOL_ID environment variable.
curl --request PATCH \
--url "https://api.confluent.cloud/fcpm/v2/compute-pools/${COMPUTE_POOL_ID}" \
--header "Authorization: Basic ${BASE64_CLOUD_KEY_AND_SECRET}" \
--header 'content-type: application/json' \
--data "${JSON_DATA}"
Delete a Flink compute pool¶
Delete a compute pool in your environment by sending a DELETE request to the Compute Pools endpoint.
- This request uses your Cloud API key instead of the Flink API key.
Getting details about a compute pool requires the following inputs:
export COMPUTE_POOL_ID="<compute-pool-id>" # example: "lfcp-8m03rm"
export CLOUD_API_KEY="<cloud-api-key>"
export CLOUD_API_SECRET="<cloud-api-secret>"
export BASE64_CLOUD_KEY_AND_SECRET=$(echo -n "${CLOUD_API_KEY}:${CLOUD_API_SECRET}" | base64 -w 0)
export ENV_ID="<environment-id>" # example: "env-z3y2x1"
Run the following command to delete the compute pool specified in the COMPUTE_POOL_ID environment variable.
curl --request DELETE \
--url "https://api.confluent.cloud/fcpm/v2/compute-pools/${COMPUTE_POOL_ID}?environment=${ENV_ID}" \
--header "Authorization: Basic ${BASE64_CLOUD_KEY_AND_SECRET}"
List Flink regions¶
List the regions where Flink is available by sending a GET request to the Regions endpoint.
- This request uses your Cloud API key instead of the Flink API key.
Getting details about a compute pool requires the following inputs:
export CLOUD_API_KEY="<cloud-api-key>"
export CLOUD_API_SECRET="<cloud-api-secret>"
export BASE64_CLOUD_KEY_AND_SECRET=$(echo -n "${CLOUD_API_KEY}:${CLOUD_API_SECRET}" | base64 -w 0)
Run the following command to list the available Flink regions.
curl --request GET \
--url "https://api.confluent.cloud/fcpm/v2/regions" \
--header "Authorization: Basic ${BASE64_CLOUD_KEY_AND_SECRET}" \
| jq -r '.data[].id'
Your output should resemble:
aws.eu-central-1
aws.us-east-1
aws.eu-west-1
aws.us-east-2
...