十年网站开发经验 + 多家企业客户 + 靠谱的建站团队
量身定制 + 运营维护+专业推广+无忧售后,网站问题一站解决
Disruptor中怎么实现一个高性能队列,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。
创新互联是专业的辉县网站建设公司,辉县接单;提供成都网站制作、成都网站设计,网页设计,网站设计,建网站,PHP网站建设等专业做网站服务;采用PHP框架,可快速的进行辉县网站开发网页制作和功能扩展;专业做搜索引擎喜爱的网站,专业的做网站团队,希望更多企业前来合作!
import java.util.concurrent.ThreadFactory import com.lmax.disruptor.dsl.{Disruptor, ProducerType} import com.lmax.disruptor.{BlockingWaitStrategy,EventFactory,EventHandler,EventTranslatorOneArg,WaitStrategy} object DisruptorTest { val disruptor = { val factory = new EventFactory[Event] { override def newInstance(): Event = Event(-1) } val threadFactory = new ThreadFactory(){ override def newThread(r: Runnable): Thread = new Thread(r) } val disruptor = new Disruptor[Event](factory, 4, threadFactory, ProducerType.SINGLE, new BlockingWaitStrategy()) disruptor.handleEventsWith(TestHandler).`then`(ThenHandler) disruptor } val translator = new EventTranslatorOneArg[Event, Int]() { override def translateTo(event: Event, sequence: Long, arg: Int): Unit = { event.id = arg println(s"translator: ${event}, sequence: ${sequence}, arg: ${arg}") } } def main(args: Array[String]): Unit = { disruptor.start() (0 until 100).foreach { i => disruptor.publishEvent(translator, i) } disruptor.shutdown() } } case class Event(var id: Int) { override def toString: String = s"event: ${id}" } object TestHandler extends EventHandler[Event] { override def onEvent(event: Event, sequence: Long, endOfBatch: Boolean): Unit = { println(s"${this.getClass.getSimpleName} ${System.nanoTime()} ${event}") } } object ThenHandler extends EventHandler[Event] { override def onEvent(event: Event, sequence: Long, endOfBatch: Boolean): Unit = { println(s"${this.getClass.getSimpleName} ${System.nanoTime()} ${event}") } }
先看 Disruptor 构造方法
public Disruptor(final EventFactoryeventFactory, final int ringBufferSize, final ThreadFactory threadFactory, final ProducerType producerType, final WaitStrategy waitStrategy) { this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy), new BasicExecutor(threadFactory)); }
在看 RingBuffer.create, 最终通过 fill 方法 将 eventFactory.newInstance() 作为默认值,塞到 ringBuffer 里面
public staticRingBuffer create(ProducerType producerType, EventFactory factory, int bufferSize, WaitStrategy waitStrategy) { switch (producerType) { case SINGLE: return createSingleProducer(factory, bufferSize, waitStrategy); case MULTI: return createMultiProducer(factory, bufferSize, waitStrategy); default: throw new IllegalStateException(producerType.toString()); } } public static RingBuffer createSingleProducer(EventFactory factory, int bufferSize, WaitStrategy waitStrategy) { SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy); return new RingBuffer (factory, sequencer); } RingBufferFields(EventFactory eventFactory, Sequencer sequencer) { this.sequencer = sequencer; this.bufferSize = sequencer.getBufferSize(); if (bufferSize < 1) { throw new IllegalArgumentException("bufferSize must not be less than 1"); } if (Integer.bitCount(bufferSize) != 1) { throw new IllegalArgumentException("bufferSize must be a power of 2"); } this.indexMask = bufferSize - 1; this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD]; fill(eventFactory); } private void fill(EventFactory eventFactory) { for (int i = 0; i < bufferSize; i++) { entries[BUFFER_PAD + i] = eventFactory.newInstance(); } }
首先看 disruptor.start(): 消费事件消息入口
private final ConsumerRepositoryconsumerRepository = new ConsumerRepository<>(); public RingBuffer start() { checkOnlyStartedOnce(); for (final ConsumerInfo consumerInfo : consumerRepository) { consumerInfo.start(executor); } return ringBuffer; }
consumerRepository 类型由 disruptor.handleEventsWith(TestHandler) 初始化, 并构造事件消息处理链
public final EventHandlerGrouphandleEventsWith(final EventHandler super T>... handlers) { return createEventProcessors(new Sequence[0], handlers); } EventHandlerGroup createEventProcessors(final Sequence[] barrierSequences, final EventHandler super T>[] eventHandlers) { checkNotStarted(); final Sequence[] processorSequences = new Sequence[eventHandlers.length]; final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences); for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) { final EventHandler super T> eventHandler = eventHandlers[i]; final BatchEventProcessor batchEventProcessor = new BatchEventProcessor<>(ringBuffer, barrier, eventHandler); if (exceptionHandler != null) { batchEventProcessor.setExceptionHandler(exceptionHandler); } consumerRepository.add(batchEventProcessor, eventHandler, barrier); processorSequences[i] = batchEventProcessor.getSequence(); } updateGatingSequencesForNextInChain(barrierSequences, processorSequences); return new EventHandlerGroup<>(this, consumerRepository, processorSequences); }
回头看 disruptor.start() 中的 consumerInfo.start(executor) executor = new BasicExecutor(threadFactory),BasicExecutor 在每次 execute 任务时,都会 new thread **但是 consumerRepository 的数量是有限的,所以 new thread 也没啥问题
public Disruptor( final EventFactoryeventFactory, final int ringBufferSize, final ThreadFactory threadFactory, final ProducerType producerType, final WaitStrategy waitStrategy) { this( RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy), new BasicExecutor(threadFactory)); } private Disruptor(final RingBuffer ringBuffer, final Executor executor) { this.ringBuffer = ringBuffer; this.executor = executor; } @Override public void start(final java.util.concurrent.Executor executor){ //EventProcessor extends Runnable //executor = BasicExecutor executor.execute(eventprocessor); } public final class BatchEventProcessor implements EventProcessor { @Override public void run() { if (running.compareAndSet(IDLE, RUNNING)) { sequenceBarrier.clearAlert(); notifyStart(); try { if (running.get() == RUNNING) { processEvents(); } } finally { notifyShutdown(); running.set(IDLE); } } else { if (running.get() == RUNNING) { throw new IllegalStateException("Thread is already running"); } else { earlyExit(); } } } } private void processEvents() { T event = null; long nextSequence = sequence.get() + 1L; while (true) { try { final long availableSequence = sequenceBarrier.waitFor(nextSequence); if (batchStartAware != null) { batchStartAware.onBatchStart(availableSequence - nextSequence + 1); } while (nextSequence <= availableSequence) { event = dataProvider.get(nextSequence); eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence); nextSequence++; } sequence.set(availableSequence); } catch (final TimeoutException e) { notifyTimeout(sequence.get()); } catch (final AlertException ex) { if (running.get() != RUNNING) { break; } } catch (final Throwable ex) { exceptionHandler.handleEventException(ex, nextSequence, event); sequence.set(nextSequence); nextSequence++; } } }
executor.execute 也就是 BasicExecutor.execute(eventHandler) 会异步的执行 eventHandler, 也就是调用 BatchEventProcessor.run 方法
问题来了,既然是异步执行,多个 eventHandler 是怎么按照顺序去处理事件消息的?
我们看 processEvents 方法执行逻辑
先获取 BatchEventProcessor.sequence 并 +1
通过 sequenceBarrier.waitFor 也就是 WaitStrategy.waitFor 获取到可用的 availableSequence
先看下 BlockingWaitStrategy.waitFor 的实现
public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier) throws AlertException, InterruptedException { long availableSequence; if (cursorSequence.get() < sequence) { lock.lock(); try { while (cursorSequence.get() < sequence) { barrier.checkAlert(); processorNotifyCondition.await(); } } finally { lock.unlock(); } } while ((availableSequence = dependentSequence.get()) < sequence) { barrier.checkAlert(); } return availableSequence; }
如果 cursorSequence(ringbuffer 的索引) < sequence(batchEventProcessor 的索引) 则batchEventProcessor挂起等待 否则 就用 dependentSequence作为 availableSequence 返回 然后 batchEventProcessor 会将 availableSequence 索引之前的数据一次性处理完,并更新自身的 sequence 索引值
dependentSequence 由 ProcessingSequenceBarrier 构造方法初始化
final class ProcessingSequenceBarrier implements SequenceBarrier { private final WaitStrategy waitStrategy; private final Sequence dependentSequence; private volatile boolean alerted = false; private final Sequence cursorSequence; private final Sequencer sequencer; ProcessingSequenceBarrier(final Sequencer sequencer, final WaitStrategy waitStrategy, final Sequence cursorSequence, final Sequence[] dependentSequences) { this.sequencer = sequencer; this.waitStrategy = waitStrategy; this.cursorSequence = cursorSequence; if (0 == dependentSequences.length) { dependentSequence = cursorSequence; } else { dependentSequence = new FixedSequenceGroup(dependentSequences); } } }
在 Disruptor.createEventProcessors 中的, 进行了初始化 ProcessingSequenceBarrier final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences)
createEventProcessors 仅会被 Disruptor.handleEventsWith和 EventHandlerGroup.handleEventsWith
public class Disruptor{ public final EventHandlerGroup handleEventsWith(final EventHandler super T>... handlers) { return createEventProcessors(new Sequence[0], handlers); } EventHandlerGroup createEventProcessors(final Sequence[] barrierSequences, final EventHandler super T>[] eventHandlers) { checkNotStarted(); final Sequence[] processorSequences = new Sequence[eventHandlers.length]; final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences); for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) { final EventHandler super T> eventHandler = eventHandlers[i]; final BatchEventProcessor batchEventProcessor = new BatchEventProcessor<>(ringBuffer, barrier, eventHandler); if (exceptionHandler != null) { batchEventProcessor.setExceptionHandler(exceptionHandler); } consumerRepository.add(batchEventProcessor, eventHandler, barrier); processorSequences[i] = batchEventProcessor.getSequence(); } updateGatingSequencesForNextInChain(barrierSequences, processorSequences); return new EventHandlerGroup<>(this, consumerRepository, processorSequences); } } public class EventHandlerGroup { private final Disruptor disruptor; private final ConsumerRepository consumerRepository; private final Sequence[] sequences; EventHandlerGroup(final Disruptor disruptor, final ConsumerRepository consumerRepository, final Sequence[] sequences) { this.disruptor = disruptor; this.consumerRepository = consumerRepository; this.sequences = Arrays.copyOf(sequences, sequences.length); } public final EventHandlerGroup handleEventsWith(final EventHandler super T>... handlers) { return disruptor.createEventProcessors(sequences, handlers); } public final EventHandlerGroup then(final EventHandler super T>... handlers) { return handleEventsWith(handlers); } }
EventHandlerGroup 会拷贝一份 batchEventProcessor 中的 sequence demo 例子中 disruptor.handleEventsWith(TestHandler).then
(ThenHandler) 通过 then 方法将 TestHandler 中的 sequence 传递给 ThenHandler 这样 ThenHandler 就依赖了 TestHandler, ThenHandler 就会在 TestHandler 后执行
接着看 disruptor.publishEvent(translator, i)
就是往 ringBuffer 里面放数据,
public void publishEvent(EventTranslatorOneArgtranslator, A arg0) { final long sequence = sequencer.next(); translateAndPublish(translator, sequence, arg0); } private void translateAndPublish(EventTranslatorOneArg translator, long sequence, A arg0) { try { translator.translateTo(get(sequence), sequence, arg0); } finally { sequencer.publish(sequence); } } public E get(long sequence) { return elementAt(sequence); }
get(sequence) 根据 sequence [ringbuffer 索引] 获取 ringbuffer 数组里的对象 translator 将其处理替换完后,ringbuffer 数组的的值将是新的值,publish 将会更新索引的标记位 waitStrategy.signalAllWhenBlocking() 会通知阻塞等待的消费者去继续消费消息
protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); @Override public void publish(long sequence) { cursor.set(sequence); waitStrategy.signalAllWhenBlocking(); }
流程理清楚了,我们看看 知识点
ringbuffer
内存使用率很高,不会造成内存碎片,几乎没有浪费。业务处理的同一时间,访问的内存数据段集中。 可以更好的适应不同系统,取得较高的性能。内存的物理布局简单单一,不太容易发生内存越界、悬空指针等 bug,出了问题也容易在内存级别分析调试。 做出来的系统容易保持健壮。
cpu cache
CPU 访问内存时会等待,导致计算资源大量闲置,降低 CPU 整体吞吐量。 由于内存数据访问的热点集中性,在 CPU 和内存之间用较为快速而成本较高(相对于内存)的介质做一层缓存,就显得性价比极高了
看完上述内容是否对您有帮助呢?如果还想对相关知识有进一步的了解或阅读更多相关文章,请关注创新互联行业资讯频道,感谢您对创新互联的支持。