disruptor使用示例

标签: disruptor | 发表时间:2015-05-15 19:28 | 作者:wh0426
出处:http://www.iteye.com


LMAX
开源了一个高性能并发编程框架。可以理解为消费者-生产者的消息发布订阅模式。本文下载了官方示例代码,进行实验。

longEvent事件数据

 

public class LongEvent {
    private long value;

    public void set(long value) {
        this.value = value;
    }
    
    public long get(){
        return this.value;
    }
}

 

 

LongEventFactory事件工厂

 

import com.lmax.disruptor.EventFactory;
/**
 * 事件生产工厂
 * @author wanghao
 *
 */
public class LongEventFactory implements EventFactory<LongEvent>
{

	@Override
	public LongEvent newInstance() {
		return new LongEvent();
	}

}

 

 

LongEventProducer事件生产者

import java.nio.ByteBuffer;

import com.lmax.disruptor.RingBuffer;

/**
 * 生产者,生产longEvent事件
 * @author harry
 *
 */
public class LongEventProducer {
	private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducer(RingBuffer<LongEvent> ringBuffer)
    {
        this.ringBuffer = ringBuffer;
    }

    public void product(ByteBuffer bb)
    {
        long sequence = ringBuffer.next();  // Grab the next sequence
        try
        {
            LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor
                                                        // for the sequence
            event.set(bb.getLong(0));  // Fill with data
        }
        finally
        {
            ringBuffer.publish(sequence);
        }
    }
}

RingBuffer是消息存储结构,为环形存储结构,每个单元存储一条消息。类似于队列。当ringbuffer中数据填满后,环就会阻塞,等待消费者消费掉数据。当所有消费者消费掉环中一个数据,新的消息才可以加入环中。每个环插入数据后,都会分配下一个位置的编号,即sequence 。

消息者事件处理器

为消费者消费处理器,这处需要执行速度足够快。否则,会影响ringbuffer后续没空间加入新的数据。因此,不能做业务耗时操作。建议另外开始java 线程池处理消息。

 

import com.lmax.disruptor.EventHandler;
/**
 * 消息者事件处理器,打印输出到控制台
 * @author harry
 *
 */
public class LongEventHandler  implements EventHandler<LongEvent>{
	  public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
	    {
	        System.out.println("consumer:"+Thread.currentThread().getName()+" Event: value=" + event.get()+",sequence="+sequence+",endOfBatch="+endOfBatch);
	    }
}

 

LongEventProducerWithTranslator

 

import java.nio.ByteBuffer;

import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;

/**
 * post-3.0 the preferred approach for publishing messages is 
 * via the Event Publisher/Event Translator portion of the API. E.g.
 * @author harry
 *
 */
public class LongEventProducerWithTranslator {
	private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer)
    {
        this.ringBuffer = ringBuffer;
    }

    private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR =
        new EventTranslatorOneArg<LongEvent, ByteBuffer>()
        {
            public void translateTo(LongEvent event, long sequence, ByteBuffer bb)
            {
                event.set(bb.getLong(0));
            }
        };

    public void product(ByteBuffer bb)
    {
        ringBuffer.publishEvent(TRANSLATOR, bb);
    }
}


translateTo方法将ringbuffer中的消息,转换成java对象格式。示例 为LongEvent对象,后续消费者LongEventHandler 处理器,直接操作LongEvent对象,获取消息各属性信息,本示例 为value属性。

 

product方法,将生产者生产的消息放入ringbuffer中。

LongEventMain

 

消费者-生产者启动类,其依靠构造Disruptor对象,调用start()方法完成启动线程。Disruptor 需要ringbuffer环,消费者数据处理工厂,WaitStrategy等

ByteBuffer 类字节buffer,用于包装消息。

ProducerType.SINGLE为单线程 ,可以提高性能。

 

import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
public class LongEventMain {
	@SuppressWarnings("unchecked")
	public static void main(String[] args) throws Exception
	    {
	        // 执行器,用于构造消费者线程
	        Executor executor = Executors.newCachedThreadPool();

	        // 指定事件工厂
	        LongEventFactory factory = new LongEventFactory();

	        // 指定 ring buffer字节大小, must be power of 2.
	        int bufferSize = 1024;

	        //单线程模式,获取额外的性能
	        Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, 
                    bufferSize,executor,
                    ProducerType.SINGLE,
                    new BlockingWaitStrategy());
	        //设置事件业务处理器---消费者
	        disruptor.handleEventsWith(new LongEventHandler());
	        //启动disruptor线程
	        disruptor.start();

	        // 获取 ring buffer环,用于接取生产者生产的事件
	        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
	        //为 ring buffer指定事件生产者
	        //LongEventProducer producer = new LongEventProducer(ringBuffer);
	        LongEventProducerWithTranslator producer=new LongEventProducerWithTranslator(ringBuffer);
	        ByteBuffer bb = ByteBuffer.allocate(8);//预置8字节长整型字节缓存
	        for (long l = 0; true; l++)
	        {
	            bb.putLong(0, l);
	            producer.product(bb);//生产者生产数据
	            Thread.sleep(1000);
	        }
	        
	    }
}

 

 

实验结果:

consumer:pool-1-thread-1 Event: value=0,sequence=0,endOfBatch=true
consumer:pool-1-thread-1 Event: value=1,sequence=1,endOfBatch=true
consumer:pool-1-thread-1 Event: value=2,sequence=2,endOfBatch=true
consumer:pool-1-thread-1 Event: value=3,sequence=3,endOfBatch=true
consumer:pool-1-thread-1 Event: value=4,sequence=4,endOfBatch=true
consumer:pool-1-thread-1 Event: value=5,sequence=5,endOfBatch=true
consumer:pool-1-thread-1 Event: value=6,sequence=6,endOfBatch=true

 

Event: value = 为消费者接收到的数据,sequence为数据在ringbuffer环的位置。

 



已有 0 人发表留言,猛击->> 这里<<-参与讨论


ITeye推荐



相关 [disruptor] 推荐:

Disruptor封装

- - 开源软件 - ITeye博客
在数据交换场景,disruptor受到越来越多的欢迎. 下面是将原生disruptor封装成queue模型的代码,供参考. 抽象类Disruptor,提供pull、take等接口. 已有 0 人发表留言,猛击->> 这里<<-参与讨论. —软件人才免语言低担保 赴美带薪读研.

disruptor使用示例

- - 企业架构 - ITeye博客
LMAX 开源了一个高性能并发编程框架. 可以理解为消费者-生产者的消息发布订阅模式. 本文下载了官方示例代码,进行实验. longEvent事件数据. LongEventFactory事件工厂. import com.lmax.disruptor.EventFactory; /** * 事件生产工厂 * @author wanghao * */ public class LongEventFactory implements EventFactory {.

Disruptor 学习笔记

- - 开源软件 - ITeye博客
Disruptor 是一个高性能异步处理框架,也可以认为是一个消息框架,它实现了观察者模式. Disruptor 比传统的基于锁的消息框架的优势在于:它是无锁的、CPU友好;它不会清除缓存中的数据,只会覆盖,降低了垃圾回收机制启动的频率. Disruptor 为什么快. 通过内存屏障和原子性的CAS操作替换锁.

Disruptor使用入门

- - CSDN博客推荐文章
在最近的项目中看到同事使用到了Disruptor,以前在ifeve上看到过关于Disruptor的文章,但是没有深入研究,现在项目中用到了,就借这个机会对这个并发编程框架进行深入学习. 项目中使用到的是disruptor-2.10.4,所以下面分析到的Disruptor的代码是这个版本的. 并发编程网介绍Disruptor的文章是disruptor1.0版本,所以有一些术语在2.0版本上已经没有了或者被替代了.

并发框架Disruptor译文

- - 酷壳 - CoolShell.cn
(感谢同事 方腾飞投递本文). Martin Fowler在自己网站上写了一篇LMAX架构的文章,在文章中他介绍了LMAX是一种新型零售金融交易平台,它能够以很低的延迟产生大量交易. 这个系统是建立在JVM平台上,其核心是一个业务逻辑处理器,它能够在一个线程里每秒处理6百万订单. 业务逻辑处理器完全是运行在内存中,使用事件源驱动方式.

Disruptor 极速体验 - haiq

- - 博客园_首页
      已经不记得最早接触到 Disruptor 是什么时候了,只记得发现它的时候它是以具有闪电般的速度被介绍的. 于是在脑子里, Disruptor 和“闪电”一词关联了起来,然而却一直没有时间去探究一下.       最近正在进行一项对性能有很高要求的产品项目的研究,自然想起了闪电般的 Disruptor ,这必有它的用武之地,于是进行了一番探查,将成果和体会记录在案.

GitHub - chanjarster/artemis-disruptor-miaosha: 没有redis也能够支撑

- -
"小米在印度把亚马逊搞挂了"事件的秒杀解决方案. 小米在印度打破了多项记录:. 4分钟内卖出了超过250,000台. ---OPS:1042次抢购/S. 抢购前我们收到了100万“到货提醒”. 亚马逊每分钟收到超过500万个点击. 亚马逊在这个期间每秒收到1500个订单(这是印度电商公司所有销售中最高的).

还在用阻塞队列?读这篇文章,了解下 Disruptor 吧

- - IT瘾-dev
听到队列相信大家对其并不陌生,在我们现实生活中队列随处可见,去超市结账,你会看见大家都会一排排的站得好好的,等待结账,为什么要站得一排排的,你想象一下大家都没有素质,一窝蜂的上去结账,不仅让这个超市崩溃,还会容易造成各种踩踏事件,当然这些事其实在我们现实中也是会经常发生. 当然在计算机世界中,队列是属于一种数据结构,队列采用的FIFO(first in firstout),新元素(等待进入队列的元素)总是被插入到尾部,而读取的时候总是从头部开始读取.