Disruptor: code overview
Producers claim entries in sequence via a ProducerBarrier, write their changes into the claimed entry, then commit that entry back via the ProducerBarrier making them available for consumption. As a consumer all one needs do is provide a BatchHandler implementation that receives call backs when a new entry is available.
A RingBuffer exists at the core of the Disruptor pattern providing storage for data exchange without contention. The concurrency concerns are separated out for the producers and consumers interacting with the RingBuffer. The ProducerBarrier manages any concurrency concerns associated with claiming slots in the ring buffer, while tracking dependant consumers to prevent the ring from wrapping. The ConsumerBarrier notifies consumers when new entries are available, and Consumers can be constructed into a graph of dependencies representing multiple stages in a processing pipeline.
以上文字来自LMAX的官方文档,通过这些文字我们可以大致了解Disruptor的工作流程。下面我们再仔细看看各部分。
RingBuffer
其构造方法定义为缺省的,创建RingBuffer实例是通过静态工厂方法。
public static <E> RingBuffer<E> createMultiProducer(EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy)
public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy)
静态工厂方法的第一个参数EventFactory<E> factory
用于初始化private final Object[] entries
;第二个参数bufferSize
用于指定SingleProducerSequencer
或MultiProducerSequencer
中sequence的长度,也用于指定private final Object[] entries
的长度。但三个参数waitStrategy指定EventProcessor
的等待策略。
RingBuffer
中的public SequenceBarrier newBarrier(Sequence... sequencesToTrack)
用于生成SequenceBarrier
,合适地组合这些Barrier可以构建特定的Dependency Graphs。
ProducerBarrier
-
生产者向RingBuffer中发布数据,首先通过
long sequence = ringBuffer.next()
获取数据环中下一个可以可用实体的序列号,该方法将会调用到SingleProducerSequencer
或MultiProducerSequencer
中的public long next(int n)
方法。对于
SingleProducerSequencer
不存在竞争,唯一需要考虑的是生产者是否会覆盖未消费的数据,在运行系统之前通过RingBuffer的public void addGatingSequences(Sequence... gatingSequences)
方法设置ProducerSequencer需要跟踪的最后一个或几个消费者的sequence,这样就可以防止覆盖未消费的数据。在
SingleProducerSequencer
中定义了一个内部类Padding记录下一个可获得的序列号和消费者消费到的序列号。对于
MultiProducerSequencer
,除了要防止覆盖未消费的数据,还有解决竞争写的问题,笔者可能会直接使用AtomicLong,而在Disruptor中是用Unsafe实现的,在原理上都是用CAS。 - 然后通过
ringBuffer.get(sequence).setValue(i)
获取对应的实体,并且设置好数据。 - 最后通过
ringBuffer.publish(sequence)
安全发布数据,所谓安全就是要让发布的数据对所有的线程都是可见的。在SingleProducerSequencer
和MultiProducerSequencer
中都是通过调用UNSAFE.putOrderedLong(...)
完成安全发布,该方法可以使写操作穿过memory barrier。成功发布数据后还需要根据不同的WaitStrategy执行不同的操作,比如BusySpinWaitStrategy不会做任何事情,而BlockingWaitStrategy将会signalAll。
ConsumerBarrier
对于消费者这边,需要有一个或多个EventProcessor
做为消费者,具体的处理逻辑通过实现EventHandler
或WorkHandler
中的onEvent
方法定义,从使用者的角度来说是非常清晰的。
对于BatchEventProcessor
和WorkProcessor
的构造方法都需要传入一个SequenceBarrier
,使得消费者可以跟踪生产者的状态,以及有依赖关系的其它消费者的状态。这里传入的SequenceBarrier
是通过RingBuffer
中的newBarrier(Sequence... sequencesToTrack)
方法生成的ProcessingSequenceBarrier
。
BatchEventProcessor
线程起动后就调用ProcessingSequenceBarrier
的public long waitFor(final long sequence)
方法等待发布的数据,具体的等待策略又因WaitStrategy而不同,等到有新发布的数据就调用重载的onEvent
方法执行客户逻辑。