深度了解flink(七) JobManager(1) 组件启动流程分析

news/2025/6/21 16:02:20/

前言

JobManager是Flink的核心进程,主要负责Flink集群的启动和初始化,包含多个重要的组件(JboMaster,Dispatcher,WebEndpoint等),本篇文章会基于源码分析JobManagr的启动流程,对其各个组件进行介绍,希望对JobManager有一个更全面的了解。

集群启动模式

ClusterEntryPoint是Flink集群的入口点的基类,该类是抽象类,类继承关系UML图如下

通过上图可知道,Flink有3种集群模式

Flink Session集群

根据不同的资源管理器,有3个不同的子类:

  • StandaloneSessionClusterEntrypoint Standalone session模式下集群的入口类
  • KubernetesSessionClusterEntrypoint K8s session模式下集群的入口类
  • YarnSessionClusterEntrypoint Yarn session模式下集群的入口类

集群生命周期

在 Flink Session 集群中,客户端连接到一个预先存在的、长期运行的集群,该集群可以接受多个作业提交。即使所有作业完成后,集群(和 JobManager)仍将继续运行直到手动停止 session 为止。因此,Flink Session 集群的寿命不受任何 Flink 作业寿命的约束。

资源隔离:

Flink作业共享集群的ResourceManager和Dispacher等组件,TaskManager slot 由 ResourceManager 在提交作业时分配,并在作业完成时释放。由于所有作业都共享同一集群,因此在集群资源方面存在一些竞争 — 例如提交工作阶段的网络带宽。此共享设置的局限性在于,如果 TaskManager 崩溃,则在此 TaskManager 上运行 task 的所有作业都将失败;类似的,如果 JobManager 上发生一些致命错误,它将影响集群中正在运行的所有作业。

适用场景:

因为组件共享,session集群资源使用率高,集群预先存在,不需要额外申请资源,适合一些比较小的,不是长期运行的作业,例如SQL预览,交互式查询,实时任务测试环境等

Flink Per Job集群

只要Yarn提供了继承的子类:YarnJobClusterEntrypoint

集群生命周期

在 Flink Job 集群中,可用的集群管理器(例如 YARN)用于为每个提交的作业启动一个集群,并且该集群仅可用于该作业。一旦作业完成,Flink Job 集群将被拆除。

资源隔离:

每一个提交的Flink应用程序单独创建一套完整集群环境,该Job独享使用的计算资源和组件服务。

使用场景:

实时由于Per Job模式下用户应用程序的main方法在客户端执行生成JobGraph,任务量大情况下存在性能瓶颈,目前已被标记为废弃状态。

Flink Application集群

根据不同的资源管理器,有3个不同的子类:

  • StandaloneApplicationClusterEntryPoint Standalone Application模式下集群的入口类
  • KubernetesApplicationClusterEntrypoint K8s Application模式下集群的入口类
  • YarnApplicationClusterEntryPoint Yarn Application模式下集群的入口类

集群生命周期

Flink Application 集群是专用的 Flink 集群,仅从 Flink 应用程序执行作业,并且main方法在集群上而不是客户端上运行。应用程序逻辑和依赖打包成一个可执行的作业 JAR 中,并且集群入口(ApplicationClusterEntryPoint)负责调用main方法来提取 JobGraph

资源隔离:

每一个提交的Flink应用程序单独创建一套完整集群环境,该Job独享使用的计算资源和组件服务。

使用场景:

Application模式资源隔离性好,Per Job模式的替换方案,适合长期运行、具有高稳定性的大型作业

JobManager启动流程

JobManger启动流程在不同模式下基本相同,Standalone模式可以在本地运行(可以参考),方便Debug,因为使用Standalone模式的入口类StandaloneSessionClusterEntrypoint进行启动流程的分析。

main方法入口

public static void main(String[] args) {// 打印系统相关信息EnvironmentInformation.logEnvironmentInfo(LOG, StandaloneSessionClusterEntrypoint.class.getSimpleName(), args);//信号注册器,注册系统级别的信号,接收到系统级别终止信号优雅的关闭SignalHandler.register(LOG);//注册一个安全的钩子,这样jvm停止之前会睡眠5s去释放资源,5s之后强制关闭JvmShutdownSafeguard.installAsShutdownHook(LOG);// 解析命令行参数,获取配置信final EntrypointClusterConfiguration entrypointClusterConfiguration =ClusterEntrypointUtils.parseParametersOrExit(args,new EntrypointClusterConfigurationParserFactory(),StandaloneSessionClusterEntrypoint.class);//加载config.yaml,构建Configuration对象Configuration configuration = loadConfiguration(entrypointClusterConfiguration);StandaloneSessionClusterEntrypoint entrypoint =new StandaloneSessionClusterEntrypoint(configuration);ClusterEntrypoint.runClusterEntrypoint(entrypoint);
}

主要步骤

  1. 打印系统信息。
  2. 注册信号处理器,注册系统级别的信号,确保优雅关闭。
  3. 注册一个安全的钩子,这样jvm停止之前会睡眠5s去释放资源,5s之后强制关闭。
  4. 解析命令行参数,加载配置文件。
  5. 初始化 StandaloneSessionClusterEntrypoint
  6. 调用 ClusterEntrypoint#runClusterEntrypoint 方法启动集群。

ClusterEntrypoint#runClusterEntrypoint

public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
try {//clusterEntrypoint.startCluster();
} catch (ClusterEntrypointException e) {LOG.error(String.format("Could not start cluster entrypoint %s.", clusterEntrypointName),e);System.exit(STARTUP_FAILURE_RETURN_CODE);
}//无关代码 无需关注
}

核心步骤

  • 调用 clusterEntrypoint.startCluster() 启动集群。

ClusterEntrypoint#startCluster

public void startCluster() throws ClusterEntrypointException {//无关代码 无需关注try {FlinkSecurityManager.setFromConfiguration(configuration);//插件管理类,用来加载插件。插件加载两种方式。//1).通过如下参数配置FLINK_PLUGINS_DIR。//2).将插件jar包放入到plugins下PluginManager pluginManager =PluginUtils.createPluginManagerFromRootFolder(configuration);//初始化文件系统的配置configureFileSystems(configuration, pluginManager);//初始化安全上下文环境 默认HadoopSecurityContext,Hadoop安全上下文,//使用先前初始化的UGI(UserGroupInformation)和适当的安全凭据。比如Kerberos。//总结:初始化安全环境,创建安全环境的时候会做一系列的检查。SecurityContext securityContext = installSecurityContext(configuration);ClusterEntrypointUtils.configureUncaughtExceptionHandler(configuration);//安全的情况下调用runCluster开始初始化组件securityContext.runSecured((Callable<Void>)() -> {runCluster(configuration, pluginManager);return null;});} catch (Throwable t) {//异常处理代码 无需关注}}

startCluster方法主要做了一些环境和配置初始化的工作

主要步骤

  1. 初始化插件管理器,用来加载插件。
  2. 初始化文件系统设置 例如 hdfs、本地file。此时只是初始化的配置。
  3. 初始化安全环境。
  4. 安全环境下调用 runCluster 方法。

ClusterEntrypoint#runCluster

private void  runCluster(Configuration configuration, PluginManager pluginManager)throws Exception {synchronized (lock) {//初始化集群所需要的服务:例如通信服务,监控服务,高可用服务等initializeServices(configuration, pluginManager);// write host information into configurationconfiguration.set(JobManagerOptions.ADDRESS, commonRpcService.getAddress());configuration.set(JobManagerOptions.PORT, commonRpcService.getPort());//创建Dispatcher和ResourceManger组件的工厂类final DispatcherResourceManagerComponentFactorydispatcherResourceManagerComponentFactory =createDispatcherResourceManagerComponentFactory(configuration);//创建Dispatcher和ResourceManger组件clusterComponent =dispatcherResourceManagerComponentFactory.create(configuration,resourceId.unwrap(),ioExecutor,commonRpcService,haServices,blobServer,heartbeatServices,delegationTokenManager,metricRegistry,executionGraphInfoStore,new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),failureEnrichers,this);//组件停止运行后的异步方法clusterComponent.getShutDownFuture().whenComplete(//代码省略)}}

主要步骤

1.初始化集群所需要的服务:例如通信服务,监控服务,高可用服务等

2.创建Dispatcher和ResourceManger组件的工厂类

3.创建Dispatcher和ResourceManger组件

4.定义组件停止运行后的异步方法

总结

本篇文章分享了Flink任务的集群模式,通过源码的方式分析了JobManger的启动流程,后续会对JobManger相关的服务和组件进行更详细的分析。


http://www.ppmy.cn/news/1543405.html

相关文章

C#二分查找算法

前言 二分查找算法是一种在有序数组中查找特定元素的搜索算法。 实现原理 二分查找的实现依赖于以下几个关键步骤&#xff1a; 计算查找范围的中间索引。 比较中间索引处的值与目标值。 根据比较结果调整查找范围&#xff08;左半部分或右半部分&#xff09;。 重复上述步…

论文提交步骤 | 2024年第五届MathorCup大数据竞赛

2024年第五届MathorCup数学应用挑战赛—大数据竞赛于2024年10月25日下午6点正式开赛。 论文和承诺书、支撑材料&#xff08;可选&#xff09;及计算结果文档由各参赛队队长电脑登录下方报名主页提交&#xff1a; https://www.saikr.com/vse/bigdata2024 初赛作品提交截止时间为…

线性可分支持向量机代码 举例说明 具体的变量数值变化

### 实现线性可分支持向量机 ### 硬间隔最大化策略 class Hard_Margin_SVM:### 线性可分支持向量机拟合方法def fit(self, X, y):# 训练样本数和特征数m, n X.shape# 初始化二次规划相关变量&#xff1a;P/q/G/hself.P matrix(np.identity(n 1, dtypenp.float))self.q matr…

程序员工作七年,我踩过的那七个坑

作者&#xff1a;东东拿铁 引言 今天想聊聊&#xff0c;自己在7年工作中踩过坑的7件事情。 一、不会问问题 一杯茶一包烟&#xff0c;一个bug改一天。 程序员面对技术难题是非常正常的事情&#xff0c;谁还没有碰见问题束手无策的时候呢。 记得刚工作的时候&#xff0c;我…

机器人技术基础(4章逆运动解算和雅克比矩阵)

逆运动解算&#xff1a; 雅克比矩阵&#xff1a; 将动力学分析转向运动的物体 下图中的 n o y 反映了机器人的姿态矩阵&#xff0c; 最后一列 p 反应了机器人在空间中的位置&#xff1a;

YOLOv4和Darknet实现坑洼检测

项目源码获取方式见文章末尾&#xff01; 600多个深度学习项目资料&#xff0c;快来加入社群一起学习吧。 《------往期经典推荐------》 项目名称 1.【MobileViT实现垃圾分类】 2.【卫星图像道路检测DeepLabV3Plus模型】 3.【GAN模型实现二次元头像生成】 4.【CNN模型实现mni…

【SpringCloud】06-Sentinel

1. 雪崩问题 一个微服务出现问题导致一系列微服务都不可以正常工作。 服务保护方案&#xff1a; 请求限流。线程隔离。 服务熔断 2. Sentinel 启动Sentinel java -Dserver.port8090 -Dcsp.sentinel.dashboard.serverlocalhost:8090 -Dproject.namesentinel-dashboard -ja…

Spring Cache-基于注解的缓存

Spring Cache 是 Spring 提供的缓存抽象框架&#xff0c;能够将数据缓存到内存或外部缓存中&#xff0c;减少数据库或远程服务的访问频率&#xff0c;从而显著提升应用性能。Spring Cache 通过注解的方式实现缓存逻辑&#xff0c;使用方便&#xff0c;支持多种缓存实现&#xf…