
Flink主要组件

作业管理器(JobManager)
(1) 控制一个应用程序执行的主进程,也就是说,每个应用程序 都会被一个不同的Jobmanager所控制执行
(2) Jobmanager会先接收到要执行的应用程序,这个应用程序会包括:作业图( Job Graph)、逻辑数据流图( ogical dataflow graph)和打包了所有的类、库和其它资源的JAR包。
(3) Jobmanager会把 Jobgraph转换成一个物理层面的 数据流图,这个图被叫做 “执行图”(Executiongraph),包含了所有可以并发执行的任务。Job Manager会向资源管理器( Resourcemanager)请求执行任务必要的资源,也就是 任务管理器(Taskmanager)上的插槽slot。一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的 Taskmanager上。而在运行过程中Jobmanagera会负责所有需要中央协调的操作,比如说检查点(checkpoints)的协调。
任务管理器(Taskmanager)
(1) Flink中的工作进程。通常在 Flink中会有多个 Taskmanageria运行, 每个 Taskmanageri都包含了一定数量的插槽( slots)。插槽的数量限制了Taskmanageri能够执行的任务数量。
(2) 启动之后, Taskmanager会向资源管理器注册它的插槽;收到资源管理器的指令后, Taskmanageri就会将一个或者多个插槽提供给Jobmanageri调用。Jobmanager就可以向插槽分配任务( tasks)来执行了。
(3) 在执行过程中, 一个 Taskmanagera可以跟其它运行同一应用程序的Taskmanager交换数据。
资源管理器(Resource Manager)
(1) 主要负责管理任务管理器( Task Manager)的 插槽(slot)Taskmanger插槽是 Flink中定义的处理资源单元。
(2) Flink 为不同的环境和资源管理工具提供了不同资源管理器,比如YARNMesos、K8s,以及 standalone部署。
(3) 当 Jobmanager申请插槽资源时, Resourcemanager会将有空闲插槽的Taskmanager?分配给Jobmanager。如果 Resourcemanagery没有足够的插槽来满足 Jobmanager的请求, 它还可以向资源提供平台发起会话,以提供启动 Taskmanager进程的容器。
分发器(Dispatcher)
(1) 可以跨作业运行,它为应用提交提供了REST接口。
(2)当一个应用被提交执行时,分发器就会启动并将应用移交给Jobmanage
(3) Dispatcher他会启动一个 WebUi,用来方便地 展示和监控作业执行的信息。
任务提交流程

- 提交应用
- 启动并提交应用
- 请求slots
- 任务启动
- 注册slots
- 发出提供slot的指令
- 提供slots
- 提交要在slots中执行的任务
- 交换数据
任务提交流程(YARN)

a. Flink任务提交后,Client向HDFS上传Flink的Jar包和配置
b. 随后向 Yarn ResourceManager提交任务ResourceManager分配 Container资源并通知对应的NodeManager启动
c. ApplicationMaster,ApplicationMaster 启动后加载Flink的Jar包和配置构建环境
d. 然后启动JobManager , 之后ApplicationMaster 向ResourceManager 申请资源启动TaskManager
e. ResourceManager 分配 Container 资源后 , 由ApplicationMaster通知资源所在节点的NodeManager启动TaskManager
f. NodeManager 加载 Flink 的 Jar 包和配置构建环境并启动 TaskManager
g. TaskManager 启动后向 JobManager 发送心跳包,并等待 JobManager 向其分配任务。
源码分析--集群启动 JobManager 启动分析
JobManager 的内部包含非常重要的三大组件
- WebMonitorEndpoint
- ResourceManager
- Dispatcher
入口,启动主类:StandaloneSessionClusterEntrypoint
- //入口
- StandaloneSessionClusterEntrypoint.main()ClusterEntrypoint.runClusterEntrypoint(entrypoint);
- clusterEntrypoint.startCluster();
- runCluster(configuration,pluginManager);
- //第一步:初始化各种服务
- /**
- *初始化了主节点对外提供服务的时候所需要的三大核心组件启动时所需要的基础服务
- *初始化服务,如JobManager的AkkaRPC服务,HA服务,心跳检查服务,metricservice
- *这些服务都是Master节点要使用到的一些服务
- *1、commonRpcService:基于Akka的RpcService实现。RPC服务启动Akka参与者来接收从RpcGateway调用RPC
- *2、haServices:提供对高可用性所需的所有服务的访问注册,分布式计数器和领导人选举
- *3、blobServer:负责侦听传入的请求生成线程来处理这些请求。它还负责创建要存储的目录结构blob或临时缓存它们
- *4、heartbeatServices:提供心跳所需的所有服务。这包括创建心跳接收器和心跳发送者。
- *5、metricRegistry:跟踪所有已注册的Metric,它作为连接MetricGroup和MetricReporter
- *6、archivedExecutionGraphStore:存储执行图ExecutionGraph的可序列化形式。
- */
- initializeServices(configuration,pluginManager);
- //创建DispatcherResourceManagerComponentFactory,初始化各种组件的
- 工厂实例
- //其实内部包含了三个重要的成员变量:
- //创建ResourceManager的工厂实例
- //创建Dispatcher的工厂实例
- //创建WebMonitorEndpoint的工厂实例
- createDispatcherResourceManagerComponentFactory(configuration);
- //创建集群运行需要的一些组件:Dispatcher,ResourceManager等
- //创建ResourceManager
- //创建Dispatcher
- //创建WebMonitorEndpoint
- clusterComponent=dispatcherResourceManagerComponentFactory.create(...)
1. initializeServices():初始化各种服务
- //初始化和启动AkkaRpcService,内部其实包装了一个ActorSystemcommonRpcService=AkkaRpcServiceUtils.createRemoteRpcService(...)
- //初始化一个负责IO的线程池
- ioExecutor=Executors.newFixedThreadPool(...)
- //初始化HA服务组件,负责HA服务的是:ZooKeeperHaServiceshaServices=createHaServices(configuration,ioExecutor);
- //初始化BlobServer服务端
- blobServer=newBlobServer(configuration,haServices.createBlobStore());blobServer.start();
- //初始化心跳服务组件,heartbeatServices=HeartbeatServicesheartbeatServices=createHeartbeatServices(configuration);
- //初始化一个用来存储ExecutionGraph的Store,实现是:
- FileArchivedExecutionGraphStore
- archivedExecutionGraphStore=createSerializableExecutionGraphStore(...)
2. createDispatcherResourceManagerComponentFactory(configuration)初始化了多组件的工厂实例
- 1、DispatcherRunnerFactory,默认实现:DefaultDispatcherRunnerFactory
- 2、ResourceManagerFactory,默认实现:StandaloneResourceManagerFactory
- 3、RestEndpointFactory,默认实现:SessionRestEndpointFactory
- clusterComponent=dispatcherResourceManagerComponentFactory
- .create(configuration,ioExecutor,commonRpcService,haServices,
- blobServer,heartbeatServices,metricRegistry,
- archivedExecutionGraphStore,
- newRpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
- this);
3. 创建 WebMonitorEndpoint
- /*************************************************
- *创建WebMonitorEndpoint实例,在Standalone模式下:DispatcherRestEndpoint
- *1、restEndpointFactory=SessionRestEndpointFactory
- *2、webMonitorEndpoint=DispatcherRestEndpoint
- *3、highAvailabilityServices.getClusterRestEndpointLeaderElectionService()=ZooKeeperLeaderElectionService
- *当前这个DispatcherRestEndpoint的作用是:
- *1、初始化的过程中,会一大堆的Handler
- *2、启动一个Netty的服务端,绑定了这些Handler
- *3、当client通过flink命令执行了某些操作(发起restful请求),服务端由webMonitorEndpoint来执行处理
- *4、举个例子:如果通过flinkrun提交一个Job,那么最后是由webMonitorEndpoint中的JobSubmitHandler来执行处理
- *5、补充一个:job由JobSubmitHandler执行完毕之后,转交给Dispatcher去调度执行
- */
- webMonitorEndpoint=restEndpointFactory.createRestEndpoint(
- configuration,dispatcherGatewayRetriever,resourceManagerGatewayRetriever,
- blobServer,executor,metricFetcher,
- highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
- fatalErrorHandler
- );
4. 创建 resourceManager
- /*************************************************
- *创建StandaloneResourceManager实例对象
- *1、resourceManager=StandaloneResourceManager
- *2、resourceManagerFactory=StandaloneResourceManagerFactory
- */
- resourceManager=resourceManagerFactory.createResourceManager(
- configuration,ResourceID.generate(),
- rpcService,highAvailabilityServices,heartbeatServices,
- fatalErrorHandler,newClusterInformation(hostname,blobServer.getPort()),
- webMonitorEndpoint.getRestBaseUrl(),metricRegistry,hostname
- );
- protectedResourceManager<ResourceID>createResourceManager(
- Configurationconfiguration,
- ResourceIDresourceId,
- RpcServicerpcService,
- HighAvailabilityServiceshighAvailabilityServices,
- HeartbeatServicesheartbeatServices,
- FatalErrorHandlerfatalErrorHandler,
- ClusterInformationclusterInformation,
- @NullableStringwebInterfaceUrl,
- ResourceManagerMetricGroupresourceManagerMetricGroup,
- ResourceManagerRuntimeServicesresourceManagerRuntimeServices){
- finalTimestandaloneClusterStartupPeriodTime=ConfigurationUtils.getStandaloneClusterStartupPeriodTime(configuration);
- /*************************************************
- *注释:得到一个StandaloneResourceManager实例对象
- */
- returnnewStandaloneResourceManager(
- rpcService,
- resourceId,
- highAvailabilityServices,
- heartbeatServices,
- resourceManagerRuntimeServices.getSlotManager(),
- ResourceManagerPartitionTrackerImpl::new,
- resourceManagerRuntimeServices.getJobLeaderIdService(),
- clusterInformation,
- fatalErrorHandler,
- resourceManagerMetricGroup,
- standaloneClusterStartupPeriodTime,
- AkkaUtils.getTimeoutAsTime(configuration)
- );
- }
- /**
- requestSlot():接受solt请求
- sendSlotReport(..):将solt请求发送TaskManager
- registerJobManager(...):注册job管理者。该job指的是提交给flink的应用程序
- registerTaskExecutor(...):注册task执行者。
- **/
- publicResourceManager(RpcServicerpcService,ResourceIDresourceId,HighAvailabilityServiceshighAvailabilityServices,
- HeartbeatServicesheartbeatServices,SlotManagerslotManager,ResourceManagerPartitionTrackerFactoryclusterPartitionTrackerFactory,
- JobLeaderIdServicejobLeaderIdService,ClusterInformationclusterInformation,FatalErrorHandlerfatalErrorHandler,
- ResourceManagerMetricGroupresourceManagerMetricGroup,TimerpcTimeout){
- /*************************************************
- *注释:当执行完毕这个构造方法的时候,会触发调用onStart()方法执行
- */
- super(rpcService,AkkaRpcServiceUtils.createRandomName(RESOURCE_MANAGER_NAME),null);
- protectedRpcEndpoint(finalRpcServicerpcService,finalStringendpointId){
- this.rpcService=checkNotNull(rpcService,"rpcService");
- this.endpointId=checkNotNull(endpointId,"endpointId");
- /*************************************************
- *注释:ResourceManager或者TaskExecutor中的RpcServer实现
- *以ResourceManager为例说明:
- *启动ResourceManager的RPCServer服务
- *这里启动的是ResourceManager的Rpc服务端。
- *接收TaskManager启动好了而之后,进行注册和心跳,来汇报Taskmanagaer的资源情况
- *通过动态代理的形式构建了一个Server
- */
- this.rpcServer=rpcService.startServer(this);
5. 在创建resourceManager同级:启动任务接收器Starting Dispatcher
- /*************************************************
- *创建并启动Dispatcher
- *1、dispatcherRunner=DispatcherRunnerLeaderElectionLifecycleManager
- *2、dispatcherRunnerFactory=DefaultDispatcherRunnerFactory
- *第一个参数:ZooKeeperLeaderElectionService
- *-
- *老版本:这个地方是直接创建一个Dispatcher对象然后调用dispatcher.start()来启动
- *新版本:直接创建一个DispatcherRunner,内部就是要创建和启动Dispatcher
- *-
- *DispatcherRunner是对Dispatcher的封装。
- *DispatcherRunner被创建的代码的内部,会创建Dispatcher并启动
- */
- log.debug("StartingDispatcher.");
- dispatcherRunner=dispatcherRunnerFactory.createDispatcherRunner(
- highAvailabilityServices.getDispatcherLeaderElectionService(),fatalErrorHandler,
- //TODO_ZYM注释:注意第三个参数
- newHaServicesJobGraphStoreFactory(highAvailabilityServices),
- ioExecutor,rpcService,partialDispatcherServices
- );
Dispatcher 启动后,将会等待任务提交,如果有任务提交,则会经过submitJob(...)函数进入后续处理。
提交(一个Flink应用的提交必须经过三个graph的转换)

首先看下一些名词
StreamGraph
是根据用户通过 Stream API 编写的代码生成的最初的图。用来表示程序的拓扑结构。可以用一个 DAG 来表示),DAG 的顶点是 StreamNode,边是 StreamEdge,边包含了由哪个 StreamNode 依赖哪个 StreamNode。
- StreamNode:用来代表 operator 的类,并具有所有相关的属性,如并发度、入边和出边等。
- StreamEdge:表示连接两个StreamNode的边。

DataStream 上常见的 transformation 有 map、flatmap、filter等(见DataStream Transformation了解更多)。这些transformation会构造出一棵 StreamTransformation 树,通过这棵树转换成 StreamGraph
以map方法为例,看看源码
- public<R>SingleOutputStreamOperator<R>map(MapFunction<T,R>mapper){
- //通过javareflection抽出mapper的返回值类型
- TypeInformation<R>outType=TypeExtractor.getMapReturnTypes(clean(mapper),getType(),
- Utils.getCallLocationName(),true);
- //返回一个新的DataStream,SteramMap为StreamOperator的实现类
- returntransform("Map",outType,newStreamMap<>(clean(mapper)));
- }
- public<R>SingleOutputStreamOperator<R>transform(StringoperatorName,TypeInformation<R>outTypeInfo,OneInputStreamOperator<T,R>operator){
- //readtheoutputtypeoftheinputTransformtocoaxouterrorsaboutMissingTypeInfo
- transformation.getOutputType();
- //新的transformation会连接上当前DataStream中的transformation,从而构建成一棵树
- OneInputTransformation<T,R>resultTransform=newOneInputTransformation<>(
- this.transformation,
- operatorName,
- operator,
- outTypeInfo,
- environment.getParallelism());
- @SuppressWarnings({"unchecked","rawtypes"})
- SingleOutputStreamOperator<R>returnStream=newSingleOutputStreamOperator(environment,resultTransform);
- //所有的transformation都会存到env中,调用execute时遍历该list生成StreamGraph
- getExecutionEnvironment().addOperator(resultTransform);
- returnreturnStream;
- }
map转换将用户自定义的函数MapFunction包装到StreamMap这个Operator中,再将StreamMap包装到OneInputTransformation,最后该transformation存到env中,当调用env.execute时,遍历其中的transformation集合构造出StreamGraph
JobGraph
(1) StreamGraph经过优化后生成了 JobGraph,提交给 JobManager 的数据结构。主要的优化为,将多个符合条件的节点 chain 在一起作为一个节点。

- 将并不涉及到 shuffle 的算子进行合并。
- 对于同一个 operator chain 里面的多个算子,会在同一个 task 中执行。
- 对于不在同一个 operator chain 里的算子,会在不同的 task 中执行。
(2) JobGraph 用来由 JobClient 提交给 JobManager,是由顶点(JobVertex)、中间结果(IntermediateDataSet)和边(JobEdge)组成的 DAG 图。
(3) JobGraph 定义作业级别的配置,而每个顶点和中间结果定义具体操作和中间数据的设置。
JobVertex
JobVertex 相当于是 JobGraph 的顶点。经过优化后符合条件的多个StreamNode可能会chain在一起生成一个JobVertex,即一个JobVertex包含一个或多个operator,JobVertex的输入是JobEdge,输出是IntermediateDataSet。
IntermediateDataSet
JobVertex的输出,即经过operator处理产生的数据集。
JobEdge
job graph中的一条数据传输通道。source 是IntermediateDataSet,sink 是 JobVertex。即数据通过JobEdge由IntermediateDataSet传递给目标JobVertex。
(1) 首先是通过API会生成transformations,通过transformations会生成StreamGraph。
(2)将StreamGraph的某些StreamNode Chain在一起生成JobGraph,前两步转换都是在客户端完成。
(3)最后会将JobGraph转换为ExecutionGraph,相比JobGraph会增加并行度的概念,这一步是在Jobmanager里完成。

ExecutionJobVertex
ExecutionJobVertex一一对应JobGraph中的JobVertex
ExecutionVertex
一个ExecutionJobVertex对应n个ExecutionVertex,其中n就是算子的并行度。ExecutionVertex就是并行任务的一个子任务
Execution
Execution 是对 ExecutionVertex 的一次执行,通过 ExecutionAttemptId 来唯一标识。
IntermediateResult
在 JobGraph 中用 IntermediateDataSet 表示 JobVertex 的对外输出,一个 JobGraph 可能有 n(n >=0) 个输出。在 ExecutionGraph 中,与此对应的就是 IntermediateResult。每一个 IntermediateResult 就有 numParallelProducers(并行度) 个生产者,每个生产者的在相应的 IntermediateResult 上的输出对应一个 IntermediateResultPartition。IntermediateResultPartition 表示的是 ExecutionVertex 的一个输出分区
ExecutionEdge
ExecutionEdge 表示 ExecutionVertex 的输入,通过 ExecutionEdge 将 ExecutionVertex 和 IntermediateResultPartition 连接起来,进而在不同的 ExecutionVertex 之间建立联系。
ExecutionGraph的构建
- 构建JobInformation
- 构建ExecutionGraph
- 将JobGraph进行拓扑排序,获取sortedTopology顶点集合
- //ExecutionGraphBuilder
- publicstaticExecutionGraphbuildGraph(
- @NullableExecutionGraphprior,
- JobGraphjobGraph,
- ...)throwsJobExecutionException,JobException{
- //构建JobInformation
- //构建ExecutionGraph
- //将JobGraph进行拓扑排序,获取sortedTopology顶点集合
- List<JobVertex>sortedTopology=jobGraph.getVerticesSortedTopologicallyFromSources();
- executionGraph.attachJobGraph(sortedTopology);
- returnexecutionGraph;
- }
构建ExecutionJobVertex,连接IntermediateResultPartition和ExecutionVertex
- //ExecutionGraph
- publicvoidattachJobGraph(List<JobVertex>topologiallySorted)throwsJobException{
- for(JobVertexjobVertex:topologiallySorted){
- //构建ExecutionJobVertex
- ExecutionJobVertexejv=newExecutionJobVertex(
- this,
- jobVertex,
- 1,
- maxPriorAttemptsHistoryLength,
- rpcTimeout,
- globalModVersion,
- createTimestamp);
- //连接IntermediateResultPartition和ExecutionVertex
- ev.connectToPredecessors(this.intermediateResults);
- }
- //ExecutionJobVertex
- publicvoidconnectToPredecessors(Map<IntermediateDataSetID,IntermediateResult>intermediateDataSets)throwsJobException{
- List<JobEdge>inputs=jobVertex.getInputs();
- for(intnum=0;num<inputs.size();num++){
- JobEdgeedge=inputs.get(num);
- IntermediateResultires=intermediateDataSets.get(edge.getSourceId());
- this.inputs.add(ires);
- intconsumerIndex=ires.registerConsumer();
- for(inti=0;i<parallelism;i++){
- ExecutionVertexev=taskVertices[i];
- ev.connectSource(num,ires,edge,consumerIndex);
- }
- }
- }
拆分计划(可执行能力)
- //ExecutionVertex
- publicvoidconnectSource(intinputNumber,IntermediateResultsource,JobEdgeedge,intconsumerNumber){
- finalDistributionPatternpattern=edge.getDistributionPattern();
- finalIntermediateResultPartition[]sourcePartitions=source.getPartitions();
- ExecutionEdge[]edges;
- switch(pattern){
- //下游JobVertex的输入partition算法,如果是forward或rescale的话为POINTWISE
- casePOINTWISE:
- edges=connectPointwise(sourcePartitions,inputNumber);
- break;
- //每一个并行的ExecutionVertex节点都会链接到源节点产生的所有中间结果IntermediateResultPartition
- caseALL_TO_ALL:
- edges=connectAllToAll(sourcePartitions,inputNumber);
- break;
- default:
- thrownewRuntimeException("Unrecognizeddistributionpattern.");
- }
- inputEdges[inputNumber]=edges;
- for(ExecutionEdgeee:edges){
- ee.getSource().addConsumer(ee,consumerNumber);
- }
- }
- privateExecutionEdge[]connectPointwise(IntermediateResultPartition[]sourcePartitions,intinputNumber){
- finalintnumSources=sourcePartitions.length;
- finalintparallelism=getTotalNumberOfParallelSubtasks();
- //如果并发数等于partition数,则一对一进行连接
- if(numSources==parallelism){
- returnnewExecutionEdge[]{newExecutionEdge(sourcePartitions[subTaskIndex],this,inputNumber)};
- }
- //如果并发数大于partition数,则一对多进行连接
- elseif(numSources<parallelism){
- intsourcePartition;
- if(parallelism%numSources==0){
- intfactor=parallelism/numSources;
- sourcePartition=subTaskIndex/factor;
- }
- else{
- floatfactor=((float)parallelism)/numSources;
- sourcePartition=(int)(subTaskIndex/factor);
- }
- returnnewExecutionEdge[]{newExecutionEdge(sourcePartitions[sourcePartition],this,inputNumber)};
- }
- //果并发数小于partition数,则多对一进行连接
- else{
- if(numSources%parallelism==0){
- intfactor=numSources/parallelism;
- intstartIndex=subTaskIndex*factor;
- ExecutionEdge[]edges=newExecutionEdge[factor];
- for(inti=0;i<factor;i++){
- edges[i]=newExecutionEdge(sourcePartitions[startIndex+i],this,inputNumber);
- }
- returnedges;
- }
- else{
- floatfactor=((float)numSources)/parallelism;
- intstart=(int)(subTaskIndex*factor);
- intend=(subTaskIndex==getTotalNumberOfParallelSubtasks()-1)?
- sourcePartitions.length:
- (int)((subTaskIndex+1)*factor);
- ExecutionEdge[]edges=newExecutionEdge[end-start];
- for(inti=0;i<edges.length;i++){
- edges[i]=newExecutionEdge(sourcePartitions[start+i],this,inputNumber);
- }
- returnedges;
- }
- }
- }
- privateExecutionEdge[]connectAllToAll(IntermediateResultPartition[]sourcePartitions,intinputNumber){
- ExecutionEdge[]edges=newExecutionEdge[sourcePartitions.length];
- for(inti=0;i<sourcePartitions.length;i++){
- IntermediateResultPartitionirp=sourcePartitions[i];
- edges[i]=newExecutionEdge(irp,this,inputNumber);
- }
- returnedges;
- }

返回ExecutionGraph
TaskManager
TaskManager启动
- publicstaticvoidrunTaskManager(Configurationconfiguration,ResourceIDresourceId)throwsException{
- //主要初始化一堆的service,并新建一个org.apache.flink.runtime.taskexecutor.TaskExecutor
- finalTaskManagerRunnertaskManagerRunner=newTaskManagerRunner(configuration,resourceId);
- //调用TaskExecutor的start()方法
- taskManagerRunner.start();
- }
TaskExecutor :submitTask()
接着的重要函数是shumitTask()函数,该函数会通过AKKA机制,向TaskManager发出一个submitTask的消息请求,TaskManager收到消息请求后,会执行submitTask()方法。(省略了部分代码)。
- publicCompletableFuture<Acknowledge>submitTask(
- TaskDeploymentDescriptortdd,
- JobMasterIdjobMasterId,
- Timetimeout){
- jobInformation=tdd.getSerializedJobInformation().deserializeValue(getClass().getClassLoader());
- taskInformation=tdd.getSerializedTaskInformation().deserializeValue(getClass().getClassLoader());
- TaskMetricGrouptaskMetricGroup=taskManagerMetricGroup.addTaskForJob(xxx);
- InputSplitProviderinputSplitProvider=newRpcInputSplitProvider(xxx);
- TaskManagerActionstaskManagerActions=jobManagerConnection.getTaskManagerActions();
- CheckpointRespondercheckpointResponder=jobManagerConnection.getCheckpointResponder();
- LibraryCacheManagerlibraryCache=jobManagerConnection.getLibraryCacheManager();
- ResultPartitionConsumableNotifierresultPartitionConsumableNotifier=jobManagerConnection.getResultPartitionConsumableNotifier();
- PartitionProducerStateCheckerpartitionStateChecker=jobManagerConnection.getPartitionStateChecker();
- finalTaskLocalStateStorelocalStateStore=localStateStoresManager.localStateStoreForSubtask(
- jobId,
- tdd.getAllocationId(),
- taskInformation.getJobVertexId(),
- tdd.getSubtaskIndex());
- finalJobManagerTaskRestoretaskRestore=tdd.getTaskRestore();
- finalTaskStateManagertaskStateManager=newTaskStateManagerImpl(
- jobId,
- tdd.getExecutionAttemptId(),
- localStateStore,
- taskRestore,
- checkpointResponder);
- //新建一个Task
- Tasktask=newTask(xxxx);
- log.info("Receivedtask{}.",task.getTaskInfo().getTaskNameWithSubtasks());
- booleantaskAdded;
- try{
- taskAdded=taskSlotTable.addTask(task);
- }catch(SlotNotFoundException|SlotNotActiveExceptione){
- thrownewTaskSubmissionException("Couldnotsubmittask.",e);
- }
- if(taskAdded){
- //启动任务
- task.startTaskThread();
- returnCompletableFuture.completedFuture(Acknowledge.get());
- }
最后创建执行Task的线程,然后调用startTaskThread()来启动具体的执行线程,Task线程内部的run()方法承载了被执行的核心逻辑。
Task是执行在TaskExecutor进程里的一个线程,下面来看看其run方法
(1) 检测当前状态,正常情况为CREATED,如果是FAILED或CANCELING直接返回,其余状态将抛异常。
(2) 读取DistributedCache文件。
(3) 启动ResultPartitionWriter和InputGate。
(4) 向taskEventDispatcher注册partitionWriter。
(5) 根据nameOfInvokableClass加载对应的类并实例化。
(6) 将状态置为RUNNING并执行invoke方法。
- publicvoidrun(){
- while(true){
- ExecutionStatecurrent=this.executionState;
- invokable=loadAndInstantiateInvokable(userCodeClassLoader,nameOfInvokableClass);
- network.registerTask(this);
- Environmentenv=newRuntimeEnvironment(....);
- invokable.setEnvironment(env);
- //actualtaskcorework
- if(!transitionState(ExecutionState.DEPLOYING,ExecutionState.RUNNING)){
- }
- //notifyeveryonethatweswitchedtorunning
- notifyObservers(ExecutionState.RUNNING,null);
- executingThread.setContextClassLoader(userCodeClassLoader);
- //runtheinvokable
- invokable.invoke();
- if(transitionState(ExecutionState.RUNNING,ExecutionState.FINISHED)){
- notifyObservers(ExecutionState.FINISHED,null);
- }
- Finally{
- //freethenetworkresources
- network.unregisterTask(this);
- //freememoryresources
- if(invokable!=null){
- memoryManager.releaseAll(invokable);
- }
- libraryCache.unregisterTask(jobId,executionId);
- removeCachedFiles(distributedCacheEntries,fileCache);
总结
整体的流程与架构可能三两张图或者三言两语就可以勾勒出画面,但是背后源码的实现是艰辛的。源码的复杂度和当初设计框架的抓狂感,我们只有想象。现在我们只是站在巨人的肩膀上去学习。
本篇的主题是"Flink架构与执行流程",做下小结,Flink on Yarn的提交执行流程:
1 Flink任务提交后,Client向HDFS上传Flink的Jar包和配置。
2 向Yarn ResourceManager提交任务。
3 ResourceManager分配Container资源并通知对应的NodeManager启动ApplicationMaster。
4 ApplicationMaster启动后加载Flink的Jar包和配置构建环境。
5 启动JobManager之后ApplicationMaster向ResourceManager申请资源启动TaskManager。
6 ResourceManager分配Container资源后,由ApplicationMaster通知资源所在节点的NodeManager启动TaskManager。
7 NodeManager加载Flink的Jar包和配置构建环境并启动TaskManager。
8 TaskManager启动后向JobManager发送心跳包,并等待JobManager向其分配任务。
原文链接:https://mp.weixin.qq.com/s/iA_nMURBPMeqkSJ-kQUEjw








发表评论
◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。