LTS 轻量级分布式任务调度框架(Light Task Schedule)

标签: lts 轻量级 分布 | 发表时间:2015-03-07 09:53 | 作者:huguifuture
出处:http://www.iteye.com

框架概况:

LTS是一个轻量级分布式任务调度框架,参考hadoop的部分思想。有三种角色, JobClient, JobTracker, TaskTracker。各个节点都是无状态的,可以部署多个,来实现负载均衡,实现更大的负载量, 并且框架具有很好的容错能力。 采用Zookeeper暴露节点信息,master选举。Mongo存储任务队列和任务执行日志, netty做底层通信。

  • JobClient : 主要负责提交任务, 和 接收任务执行反馈结果。
  • JobTracker : 负责接收并分配任务,任务调度。
  • TaskTracker: 负责执行任务,执行完反馈给JobTracker。

框架支持实时任务,也支持定时任务,同时也支持CronExpression, 有问题,请联系QQ254963746, 或加入群:109500214 一起探讨

github地址:https://github.com/qq254963746/light-task-schedule

架构图

Aaron Swartz

节点组:

  • 1. 一个节点组等同于一个集群,同一个节点组中的各个节点是对等的,外界无论连接节点组中的任务一个节点都是可以的。
  • 2. 每个节点组中都有一个master节点,采用zookeeper进行master选举(master宕机,会自动选举出新的master节点),框架会提供接口API来监听master节点的变化,用户可以自己使用master节点做自己想做的事情。
  • 3. JobClient和TaskTracker都可以存在多个节点组。譬如 JobClient 可以存在多个节点组。 譬如:JobClient 节点组为 ‘lts_WEB’ 中的一个节点提交提交一个 只有节点组为’lts_TRADE’的 TaskTracker 才能执行的任务。
  • 4. (每个集群中)JobTacker只有一个节点组。
  • 5. 多个JobClient节点组和多个TaskTracker节点组再加上一个JobTacker节点组, 组成一个大的集群。

工作流程:

  • 1. JobClient 提交一个 任务 给 JobTracker, 这里我提供了两种客户端API, 一种是如果JobTracker 不存在或者提交失败,直接返回提交失败。另一种客户端是重试客户端, 如果提交失败,先存储到本地leveldb(可以使用NFS来达到同个节点组共享leveldb文件的目的,多线程访问,做了文件锁处理),返回给客户端提交成功的信息,待JobTracker可用的时候,再将任务提交。
  • 2. JobTracker 收到JobClient提交来的任务,先生成一个唯一的JobID。然后将任务储存在Mongo集群中。JobTracker 发现有(任务执行的)可用的TaskTracker节点(组) 之后,将优先级最大,最先提交的任务分发给TaskTracker。这里JobTracker会优先分配给比较空闲的TaskTracker节点,达到负载均衡。
  • 3. TaskTracker 收到JobTracker分发来的任务之后,执行。执行完毕之后,再反馈任务执行结果给JobTracker(成功or 失败[失败有失败错误信息]),如果发现JobTacker不可用,那么存储本地leveldb,等待TaskTracker可用的时候再反馈。反馈结果的同时,询问JobTacker有没有新的任务要执行。
  • 4. JobTacker收到TaskTracker节点的任务结果信息,生成并插入(mongo)任务执行日志。根据任务信息决定要不要反馈给客户端。不需要反馈的直接删除, 需要反馈的(同样JobClient不可用存储文件,等待可用重发)。
  • 5. JobClient 收到任务执行结果,进行自己想要的逻辑处理。

特性

  • 负载均衡:

    • JobClient 和 TaskTracker会随机连接JobTracker节点组中的一个节点,实现JobTracker负载均衡。当连接上后,将一直保持连接这个节点,保持连接通道,知道这个节点不可用,减少每次都重新连接一个节点带来的性能开销。
    • JobTracker 分发任务时,是优先分配给最空闲的一个TaskTracker节点,实现TaskTracker节点的负载均衡。
  • 健壮性:

    • 当节点组中的一个节点当机之后,自动转到其他节点工作。当整个节点组当机之后,将会采用存储文件的方式,待节点组可用的时候进行重发。
    • 当执行任务的TaskTracker节点当机之后,JobTracker 会将这个TaskTracker上的未完成的任务(死任务),重新分配给节点组中其他节点执行。
  • 伸缩性:

    • 因为各个节点都是无状态的,可以动态增加机器部署实例, 节点关注者会自动发现。

开发计划:

  • WEB后台管理
  • 框架优化

调用示例

  • 安装 zookeeper 和 mongo , 执行 data/mongo 目录下的 mongo.md 中的语句

运行 job-example模块中的例子(包含API启动例子和Spring例子) 分别执行 JobTrackerTest TaskTrackerTest JobClientTest

这里给出的是java API(设置配置)方式启动, 也可以使用spring启动默认不启用spring,需引入job-ext-spring包

JobTracker 端

    final JobTracker jobTracker = new JobTracker();
    // 节点信息配置
    jobTracker.setZookeeperAddress("localhost:2181");
    // jobTracker.setListenPort(35001); // 默认 35001
    // jobTracker.setClusterName("lts");

    // mongo 配置
    Config config = new Config();
    config.setAddresses(new String[]{"localhost:27017"});
    config.setUsername("lts");
    config.setPassword("lts");
    config.setDbName("job");
    jobTracker.setStoreConfig(config);

    // 启动节点
    jobTracker.start();

    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
        @Override
        public void run() {
            jobTracker.stop();
        }
    }));

或者Spring配置

    <bean id="mongoConfig" class="com.lts.job.store.Config">
        <property name="addresses">
            <array>
                <value>localhost:27017</value>
            </array>
        </property>
        <property name="username" value="lts"/>
        <property name="password" value="lts"/>
        <property name="dbName" value="job"/>
    </bean>
    <bean id="jobTracker" class="com.lts.job.spring.JobTrackerFactoryBean" init-method="start">
        <!--<property name="clusterName" value="lts"/>--> <!-- 集群名称 -->
        <!--<property name="listenPort" value="35001"/>--> <!-- 默认 35001 -->
        <property name="zookeeperAddress" value="localhost:2181"/>
        <property name="storeConfig" ref="mongoConfig"/>
        <property name="masterNodeChangeListeners">
            <array>
                <bean class="com.lts.job.example.support.MasterNodeChangeListenerImpl"/>
            </array>
        </property>
    </bean>

TaskTracker端

    TaskTracker taskTracker = new TaskTracker();
    taskTracker.setJobRunnerClass(TestJobRunner.class);
    // jobClient.setClusterName("lts");
    taskTracker.setZookeeperAddress("localhost:2181");
    taskTracker.setNodeGroup("test_trade_TaskTracker");
    taskTracker.setWorkThreads(20);
    taskTracker.start();

    // 任务执行类
    public class TestJobRunner implements JobRunner {

        @Override
        public void run(Job job) throws Throwable {

            System.out.println("我要执行"+ job);
            System.out.println(job.getParam("shopId"));

            try {
                Thread.sleep(5*1000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    }

或者Spring方式配置

    <bean id="taskTracker" class="com.lts.job.spring.TaskTrackerFactoryBean" init-method="start">
        <!--<property name="clusterName" value="lts"/>-->
        <property name="nodeGroup" value="test_trade_TaskTracker"/><!-- 所属节点组名称 -->
        <property name="zookeeperAddress" value="localhost:2181"/>
        <property name="jobRunnerClass" value="com.lts.job.example.support.TestJobRunner"/> <!-- 任务执行类 -->
        <property name="workThreads" value="1"/>    <!-- 工作线程个数 -->
        <property name="masterNodeChangeListeners"> <!-- 所属节点组中master节点变化监听器,可以不用配置 -->
            <array>
                <bean class="com.lts.job.example.support.MasterNodeChangeListenerImpl"/>
            </array>
        </property>
    </bean>

JobClient端

    JobClient jobClient = new RetryJobClient();
    // JobClient jobClient = new JobClient();
    jobClient.setNodeGroup("test_JobClient");
    // jobClient.setClusterName("lts");
    jobClient.setZookeeperAddress("localhost:2181");
    jobClient.start();

    // 提交任务
    Job job = new Job();
    job.setParam("shopId", "11111");
    job.setTaskTrackerNodeGroup("test_trade_TaskTracker");
    // job.setCronExpression("0 0/1 * * * ?");  // 支持 cronExpression表达式
    // job.setTriggerTime(new Date().getTime()); // 支持指定时间执行
    Response response = jobClient.submitJob(job);

或者spring方式启动

   <bean id="jobClient" class="com.lts.job.spring.JobClientFactoryBean" init-method="start">
        <property name="clientType" value="retry"/> <!-- 取值: 为空(默认normal), normal, retry  -->
        <!--<property name="clusterName" value="lts"/>--> <!-- 默认 defaultCluster -->
        <property name="nodeGroup" value="test_JobClient"/> <!-- 节点组名称 -->
        <property name="zookeeperAddress" value="localhost:2181"/>
        <property name="jobFinishedHandler">
            <bean class="com.lts.job.example.support.JobFinishedHandlerImpl"/>  <!-- 任务完成处理器 -->
        </property>
        <property name="masterNodeChangeListeners"><!-- 所属节点组中master节点变化监听器 -->
            <array>
                <bean class="com.lts.job.example.support.MasterNodeChangeListenerImpl"/>
            </array>
        </property>
    </bean>
    // 从Spring容器中取得JobClient Bean
    JobClient jobClient = (JobClient) applicationContext.getBean("jobClient");
    // 提交任务
    Job job = new Job();
    job.setParam("shopId", "11111");
    job.setTaskTrackerNodeGroup("test_trade_TaskTracker");
    // job.setCronExpression("0 0/1 * * * ?");  // 支持 cronExpression表达式
    // job.setTriggerTime(new Date().getTime()); // 支持指定时间执行
    Response response = jobClient.submitJob(job);


已有 0 人发表留言,猛击->> 这里<<-参与讨论


ITeye推荐



相关 [lts 轻量级 分布] 推荐:

LTS 轻量级分布式任务调度框架(Light Task Schedule)

- - Java - 编程语言 - ITeye博客
LTS是一个轻量级分布式任务调度框架,参考hadoop的部分思想. 有三种角色, JobClient, JobTracker, TaskTracker. 各个节点都是无状态的,可以部署多个,来实现负载均衡,实现更大的负载量, 并且框架具有很好的容错能力. 采用Zookeeper暴露节点信息,master选举.

LTS 分布式任务调度 1.6.9 发布

- - 开源中国社区最新新闻
LTS(light-task-scheduler)主要用于解决分布式任务调度问题,支持实时任务,定时任务和Cron任务. 有较好的伸缩性,扩展性,健壮稳定性而被多家公司使用. 支持分布式,解决多点故障,支持动态扩容,容错重试等. Spring扩展支持,Spring Quartz Cron任务的无缝接入支持.

Ubuntu 12.04 LTS 代号 Precise Pangolin

- 丁丁 - Solidot
OwnLinux.cn 写道 "Ubuntu Linux 之父 Mark Shuttleworth 已经对外宣布 Ubuntu 12.04 LTS 代号为 Precise Pangolin 正式版将在明年4月26日发布. 从版本号上可以看出来 Ubuntu 12.04 将是一个长期支持版本(LTS:Long Term Support),其桌面版本将得到官方3年的技术支持,而服务器版本将会有长达5年的技术支持.

FastDFS+Nginx轻量级分布式文件系统安装使用

- - Linux - 操作系统 - ITeye博客
FastDFS的安装使用. FastDFS是一个开源的轻量级 分布式文件系统,它对文件进行管理,功能包括:文件存储、文件同步、文件访问(文件上传、文件下载)等,解决了大容量存储和负载均衡的问题. 特别适合以文件为载体的在线服务,如相册网站、视频网站等等. FastDFS服务端有两个角色:跟踪器(tracker)和存储 节点(storage).

一个轻量级分布式 RPC 框架 — NettyRpc

- - ImportNew
最近在搜索Netty和Zookeeper方面的文章时,看到了这篇文章《 轻量级分布式 RPC 框架》,作者用Zookeeper、Netty和Spring写了一个轻量级的分布式RPC框架. 花了一些时间看了下他的代码,写的干净简单,写的RPC框架可以算是一个简易版的 dubbo. 这个RPC框架虽小,但是麻雀虽小,五脏俱全,有兴趣的可以学习一下.

Ubuntu 12.04 LTS 代号为 Precise Pangolin

- 洞箫 - cnBeta.COM
感谢OwnLinux.cn的投递. Ubuntu Linux 之父 Mark Shuttleworth 已经对外宣布 Ubuntu 12.04 LTS 代号为 Precise Pangolin 正式版将在明年4月26日发布. 从版本号上可以看出来 Ubuntu 12.04 将是一个长期支持版本(LTS:Long Term Support),其桌面版本将得到官方3年的技术支持,而服务器版本将会有长达5年的技术支持.

Ubuntu 下一個 LTS 版:12.04 Precise Pangolin 命名趣談

- 饭团 - T客邦
關心進度的 Ubuntu 用戶都注意到, Ubuntu 11.10 近期逐步完成所有修改,將在下週(10/13)正式推出. 而 Ubuntu 創辦人 Mark Suttleworth 在自己的部落格發出公告,宣告下一代 12.04 LTS 命名. 開發代號定名為 Precise Pangolin (精準的穿山甲),為什麼.

Ubuntu 12.04 LTS 桌面版技术支持将提升到 5 年

- 海坡 - Wow! Ubuntu
根据 Canonical 的官方通告,为了让 Ubuntu 桌面系统在商业领域也能更加更人关注,决定从 Ubuntu 12.04 开始,之后的所有 LTS (长期技术支持)版本都将会提供长达 5 年的技术支持服务. 此前桌面版为 3 年,而只有服务器版才有 5 年. 以下为未来版本的发布周期表:. PS: Ubuntu 每 2 年发布一个 LTS 版本,下一个 LTS 版  Ubuntu 12.04  的发布计划如下:.

轻量级Kubernetes k3s初探

- - InfoQ推荐
1 k3s简介–5 less than K8s. k3s [1] 是rancher®开源的一个Kubernetes发行版,从名字上就可以看出k3s相对k8s做了很多裁剪和优化,二进制程序不足50MB,占用资源更少,只需要512MB内存即可运行. 而之所以称为k3s是因为相对k8s裁剪了如下5个部分:.

用 toto 快速建轻量级博客

- zhai - 博客园-首页原创精华区
对于程序员或创业团队来说,还是有必要拥有一个属于自己的博客. Wordpress 曾经让个人或企业搭建博客变得非常容易. 但是我们觉得 Wordpress 还是有些重量级,所以选择了一个非常轻便的工具 toto,一段只有200多行代码的Ruby应用程序. toto之所以简单,是因为它利用一些很好的工具和服务.