Java 任务处理

标签: java 事务 任务处理 | 发表时间:2014-12-25 14:10 | 作者:coderbee
出处:http://coderbee.net

最近梳理其他同事以前写的 job 后有点想法,记录下。

一、业务场景

在大多数的系统都有类似这样的逻辑,比如下单了给用户赠送积分,用户在论坛上发表了帖子,给用户增加积分等等。

下单赠送积分,那么一个订单肯定不能重复赠送积分,所以需要一些状态来比较来哪些是已赠送的,哪些是没有赠送的。或许可以在订单表里加个字段来标记是否赠送了积分。

有时候,业务人员出于营销的需要,可能要搞个某某时间段内下单返券的活动。难道又在订单表里加个字段?肯定不能,谁知道还要搞多少活动呢。

二、实现

为了使核心的业务流程尽可能简单高效,赠送积分、返券(后面简称为task)之类的逻辑应该通过异步的job来处理。

因为 task 的处理状态不能放在核心的业务表里,所以,可以另外建一个表示异步任务的 async_task 表,结构如下:

  -- 业务job处理 任务
create table async_task (
  id number(11) primary key,
  key_work  varchar2(32),  --  不同业务逻辑的task用不同的keyword
  biz_id char(32),         --  业务数据 ID,比如订单号
  biz_data varchar2(256),  --  核心的业务数据,用于避免关联业务表;具体结构取决于keyword
  status number,           --  任务的处理状态; -2:未处理, -1:处理中, 0:已处理, 大于 0 的数字表示失败次数
  create_tm date,          --  任务的创建时间
  modify_tm date           --  任务的修改时间
);

处于性能考虑,可以在 key_work 字段上建立分区,在 biz_id 上建立索引。

当业务表有需要处理的数据时,就往 async_task 插入一条相应的记录(可以异步插入),异步 job 再从 async_task 表里取数据来处理。

注意:处理 task 时,要保证数据的一致性。所在的项目组曾出现过,下单返券的活动里,送券与更新状态的操作没有放在同一个事务里,出现券送了,状态没更新,重复送券的问题。 一定要注意事务的正确处理。

三、单线程、多线程处理 task

不管是用单线程还是多线程,都要考虑有大量 task 的情况,所以不能一次把所有符合条件的 task 都读取到内存里,一定要分页。

单机单线程

不用考虑数据被其他线程重复处理的情况,顺序处理即可:取一批数据处理,处理完了再取下一批,直到所有的都处理完了。

单机多线程

数据量大了,就不能用单线程慢慢地处理了。可以采用一个线程去读取未处理的 task,然后提交到线程池去处理,等这批 task 处理完后再去读取下一批,主流程如下:

     
// 直接使用 ThreadPoolExecutor 是为了使线程池的线程有特定的名字,任务队列有边界。 ExecutorService executorService = new ThreadPoolExecutor(0, 10, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1000), // 有界队列 new ThreadFactory() { // 使用定制的 private AtomicInteger counter = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("task-handler-" + counter.incrementAndGet()); return null; } }, new CallerRunsPolicy()); do { List<AsyncTask> tasks = getUnhandleTask(); if (tasks.isEmpty()) { break; } List<Callable<Object>> callables = convert2callables(tasks); executorService.invokeAll(callables); } while (true); executorService.shutdown();

线程池采用 CallerRunsPolicy 策略是为了在线程池处理不完任务,线程池的任务队列满的时候,读取 task 的线程可以直接处理 task,这样既减缓了 task 的读取速度,又可以加快 task 的处理速度。

多机处理

数据量实在太多,一台机器处理不完,可以用多台。

在多机处理的时候,上面的代码就有问题了,task 可能在不同的机器上被重复处理。

任务被 getUnhandleTask() 方法读取处理后、处理完成前,另一台机器上的线程也读取到了这个任务,发现是未处理的,它也会进行处理,这样就出现重复处理了。正确的主流程如下:

    public class AsyncTask {
    private long id;
    private int status;

    public static enum STATUS {
        UNHANDLE, HANDLING
    }

    public long getId() {
        return id;
    }

    public void setId(long id) {
        this.id = id;
    }

    public int getStatus() {
        return status;
    }

    public void setStatus(int status) {
        this.status = status;
    }
}

public class TestTaskHandle {

    private Callable<Object> convert(final AsyncTask task) {
        return new Callable<Object>() {

            @Override
            public Object call() throws Exception {
                return doWithTask(task);
            }
        };
    }

    private Object doWithTask(AsyncTask task) {
        return null;
    }

    private List<AsyncTask> getUnhandleTask() {
        return null;
    }

    public void multiMachine() {

        ExecutorService executorService = new ThreadPoolExecutor(0, 10, 5,
                TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1000),
                new ThreadFactory() {
                    private AtomicInteger counter = new AtomicInteger(0);

                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r);
                        thread.setName("task-handler-"
                                + counter.incrementAndGet());
                        return null;
                    }

                }, new CallerRunsPolicy());

        do {
            List<AsyncTask> tasks = getUnhandleTask();
            if (tasks.isEmpty()) {
                break;
            }

            for (AsyncTask asyncTask : tasks) {
                // 把 RDBMS 的 update 操作当作一个 CAS 命令
                boolean isSuccess = updateStatus(asyncTask.getId(),
                        AsyncTask.STATUS.UNHANDLE, AsyncTask.STATUS.HANDLING);
                if (isSuccess) {
                    // 把 task 更新为处理中,成功表示抢占到了这个任务,可以继续处理

                    executorService.submit(convert(asyncTask));

                } // else 被其他线程处理了
            }

        } while (true);

        executorService.shutdown();
    }

    public boolean updateStatus(long id, AsyncTask.STATUS oldStatus,
            AsyncTask.STATUS newStatus) {
        return true;
    }
} 

在上面的实现中,每一条记录都需要通过一个数据库的 update 操作来判断是否可以继续处理,开销不小。一个改进的做法是:在 async_task 表增加一个 owner 字段,每个线程使用一个唯一的标识 tid(比如 UUID)。当 task 读取线程要读取任务时,先对 async_task 表里的未处理 task 执行 update,把状态更新为处理中, owner 更新为自己的 tid。如果这个 update 的影响行数大于 0,表示抢占到了任务,然后根据 tid 去读取任务,再分发给线程池去处理。

在存在并发竞争的情景下,很重要的就是借助数据库事务的ACID来达到一种 CAS 的效果;正确处理并发问题总是需要基础的 CAS 操作或锁。

相关 [java 任务] 推荐:

Java 任务处理

- - 码蜂笔记
最近梳理其他同事以前写的 job 后有点想法,记录下. 在大多数的系统都有类似这样的逻辑,比如下单了给用户赠送积分,用户在论坛上发表了帖子,给用户增加积分等等. 下单赠送积分,那么一个订单肯定不能重复赠送积分,所以需要一些状态来比较来哪些是已赠送的,哪些是没有赠送的. 或许可以在订单表里加个字段来标记是否赠送了积分.

Java Spring注解任务调度并实现AOP监控任务执行情况

- - 极客521 | 极客521
本文讲的是通过Spring注解的方式实现任务调度. 只要引入了spring-context包就能够在项目中使用注解方式的任务调度. 需要在Spring配置文件中加入task的schema. 然后在代码中就可以直接用了,要定时执行的方法必须是void的,并且没有任何参数的. cron表达式请自行问百度,下面只列出几个从网上找的例子.

几种任务调度的 Java 实现方法与比较

- wangyegang - IBM developerWorks 中国 : 文档库
综观目前的 Web 应用,多数应用都具备任务调度的功能. 本文由浅入深介绍了几种任务调度的 Java 实现方法,包括 Timer,Scheduler, Quartz 以及 JCron Tab,并对其优缺点进行比较,目的在于给需要开发任务调度的程序员提供有价值的参考.

捕获Java线程池执行任务抛出的异常

- - BlogJava-首页技术区
Java中线程执行的任务接口java.lang.Runnable 要求不抛出Checked异常,. 那么如果 run() 方法中抛出了RuntimeException,将会怎么处理了. 通常java.lang.Thread对象运行设置一个默认的异常处理方法:. 而这个默认的静态全局的异常捕获方法时输出堆栈.

Java定时任务调度:用ExecutorService取代Timer

- - ITeye博客
《Java并发编程》一书提到,用ExecutorService取代Java Timer有几个理由,我认为其中最重要的理由是:. 如果TimerTask抛出未检查的异常,Timer将会产生无法预料的行为. Timer线程并不捕获异常,所以 TimerTask抛出的未检查的异常会终止timer线程. 这种情况下,Timer也不会再重新恢复线程的执行了;它错误的认为整个Timer都被取消了.

java中基于线程池和反射机制实现定时任务

- - CSDN博客推荐文章
调用main方法,开始加载任务配置并执行任务. MyTask 类 实现Runnable接口,在main类中调用. TaskModel: 对任务类的封装. XmlReader 任务配置解析类. System.out.println("距离首次运行还差" + initialDelay + "秒. TaskA TaskB TaskC其中定义静态方法 ,这些类的静态方法配置在 xml文件中,被调用.

Java 如何判断线程池所有任务是否执行完毕

- - CSDN博客推荐文章
启动一次顺序关闭,执行以前提交的任务,但不接受新任务. 如果已经关闭,则调用没有其他作用. 抛出: SecurityException - 如果安全管理器存在并且关闭,此 ExecutorService 可能操作某些不允许调用者修改的线程(因为它没有保持 RuntimePermission ("modifyThread")),或者安全管理器的  checkAccess 方法拒绝访问.

Java中的锁(Locks in Java)

- - 并发编程网 - ifeve.com
原文链接 作者:Jakob Jenkov 译者:申章 校对:丁一. 锁像synchronized同步块一样,是一种线程同步机制,但比Java中的synchronized同步块更复杂. 因为锁(以及其它更高级的线程同步机制)是由synchronized同步块的方式实现的,所以我们还不能完全摆脱synchronized关键字( 译者注:这说的是Java 5之前的情况).

Java PaaS 对决

- 呆瓜 - IBM developerWorks 中国 : 文档库
本文为 Java 开发人员比较了三种主要的 Platform as a Service (PaaS) 产品:Google App Engine for Java、Amazon Elastic Beanstalk 和 CloudBees RUN@Cloud. 它分析了每种服务独特的技术方法、优点以及缺点,而且还讨论了常见的解决方法.

Java浮点数

- d0ngd0ng - 译言-电脑/网络/数码科技
Thomas Wang, 2000年3月. Java浮点数的定义大体上遵守了二进制浮点运算标准(即IEEE 754标准). IEEE 754标准提供了浮点数无穷,负无穷,负零和非数字(Not a number,简称NaN)的定义. 在Java开发方面,这些东西经常被多数程序员混淆. 在本文中,我们将讨论计算这些特殊的浮点数相关的结果.