flinkSQL Table转DataStream

news/2023/12/4 18:50:10

flink版本1.14

flinksql 来源于kafka json格式数据

变化的表

业务中sql可能不完全满足使用,需要转换成DataStream 更灵活一些,所以需要互相转换,发挥各自的优势。

 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(10000);env.setParallelism(1);final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);//两个来自kafka的表tEnv.executeSql(CreateTableSQL.kafkaTableInfo);tEnv.executeSql(CreateTableSQL.kafkaTablePerson);//join两个表Table tableResult = tEnv.sqlQuery(SelectSQL.selectPerMaxScore);tEnv.toDataStream(tableResult, Row.class);dataStream.flatMap(new FlatMapFunction<Row, Object>() {@Overridepublic void flatMap(Row value, Collector<Object> out) throws Exception {String s = value.getField(0) + ":" + value.getField(1);out.collect(s);}}).print();env.execute();

会发现报错

Table sink 'Unregistered_DataStream_Sink_1' doesn't support consuming update changes [...].

那是因为在table-to-stream中,数据在发生变化,因此需要用toChangelogStream来转换

修改成如下内容:

 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(10000);env.setParallelism(1);final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);//两个来自kafka的表tEnv.executeSql(CreateTableSQL.kafkaTableInfo);tEnv.executeSql(CreateTableSQL.kafkaTablePerson);//join两个表Table tableResult = tEnv.sqlQuery(SelectSQL.selectPerMaxScore);tEnv.toChangelogStream(tableResult).flatMap(new FlatMapFunction<Row, Object>() {@Overridepublic void flatMap(Row value, Collector<Object> out) throws Exception {String s = value.getField(0) + ":" + value.getField(1);out.collect(s);}}).print();env.execute();

在网上找了一圈,没有类似的文章,自己做个记录。
这里虽然也可以用createTemporaryView来把表转换成DataStream,但是这种方式的表是固定的,在实际应用的使用场景中,还是toChangelogStream的应用更广一些。

executeSql() 与 sqlQuery()

你可能会发现 有的地方用的是executeSql() 有的地方用的是 sqlQuery() ,这两者是什么不同呢,对此我特意去看了一下源码里的注释。

sqlQuery(String query)

针对sqlQuery()他是这么说的

评估对已注册表的 SQL 查询并返回一个 Table 对象,该对象描述了进一步转换的管道。查询引用的所有表和其他对象都必须在 TableEnvironment 中注册。例如,使用 createTemporaryView(String, Table)) 来引用 Table 对象或使用 createTemporarySystemFunction(String, Class) 来引用函数。
或者,当调用 Table.toString() 方法时,会自动注册 Table 对象,例如当它被嵌入到字符串中时。因此,SQL 查询可以直接内联(即匿名)引用 Table 对象,如下所示:
Table table = ...; 
String tableName = table.toString(); 
// 表没有注册到表环境 
tEnv.sqlQuery("SELECT * FROM " + tableName + " WHERE a > 12");
请注意,返回的 Table 是一个 API 对象,仅包含管道描述。它实际上对应于 SQL 术语中的视图。调用 Table.execute() 触发执行或直接使用 executeSql(String)。

executeSql(String statement)

执行给定的单个语句并返回执行结果。
语句可以是 DDL/DML/DQL/SHOW/DESCRIBE/EXPLAIN/USE。对于 DML 和 DQL,此方法在提交作业后返回 TableResult。对于 DDL 和 DCL 语句,操作完成后将返回 TableResult。
如果多个管道应将数据作为单个执行的一部分插入到一个或多个接收器表中,请使用 StatementSet(请参阅 createStatementSet())。
默认情况下,所有 DML 操作都是异步执行的。使用 TableResult.await() 或 TableResult.getJobClient() 来监控执行。设置 TableConfigOptions.TABLE_DML_SYNC 以始终同步执行。

需要强调的是,这两个是不能强行互相转换的,出现以下报错

java.lang.ClassCastException:org.apache.flink.table.api.internal.TableResultImpl cannot be cast to org.apache.flink.table.api.Table

还有一点, sqlQuery(String query)不能执行复杂的语句,会出现以下报错

Unsupported SQL query! sqlQuery() only accepts a single SQL query of type SELECT, UNION, INTERSECT, EXCEPT, VALUES, and ORDER_BY.

固定的表

如果需求只是一个固定的表可以通过这种下面的案例

		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(10000);env.setParallelism(1);final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);// connector doris的一个表 PerCountNametEnv.executeSql(CreateTableSQL.PerCountNDoris);Table perCountName = tEnv.from("PerCountName");// PerCountName表对应的实体类 PerCountName.classtEnv.toDataStream(perCountName, PerCountName.class).flatMap(new FlatMapFunction<PerCountName, Object>() {@Overridepublic void flatMap(PerCountName value, Collector<Object> out) throws Exception {String s = value.getName()+"->"+value.getNum();out.collect(s);}}).print();env.execute();

http://www.ppmy.cn/news/52667.html

相关文章

「SQL面试题库」 No_49 产品销售分析 I

&#x1f345; 1、专栏介绍 「SQL面试题库」是由 不是西红柿 发起&#xff0c;全员免费参与的SQL学习活动。我每天发布1道SQL面试真题&#xff0c;从简单到困难&#xff0c;涵盖所有SQL知识点&#xff0c;我敢保证只要做完这100道题&#xff0c;不仅能轻松搞定面试&#xff0…

【社区图书馆】30+危机,最值得读烂的5本书|必读

“年少时总会抱怨读书无用&#xff0c;殊不知那只是你没有用心感受读书带给你的好处。” ㅤ 一直觉得迈入30大门并不可怕&#xff0c;“35岁危机”也离自己甚远。然而&#xff0c;近1年自己身上发生的一些事情&#xff0c;让自己越来越认识到&#xff1a;只有不断丰富内在、提高…

优漫动游APP四类页面样式设计

在设计过程中&#xff0c;在新设计一个页面时&#xff0c;会遇到一个基本的问题&#xff0c;我的页面背景应该是底色&#xff1f;页面的要素怎么组合展现才能达到更好的展现美观度&#xff0c;贴合业务需要&#xff0c;实现更高的转化率&#xff1f;   基于上面的问题&…

java版本电子招标采购系统源码—企业战略布局下的采购

​ 智慧寻源 多策略、多场景寻源&#xff0c;多种看板让寻源过程全程可监控&#xff0c;根据不同采购场景&#xff0c;采取不同寻源策略&#xff0c; 实现采购寻源线上化管控&#xff1b;同时支持公域和私域寻源。 询价比价 全程线上询比价&#xff0c;信息公开透明&#xff0…

ISO-27145故障诊断说明

ISO-27145故障诊断说明 2.1 27145目录说明 ISO27145-1: 这里边介绍的是一般信息和用例定义&#xff1b; ISO27145-2: 这里边介绍的是与排放相关的通用数据规则&#xff0c;用于查询&#xff1b; ISO27145-3: 这里边主要介绍了支持的服务 12服务 14服务 19服务 22服务 31服务&…

StringRedisTemplate-基本使用

StringRedisTemplate继承自RedisTemplate,在这里说明一下&#xff0c;当我们使用RedisTemplate往redis中存储java对象的时候&#xff0c;他会顺带着将该java对象的字节码文件也同时存进了内存中&#xff0c;这是为了实现自动反序列化Autowired private StringRedisTemplate red…

【JavaWeb】JavaScript

1、JavaScript 介绍 Javascript 语言诞生主要是完成页面的数据验证。因此它运行在客户端&#xff0c;需要运行浏览器来解析执行 JavaScript 代码。 JS 是 Netscape 网景公司的产品&#xff0c;最早取名为 LiveScript;为了吸引更多 java 程序员。更名为 JavaScript。 JS 是弱…

OpenCL与Metal API下如何合理地安排线程组大小

我们玩过OpenCL的朋友都知道&#xff0c;我们可以通过clGetDeviceInfo接口来查询当前计算设备的几乎所有属性&#xff0c;包括当前计算单元的个数、最大工作组大小、本地存储器大小等等。但这些属性值都是基于当前计算设备的最大可支持能力&#xff0c;而不是当前内核程序执行上…

机器学习实战教程(九):模型泛化

泛化能力 模型泛化是指机器学习模型对新的、未见过的数据的适应能力。在机器学习中&#xff0c;我们通常会将已有的数据集划分为训练集和测试集&#xff0c;使用训练集训练模型&#xff0c;然后使用测试集来评估模型的性能。模型在训练集上表现得好&#xff0c;并不一定能在测…

rk3568 适配摄像头 (mipi 单摄)

rk3568 适配摄像头 (mipi 单摄) MIPI CSI&#xff08;Mobile Industry Processor Interface Camera Serial Interface&#xff09;是一种用于移动设备的高速串行接口标准&#xff0c;用于连接图像传感器和图像处理器。MIPI CSI接口使用差分信号传输技术&#xff0c;将数据分为…

微信小程序对接在线客服系统,对接小程序订阅消息模板,小程序订阅方法以及后端发送订阅模板消息的方法...

微信小程序想要对接独立在线客服系统&#xff0c;除了使用小程序消息推送接口外&#xff0c;还可以使用webview嵌入的形式嵌入聊天链接。 但是&#xff0c;使用webview嵌入的形式&#xff0c;当用户离开页面以后&#xff0c;就收不到客服回复的消息了 所以&#xff0c;我们需要…

ICMP 协议详解

文章目录 1 概述2 ICMP 协议2.1 工作原理2.2 报文格式2.3 ICMP 类型 1 概述 #mermaid-svg-6yUB8ZNYSzjbbDDq {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-6yUB8ZNYSzjbbDDq .error-icon{fill:#552222;}#mermaid-s…

苦熬10年,国产操作系统“归零”,新操作系统上新,跟Excel很像

苦熬10余年&#xff0c;国产操作系统自主研发 说到国内自主研发的操作系统&#xff0c;经验最丰富的品牌&#xff0c;当然是麒麟OS. 从诞生到发展&#xff0c;历经10多年的努力&#xff0c;麒麟os逐渐成为了国内自主研发操作系统领域中的一颗耀眼的明珠。麒麟OS不仅推出了许多…

【链表】力扣203题:移除链表元素

【链表】力扣203题&#xff1a;移除链表元素 力扣203题&#xff1a;移除链表元素 建议在看题目之前先了解数组的具体知识点&#xff0c;可以看这里&#xff1a; 算法基础&#xff08;三&#xff09;&#xff1a;链表知识点及题型讲解。 其它题目&#xff1a; 【链表】力扣206题…

设计模式简谈

设计模式是我们软件架构开发中不可缺失的一部分&#xff0c;通过学习设计模式&#xff0c;我们可以更好理解的代码的结构和层次。 设计原则 设计原则是早于设计方法出现的&#xff0c;所以的设计原则都要依赖于设计方法。这里主要有八个设计原则。 推荐一个零声学院免费教程&…

Spring IOC之对象的创建方式、策略及销毁时机和生命周期且获取方式

目录 一、对象的创建方式 1. 使用构造方法 2. 使用工厂类方法 3. 使用工厂类的静态方法 二、对象的创建策略 1. 单例策略 2. 多例策略 三、对象的销毁时机 四、生命周期方法 1. 定义生命周期方法 2. 配置生命周期方法 3. 测试 五、获取Bean对象的方式 1. 通过id/…

参与C++项目时的那些事儿

开发工具 在开发团队内部&#xff0c;使用相同的IDE、编译器等开发工具&#xff0c;工具的版本号和配置保持一致&#xff0c;便于开发团队积累使用经验&#xff0c;避免、消除工具的差异引入的问题。 代码质量 从检查时机看&#xff0c;分为&#xff1a; 开发人员本地检查&am…

以轻量级服务器niginx为核心的JavaWeb项目:第一章 项目设计

这里写目录标题 一 需求分析与环境搭建1.需求分析2.环境搭建1.2.1首先配置mysql环境1.2.2 配置maven环境 二 打成War包&#xff0c;发到linux上 一 需求分析与环境搭建 1.需求分析 2.环境搭建 1.2.1首先配置mysql环境 先查找一下mysql环境 [roothadoop122 ~]# mysql --vers…

vue---mixin混入

mixins是一种分发 Vue 组件中可复用功能的非常灵活的方式。混合对象可以包含任意组件选项。当组件使用混合对象时&#xff0c;所有混合对象的选项将被混入该组件本身的选项。 一个混入对象可以包含任意组件选项&#xff08;如data、methods、created、mounted等等&#xff09;。…

Stable Diffusion XL:更快,更强

Stable Diffusion XL&#xff1a;更快&#xff0c;更强 今天&#xff0c;Stability AI 的创始人兼首席执行官 Emad Mostaque 发推宣布&#xff0c;Stable Diffusion XL 进入公测阶段。 核心信息总结起来有2点&#xff1a; “XL”不是新模型的官方名称&#xff0c;Stability …
最新文章