[BitSail] Connector Development in Detail, Part 3: SourceReader

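For more technical discussions and job opportunities, follow the ByteDance Data Platform WeChat official account and reply 【1】 to join the official discussion group.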

Source Connector

This article focuses on SourceReader, the component responsible for reading data:

SourceReader

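Each SourceReader runs in its own thread. As long as the SourceSplitCoordinator never assigns the same split to two different SourceReaders, we do not need to worry about concurrency at any point in a SourceReader's lifecycle.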
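To make that precondition concrete, here is a minimal sketch of one way to hand out disjoint splits: a round-robin assignment in which each split index goes to exactly one reader. `DisjointAssigner` and the integer split IDs are hypothetical; BitSail's actual SourceSplitCoordinator may use a different strategy.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: distributes split IDs over readers so that no two readers
// ever receive the same split (round-robin by index).
class DisjointAssigner {
    static List<List<Integer>> assign(int splitCount, int parallelism) {
        List<List<Integer>> perReader = new ArrayList<>();
        for (int r = 0; r < parallelism; r++) {
            perReader.add(new ArrayList<>());
        }
        for (int split = 0; split < splitCount; split++) {
            // Each split goes to exactly one reader, so the per-reader lists never overlap.
            perReader.get(split % parallelism).add(split);
        }
        return perReader;
    }
}
```

With 5 splits and a parallelism of 2, reader 0 receives splits 0, 2 and 4 and reader 1 receives splits 1 and 3, so no reader shares work with another.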

SourceReader interface

public interface SourceReader<T, SplitT extends SourceSplit> extends Serializable, AutoCloseable {

  void start();

  void pollNext(SourcePipeline<T> pipeline) throws Exception;

  void addSplits(List<SplitT> splits);

  /**
   * Check whether the source reader has more elements.
   */
  boolean hasMoreElements();

  /**
   * No more splits will be sent to this source reader.
   * The source reader can exit after processing all assigned splits.
   */
  default void notifyNoMoreSplits() {
  }

  /**
   * Process events sent from the {@link SourceSplitCoordinator}.
   */
  default void handleSourceEvent(SourceEvent sourceEvent) {
  }

  /**
   * Store the splits to the external system so they can be recovered when the task fails.
   */
  List<SplitT> snapshotState(long checkpointId);

  /**
   * Invoked once all tasks have finished their snapshots, to notify that the checkpoint is complete.
   */
  default void notifyCheckpointComplete(long checkpointId) throws Exception {
  }

  interface Context {

    TypeInfo<?>[] getTypeInfos();

    String[] getFieldNames();

    int getIndexOfSubtask();

    void sendSplitRequest();
  }
}

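Constructor

This is where all the configuration needed to access the data source is extracted, for example database and table names, the message queue cluster and topic, and authentication settings.

Example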

public RocketMQSourceReader(BitSailConfiguration readerConfiguration,
                            Context context,
                            Boundedness boundedness) {
  this.readerConfiguration = readerConfiguration;
  this.boundedness = boundedness;
  this.context = context;
  this.assignedRocketMQSplits = Sets.newHashSet();
  this.finishedRocketMQSplits = Sets.newHashSet();
  this.deserializationSchema = new RocketMQDeserializationSchema(
      readerConfiguration,
      context.getTypeInfos(),
      context.getFieldNames());
  this.noMoreSplits = false;

  cluster = readerConfiguration.get(RocketMQSourceOptions.CLUSTER);
  topic = readerConfiguration.get(RocketMQSourceOptions.TOPIC);
  consumerGroup = readerConfiguration.get(RocketMQSourceOptions.CONSUMER_GROUP);
  consumerTag = readerConfiguration.get(RocketMQSourceOptions.CONSUMER_TAG);
  pollBatchSize = readerConfiguration.get(RocketMQSourceOptions.POLL_BATCH_SIZE);
  pollTimeout = readerConfiguration.get(RocketMQSourceOptions.POLL_TIMEOUT);
  commitInCheckpoint = readerConfiguration.get(RocketMQSourceOptions.COMMIT_IN_CHECKPOINT);
  accessKey = readerConfiguration.get(RocketMQSourceOptions.ACCESS_KEY);
  secretKey = readerConfiguration.get(RocketMQSourceOptions.SECRET_KEY);
}
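The pattern in this constructor, resolving every option once so that a misconfigured job fails before anything is read, can be sketched without BitSail as follows. A plain `Map<String, String>` stands in for BitSailConfiguration, and `ConfiguredReader`, the option names and the default batch size of 32 are all made up for illustration.

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical reader that resolves every option once, in its constructor,
// so a misconfigured job fails before any data is read.
class ConfiguredReader {
    final String cluster;
    final String topic;
    final int pollBatchSize;

    ConfiguredReader(Map<String, String> conf) {
        // Required options: fail fast with a clear message when missing.
        this.cluster = Objects.requireNonNull(conf.get("cluster"), "option 'cluster' is required");
        this.topic = Objects.requireNonNull(conf.get("topic"), "option 'topic' is required");
        // Optional option with an illustrative default value.
        this.pollBatchSize = Integer.parseInt(conf.getOrDefault("poll_batch_size", "32"));
    }
}
```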

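start method

Initializes the objects used to access the data source, such as a database statement, a message queue consumer, or a file system connection.

Example

Message queue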

public void start() {
  try {
    if (StringUtils.isNotEmpty(accessKey) && StringUtils.isNotEmpty(secretKey)) {
      AclClientRPCHook aclClientRPCHook = new AclClientRPCHook(
          new SessionCredentials(accessKey, secretKey));
      consumer = new DefaultMQPullConsumer(aclClientRPCHook);
    } else {
      consumer = new DefaultMQPullConsumer();
    }

    consumer.setConsumerGroup(consumerGroup);
    consumer.setNamesrvAddr(cluster);
    consumer.setInstanceName(String.format(
        SOURCE_READER_INSTANCE_NAME_TEMPLATE,
        cluster, topic, consumerGroup, UUID.randomUUID()));
    consumer.setConsumerPullTimeoutMillis(pollTimeout);
    consumer.start();
  } catch (Exception e) {
    throw BitSailException.asBitSailException(RocketMQErrorCode.CONSUMER_CREATE_FAILED, e);
  }
}

Database

public void start() {
  this.connection = connectionHolder.connect();

  // Construct statement.
  String baseSql = ClickhouseJdbcUtils.getQuerySql(dbName, tableName, columnInfos);
  String querySql = ClickhouseJdbcUtils.decorateSql(baseSql, splitField, filterSql, maxFetchCount, true);
  try {
    this.statement = connection.prepareStatement(querySql);
  } catch (SQLException e) {
    throw new RuntimeException("Failed to prepare statement.", e);
  }

  LOG.info("Task {} started.", subTaskId);
}

FTP

public void start() {
  this.ftpHandler.loginFtpServer();
  if (this.ftpHandler.getFtpConfig().getSkipFirstLine()) {
    this.skipFirstLine = true;
  }
}
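One plausible reason these resources are created in start() rather than in the constructor: SourceReader extends Serializable (see the interface above), while connections, consumers and FTP sessions usually are not. A minimal sketch of that separation, assuming the reader object may be serialized before start() runs; `FakeClient` and `LazyReader` are hypothetical names.

```java
import java.io.Serializable;

// Hypothetical stand-in for a non-serializable client (JDBC connection, MQ consumer, FTP session).
class FakeClient implements AutoCloseable {
    boolean open = true;

    @Override
    public void close() {
        open = false;
    }
}

// Sketch: the reader object itself is Serializable, so the live client is kept in a
// transient field that is only created in start() and released in close().
class LazyReader implements Serializable, AutoCloseable {
    private final String endpoint;        // plain configuration: safe to serialize
    private transient FakeClient client;  // live resource: created in start()

    LazyReader(String endpoint) {
        this.endpoint = endpoint;
    }

    void start() {
        client = new FakeClient();
    }

    FakeClient client() {
        return client;
    }

    @Override
    public void close() {
        if (client != null) {
            client.close();
        }
    }
}
```

Keeping the constructor limited to configuration also means a reader can be built and shipped cheaply, with no network access until it actually starts.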

addSplits method

Adds the splits that the SourceSplitCoordinator assigned to this reader to the reader's own processing queue (Queue) or collection (Set).

Example

public void addSplits(List<RocketMQSplit> splits) {
  LOG.info("Subtask {} received {}(s) new splits, splits = {}.",
      context.getIndexOfSubtask(),
      CollectionUtils.size(splits),
      splits);

  assignedRocketMQSplits.addAll(splits);
}
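The queue variant mentioned above can be sketched as follows: addSplits only records the splits, and the actual reading happens later in pollNext. `RangeSplit` and `QueueingReader` are hypothetical classes for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Hypothetical split type: a named offset range [start, end).
class RangeSplit {
    final String id;
    final long start;
    final long end;

    RangeSplit(String id, long start, long end) {
        this.id = id;
        this.start = start;
        this.end = end;
    }
}

// Sketch of the queue variant: addSplits only records the splits; reading happens later in pollNext.
class QueueingReader {
    private final Deque<RangeSplit> pending = new ArrayDeque<>();

    void addSplits(List<RangeSplit> splits) {
        pending.addAll(splits);
    }

    // Next split to process in assignment order, or null when none is pending.
    RangeSplit nextSplit() {
        return pending.pollFirst();
    }
}
```

A Deque preserves assignment order; a Set, as in the RocketMQ example, would instead ignore a split that is added twice, provided the split class defines suitable equals/hashCode.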

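hasMoreElements method

In unbounded (streaming) scenarios, it always returns true so that the reader thread is never torn down.

In batch scenarios, it returns false once all splits assigned to this reader have been processed and no more splits will arrive, marking the end of the reader's lifecycle.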

public boolean hasMoreElements() {
  if (boundedness == Boundedness.UNBOUNDEDNESS) {
    return true;
  }
  if (noMoreSplits) {
    return CollectionUtils.size(assignedRocketMQSplits) != 0;
  }
  return true;
}
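The decision made by this hasMoreElements() can be written as a pure function, which makes its three cases easy to test. `Mode` is a hypothetical stand-in for BitSail's Boundedness enum, and `MoreElements` is an illustrative name.

```java
// Hypothetical enum mirroring the two boundedness modes used in the example above.
enum Mode { BOUNDED, UNBOUNDED }

class MoreElements {
    // Same decision as the RocketMQ hasMoreElements(), written as a pure function.
    static boolean hasMoreElements(Mode mode, boolean noMoreSplits, int remainingSplits) {
        if (mode == Mode.UNBOUNDED) {
            return true;                 // streaming: the reader never finishes on its own
        }
        if (noMoreSplits) {
            return remainingSplits != 0; // batch: finish once nothing is left to read
        }
        return true;                     // batch, but more splits may still arrive
    }
}
```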

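pollNext method

This method is invoked once addSplits has filled the split queue and hasMoreElements returns true. This is where the developer implements the actual interaction with the data source.

When implementing pollNext, developers need to pay attention to the following:

  • Reading split data

    • Read data from the splits that have been constructed.

  • Data type conversion

    • Convert the external data into BitSail's Row type.

Example

Taking RocketMQSourceReader as an example:

Pick splits from the split queue, read their data, then convert what was read into BitSail's Row type and send it downstream for processing.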

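public void pollNext(SourcePipeline<Row> pipeline) throws Exception {
  for (RocketMQSplit rocketmqSplit : assignedRocketMQSplits) {
    // Check the boundary before pulling; otherwise a split that has reached its end offset
    // is never marked finished when no new messages arrive.
    if (rocketmqSplit.getStartOffset() >= rocketmqSplit.getEndOffset()) {
      LOG.info("Subtask {} rocketmq split {} in end of stream.",
          context.getIndexOfSubtask(),
          rocketmqSplit);
      finishedRocketMQSplits.add(rocketmqSplit);
      continue;
    }

    MessageQueue messageQueue = rocketmqSplit.getMessageQueue();
    PullResult pullResult = consumer.pull(messageQueue,
        consumerTag,
        rocketmqSplit.getStartOffset(),
        pollBatchSize,
        pollTimeout);

    if (Objects.isNull(pullResult) || CollectionUtils.isEmpty(pullResult.getMsgFoundList())) {
      continue;
    }

    for (MessageExt message : pullResult.getMsgFoundList()) {
      // Do not emit messages beyond the split boundary (end offset treated as exclusive).
      if (message.getQueueOffset() >= rocketmqSplit.getEndOffset()) {
        break;
      }
      // Convert the raw message body into a BitSail Row and send it downstream.
      Row row = deserializationSchema.deserialize(message.getBody());
      pipeline.output(row);
    }

    // Advance the read position, but never past the split's end offset.
    rocketmqSplit.setStartOffset(Math.min(pullResult.getNextBeginOffset(), rocketmqSplit.getEndOffset()));
    if (!commitInCheckpoint) {
      // Commit the position actually consumed, not the queue's max offset.
      consumer.updateConsumeOffset(messageQueue, rocketmqSplit.getStartOffset());
    }
  }
  assignedRocketMQSplits.removeAll(finishedRocketMQSplits);
}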
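The core of pollNext (read a bounded batch from a split, convert each record, hand it downstream, advance the split's offset) can be sketched against an in-memory list instead of RocketMQ. `OffsetSplit` and `BatchPoller` are hypothetical; the end offset is treated as exclusive, and `toUpperCase()` merely stands in for real deserialization into a Row.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical split over an in-memory log: reads offsets [next, end).
class OffsetSplit {
    long next;
    final long end;

    OffsetSplit(long start, long end) {
        this.next = start;
        this.end = end;
    }

    boolean finished() {
        return next >= end;
    }
}

class BatchPoller {
    private final List<String> log; // stand-in for one message queue partition
    private final int batchSize;

    BatchPoller(List<String> log, int batchSize) {
        this.log = log;
        this.batchSize = batchSize;
    }

    // One pollNext-style step: read at most batchSize records, never past split.end,
    // convert each record, hand it downstream, then advance the split's offset.
    void pollOnce(OffsetSplit split, Consumer<String> downstream) {
        if (split.finished()) {
            return;
        }
        long stop = Math.min(split.end, Math.min(log.size(), split.next + batchSize));
        for (long off = split.next; off < stop; off++) {
            downstream.accept(log.get((int) off).toUpperCase()); // the "conversion" step
        }
        split.next = stop;
    }
}
```

For a split covering offsets 1 to 4 (exclusive) with a batch size of 2, the first call emits the records at offsets 1 and 2, the second emits offset 3, after which the split reports itself finished.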

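Common ways to convert data to the BitSail Row type

Custom RowDeserializer class

Build one converter per column according to its type, and use it to set the corresponding field of the Row.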

public class ClickhouseRowDeserializer {

  interface FieldConverter {
    Object apply(ResultSet resultSet) throws SQLException;
  }

  private final List<FieldConverter> converters;
  private final int fieldSize;

  public ClickhouseRowDeserializer(TypeInfo<?>[] typeInfos) {
    this.fieldSize = typeInfos.length;
    this.converters = new ArrayList<>();
    for (int i = 0; i < fieldSize; ++i) {
      // JDBC column indexes start at 1.
      converters.add(initFieldConverter(i + 1, typeInfos[i]));
    }
  }

  public Row convert(ResultSet resultSet) {
    Row row = new Row(fieldSize);
    try {
      for (int i = 0; i < fieldSize; ++i) {
        Object value = converters.get(i).apply(resultSet);
        // getInt()/getLong()/getBoolean() etc. return 0 or false for SQL NULL; wasNull() tells them apart.
        row.setField(i, value == null || resultSet.wasNull() ? null : value);
      }
    } catch (SQLException e) {
      // Pass the SQLException itself: e.getCause() is frequently null.
      throw BitSailException.asBitSailException(ClickhouseErrorCode.CONVERT_ERROR, e);
    }
    return row;
  }

  private FieldConverter initFieldConverter(int index, TypeInfo<?> typeInfo) {
    if (!(typeInfo instanceof BasicTypeInfo)) {
      throw BitSailException.asBitSailException(CommonErrorCode.UNSUPPORTED_COLUMN_TYPE,
          typeInfo.getTypeClass().getName() + " is not supported yet.");
    }

    Class<?> curClass = typeInfo.getTypeClass();
    if (TypeInfos.BYTE_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getByte(index);
    }
    if (TypeInfos.SHORT_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getShort(index);
    }
    if (TypeInfos.INT_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getInt(index);
    }
    if (TypeInfos.LONG_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getLong(index);
    }
    if (TypeInfos.BIG_INTEGER_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> {
        BigDecimal dec = resultSet.getBigDecimal(index);
        return dec == null ? null : dec.toBigInteger();
      };
    }
    if (TypeInfos.FLOAT_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getFloat(index);
    }
    if (TypeInfos.DOUBLE_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getDouble(index);
    }
    if (TypeInfos.BIG_DECIMAL_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getBigDecimal(index);
    }
    if (TypeInfos.STRING_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getString(index);
    }
    if (TypeInfos.SQL_DATE_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getDate(index);
    }
    if (TypeInfos.SQL_TIMESTAMP_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getTimestamp(index);
    }
    if (TypeInfos.SQL_TIME_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getTime(index);
    }
    if (TypeInfos.BOOLEAN_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getBoolean(index);
    }
    if (TypeInfos.VOID_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> null;
    }
    throw new UnsupportedOperationException("Unsupported data type: " + typeInfo);
  }
}
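The same one-converter-per-column idea can be tried without a database by converting a `String[]` record instead of a JDBC ResultSet. This is a sketch under that assumption; `ColumnConverters` is a hypothetical class and only four column types are covered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Sketch of the "one converter per column, built once" pattern, over a String[] record
// instead of a JDBC ResultSet so it runs without a database.
class ColumnConverters {
    private final List<Function<String, Object>> converters = new ArrayList<>();

    ColumnConverters(Class<?>... columnTypes) {
        for (Class<?> type : columnTypes) {
            converters.add(forType(type)); // type dispatch happens once, not per row
        }
    }

    private static Function<String, Object> forType(Class<?> type) {
        if (type == Integer.class) {
            return Integer::valueOf;
        }
        if (type == Long.class) {
            return Long::valueOf;
        }
        if (type == Boolean.class) {
            return Boolean::valueOf;
        }
        if (type == String.class) {
            return s -> s;
        }
        throw new UnsupportedOperationException("Unsupported data type: " + type);
    }

    Object[] convert(String[] record) {
        Object[] row = new Object[converters.size()];
        for (int i = 0; i < row.length; i++) {
            // Propagate nulls instead of handing them to the parser.
            row[i] = record[i] == null ? null : converters.get(i).apply(record[i]);
        }
        return row;
    }
}
```

The type dispatch runs once in the constructor, so the per-row work is just a list lookup and a function call per column.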
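Implementing the DeserializationSchema interface

Rather than writing a custom RowDeserializer, we would prefer that you implement the DeserializationSchema interface, which converts data in a particular format, such as JSON or CSV, into BitSail Rows.

In practice, a single class can create the matching implementation behind this common interface: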

public class TextInputFormatDeserializationSchema implements DeserializationSchema<Writable, Row> {

  private BitSailConfiguration deserializationConfiguration;
  private TypeInfo<?>[] typeInfos;
  private String[] fieldNames;
  private transient DeserializationSchema<byte[], Row> deserializationSchema;

  public TextInputFormatDeserializationSchema(BitSailConfiguration deserializationConfiguration,
                                              TypeInfo<?>[] typeInfos,
                                              String[] fieldNames) {
    this.deserializationConfiguration = deserializationConfiguration;
    this.typeInfos = typeInfos;
    this.fieldNames = fieldNames;

    // Choose the concrete schema from the configured content type.
    ContentType contentType = ContentType.valueOf(
        deserializationConfiguration
            .getNecessaryOption(HadoopReaderOptions.CONTENT_TYPE, HadoopErrorCode.REQUIRED_VALUE)
            .toUpperCase());
    switch (contentType) {
      case CSV:
        this.deserializationSchema =
            new CsvDeserializationSchema(deserializationConfiguration, typeInfos, fieldNames);
        break;
      case JSON:
        this.deserializationSchema =
            new JsonDeserializationSchema(deserializationConfiguration, typeInfos, fieldNames);
        break;
      default:
        throw BitSailException.asBitSailException(HadoopErrorCode.UNSUPPORTED_ENCODING,
            "unsupported parser type: " + contentType);
    }
  }

  @Override
  public Row deserialize(Writable message) {
    // Hadoop Text.toString() decodes UTF-8, so re-encode explicitly instead of using the platform default charset.
    return deserializationSchema.deserialize(message.toString().getBytes(StandardCharsets.UTF_8));
  }

  @Override
  public boolean isEndOfStream(Row nextElement) {
    return false;
  }
}
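A minimal sketch of the same dispatch-behind-one-interface idea, with `BytesToFields` as a simplified, hypothetical stand-in for DeserializationSchema and `CsvLine`, `Format` and `FormatDispatcher` as illustrative names. Tab-separated values replace JSON here only because a JSON parser would need an external library.

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;

// Simplified, hypothetical stand-in for DeserializationSchema: bytes in, fields out.
interface BytesToFields {
    String[] deserialize(byte[] message);
}

// Hypothetical minimal CSV parser (no quoting support), for illustration only.
class CsvLine implements BytesToFields {
    public String[] deserialize(byte[] message) {
        return new String(message, StandardCharsets.UTF_8).split(",", -1);
    }
}

enum Format { CSV, TSV }

// Sketch: callers depend only on the interface; the concrete parser is chosen once from config.
class FormatDispatcher implements BytesToFields {
    private final BytesToFields delegate;

    FormatDispatcher(String contentType) {
        switch (Format.valueOf(contentType.toUpperCase(Locale.ROOT))) {
            case CSV:
                delegate = new CsvLine();
                break;
            case TSV:
                delegate = bytes -> new String(bytes, StandardCharsets.UTF_8).split("\t", -1);
                break;
            default:
                throw new IllegalArgumentException("unsupported parser type: " + contentType);
        }
    }

    public String[] deserialize(byte[] message) {
        return delegate.deserialize(message);
    }
}
```

Adding a new format then means one new implementation and one new case, while every caller keeps using `deserialize` unchanged.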

Alternatively, you can write a DeserializationSchema dedicated to the specific input format being parsed:

public class MapredParquetInputFormatDeserializationSchema implements DeserializationSchema<Writable, Row> {

  private final BitSailConfiguration deserializationConfiguration;
  private final transient DateTimeFormatter localDateTimeFormatter;
  private final transient DateTimeFormatter localDateFormatter;
  private final transient DateTimeFormatter localTimeFormatter;
  private final int fieldSize;
  private final TypeInfo<?>[] typeInfos;
  private final String[] fieldNames;
  private final List<DeserializationConverter> converters;

  public MapredParquetInputFormatDeserializationSchema(BitSailConfiguration deserializationConfiguration,
                                                       TypeInfo<?>[] typeInfos,
                                                       String[] fieldNames) {
    this.deserializationConfiguration = deserializationConfiguration;
    this.typeInfos = typeInfos;
    this.fieldNames = fieldNames;
    this.localDateTimeFormatter = DateTimeFormatter.ofPattern(
        deserializationConfiguration.get(CommonOptions.DateFormatOptions.DATE_TIME_PATTERN));
    this.localDateFormatter = DateTimeFormatter.ofPattern(
        deserializationConfiguration.get(CommonOptions.DateFormatOptions.DATE_PATTERN));
    this.localTimeFormatter = DateTimeFormatter.ofPattern(
        deserializationConfiguration.get(CommonOptions.DateFormatOptions.TIME_PATTERN));
    this.fieldSize = typeInfos.length;
    // Build one converter per column up front.
    this.converters = Arrays.stream(typeInfos)
        .map(this::createTypeInfoConverter)
        .collect(Collectors.toList());
  }

  @Override
  public Row deserialize(Writable message) {
    int arity = fieldNames.length;
    Row row = new Row(arity);
    Writable[] writables = ((ArrayWritable) message).get();
    for (int i = 0; i < fieldSize; ++i) {
      // A null Writable represents a null column value.
      Writable writable = writables[i];
      row.setField(i, writable == null ? null : converters.get(i).convert(writable.toString()));
    }
    return row;
  }

  @Override
  public boolean isEndOfStream(Row nextElement) {
    return false;
  }

  private interface DeserializationConverter extends Serializable {
    Object convert(String input);
  }

  private DeserializationConverter createTypeInfoConverter(TypeInfo<?> typeInfo) {
    Class<?> typeClass = typeInfo.getTypeClass();

    if (typeClass == TypeInfos.VOID_TYPE_INFO.getTypeClass()) {
      return field -> null;
    }
    if (typeClass == TypeInfos.BOOLEAN_TYPE_INFO.getTypeClass()) {
      return this::convertToBoolean;
    }
    if (typeClass == TypeInfos.INT_TYPE_INFO.getTypeClass()) {
      return this::convertToInt;
    }
    throw BitSailException.asBitSailException(CsvFormatErrorCode.CSV_FORMAT_COVERT_FAILED,
        String.format("Converter does not support type info: %s.", typeInfo));
  }

  private boolean convertToBoolean(String field) {
    return Boolean.parseBoolean(field.trim());
  }

  private int convertToInt(String field) {
    return Integer.parseInt(field.trim());
  }
}

snapshotState method

Generates and stores a snapshot of the reader's state for checkpointing.

Example

public List<RocketMQSplit> snapshotState(long checkpointId) {
  LOG.info("Subtask {} start snapshotting for checkpoint id = {}.", context.getIndexOfSubtask(), checkpointId);
  if (commitInCheckpoint) {
    for (RocketMQSplit rocketMQSplit : assignedRocketMQSplits) {
      try {
        // Commit each queue's next offset to read as part of the checkpoint.
        consumer.updateConsumeOffset(rocketMQSplit.getMessageQueue(), rocketMQSplit.getStartOffset());
        LOG.debug("Subtask {} committed message queue = {} in checkpoint id = {}.",
            context.getIndexOfSubtask(),
            rocketMQSplit.getMessageQueue(),
            checkpointId);
      } catch (MQClientException e) {
        throw new RuntimeException(e);
      }
    }
  }
  return Lists.newArrayList(assignedRocketMQSplits);
}
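Note that this snapshotState returns `Lists.newArrayList(assignedRocketMQSplits)`, a new list holding the same split objects that pollNext keeps mutating through setStartOffset. Whether that matters depends on when the framework serializes the returned state, which this article does not say; returning independent copies removes the question. A sketch with hypothetical `ProgressSplit` and `SnapshottingReader` classes:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mutable split whose offset advances as data is read.
class ProgressSplit {
    final String id;
    long offset;

    ProgressSplit(String id, long offset) {
        this.id = id;
        this.offset = offset;
    }

    ProgressSplit copy() {
        return new ProgressSplit(id, offset);
    }
}

class SnapshottingReader {
    final List<ProgressSplit> assigned = new ArrayList<>();

    // Return an independent copy of each split so later reads cannot change
    // what was recorded for this checkpoint.
    List<ProgressSplit> snapshotState(long checkpointId) {
        List<ProgressSplit> state = new ArrayList<>();
        for (ProgressSplit s : assigned) {
            state.add(s.copy());
        }
        return state;
    }
}
```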

hasMoreElements method

Before each call to pollNext, sourceReader.hasMoreElements() is checked; pollNext is called if and only if that check returns true.

Example

public boolean hasMoreElements() {
  if (noMoreSplits) {
    return CollectionUtils.size(assignedHadoopSplits) != 0;
  }
  return true;
}
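The contract described above, calling pollNext only after hasMoreElements() returns true, implies a driver loop like the following. `MiniReader`, `ListReader` and `Driver` are hypothetical simplifications rather than BitSail classes; a real runtime would also need to avoid busy-spinning while a reader waits for splits.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Simplified reader contract (not BitSail's actual interface).
interface MiniReader {
    boolean hasMoreElements();

    void pollNext(Consumer<String> out);
}

// Bounded toy reader: each split is a list of records. It reports no more elements
// once it has been told no more splits are coming and its queue is empty.
class ListReader implements MiniReader {
    private final Deque<List<String>> splits = new ArrayDeque<>();
    private boolean noMoreSplits;

    void addSplit(List<String> split) {
        splits.add(split);
    }

    void notifyNoMoreSplits() {
        noMoreSplits = true;
    }

    public boolean hasMoreElements() {
        return !noMoreSplits || !splits.isEmpty();
    }

    public void pollNext(Consumer<String> out) {
        List<String> split = splits.poll();
        if (split != null) {
            split.forEach(out);
        }
    }
}

class Driver {
    // pollNext is only ever called after hasMoreElements() returned true.
    static List<String> run(MiniReader reader) {
        List<String> collected = new ArrayList<>();
        while (reader.hasMoreElements()) {
            reader.pollNext(collected::add);
        }
        return collected;
    }
}
```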

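notifyNoMoreSplits method

Called when the SourceSplitCoordinator will send no more splits to this reader. After that, the reader can finish once it has processed all the splits already assigned to it.

Example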

public void notifyNoMoreSplits() {
  LOG.info("Subtask {} received no more split signal.", context.getIndexOfSubtask());
  noMoreSplits = true;
}

【About BitSail】:

⭐️ Star the repo so you don't lose track of it: https://github.com/bytedance/bitsail

Submit issues and suggestions: https://github.com/bytedance/bitsail/issues

Contribute code: https://github.com/bytedance/bitsail/pulls

BitSail website: https://bytedance.github.io/bitsail/zh/

Subscribe to the mailing list: bitsail+subscribe@googlegroups.com

Join the BitSail technical community

