前言
以采集数据处理逻辑为例,数据采集分为不同种类如:MQTT、MODBUS、HTTP等,不同的采集数据有不同的解析处理逻辑。但
总体解析处理步骤是固定的
。可以使用工厂方法设计模式简化代码,让代码变得更加优雅。
代码实战
抽象类
总体步骤一致,先声明一个抽象类包含所有处理步骤,具体处理步骤由不同子类自行实现。【大体处理框架
】
public abstract class AbstractCollectService {protected abstract Boolean handleAlarm(CollectDataMessage message);protected abstract Boolean handleCollect(CollectDataMessage message);public CollectDataMessage parseKafkaMessage(String kafkaMessage, CollectTypeEnum collectTypeEnum){// 工厂方法return CollectFactory.getInstance(collectTypeEnum).parseKafkaMessage2DataMessage(kafkaMessage);}protected abstract CollectDataMessage parseKafkaMessage2DataMessage(String kafkaMessage);public Boolean doHandle(CollectDataMessage message) {Boolean ret;switch (message.getHandleTypeEnum()){case ALARM:ret = handleAlarm(message) ;break ;case COLLECT:ret = handleCollect(message) ;break ;default:ret = false ;}return ret ;}}
枚举类
采集数据枚举类与子实现类一一对应
:
public enum CollectTypeEnum {MQTT,MODBUS,HTTP;
}
子实现类
不同采集数据的子类处理逻辑,各自实现抽象类中抽象方法(核心逻辑)。
工厂方法
定义工厂方法,使用枚举做判断条件,真正处理不同逻辑时,需要显示地传出对应枚举参数
以便得到对应实现类对象。
public class CollectFactory {public static AbstractCollectService getInstance(CollectTypeEnum collectTypeEnum) {switch (collectTypeEnum) {case MQTT:return MqttCollectService.getInstance();case MODBUS:return ModbusCollectService.getInstance();case HTTP:return HttpCollectService.getInstance();default:throw new IllegalArgumentException("Unknown collect type");}}
}
具体子类对象,都是采用【基于类初始化】获取的单例对象
。随便一个为例,其他子类同理。
简单的工厂方法设计模式就这样实现了~
最终使用
显示指定枚举参数
处理函数:
public class KafkaMsg2CollectMsgRichMapFunction extends RichMapFunction<String, CollectDataMessage> {private static final Logger log = LoggerFactory.getLogger(KafkaMsg2CollectMsgRichMapFunction.class) ;private final CollectTypeEnum collectTypeEnum;public KafkaMsg2CollectMsgRichMapFunction(CollectTypeEnum collectTypeEnum) {this.collectTypeEnum = collectTypeEnum;}@Overridepublic void open(Configuration parameters) throws Exception {}@Overridepublic CollectDataMessage map(String kafkaMessage) {try {// 根据显示指定的枚举类,获取对应子类实现相应逻辑AbstractCollectService collectService = CollectFactory.getInstance(collectTypeEnum);return collectService.parseKafkaMessage(kafkaMessage, collectTypeEnum);} catch (RuntimeException e) {log.info("解析采集数据异常", e);throw new RuntimeException(e);}}@Overridepublic void close() throws Exception {}
}