为什么80%的码农都做不了架构师?>>>
一、前言
Elastic-Job是一个优秀的分布式作业调度框架。
Elastic-Job是一个分布式调度解决方案,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。
Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务。
Elastic-Job-Cloud使用Mesos + Docker的解决方案,额外提供资源治理、应用分发以及进程隔离等服务。
1. Elastic-Job-Lite
-
分布式调度协调
-
弹性扩容缩容
-
失效转移
-
错过执行作业重触发
-
作业分片一致性,保证同一分片在分布式环境中仅一个执行实例
-
自诊断并修复分布式不稳定造成的问题
-
支持并行调度
-
支持作业生命周期操作
-
丰富的作业类型
-
Spring整合以及命名空间提供
-
运维平台
2. Elastic-Job-Cloud
-
应用自动分发
-
基于Fenzo的弹性资源分配
-
分布式调度协调
-
弹性扩容缩容
-
失效转移
-
错过执行作业重触发
-
作业分片一致性,保证同一分片在分布式环境中仅一个执行实例
-
支持并行调度
-
支持作业生命周期操作
-
丰富的作业类型
-
Spring整合
-
运维平台
-
基于Docker的进程隔离(TBD)
二、导读
1、Elastic-Job的核心思想
2、Elastic-Job的基本使用
三、Elastic-Job的核心思想
对于分布式计算而言,分片是最基本的思想,Elastic-Job也是沿用了这个思想,每个job跑部分数据,所有job执行完成,便是全量数据,官网给出的SimpleJob例子如下:
public class MyElasticJob implements SimpleJob {@Overridepublic void execute(ShardingContext context) {switch (context.getShardingItem()) {case 0: // do something by sharding item 0break;case 1: // do something by sharding item 1break;case 2: // do something by sharding item 2break;// case n: ...}}
}
用switch case循环来对应分片的业务逻辑,case分片的index,进入业务逻辑执行。当然这里也有不适应的场景,类似于MapReduce需要shuffle的场景就不适合了,比方说,要根据某一个字段全局分组聚合求结果,这时候怎么分片都可能会不合理,因为每个分片只能处理N分之一的数据,没办法shuffle再聚合,这一点,也要根据具体的业务来使用。
那么ShardingContext可以拿到那些信息呢?源码如下
public final class ShardingContext {/*** 作业名称.*/private final String jobName;/*** 作业任务ID.*/private final String taskId;/*** 分片总数.*/private final int shardingTotalCount;/*** 作业自定义参数.* 可以配置多个相同的作业, 但是用不同的参数作为不同的调度实例.*/private final String jobParameter;/*** 分配于本作业实例的分片项.*/private final int shardingItem;/*** 分配于本作业实例的分片参数.*/private final String shardingParameter;public ShardingContext(final ShardingContexts shardingContexts, final int shardingItem) {jobName = shardingContexts.getJobName();taskId = shardingContexts.getTaskId();shardingTotalCount = shardingContexts.getShardingTotalCount();jobParameter = shardingContexts.getJobParameter();this.shardingItem = shardingItem;shardingParameter = shardingContexts.getShardingItemParameters().get(shardingItem);}
}
以上代码,jobParameter和shardingItem是最有用的参数,shardingItem决定switch case循环的走向,shardingParameter可以用业务的查询条件,也可以用字符串拼接的方式组装很复杂的参数用于特定的业务。
四、Elastic-Job的基本使用
1、Job配置项
public class ElasticJobConfig {private static CoordinatorRegistryCenter createRegistryCenter() {ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("127.0.0.1:2181", "elastic-job");CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);regCenter.init();return regCenter;}private static LiteJobConfiguration createJobConfiguration() {JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("jobdemo", "0/5 * * * * ?", 3).shardingItemParameters("0=A,1=A,2=B").failover(true).misfire(true).build();SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig,MyElasticJob.class.getCanonicalName());LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build();return simpleJobRootConfig;}public static void main(String[] args) {new JobScheduler(createRegistryCenter(), createJobConfiguration()).init();}
}
几点说明:
注册中心配置项,设置zookeeper集群地址,我这里用的本地单节点,所以只有一个,当然可以配置任务名称,命名空间(namespace,本质上会在zk里生成一个目录),超时时间,最大重试次数等等
LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build()中,overwrite参数非常重要,设置这个参数为true,修改过job配置信息才会覆盖zookeeper里的数据,要不然不会生效。
2、SimpleJob的实现
public class MyElasticJob implements SimpleJob {@Overridepublic void execute(ShardingContext shardingContext) {switch (shardingContext.getShardingItem()) {case 0: {System.out.println("当前分片:" + shardingContext.getShardingItem() + "=====" + "参数:"+ shardingContext.getShardingParameter() + " =====" + Thread.currentThread());break;}case 1: {System.out.println("当前分片:" + shardingContext.getShardingItem() + "=====" + "参数:"+ shardingContext.getShardingParameter() + " =====" + Thread.currentThread());break;}case 2: {System.out.println("当前分片:" + shardingContext.getShardingItem() + "=====" + "参数:"+ shardingContext.getShardingParameter() + " =====" + Thread.currentThread());break;}default: {System.out.println("当前分片:" + shardingContext.getShardingItem() + "=====" + "参数:"+ shardingContext.getShardingParameter() + " =====" + Thread.currentThread());break;}}}
}
上面设置每5秒钟执行一次,执行ElasticJobConfig的main方法,执行结果如下:
从上面的结果,可以看出,执行每个分片的任务,其实是放到一个线程池去执行的,对应的分片信息和参数信息在shardingContext可以拿到,实现业务非常方便。
最后,如果启动多个JVM,那么这些任务就分散到各个节点里,如果一个节点宕机,下次触发任务时,将把该分片任务丢到健康机器执行,这里做到了节点容错。但是某个分片的任务在执行过程中失败了,那么这里是不会重新触发改分片任务的执行的。