Dataflow SQL streaming extensions

To process streaming data with Dataflow SQL, write queries with Dataflow SQL streaming extensions.

Streaming extensions syntax

SELECT window_timestamps [, ...]
FROM window_function_expression [table_expression, expression [, ...] ]

window_timestamps:
    { window_start 
| window_end 
}

window_function_expression:
    { tumble_function 
| hop_function 
| session_function 
}

For the standard SQL query syntax, see the Dataflow SQL query syntax page.

Window timestamps

Expressions in the SELECT list can refer to columns with the starting and ending timestamps of a window.

window_start

A TIMESTAMP column with the start of each window.

window_end

A TIMESTAMP column with the end of each window.

Window function expression

The window_function_expression in the FROM clause is a windowing function.

window_function_expression can be one of the following windowing functions: TUMBLE, HOP, or SESSION.

Windowing functions

Windowing functions divide streaming data into finite windows. When a watermark indicates that a window is complete, windowing functions return the data in the window, the starting timestamp, and the ending timestamp. Late data, which arrives after the watermark has passed the end of its window, is dropped.

To compute aggregate values of streaming data, aggregate the data in each window.
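The following Python sketch is a simplified model of the behavior described above, not Dataflow's implementation. It assumes 15-minute tumbling windows aligned to the Unix epoch and a hypothetical watermark equal to the latest event time seen minus a fixed delay. A window's sum is emitted once the watermark passes the window's end, and elements whose window has already closed are dropped as late data. The events are invented for illustration.

```python
from collections import defaultdict
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)
WINDOW = timedelta(minutes=15)  # tumbling window duration


def window_start(ts):
    """Start of the epoch-aligned 15-minute window that contains ts."""
    return ts - (ts - EPOCH) % WINDOW


def run(events, watermark_delay=timedelta(minutes=5)):
    """Process (event_time, amount) pairs in arrival order.

    Hypothetical watermark: latest event time seen minus watermark_delay.
    Returns (emitted window sums, dropped late elements).
    """
    open_windows = defaultdict(float)  # window start -> running sum
    emitted = {}
    dropped = []
    watermark = datetime.min
    for ts, amount in events:
        start = window_start(ts)
        if start + WINDOW <= watermark:
            # The watermark already passed this window's end: late data.
            dropped.append((ts, amount))
            continue
        open_windows[start] += amount
        watermark = max(watermark, ts - watermark_delay)
        # Emit every open window that the watermark now marks as complete.
        for s in sorted(open_windows):
            if s + WINDOW <= watermark:
                emitted[s] = open_windows.pop(s)
    return emitted, dropped


events = [
    (datetime(2020, 1, 1, 0, 1), 1.0),
    (datetime(2020, 1, 1, 0, 7), 1.5),
    (datetime(2020, 1, 1, 0, 16), 2.0),
    (datetime(2020, 1, 1, 0, 21), 0.5),
    (datetime(2020, 1, 1, 0, 9), 4.0),  # arrives after its window closed
]
emitted, dropped = run(events)
print(emitted)
print(dropped)
```

Here the window starting at 00:15 is still open when the input ends, so only the 00:00 window is emitted.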

TUMBLE

  TUMBLE(table_expression, DESCRIPTOR(timestamp_column), window_duration)
 

Description

Divides streaming data into tumbling windows. A tumbling window represents a consistent, disjoint time interval in the data stream.

For more information, see Tumbling windows.

Supported Argument Types

The table_expression is a table with the streaming data to divide into tumbling windows.

The timestamp_column is the column in table_expression with the event timestamps of each element.

  • Event timestamps must be of type TIMESTAMP.

For a Pub/Sub source, you must specify the event_timestamp field as the timestamp_column.

The window_duration is a string with the format "INTERVAL int64 date_part".

  • date_part must be MILLISECOND, SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR.
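As an illustration of the interval format, this Python sketch validates strings such as "INTERVAL 15 MINUTE" against the date_part list above. Dataflow SQL's exact parsing rules (for example, case sensitivity or extra whitespace) are not given here, so the sketch assumes uppercase keywords separated by whitespace. MONTH and YEAR have no fixed length, so the hypothetical to_timedelta helper rejects them instead of guessing a duration.

```python
import re
from datetime import timedelta

# Fixed-length units from the date_part list above.
FIXED_UNITS = {
    "MILLISECOND": timedelta(milliseconds=1),
    "SECOND": timedelta(seconds=1),
    "MINUTE": timedelta(minutes=1),
    "HOUR": timedelta(hours=1),
    "DAY": timedelta(days=1),
}
# Calendar units: valid date_part values, but not a fixed number of seconds.
CALENDAR_UNITS = {"MONTH", "YEAR"}
PATTERN = re.compile(r"INTERVAL\s+(\d+)\s+([A-Z]+)")


def parse_interval(text):
    """Return (count, date_part) for a string like "INTERVAL 15 MINUTE"."""
    match = PATTERN.fullmatch(text.strip())
    if match is None:
        raise ValueError(f"expected 'INTERVAL int64 date_part', got {text!r}")
    count, unit = int(match.group(1)), match.group(2)
    if unit not in FIXED_UNITS and unit not in CALENDAR_UNITS:
        raise ValueError(f"unsupported date_part: {unit}")
    return count, unit


def to_timedelta(text):
    """Convert a fixed-length interval string to a timedelta."""
    count, unit = parse_interval(text)
    if unit in CALENDAR_UNITS:
        raise ValueError(f"{unit} has no fixed length")
    return count * FIXED_UNITS[unit]


print(parse_interval("INTERVAL 15 MINUTE"))
print(to_timedelta("INTERVAL 15 MINUTE"))
```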

Return Data Types

A table with the columns from the input table, plus a TIMESTAMP column, window_start, with the start of each window and a TIMESTAMP column, window_end, with the end of each window.
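To make the return type concrete, this Python sketch mimics TUMBLE on a list of dictionaries: each output row keeps the input columns and gains window_start and window_end for the single tumbling window that contains it. The function name and the epoch-aligned window boundaries are assumptions for illustration; the sketch does not reflect how Dataflow SQL executes the function.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def tumble(rows, timestamp_column, window_duration):
    """Return copies of rows with window_start and window_end added.

    Each row lands in exactly one window; window boundaries are aligned to
    the Unix epoch (an assumption of this sketch).
    """
    result = []
    for row in rows:
        ts = row[timestamp_column]
        start = ts - (ts - EPOCH) % window_duration
        result.append({**row, "window_start": start,
                       "window_end": start + window_duration})
    return result


rows = [
    {"event_timestamp": datetime(2020, 1, 1, 0, 7), "amount": 1.5},
    {"event_timestamp": datetime(2020, 1, 1, 0, 22), "amount": 2.2},
]
windowed = tumble(rows, "event_timestamp", timedelta(minutes=15))
for row in windowed:
    print(row)
```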

Examples

The following query uses tumbling windows to sum transaction amounts every 15 minutes.

  SELECT
    tr.window_start AS period_start,
    tr.window_end AS period_end,
    SUM(tr.amount) AS amount
  FROM TUMBLE(
    (SELECT * FROM pubsub.topic.`dataflow-sql`.transaction),
    DESCRIPTOR(event_timestamp),
    "INTERVAL 15 MINUTE") AS tr
  GROUP BY tr.window_start, tr.window_end
 
  +---------------------+---------------------+--------+
  | period_start        | period_end          | amount |
  +---------------------+---------------------+--------+
  | 2020-01-01 00:00:00 | 2020-01-01 00:15:00 | 2.50   |
  +---------------------+---------------------+--------+
  | 2020-01-01 00:15:00 | 2020-01-01 00:30:00 | 2.20   |
  +---------------------+---------------------+--------+
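For a finite batch of data, the query above roughly corresponds to the following Python sketch, which groups transactions by 15-minute tumbling window and sums the amounts. The input rows are hypothetical, windows are assumed to be aligned to the Unix epoch, and watermarks and late data are ignored.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from decimal import Decimal

EPOCH = datetime(1970, 1, 1)
WINDOW = timedelta(minutes=15)  # "INTERVAL 15 MINUTE"

# Hypothetical transactions: (event_timestamp, amount). Not real data.
transactions = [
    (datetime(2020, 1, 1, 0, 3), Decimal("1.00")),
    (datetime(2020, 1, 1, 0, 11), Decimal("1.50")),
    (datetime(2020, 1, 1, 0, 17), Decimal("2.20")),
]

# Equivalent of SUM(amount) ... GROUP BY window_start, window_end.
totals = defaultdict(Decimal)
for ts, amount in transactions:
    start = ts - (ts - EPOCH) % WINDOW
    totals[(start, start + WINDOW)] += amount

for (period_start, period_end), amount in sorted(totals.items()):
    print(period_start, period_end, amount)
```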
 

HOP

  HOP(table_expression, DESCRIPTOR(timestamp_column), window_period, window_duration)
 

Description

Divides streaming data into hopping windows. A hopping window represents a consistent time interval in the data stream. Hopping windows can overlap, whereas tumbling windows are disjoint.

For more information, see Hopping windows.
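Hopping window assignment can be sketched in Python as follows, assuming windows start at multiples of window_period measured from the Unix epoch. An element belongs to every window whose interval [window_start, window_end) contains its timestamp, so with a 10-minute period and a 15-minute duration, some elements fall into two windows. The helper name hop_windows is hypothetical.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def hop_windows(ts, window_period, window_duration):
    """Return (window_start, window_end) for every hopping window that
    contains ts, in start order.

    Window starts are multiples of window_period from the Unix epoch (an
    assumption of this sketch). If window_duration < window_period, some
    timestamps fall into no window and the result is empty.
    """
    start = ts - (ts - EPOCH) % window_period  # latest start at or before ts
    windows = []
    while start + window_duration > ts:
        windows.append((start, start + window_duration))
        start -= window_period
    return windows[::-1]


period = timedelta(minutes=10)
duration = timedelta(minutes=15)
print(hop_windows(datetime(2020, 1, 1, 0, 12), period, duration))
print(hop_windows(datetime(2020, 1, 1, 0, 7), period, duration))
```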

Supported Argument Types

The table_expression is a table with the streaming data to divide into hopping windows.

The timestamp_column is the column in table_expression with the event timestamps of each element.

  • Event timestamps must be of type TIMESTAMP.

For a Pub/Sub source, you must specify the event_timestamp field as the timestamp_column.

The window_period and window_duration are strings with the format "INTERVAL int64 date_part". The window_period is the interval between the starts of consecutive windows, and the window_duration is the length of each window.

  • date_part must be MILLISECOND, SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR.

Return Data Types

A table with the columns from the input table, plus a TIMESTAMP column, window_start, with the start of each window and a TIMESTAMP column, window_end, with the end of each window.

Examples

The following query uses hopping windows to calculate a 15-minute running total of transaction amounts every 10 minutes.

  SELECT
    tr.window_start AS period_start,
    tr.window_end AS period_end,
    SUM(tr.amount) AS amount
  FROM HOP(
    (SELECT * FROM pubsub.topic.`dataflow-sql`.transaction),
    DESCRIPTOR(event_timestamp),
    "INTERVAL 10 MINUTE",
    "INTERVAL 15 MINUTE") AS tr
  GROUP BY tr.window_start, tr.window_end
 
  +---------------------+---------------------+--------+
  | period_start        | period_end          | amount |
  +---------------------+---------------------+--------+
  | 2020-01-01 00:00:00 | 2020-01-01 00:15:00 | 2.50   |
  +---------------------+---------------------+--------+
  | 2020-01-01 00:10:00 | 2020-01-01 00:25:00 | 2.30   |
  +---------------------+---------------------+--------+
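For a finite batch of hypothetical transactions, the hopping-window query above roughly corresponds to the following Python sketch. Because the windows overlap, one transaction can contribute to more than one window's total. Window starts are assumed to be epoch-aligned multiples of the period, the data is invented, and watermarks are ignored.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from decimal import Decimal

EPOCH = datetime(1970, 1, 1)
PERIOD = timedelta(minutes=10)    # window_period, "INTERVAL 10 MINUTE"
DURATION = timedelta(minutes=15)  # window_duration, "INTERVAL 15 MINUTE"


def hop_windows(ts):
    """Yield every (start, end) hopping window that contains ts."""
    start = ts - (ts - EPOCH) % PERIOD
    while start + DURATION > ts:
        yield start, start + DURATION
        start -= PERIOD


# Hypothetical transactions: (event_timestamp, amount). Not real data.
transactions = [
    (datetime(2020, 1, 1, 0, 6), Decimal("1.00")),
    (datetime(2020, 1, 1, 0, 12), Decimal("1.50")),
    (datetime(2020, 1, 1, 0, 20), Decimal("0.80")),
]

# Each element is added to every window that contains it.
totals = defaultdict(Decimal)
for ts, amount in transactions:
    for window in hop_windows(ts):
        totals[window] += amount

for (period_start, period_end), amount in sorted(totals.items()):
    print(period_start, period_end, amount)
```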
 

SESSION

  SESSION(table_expression, DESCRIPTOR(timestamp_column), DESCRIPTOR(key1, key2, ...), session_gap_duration)
 

Description

Divides streaming data into session windows. A session window contains elements whose event timestamps are within a gap duration of another element in the same window. The gap duration is the maximum interval allowed between consecutive elements of a session: if new data arrives after the gap duration has elapsed, it is assigned to a new window.

For more information, see Session windows.
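The session rule can be sketched in Python as follows. Each element starts as the interval [t, t + gap); an element that occurs before the current session for its key ends extends that session, and otherwise it starts a new one. This follows the common merge-overlapping-intervals approach to session windows, but the function and its exact boundary handling are assumptions of this sketch, not Dataflow SQL's implementation.

```python
from collections import defaultdict
from datetime import datetime, timedelta


def session_windows(events, gap):
    """Assign (key, event_time) pairs to session windows, per key.

    Returns {key: [(window_start, window_end), ...]} in time order.
    """
    times_by_key = defaultdict(list)
    for key, ts in events:
        times_by_key[key].append(ts)

    sessions = {}
    for key, times in times_by_key.items():
        windows = []
        for ts in sorted(times):
            if windows and ts < windows[-1][1]:
                # ts falls inside the current session: extend its end.
                windows[-1] = (windows[-1][0], ts + gap)
            else:
                # At least one gap after the previous element: new session.
                windows.append((ts, ts + gap))
        sessions[key] = windows
    return sessions


events = [
    (1001, datetime(2020, 1, 1, 0, 0)),
    (1001, datetime(2020, 1, 1, 0, 20)),
    (1001, datetime(2020, 1, 1, 0, 55)),
    (1002, datetime(2020, 1, 1, 0, 45)),
]
print(session_windows(events, timedelta(minutes=30)))
```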

Supported Argument Types

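The table_expression is a table with the streaming data to divide into session windows.

The key1, key2, ... columns in the second DESCRIPTOR are the columns in table_expression that key the data. Session windows are computed separately for each key.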

The timestamp_column is the column in table_expression with the event timestamps of each element.

  • Event timestamps must be of type TIMESTAMP.

For a Pub/Sub source, you must specify the event_timestamp field as the timestamp_column.

The session_gap_duration is a string with the format "INTERVAL int64 date_part".

  • date_part must be MILLISECOND, SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR.

Return Data Types

A table with the columns from the input table, plus a TIMESTAMP column, window_start, with the start of each window and a TIMESTAMP column, window_end, with the end of each window.

Examples

The following query uses session windows to sum transaction amounts that occur within 30 minutes of each other.

  SELECT
    tr.window_start AS period_start,
    tr.window_end AS period_end,
    SUM(tr.amount) AS amount,
    tr.transaction_id AS transaction_id
  FROM SESSION(
    (SELECT * FROM pubsub.topic.`dataflow-sql`.transaction),
    DESCRIPTOR(event_timestamp),
    DESCRIPTOR(transaction_id),
    "INTERVAL 30 MINUTE") AS tr
  GROUP BY tr.window_start, tr.window_end, tr.transaction_id
 
  +---------------------+---------------------+--------+----------------+
  | period_start        | period_end          | amount | transaction_id |
  +---------------------+---------------------+--------+----------------+
  | 2020-01-01 00:00:00 | 2020-01-01 00:10:00 | 2.50   | 1001           |
  +---------------------+---------------------+--------+----------------+
  | 2020-01-01 00:45:00 | 2020-01-01 01:45:00 | 2.25   | 1002           |
  +---------------------+---------------------+--------+----------------+
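For a finite batch of hypothetical transactions keyed by transaction_id, the session-window query above roughly corresponds to the following Python sketch. In this sketch a session ends one gap duration after its last element, and an element extends the open session only if it occurs before that end. The data is invented and watermarks are ignored.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from decimal import Decimal

GAP = timedelta(minutes=30)  # session_gap_duration, "INTERVAL 30 MINUTE"

# Hypothetical transactions: (transaction_id, event_timestamp, amount).
transactions = [
    (1001, datetime(2020, 1, 1, 0, 0), Decimal("1.00")),
    (1001, datetime(2020, 1, 1, 0, 10), Decimal("1.50")),
    (1002, datetime(2020, 1, 1, 0, 45), Decimal("2.25")),
]

# Key the data by transaction_id, as DESCRIPTOR(transaction_id) does.
by_key = defaultdict(list)
for key, ts, amount in transactions:
    by_key[key].append((ts, amount))

rows = []  # (period_start, period_end, amount, transaction_id)
for key, items in sorted(by_key.items()):
    session = None  # [start, end, total] of the open session
    for ts, amount in sorted(items):
        if session is not None and ts < session[1]:
            # Within the gap: extend the session and add to its total.
            session[1] = ts + GAP
            session[2] += amount
        else:
            # First element, or gap reached: close the previous session.
            if session is not None:
                rows.append((session[0], session[1], session[2], key))
            session = [ts, ts + GAP, amount]
    rows.append((session[0], session[1], session[2], key))

for row in rows:
    print(*row)
```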
 

Window requirements when joining two unbounded sources

To join two unbounded collections, both collections must have the same windows. If the windows are not compatible, Dataflow SQL throws an IllegalArgumentException.
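The compatibility rule can be modeled as a check that runs before the join, as in this Python sketch. WindowSpec, check_join_windows, and IncompatibleWindowsError are hypothetical names; the error stands in for the IllegalArgumentException that Dataflow SQL throws. The sketch compares window definitions literally, so it would treat "INTERVAL 60 MINUTE" and "INTERVAL 1 HOUR" as different, which may be stricter than Dataflow SQL.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class WindowSpec:
    """Hypothetical description of how one unbounded input is windowed."""
    function: str     # "TUMBLE", "HOP", or "SESSION"
    arguments: tuple  # interval strings, in call order


class IncompatibleWindowsError(ValueError):
    """Stands in for Dataflow SQL's IllegalArgumentException."""


def check_join_windows(left: WindowSpec, right: WindowSpec) -> None:
    """Raise if the two unbounded inputs are not windowed identically."""
    if left != right:
        raise IncompatibleWindowsError(
            f"incompatible windows: {left} vs. {right}")


tumble_15 = WindowSpec("TUMBLE", ("INTERVAL 15 MINUTE",))
hop_10_15 = WindowSpec("HOP", ("INTERVAL 10 MINUTE", "INTERVAL 15 MINUTE"))

check_join_windows(tumble_15, tumble_15)  # valid: same windows
try:
    check_join_windows(hop_10_15, tumble_15)  # invalid: HOP vs. TUMBLE
except IncompatibleWindowsError as err:
    print(err)
```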

Valid JOIN with windowing

This example joins two unbounded collections with compatible windows. Both collections are divided into tumbling windows with a 15-minute window duration.

  SELECT *
  FROM (
    SELECT
      tr.transaction_id AS transaction_id,
      SUM(tr.amount) AS combined_transaction_amount
    FROM TUMBLE(
      (SELECT * FROM pubsub.topic.`dataflow-sql`.transaction),
      DESCRIPTOR(event_timestamp),
      "INTERVAL 15 MINUTE") AS tr
    GROUP BY tr.transaction_id) AS tr
  INNER JOIN (
    SELECT invalid_tr.transaction_id AS transaction_id
    FROM TUMBLE(
      (SELECT * FROM pubsub.topic.`dataflow-sql`.`invalid-transactions`),
      DESCRIPTOR(event_timestamp),
      "INTERVAL 15 MINUTE") AS invalid_tr
    GROUP BY invalid_tr.transaction_id) AS invalid_tr
  ON tr.transaction_id = invalid_tr.transaction_id
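One way to see why both inputs need the same windows: in a windowed join, rows are matched only when they share a key and fall into the same window. The following Python sketch models that with hypothetical rows that carry window_start and window_end columns. It is a simplified illustration, not how Dataflow executes joins, and windowed_inner_join is a hypothetical name.

```python
from collections import defaultdict
from datetime import datetime


def windowed_inner_join(left_rows, right_rows, key):
    """Inner-join two lists of dicts on key, but only within the same window.

    Every row must carry window_start and window_end columns, as produced by
    a windowing function such as TUMBLE.
    """
    index = defaultdict(list)
    for row in right_rows:
        index[(row["window_start"], row["window_end"], row[key])].append(row)
    joined = []
    for row in left_rows:
        lookup = (row["window_start"], row["window_end"], row[key])
        for match in index.get(lookup, []):
            joined.append((row, match))
    return joined


w1 = (datetime(2020, 1, 1, 0, 0), datetime(2020, 1, 1, 0, 15))
w2 = (datetime(2020, 1, 1, 0, 15), datetime(2020, 1, 1, 0, 30))


def row(window, transaction_id):
    return {"window_start": window[0], "window_end": window[1],
            "transaction_id": transaction_id}


left = [row(w1, 1001), row(w2, 1002)]
right = [row(w1, 1001), row(w1, 1002)]  # 1002 is in a different window here
pairs = windowed_inner_join(left, right, "transaction_id")
print(len(pairs))
```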
 

Invalid JOIN with windowing

This example joins two unbounded sources with incompatible windows. One collection is divided into 15-minute hopping windows that start every 10 minutes. The other collection is divided into tumbling windows with a 15-minute window duration.

If you run the following query, Dataflow SQL throws an IllegalArgumentException.

  SELECT *
  FROM (
    SELECT
      tr.transaction_id AS transaction_id,
      SUM(tr.amount) AS combined_transaction_amount
    FROM HOP(
      (SELECT * FROM pubsub.topic.`dataflow-sql`.transaction),
      DESCRIPTOR(event_timestamp),
      "INTERVAL 10 MINUTE",
      "INTERVAL 15 MINUTE") AS tr
    GROUP BY tr.transaction_id) AS tr
  INNER JOIN (
    SELECT invalid_tr.transaction_id AS transaction_id
    FROM TUMBLE(
      (SELECT * FROM pubsub.topic.`dataflow-sql`.`invalid-transactions`),
      DESCRIPTOR(event_timestamp),
      "INTERVAL 15 MINUTE") AS invalid_tr
    GROUP BY invalid_tr.transaction_id) AS invalid_tr
  ON tr.transaction_id = invalid_tr.transaction_id
 