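Flink Coordinators and Custom Operators
In a recent project I worked with Flink's coordinators and custom operators. This article explains what a coordinator is in Flink, how to use one, and how coordinators and operators interact.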
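Coordinator
A coordinator in Flink coordinates operators at runtime. It runs in the JobManager and communicates with operators by exchanging events. For example, the coordinators of Source and Sink operators discover and assign work, or aggregate and commit metadata.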
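Threading model
All coordinator methods are called by the JobManager's main thread (the mailbox thread). These methods must therefore never perform blocking operations, such as I/O or waiting on locks or futures. Doing so can easily bring the whole JobManager to a standstill.
A coordinator that performs more complex operations should therefore spawn its own threads for the I/O work. The methods on OperatorCoordinator.Context are safe to call from a thread other than the one that calls the coordinator methods.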
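The rule above can be illustrated with plain JDK executors. This is a minimal sketch, not Flink code:
- `coordinatorExecutor` stands in for the single thread that owns the coordinator state.
- `workerExecutor` stands in for a thread that does blocking I/O.
- `slowLookup` is a hypothetical blocking call.

The callback `onSubtaskReady` only schedules work and returns at once. The result is applied back on the owning thread, so the state is never touched concurrently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class OffloadBlockingWork {

    // Single thread that owns the (toy) coordinator state.
    private final ExecutorService coordinatorExecutor =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "coordinator"));
    // Separate thread for blocking work such as I/O.
    private final ExecutorService workerExecutor =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "coordinator-worker"));

    // Toy coordinator state: only ever touched on the "coordinator" thread.
    private final List<String> log = new ArrayList<>();

    // Hypothetical blocking call (e.g. listing files or querying an external system).
    private static String slowLookup(int subtask) throws InterruptedException {
        Thread.sleep(50);
        return "work-for-subtask-" + subtask;
    }

    // Plays the role of a coordinator callback: it only schedules work and returns at once.
    public void onSubtaskReady(int subtask, CountDownLatch done) {
        workerExecutor.execute(() -> {
            try {
                String result = slowLookup(subtask);   // blocking part, on the worker thread
                coordinatorExecutor.execute(() -> {    // apply the result on the owner thread
                    log.add(Thread.currentThread().getName() + ":" + result);
                    done.countDown();
                });
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    public static List<String> run() {
        OffloadBlockingWork c = new OffloadBlockingWork();
        CountDownLatch done = new CountDownLatch(2);
        c.onSubtaskReady(0, done);
        c.onSubtaskReady(1, done);
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for the coordinator thread");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            c.workerExecutor.shutdown();
            c.coordinatorExecutor.shutdown();
        }
        return new ArrayList<>(c.log);
    }

    public static void main(String[] args) {
        // prints [coordinator:work-for-subtask-0, coordinator:work-for-subtask-1]
        System.out.println(run());
    }
}
```

The real CoordinatorContext further down follows the same split: one single-threaded coordinator executor plus a separate worker executor.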
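Consistency
The coordinator's view of task execution is highly simplified compared with the scheduler's view. It still allows consistent interaction with the operators running in the parallel subtasks. Specifically, the following methods are guaranteed to be called strictly in this order: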
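- executionAttemptReady(int, int, OperatorCoordinator.SubtaskGateway): called once when a subtask attempt becomes ready. The SubtaskGateway is the gateway used to interact with that subtask. This marks the start of the interaction with the attempt.
- executionAttemptFailed(int, int, Throwable): called for a subtask as soon as an attempt fails or is cancelled. From this point on, interaction with that subtask attempt should stop.
- subtaskReset(int, long) or resetToCheckpoint(long, byte[]): once the scheduler has determined which checkpoint to restore, these methods notify the coordinator.
  - The former is called for a regional failure/recovery, which may affect only a subset of the subtasks. The latter is called for a global failure/recovery.
  - Use these methods to decide which actions to recover, because they tell the coordinator which checkpoint to fall back to. The coordinator implementation must recover its interactions with the affected tasks since the restored checkpoint.
  - They are called only after executionAttemptFailed(int, int, Throwable) has been called for all attempts of the subtask.
- executionAttemptReady(int, int, OperatorCoordinator.SubtaskGateway): called again once the recovered task (the new attempt) is ready. This happens later than subtaskReset(int, long), because the new attempt is scheduled and deployed between the two calls.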
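The ordering guarantee can be modelled as a small per-subtask state machine. This is a sketch under simplifying assumptions:
- It covers only the per-subtask path from the list above (ready, failed, subtaskReset, ready again).
- It leaves out the global resetToCheckpoint path and the other corner cases the real scheduler handles.
- The class and enum names are hypothetical.

A transition that violates the documented order is rejected.

```java
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class AttemptLifecycle {

    public enum State { INITIAL, READY, FAILED, RESET }

    private final Map<Integer, State> states = new HashMap<>();

    public State state(int subtask) {
        return states.getOrDefault(subtask, State.INITIAL);
    }

    private void move(int subtask, Set<State> allowedFrom, State to, String call) {
        State current = state(subtask);
        if (!allowedFrom.contains(current)) {
            throw new IllegalStateException(
                    call + " called for subtask " + subtask + " in state " + current);
        }
        states.put(subtask, to);
    }

    // Start of the interaction with an attempt (the first one, or a new one after a reset).
    public void executionAttemptReady(int subtask) {
        move(subtask, EnumSet.of(State.INITIAL, State.RESET), State.READY, "executionAttemptReady");
    }

    // From here on the coordinator must stop talking to the failed attempt.
    public void executionAttemptFailed(int subtask) {
        move(subtask, EnumSet.of(State.READY), State.FAILED, "executionAttemptFailed");
    }

    // Only after the failure has been reported is the checkpoint to restore announced.
    public void subtaskReset(int subtask) {
        move(subtask, EnumSet.of(State.FAILED), State.RESET, "subtaskReset");
    }

    public static void main(String[] args) {
        AttemptLifecycle lifecycle = new AttemptLifecycle();
        lifecycle.executionAttemptReady(0);
        lifecycle.executionAttemptFailed(0);
        lifecycle.subtaskReset(0);
        lifecycle.executionAttemptReady(0);      // new attempt after recovery
        System.out.println(lifecycle.state(0));  // prints READY
    }
}
```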
Interface methods
A custom coordinator implements the OperatorCoordinator interface. Its methods are described below:
// Excerpt: nested types such as Context, SubtaskGateway and Provider are omitted.
public interface OperatorCoordinator extends CheckpointListener, AutoCloseable {

    // ------------------------------------------------------------------------

    /**
     * Starts the coordinator. Called exactly once, before any other method.
     * Any exception thrown by this method fails the job.
     */
    void start() throws Exception;

    /**
     * Called when the coordinator is disposed; this method should release all resources it holds.
     * Exceptions thrown by this method do not fail the job.
     */
    @Override
    void close() throws Exception;

    // ------------------------------------------------------------------------

    /**
     * Handles an event sent by one of the parallel operator instances (subtasks).
     * Any exception thrown by this method makes the job fail over and recover.
     */
    void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event)
            throws Exception;

    // ------------------------------------------------------------------------

    /**
     * Takes a checkpoint of the coordinator by serializing its current state into the checkpoint.
     * On success, call resultFuture.complete(...); on failure, call
     * resultFuture.completeExceptionally(...).
     */
    void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture)
            throws Exception;

    /**
     * We override the method here to remove the checked exception. Please check the Java docs of
     * {@link CheckpointListener#notifyCheckpointComplete(long)} for more detail semantic of the
     * method.
     */
    @Override
    void notifyCheckpointComplete(long checkpointId);

    /**
     * We override the method here to remove the checked exception. Please check the Java docs of
     * {@link CheckpointListener#notifyCheckpointAborted(long)} for more detail semantic of the
     * method.
     */
    @Override
    default void notifyCheckpointAborted(long checkpointId) {}

    /** Resets the coordinator to the given checkpoint. */
    void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) throws Exception;

    // ------------------------------------------------------------------------

    /** Called when a subtask is reset. */
    void subtaskReset(int subtask, long checkpointId);

    /** Called when an execution attempt of a subtask fails. */
    void executionAttemptFailed(int subtask, int attemptNumber, @Nullable Throwable reason);

    /** Called when an execution attempt of a subtask is ready. */
    void executionAttemptReady(int subtask, int attemptNumber, SubtaskGateway gateway);
}
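The checkpointCoordinator/resetToCheckpoint pair is essentially a serialize/deserialize contract over a byte[]. The sketch below is plain Java: the two methods only mirror the shape of the Flink signatures. It makes these assumptions:
- The coordinator keeps a hypothetical per-subtask event counter as its state.
- The state is encoded with DataOutputStream.
- A null checkpointData (the parameter is @Nullable in the interface) means "start empty".

Failures are reported through the future rather than thrown.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

public class CoordinatorStateSketch {

    // Hypothetical coordinator state: number of events received from each subtask.
    private final Map<Integer, Long> eventsPerSubtask = new TreeMap<>();

    public void recordEvent(int subtask) {
        eventsPerSubtask.merge(subtask, 1L, Long::sum);
    }

    public Map<Integer, Long> view() {
        return new TreeMap<>(eventsPerSubtask);
    }

    // Same shape as checkpointCoordinator: the result is reported through the future.
    public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                out.writeInt(eventsPerSubtask.size());
                for (Map.Entry<Integer, Long> entry : eventsPerSubtask.entrySet()) {
                    out.writeInt(entry.getKey());
                    out.writeLong(entry.getValue());
                }
            }
            resultFuture.complete(bytes.toByteArray());
        } catch (IOException e) {
            resultFuture.completeExceptionally(e);
        }
    }

    // Same shape as resetToCheckpoint: null data means "nothing to restore".
    public void resetToCheckpoint(long checkpointId, byte[] checkpointData) {
        eventsPerSubtask.clear();
        if (checkpointData == null) {
            return;
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(checkpointData))) {
            int size = in.readInt();
            for (int i = 0; i < size; i++) {
                eventsPerSubtask.put(in.readInt(), in.readLong());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        CoordinatorStateSketch before = new CoordinatorStateSketch();
        before.recordEvent(0);
        before.recordEvent(0);
        before.recordEvent(1);
        CompletableFuture<byte[]> future = new CompletableFuture<>();
        before.checkpointCoordinator(1L, future);

        CoordinatorStateSketch after = new CoordinatorStateSketch();
        after.resetToCheckpoint(1L, future.join());
        System.out.println(after.view()); // prints {0=2, 1=1}
    }
}
```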
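Operator
Operators are what execute the computation in Flink. The user functions passed to map, flatMap, process and similar DataStream API calls are all eventually wrapped into operators. UDFs cover most development scenarios, but interacting with a coordinator requires a custom operator. Writing one is relatively simple; org.apache.flink.streaming.api.operators.KeyedProcessOperator is a good reference implementation.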
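- A custom operator extends the AbstractStreamOperator class and implements the OneInputStreamOperator interface.
- To support timers, it implements the Triggerable interface.
- To handle events from the coordinator, it implements the OperatorEventHandler interface.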
Example
Custom operator
The custom operator below processes the data of a KeyedStream. It can receive events from the coordinator and send events to it.
The MyKeyedProcessOperator implementation:
package com.examples.operator;

import com.examples.event.MyEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.SimpleTimerService;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.operators.*;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A custom KeyedProcessOperator.
 *
 * @author shirukai
 */
public class MyKeyedProcessOperator<KEY, IN, OUT> extends AbstractStreamOperator<OUT>
        implements OneInputStreamOperator<IN, OUT>,
                Triggerable<KEY, VoidNamespace>,
                OperatorEventHandler {

    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(MyKeyedProcessOperator.class);

    private transient TimestampedCollector<OUT> collector;
    private transient TimerService timerService;
    // Gateway used to send events to this operator's coordinator on the JobManager.
    private final OperatorEventGateway operatorEventGateway;

    public MyKeyedProcessOperator(
            ProcessingTimeService processingTimeService, OperatorEventGateway operatorEventGateway) {
        // processingTimeService is a protected field inherited from AbstractStreamOperator.
        this.processingTimeService = processingTimeService;
        this.operatorEventGateway = operatorEventGateway;
    }

    @Override
    public void open() throws Exception {
        super.open();
        collector = new TimestampedCollector<>(output);
        InternalTimerService<VoidNamespace> internalTimerService =
                getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
        timerService = new SimpleTimerService(internalTimerService);
    }

    @Override
    @SuppressWarnings("unchecked")
    public void processElement(StreamRecord<IN> element) throws Exception {
        LOG.info("processElement: {}", element);
        collector.setTimestamp(element);
        // Register an event-time timer 10 ms after the element's timestamp.
        timerService.registerEventTimeTimer(element.getTimestamp() + 10);
        // Register a processing-time timer 100 ms from now; processing-time timers follow the
        // wall clock, not the element's event timestamp.
        timerService.registerProcessingTimeTimer(timerService.currentProcessingTime() + 100);
        // Send an event to the coordinator.
        operatorEventGateway.sendEventToCoordinator(new MyEvent("hello,I'm from operator"));
        // Forward the element downstream unchanged (the cast assumes IN and OUT are the same type).
        collector.collect((OUT) element.getValue());
    }

    @Override
    public void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {
        LOG.info("onEventTime: {}", timer);
    }

    @Override
    public void onProcessingTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {
        LOG.info("onProcessingTime: {}", timer);
    }

    // Receives events sent by the coordinator.
    @Override
    public void handleOperatorEvent(OperatorEvent evt) {
        LOG.info("handleOperatorEvent: {}", evt);
    }
}
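The operator imports com.examples.event.MyEvent, but that class is not shown. Below is a hypothetical reconstruction. It assumes MyEvent simply wraps a String.

To keep the sketch compilable without Flink, a local stand-in replaces org.apache.flink.runtime.operators.coordination.OperatorEvent. In Flink that type is a marker interface extending java.io.Serializable. In the real project you would implement the Flink interface directly.

Events cross the TaskManager/JobManager boundary in serialized form, so the sketch checks a Java serialization round trip.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class MyEventSketch {

    // Local stand-in for Flink's OperatorEvent (a Serializable marker interface).
    interface OperatorEvent extends Serializable {}

    // Hypothetical reconstruction of com.examples.event.MyEvent.
    public static final class MyEvent implements OperatorEvent {
        private static final long serialVersionUID = 1L;
        private final String message;

        public MyEvent(String message) {
            this.message = message;
        }

        public String getMessage() {
            return message;
        }

        @Override
        public String toString() {
            return "MyEvent{message='" + message + "'}";
        }
    }

    // Serializes and deserializes an event, as happens when it travels between processes.
    public static MyEvent roundTrip(MyEvent event) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(event);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (MyEvent) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // prints MyEvent{message='hello,I'm from operator'}
        System.out.println(roundTrip(new MyEvent("hello,I'm from operator")));
    }
}
```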
The operator factory class MyKeyedProcessOperatorFactory:
package com.examples.operator;

import com.examples.coordinator.MyKeyedProcessCoordinatorProvider;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.streaming.api.operators.*;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeServiceAware;

/**
 * Factory for the custom operator.
 *
 * @author shirukai
 */
public class MyKeyedProcessOperatorFactory<IN> extends AbstractStreamOperatorFactory<IN>
        implements OneInputStreamOperatorFactory<IN, IN>,
                CoordinatedOperatorFactory<IN>,
                ProcessingTimeServiceAware {

    private static final long serialVersionUID = 1L;

    // Tells Flink which coordinator to create on the JobManager for this operator.
    @Override
    public OperatorCoordinator.Provider getCoordinatorProvider(
            String operatorName, OperatorID operatorID) {
        return new MyKeyedProcessCoordinatorProvider(operatorName, operatorID);
    }

    @Override
    @SuppressWarnings("unchecked")
    public <T extends StreamOperator<IN>> T createStreamOperator(
            StreamOperatorParameters<IN> parameters) {
        final OperatorID operatorId = parameters.getStreamConfig().getOperatorID();
        // Gateway the operator uses to send events to its coordinator.
        final OperatorEventGateway gateway =
                parameters.getOperatorEventDispatcher().getOperatorEventGateway(operatorId);
        try {
            final MyKeyedProcessOperator<?, IN, IN> operator =
                    new MyKeyedProcessOperator<>(processingTimeService, gateway);
            operator.setup(
                    parameters.getContainingTask(),
                    parameters.getStreamConfig(),
                    parameters.getOutput());
            // Register the operator as the handler for events the coordinator sends to it.
            parameters.getOperatorEventDispatcher().registerEventHandler(operatorId, operator);
            return (T) operator;
        } catch (Exception e) {
            throw new IllegalStateException(
                    "Cannot create operator for " + parameters.getStreamConfig().getOperatorName(),
                    e);
        }
    }

    @Override
    public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
        return MyKeyedProcessOperator.class;
    }
}
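The registerEventHandler call in createStreamOperator is what connects coordinator events to handleOperatorEvent. The toy dispatcher below illustrates the idea; it is not Flink's OperatorEventDispatcher. It uses these simplifications:
- Operator ids are plain Strings (Flink uses OperatorID).
- Events are Strings.
- An unknown id is simply rejected. The real dispatcher's error handling is not reproduced here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

public class EventDispatchSketch {

    // Key: operator id. Value: the handler registered for that operator.
    private final Map<String, Consumer<String>> handlers = new HashMap<>();

    // Counterpart of registerEventHandler(operatorId, operator) in createStreamOperator.
    public void registerEventHandler(String operatorId, Consumer<String> handler) {
        if (handlers.putIfAbsent(operatorId, handler) != null) {
            throw new IllegalStateException("A handler is already registered for " + operatorId);
        }
    }

    // Delivers an event coming from the coordinator to the operator it is addressed to.
    public void dispatch(String operatorId, String event) {
        Consumer<String> handler = handlers.get(operatorId);
        if (handler == null) {
            throw new IllegalStateException("No handler registered for " + operatorId);
        }
        handler.accept(event);
    }

    public static void main(String[] args) {
        EventDispatchSketch dispatcher = new EventDispatchSketch();
        dispatcher.registerEventHandler(
                "my-keyed-process", e -> System.out.println("handleOperatorEvent: " + e));
        // prints handleOperatorEvent: hello,I'm from coordinator
        dispatcher.dispatch("my-keyed-process", "hello,I'm from coordinator");
    }
}
```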
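Custom coordinator
CoordinatorExecutorThreadFactory is the thread factory for the coordinator executor and creates the coordinator thread. The class is generic and can be reused as is.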
package com.examples.coordinator;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.FatalExitExceptionHandler;

import javax.annotation.Nullable;

import java.util.concurrent.ThreadFactory;

/** A thread factory class that provides some helper methods. */
public class CoordinatorExecutorThreadFactory
        implements ThreadFactory, Thread.UncaughtExceptionHandler {

    private final String coordinatorThreadName;
    private final ClassLoader classLoader;
    private final Thread.UncaughtExceptionHandler errorHandler;

    @Nullable private Thread thread;

    // TODO discuss if we should fail the job(JM may restart the job later) or directly kill JM
    // process
    // Currently we choose to directly kill JM process
    CoordinatorExecutorThreadFactory(
            final String coordinatorThreadName, final ClassLoader contextClassLoader) {
        this(coordinatorThreadName, contextClassLoader, FatalExitExceptionHandler.INSTANCE);
    }

    @VisibleForTesting
    CoordinatorExecutorThreadFactory(
            final String coordinatorThreadName,
            final ClassLoader contextClassLoader,
            final Thread.UncaughtExceptionHandler errorHandler) {
        this.coordinatorThreadName = coordinatorThreadName;
        this.classLoader = contextClassLoader;
        this.errorHandler = errorHandler;
    }

    @Override
    public synchronized Thread newThread(Runnable r) {
        thread = new Thread(r, coordinatorThreadName);
        thread.setContextClassLoader(classLoader);
        thread.setUncaughtExceptionHandler(this);
        return thread;
    }

    @Override
    public synchronized void uncaughtException(Thread t, Throwable e) {
        errorHandler.uncaughtException(t, e);
    }

    public String getCoordinatorThreadName() {
        return coordinatorThreadName;
    }

    boolean isCurrentThreadCoordinatorThread() {
        return Thread.currentThread() == thread;
    }
}
CoordinatorContext is the coordinator context. This class is also generic and reusable.
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.examples.coordinator;

import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.operators.coordination.ComponentClosingUtils;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

/**
 * A context class for the {@link OperatorCoordinator}.
 *
 * <p>The context serves a few purposes:
 *
 * <ul>
 *   <li>Thread model enforcement - The context ensures that all the manipulations to the
 *       coordinator state are handled by the same thread.
 * </ul>
 */
@Internal
public class CoordinatorContext implements AutoCloseable {

    private static final Logger LOG = LoggerFactory.getLogger(CoordinatorContext.class);

    private final ScheduledExecutorService coordinatorExecutor;
    private final ScheduledExecutorService workerExecutor;
    private final CoordinatorExecutorThreadFactory coordinatorThreadFactory;
    private final OperatorCoordinator.Context operatorCoordinatorContext;
    private final Map<Integer, OperatorCoordinator.SubtaskGateway> subtaskGateways;

    public CoordinatorContext(
            CoordinatorExecutorThreadFactory coordinatorThreadFactory,
            OperatorCoordinator.Context operatorCoordinatorContext) {
        this(
                Executors.newScheduledThreadPool(1, coordinatorThreadFactory),
                Executors.newScheduledThreadPool(
                        1,
                        new ExecutorThreadFactory(
                                coordinatorThreadFactory.getCoordinatorThreadName() + "-worker")),
                coordinatorThreadFactory,
                operatorCoordinatorContext);
    }

    public CoordinatorContext(
            ScheduledExecutorService coordinatorExecutor,
            ScheduledExecutorService workerExecutor,
            CoordinatorExecutorThreadFactory coordinatorThreadFactory,
            OperatorCoordinator.Context operatorCoordinatorContext) {
        this.coordinatorExecutor = coordinatorExecutor;
        this.workerExecutor = workerExecutor;
        this.coordinatorThreadFactory = coordinatorThreadFactory;
        this.operatorCoordinatorContext = operatorCoordinatorContext;
        this.subtaskGateways = new HashMap<>(operatorCoordinatorContext.currentParallelism());
    }

    @Override
    public void close() throws InterruptedException {
        // Close quietly so the closing sequence will be executed completely.
        ComponentClosingUtils.shutdownExecutorForcefully(
                workerExecutor, Duration.ofNanos(Long.MAX_VALUE));
        ComponentClosingUtils.shutdownExecutorForcefully(
                coordinatorExecutor, Duration.ofNanos(Long.MAX_VALUE));
    }

    public void runInCoordinatorThread(Runnable runnable) {
        coordinatorExecutor.execute(runnable);
    }

    // --------- Package private methods for the MyKeyedProcessCoordinator ------------

    ClassLoader getUserCodeClassloader() {
        return this.operatorCoordinatorContext.getUserCodeClassloader();
    }

    void subtaskReady(OperatorCoordinator.SubtaskGateway gateway) {
        final int subtask = gateway.getSubtask();
        if (subtaskGateways.get(subtask) == null) {
            subtaskGateways.put(subtask, gateway);
        } else {
            throw new IllegalStateException("Already have a subtask gateway for " + subtask);
        }
    }

    void subtaskNotReady(int subtaskIndex) {
        subtaskGateways.put(subtaskIndex, null);
    }

    Set<Integer> getSubtasks() {
        return subtaskGateways.keySet();
    }

    public void sendEventToOperator(int subtaskId, OperatorEvent event) {
        callInCoordinatorThread(
                () -> {
                    final OperatorCoordinator.SubtaskGateway gateway =
                            subtaskGateways.get(subtaskId);
                    if (gateway == null) {
                        LOG.warn(
                                String.format(
                                        "Subtask %d is not ready yet to receive events.",
                                        subtaskId));
                    } else {
                        gateway.sendEvent(event);
                    }
                    return null;
                },
                String.format("Failed to send event %s to subtask %d", event, subtaskId));
    }

    /**
     * Fail the job with the given cause.
     *
     * @param cause the cause of the job failure.
     */
    void failJob(Throwable cause) {
        operatorCoordinatorContext.failJob(cause);
    }

    // ---------------- private helper methods -----------------

    /**
     * A helper method that delegates the callable to the coordinator thread if the current thread
     * is not the coordinator thread, otherwise call the callable right away.
     *
     * @param callable the callable to delegate.
     */
    private <V> V callInCoordinatorThread(Callable<V> callable, String errorMessage) {
        // Ensure the call is executed by the coordinator executor.
        if (!coordinatorThreadFactory.isCurrentThreadCoordinatorThread()
                && !coordinatorExecutor.isShutdown()) {
            try {
                final Callable<V> guardedCallable =
                        () -> {
                            try {
                                return callable.call();
                            } catch (Throwable t) {
                                LOG.error("Uncaught Exception in Coordinator Executor", t);
                                ExceptionUtils.rethrowException(t);
                                return null;
                            }
                        };

                return coordinatorExecutor.submit(guardedCallable).get();
            } catch (InterruptedException | ExecutionException e) {
                throw new FlinkRuntimeException(errorMessage, e);
            }
        }

        try {
            return callable.call();
        } catch (Throwable t) {
            LOG.error("Uncaught Exception in Coordinator Executor", t);
            throw new FlinkRuntimeException(errorMessage, t);
        }
    }
}
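The core of callInCoordinatorThread is "hop to the coordinator thread unless already on it". The JDK-only sketch below isolates that idea; it is not the Flink class. The thread factory lambda records the created thread, playing the role of CoordinatorExecutorThreadFactory.isCurrentThreadCoordinatorThread().

The nested call in main shows why the check matters. Submitting to a single-threaded executor from its own thread and then blocking on get() would wait forever, because the only thread is busy running the caller.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CallInCoordinatorThreadSketch {

    // The thread created by the factory; recorded so we can ask "am I on it?".
    private volatile Thread coordinatorThread;

    private final ExecutorService coordinatorExecutor =
            Executors.newSingleThreadExecutor(r -> {
                Thread t = new Thread(r, "coordinator");
                coordinatorThread = t;
                return t;
            });

    private boolean isCurrentThreadCoordinatorThread() {
        return Thread.currentThread() == coordinatorThread;
    }

    public <V> V callInCoordinatorThread(Callable<V> callable) {
        try {
            if (!isCurrentThreadCoordinatorThread()) {
                // Hop to the coordinator thread and wait for the result.
                return coordinatorExecutor.submit(callable).get();
            }
            // Already on the coordinator thread: run inline instead of submitting and waiting.
            return callable.call();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public void shutdown() {
        coordinatorExecutor.shutdown();
    }

    public static void main(String[] args) {
        CallInCoordinatorThreadSketch ctx = new CallInCoordinatorThreadSketch();
        String name = ctx.callInCoordinatorThread(
                () -> ctx.callInCoordinatorThread(() -> Thread.currentThread().getName()));
        System.out.println(name); // prints coordinator
        ctx.shutdown();
    }
}
```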
The coordinator itself, MyKeyedProcessCoordinator:
package com.examples.coordinator;

import com.examples.event.MyEvent;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.function.ThrowingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.util.concurrent.CompletableFuture;

/**
 * Custom coordinator; it implements the OperatorCoordinator interface.
 *
 * @author shirukai
 */
public class MyKeyedProcessCoordinator implements OperatorCoordinator {
    private static final Logger LOG = LoggerFactory.getLogger(MyKeyedProcessCoordinator.class);

    /** The name of the operator this MyKeyedProcessCoordinator is associated with. */
    private final String operatorName;

    private final CoordinatorContext context;
    private boolean started;

    public MyKeyedProcessCoordinator(String operatorName, CoordinatorContext context) {
        this.operatorName = operatorName;
        this.context = context;
    }

    @Override
    public void start() throws Exception {
        LOG.info(
                "Starting Coordinator for {}: {}.",
                this.getClass().getSimpleName(),
                operatorName);
        // we mark this as started first, so that we can later distinguish the cases where 'start()'
        // wasn't called and where 'start()' failed.
        started = true;
        runInEventLoop(() -> LOG.info("Coordinator started."), "do something for coordinator.");
    }

    @Override
    public void close() throws Exception {
        // Release the coordinator and worker threads owned by the context.
        context.close();
    }

    @Override
    public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event)
            throws Exception {
        LOG.info("Received event {} from subtask {}.", event, subtask);
    }

    @Override
    public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture)
            throws Exception {
        // This coordinator has no state to persist, but the future must still be completed,
        // otherwise the checkpoint can never complete.
        resultFuture.complete(new byte[0]);
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {}

    @Override
    public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData)
            throws Exception {}

    @Override
    public void subtaskReset(int subtask, long checkpointId) {
        LOG.info(
                "Recovering subtask {} of operator {} to checkpoint {}.",
                subtask,
                operatorName,
                checkpointId);
        runInEventLoop(() -> {}, "resetting subtask %d", subtask);
    }

    @Override
    public void executionAttemptFailed(int subtask, int attemptNumber, @Nullable Throwable reason) {
        runInEventLoop(
                () -> {
                    LOG.info(
                            "Removing itself after failure for subtask {} of operator {}.",
                            subtask,
                            operatorName);
                    context.subtaskNotReady(subtask);
                },
                "handling subtask %d failure",
                subtask);
    }

    @Override
    public void executionAttemptReady(int subtask, int attemptNumber, SubtaskGateway gateway) {
        assert subtask == gateway.getSubtask();
        LOG.debug("Subtask {} of operator {} is ready.", subtask, operatorName);
        runInEventLoop(
                () -> {
                    context.subtaskReady(gateway);
                    // Sends to every subtask currently known to the context, so subtasks that
                    // became ready earlier receive the message again.
                    sendEventToOperator(new MyEvent("hello,I'm from coordinator"));
                },
                "making event gateway to subtask %d available",
                subtask);
    }

    private void sendEventToOperator(OperatorEvent event) {
        for (Integer subtask : context.getSubtasks()) {
            try {
                context.sendEventToOperator(subtask, event);
            } catch (Exception e) {
                LOG.error("Failed to send OperatorEvent to operator {}", operatorName, e);
                context.failJob(e);
                return;
            }
        }
    }

    private void runInEventLoop(
            final ThrowingRunnable<Throwable> action,
            final String actionName,
            final Object... actionNameFormatParameters) {
        ensureStarted();
        context.runInCoordinatorThread(
                () -> {
                    try {
                        action.run();
                    } catch (Throwable t) {
                        // If we have a JVM critical error, promote it immediately, there is a good
                        // chance the logging or job failing will not succeed any more
                        ExceptionUtils.rethrowIfFatalErrorOrOOM(t);

                        final String actionString =
                                String.format(actionName, actionNameFormatParameters);
                        LOG.error(
                                "Uncaught exception in the coordinator for {} while {}. Triggering job failover.",
                                operatorName,
                                actionString,
                                t);
                        context.failJob(t);
                    }
                });
    }

    private void ensureStarted() {
        if (!started) {
            throw new IllegalStateException("The coordinator has not started yet.");
        }
    }
}
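runInEventLoop catches every Throwable itself instead of relying on the thread's UncaughtExceptionHandler. The JDK-only sketch below shows one reason this matters.

CoordinatorContext builds its coordinator executor with Executors.newScheduledThreadPool. On a ScheduledThreadPoolExecutor, execute() wraps the task in a future, so an exception thrown by the task is stored in that future and never reaches the handler. The failJob method here is a hypothetical stand-in for CoordinatorContext.failJob. The real runInEventLoop also rethrows fatal errors first, which the sketch omits.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class RunInEventLoopSketch {

    private final AtomicInteger uncaughtHandlerCalls = new AtomicInteger();
    private final List<Throwable> failJobCalls = new CopyOnWriteArrayList<>();

    private final ScheduledExecutorService coordinatorExecutor =
            Executors.newScheduledThreadPool(1, r -> {
                Thread t = new Thread(r, "coordinator");
                t.setUncaughtExceptionHandler((thread, e) -> uncaughtHandlerCalls.incrementAndGet());
                return t;
            });

    // Hypothetical stand-in for failJob: in Flink this would trigger a failover.
    private void failJob(Throwable cause) {
        failJobCalls.add(cause);
    }

    // Unguarded: the exception is captured in the task's future, which nobody inspects.
    public void runUnguarded(Runnable action) {
        coordinatorExecutor.execute(action);
    }

    // Guarded, like runInEventLoop: catch everything and escalate explicitly.
    public void runInEventLoop(Runnable action) {
        coordinatorExecutor.execute(() -> {
            try {
                action.run();
            } catch (Throwable t) {
                failJob(t);
            }
        });
    }

    /** Returns {uncaught-handler calls, failJob calls} after both tasks have run. */
    public static int[] demo() {
        RunInEventLoopSketch s = new RunInEventLoopSketch();
        s.runUnguarded(() -> { throw new IllegalStateException("lost"); });
        s.runInEventLoop(() -> { throw new IllegalStateException("reported"); });
        s.coordinatorExecutor.shutdown(); // already-queued tasks still run
        try {
            s.coordinatorExecutor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {s.uncaughtHandlerCalls.get(), s.failJobCalls.size()};
    }

    public static void main(String[] args) {
        int[] counts = demo();
        // prints uncaught handler: 0, failJob: 1
        System.out.println("uncaught handler: " + counts[0] + ", failJob: " + counts[1]);
    }
}
```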
Custom coordinator provider
package com.examples.coordinator;

import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;

/**
 * Provider for the custom coordinator.
 *
 * @author shirukai
 */
public class MyKeyedProcessCoordinatorProvider
        extends RecreateOnResetOperatorCoordinator.Provider {

    private static final long serialVersionUID = 1L;

    private final String operatorName;

    public MyKeyedProcessCoordinatorProvider(String operatorName, OperatorID operatorID) {
        super(operatorID);
        this.operatorName = operatorName;
    }

    @Override
    protected OperatorCoordinator getCoordinator(OperatorCoordinator.Context context)
            throws Exception {
        final String coordinatorThreadName = "MyKeyedProcessCoordinator-" + operatorName;
        CoordinatorExecutorThreadFactory coordinatorThreadFactory =
                new CoordinatorExecutorThreadFactory(
                        coordinatorThreadName, context.getUserCodeClassloader());
        CoordinatorContext coordinatorContext =
                new CoordinatorContext(coordinatorThreadFactory, context);
        return new MyKeyedProcessCoordinator(operatorName, coordinatorContext);
    }
}
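Extending RecreateOnResetOperatorCoordinator.Provider means a global reset does not reset the existing coordinator in place. The old instance is discarded, and getCoordinator builds a fresh one, which then receives the checkpoint data. Consequences:
- In-memory state outside the checkpoint is lost.
- close() should release the executors held in CoordinatorContext.

The plain-Java sketch below models only this "discard and rebuild" idea. The Coordinator interface and class names are hypothetical, and the real class additionally handles asynchronous closing, start() and pending events.

```java
import java.util.function.Supplier;

public class RecreateOnResetSketch {

    /** Minimal stand-in for the coordinator methods this sketch needs. */
    public interface Coordinator extends AutoCloseable {
        void resetToCheckpoint(long checkpointId, byte[] checkpointData);

        @Override
        void close();
    }

    private final Supplier<Coordinator> provider; // plays the role of Provider#getCoordinator
    private Coordinator current;

    public RecreateOnResetSketch(Supplier<Coordinator> provider) {
        this.provider = provider;
        this.current = provider.get();
    }

    // Global reset: discard the old instance instead of resetting it in place.
    public void resetToCheckpoint(long checkpointId, byte[] checkpointData) {
        current.close();          // the old instance must release its threads here
        current = provider.get(); // fresh instance with empty in-memory state
        current.resetToCheckpoint(checkpointId, checkpointData);
    }

    public static void main(String[] args) {
        int[] created = {0};
        RecreateOnResetSketch holder = new RecreateOnResetSketch(() -> {
            int id = ++created[0];
            return new Coordinator() {
                @Override
                public void resetToCheckpoint(long checkpointId, byte[] checkpointData) {
                    System.out.println("instance " + id + " reset to checkpoint " + checkpointId);
                }

                @Override
                public void close() {
                    System.out.println("instance " + id + " closed");
                }
            };
        });
        // prints "instance 1 closed", then "instance 2 reset to checkpoint 42"
        holder.resetToCheckpoint(42L, null);
    }
}
```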
Running the example
package com.examples;

import com.examples.operator.MyKeyedProcessOperatorFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/** @author shirukai */
public class CoordinatorExamples {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        DataStreamSource<MyData> source =
                env.fromElements(new MyData(1, 1.0), new MyData(2, 2.0), new MyData(1, 3.0));
        MyKeyedProcessOperatorFactory<MyData> operatorFactory =
                new MyKeyedProcessOperatorFactory<>();
        source.keyBy((KeySelector<MyData, Integer>) MyData::getId)
                .transform("MyKeyedProcess", TypeInformation.of(MyData.class), operatorFactory)
                .print();
        env.execute();
    }

    public static class MyData {
        private Integer id;
        private Double value;

        public MyData(Integer id, Double value) {
            this.id = id;
            this.value = value;
        }

        public Integer getId() {
            return id;
        }

        public void setId(Integer id) {
            this.id = id;
        }

        public Double getValue() {
            return value;
        }

        public void setValue(Double value) {
            this.value = value;
        }

        @Override
        public String toString() {
            return "MyData{" + "id=" + id + ", value=" + value + '}';
        }
    }
}