LTS 轻量级分布式任务调度框架(Light Task Schedule)
- - Java - 编程语言 - ITeye博客LTS是一个轻量级分布式任务调度框架,参考hadoop的部分思想. 有三种角色, JobClient, JobTracker, TaskTracker. 各个节点都是无状态的,可以部署多个,来实现负载均衡,实现更大的负载量, 并且框架具有很好的容错能力. 采用Zookeeper暴露节点信息,master选举.
LTS是一个轻量级分布式任务调度框架,参考hadoop的部分思想。有三种角色, JobClient, JobTracker, TaskTracker。各个节点都是无状态的,可以部署多个,来实现负载均衡,实现更大的负载量, 并且框架具有很好的容错能力。 采用Zookeeper暴露节点信息,master选举。Mongo存储任务队列和任务执行日志, netty做底层通信。
框架支持实时任务,也支持定时任务,同时也支持CronExpression, 有问题,请联系QQ254963746, 或加入群:109500214 一起探讨
github地址:https://github.com/qq254963746/light-task-schedule
负载均衡:
健壮性:
伸缩性:
运行 job-example模块中的例子(包含API启动例子和Spring例子) 分别执行 JobTrackerTest TaskTrackerTest JobClientTest
这里给出的是java API(设置配置)方式启动, 也可以使用spring启动默认不启用spring,需引入job-ext-spring包
final JobTracker jobTracker = new JobTracker(); // 节点信息配置 jobTracker.setZookeeperAddress("localhost:2181"); // jobTracker.setListenPort(35001); // 默认 35001 // jobTracker.setClusterName("lts"); // mongo 配置 Config config = new Config(); config.setAddresses(new String[]{"localhost:27017"}); config.setUsername("lts"); config.setPassword("lts"); config.setDbName("job"); jobTracker.setStoreConfig(config); // 启动节点 jobTracker.start(); Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @Override public void run() { jobTracker.stop(); } }));
或者Spring配置
<bean id="mongoConfig" class="com.lts.job.store.Config"> <property name="addresses"> <array> <value>localhost:27017</value> </array> </property> <property name="username" value="lts"/> <property name="password" value="lts"/> <property name="dbName" value="job"/> </bean> <bean id="jobTracker" class="com.lts.job.spring.JobTrackerFactoryBean" init-method="start"> <!--<property name="clusterName" value="lts"/>--> <!-- 集群名称 --> <!--<property name="listenPort" value="35001"/>--> <!-- 默认 35001 --> <property name="zookeeperAddress" value="localhost:2181"/> <property name="storeConfig" ref="mongoConfig"/> <property name="masterNodeChangeListeners"> <array> <bean class="com.lts.job.example.support.MasterNodeChangeListenerImpl"/> </array> </property> </bean>
TaskTracker taskTracker = new TaskTracker(); taskTracker.setJobRunnerClass(TestJobRunner.class); // jobClient.setClusterName("lts"); taskTracker.setZookeeperAddress("localhost:2181"); taskTracker.setNodeGroup("test_trade_TaskTracker"); taskTracker.setWorkThreads(20); taskTracker.start(); // 任务执行类 public class TestJobRunner implements JobRunner { @Override public void run(Job job) throws Throwable { System.out.println("我要执行"+ job); System.out.println(job.getParam("shopId")); try { Thread.sleep(5*1000L); } catch (InterruptedException e) { e.printStackTrace(); } } }
或者Spring方式配置
<bean id="taskTracker" class="com.lts.job.spring.TaskTrackerFactoryBean" init-method="start"> <!--<property name="clusterName" value="lts"/>--> <property name="nodeGroup" value="test_trade_TaskTracker"/><!-- 所属节点组名称 --> <property name="zookeeperAddress" value="localhost:2181"/> <property name="jobRunnerClass" value="com.lts.job.example.support.TestJobRunner"/> <!-- 任务执行类 --> <property name="workThreads" value="1"/> <!-- 工作线程个数 --> <property name="masterNodeChangeListeners"> <!-- 所属节点组中master节点变化监听器,可以不用配置 --> <array> <bean class="com.lts.job.example.support.MasterNodeChangeListenerImpl"/> </array> </property> </bean>
JobClient jobClient = new RetryJobClient(); // JobClient jobClient = new JobClient(); jobClient.setNodeGroup("test_JobClient"); // jobClient.setClusterName("lts"); jobClient.setZookeeperAddress("localhost:2181"); jobClient.start(); // 提交任务 Job job = new Job(); job.setParam("shopId", "11111"); job.setTaskTrackerNodeGroup("test_trade_TaskTracker"); // job.setCronExpression("0 0/1 * * * ?"); // 支持 cronExpression表达式 // job.setTriggerTime(new Date().getTime()); // 支持指定时间执行 Response response = jobClient.submitJob(job);
或者spring方式启动
<bean id="jobClient" class="com.lts.job.spring.JobClientFactoryBean" init-method="start"> <property name="clientType" value="retry"/> <!-- 取值: 为空(默认normal), normal, retry --> <!--<property name="clusterName" value="lts"/>--> <!-- 默认 defaultCluster --> <property name="nodeGroup" value="test_JobClient"/> <!-- 节点组名称 --> <property name="zookeeperAddress" value="localhost:2181"/> <property name="jobFinishedHandler"> <bean class="com.lts.job.example.support.JobFinishedHandlerImpl"/> <!-- 任务完成处理器 --> </property> <property name="masterNodeChangeListeners"><!-- 所属节点组中master节点变化监听器 --> <array> <bean class="com.lts.job.example.support.MasterNodeChangeListenerImpl"/> </array> </property> </bean>
// 从Spring容器中取得JobClient Bean JobClient jobClient = (JobClient) applicationContext.getBean("jobClient"); // 提交任务 Job job = new Job(); job.setParam("shopId", "11111"); job.setTaskTrackerNodeGroup("test_trade_TaskTracker"); // job.setCronExpression("0 0/1 * * * ?"); // 支持 cronExpression表达式 // job.setTriggerTime(new Date().getTime()); // 支持指定时间执行 Response response = jobClient.submitJob(job);