disruptor使用示例
LMAX 开源了一个高性能并发编程框架。可以理解为消费者-生产者的消息发布订阅模式。本文下载了官方示例代码,进行实验。
longEvent事件数据
public class LongEvent { private long value; public void set(long value) { this.value = value; } public long get(){ return this.value; } }
LongEventFactory事件工厂
import com.lmax.disruptor.EventFactory; /** * 事件生产工厂 * @author wanghao * */ public class LongEventFactory implements EventFactory<LongEvent> { @Override public LongEvent newInstance() { return new LongEvent(); } }
LongEventProducer事件生产者
import java.nio.ByteBuffer; import com.lmax.disruptor.RingBuffer; /** * 生产者,生产longEvent事件 * @author harry * */ public class LongEventProducer { private final RingBuffer<LongEvent> ringBuffer; public LongEventProducer(RingBuffer<LongEvent> ringBuffer) { this.ringBuffer = ringBuffer; } public void product(ByteBuffer bb) { long sequence = ringBuffer.next(); // Grab the next sequence try { LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor // for the sequence event.set(bb.getLong(0)); // Fill with data } finally { ringBuffer.publish(sequence); } } }
RingBuffer是消息存储结构,为环形存储结构,每个单元存储一条消息。类似于队列。当ringbuffer中数据填满后,环就会阻塞,等待消费者消费掉数据。当所有消费者消费掉环中一个数据,新的消息才可以加入环中。每个环插入数据后,都会分配下一个位置的编号,即sequence 。
消息者事件处理器
为消费者消费处理器,这处需要执行速度足够快。否则,会影响ringbuffer后续没空间加入新的数据。因此,不能做业务耗时操作。建议另外开始java 线程池处理消息。
import com.lmax.disruptor.EventHandler; /** * 消息者事件处理器,打印输出到控制台 * @author harry * */ public class LongEventHandler implements EventHandler<LongEvent>{ public void onEvent(LongEvent event, long sequence, boolean endOfBatch) { System.out.println("consumer:"+Thread.currentThread().getName()+" Event: value=" + event.get()+",sequence="+sequence+",endOfBatch="+endOfBatch); } }
LongEventProducerWithTranslator
import java.nio.ByteBuffer; import com.lmax.disruptor.EventTranslatorOneArg; import com.lmax.disruptor.RingBuffer; /** * post-3.0 the preferred approach for publishing messages is * via the Event Publisher/Event Translator portion of the API. E.g. * @author harry * */ public class LongEventProducerWithTranslator { private final RingBuffer<LongEvent> ringBuffer; public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) { this.ringBuffer = ringBuffer; } private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR = new EventTranslatorOneArg<LongEvent, ByteBuffer>() { public void translateTo(LongEvent event, long sequence, ByteBuffer bb) { event.set(bb.getLong(0)); } }; public void product(ByteBuffer bb) { ringBuffer.publishEvent(TRANSLATOR, bb); } }
translateTo方法将ringbuffer中的消息,转换成java对象格式。示例 为LongEvent对象,后续消费者LongEventHandler 处理器,直接操作LongEvent对象,获取消息各属性信息,本示例 为value属性。
product方法,将生产者生产的消息放入ringbuffer中。
LongEventMain
消费者-生产者启动类,其依靠构造Disruptor对象,调用start()方法完成启动线程。Disruptor 需要ringbuffer环,消费者数据处理工厂,WaitStrategy等
ByteBuffer 类字节buffer,用于包装消息。
ProducerType.SINGLE为单线程 ,可以提高性能。
import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.RingBuffer; import java.nio.ByteBuffer; import java.util.concurrent.Executor; import java.util.concurrent.Executors; public class LongEventMain { @SuppressWarnings("unchecked") public static void main(String[] args) throws Exception { // 执行器,用于构造消费者线程 Executor executor = Executors.newCachedThreadPool(); // 指定事件工厂 LongEventFactory factory = new LongEventFactory(); // 指定 ring buffer字节大小, must be power of 2. int bufferSize = 1024; //单线程模式,获取额外的性能 Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, bufferSize,executor, ProducerType.SINGLE, new BlockingWaitStrategy()); //设置事件业务处理器---消费者 disruptor.handleEventsWith(new LongEventHandler()); //启动disruptor线程 disruptor.start(); // 获取 ring buffer环,用于接取生产者生产的事件 RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); //为 ring buffer指定事件生产者 //LongEventProducer producer = new LongEventProducer(ringBuffer); LongEventProducerWithTranslator producer=new LongEventProducerWithTranslator(ringBuffer); ByteBuffer bb = ByteBuffer.allocate(8);//预置8字节长整型字节缓存 for (long l = 0; true; l++) { bb.putLong(0, l); producer.product(bb);//生产者生产数据 Thread.sleep(1000); } } }
实验结果:
consumer:pool-1-thread-1 Event: value=0,sequence=0,endOfBatch=true
consumer:pool-1-thread-1 Event: value=1,sequence=1,endOfBatch=true
consumer:pool-1-thread-1 Event: value=2,sequence=2,endOfBatch=true
consumer:pool-1-thread-1 Event: value=3,sequence=3,endOfBatch=true
consumer:pool-1-thread-1 Event: value=4,sequence=4,endOfBatch=true
consumer:pool-1-thread-1 Event: value=5,sequence=5,endOfBatch=true
consumer:pool-1-thread-1 Event: value=6,sequence=6,endOfBatch=true
Event: value = 为消费者接收到的数据,sequence为数据在ringbuffer环的位置。
已有 0 人发表留言,猛击->> 这里<<-参与讨论
ITeye推荐