Compare Current and Previous Values in a Data Stream with Confluent Cloud for Apache Flink¶
Confluent Cloud for Apache Flink® provides LAG, a built-in function that returns a value from a previous row in the same partition. This lets you read earlier events alongside the current row without a self-join. You can use it to analyze the differences between consecutive rows or to build more complex calculations based on previous events, such as comparing daily sales values.
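The following minimal sketch shows the "difference between consecutive rows" use case. It subtracts each customer's previous order price from the current one. It assumes the examples.marketplace.orders table used later in this guide, and the alias price_change is illustrative only.

```sql
SELECT
  $rowtime AS row_time,
  customer_id,
  price,
  -- LAG(price, 1) is this customer's previous order price, or NULL if there is none
  price - LAG(price, 1) OVER (PARTITION BY customer_id ORDER BY $rowtime) AS price_change
FROM examples.marketplace.orders;
```

For a customer's first order, LAG returns NULL, so price_change is also NULL for that row.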
In this guide, you will learn how to run a Flink SQL statement that uses the LAG function to compare current and previous order values from a continuous data stream of orders.
This topic shows the following steps:
- Step 1: Inspect the example stream
- Step 2: View aggregated results
Prerequisites¶
- Access to Confluent Cloud.
- The OrganizationAdmin, EnvironmentAdmin, or FlinkAdmin role for creating compute pools, or the FlinkDeveloper role if you already have a compute pool. If you don’t have the appropriate role, contact your OrganizationAdmin or EnvironmentAdmin. For more information, see Grant Role-Based Access in Confluent Cloud for Apache Flink.
- A provisioned Flink compute pool.
Step 1: Inspect the example stream¶
In this step, you query the read-only orders table in the examples.marketplace database to inspect the fields in the stream, such as price, that you can compare across a customer's orders.
Log in to Confluent Cloud and navigate to your Flink workspace.
In the Use catalog dropdown, select your environment.
In the Use database dropdown, select your Kafka cluster.
Run the following statement to inspect the example orders stream.

SELECT * FROM examples.marketplace.orders;
Your output should resemble:
order_id                              customer_id  product_id  price
68362284-34df-41a3-87fb-50b79647b786  3195         1267        47.48
6e03663e-d20b-4a23-848a-aec959d794e3  3094         1412        50.92
84217b5d-7dcb-46d1-9600-675a3734a3ed  3038         1094        83.56
...
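The SELECT * output above doesn't show the $rowtime system column, which holds each record's timestamp. The LAG query in Step 2 uses this column to order each customer's orders. If you want to see it next to the fields being compared, this sketch selects it explicitly:

```sql
SELECT
  $rowtime AS row_time,  -- record timestamp that the Step 2 query orders by
  customer_id,
  order_id,
  price
FROM examples.marketplace.orders;
```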
Step 2: View aggregated results¶
Run the following statement to start a query on the orders data using the LAG function to return current and previous order data for each customer.

SELECT
  $rowtime AS row_time,
  customer_id,
  order_id,
  price,
  LAG(order_id, 1) OVER (PARTITION BY customer_id ORDER BY $rowtime) AS previous_order_id,
  LAG(price, 1) OVER (PARTITION BY customer_id ORDER BY $rowtime) AS previous_order_price
FROM examples.marketplace.orders;
Your output should resemble:
row_time                 customer_id  order_id                              price   previous_order_id                     previous_order_price
2024-01-11 15:42:00.557  3213         821f81d4-d912-4e0f-ab8b-88fe8d9af397  89.34   2c26a03b-4cd5-4df6-90d0-0b11916533d2  57.89
2024-01-11 15:42:01.079  3090         57b20b43-3f52-49d8-b8bc-3a55d0440482  50.22   c913ea7b-a7dc-4b22-b966-8df3f28e8e5e  66.12
2024-01-11 15:42:01.391  3142         8a536722-3e4f-4920-bd33-2b981179b8f8  10.77   NULL                                  NULL
2024-01-11 15:42:01.482  3006         cabf50e8-129d-4b71-b253-894526a571c1  113.12  NULL                                  NULL
2024-01-11 15:42:01.681  3009         fd96d839-f06b-43ef-a23f-38e4ca6849b4  78.01   d5cdafb2-ddf1-4161-8843-48ae5f46f524  102.34
2024-01-11 15:42:01.910  3158         16165e84-d1d6-49b9-afaf-1856c4f2a751  354.11  NULL                                  NULL
...
Some rows have NULL values for previous_order_id and previous_order_price. For these customers, the current order is the first one the query has processed, so there is no previous order data to return.
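If you only want rows that have a previous order to compare against, one option is to wrap the Step 2 query in a subquery and filter out the NULL rows. This is a sketch. The outer WHERE filter is an illustrative addition and not part of the original example.

```sql
SELECT *
FROM (
  SELECT
    $rowtime AS row_time,
    customer_id,
    order_id,
    price,
    LAG(order_id, 1) OVER (PARTITION BY customer_id ORDER BY $rowtime) AS previous_order_id,
    LAG(price, 1) OVER (PARTITION BY customer_id ORDER BY $rowtime) AS previous_order_price
  FROM examples.marketplace.orders
)
-- keep only orders that have an earlier order from the same customer
WHERE previous_order_id IS NOT NULL;
```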