Disruptor封装

标签: disruptor 封装 | 发表时间:2015-07-24 11:28 | 作者:luoshi0801
出处:http://www.iteye.com

在数据交换场景,disruptor受到越来越多的欢迎。下面是将原生disruptor封装成queue模型的代码,供参考

 

抽象类Disruptor,提供pull、take等接口

 

 

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.InsufficientCapacityException;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;


public abstract class DisruptorQueue {
    
    public static void setUseSleep(boolean useSleep) {
        DisruptorQueueImpl.setUseSleep(useSleep);
    }

    public static DisruptorQueue mkInstance(String queueName,
            ProducerType producerType, int bufferSize, WaitStrategy wait) {
        return new DisruptorQueueImpl(queueName, producerType, bufferSize, wait);
    }

    public abstract String getName();

    public abstract void haltWithInterrupt();

    public abstract Object poll();

    public abstract Object take();

    public abstract void consumeBatch(EventHandler<Object> handler);
    
    public abstract void consumeBatchWhenAvailable(EventHandler<Object> handler);

    public abstract void publish(Object obj);

    public abstract void publish(Object obj, boolean block)
            throws InsufficientCapacityException;

    public abstract void consumerStarted();

    public abstract void clear();

    public abstract long population();

    public abstract long capacity();

    public abstract long writePos();

    public abstract long readPos();

    public abstract float pctFull();
    
}

 

 

具体实现

 

import java.util.HashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.lmax.disruptor.AlertException;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.InsufficientCapacityException;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;


public class DisruptorQueueImpl extends DisruptorQueue {
    
    private static final Logger logger = LoggerFactory.getLogger(DisruptorQueueImpl.class);
    
    static boolean useSleep = true;
    public static void setUseSleep(boolean useSleep) {
        AbstractSequencerExt.setWaitSleep(useSleep);
    }
    
    private static final Object FLUSH_CACHE = new Object();
    private static final Object INTERRUPT = new Object();
    private static final String PREFIX = "disruptor-";

    private final String _queueName;
    private final RingBuffer<MutableObject> _buffer;
    private final Sequence _consumer;
    private final SequenceBarrier _barrier;

    volatile boolean consumerStartedFlag = false;

    private final HashMap<String, Object> state = new HashMap<String, Object>(4);
    private final ConcurrentLinkedQueue<Object> _cache = new ConcurrentLinkedQueue<Object>();
    private final ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock();
    private final Lock readLock = cacheLock.readLock();
    private final Lock writeLock = cacheLock.writeLock();

    public DisruptorQueueImpl(String queueName, ProducerType producerType, int bufferSize, WaitStrategy wait) {
        this._queueName = PREFIX + queueName;
        _buffer = RingBuffer.create(producerType, new ObjectEventFactory(), bufferSize, wait);
        _consumer = new Sequence();
        _barrier = _buffer.newBarrier();
        _buffer.addGatingSequences(_consumer);
        if (producerType == ProducerType.SINGLE) {
            consumerStartedFlag = true;
        } else {
            if (bufferSize < 2) {
                throw new RuntimeException("QueueSize must >= 2");
            }
            try {
                publishDirect(FLUSH_CACHE, true);
            } catch (InsufficientCapacityException e) {
                throw new RuntimeException("This code should be unreachable!", e);
            }
        }
    }

    public String getName() {
        return _queueName;
    }

    public void consumeBatch(EventHandler<Object> handler) {
        consumeBatchToCursor(_barrier.getCursor(), handler);
    }

    public void haltWithInterrupt() {
        publish(INTERRUPT);
    }

    public Object poll() {
        if (consumerStartedFlag == false) {
            return _cache.poll();
        }

        final long nextSequence = _consumer.get() + 1;
        if (nextSequence <= _barrier.getCursor()) {
            MutableObject mo = _buffer.get(nextSequence);
            _consumer.set(nextSequence);
            Object ret = mo.o;
            mo.setObject(null);
            return ret;
        }
        return null;
    }

    public Object take() {
        if (consumerStartedFlag == false) {
            return _cache.poll();
        }

        final long nextSequence = _consumer.get() + 1;
        try {
            _barrier.waitFor(nextSequence);
        } catch (AlertException e) {
            logger.error(e.getMessage(), e);
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            logger.error("InterruptedException " + e.getCause());
            return null;
        } catch (TimeoutException e) {
            logger.error(e.getMessage(), e);
            return null;
        }
        MutableObject mo = _buffer.get(nextSequence);
        _consumer.set(nextSequence);
        Object ret = mo.o;
        mo.setObject(null);
        return ret;
    }

    public void consumeBatchWhenAvailable(EventHandler<Object> handler) {
        try {
            final long nextSequence = _consumer.get() + 1;
            final long availableSequence = _barrier.waitFor(nextSequence);
            if (availableSequence >= nextSequence) {
                consumeBatchToCursor(availableSequence, handler);
            }
        } catch (AlertException e) {
            logger.error(e.getMessage(), e);
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            logger.error("InterruptedException " + e.getCause());
            return;
        }catch (TimeoutException e) {
            logger.error(e.getMessage(), e);
            return ;
        }
    }

    public void consumeBatchToCursor(long cursor, EventHandler<Object> handler){
        for (long curr = _consumer.get() + 1; curr <= cursor; curr++) {
            try {
                MutableObject mo = _buffer.get(curr);
                Object o = mo.o;
                mo.setObject(null);
                if (o == FLUSH_CACHE) {
                    Object c = null;
                    while (true) {
                        c = _cache.poll();
                        if (c == null)
                            break;
                        else
                            handler.onEvent(c, curr, true);
                    }
                } else if (o == INTERRUPT) {
                    throw new InterruptedException(
                            "Disruptor processing interrupted");
                } else {
                    handler.onEvent(o, curr, curr == cursor);
                }
            } catch (InterruptedException e) {
                logger.error(e.getMessage());
                return;
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                throw new RuntimeException(e);
            }
        }
        _consumer.set(cursor);
    }

    public void publish(Object obj) {
        try {
            publish(obj, true);
        } catch (InsufficientCapacityException ex) {
            throw new RuntimeException("This code should be unreachable!");
        }
    }

    public void tryPublish(Object obj) throws InsufficientCapacityException {
        publish(obj, false);
    }

    public void publish(Object obj, boolean block)
            throws InsufficientCapacityException {

        boolean publishNow = consumerStartedFlag;

        if (!publishNow) {
            readLock.lock();
            try {
                publishNow = consumerStartedFlag;
                if (!publishNow) {
                    _cache.add(obj);
                }
            } finally {
                readLock.unlock();
            }
        }

        if (publishNow) {
            publishDirect(obj, block);
        }
    }

    protected void publishDirect(Object obj, boolean block)
            throws InsufficientCapacityException {
        final long id;
        if (block) {
            id = _buffer.next();
        } else {
            id = _buffer.tryNext(1);
        }
        final MutableObject m = _buffer.get(id);
        m.setObject(obj);
        _buffer.publish(id);
    }

    public void consumerStarted() {

        writeLock.lock();
        consumerStartedFlag = true;
        
        writeLock.unlock();
    }

    public void clear() {
        while (population() != 0L) {
            poll();
        }
    }

    public long population() {
        return (writePos() - readPos());
    }

    public long capacity() {
        return _buffer.getBufferSize();
    }

    public long writePos() {
        return _buffer.getCursor();
    }

    public long readPos() {
        return _consumer.get();
    }

    public float pctFull() {
        return (1.0F * population() / capacity());
    }

    public Object getState() {
        long rp = readPos();
        long wp = writePos();
        state.put("capacity", capacity());
        state.put("population", wp - rp);
        state.put("write_pos", wp);
        state.put("read_pos", rp);
        return state;
    }

    public static class ObjectEventFactory implements EventFactory<MutableObject> {
        @Override
        public MutableObject newInstance() {
            return new MutableObject();
        }
    }
}

 

代码依赖

<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.2.1</version>
</dependency>

 



已有 0 人发表留言,猛击->> 这里<<-参与讨论


ITeye推荐



相关 [disruptor 封装] 推荐:

Disruptor封装

- - 开源软件 - ITeye博客
在数据交换场景,disruptor受到越来越多的欢迎. 下面是将原生disruptor封装成queue模型的代码,供参考. 抽象类Disruptor,提供pull、take等接口. 已有 0 人发表留言,猛击->> 这里<<-参与讨论. —软件人才免语言低担保 赴美带薪读研.

disruptor使用示例

- - 企业架构 - ITeye博客
LMAX 开源了一个高性能并发编程框架. 可以理解为消费者-生产者的消息发布订阅模式. 本文下载了官方示例代码,进行实验. longEvent事件数据. LongEventFactory事件工厂. import com.lmax.disruptor.EventFactory; /** * 事件生产工厂 * @author wanghao * */ public class LongEventFactory implements EventFactory {.

Disruptor 学习笔记

- - 开源软件 - ITeye博客
Disruptor 是一个高性能异步处理框架,也可以认为是一个消息框架,它实现了观察者模式. Disruptor 比传统的基于锁的消息框架的优势在于:它是无锁的、CPU友好;它不会清除缓存中的数据,只会覆盖,降低了垃圾回收机制启动的频率. Disruptor 为什么快. 通过内存屏障和原子性的CAS操作替换锁.

Disruptor使用入门

- - CSDN博客推荐文章
在最近的项目中看到同事使用到了Disruptor,以前在ifeve上看到过关于Disruptor的文章,但是没有深入研究,现在项目中用到了,就借这个机会对这个并发编程框架进行深入学习. 项目中使用到的是disruptor-2.10.4,所以下面分析到的Disruptor的代码是这个版本的. 并发编程网介绍Disruptor的文章是disruptor1.0版本,所以有一些术语在2.0版本上已经没有了或者被替代了.

并发框架Disruptor译文

- - 酷壳 - CoolShell.cn
(感谢同事 方腾飞投递本文). Martin Fowler在自己网站上写了一篇LMAX架构的文章,在文章中他介绍了LMAX是一种新型零售金融交易平台,它能够以很低的延迟产生大量交易. 这个系统是建立在JVM平台上,其核心是一个业务逻辑处理器,它能够在一个线程里每秒处理6百万订单. 业务逻辑处理器完全是运行在内存中,使用事件源驱动方式.

Disruptor 极速体验 - haiq

- - 博客园_首页
      已经不记得最早接触到 Disruptor 是什么时候了,只记得发现它的时候它是以具有闪电般的速度被介绍的. 于是在脑子里, Disruptor 和“闪电”一词关联了起来,然而却一直没有时间去探究一下.       最近正在进行一项对性能有很高要求的产品项目的研究,自然想起了闪电般的 Disruptor ,这必有它的用武之地,于是进行了一番探查,将成果和体会记录在案.

Disruptor为何这么快

- - 掘金后端
Disruptor 是一个开源并且高效的 生产者-消费者框架,很难直接解释这个框架是做什么的,但是可以把这个框架理解成 Java 中的 BlockingQueue. 这样理解起来是不是轻松多了,这就是一个生产者-消费者队列,只不过它的性能要比 BloockingQueue 好很多,号称单机器可以有百万的 TPS.

GitHub - chanjarster/artemis-disruptor-miaosha: 没有redis也能够支撑

- -
"小米在印度把亚马逊搞挂了"事件的秒杀解决方案. 小米在印度打破了多项记录:. 4分钟内卖出了超过250,000台. ---OPS:1042次抢购/S. 抢购前我们收到了100万“到货提醒”. 亚马逊每分钟收到超过500万个点击. 亚马逊在这个期间每秒收到1500个订单(这是印度电商公司所有销售中最高的).

还在用阻塞队列?读这篇文章,了解下 Disruptor 吧

- - IT瘾-dev
听到队列相信大家对其并不陌生,在我们现实生活中队列随处可见,去超市结账,你会看见大家都会一排排的站得好好的,等待结账,为什么要站得一排排的,你想象一下大家都没有素质,一窝蜂的上去结账,不仅让这个超市崩溃,还会容易造成各种踩踏事件,当然这些事其实在我们现实中也是会经常发生. 当然在计算机世界中,队列是属于一种数据结构,队列采用的FIFO(first in firstout),新元素(等待进入队列的元素)总是被插入到尾部,而读取的时候总是从头部开始读取.

(转)JavaScript封装

- - CSDN博客Web前端推荐文章
Javascript是一种基于对象(object-based)的语言,你遇到的所有东西几乎都是对象. 但是,它又不是一种真正的面向对象编程(OOP)语言,因为它的语法中没有class(类). 那么,如果我们要把"属性"(property)和"方法"(method),封装成一个对象,甚至要从原型对象生成一个实例对象,我们应该怎么做呢.