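Q: What is Disruptor?

A: It's just a message queue.

Q: Those are everywhere... what's so special about this one?

A: It's fast!

Q: Fine, show me an example.

A: Just go read Disruptor's perftest.

Q: Lazy much? I only want to see whether it's convenient to use.

A: OK.

First, of course, we define the message (event) class: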


package com.lmax.disruptor.support;

import com.lmax.disruptor.EventFactory;

public final class ValueEvent
{
    private long value;

    public long getValue()
    {
        return value;
    }

    public void setValue(final long value)
    {
        this.value = value;
    }

    public static final EventFactory<ValueEvent> EVENT_FACTORY = new EventFactory<ValueEvent>()
    {
        public ValueEvent newInstance()
        {
            return new ValueEvent();
        }
    };
}

Next, define a handler for the message:


package com.lmax.disruptor.support;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.util.PaddedLong;

import java.util.concurrent.CountDownLatch;

public final class ValueAdditionEventHandler implements EventHandler<ValueEvent>
{
    private final PaddedLong value = new PaddedLong();
    private long count;
    private CountDownLatch latch;
    private long localSequence = -1;

    public long getValue()
    {
        return value.get();
    }

    public void reset(final CountDownLatch latch, final long expectedCount)
    {
        value.set(0L);
        this.latch = latch;
        count = expectedCount;
    }

    @Override
    public void onEvent(final ValueEvent event, final long sequence, final boolean endOfBatch) throws Exception
    {
        value.set(value.get() + event.getValue());

        if (localSequence + 1 == sequence)
        {
            localSequence = sequence;
        }
        else
        {
            System.err.println("Expected: " + (localSequence + 1) + " found: " + sequence);
        }

        if (count == sequence)
        {
            latch.countDown();
        }
    }
}

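Then we define the MQ itself. In Disruptor it is made up of a RingBuffer and a SequenceBarrier; once you understand the barrier, you can build all kinds of queue topologies.

For example, the simplest one-producer, one-consumer queue is defined like this: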


  Disruptor:
  ==========
               track to prevent wrap
               +------------------+
               |                  |
               |                  v
  +----+    +====+    +====+   +-----+
  | P1 |--->| RB |<---| SB |   | EP1 |
  +----+    +====+    +====+   +-----+
       claim      get    ^        |
                         |        |
                         +--------+
                           waitFor
 
  P1  - Publisher 1
  RB  - RingBuffer
  SB  - SequenceBarrier
  EP1 - EventProcessor 1


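    // createSingleProducer is a static import of RingBuffer.createSingleProducer;
    // BUFFER_SIZE must be a power of 2.
    private final RingBuffer<ValueEvent> ringBuffer =
        createSingleProducer(ValueEvent.EVENT_FACTORY, BUFFER_SIZE, new YieldingWaitStrategy());
    // The barrier is what the consumer waits on ("waitFor" in the diagram).
    private final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
    private final ValueAdditionEventHandler handler = new ValueAdditionEventHandler();
    private final BatchEventProcessor<ValueEvent> batchEventProcessor = new BatchEventProcessor<ValueEvent>(ringBuffer, sequenceBarrier, handler);
    {
        // Gate the producer on the consumer's sequence ("track to prevent wrap").
        ringBuffer.addGatingSequences(batchEventProcessor.getSequence());
    }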
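
If the two arrows in the diagram ("track to prevent wrap" and "waitFor") still feel abstract, here is a rough plain-Java sketch of the same idea with no Disruptor dependency. This is not Disruptor's actual implementation; the class `SpscRingSketch` and all of its method names are made up for illustration, and it busy-waits with `Thread.yield()` instead of a pluggable wait strategy. It only shows how a published cursor and a consumer sequence gate each other in a single-producer, single-consumer ring.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration only: a single-producer, single-consumer ring
// coordinated by two sequences, loosely mirroring the diagram above.
public final class SpscRingSketch {
    private final long[] slots;
    private final int mask;
    // Last sequence published by the producer (what the consumer waits on).
    private final AtomicLong cursor = new AtomicLong(-1);
    // Last sequence fully processed by the consumer (gates the producer).
    private final AtomicLong consumed = new AtomicLong(-1);
    private long nextToClaim = 0; // touched by the producer thread only

    SpscRingSketch(int size) {
        if (Integer.bitCount(size) != 1) {
            throw new IllegalArgumentException("size must be a power of 2");
        }
        slots = new long[size];
        mask = size - 1;
    }

    // Producer side: claim the next sequence, spinning while the slot it maps
    // to still holds an event the consumer has not processed (wrap prevention).
    long next() {
        long seq = nextToClaim++;
        long wrapPoint = seq - slots.length;
        while (wrapPoint > consumed.get()) {
            Thread.yield();
        }
        return seq;
    }

    void set(long seq, long value) { slots[(int) (seq & mask)] = value; }

    // Publishing is a volatile write, so the slot write above becomes visible.
    void publish(long seq) { cursor.set(seq); }

    // Consumer side: wait until at least seq is published; may return a higher
    // sequence, which lets the consumer process a whole batch at once.
    long waitFor(long seq) {
        long available;
        while ((available = cursor.get()) < seq) {
            Thread.yield();
        }
        return available;
    }

    long get(long seq) { return slots[(int) (seq & mask)]; }

    void markConsumed(long seq) { consumed.set(seq); }

    // Publishes 0..iterations-1 and returns the sum seen by the consumer.
    static long run(int iterations, int size) {
        SpscRingSketch ring = new SpscRingSketch(size);
        long[] sum = new long[1];
        Thread consumer = new Thread(() -> {
            long nextSeq = 0;
            while (nextSeq < iterations) {
                long available = ring.waitFor(nextSeq);
                for (; nextSeq <= available; nextSeq++) {
                    sum[0] += ring.get(nextSeq);
                }
                ring.markConsumed(available);
            }
        });
        consumer.start();
        for (long i = 0; i < iterations; i++) {
            long seq = ring.next();
            ring.set(seq, i);
            ring.publish(seq);
        }
        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum[0];
    }

    public static void main(String[] args) {
        System.out.println(run(1000, 8)); // prints 499500
    }
}
```

The ring here has only 8 slots for 1000 events, so the producer really does get held back by the consumer's sequence, and the consumer really does wait on the producer's cursor. The real RingBuffer, SequenceBarrier and BatchEventProcessor wire up the same two dependencies, with preallocated event objects and configurable wait strategies on top.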
    

Finally, simulate sending some messages: hand the EventProcessor to an Executor and you can start playing with it.


    protected long runDisruptorPass() throws InterruptedException
    {
        final CountDownLatch latch = new CountDownLatch(1);
        long expectedCount = batchEventProcessor.getSequence().get() + ITERATIONS;
        handler.reset(latch, expectedCount);
        executor.submit(batchEventProcessor);
        long start = System.currentTimeMillis();

        final RingBuffer<ValueEvent> rb = ringBuffer;

        for (long i = 0; i < ITERATIONS; i++)
        {
            long next = rb.next();        // claim the next sequence
            rb.get(next).setValue(i);     // fill in the preallocated event
            rb.publish(next);             // make it visible to the consumer
        }

        latch.await();
        long opsPerSecond = (ITERATIONS * 1000L) / (System.currentTimeMillis() - start);
        waitForEventProcessorSequence(expectedCount);
        batchEventProcessor.halt();

        Assert.assertEquals(expectedResult, handler.getValue());

        return opsPerSecond;
    }