Flink Catalog

news/2024/4/16 2:59:52

1.Flink侧创建

  按照SQL的解析处理流程在Parse解析SQL以后,进入执行流程——executeInternal。
  其中有个分支专门处理创建Catalog的SQL命令

} else if (operation instanceof CreateCatalogOperation) {return createCatalog((CreateCatalogOperation) operation);
createCatalog方法里完成两件事:1、创建Catalog对象;2、向catalogManager注册
Catalog catalog =FactoryUtil.createCatalog(catalogName, properties, tableConfig, userClassLoader);
catalogManager.registerCatalog(catalogName, catalog);

  创建Catalog会去全包查找对应的CatalogFactory的子类,然后使用配置的子类构建

final CatalogFactory legacyFactory =TableFactoryService.find(CatalogFactory.class, options, classLoader);
return legacyFactory.createCatalog(catalogName, options);

  这里注意,上面的步骤只查询classpath下的类,像HiveCatalog这种外置增加的,在这个步骤里找不到,会抛出NoMatchingTableFactoryException异常之后继续其他步骤处理来获取

} catch (NoMatchingTableFactoryException e) {// No matching legacy factory found, try using the new stackfinal DefaultCatalogContext discoveryContext =new DefaultCatalogContext(catalogName, options, configuration, classLoader);try {final CatalogFactory factory = getCatalogFactory(discoveryContext);

  最终在FactoryUtil.discoverFactory的方法中进行过滤查找,这里用到了type配置做过滤,基于Factory的

factoryIdentifier获取工厂的字段与配置做对比
final List<Factory> matchingFactories =foundFactories.stream().filter(f -> f.factoryIdentifier().equals(factoryIdentifier)).collect(Collectors.toList());

2.HiveCatalog

  获取到对应的Factory以后,会调用其createCatalog方法创建对应的Catalog

return new HiveCatalog(context.getName(),helper.getOptions().get(DEFAULT_DATABASE),helper.getOptions().get(HIVE_CONF_DIR),helper.getOptions().get(HADOOP_CONF_DIR),helper.getOptions().get(HIVE_VERSION));

  HiveCatalog的整个创建过程主要是发现Hive配置的过程,其他接口就是对库表的操作接口
  获取配置主要是基于上面hive-conf-dir、hadoop-conf-dir来的,首先是根据这两个配置去获取hive配置,如果都获取不到,会从classpath下面去获取hive的配置文件

URL hiveSite =Thread.currentThread().getContextClassLoader().getResource(HIVE_SITE_FILE);

3.IcebergCatalog

  Iceberg走的应该是前面TableFactoryService.find能找到的接口,因为它实现的是properties参数的接口,clusterHadoopConf()就是调用的Flink里的方法获取Hadoop的配置

@Override
public Catalog createCatalog(String name, Map<String, String> properties) {return createCatalog(name, properties, clusterHadoopConf());
}

3.1.CatalogLoader

  第一步是创建CatalogLoader,这是Iceberg Catalog的类加载器
  这里可以配置自定义类加载器,相关配置:catalog-impl,如果没有配置则走默认
  默认流程根据catalog-type配置选择实例化Hive的还是Hadoop的,默认是Hive的

String catalogType = properties.getOrDefault(ICEBERG_CATALOG_TYPE, ICEBERG_CATALOG_TYPE_HIVE);
switch (catalogType.toLowerCase(Locale.ENGLISH)) {case ICEBERG_CATALOG_TYPE_HIVE:// The values of properties 'uri', 'warehouse', 'hive-conf-dir' are allowed to be null, in// that case it will// fallback to parse those values from hadoop configuration which is loaded from classpath.String hiveConfDir = properties.get(HIVE_CONF_DIR);String hadoopConfDir = properties.get(HADOOP_CONF_DIR);Configuration newHadoopConf = mergeHiveConf(hadoopConf, hiveConfDir, hadoopConfDir);return CatalogLoader.hive(name, newHadoopConf, properties);case ICEBERG_CATALOG_TYPE_HADOOP:return CatalogLoader.hadoop(name, hadoopConf, properties);
}

  创建CatalogLoader主要就是进行一些基本参数的设置

private HiveCatalogLoader(String catalogName, Configuration conf, Map<String, String> properties) {this.catalogName = catalogName;this.hadoopConf = new SerializableConfiguration(conf);this.uri = properties.get(CatalogProperties.URI);this.warehouse = properties.get(CatalogProperties.WAREHOUSE_LOCATION);this.clientPoolSize =properties.containsKey(CatalogProperties.CLIENT_POOL_SIZE)? Integer.parseInt(properties.get(CatalogProperties.CLIENT_POOL_SIZE)): CatalogProperties.CLIENT_POOL_SIZE_DEFAULT;this.properties = Maps.newHashMap(properties);
}

3.2.FlinkCatalog

  接下来就是进行一些配置然后创建FlinkCatalog
  配置里注意Hadoop有一个特殊的配置:base-namespace,这是配置namespa的,会自动带上前缀,应该就是在warehouse加上前缀
  这里还有缓存配置:cache-enabled、cache.expiration-interval-ms,控制Catalog是否缓存表入口

3.3.loadCatalog

  FlinkCatalog会使用CatalogLoader加载Catalog,最终会到CatalogUtil.loadCatalog()
  这里最终会用Class.forName来加载类,基于Constructor来构建实例

  ctor = DynConstructors.builder(Catalog.class).impl(impl).buildChecked();catalog = ctor.newInstance();

3.4.HiveCatalog

  Hive类型最终创建的是org.apache.iceberg.hive.HiveCatalog
  initialize初始化也基本上是进行配置,有两个注意的对象:FileIO、CachedClientPool
  io-impl可以配置文件读取,默认用Iceberg的HadoopFileIO

this.fileIO =fileIOImpl == null? new HadoopFileIO(conf): CatalogUtil.loadFileIO(fileIOImpl, properties, conf);

  CachedClientPool是一个Hive连接缓存,缓存的是HiveMetaStoreClient

return GET_CLIENT.invoke(hiveConf, (HiveMetaHookLoader) tbl -> null, HiveMetaStoreClient.class.getName());

http://www.ppmy.cn/news/1365051.html

相关文章

通过一个例子演示golang调用C语言动态链接库中的函数

本例提供了cgo调用C函数的示例&#xff0c;也演示了如何将C函数打印内容保存到golang的变量中 目录和源码 目录结构 adminhpc-1:~/go/my_stdout$ tree . ├── include │ ├── mylibrary.c │ └── mylibrary.h ├── lib └── main.go2 directories, 3 files a…

Android 框架设计模板

不同项目在使用该模板时多少会有出入&#xff0c;应以项目实际情况作为依据。 &#xff08;该文档可以 .md 格式存放于项目根目录&#xff0c;或编写到readme 中&#xff09; 项目描述 涉及如下方面 项目背景 &#xff08;可引用项目立项书&#xff09;项目需求 &#xff08…

【kubernetes】关于k8s集群的声明式管理资源

目录 一、声明式管理方法 二、资源配置清单管理 1、导出资源配置清单 2、修改资源配置清单并应用 2.1离线修改 2.2在线修改 三、通过资源配置清单创建资源对象 获取K8S资源配置清单文件模板&#xff1f; 关于配置清单常见的字段 方案一&#xff1a;手写yaml配置文件 …

QT C++实践|超详细数据库的连接和增删改查操作|附源码

0&#xff1a;前言 &#x1faa7; 什么情况需要数据库? 1 大规模的数据需要处理&#xff08;比如上千上万的数据量&#xff09;2 需要把数据信息存储起来&#xff0c;无论是本地还是服务上&#xff0c;而不是断电后数据信息就消失了。 如果不是上面的原因化&#xff0c;一般…

学习JAVA的第四天(基础)

目录 方法 方法的定义 方法的调用 参数 注意事项 方法的重载 练习 面向对象 类和对象 定义类的注意事项 封装 private关键字 this关键字 构造方法 标准的Javabean类 创建一个对象时&#xff0c;虚拟机做了什么&#xff1f; 方法 方法含义&#xff1a;方法是程序…

Node.js+vue校内二手物品交易系统tdv06-vscode前后端分离

二手物品交易系统采用B/S架构&#xff0c;数据库是MySQL。网站的搭建与开发采用了先进的nodejs进行编写&#xff0c;使用了vue框架。该系统从三个对象&#xff1a;由管理员和用户、店铺来对系统进行设计构建。主要功能包括&#xff1a;个人信息修改&#xff0c;对用户、店铺、二…

【面试题】在浏览器地址栏输入URL后会发生什么

1. 地址栏输入后的本地操作 当我们在浏览器的地址栏中&#xff0c;输入xxx内容后&#xff0c;浏览器的进程首先会判断输入的内容&#xff1a; 如果是普通的字符&#xff0c;那浏览器会使用默认的搜索引擎去对于输入的xxx生成URL。如若输入的是网址&#xff0c;那浏览器会拼接…

golang使用gorm操作mysql1

1.mysql连接配置 package daoimport ("fmt""gorm.io/driver/mysql""gorm.io/gorm""gorm.io/gorm/logger" )var DB *gorm.DB// 连接数据库&#xff0c;启动服务的时候&#xff0c;init方法就会执行 func init() {username : "roo…

binwalk安装记录和burpsuite安装记录

我的虚拟机环境是Ubuntu20.04 python有2.7的和3.8的 [[#binwalk|binwalk]] [[#binwalk#pip|pip]][[#binwalk#安装 sasquatch|安装 sasquatch]][[#binwalk#安装 jefferson|安装 jefferson]][[#binwalk#安装 ubi_reader|安装 ubi_reader]][[#binwalk#安装 yaffshiv|安装 yaffshi…

2.26作业

2.将信号灯集的函数二次封装 sem.c #include<myhead.h>union semun {int val; /* Value for SETVAL */struct semid_ds *buf; /* Buffer for IPC_STAT, IPC_SET */unsigned short *array; /* Array for GETALL, SETALL */struct seminfo *__buf;…

Eigen-Array数组类和系数式运算

Array数组类和系数式运算&#xff09; 一、概述二、数组类型三、访问数组中的值四、加减法五、乘法六、其他系数相关的操作七、数组和矩阵表达式之间转换 一、概述 Array类提供了通用数组&#xff0c;而Matrix类则用于线性代数。此外&#xff0c;Array类提供了一种简单的方法来…

Linux中死锁种类和解决方法

死锁&#xff1a; 第一种&#xff1a; 加了两次锁&#xff0c;导致还没解锁就想获得锁&#xff0c;一直阻塞&#xff1a; void*mythread(void *arg) {int n5000;int x;while(n--){pthread_mutex_lock(&mutex);pthread_mutex_lock(&mutex);xnumber;x;numberx;pthread…

如何选购油烟净化器?环保性能与个人需求的完美契合

我最近分析了餐饮市场的油烟净化器等产品报告&#xff0c;解决了餐饮业厨房油腻的难题&#xff0c;更加方便了在餐饮业和商业场所有需求的小伙伴们。 在选择油烟净化器时&#xff0c;环保性能与个人需求的完美契合至关重要。下面&#xff0c;让我们一起探讨如何选购适合自己的油…

初识Lombok

前言 最近读一些公司的业务代码&#xff0c;发现近几年的java项目工程中都使用了lombok&#xff0c;lombok是一个可以自动生成get,set、toString等模板类方法的工具框架&#xff0c;程序再引入lombok后&#xff0c;添加一个注解便可以不写get\set\toString等方法。 Lombok示例…

蓝桥杯Learning

Part 1 递归和递推 1. 简单斐波那契数列 n int(input())st [0]*(47) # 注意这个地方&#xff0c;需要将数组空间设置的大一些&#xff0c;否则会数组越界 st[1] 0 st[2] 1 # 这个方法相当于是递推&#xff0c;即先求解一个大问题的若干个小问题 def dfs(u):if u 1:print(…

未来新质生产力Agent的起源与应用

Agent是什么&#xff1f; AI Agent的发展经历了从哲学思想启蒙到计算机科学助力、专家系统兴起、机器学习崛起、深度学习突破等多个阶段。如今&#xff0c;AI Agent已经成为人工智能领域的重要组成部分&#xff0c;为人类带来了巨大的便利和发展机遇。早在古希腊时期&#xff0…

第十一天-Excel的操作

目录 1.xlrd-Excel的读模块 安装 使用 获取工作簿 读取工作簿的内容 xlsxwriter-Excel的写模块 安装 使用 生成图表 add_series参数 图表的样式 demo&#xff1a;生成图表 Excel的操作在python中有多个模块&#xff0c;为了能够快速使用&#xff0c;选择了相对简单…

【MySQL】_联合查询基础表

联合查询也称为多表查询&#xff0c;是将多个表联合到一起进行查询&#xff1b; 笛卡尔积是联合查询的基础&#xff0c;笛卡尔积其实就是一种排列组合&#xff0c;把两张表的记录尽可能地排列组合出n种情况&#xff1a; 以两张表&#xff1a;班级表与学生表为例&#xff0c;计…

SpringBoot快速入门(黑马学习笔记)

需求 需求&#xff1a;基于SpringBoot的方式开发一个Web应用&#xff0c;浏览器发起请求/hello后&#xff0c;给浏览器返回字符串"Hello World~"。 开发步骤 第一步&#xff1a;创建SpringBoot工程项目 第二步&#xff1a;定义HelloController类&#xff0c;添加方…

ChatGPT能替代什么人?

上一讲关于ChatGPT的热炒&#xff0c;其实对于我们来说算是敲了敲警钟。 其实在今天&#xff0c;关于ChatGPT&#xff0c;最多人关注的一个问题就是&#xff1a;ChatGPT能取代人吗&#xff0c;或者说能抢人的饭碗么吗&#xff1f; 有人说不能&#xff0c;也有人说能&#xff…
最新文章