Java 任务处理
最近梳理其他同事以前写的 job 后有点想法,记录下。
一、业务场景
在大多数的系统都有类似这样的逻辑,比如下单了给用户赠送积分,用户在论坛上发表了帖子,给用户增加积分等等。
下单赠送积分,那么一个订单肯定不能重复赠送积分,所以需要一些状态来比较来哪些是已赠送的,哪些是没有赠送的。或许可以在订单表里加个字段来标记是否赠送了积分。
有时候,业务人员出于营销的需要,可能要搞个某某时间段内下单返券的活动。难道又在订单表里加个字段?肯定不能,谁知道还要搞多少活动呢。
二、实现
为了使核心的业务流程尽可能简单高效,赠送积分、返券(后面简称为task)之类的逻辑应该通过异步的job来处理。
因为 task 的处理状态不能放在核心的业务表里,所以,可以另外建一个表示异步任务的 async_task 表,结构如下:
-- 业务job处理 任务
create table async_task (
id number(11) primary key,
key_work varchar2(32), -- 不同业务逻辑的task用不同的keyword
biz_id char(32), -- 业务数据 ID,比如订单号
biz_data varchar2(256), -- 核心的业务数据,用于避免关联业务表;具体结构取决于keyword
status number, -- 任务的处理状态; -2:未处理, -1:处理中, 0:已处理, 大于 0 的数字表示失败次数
create_tm date, -- 任务的创建时间
modify_tm date -- 任务的修改时间
);
处于性能考虑,可以在 key_work 字段上建立分区,在 biz_id 上建立索引。
当业务表有需要处理的数据时,就往 async_task 插入一条相应的记录(可以异步插入),异步 job 再从 async_task 表里取数据来处理。
注意:处理 task 时,要保证数据的一致性。所在的项目组曾出现过,下单返券的活动里,送券与更新状态的操作没有放在同一个事务里,出现券送了,状态没更新,重复送券的问题。 一定要注意事务的正确处理。
三、单线程、多线程处理 task
不管是用单线程还是多线程,都要考虑有大量 task 的情况,所以不能一次把所有符合条件的 task 都读取到内存里,一定要分页。
单机单线程
不用考虑数据被其他线程重复处理的情况,顺序处理即可:取一批数据处理,处理完了再取下一批,直到所有的都处理完了。
单机多线程
数据量大了,就不能用单线程慢慢地处理了。可以采用一个线程去读取未处理的 task,然后提交到线程池去处理,等这批 task 处理完后再去读取下一批,主流程如下:
// 直接使用 ThreadPoolExecutor 是为了使线程池的线程有特定的名字,任务队列有边界。
ExecutorService executorService = new ThreadPoolExecutor(0, 10, 5,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1000), // 有界队列
new ThreadFactory() { // 使用定制的
private AtomicInteger counter = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("task-handler-"
+ counter.incrementAndGet());
return null;
}
}, new CallerRunsPolicy());
do {
List<AsyncTask> tasks = getUnhandleTask();
if (tasks.isEmpty()) {
break;
}
List<Callable<Object>> callables = convert2callables(tasks);
executorService.invokeAll(callables);
} while (true);
executorService.shutdown();
线程池采用 CallerRunsPolicy 策略是为了在线程池处理不完任务,线程池的任务队列满的时候,读取 task 的线程可以直接处理 task,这样既减缓了 task 的读取速度,又可以加快 task 的处理速度。
多机处理
数据量实在太多,一台机器处理不完,可以用多台。
在多机处理的时候,上面的代码就有问题了,task 可能在不同的机器上被重复处理。
任务被 getUnhandleTask()
方法读取处理后、处理完成前,另一台机器上的线程也读取到了这个任务,发现是未处理的,它也会进行处理,这样就出现重复处理了。正确的主流程如下:
public class AsyncTask {
private long id;
private int status;
public static enum STATUS {
UNHANDLE, HANDLING
}
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
}
public class TestTaskHandle {
private Callable<Object> convert(final AsyncTask task) {
return new Callable<Object>() {
@Override
public Object call() throws Exception {
return doWithTask(task);
}
};
}
private Object doWithTask(AsyncTask task) {
return null;
}
private List<AsyncTask> getUnhandleTask() {
return null;
}
public void multiMachine() {
ExecutorService executorService = new ThreadPoolExecutor(0, 10, 5,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1000),
new ThreadFactory() {
private AtomicInteger counter = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("task-handler-"
+ counter.incrementAndGet());
return null;
}
}, new CallerRunsPolicy());
do {
List<AsyncTask> tasks = getUnhandleTask();
if (tasks.isEmpty()) {
break;
}
for (AsyncTask asyncTask : tasks) {
// 把 RDBMS 的 update 操作当作一个 CAS 命令
boolean isSuccess = updateStatus(asyncTask.getId(),
AsyncTask.STATUS.UNHANDLE, AsyncTask.STATUS.HANDLING);
if (isSuccess) {
// 把 task 更新为处理中,成功表示抢占到了这个任务,可以继续处理
executorService.submit(convert(asyncTask));
} // else 被其他线程处理了
}
} while (true);
executorService.shutdown();
}
public boolean updateStatus(long id, AsyncTask.STATUS oldStatus,
AsyncTask.STATUS newStatus) {
return true;
}
}
在上面的实现中,每一条记录都需要通过一个数据库的 update 操作来判断是否可以继续处理,开销不小。一个改进的做法是:在 async_task 表增加一个 owner 字段,每个线程使用一个唯一的标识 tid(比如 UUID)。当 task 读取线程要读取任务时,先对 async_task 表里的未处理 task 执行 update,把状态更新为处理中, owner 更新为自己的 tid。如果这个 update 的影响行数大于 0,表示抢占到了任务,然后根据 tid 去读取任务,再分发给线程池去处理。
在存在并发竞争的情景下,很重要的就是借助数据库事务的ACID来达到一种 CAS 的效果;正确处理并发问题总是需要基础的 CAS 操作或锁。