温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Disruptor中怎么实现一个高性能队列

发布时间:2021-06-21 16:57:40 来源:亿速云 阅读:148 作者:Leah 栏目:大数据

Disruptor中怎么实现一个高性能队列,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。

Disruptor 例子

import java.util.concurrent.ThreadFactory import com.lmax.disruptor.dsl.{Disruptor, ProducerType} import com.lmax.disruptor.{BlockingWaitStrategy,EventFactory,EventHandler,EventTranslatorOneArg,WaitStrategy} object DisruptorTest {   val disruptor = {     val factory = new EventFactory[Event] {       override def newInstance(): Event = Event(-1)     }     val threadFactory = new ThreadFactory(){       override def newThread(r: Runnable): Thread = new Thread(r)     }          val disruptor = new Disruptor[Event](factory, 4, threadFactory, ProducerType.SINGLE,                          new BlockingWaitStrategy())     disruptor.handleEventsWith(TestHandler).`then`(ThenHandler)          disruptor   }      val translator = new EventTranslatorOneArg[Event, Int]() {     override def translateTo(event: Event, sequence: Long, arg: Int): Unit = {       event.id = arg       println(s"translator: ${event}, sequence: ${sequence}, arg: ${arg}")     }   }   def main(args: Array[String]): Unit = {     disruptor.start()     (0 until 100).foreach { i =>       disruptor.publishEvent(translator, i)     }     disruptor.shutdown()   } } case class Event(var id: Int) {   override def toString: String = s"event: ${id}" } object TestHandler extends EventHandler[Event] {   override def onEvent(event: Event, sequence: Long, endOfBatch: Boolean): Unit = {     println(s"${this.getClass.getSimpleName} ${System.nanoTime()} ${event}")   } } object ThenHandler extends EventHandler[Event] {   override def onEvent(event: Event, sequence: Long, endOfBatch: Boolean): Unit = {     println(s"${this.getClass.getSimpleName} ${System.nanoTime()} ${event}")   } }

源码阅读

disrutpor 初始化

先看 Disruptor 构造方法

public Disruptor(final EventFactory<T> eventFactory,    final int ringBufferSize,    final ThreadFactory threadFactory,    final ProducerType producerType,   final WaitStrategy waitStrategy) {     this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),          new BasicExecutor(threadFactory)); }

在看 RingBuffer.create, 最终通过 fill 方法 将 eventFactory.newInstance() 作为默认值,塞到 ringBuffer 里面

public static <E> RingBuffer<E> create(ProducerType producerType,    EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) {     switch (producerType) {         case SINGLE:             return createSingleProducer(factory, bufferSize, waitStrategy);         case MULTI:             return createMultiProducer(factory, bufferSize, waitStrategy);         default:             throw new IllegalStateException(producerType.toString());     } } public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory, int bufferSize,      WaitStrategy waitStrategy) {     SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);     return new RingBuffer<E>(factory, sequencer); } RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) {     this.sequencer = sequencer;     this.bufferSize = sequencer.getBufferSize();     if (bufferSize < 1) {         throw new IllegalArgumentException("bufferSize must not be less than 1");     }     if (Integer.bitCount(bufferSize) != 1) {         throw new IllegalArgumentException("bufferSize must be a power of 2");     }     this.indexMask = bufferSize - 1;     this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];     fill(eventFactory); } private void fill(EventFactory<E> eventFactory) {     for (int i = 0; i < bufferSize; i++) {         entries[BUFFER_PAD + i] = eventFactory.newInstance();     } }

消费事件消息

首先看 disruptor.start(): 消费事件消息入口

private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<>(); public RingBuffer<T> start() {     checkOnlyStartedOnce();     for (final ConsumerInfo consumerInfo : consumerRepository) {         consumerInfo.start(executor);     }     return ringBuffer; }

consumerRepository 类型由 disruptor.handleEventsWith(TestHandler) 初始化, 并构造事件消息处理链

public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers) {     return createEventProcessors(new Sequence[0], handlers); } EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences, final EventHandler<? super T>[] eventHandlers) {     checkNotStarted();     final Sequence[] processorSequences = new Sequence[eventHandlers.length];     final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);     for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) {         final EventHandler<? super T> eventHandler = eventHandlers[i];         final BatchEventProcessor<T> batchEventProcessor = new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);         if (exceptionHandler != null) {             batchEventProcessor.setExceptionHandler(exceptionHandler);         }         consumerRepository.add(batchEventProcessor, eventHandler, barrier);         processorSequences[i] = batchEventProcessor.getSequence();     }     updateGatingSequencesForNextInChain(barrierSequences, processorSequences);     return new EventHandlerGroup<>(this, consumerRepository, processorSequences); }

回头看 disruptor.start() 中的 consumerInfo.start(executor) executor = new BasicExecutor(threadFactory),BasicExecutor 在每次 execute 任务时,都会 new thread **但是 consumerRepository 的数量是有限的,所以 new thread 也没啥问题

public Disruptor(         final EventFactory<T> eventFactory,         final int ringBufferSize,         final ThreadFactory threadFactory,         final ProducerType producerType,         final WaitStrategy waitStrategy) {     this(         RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),         new BasicExecutor(threadFactory)); } private Disruptor(final RingBuffer<T> ringBuffer, final Executor executor) {     this.ringBuffer = ringBuffer;     this.executor = executor; } @Override public void start(final java.util.concurrent.Executor executor){     //EventProcessor extends Runnable     //executor = BasicExecutor      executor.execute(eventprocessor); } public final class BatchEventProcessor<T> implements EventProcessor {   @Override   public void run() {       if (running.compareAndSet(IDLE, RUNNING)) {           sequenceBarrier.clearAlert();           notifyStart();           try {               if (running.get() == RUNNING) {                   processEvents();               }           } finally {               notifyShutdown();               running.set(IDLE);           }       } else {           if (running.get() == RUNNING) {               throw new IllegalStateException("Thread is already running");           } else {               earlyExit();           }       }   } } private void processEvents() {     T event = null;     long nextSequence = sequence.get() + 1L;     while (true) {         try {             final long availableSequence = sequenceBarrier.waitFor(nextSequence);             if (batchStartAware != null) {                 batchStartAware.onBatchStart(availableSequence - nextSequence + 1);             }             while (nextSequence <= availableSequence) {                 event = dataProvider.get(nextSequence);                 eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);                 nextSequence++;             }             sequence.set(availableSequence);         } catch (final TimeoutException e) {             notifyTimeout(sequence.get());         } catch (final AlertException ex) {             if (running.get() != RUNNING) {                 break;             }         } catch (final Throwable ex) {             exceptionHandler.handleEventException(ex, nextSequence, event);             sequence.set(nextSequence);             nextSequence++;         }     } }

executor.execute 也就是 BasicExecutor.execute(eventHandler) 会异步的执行 eventHandler, 也就是调用 BatchEventProcessor.run 方法

问题来了,既然是异步执行,多个 eventHandler 是怎么按照顺序去处理事件消息的?

我们看 processEvents 方法执行逻辑

  1. 先获取 BatchEventProcessor.sequence 并 +1

  2. 通过 sequenceBarrier.waitFor 也就是 WaitStrategy.waitFor 获取到可用的 availableSequence

  3. 先看下 BlockingWaitStrategy.waitFor 的实现

     public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence,      SequenceBarrier barrier)     throws AlertException, InterruptedException {     long availableSequence;     if (cursorSequence.get() < sequence) {         lock.lock();         try {             while (cursorSequence.get() < sequence) {                 barrier.checkAlert();                 processorNotifyCondition.await();             }         }         finally {             lock.unlock();         }     }     while ((availableSequence = dependentSequence.get()) < sequence) {         barrier.checkAlert();     }     return availableSequence; }


    如果 cursorSequence(ringbuffer 的索引) < sequence(batchEventProcessor 的索引) 则batchEventProcessor挂起等待 否则 就用 dependentSequence 作为 availableSequence 返回 然后 batchEventProcessor 会将 availableSequence 索引之前的数据一次性处理完,并更新自身的 sequence 索引值

  4. dependentSequence 由 ProcessingSequenceBarrier 构造方法初始化

    final class ProcessingSequenceBarrier implements SequenceBarrier {     private final WaitStrategy waitStrategy;     private final Sequence dependentSequence;     private volatile boolean alerted = false;     private final Sequence cursorSequence;     private final Sequencer sequencer;     ProcessingSequenceBarrier(final Sequencer sequencer, final WaitStrategy waitStrategy,         final Sequence cursorSequence, final Sequence[] dependentSequences) {         this.sequencer = sequencer;         this.waitStrategy = waitStrategy;         this.cursorSequence = cursorSequence;         if (0 == dependentSequences.length) {             dependentSequence = cursorSequence;         } else {             dependentSequence = new FixedSequenceGroup(dependentSequences);         }     } }


    在 Disruptor.createEventProcessors 中的, 进行了初始化 ProcessingSequenceBarrier final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences) createEventProcessors 仅会被 Disruptor.handleEventsWithEventHandlerGroup.handleEventsWith

    public class Disruptor<T> {     public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers) {         return createEventProcessors(new Sequence[0], handlers);     }     EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences,         final EventHandler<? super T>[] eventHandlers) {         checkNotStarted();         final Sequence[] processorSequences = new Sequence[eventHandlers.length];         final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);         for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) {             final EventHandler<? super T> eventHandler = eventHandlers[i];             final BatchEventProcessor<T> batchEventProcessor =                  new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);             if (exceptionHandler != null) {                 batchEventProcessor.setExceptionHandler(exceptionHandler);             }             consumerRepository.add(batchEventProcessor, eventHandler, barrier);             processorSequences[i] = batchEventProcessor.getSequence();         }         updateGatingSequencesForNextInChain(barrierSequences, processorSequences);         return new EventHandlerGroup<>(this, consumerRepository, processorSequences);     } } public class EventHandlerGroup<T> {     private final Disruptor<T> disruptor;     private final ConsumerRepository<T> consumerRepository;     private final Sequence[] sequences;     EventHandlerGroup(final Disruptor<T> disruptor, final ConsumerRepository<T> consumerRepository,         final Sequence[] sequences) {         this.disruptor = disruptor;         this.consumerRepository = consumerRepository;         this.sequences = Arrays.copyOf(sequences, sequences.length);     }     public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers) {         return disruptor.createEventProcessors(sequences, handlers);     }     public final EventHandlerGroup<T> then(final EventHandler<? super T>... handlers) {         return handleEventsWith(handlers);     } }


    EventHandlerGroup 会拷贝一份 batchEventProcessor 中的 sequence demo 例子中 disruptor.handleEventsWith(TestHandler).then(ThenHandler) 通过 then 方法将 TestHandler 中的 sequence 传递给 ThenHandler 这样 ThenHandler 就依赖了 TestHandler, ThenHandler 就会在 TestHandler 后执行

生产事件消息

接着看 disruptor.publishEvent(translator, i) 就是往 ringBuffer 里面放数据,

public <A> void publishEvent(EventTranslatorOneArg<E, A> translator, A arg0) {     final long sequence = sequencer.next();     translateAndPublish(translator, sequence, arg0); } private <A> void translateAndPublish(EventTranslatorOneArg<E, A> translator, long sequence, A arg0) {     try {         translator.translateTo(get(sequence), sequence, arg0);     } finally {         sequencer.publish(sequence);     } } public E get(long sequence) {     return elementAt(sequence); }

get(sequence) 根据 sequence [ringbuffer 索引] 获取 ringbuffer 数组里的对象 translator 将其处理替换完后,ringbuffer 数组的的值将是新的值,publish 将会更新索引的标记位 waitStrategy.signalAllWhenBlocking() 会通知阻塞等待的消费者去继续消费消息

protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); @Override public void publish(long sequence) {     cursor.set(sequence);     waitStrategy.signalAllWhenBlocking(); }

总结

流程理清楚了,我们看看 知识点

  • ringbuffer

    • 内存使用率很高,不会造成内存碎片,几乎没有浪费。业务处理的同一时间,访问的内存数据段集中。 可以更好的适应不同系统,取得较高的性能。内存的物理布局简单单一,不太容易发生内存越界、悬空指针等 bug,出了问题也容易在内存级别分析调试。 做出来的系统容易保持健壮。

  • cpu cache

    • CPU 访问内存时会等待,导致计算资源大量闲置,降低 CPU 整体吞吐量。 由于内存数据访问的热点集中性,在 CPU 和内存之间用较为快速而成本较高(相对于内存)的介质做一层缓存,就显得性价比极高了

看完上述内容是否对您有帮助呢?如果还想对相关知识有进一步的了解或阅读更多相关文章,请关注亿速云行业资讯频道,感谢您对亿速云的支持。

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI