Disruptor: what's Disruptor
Q: 什么是Disruptor?
A: 就是个message queue。
Q: 满大街都是。。。有什么特别的?
A:快!
Q:好吧,来个example呗。
A:去看disruptor的perftest就行了呗。
Q:你也太懒了吧。。。我就想看一下用起来方便否?
A:OK
首先肯定是定义消息类嘛
package com.lmax.disruptor.support;
import com.lmax.disruptor.EventFactory;
public final class ValueEvent
{
private long value;
public long getValue()
{
return value;
}
public void setValue(final long value)
{
this.value = value;
}
public static final EventFactory<ValueEvent> EVENT_FACTORY = new EventFactory<ValueEvent>()
{
public ValueEvent newInstance()
{
return new ValueEvent();
}
};
}
再定义一个消息的handler呗
package com.lmax.disruptor.support;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.util.PaddedLong;
import java.util.concurrent.CountDownLatch;
public final class ValueAdditionEventHandler implements EventHandler<ValueEvent>
{
private final PaddedLong value = new PaddedLong();
private long count;
private CountDownLatch latch;
private long localSequence = -1;
public long getValue()
{
return value.get();
}
public void reset(final CountDownLatch latch, final long expectedCount)
{
value.set(0L);
this.latch = latch;
count = expectedCount;
}
@Override
public void onEvent(final ValueEvent event, final long sequence, final boolean endOfBatch) throws Exception
{
value.set(value.get() + event.getValue());
if (localSequence + 1 == sequence)
{
localSequence = sequence;
}
else
{
System.err.println("Expected: " + (localSequence + 1) + "found: " + sequence);
}
if (count == sequence)
{
latch.countDown();
}
}
}
然后就是定义我们的MQ,在我们的Disruptor中称做RingBuffer和SequenceBarrier,只要搞明白这个Barrier就能搞出各种队列类型。
例如最简单的单进单出队列这样定义:
Disruptor:
==========
track to prevent wrap
+------------------+
| |
| v
+----+ +====+ +====+ +-----+
| P1 |--->| RB |<---| SB | | EP1 |
+----+ +====+ +====+ +-----+
claim get ^ |
| |
+--------+
waitFor
P1 - Publisher 1
RB - RingBuffer
SB - SequenceBarrier
EP1 - EventProcessor 1
private final RingBuffer<ValueEvent> ringBuffer =
createSingleProducer(ValueEvent.EVENT_FACTORY, BUFFER_SIZE, new YieldingWaitStrategy());
private final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
private final ValueAdditionEventHandler handler = new ValueAdditionEventHandler();
private final BatchEventProcessor<ValueEvent> batchEventProcessor = new BatchEventProcessor<ValueEvent>(ringBuffer, sequenceBarrier, handler);
{
ringBuffer.addGatingSequences(batchEventProcessor.getSequence());
}
最后模拟发发消息,搞个Executor执行EventProcessor就可以玩一下啦
protected long runDisruptorPass() throws InterruptedException
{
final CountDownLatch latch = new CountDownLatch(1);
long expectedCount = batchEventProcessor.getSequence().get() + ITERATIONS;
handler.reset(latch, expectedCount);
executor.submit(batchEventProcessor);
long start = System.currentTimeMillis();
final RingBuffer<ValueEvent> rb = ringBuffer;
for (long i = 0; i < ITERATIONS; i++)
{
long next = rb.next();
rb.get(next).setValue(i);
rb.publish(next);
}
latch.await();
long opsPerSecond = (ITERATIONS * 1000L) / (System.currentTimeMillis() - start);
waitForEventProcessorSequence(expectedCount);
batchEventProcessor.halt();
Assert.assertEquals(expectedResult, handler.getValue());
return opsPerSecond;
}