ksqlDB for Confluent Cloud Quick Start¶
This quick start gets you up and running with ksqlDB in the Confluent Cloud Console.
In this quick start guide, you perform the following steps to create a simple streaming application that tracks the latest location of simulated riders.
- Step 1: Create a ksqlDB cluster
- Step 2: Create a stream
- Step 3: Create materialized views
- Step 4: Run a push query over the stream
- Step 5: Populate the stream with events
- Step 6: Run a pull query against the materialized view
- Step 7: Clean up
Prerequisites¶
- Access to Confluent Cloud.
Step 1: Create a ksqlDB cluster¶
Log in to Cloud Console.
In the navigation menu, click Environments and click the default tile.
If you don’t have an Apache Kafka® cluster yet, click Add cluster and follow these instructions.
When you have a Kafka cluster available, click the tile of the cluster where you want ksqlDB to run.
In the cluster navigation menu, click ksqlDB, and in the ksqlDB overview page, click Add cluster.
In the New cluster page, click the My account tile and click Continue.
In the Cluster name textbox, enter a name for your cluster or accept the default cluster name, which resembles “ksqlDB_cluster_0”. Accept the default configuration options.
Click Launch cluster to create your new ksqlDB cluster.
The ksqlDB overview page opens and shows your cluster with the Provisioning status.
It may take a few minutes to complete cluster provisioning.
Step 2: Create a stream¶
When your ksqlDB cluster shows the Up status, you can start running queries. In this step, you create a data stream. A stream associates a schema with an underlying Kafka topic.
Click the cluster you just created to open the query editor.
Copy the following query into the editor window and click Run query to create a stream named “riderLocations”.
```sql
CREATE STREAM riderLocations (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE)
  WITH (kafka_topic='quickstart-locations', value_format='json', partitions=1);
```
Your output should resemble:
```json
{
  "@type": "currentStatus",
  "statementText": "CREATE STREAM RIDERLOCATIONS (PROFILEID STRING, LATITUDE DOUBLE, LONGITUDE DOUBLE) WITH (CLEANUP_POLICY='delete', KAFKA_TOPIC='quickstart-locations', KEY_FORMAT='KAFKA', PARTITIONS=1, VALUE_FORMAT='JSON');",
  "commandId": "stream/`RIDERLOCATIONS`/create",
  "commandStatus": {
    "status": "SUCCESS",
    "message": "Stream created",
    "queryId": null
  },
  "commandSequenceNumber": 2,
  "warnings": []
}
```
ksqlDB automatically creates the underlying Kafka topic, which is named “quickstart-locations”.
Here’s what each parameter in the CREATE STREAM statement does:
- `kafka_topic`: Name of the Kafka topic underlying the stream. In this case, the topic is created automatically, because it doesn’t exist yet, but you can also create streams over topics that exist already.
- `value_format`: Encoding of the messages stored in the Kafka topic. For JSON encoding, each row is stored as a JSON object whose keys and values are column names and values, for example: `{"profileId": "c2309eec", "latitude": 37.7877, "longitude": -122.4205}`
- `partitions`: Number of partitions to create for the `quickstart-locations` topic. This parameter is not needed for topics that exist already.
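To see what the JSON value format looks like on the wire, here is a small Python sketch (using only the standard `json` module, outside of ksqlDB) of how a row is serialized when `value_format='json'`:

```python
import json

# A row of the riderLocations stream, expressed as a plain dict.
row = {"profileId": "c2309eec", "latitude": 37.7877, "longitude": -122.4205}

# With value_format='json', the message value stored in the Kafka topic
# is simply the JSON serialization of the column names and values.
message_value = json.dumps(row)
print(message_value)
```

Deserializing the message value with `json.loads` recovers the original column names and values.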
Step 3: Create materialized views¶
The goal of this quick start is to keep track of the latest location of the riders by using a materialized view.
- You create a `currentLocation` table by running a CREATE TABLE AS SELECT statement over the previously created stream. The table is updated incrementally as new rider location data arrives. The LATEST_BY_OFFSET aggregate function specifies that the application is interested only in the latest location of each rider.
- To add more interest, the application materializes a derived table, named `ridersNearMountainView`, that captures the distance of riders from a given location or city.
In the query editor, clear the previous query and run the following statement to create the `currentLocation` table.

```sql
-- Create the currentLocation table
CREATE TABLE currentLocation AS
  SELECT profileId,
         LATEST_BY_OFFSET(latitude) AS la,
         LATEST_BY_OFFSET(longitude) AS lo
  FROM riderlocations
  GROUP BY profileId
  EMIT CHANGES;
```
Your output should resemble:
```json
{
  "@type": "currentStatus",
  "statementText": "CREATE TABLE CURRENTLOCATION WITH (CLEANUP_POLICY='compact', KAFKA_TOPIC='pksqlc-j3yd58CURRENTLOCATION', PARTITIONS=1, REPLICAS=3, RETENTION_MS=604800000) AS SELECT\n  RIDERLOCATIONS.PROFILEID PROFILEID,\n  LATEST_BY_OFFSET(RIDERLOCATIONS.LATITUDE) LA,\n  LATEST_BY_OFFSET(RIDERLOCATIONS.LONGITUDE) LO\nFROM RIDERLOCATIONS RIDERLOCATIONS\nGROUP BY RIDERLOCATIONS.PROFILEID\nEMIT CHANGES;",
  "commandId": "table/`CURRENTLOCATION`/create",
  "commandStatus": {
    "status": "SUCCESS",
    "message": "Created query with ID CTAS_CURRENTLOCATION_3",
    "queryId": "CTAS_CURRENTLOCATION_3"
  },
  "commandSequenceNumber": 4,
  "warnings": []
}
```
The CREATE TABLE AS SELECT statement created a persistent query, named CTAS_CURRENTLOCATION_3, that runs continuously on the ksqlDB server. The query’s name is prefixed with “CTAS”, which stands for “CREATE TABLE AS SELECT”.

In the query editor, clear the previous query and run the following statement to create the `ridersNearMountainView` table.

```sql
-- Create the ridersNearMountainView table
CREATE TABLE ridersNearMountainView AS
  SELECT ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1) AS distanceInMiles,
         COLLECT_LIST(profileId) AS riders,
         COUNT(*) AS count
  FROM currentLocation
  GROUP BY ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1);
```
Your output should resemble:
```json
{
  "@type": "currentStatus",
  "statementText": "CREATE TABLE RIDERSNEARMOUNTAINVIEW WITH (CLEANUP_POLICY='compact', KAFKA_TOPIC='pksqlc-j3yd58RIDERSNEARMOUNTAINVIEW', PARTITIONS=1, REPLICAS=3, RETENTION_MS=604800000) AS SELECT\n  ROUND(GEO_DISTANCE(CURRENTLOCATION.LA, CURRENTLOCATION.LO, 37.4133, -122.1162), -1) DISTANCEINMILES,\n  COLLECT_LIST(CURRENTLOCATION.PROFILEID) RIDERS,\n  COUNT(*) COUNT\nFROM CURRENTLOCATION CURRENTLOCATION\nGROUP BY ROUND(GEO_DISTANCE(CURRENTLOCATION.LA, CURRENTLOCATION.LO, 37.4133, -122.1162), -1)\nEMIT CHANGES;",
  "commandId": "table/`RIDERSNEARMOUNTAINVIEW`/create",
  "commandStatus": {
    "status": "SUCCESS",
    "message": "Created query with ID CTAS_RIDERSNEARMOUNTAINVIEW_5",
    "queryId": "CTAS_RIDERSNEARMOUNTAINVIEW_5"
  },
  "commandSequenceNumber": 6,
  "warnings": []
}
```
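The incremental semantics of LATEST_BY_OFFSET can be sketched in Python. This is a simplification: real ksqlDB maintains this state in a changelog-backed state store, but the core idea is that each new event simply overwrites the stored position for its `profileId`.

```python
# Sketch of how the currentLocation table tracks the latest position per rider.
# Events are processed in offset order; LATEST_BY_OFFSET keeps only the newest
# value seen for each group key (profileId).

def materialize_current_location(events):
    """Fold riderLocations events into a profileId -> (la, lo) table."""
    table = {}
    for event in events:  # iterated in offset order
        table[event["profileId"]] = (event["latitude"], event["longitude"])
    return table

events = [
    {"profileId": "4ab5cbad", "latitude": 37.3952, "longitude": -122.0813},
    {"profileId": "4ab5cbad", "latitude": 37.4049, "longitude": -122.0822},  # later event wins
]
print(materialize_current_location(events))
# {'4ab5cbad': (37.4049, -122.0822)}
```

Because the table is keyed by `profileId`, a rider who reports a new location replaces, rather than duplicates, their previous row.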
Step 4: Run a push query over the stream¶
In this step, you create a push query that filters rows in the riderLocations stream.

This query outputs all rows from the riderLocations stream that have coordinates within 5 miles of Mountain View, California.

The query never returns until it’s terminated explicitly. It pushes output rows perpetually to the client as events are written to the riderLocations stream.

In the query editor, clear the previous query and run the following statement to create a push query that filters rows in the riderLocations stream.
```sql
-- Mountain View lat, long: 37.4133, -122.1162
SELECT * FROM riderLocations
  WHERE GEO_DISTANCE(latitude, longitude, 37.4133, -122.1162) <= 5 EMIT CHANGES;
```
No output is displayed yet, because no events have been written to the stream.
For now, leave this query running in the current session. In the next step, you
write some mock data into the riderLocations
stream so that the query
produces output.
Step 5: Populate the stream with events¶
Because the session from the previous step is busy waiting for output from the push query, start another session that you can use to write some data into ksqlDB.
Press the Ctrl key and click Editor to open a new query editor window.
In the new editor session, copy the following INSERT statements into the editor and click Run query.

```sql
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('c2309eec', 37.7877, -122.4205);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('18f4ea86', 37.3903, -122.0643);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ab5cbad', 37.3952, -122.0813);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('8b6eae59', 37.3944, -122.0813);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4a7c7b41', 37.4049, -122.0822);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ddad000', 37.7857, -122.4011);
```
No output is shown from the INSERT INTO VALUES statements, but the rows have been written to the riderLocations stream.

In the browser, click the tab for the previous editor session. The push query that you started in Step 4 now shows the following output:
```json
{"PROFILEID":"4a7c7b41","LATITUDE":37.4049,"LONGITUDE":-122.0822}
{"PROFILEID":"8b6eae59","LATITUDE":37.3944,"LONGITUDE":-122.0813}
{"PROFILEID":"4ab5cbad","LATITUDE":37.3952,"LONGITUDE":-122.0813}
```
Only the rows that have riders who are within 5 miles of Mountain View are displayed.
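You can reproduce this filter outside of ksqlDB with a short Python sketch. It assumes GEO_DISTANCE computes the haversine great-circle distance and that, with no unit argument, the result is in kilometers (GEO_DISTANCE also accepts an optional unit argument such as 'MILES'), so the `<= 5` bound here is treated as 5 km:

```python
from math import asin, cos, radians, sin, sqrt

def geo_distance(lat1, lon1, lat2, lon2):
    """Great-circle (haversine) distance in kilometers; an assumed
    model of ksqlDB's GEO_DISTANCE with no unit argument."""
    phi1, phi2 = radians(lat1), radians(lat2)
    dphi, dlam = radians(lat2 - lat1), radians(lon2 - lon1)
    a = sin(dphi / 2) ** 2 + cos(phi1) * cos(phi2) * sin(dlam / 2) ** 2
    return 2 * 6371 * asin(sqrt(a))  # 6371 km: mean Earth radius

MV = (37.4133, -122.1162)  # Mountain View reference point from the query
riders = {  # the six rows inserted in Step 5
    "c2309eec": (37.7877, -122.4205),
    "18f4ea86": (37.3903, -122.0643),
    "4ab5cbad": (37.3952, -122.0813),
    "8b6eae59": (37.3944, -122.0813),
    "4a7c7b41": (37.4049, -122.0822),
    "4ddad000": (37.7857, -122.4011),
}

nearby = {pid for pid, (la, lo) in riders.items()
          if geo_distance(la, lo, *MV) <= 5}
print(sorted(nearby))  # the three profileIds shown by the push query
```

Under these assumptions, rider 18f4ea86 falls just outside the bound (a little over 5 km away), which is why that row does not appear in the push query output.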
Step 6: Run a pull query against the materialized view¶
In this step, you run a pull query against the materialized view to retrieve all of the riders who are currently within 10 miles of Mountain View.
In contrast to the previous push query, which runs continuously, the pull query follows a traditional request-response model and retrieves the latest result from the materialized view.
In the second editor session, clear the previous query and run the following statement.
```sql
SELECT * FROM ridersNearMountainView WHERE distanceInMiles <= 10;
```
Your output should resemble:
```json
{"DISTANCEINMILES":"10.0","RIDERS":["18f4ea86"],"COUNT":1}
{"DISTANCEINMILES":"0.0","RIDERS":["4ab5cbad","8b6eae59","4a7c7b41"],"COUNT":3}
```
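The GROUP BY ROUND(GEO_DISTANCE(...), -1) clause buckets riders by distance to the nearest multiple of 10. Under the same assumptions as before (haversine distance, kilometers by default), a Python sketch of the bucketing behind this pull query result:

```python
from collections import defaultdict
from math import asin, cos, radians, sin, sqrt

def geo_distance(lat1, lon1, lat2, lon2):
    """Assumed haversine model of GEO_DISTANCE, in kilometers."""
    phi1, phi2 = radians(lat1), radians(lat2)
    dphi, dlam = radians(lat2 - lat1), radians(lon2 - lon1)
    a = sin(dphi / 2) ** 2 + cos(phi1) * cos(phi2) * sin(dlam / 2) ** 2
    return 2 * 6371 * asin(sqrt(a))

MV = (37.4133, -122.1162)
current_location = {  # latest position per rider, as in the currentLocation table
    "c2309eec": (37.7877, -122.4205),
    "18f4ea86": (37.3903, -122.0643),
    "4ab5cbad": (37.3952, -122.0813),
    "8b6eae59": (37.3944, -122.0813),
    "4a7c7b41": (37.4049, -122.0822),
    "4ddad000": (37.7857, -122.4011),
}

# GROUP BY ROUND(GEO_DISTANCE(...), -1): bucket to the nearest multiple of 10.
buckets = defaultdict(list)
for pid, (la, lo) in current_location.items():
    buckets[round(geo_distance(la, lo, *MV), -1)].append(pid)

# The pull query keeps only buckets with distanceInMiles <= 10.
result = {d: sorted(r) for d, r in buckets.items() if d <= 10}
print(result)
```

The two San Francisco riders land in a far bucket and are excluded by the `<= 10` predicate, leaving the two rows shown above.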
Step 7: Clean up¶
When you’re done with the quick start, close the second browser tab.
Click the browser tab that has the first editor session.
Click Stop query to end the push query.
Click Persistent queries to view the two queries that you created in Step 3.
For both queries, click Terminate and type their names in the delete dialog to confirm deletion.
Click Tables to view the tables that were created by the queries in Step 3.
Click the RIDERSNEARMOUNTAINVIEW table to open the details view.
Click DROP TABLE and type its name in the delete dialog to confirm deletion.
The corresponding Kafka topic is also deleted.
Repeat the deletion steps for the CURRENTLOCATION table.