Yarn的状态机框架分析
什么是状态机
状态机(State Machine),是有限状态自动机的简称。简单解释:给定一个状态机,同时给定它的当前状态和输入,那么输出状态是可以明确地运算出来的。
yarn中的状态机
YARN将各种处理逻辑抽象成事件和对应事件调度器, 并将每类事件的处理过程分割成多个步骤, 用有限状态机表示。
在Yarn中,App、AppAttempt、Container、Node都可以使用状态机表示。其中,
RMApp:用于维护一个Application的生命周期;
RMAppAttempt:用于维护一次尝试运行的生命周期;
RMContainer:用于维护一个已分配的资源最小单位Container的生命周期;
RMNode:用于维护一个NodeManager的生命周期。
关于yarn的处理过程
状态机是与事件模型结合使用的:先在Service中注册事件调度处理器,然后在处理器中使用状态机。
整个处理过程大致为: 处理请求会作为事件进入系统, 由异步调度器(AsyncDispatcher) 负责传递给相应事件调度器(Event Handler) 。 该事件调度器可能将该事件转发给另外一个事件调度器, 也可能交给一个带有有限状态机的事件处理器, 其处理结果也以事件的形式输出给中央异步调度器。 而新的事件会再次被中央异步调度器转发给下一个事件调度器, 直至处理完成(达到终止条件) 。
使用状态机的目的
Yarn状态机就负责合理地组织这些状态转换流程,快速找到指定初始状态和事件类型对应的状态转换方法。
状态机举例
例如,对于一个应用RMApp而言,RMApp存在一个初始状态,处理事件时,会根据事件类型匹配对应的转换类Transition,将RMApp从初始状态转化成目标状态。RMApp经历的流程为:初始状态–>转换方法–>目标状态,将其所有流程汇总起来,就是状态机。
各个使用状态机的事件处理类依赖状态机工厂类StateMachineFactory完成状态机的初始化。状态机的各个处理流程(状态转换)通过链表结构TransitionsListNode组织起来。
在RMActiveServices的serviceInit方法中注册RMAppEventType事件的调度处理器ApplicationEventDispatcher:
// Register an ApplicationEventDispatcher (constructed with rmContext) as the handler
// for the RMAppEventType event class.
rmDispatcher.register(RMAppEventType.class,new ApplicationEventDispatcher(rmContext));
ApplicationEventDispatcher中让RMApp的实现类RMAppImpl处理事件。
/**
 * Forwards an application event to the RMApp it is addressed to.
 *
 * <p>The target is looked up in the RM context by the application id carried in the
 * event. When no application is registered under that id the event is dropped without
 * any logging. Anything thrown while the application handles the event is logged and
 * not propagated to the caller.
 *
 * @param event the application event to deliver
 */
@Override
public void handle(RMAppEvent event) {
  final ApplicationId applicationId = event.getApplicationId();
  final RMApp targetApp = this.rmContext.getRMApps().get(applicationId);
  if (targetApp == null) {
    // Unknown application id: nothing to deliver the event to.
    return;
  }
  try {
    targetApp.handle(event);
  } catch (Throwable t) {
    LOG.error("Error in handling event type " + event.getType()
        + " for application " + applicationId, t);
  }
}
RMAppImpl的handle方法中通过调用状态机的转换方法doTransition来改变RMApp对象状态。
public void handle(RMAppEvent event) {this.writeLock.lock();try {ApplicationId appID = event.getApplicationId();LOG.debug("Processing event for " + appID + " of type "+ event.getType());final RMAppState oldState = getState();try {/* keep the master in sync with the state machine */this.stateMachine.doTransition(event.getType(), event);} catch (InvalidStateTransitionException e) {LOG.error("App: " + appID+ " can't handle this event at current state", e);onInvalidStateTransition(event.getType(), oldState);}// Log at INFO if we're not recovering or not in a terminal state.// Log at DEBUG otherwise.if ((oldState != getState()) &&(((recoveredFinalState == null)) ||(event.getType() != RMAppEventType.RECOVER))) {LOG.info(String.format(STATE_CHANGE_MESSAGE, appID, oldState,getState(), event.getType()));} else if ((oldState != getState()) && LOG.isDebugEnabled()) {LOG.debug(String.format(STATE_CHANGE_MESSAGE, appID, oldState,getState(), event.getType()));}} finally {this.writeLock.unlock();}}
简化实验
简化类图
事件类型
package com.donny.state;/*** @author 1792998761@qq.com* @description* @date 2023/9/27*/
/**
 * Event types accepted by the sample state machine.
 *
 * <p>Declaration order is {@code START}, {@code Change1}, {@code Change2}.
 */
public enum MyEventType {
    /** In the sample topology: moves {@code NEW} to {@code State_1}. */
    START,

    /** In the sample topology: moves {@code State_1} to {@code State_2}. */
    Change1,

    /** In the sample topology: moves {@code State_2} to {@code FINISHED}. */
    Change2
}
任务状态
package com.donny.state;/*** @author 1792998761@qq.com* @description* @date 2023/9/27*/
/**
 * States of the sample task.
 *
 * <p>Declaration order is {@code NEW}, {@code State_1}, {@code State_2},
 * {@code FINISHED}.
 */
public enum MyState {
    /** Initial state used by the sample factory. */
    NEW,

    /** First intermediate state. */
    State_1,

    /** Second intermediate state. */
    State_2,

    /** Last state reached by the sample topology. */
    FINISHED
}
状态机框架相关
package com.donny.state;/*** @author 1792998761@qq.com* @description* @date 2023/9/27*/
/**
 * Hook run for a transition whose target state is fixed in advance.
 *
 * <p>The hook only performs the side effects of the transition; it does not choose
 * the next state.
 *
 * @param <OPERAND> type of the object the state machine operates on
 * @param <EVENT>   type of the event that triggers the transition
 */
public interface SingleArcTransition<OPERAND, EVENT> {

    /**
     * Performs the work attached to the transition.
     *
     * @param operand the object whose state is changing
     * @param event   the event that caused the transition
     */
    void transition(OPERAND operand, EVENT event);
}
package com.donny.state;/*** @author 1792998761@qq.com* @description* @date 2023/9/27*/
/**
 * Hook run for a transition that may end in one of several states.
 *
 * <p>Unlike {@code SingleArcTransition}, the hook itself returns the state to move to.
 *
 * @param <OPERAND> type of the object the state machine operates on
 * @param <EVENT>   type of the event that triggers the transition
 * @param <STATE>   enum type of the states
 */
public interface MultipleArcTransition<OPERAND, EVENT, STATE extends Enum<STATE>> {

    /**
     * Performs the work attached to the transition and picks the resulting state.
     *
     * @param operand the object whose state is changing
     * @param event   the event that caused the transition
     * @return the state selected by the hook
     */
    STATE transition(OPERAND operand, EVENT event);
}
package com.donny.state;/*** @author 1792998761@qq.com* @description* @date 2023/9/27*/
/**
 * A transition registration that can be applied to a {@link StateMachineFactory}.
 *
 * @param <OPERAND>   type of the object the state machine operates on
 * @param <STATE>     enum type of the states
 * @param <EVENTTYPE> enum type of the event types
 * @param <EVENT>     type of the events
 */
public interface ApplicableTransition<
        OPERAND,
        STATE extends Enum<STATE>,
        EVENTTYPE extends Enum<EVENTTYPE>,
        EVENT> {

    /**
     * Applies this registration to the given factory.
     *
     * @param subject the factory to apply this registration to
     */
    void apply(StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> subject);
}
package com.donny.state;import java.util.HashMap;
import java.util.Map;/*** @author 1792998761@qq.com* @description* @date 2023/9/27*/
public class ApplicableSingleOrMultipleTransition<OPERAND, STATE extends Enum<STATE>,EVENTTYPE extends Enum<EVENTTYPE>, EVENT> implements ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> {final STATE preState;final EVENTTYPE eventType;final Transition<OPERAND, STATE, EVENTTYPE, EVENT> transition;ApplicableSingleOrMultipleTransition(STATE preState, EVENTTYPE eventType,Transition<OPERAND, STATE, EVENTTYPE, EVENT> transition) {this.preState = preState;this.eventType = eventType;this.transition = transition;}@Overridepublic void apply(StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> subject) {Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>> transitionMap= subject.getStateMachineTable().get(preState);if (transitionMap == null) {transitionMap = new HashMap<EVENTTYPE,Transition<OPERAND, STATE, EVENTTYPE, EVENT>>();subject.getStateMachineTable().put(preState, transitionMap);}transitionMap.put(eventType, transition);}
}
package com.donny.state;import java.util.Set;/*** @author 1792998761@qq.com* @description* @date 2023/9/27*/
public class MultipleInternalArc<OPERAND, STATE extends Enum<STATE>,EVENTTYPE extends Enum<EVENTTYPE>, EVENT> implements Transition<OPERAND, STATE, EVENTTYPE, EVENT> {private Set<STATE> validPostStates;private MultipleArcTransition<OPERAND, EVENT, STATE> hook; // transition hookMultipleInternalArc(Set<STATE> postStates,MultipleArcTransition<OPERAND, EVENT, STATE> hook) {this.validPostStates = postStates;this.hook = hook;}@Overridepublic STATE doTransition(OPERAND operand, STATE oldState,EVENT event, EVENTTYPE eventType)throws RuntimeException {STATE postState = hook.transition(operand, event);if (!validPostStates.contains(postState)) {throw new RuntimeException("oldState:" + oldState + " ,eventType:" + eventType);}return postState;}
}
public class MyTransition implements SingleArcTransition {@Overridepublic void transition(Object o, Object o2) {System.out.println("do transition");}
}
package com.donny.state;/*** @author 1792998761@qq.com* @description* @date 2023/9/27*/
public class SingleInternalArc<OPERAND, STATE extends Enum<STATE>,EVENTTYPE extends Enum<EVENTTYPE>, EVENT> implements Transition<OPERAND, STATE, EVENTTYPE, EVENT> {private STATE postState;private SingleArcTransition<OPERAND, EVENT> hook; // transition hookSingleInternalArc(STATE postState,SingleArcTransition<OPERAND, EVENT> hook) {this.postState = postState;this.hook = hook;}@Overridepublic STATE doTransition(OPERAND operand, STATE oldState,EVENT event, EVENTTYPE eventType) {if (hook != null) {hook.transition(operand, event);}return postState;}
}
package com.donny.state;/*** @author 1792998761@qq.com* @description* @date 2023/9/27*/
/**
 * A single arc of the state machine: given the current state and an event, performs
 * the transition and returns the state the machine should be in afterwards.
 *
 * @param <OPERAND>   type of the object the state machine operates on
 * @param <STATE>     enum type of the states
 * @param <EVENTTYPE> enum type of the event types
 * @param <EVENT>     type of the events
 */
public interface Transition<
        OPERAND,
        STATE extends Enum<STATE>,
        EVENTTYPE extends Enum<EVENTTYPE>,
        EVENT> {

    /**
     * Performs the transition.
     *
     * @param operand   the object whose state is changing
     * @param oldState  the state before the transition
     * @param event     the triggering event
     * @param eventType the type of the triggering event
     * @return the state after the transition
     */
    STATE doTransition(OPERAND operand, STATE oldState, EVENT event, EVENTTYPE eventType);
}
package com.donny.state;/*** @author 1792998761@qq.com* @description* @date 2023/9/27*/
/**
 * A running state machine: exposes the current state and accepts events.
 *
 * @param <STATE>     enum type of the states
 * @param <EVENTTYPE> enum type of the event types
 * @param <EVENT>     type of the events
 */
public interface StateMachine<
        STATE extends Enum<STATE>,
        EVENTTYPE extends Enum<EVENTTYPE>,
        EVENT> {

    /**
     * @return the state the machine is currently in
     */
    STATE getCurrentState();

    /**
     * Processes one event.
     *
     * @param eventType the type of the event
     * @param event     the event itself
     * @return the state after the event has been processed
     * @throws RuntimeException if the transition cannot be performed
     */
    STATE doTransition(EVENTTYPE eventType, EVENT event) throws RuntimeException;
}
package com.donny.state;/*** @author 1792998761@qq.com* @description* @date 2023/9/27*/
/**
 * Node of a singly linked list of transition registrations.
 *
 * <p>Both fields are final, so a node never changes after construction; a list is
 * extended by creating a new head node that points at the previous head.
 *
 * @param <OPERAND>   type of the object the state machine operates on
 * @param <STATE>     enum type of the states
 * @param <EVENTTYPE> enum type of the event types
 * @param <EVENT>     type of the events
 */
public class TransitionsListNode<
        OPERAND,
        STATE extends Enum<STATE>,
        EVENTTYPE extends Enum<EVENTTYPE>,
        EVENT> {

    /** Registration held by this node. */
    final ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> transition;

    /**
     * Following node, or {@code null} at the end of the list. Declared with the full
     * type arguments (instead of the raw type) so that walking the list keeps the
     * element type and needs no unchecked conversion.
     */
    final TransitionsListNode<OPERAND, STATE, EVENTTYPE, EVENT> next;

    /**
     * @param transition registration to store in this node
     * @param next       node that follows this one, or {@code null} if there is none
     */
    TransitionsListNode(
            ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> transition,
            TransitionsListNode<OPERAND, STATE, EVENTTYPE, EVENT> next) {
        this.transition = transition;
        this.next = next;
    }
}
package com.donny.state;import java.util.*;/*** @author 1792998761@qq.com* @description* @date 2023/9/27*/
public class StateMachineFactory<OPERAND, STATE extends Enum<STATE>,EVENTTYPE extends Enum<EVENTTYPE>, EVENT> {private final TransitionsListNode transitionsListNode;private Map<STATE, Map<EVENTTYPE,Transition<OPERAND, STATE, EVENTTYPE, EVENT>>> stateMachineTable;private STATE defaultInitialState;private final boolean optimized;public StateMachineFactory(STATE defaultInitialState) {this.transitionsListNode = null;this.defaultInitialState = defaultInitialState;this.optimized = false;this.stateMachineTable = null;}private StateMachineFactory(StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> that,ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> t) {this.defaultInitialState = that.defaultInitialState;this.transitionsListNode= new TransitionsListNode(t, that.transitionsListNode);this.optimized = false;this.stateMachineTable = null;}public StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT>installTopology() {return new StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT>(this, true);}private StateMachineFactory(StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> that,boolean optimized) {this.defaultInitialState = that.defaultInitialState;this.transitionsListNode = that.transitionsListNode;this.optimized = optimized;if (optimized) {makeStateMachineTable();} else {stateMachineTable = null;}}private void makeStateMachineTable() {Stack<ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT>> stack =new Stack<ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT>>();Map<STATE, Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>>prototype = new HashMap<STATE, Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>>();prototype.put(defaultInitialState, null);stateMachineTable= new EnumMap<STATE, Map<EVENTTYPE,Transition<OPERAND, STATE, EVENTTYPE, EVENT>>>(prototype);for (TransitionsListNode cursor = transitionsListNode;cursor != null;cursor = cursor.next) {stack.push(cursor.transition);}while (!stack.isEmpty()) {stack.pop().apply(this);}}public 
Map<STATE, Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>> getStateMachineTable() {return stateMachineTable;}private STATE doTransition(OPERAND operand, STATE oldState, EVENTTYPE eventType, EVENT event)throws RuntimeException {Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>> transitionMap= stateMachineTable.get(oldState);if (transitionMap != null) {Transition<OPERAND, STATE, EVENTTYPE, EVENT> transition= transitionMap.get(eventType);if (transition != null) {return transition.doTransition(operand, oldState, event, eventType);}}throw new RuntimeException("RuntimeException: " + oldState + eventType);}public StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT>addTransition(STATE preState, STATE postState,EVENTTYPE eventType,SingleArcTransition<OPERAND, EVENT> hook) {return new StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT>(this, new ApplicableSingleOrMultipleTransition<OPERAND, STATE, EVENTTYPE, EVENT>(preState, eventType, new SingleInternalArc(postState, hook)));}public StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT>addTransition(STATE preState, Set<STATE> postStates,EVENTTYPE eventType,MultipleArcTransition<OPERAND, EVENT, STATE> hook) {return new StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT>(this,new ApplicableSingleOrMultipleTransition<OPERAND, STATE, EVENTTYPE, EVENT>(preState, eventType, new MultipleInternalArc(postStates, hook)));}private synchronized void maybeMakeStateMachineTable() {if (stateMachineTable == null) {makeStateMachineTable();}}private class InternalStateMachineimplements StateMachine<STATE, EVENTTYPE, EVENT> {private final OPERAND operand;private STATE currentState;InternalStateMachine(OPERAND operand, STATE initialState) {this.operand = operand;this.currentState = initialState;if (!optimized) {maybeMakeStateMachineTable();}}@Overridepublic synchronized STATE getCurrentState() {return currentState;}@Overridepublic synchronized STATE doTransition(EVENTTYPE eventType, EVENT event)throws 
RuntimeException {STATE oldState = currentState;currentState = StateMachineFactory.this.doTransition(operand, currentState, eventType, event);return currentState;}}public StateMachine<STATE, EVENTTYPE, EVENT> make(OPERAND operand) {return new InternalStateMachine(operand, defaultInitialState);}
}
package com.donny.state;/*** @author 1792998761@qq.com* @description* @date 2023/9/27*/
/**
 * Small demo that drives a state machine through
 * {@code NEW -> State_1 -> State_2 -> FINISHED} and prints the state after each step.
 */
public class Test {

    /**
     * Shared topology for all {@code Test} instances. The operand and event types are
     * {@code Object}: the hooks used here ignore both, and plain {@code Object}
     * instances are sent as events.
     */
    private static final StateMachineFactory<Object, MyState, MyEventType, Object>
            stateMachineFactory =
                    new StateMachineFactory<Object, MyState, MyEventType, Object>(MyState.NEW)
                            .addTransition(MyState.NEW, MyState.State_1,
                                    MyEventType.START, new MyTransition())
                            .addTransition(MyState.State_1, MyState.State_2,
                                    MyEventType.Change1, new MyTransition())
                            .addTransition(MyState.State_2, MyState.FINISHED,
                                    MyEventType.Change2, new MyTransition())
                            .installTopology();

    /** State machine of this instance; starts in {@code MyState.NEW}. */
    private final StateMachine<MyState, MyEventType, Object> stateMachine;

    /** Creates an instance with its own state machine made from the shared factory. */
    public Test() {
        this.stateMachine = stateMachineFactory.make(this);
    }

    /**
     * Prints the initial state, then sends {@code START}, {@code Change1} and
     * {@code Change2} in turn, printing the current state after each event.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        Test t = new Test();
        System.out.println(t.stateMachine.getCurrentState());

        Object event = new Object();

        t.stateMachine.doTransition(MyEventType.START, event);
        System.out.println(t.stateMachine.getCurrentState());

        t.stateMachine.doTransition(MyEventType.Change1, event);
        System.out.println(t.stateMachine.getCurrentState());

        t.stateMachine.doTransition(MyEventType.Change2, event);
        System.out.println(t.stateMachine.getCurrentState());
    }
}
实验结果
NEW
do transition
State_1
do transition
State_2
do transition
FINISHED