Flink Table API and Flink SQL: A Detailed Usage Guide

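1. Table API & Flink SQL - Core Concepts

Apache Flink offers two relational APIs for unified stream and batch processing:

  • Table API

    • The Table API is a language-integrated query API for Scala and Java. It lets you compose relational operators such as selection, filter, and join in a very intuitive way.
  • Flink SQL

    • Flink SQL is standard SQL implemented on top of Apache Calcite.

Apache Calcite is a foundational framework that provides a standard SQL language, a range of query optimizations, and adapters for connecting to various data sources, so users can plug in different kinds of data and query it with SQL. Calcite also provides query engines for OLAP and stream processing.

Queries in the Table API and Flink SQL have the same semantics for batch (DataSet) and streaming (DataStream) input, and they produce the same results.

The Table API and Flink SQL are tightly integrated with each other and with the DataStream and DataSet APIs. You can switch between these APIs, and between libraries built on top of them.

For example, you can first use CEP (Flink's complex event processing library) to match patterns on a DataStream and then analyze the matches with the Table API. Or you can use SQL to scan, filter, and aggregate a batch table and then run a Gelly graph algorithm on the preprocessed data (Gelly is Flink's graph API library).

The Table API and Flink SQL are still under active development and do not yet implement every feature. Not every combination of [Table API, SQL] and [stream, batch] is supported.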

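1-1. Dynamic Tables and Continuous Queries

Dynamic tables are the core concept behind the Table API and SQL support for streaming data in Flink. Unlike the static tables that represent batch data, dynamic tables change over time, yet they can be queried just like static batch tables. Querying a dynamic table yields a continuous query. A continuous query never terminates and produces a dynamic table as its result. The query continuously updates this dynamic result table to reflect changes on its dynamic input table.

Note that the result of a continuous query is always semantically equivalent to the result of the same query executed in batch mode on a snapshot of the input table.

Workflow:

  1. Convert the stream into a dynamic table.
  2. Evaluate a continuous query on the dynamic table, which produces a new dynamic table.
  3. Convert the resulting dynamic table back into a stream.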

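1-2. Defining a Dynamic Table on a Stream

To process a stream with relational queries, it must first be converted into a Table. Conceptually, each record of the stream is interpreted as an INSERT into the resulting table.

Assume the data has the following format:

user:  VARCHAR   // user name
cTime: TIMESTAMP // time the URL was accessed
url:   VARCHAR   // URL accessed by the user

The click event stream is converted into a table named clicks. As more click records are inserted, the resulting table keeps growing.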

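  • Continuous queries

A continuous query is evaluated on a dynamic table and produces a new dynamic table. Unlike a batch query, a continuous query never terminates and updates its result table according to the updates on its input table. At any point in time, its result is semantically equivalent to the result of the same query executed in batch mode on a snapshot of the input table.

How the continuous query proceeds, for a query that counts clicks per user:

  1. When the query starts, the clicks table is empty, so the SELECT query returns no rows.
  2. When the first row [Mary, ./home] is inserted into the clicks table, the query starts computing the result table, which now contains the single row [Mary, 1].
  3. When the second row [Bob, ./cart] is inserted into the clicks table, the query updates the result table and inserts a new row [Bob, 1].
  4. The third row [Mary, ./prod?id=1] updates an already computed result row: [Mary, 1] becomes [Mary, 2].
  5. When the fourth row is appended to the clicks table, the query inserts a third row [Liz, 1] into the result table.
  6. The query then waits for new data in clicks. Whenever new rows arrive, it recomputes and updates the result table accordingly.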
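The steps above can be imitated in a few lines of plain Java. This is only a sketch, not Flink code: the class name ContinuousCountSketch is hypothetical, and the query it mimics (SELECT user, COUNT(url) FROM clicks GROUP BY user) is an assumption inferred from the result rows listed in the steps.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java imitation (not Flink code) of a continuous per-user count.
// Assumed query: SELECT user, COUNT(url) FROM clicks GROUP BY user
class ContinuousCountSketch {

    // Applies one click to the result table and returns a snapshot of it.
    static Map<String, Long> onClick(Map<String, Long> resultTable, String user) {
        resultTable.merge(user, 1L, Long::sum); // insert a new row or update the count
        return new LinkedHashMap<>(resultTable);
    }

    public static void main(String[] args) {
        Map<String, Long> resultTable = new LinkedHashMap<>();
        String[] users = {"Mary", "Bob", "Mary", "Liz"};
        for (String user : users) {
            // One snapshot of the result table per incoming click.
            System.out.println(onClick(resultTable, user));
        }
        // Prints:
        // {Mary=1}
        // {Mary=1, Bob=1}
        // {Mary=2, Bob=1}
        // {Mary=2, Bob=1, Liz=1}
    }
}
```

Each printed snapshot equals what a batch query would return on the clicks seen so far, which is the equivalence the section describes.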

2. Developing with the Flink Table API

2-1. Common Dependencies

Dependencies commonly needed for Table API development:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-compress</artifactId>
    <version>1.21</version>
</dependency>

${flink.version} is a variable declared in the <properties> section, for example:

<properties>
    <flink.version>1.13.6</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
</properties>

You can also write the version number directly.

2-2. Mixing Tables and DataStreams

Code example:

package com.zenitera.bigdata.flinksql;

import com.zenitera.bigdata.bean.WaterSensor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

public class Flink01_TableApi_test01 {
    public static void main(String[] args) {
        // Pin the local REST endpoint / web UI to port 2000.
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 2000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);

        DataStreamSource<WaterSensor> waterSensorDataStreamSource = env.fromElements(
                new WaterSensor("sensor_1", 1000L, 10),
                new WaterSensor("sensor_1", 2000L, 20),
                new WaterSensor("sensor_2", 3000L, 30),
                new WaterSensor("sensor_1", 4000L, 40),
                new WaterSensor("sensor_1", 5000L, 50),
                new WaterSensor("sensor_2", 6000L, 60));

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // Convert the DataStream into a Table.
        Table table = tableEnv.fromDataStream(waterSensorDataStreamSource);
        // Keep only the rows of sensor_1.
        Table resultTable = table
                .where($("id").isEqual("sensor_1"))
                .select($("*"));

        // The result is insert-only, so it can be converted into an append stream.
        DataStream<Row> toAppendStream = tableEnv.toAppendStream(resultTable, Row.class);
        toAppendStream.print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
/*
+I[sensor_1, 1000, 10]
+I[sensor_1, 2000, 20]
+I[sensor_1, 4000, 40]
+I[sensor_1, 5000, 50]
*/

2-3. Aggregations

Code example:

package com.zenitera.bigdata.flinksql;

import com.zenitera.bigdata.bean.WaterSensor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

public class Flink01_TableApi_test02 {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 2000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);

        DataStreamSource<WaterSensor> waterSensorDataStreamSource = env.fromElements(
                new WaterSensor("sensor_1", 1000L, 10),
                new WaterSensor("sensor_1", 2000L, 20),
                new WaterSensor("sensor_2", 3000L, 30),
                new WaterSensor("sensor_1", 4000L, 40),
                new WaterSensor("sensor_1", 5000L, 50),
                new WaterSensor("sensor_2", 6000L, 60));

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Table table = tableEnv.fromDataStream(waterSensorDataStreamSource);
        // Sum vc per sensor id, ignoring readings with vc < 20.
        Table resultTable = table
                .where($("vc").isGreaterOrEqual(20))
                .groupBy($("id"))
                .aggregate($("vc").sum().as("vc_sum"))
                .select($("id"), $("vc_sum"));

        // The aggregate updates earlier results, so a retract stream is required.
        DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(resultTable, Row.class);
        retractStream.print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
/*
(true,+I[sensor_1, 20])
(true,+I[sensor_2, 30])
(false,-U[sensor_1, 20])
(true,+U[sensor_1, 60])
(false,-U[sensor_1, 60])
(true,+U[sensor_1, 110])
(false,-U[sensor_2, 30])
(true,+U[sensor_2, 90])
*/

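2-4. Converting a Table into a Stream

A dynamic table can be continuously modified by INSERT, UPDATE, and DELETE changes, just like a regular database table. It may be a table with a single row that is constantly updated, an insert-only table without UPDATE and DELETE changes, or anything in between.
When a dynamic table is converted into a stream or written to an external system, these changes need to be encoded. Flink's Table API and SQL support three ways to encode the changes of a dynamic table:

  • Append-only stream
  • Retract stream
  • Upsert stream

2-4-1. Append-only Stream

A dynamic table that is modified only by INSERT changes can be converted into a stream by emitting the inserted rows.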

2-4-2. Retract Stream

A retract stream contains two types of messages: add messages and retract messages. A dynamic table is converted into a retract stream by encoding an INSERT as an add message, a DELETE as a retract message, and an UPDATE as a retract message for the previous row followed by an add message for the new row. The mapping is:

  • Operation - insert
    • Encoded as: add
  • Operation - update
    • Encoded as: retract & add
  • Operation - delete
    • Encoded as: retract
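As a rough illustration of this encoding, the plain-Java sketch below (not Flink code; the class RetractEncodingSketch and its message format are hypothetical) turns a running per-key sum into add and retract messages. The boolean flag mirrors the true/false flag printed by toRetractStream in section 2-3.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java imitation (not Flink code) of retract encoding for a running sum per key.
class RetractEncodingSketch {

    // "true" marks an add message, "false" marks a retract message.
    static List<String> encode(String[] keys, int[] values) {
        Map<String, Integer> sums = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            Integer old = sums.get(keys[i]);
            if (old != null) {
                // UPDATE: first retract the previously emitted row ...
                out.add("(false," + keys[i] + "," + old + ")");
            }
            int updated = (old == null ? 0 : old) + values[i];
            sums.put(keys[i], updated);
            // ... then add the new row (an INSERT emits only this message).
            out.add("(true," + keys[i] + "," + updated + ")");
        }
        return out;
    }

    public static void main(String[] args) {
        // The readings with vc >= 20 from the aggregation example in section 2-3.
        String[] keys = {"sensor_1", "sensor_2", "sensor_1", "sensor_1", "sensor_2"};
        int[] values = {20, 30, 40, 50, 60};
        encode(keys, values).forEach(System.out::println);
    }
}
```

Five input rows produce eight messages here, because every update costs one retract message plus one add message.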

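2-4-3. Upsert Stream

An upsert stream contains two types of messages: upsert messages and delete messages. A dynamic table that is converted into an upsert stream requires a unique key, which may be composite. The table is converted by encoding INSERT and UPDATE changes as upsert messages and DELETE changes as delete messages. The operator that consumes the stream needs to know the unique key attributes in order to apply the messages correctly. The main difference from a retract stream is that an UPDATE is encoded as a single message, which makes an upsert stream more efficient. The mapping is:

  • Operation - insert
    • Encoded as: upsert
  • Operation - update
    • Encoded as: upsert
  • Operation - delete
    • Encoded as: delete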
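For comparison, here is a minimal plain-Java sketch of upsert encoding for the same kind of running sum. It is not Flink code: the class UpsertEncodingSketch and its message strings are hypothetical, and the map key plays the role of the unique key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java imitation (not Flink code) of upsert encoding keyed by a unique key.
class UpsertEncodingSketch {

    // INSERT and UPDATE both become a single upsert message for the key.
    static List<String> encode(String[] keys, int[] values) {
        Map<String, Integer> sums = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            int updated = sums.merge(keys[i], values[i], Integer::sum);
            out.add("upsert(" + keys[i] + "," + updated + ")");
        }
        return out;
    }

    // A DELETE becomes a delete message that carries only the key.
    static String delete(String key) {
        return "delete(" + key + ")";
    }

    public static void main(String[] args) {
        String[] keys = {"sensor_1", "sensor_2", "sensor_1", "sensor_1", "sensor_2"};
        int[] values = {20, 30, 40, 50, 60};
        encode(keys, values).forEach(System.out::println);
        System.out.println(delete("sensor_2"));
    }
}
```

The five input rows yield five upsert messages, one per change, because the consumer can overwrite the previous row by key instead of receiving an explicit retraction.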

2-5. Reading Data Through a Connector

A dynamic table can be connected directly to external data through a connector.

2-5-1. File Source

Code example:

package com.zenitera.bigdata.flinksql;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

public class Flink01_TableApi_FileSource {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 2000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);

        // Schema of the table backed by the file.
        Schema schema = new Schema()
                .field("id", DataTypes.STRING())
                .field("ts", DataTypes.BIGINT())
                .field("vc", DataTypes.INT());

        StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(env);
        // Declare a CSV file as the temporary table "sensor".
        streamTableEnvironment
                .connect(new FileSystem().path("input/sensor.txt"))
                .withFormat(new Csv().fieldDelimiter(',').lineDelimiter("\n"))
                .withSchema(schema)
                .createTemporaryTable("sensor");

        Table sensor = streamTableEnvironment.from("sensor");
        // Count the rows per sensor id.
        Table resultTable = sensor
                .groupBy($("id"))
                .select($("id"), $("id").count().as("id_count"));

        DataStream<Tuple2<Boolean, Row>> resultStream = streamTableEnvironment.toRetractStream(resultTable, Row.class);
        resultStream.print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
/*
input/sensor.txt
sensor_1,1,10
sensor_1,2,20
sensor_2,4,30
sensor_1,4,400
sensor_2,5,50
sensor_2,6,60

result:
(true,+I[sensor_1, 1])
(false,-U[sensor_1, 1])
(true,+U[sensor_1, 2])
(true,+I[sensor_2, 1])
(false,-U[sensor_1, 2])
(true,+U[sensor_1, 3])
(false,-U[sensor_2, 1])
(true,+U[sensor_2, 2])
(false,-U[sensor_2, 2])
(true,+U[sensor_2, 3])
*/

2-5-2. Kafka Source

Code example:

package com.zenitera.bigdata.flinksql;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

public class Flink01_TableApi_KafkaSource {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 2000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);

        Schema schema = new Schema()
                .field("id", DataTypes.STRING())
                .field("ts", DataTypes.BIGINT())
                .field("vc", DataTypes.INT());

        StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(env);
        // Declare the Kafka topic "sensor" (JSON records) as the temporary table "sensor".
        streamTableEnvironment
                .connect(new Kafka()
                        .version("universal")
                        .topic("sensor")
                        .startFromLatest()
                        .property("group.id", "bigdata")
                        .property("bootstrap.servers", "hdt-dmcp-ops01:9092,hdt-dmcp-ops02:9092,hdt-dmcp-ops03:9092"))
                .withFormat(new Json())
                .withSchema(schema)
                .createTemporaryTable("sensor");

        Table sensorTable = streamTableEnvironment.from("sensor");
        // Count the records per sensor id.
        Table resultTable = sensorTable
                .groupBy($("id"))
                .select($("id"), $("id").count().as("id_count"));

        DataStream<Tuple2<Boolean, Row>> resultStream = streamTableEnvironment.toRetractStream(resultTable, Row.class);
        resultStream.print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
/*
[wangting@hdt-dmcp-ops01 ~]$ kafka-console-producer.sh --bootstrap-server hdt-dmcp-ops01:9092,hdt-dmcp-ops02:9092,hdt-dmcp-ops03:9092 --topic sensor
>{"id": "sensor_1", "ts": 1000, "vc": 10}
>{"id": "sensor_2", "ts": 2000, "vc": 20}
>{"id": "sensor_1", "ts": 3000, "vc": 30}
>{"id": "sensor_1", "ts": 3000, "vc": 30}
>{"id": "sensor_1", "ts": 3000, "vc": 30}
>{"id": "sensor_2", "ts": 2000, "vc": 20}
>{"id": "sensor_2", "ts": 3000, "vc": 30}
--------------------------------------------------
(true,+I[sensor_1, 1])
(true,+I[sensor_2, 1])
(false,-U[sensor_1, 1])
(true,+U[sensor_1, 2])
(false,-U[sensor_1, 2])
(true,+U[sensor_1, 3])
(false,-U[sensor_1, 3])
(true,+U[sensor_1, 4])
(false,-U[sensor_2, 1])
(true,+U[sensor_2, 2])
(false,-U[sensor_2, 2])
(true,+U[sensor_2, 3])
*/

2-6. Writing Data Through a Connector

2-6-1. File Sink

Code example:

package com.zenitera.bigdata.flinksql;

import com.zenitera.bigdata.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;

import static org.apache.flink.table.api.Expressions.$;

public class Flink01_TableApi_FileSink {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<WaterSensor> waterSensorStream = env.fromElements(
                new WaterSensor("sensor_1", 1000L, 10),
                new WaterSensor("sensor_1", 2000L, 20),
                new WaterSensor("sensor_2", 3000L, 30),
                new WaterSensor("sensor_1", 4000L, 40),
                new WaterSensor("sensor_1", 5000L, 50),
                new WaterSensor("sensor_2", 6000L, 60));

        // Variables renamed to lower camel case so they no longer shadow class names.
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Table table = tableEnv.fromDataStream(waterSensorStream);
        Table resultTable = table
                .where($("id").isEqual("sensor_1"))
                .select($("id"), $("ts"), $("vc"));

        Schema schema = new Schema()
                .field("id", DataTypes.STRING())
                .field("ts", DataTypes.BIGINT())
                .field("vc", DataTypes.INT());

        // Declare a CSV file as the sink table "sensor".
        tableEnv
                .connect(new FileSystem().path("output/sensor_0407.csv"))
                .withFormat(new Csv())
                .withSchema(schema)
                .createTemporaryTable("sensor");

        // executeInsert submits the job itself, so env.execute() is not called.
        resultTable.executeInsert("sensor");
    }
}
/*
output/sensor_0407.csv
sensor_1,1000,10
sensor_1,2000,20
sensor_1,4000,40
sensor_1,5000,50
*/

2-6-2. Kafka Sink

Code example:

package com.zenitera.bigdata.flinksql;

import com.zenitera.bigdata.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;

import static org.apache.flink.table.api.Expressions.$;

public class Flink01_TableApi_KafkaSink {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<WaterSensor> waterSensorDataStreamSource = env.fromElements(
                new WaterSensor("sensor_1", 1000L, 10),
                new WaterSensor("sensor_1", 2000L, 20),
                new WaterSensor("sensor_2", 3000L, 30),
                new WaterSensor("sensor_1", 4000L, 40),
                new WaterSensor("sensor_1", 5000L, 50),
                new WaterSensor("sensor_2", 6000L, 60));

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Table sensorTable = tableEnv.fromDataStream(waterSensorDataStreamSource);
        Table resultTable = sensorTable
                .where($("id").isEqual("sensor_1"))
                .select($("*"));

        Schema schema = new Schema()
                .field("id", DataTypes.STRING())
                .field("ts", DataTypes.BIGINT())
                .field("vc", DataTypes.INT());

        // Declare the Kafka topic "sink_sensor" (JSON records) as the sink table "sensor".
        tableEnv
                .connect(new Kafka()
                        .version("universal")
                        .topic("sink_sensor")
                        .sinkPartitionerRoundRobin()
                        .property("bootstrap.servers", "hdt-dmcp-ops01:9092,hdt-dmcp-ops02:9092,hdt-dmcp-ops03:9092"))
                .withFormat(new Json())
                .withSchema(schema)
                .createTemporaryTable("sensor");

        resultTable.executeInsert("sensor");
    }
}
/*
[wangting@hdt-dmcp-ops01 bin]$ consumer sink_sensor
{"id":"sensor_1","ts":1000,"vc":10}
{"id":"sensor_1","ts":2000,"vc":20}
{"id":"sensor_1","ts":4000,"vc":40}
{"id":"sensor_1","ts":5000,"vc":50}
*/

3. Developing with Flink SQL

3-1. Basic Flink SQL Usage

3-1-1. Querying an Unregistered Table with SQL

Code example:

package com.zenitera.bigdata.flinksql;

import com.zenitera.bigdata.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * Query an unregistered table with SQL.
 */
public class Flink02_flinksql_test01 {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<WaterSensor> waterSensorDataStreamSource = env.fromElements(
                new WaterSensor("sensor_1", 1000L, 10),
                new WaterSensor("sensor_1", 2000L, 20),
                new WaterSensor("sensor_2", 3000L, 30),
                new WaterSensor("sensor_1", 4000L, 40),
                new WaterSensor("sensor_1", 5000L, 50),
                new WaterSensor("sensor_2", 6000L, 60));

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Table table = tableEnv.fromDataStream(waterSensorDataStreamSource);
        // Concatenating the Table object into the SQL string references it without an explicit registration.
        Table resultTable = tableEnv.sqlQuery("select * from " + table + " where id = 'sensor_1'");
        tableEnv.toRetractStream(resultTable, Row.class).print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
/*
(true,+I[sensor_1, 1000, 10])
(true,+I[sensor_1, 2000, 20])
(true,+I[sensor_1, 4000, 40])
(true,+I[sensor_1, 5000, 50])
*/

3-1-2. Querying a Registered Table with SQL

Code example:

package com.zenitera.bigdata.flinksql;

import com.zenitera.bigdata.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * Query a registered table with SQL.
 */
public class Flink02_flinksql_test02 {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<WaterSensor> waterSensorDataStreamSource = env.fromElements(
                new WaterSensor("sensor_1", 1000L, 10),
                new WaterSensor("sensor_1", 2000L, 20),
                new WaterSensor("sensor_2", 3000L, 30),
                new WaterSensor("sensor_1", 4000L, 40),
                new WaterSensor("sensor_1", 5000L, 50),
                new WaterSensor("sensor_2", 6000L, 60));

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Table table = tableEnv.fromDataStream(waterSensorDataStreamSource);
        // Register the table as a temporary view named "sensor".
        tableEnv.createTemporaryView("sensor", table);
        Table resultTable = tableEnv.sqlQuery("select * from sensor where id = 'sensor_1'");
        tableEnv.toRetractStream(resultTable, Row.class).print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
/*
(true,+I[sensor_1, 1000, 10])
(true,+I[sensor_1, 2000, 20])
(true,+I[sensor_1, 4000, 40])
(true,+I[sensor_1, 5000, 50])
*/

3-2. Reading from Kafka and Writing to Kafka with Flink SQL

Code example:

package com.zenitera.bigdata.flinksql;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Flink02_flinksql_KafkaToKafka {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Source table backed by the Kafka topic topic_source_sensor.
        tableEnv.executeSql("create table source_sensor (id string, ts bigint, vc int) with("
                + "'connector' = 'kafka',"
                + "'topic' = 'topic_source_sensor',"
                + "'properties.bootstrap.servers' = 'hdt-dmcp-ops01:9092,hdt-dmcp-ops02:9092,hdt-dmcp-ops03:9092',"
                + "'properties.group.id' = 'wangting',"
                + "'scan.startup.mode' = 'latest-offset',"
                + "'format' = 'json'"
                + ")");

        // Sink table backed by the Kafka topic topic_sink_sensor.
        tableEnv.executeSql("create table sink_sensor(id string, ts bigint, vc int) with("
                + "'connector' = 'kafka',"
                + "'topic' = 'topic_sink_sensor',"
                + "'properties.bootstrap.servers' = 'hdt-dmcp-ops01:9092,hdt-dmcp-ops02:9092,hdt-dmcp-ops03:9092',"
                + "'format' = 'json'"
                + ")");

        // Continuously copy the sensor_1 records from the source topic to the sink topic.
        tableEnv.executeSql("insert into sink_sensor select * from source_sensor where id='sensor_1'");
    }
}
/*
Produce messages to the topic_source_sensor topic with a producer:
[wangting@hdt-dmcp-ops01 bin]$ producer topic_source_sensor
>{"id": "sensor_1", "ts": 1000, "vc": 10}
>{"id": "sensor_2", "ts": 2000, "vc": 20}
>{"id": "sensor_1", "ts": 3000, "vc": 30}
>{"id": "sensor_2", "ts": 4000, "vc": 40}
>{"id": "sensor_1", "ts": 5000, "vc": 50}
>{"id": "sensor_2", "ts": 6000, "vc": 60}
>
---------------------------------------------------
Consume messages from the topic_sink_sensor topic with a consumer:
[wangting@hdt-dmcp-ops02 ~]$ consumer topic_sink_sensor
{"id":"sensor_1","ts":1000,"vc":10}
{"id":"sensor_1","ts":3000,"vc":30}
{"id":"sensor_1","ts":5000,"vc":50}
*/

4. Table API & Flink SQL - Time Attributes

Time-based operations such as windows (in both the Table API and Flink SQL) need time information. Tables therefore have to provide logical time attributes that indicate time and support time-related operations.

There are two kinds of time attributes:

  • Processing time
  • Event time

4-1. Processing Time - proctime()

Code example:

package com.zenitera.bigdata.flinksql;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Processing time:
 * declare an additional column that serves as the processing-time attribute.
 */
public class Flink03_TimeAttributes_01 {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // pt is a computed column filled with the processing time of each row.
        tableEnv.executeSql("create table sensor(id string,ts bigint,vc int,pt as PROCTIME()) with("
                + "'connector' = 'filesystem',"
                + "'path' = 'input/sensor.txt',"
                + "'format' = 'csv'"
                + ")");

        TableResult tableResult = tableEnv.executeSql("select * from sensor");
        tableResult.print();
    }
}
/*
input/sensor.txt
sensor_1,1,10
sensor_1,2,20
sensor_2,4,30
sensor_1,4,400
sensor_2,5,50
sensor_2,6,60
--------------------------------
+----+--------------------------------+----------------------+-------------+-------------------------+
| op |                             id |                   ts |          vc |                      pt |
+----+--------------------------------+----------------------+-------------+-------------------------+
| +I |                       sensor_1 |                    1 |          10 | 2023-04-11 17:13:53.840 |
| +I |                       sensor_1 |                    2 |          20 | 2023-04-11 17:13:53.842 |
| +I |                       sensor_2 |                    4 |          30 | 2023-04-11 17:13:53.842 |
| +I |                       sensor_1 |                    4 |         400 | 2023-04-11 17:13:53.842 |
| +I |                       sensor_2 |                    5 |          50 | 2023-04-11 17:13:53.842 |
| +I |                       sensor_2 |                    6 |          60 | 2023-04-11 17:13:53.842 |
+----+--------------------------------+----------------------+-------------+-------------------------+
*/

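4-2. Event Time - rowtime()

Event time lets a program process data according to the timestamps contained in the records, which yields consistent results even when events arrive out of order or late. It also guarantees replayable results when the data is read from external storage.
In addition, event time lets a program use the same syntax in streaming and batch jobs: an event-time attribute in a streaming program is simply a regular timestamp field in a batch program.
To handle out-of-order events and to distinguish on-time events from late ones, Flink needs to extract the event timestamps from the events and generate watermarks.

When a DataStream is converted into a Table, an event-time attribute can be defined with the .rowtime suffix while declaring the schema. Timestamps and watermarks must already have been assigned on the DataStream before the conversion.
There are two ways to define the event-time attribute during this conversion. Depending on whether the field name carrying the .rowtime suffix already exists in the schema, the event-time field either:

  • is appended as a new field at the end of the schema, or

  • replaces an existing field.

    In both cases, the event-time field holds the event timestamp assigned on the DataStream.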
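To make the watermark idea concrete, the plain-Java sketch below (not Flink code; the class WatermarkSketch is hypothetical) tracks a watermark defined as the largest event timestamp seen so far minus a fixed delay. This follows the shape of a declaration such as "rowtime - interval '5' second". Flink's built-in DataStream strategies may differ in small details, so treat the arithmetic as an illustration only.

```java
// Plain-Java illustration (not Flink code) of a bounded-delay watermark.
class WatermarkSketch {
    private final long delayMs;
    private long maxTs = Long.MIN_VALUE;

    WatermarkSketch(long delayMs) {
        this.delayMs = delayMs;
    }

    // Observes one event timestamp and returns the watermark afterwards.
    long observe(long ts) {
        maxTs = Math.max(maxTs, ts); // the watermark never moves backwards
        return maxTs - delayMs;
    }

    public static void main(String[] args) {
        WatermarkSketch watermark = new WatermarkSketch(5000L);
        long[] timestamps = {1000L, 4000L, 2000L, 7000L};
        for (long ts : timestamps) {
            System.out.println("event " + ts + " -> watermark " + watermark.observe(ts));
        }
        // Prints:
        // event 1000 -> watermark -4000
        // event 4000 -> watermark -1000
        // event 2000 -> watermark -1000
        // event 7000 -> watermark 2000
    }
}
```

The out-of-order event at 2000 does not pull the watermark back, which is what lets downstream operators decide when a time range is complete.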

Code example:

package com.zenitera.bigdata.flinksql;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Event time (rowtime):
 * declare an additional column that serves as the event-time attribute.
 */
public class Flink03_TimeAttributes_02 {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // rowtime is derived from ts (milliseconds); the watermark lags it by 5 seconds.
        tableEnv.executeSql("create table sensor("
                + "id string,"
                + "ts bigint,"
                + "vc int, "
                + "rowtime as to_timestamp(from_unixtime(ts/1000,'yyyy-MM-dd HH:mm:ss')),"
                + "watermark for rowtime as rowtime - interval '5' second)"
                + "with("
                + "'connector' = 'filesystem',"
                + "'path' = 'input/sensor.txt',"
                + "'format' = 'csv'"
                + ")");

        tableEnv.sqlQuery("select * from sensor").execute().print();
    }
}
/*
input/sensor.txt
sensor_1,1,10
sensor_1,2,20
sensor_2,4,30
sensor_1,4,400
sensor_2,5,50
sensor_2,6,60
--------------------------------
+----+--------------------------------+----------------------+-------------+-------------------------+
| op |                             id |                   ts |          vc |                 rowtime |
+----+--------------------------------+----------------------+-------------+-------------------------+
| +I |                       sensor_1 |                    1 |          10 | 1970-01-01 08:00:00.000 |
| +I |                       sensor_1 |                    2 |          20 | 1970-01-01 08:00:00.000 |
| +I |                       sensor_2 |                    4 |          30 | 1970-01-01 08:00:00.000 |
| +I |                       sensor_1 |                    4 |         400 | 1970-01-01 08:00:00.000 |
| +I |                       sensor_2 |                    5 |          50 | 1970-01-01 08:00:00.000 |
| +I |                       sensor_2 |                    6 |          60 | 1970-01-01 08:00:00.000 |
+----+--------------------------------+----------------------+-------------+-------------------------+
*/

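5. Table API & Flink SQL - Windows

The Table API and Flink SQL provide two main kinds of windows:

  • Group Windows
  • Over Windows

5-1. Using Windows in the Table API

5-1-1. Table API - Group Windows

Group windows group rows into finite groups based on time or row-count intervals and evaluate the aggregate functions once per group.
In the Table API, group windows are defined with the .window(w: GroupWindow) clause and must be given an alias with the as clause. To group a table by window, the window alias has to be referenced in the groupBy clause, just like a regular grouping field.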
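How a timestamp maps to tumbling and sliding windows can be sketched in plain Java. This is not Flink code: the class WindowAssignSketch and its method names are hypothetical, and windows are assumed to be aligned to the epoch with no offset.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration (not Flink code) of assigning a timestamp to time windows.
class WindowAssignSketch {

    // Start of the tumbling window [start, start + sizeMs) that contains ts.
    static long tumblingStart(long ts, long sizeMs) {
        return ts - Math.floorMod(ts, sizeMs);
    }

    // Starts of all sliding windows [start, start + sizeMs) that contain ts.
    static List<Long> slidingStarts(long ts, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, slideMs);
        for (long start = lastStart; start > ts - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // 10-second tumbling window: a reading at 4000 ms falls into [0, 10000).
        System.out.println(tumblingStart(4000L, 10_000L));         // prints 0
        // 10-second window sliding every 5 seconds.
        System.out.println(slidingStarts(1000L, 10_000L, 5_000L)); // prints [0, -5000]
        System.out.println(slidingStarts(5000L, 10_000L, 5_000L)); // prints [5000, 0]
    }
}
```

A sliding window of 10 seconds that advances every 5 seconds places each row in two windows, which is why a reading at 1000 ms also counts toward a window starting 5 seconds before the epoch.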

  • Table API - Group Windows - Tumbling window
package com.zenitera.bigdata.flinksql;

import com.zenitera.bigdata.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

/**
 * Table API - Group Windows - Tumbling window (Tumble)
 */
public class Flink04_TableApi_GroupWindow01 {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Assign event timestamps from ts and tolerate 5 seconds of out-of-orderness.
        SingleOutputStreamOperator<WaterSensor> waterSensorDataStreamSource = env.fromElements(
                new WaterSensor("sensor_1", 1000L, 10),
                new WaterSensor("sensor_1", 2000L, 20),
                new WaterSensor("sensor_2", 3000L, 30),
                new WaterSensor("sensor_1", 4000L, 40),
                new WaterSensor("sensor_1", 5000L, 50),
                new WaterSensor("sensor_2", 6000L, 60))
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner((element, recordTimestamp) -> element.getTs()));

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // Replace the ts field with the event-time attribute.
        Table table = tableEnv.fromDataStream(waterSensorDataStreamSource, $("id"), $("ts").rowtime(), $("vc"));

        // 10-second tumbling window on ts, aliased as "timestamp".
        // execute() submits the job itself, so the original env.execute() call is dropped.
        table.window(Tumble.over(lit(10).second()).on($("ts")).as("timestamp"))
                .groupBy($("id"), $("timestamp"))
                .select($("id"),
                        $("timestamp").start().as("start_time"),
                        $("timestamp").end().as("end_time"),
                        $("vc").sum().as("vc_sum"))
                .execute()
                .print();
    }
}
/*
+----+--------------------------------+-------------------------+-------------------------+-------------+
| op |                             id |              start_time |                end_time |      vc_sum |
+----+--------------------------------+-------------------------+-------------------------+-------------+
| +I |                       sensor_1 | 1970-01-01 00:00:00.000 | 1970-01-01 00:00:10.000 |         120 |
| +I |                       sensor_2 | 1970-01-01 00:00:00.000 | 1970-01-01 00:00:10.000 |          90 |
+----+--------------------------------+-------------------------+-------------------------+-------------+
*/
  • Table API - Group Windows - Sliding window
package com.zenitera.bigdata.flinksql;

import com.zenitera.bigdata.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Slide;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

/**
 * Table API - Group Windows - Sliding window (Slide)
 */
public class Flink04_TableApi_GroupWindow02 {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> waterSensorDataStreamSource = env.fromElements(
                new WaterSensor("sensor_1", 1000L, 10),
                new WaterSensor("sensor_1", 2000L, 20),
                new WaterSensor("sensor_2", 3000L, 30),
                new WaterSensor("sensor_1", 4000L, 40),
                new WaterSensor("sensor_1", 5000L, 50),
                new WaterSensor("sensor_2", 6000L, 60))
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner((element, recordTimestamp) -> element.getTs()));

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Table table = tableEnv.fromDataStream(waterSensorDataStreamSource, $("id"), $("ts").rowtime(), $("vc"));

        // 10-second window that slides every 5 seconds, aliased as "timestamp".
        // execute() submits the job itself, so the original env.execute() call is dropped.
        table.window(Slide.over(lit(10).second()).every(lit(5).second()).on($("ts")).as("timestamp"))
                .groupBy($("id"), $("timestamp"))
                .select($("id"),
                        $("timestamp").start().as("start_time"),
                        $("timestamp").end().as("end_time"),
                        $("vc").sum().as("vc_sum"))
                .execute()
                .print();
    }
}
/*
+----+--------------------------------+-------------------------+-------------------------+-------------+
| op |                             id |              start_time |                end_time |      vc_sum |
+----+--------------------------------+-------------------------+-------------------------+-------------+
| +I |                       sensor_1 | 1969-12-31 23:59:55.000 | 1970-01-01 00:00:05.000 |          70 |
| +I |                       sensor_2 | 1969-12-31 23:59:55.000 | 1970-01-01 00:00:05.000 |          30 |
| +I |                       sensor_1 | 1970-01-01 00:00:00.000 | 1970-01-01 00:00:10.000 |         120 |
| +I |                       sensor_2 | 1970-01-01 00:00:00.000 | 1970-01-01 00:00:10.000 |          90 |
| +I |                       sensor_2 | 1970-01-01 00:00:05.000 | 1970-01-01 00:00:15.000 |          60 |
| +I |                       sensor_1 | 1970-01-01 00:00:05.000 | 1970-01-01 00:00:15.000 |          50 |
+----+--------------------------------+-------------------------+-------------------------+-------------+
*/
  • Table API - Group Windows - 会话窗口
package com.zenitera.bigdata.flinksql;

import com.zenitera.bigdata.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Session;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

/**
 * Table API - Group Windows - session window (Session)
 */
public class Flink04_TableApi_GroupWindow03 {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Bounded test input with event-time timestamps taken from WaterSensor.getTs()
        SingleOutputStreamOperator<WaterSensor> waterSensorDataStreamSource = env
                .fromElements(
                        new WaterSensor("sensor_1", 1000L, 10),
                        new WaterSensor("sensor_1", 2000L, 20),
                        new WaterSensor("sensor_2", 3000L, 30),
                        new WaterSensor("sensor_1", 4000L, 40),
                        new WaterSensor("sensor_1", 5000L, 50),
                        new WaterSensor("sensor_2", 6000L, 60))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner((element, recordTimestamp) -> element.getTs()));

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // Declare ts as the event-time (rowtime) attribute
        Table table = tableEnv.fromDataStream(waterSensorDataStreamSource, $("id"), $("ts").rowtime(), $("vc"));

        // Session window with a 6-second gap, grouped by sensor id
        table
                .window(Session.withGap(lit(6).second()).on($("ts")).as("timestamp"))
                .groupBy($("id"), $("timestamp"))
                .select($("id"), $("timestamp").start().as("start_time"), $("timestamp").end().as("end_time"), $("vc").sum().as("vc_sum"))
                .execute()
                .print();

        // Note: execute().print() above already submits and runs the query.
        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
/*
+----+--------------------------------+-------------------------+-------------------------+-------------+
| op |                             id |              start_time |                end_time |      vc_sum |
+----+--------------------------------+-------------------------+-------------------------+-------------+
| +I |                       sensor_1 | 1970-01-01 00:00:01.000 | 1970-01-01 00:00:11.000 |         120 |
| +I |                       sensor_2 | 1970-01-01 00:00:03.000 | 1970-01-01 00:00:12.000 |          90 |
+----+--------------------------------+-------------------------+-------------------------+-------------+*/
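
The two result rows above can be checked by hand. The following is a minimal plain-Java sketch (no Flink involved) of how a session window with a 6-second gap grows: each event opens a window [ts, ts + gap), and an event that falls inside the currently open window extends it. The class and method names are hypothetical, and treating an event that lands exactly on the window end as the start of a new session is a simplifying assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SessionWindowSketch {

    /**
     * events: {ts, vc} pairs for ONE key, sorted by ts ascending.
     * Returns one {start, end, vcSum} triple per session.
     */
    static List<long[]> sessions(long[][] events, long gapMs) {
        List<long[]> out = new ArrayList<>();
        long start = 0, end = 0, sum = 0;
        boolean open = false;
        for (long[] e : events) {
            long ts = e[0];
            long vc = e[1];
            if (open && ts < end) {
                // Event falls inside the open session: extend it.
                end = ts + gapMs;
                sum += vc;
            } else {
                // Gap exceeded (or first event): close the old session, open a new one.
                if (open) {
                    out.add(new long[]{start, end, sum});
                }
                start = ts;
                end = ts + gapMs;
                sum = vc;
                open = true;
            }
        }
        if (open) {
            out.add(new long[]{start, end, sum});
        }
        return out;
    }

    public static void main(String[] args) {
        long gap = 6000L; // same 6-second gap as Session.withGap(lit(6).second())
        long[][] sensor1 = {{1000, 10}, {2000, 20}, {4000, 40}, {5000, 50}};
        long[][] sensor2 = {{3000, 30}, {6000, 60}};
        for (long[] s : sessions(sensor1, gap)) {
            System.out.println("sensor_1 " + Arrays.toString(s)); // sensor_1 [1000, 11000, 120]
        }
        for (long[] s : sessions(sensor2, gap)) {
            System.out.println("sensor_2 " + Arrays.toString(s)); // sensor_2 [3000, 12000, 90]
        }
    }
}
```

The end of each session is the timestamp of its last event plus the gap, which is why sensor_1 ends at 00:00:11 and sensor_2 at 00:00:12.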

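5-1-2. Table API - Over Windows

Over window aggregation is part of standard SQL (the OVER clause) and can be defined in the SELECT clause of a query. For every input row, an over window aggregation computes an aggregate over a range of neighbouring rows.
The Table API provides the Over class to configure the properties of an over window. Over windows can be defined on event time or processing time, and over a range given either as a time interval or as a row count.
An unbounded over window is specified with a constant: UNBOUNDED_RANGE for a time interval, or UNBOUNDED_ROW for a row-count interval. A bounded over window is specified by the size of the interval.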

  • Table API - Over Windows - Unbounded Over Windows
package com.zenitera.bigdata.flinksql;

import com.zenitera.bigdata.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Over;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.UNBOUNDED_ROW;

/**
 * Table API - Over Windows - Unbounded Over Windows
 */
public class Flink05_TableApi_OverWindow01 {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // The input is deliberately slightly out of order (4000 arrives before 2000)
        SingleOutputStreamOperator<WaterSensor> waterSensorStreamSource = env
                .fromElements(
                        new WaterSensor("sensor_1", 1000L, 10),
                        new WaterSensor("sensor_1", 4000L, 40),
                        new WaterSensor("sensor_1", 2000L, 20),
                        new WaterSensor("sensor_2", 3000L, 30),
                        new WaterSensor("sensor_1", 5000L, 50),
                        new WaterSensor("sensor_2", 6000L, 60))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                                .withTimestampAssigner((element, recordTimestamp) -> element.getTs()));

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Table table = tableEnv.fromDataStream(waterSensorStreamSource, $("id"), $("ts").rowtime(), $("vc"));

        // Unbounded over window: every preceding row of the same id, ordered by event time
        table
                .window(Over.partitionBy($("id")).orderBy($("ts")).preceding(UNBOUNDED_ROW).as("w"))
                .select($("id"), $("ts"), $("vc").sum().over($("w")).as("vc_sum"))
                .execute()
                .print();

        // Note: execute().print() above already submits and runs the query.
        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
/*
+----+--------------------------------+-------------------------+-------------+
| op |                             id |                      ts |      vc_sum |
+----+--------------------------------+-------------------------+-------------+
| +I |                       sensor_1 | 1970-01-01 00:00:01.000 |          10 |
| +I |                       sensor_1 | 1970-01-01 00:00:02.000 |          30 |
| +I |                       sensor_1 | 1970-01-01 00:00:04.000 |          70 |
| +I |                       sensor_1 | 1970-01-01 00:00:05.000 |         120 |
| +I |                       sensor_2 | 1970-01-01 00:00:03.000 |          30 |
| +I |                       sensor_2 | 1970-01-01 00:00:06.000 |          90 |
+----+--------------------------------+-------------------------+-------------+*/
  • Table API - Over Windows - Bounded Over Windows
package com.zenitera.bigdata.flinksql;

import com.zenitera.bigdata.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Over;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.*;

/**
 * Table API - Over Windows - Bounded Over Windows
 * The window reaches back a fixed time interval from each row: lit(3).second()
 */
public class Flink05_TableApi_OverWindow02 {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> waterSensorStreamSource = env
                .fromElements(
                        new WaterSensor("sensor_1", 1000L, 10),
                        new WaterSensor("sensor_1", 4000L, 40),
                        new WaterSensor("sensor_1", 2000L, 20),
                        new WaterSensor("sensor_2", 3000L, 30),
                        new WaterSensor("sensor_1", 5000L, 50),
                        new WaterSensor("sensor_2", 6000L, 60))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                                .withTimestampAssigner((element, recordTimestamp) -> element.getTs()));

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Table table = tableEnv.fromDataStream(waterSensorStreamSource, $("id"), $("ts").rowtime(), $("vc"));

        // Bounded over window: rows of the same id within the preceding 3 seconds
        table
                .window(Over.partitionBy($("id")).orderBy($("ts")).preceding(lit(3).second()).as("w"))
                .select($("id"), $("ts"), $("vc").sum().over($("w")).as("vc_sum"))
                .execute()
                .print();

        // Note: execute().print() above already submits and runs the query.
        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
/*
+----+--------------------------------+-------------------------+-------------+
| op |                             id |                      ts |      vc_sum |
+----+--------------------------------+-------------------------+-------------+
| +I |                       sensor_1 | 1970-01-01 00:00:01.000 |          10 |
| +I |                       sensor_1 | 1970-01-01 00:00:02.000 |          30 |
| +I |                       sensor_2 | 1970-01-01 00:00:03.000 |          30 |
| +I |                       sensor_1 | 1970-01-01 00:00:04.000 |          70 |
| +I |                       sensor_1 | 1970-01-01 00:00:05.000 |         110 |
| +I |                       sensor_2 | 1970-01-01 00:00:06.000 |          90 |
+----+--------------------------------+-------------------------+-------------+*/
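
The vc_sum column above can be reproduced without Flink. The sketch below is a plain-Java illustration (hypothetical class and method names) of what a time-range over window with a 3-second lower bound computes for one partition: for each row, it sums every row whose timestamp lies in [ts - 3s, ts]. It assumes the rows of a partition are already sorted by timestamp and that timestamps are distinct.

```java
import java.util.Arrays;

public class RangeOverSketch {

    /**
     * ts and vc are parallel arrays for ONE partition (one sensor id), sorted by ts.
     * For each row i, sums vc over all rows j <= i with ts[j] >= ts[i] - precedingMs.
     */
    static int[] rangeSum(long[] ts, int[] vc, long precedingMs) {
        int[] out = new int[ts.length];
        for (int i = 0; i < ts.length; i++) {
            int sum = 0;
            for (int j = 0; j <= i; j++) {
                if (ts[j] >= ts[i] - precedingMs) {
                    sum += vc[j];
                }
            }
            out[i] = sum;
        }
        return out;
    }

    public static void main(String[] args) {
        long preceding = 3000L; // same bound as preceding(lit(3).second())

        long[] ts1 = {1000, 2000, 4000, 5000};
        int[] vc1 = {10, 20, 40, 50};
        System.out.println(Arrays.toString(rangeSum(ts1, vc1, preceding))); // [10, 30, 70, 110]

        long[] ts2 = {3000, 6000};
        int[] vc2 = {30, 60};
        System.out.println(Arrays.toString(rangeSum(ts2, vc2, preceding))); // [30, 90]
    }
}
```

The row at 5 seconds yields 110 rather than 120 because the row at 1 second is more than 3 seconds older and drops out of the range.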

5-2. Using windows in the Flink SQL API

5-2-1.Flink SQL - Group Windows

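  • TUMBLE(time_attr, interval)

Defines a tumbling window. A tumbling window assigns rows to non-overlapping, contiguous windows of a fixed duration (interval). For example, a 5-minute tumbling window groups rows into 5-minute intervals. Tumbling windows can be defined on event time (batch and streaming) or processing time (streaming).

  • HOP(time_attr, interval, interval)

Defines a hopping time window (called a sliding window in the Table API). A sliding window has a fixed duration (the second interval parameter) and a slide interval (the first interval parameter). If the slide interval is smaller than the window duration, the windows overlap, so a row is assigned to several windows. For example, a sliding window with a size of 15 minutes and a slide of 5 minutes assigns every row to 3 different 15-minute windows. Sliding windows can be defined on event time (batch and streaming) or processing time (streaming).

  • SESSION(time_attr, interval)

Defines a session time window. A session window has no fixed duration; its bounds are determined by the period of inactivity given by interval, i.e. a session window is closed when no event appears within the defined gap. For example, with a gap of 30 minutes, a row observed after 30 minutes of inactivity starts a new session window (otherwise the row is added to the current window), and the window is closed if no new row is observed within 30 minutes. Session windows can be defined on event time (batch and streaming) or processing time (streaming).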

Flink SQL - Group Windows - Tumbling window code example

package com.zenitera.bigdata.tableapiflinksql;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Flink SQL - Group Windows
 * Tumbling window - TUMBLE(time_attr, interval)
 */
public class Flink06_FlinkSQL_GroupWindow01 {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Source table: t is a computed event-time column with a 5-second watermark delay
        tableEnv.executeSql("create table sensor(" +
                "id string," +
                "ts bigint," +
                "vc int," +
                "t as to_timestamp(from_unixtime(ts/1000,'yyyy-MM-dd HH:mm:ss'))," +
                "watermark for t as t - interval '5' second)" +
                "with(" +
                "'connector' = 'filesystem'," +
                "'path' = 'input/sensor.txt'," +
                "'format' = 'csv'" +
                ")");

        // One-minute tumbling windows per sensor id
        tableEnv.sqlQuery("SELECT " +
                " id, " +
                " TUMBLE_START(t, INTERVAL '1' minute) as wStart, " +
                " TUMBLE_END(t, INTERVAL '1' minute) as wEnd, " +
                " SUM(vc) sum_vc " +
                " from sensor " +
                " GROUP BY TUMBLE(t, INTERVAL '1' minute), id ")
                .execute()
                .print();
    }
}
/*
+----+-------------+-------------------------+-------------------------+-------------+
| op |          id |                  wStart |                    wEnd |      sum_vc |
+----+-------------+-------------------------+-------------------------+-------------+
| +I |    sensor_1 | 1970-01-01 08:00:00.000 | 1970-01-01 08:01:00.000 |         430 |
| +I |    sensor_2 | 1970-01-01 08:00:00.000 | 1970-01-01 08:01:00.000 |         140 |
+----+-------------+-------------------------+-------------------------+-------------+*/
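
The window bounds above start at 08:00:00 rather than 00:00:00, even though the ts values in input/sensor.txt are only a few milliseconds after the epoch. A likely explanation, stated here as an assumption because the article does not mention the time zone, is that from_unixtime renders the epoch second in the session time zone and the example ran in a UTC+8 zone. The plain-Java sketch below mimics that conversion with java.time; the class name, the helper name, and the choice of Asia/Shanghai are all hypothetical.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class FromUnixtimeSketch {

    /** Formats epoch seconds as 'yyyy-MM-dd HH:mm:ss' in the given time zone. */
    static String fromUnixtime(long epochSeconds, String zone) {
        DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        return LocalDateTime
                .ofInstant(Instant.ofEpochSecond(epochSeconds), ZoneId.of(zone))
                .format(fmt);
    }

    public static void main(String[] args) {
        long tsMillis = 1L;              // first ts value in input/sensor.txt
        long seconds = tsMillis / 1000;  // integer division, as in ts/1000 -> 0

        System.out.println(fromUnixtime(seconds, "UTC"));           // 1970-01-01 00:00:00
        System.out.println(fromUnixtime(seconds, "Asia/Shanghai")); // 1970-01-01 08:00:00
    }
}
```

Under that assumption all six input rows map to 08:00:00 local time, so they fall into the single one-minute window from 08:00:00 to 08:01:00.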

Flink SQL - Group Windows - Sliding (hopping) window code example

package com.zenitera.bigdata.tableapiflinksql;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Flink SQL - Group Windows
 * Sliding window - HOP(time_attr, interval, interval)
 */
public class Flink06_FlinkSQL_GroupWindow02 {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Source table: t is a computed event-time column with a 5-second watermark delay
        tableEnv.executeSql("create table sensor(" +
                "id string," +
                "ts bigint," +
                "vc int," +
                "t as to_timestamp(from_unixtime(ts/1000,'yyyy-MM-dd HH:mm:ss'))," +
                "watermark for t as t - interval '5' second)" +
                "with(" +
                "'connector' = 'filesystem'," +
                "'path' = 'input/sensor.txt'," +
                "'format' = 'csv'" +
                ")");

        // One-hour windows that slide every 10 minutes, per sensor id
        tableEnv.sqlQuery("SELECT " +
                " id, " +
                "  hop_start(t, INTERVAL '10' minute, INTERVAL '1' hour) as WatermarkStart,  " +
                "  hop_end(t, INTERVAL '10' minute, INTERVAL '1' hour) as WatermarkEnd,  " +
                " SUM(vc) vc_sum_value " +
                " from sensor " +
                "GROUP BY hop(t, INTERVAL '10' minute, INTERVAL '1' hour), id")
                .execute()
                .print();
    }
}
/*
+----+--------------+-------------------------+-------------------------+--------------+
| op |           id |          WatermarkStart |            WatermarkEnd | vc_sum_value |
+----+--------------+-------------------------+-------------------------+--------------+
| +I |     sensor_1 | 1970-01-01 07:10:00.000 | 1970-01-01 08:10:00.000 |          430 |
| +I |     sensor_2 | 1970-01-01 07:10:00.000 | 1970-01-01 08:10:00.000 |          140 |
| +I |     sensor_2 | 1970-01-01 07:20:00.000 | 1970-01-01 08:20:00.000 |          140 |
| +I |     sensor_1 | 1970-01-01 07:20:00.000 | 1970-01-01 08:20:00.000 |          430 |
| +I |     sensor_2 | 1970-01-01 07:30:00.000 | 1970-01-01 08:30:00.000 |          140 |
| +I |     sensor_1 | 1970-01-01 07:30:00.000 | 1970-01-01 08:30:00.000 |          430 |
| +I |     sensor_2 | 1970-01-01 07:40:00.000 | 1970-01-01 08:40:00.000 |          140 |
| +I |     sensor_1 | 1970-01-01 07:40:00.000 | 1970-01-01 08:40:00.000 |          430 |
| +I |     sensor_2 | 1970-01-01 07:50:00.000 | 1970-01-01 08:50:00.000 |          140 |
| +I |     sensor_1 | 1970-01-01 07:50:00.000 | 1970-01-01 08:50:00.000 |          430 |
| +I |     sensor_1 | 1970-01-01 08:00:00.000 | 1970-01-01 09:00:00.000 |          430 |
| +I |     sensor_2 | 1970-01-01 08:00:00.000 | 1970-01-01 09:00:00.000 |          140 |
+----+--------------+-------------------------+-------------------------+--------------+*/
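
Each sensor appears in six rows above because a one-hour window that slides every 10 minutes covers any given instant six times. The sketch below shows the usual arithmetic for assigning a timestamp to tumbling and hopping windows in plain Java. It is only an illustration with hypothetical names, working in minutes for readability, and it does not model window offsets or time zones.

```java
import java.util.ArrayList;
import java.util.List;

public class WindowAssignSketch {

    /** Start of the single tumbling window of the given size that contains t. */
    static long tumbleStart(long t, long size) {
        return t - Math.floorMod(t, size);
    }

    /** Starts of all hopping windows [start, start + size) that contain t, newest first. */
    static List<Long> hopStarts(long t, long slide, long size) {
        List<Long> starts = new ArrayList<>();
        long lastStart = t - Math.floorMod(t, slide);
        for (long s = lastStart; s > t - size; s -= slide) {
            starts.add(s);
        }
        return starts;
    }

    public static void main(String[] args) {
        long t = 8 * 60; // 08:00, expressed as minutes since midnight

        // 1-hour size, 10-minute slide: six windows starting 08:00, 07:50, ..., 07:10
        System.out.println(hopStarts(t, 10, 60)); // [480, 470, 460, 450, 440, 430]

        // 15-minute size, 5-minute slide: every row lands in 3 windows
        System.out.println(hopStarts(7, 5, 15));  // [5, 0, -5]

        // 5-minute tumbling window: exactly one window per row
        System.out.println(tumbleStart(7, 5));    // 5
    }
}
```

In general a row belongs to size / slide hopping windows when the size is a multiple of the slide, which gives 6 for the query above and 3 for the 15-minute example in the HOP description.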

5-2-2.Flink SQL - Over Windows

sum(vc) over(partition by id order by t rows between 1 PRECEDING and current row)

package com.zenitera.bigdata.tableapiflinksql;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Flink07_FlinkSQL_OverWindow01 {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        tableEnv.executeSql("create table sensor(" +
                "id string," +
                "ts bigint," +
                "vc int," +
                "t as to_timestamp(from_unixtime(ts/1000,'yyyy-MM-dd HH:mm:ss'))," +
                "watermark for t as t - interval '5' second)" +
                "with(" +
                "'connector' = 'filesystem'," +
                "'path' = 'input/sensor.txt'," +
                "'format' = 'csv'" +
                ")");

        // Inline OVER clause: current row plus the one row before it, per sensor id
        tableEnv.sqlQuery("select " +
                "id," +
                "vc," +
                "sum(vc) over(partition by id order by t rows between 1 PRECEDING and current row) vc_sum_value " +
                "from sensor")
                .execute()
                .print();
    }
}
/*
input/sensor.txt
sensor_1,1,10
sensor_1,2,20
sensor_2,4,30
sensor_1,4,400
sensor_2,5,50
sensor_2,6,60
#####################################################
+----+---------------+-------------+--------------+
| op |            id |          vc | vc_sum_value |
+----+---------------+-------------+--------------+
| +I |      sensor_1 |          10 |           10 |
| +I |      sensor_1 |          20 |           30 |
| +I |      sensor_1 |         400 |          420 |
| +I |      sensor_2 |          30 |           30 |
| +I |      sensor_2 |          50 |           80 |
| +I |      sensor_2 |          60 |          110 |
+----+---------------+-------------+--------------+*/
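
The vc_sum_value column above follows directly from the frame "rows between 1 PRECEDING and current row": each result is the current vc plus the vc of the previous row of the same sensor, if there is one. A minimal plain-Java sketch of that row-count frame (hypothetical class and method names, one partition at a time, rows assumed to be in the order of t) looks like this:

```java
import java.util.Arrays;

public class RowsOverSketch {

    /** For each row i, sums vc over rows max(0, i - preceding) .. i of one partition. */
    static int[] rowsSum(int[] vc, int preceding) {
        int[] out = new int[vc.length];
        for (int i = 0; i < vc.length; i++) {
            int sum = 0;
            for (int j = Math.max(0, i - preceding); j <= i; j++) {
                sum += vc[j];
            }
            out[i] = sum;
        }
        return out;
    }

    public static void main(String[] args) {
        // vc values per sensor, taken from input/sensor.txt above
        System.out.println(Arrays.toString(rowsSum(new int[]{10, 20, 400}, 1))); // [10, 30, 420]
        System.out.println(Arrays.toString(rowsSum(new int[]{30, 50, 60}, 1)));  // [30, 80, 110]
    }
}
```

Unlike the time-range frame used in the bounded Table API example, a row-count frame ignores how far apart the timestamps are and only counts rows.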

window w as (partition by id order by t rows between 1 PRECEDING and current row)

package com.zenitera.bigdata.tableapiflinksql;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Flink07_FlinkSQL_OverWindow02 {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        tableEnv.executeSql("create table sensor(" +
                "id string," +
                "ts bigint," +
                "vc int," +
                "t as to_timestamp(from_unixtime(ts/1000,'yyyy-MM-dd HH:mm:ss'))," +
                "watermark for t as t - interval '5' second)" +
                "with(" +
                "'connector' = 'filesystem'," +
                "'path' = 'input/sensor.txt'," +
                "'format' = 'csv'" +
                ")");

        // Named window w, declared once in the WINDOW clause and shared by both aggregates
        tableEnv.sqlQuery("select " +
                "id," +
                "vc," +
                "count(vc) over w vc_count_value, " +
                "sum(vc) over w vc_sum_value " +
                "from sensor " +
                "window w as (partition by id order by t rows between 1 PRECEDING and current row)")
                .execute()
                .print();
    }
}
/*
input/sensor.txt
sensor_1,1,10
sensor_1,2,20
sensor_2,4,30
sensor_1,4,400
sensor_2,5,50
sensor_2,6,60
#####################################################
+----+---------------+-------------+----------------------+--------------+
| op |            id |          vc |       vc_count_value | vc_sum_value |
+----+---------------+-------------+----------------------+--------------+
| +I |      sensor_1 |          10 |                    1 |           10 |
| +I |      sensor_1 |          20 |                    2 |           30 |
| +I |      sensor_1 |         400 |                    2 |          420 |
| +I |      sensor_2 |          30 |                    1 |           30 |
| +I |      sensor_2 |          50 |                    2 |           80 |
| +I |      sensor_2 |          60 |                    2 |          110 |
+----+---------------+-------------+----------------------+--------------+*/

