高性能队列:Disruptor

背景

Disruptor 是一种高性能的、并发编程框架,最初是为了解决 LMAX 金融交易所中的低延迟和高吞吐量需求而设计的。它主要用来解决在多线程环境中生产者和消费者之间的数据传递问题。Disruptor 由 LMAX 的 Martin Thompson 和 Dave Farley 于 2011 年发明,现在是一个开源项目。

The Disruptor is the result of our efforts to build the world’s highest performance financial exchange at LMAX. Early designs focused on architectures derived from SEDA [1] and Actors [2] using pipelines for throughput. After profiling various implementations it became evident that the queuing of events between stages in the pipeline was dominating the costs. We found that queues also introduced latency and high levels of jitter. We expended significant effort on developing new queue implementations with better performance. However it became evident that queues as a fundamental data structure are limited due to the conflation of design concerns for the producers, consumers, and their data storage. The Disruptor is the result of our work to build a concurrent structure that cleanly separates these concerns.

Disruptor 的核心思想是,通过使用环形的数据结构(RingBuffer)和避免锁来实现数据的高速传递。RingBuffer 类似于一个循环队列,内部包含一个预先分配好的对象数组,该数组的大小为 2 的幂次方。这样设计的一个原因是可以通过位运算快速计算数组中的索引,提升性能。创建此环形数据结构后,生产者可以将事件(通常是某个类的实例)发布到 RingBuffer 的下一个可用位置,同时消费者可以从 RingBuffer 中读取和处理事件。通过使用序号(Sequence)来表示 RingBuffer 中各个条目的位置,生产者和消费者之间可以实现非阻塞的协同工作模式,从而达到最大的性能优势。
Disruptor 还支持多个生产者和多个消费者的场景,消费者可以独立处理事件,也可以按照依赖关系顺序进行处理。这使得Disruptor 可以在保持低延迟的同时,实现并行处理并提高整体系统吞吐量。

那Disruptor相比于Java内置的队列而言,有什么优势呢,这里需要进一步的分析

Java的内置队列

image.png
在Java中ArrayBlockingQueue是靠锁去控制它的并发安全的,而且是有界的队列。

ArrayBlockingQueue(数组阻塞队列)通过内部的锁(ReentrantLock)和两个条件变量(Condition)notEmpty和notFull来保证线程安全。

  1. ReentrantLock(可重入锁):ArrayBlockingQueue使用ReentrantLock作为内部的锁来控制对队列的访问。在读取、写入、删除等操作时,需要先获取锁,完成操作后再释放锁。使用lock及unlock方法对数据进行保护,确保同一时刻只有一个线程能执行特定的操作。
  2. 条件变量(Condition):ArrayBlockingQueue有两个Condition,分别是notEmpty和notFull。当队列为空时,使用notEmpty.await()方法阻塞读线程;当队列满时,使用notFull.await()方法阻塞写线程。当数据被删除后,notEmpty.signal()唤醒读线程继续读取;当数据被添加后,notFull.signal()唤醒写线程继续写入。

通过这些同步措施,ArrayBlockingQueue确保了在多线程环境下的线程安全性
image.png
而LinkedTransferQueue都是通过原子变量CAS这种不加锁的方式来实现的,但是它们都是使用链表的方式去实现的,这种方式的会出现两个问题,一个是由于无界导致的OOM,一个是这种数据结构也会影响到JVM的GC效率,所以,在大多数时候,还是使用ArrayBlockingQueue比较的常见,那么,ArrayBlockingQueue 有什么问题呢。

加锁导致的效率问题

这里引用LMAX交易所的数据:

We will illustrate the cost of locks with a simple demonstration. The focus of this experiment is to call a function which increments a 64-bit counter in a loop 500 million times. This can be executed by a single thread on a 2.4Ghz Intel Westmere EP in just 300ms if written in Java. The language is unimportant to this experiment and results will be similar across all languages with the same basic primitives.
Once a lock is introduced to provide mutual exclusion, even when the lock is as yet un-contended, the cost goes up significantly. The cost increases again, by orders of magnitude, when two or more threads begin to contend. The results of this simple experiment are shown in the table below:

image.png
这里LMAX使用一个程序调用了一个函数,该函数会对一个64位的计数器循环自增5亿次。测试结果显然,加锁与CAS的差别还是比较大的。
image.png
这个加锁加在了offer,以此来保证线程的安全性,但是这样的方式显然会损失不少性能。
而CAS的呢,这里则使用了unsafe去使用魔法,使其不加锁的情况下,仍然可以保证线程安全:
image.png

那为什么ArrayBlockingQueue非要加锁
看到这里大家可能会有一个疑惑,那为什么ArrayBlockingQueue不用cas而选择加锁呢?
使用ArrayBlockingQueue而非CAS的原因主要包括以下几点:

  1. 锁的简化。ArrayBlockingQueue适用于生产者和消费者的场景。当队列为空时,消费者线程需要等待;当队列已满时,生产者线程需要等待。这种有固定边界的队列在这种场景下可以简化设计。通过使用锁,可以简化这种等待-通知的逻辑。锁本身支持等待-通知机制,使用条件变量Condition实现。
  2. 性能。考虑到在有边界的队列情况下,竞争可能会比较激烈。当队列为空或已满时,CAS操作可能会导致大量的失败尝试和自旋。这会影响性能,并消耗更多的CPU资源。而在这些竞争激烈的情况下,锁的性能往往优于CAS操作。
  3. 简单性。相比于使用CAS操作实现的非阻塞数据结构,使用锁实现的数据结构通常更容易理解和实现。这意味着更少的错误和更容易维护的代码。

而Disruptor是一个高性能队列库,它的设计初衷是为了解决高并发、低延迟的场景。Disruptor之所以能够使用CAS(Compare And Swap)操作,主要是基于以下几个原因:

  1. 缓存行填充。Disruptor利用缓存行填充(Cache Line Padding)来减少伪共享(False Sharing)的问题。伪共享会导致缓存行失效,从而影响性能。通过将生产者和消费者的指针放置在不同的缓存行上,避免了伪共享问题,从而提高了使用CAS操作的性能。
  2. 锁消除。Disruptor消除了锁的使用,通过使用原子操作(如CAS)进行并发控制。这减少了锁带来的开销,提高了性能。在高并发场景下,锁的竞争可能会导致严重的性能问题,而CAS操作可以更好地支持这些场景。
  3. 数据结构设计。Disruptor的数据结构设计是基于环形缓冲区(Ring Buffer)的,这使得它在内存分配和管理上更加高效。通常情况下,无锁队列的设计和实现相对复杂,但Disruptor通过使用环形缓冲区,将其复杂程度降低,使得CAS操作的实现变得可行。
  4. 无阻塞。Disruptor采用无阻塞算法,避免了死锁、阻塞等问题。在高并发场景下,无阻塞算法通常具有更好的伸缩性。当线程之间存在很多竞争时,锁定资源可能会导致性能瓶颈。而CAS操作提供了一种无锁的方式来解决资源竞争问题。
  5. 批处理。Disruptor支持批处理,它可以将多个操作组合在一起执行。这样可以充分利用CPU缓存、减少上下文切换,从而提高CAS操作的性能。

Disruptor之所以能够使用CAS操作,主要是因为其数据结构设计、缓存行填充、无锁算法和批处理等技术的应用。这些技术使得Disruptor在高并发和低延迟场景下具有优秀的性能,满足了高性能队列的需求。

被破坏的共享

这里需要提一提一个比较底层的知识,那就是CPU的三级cache:
4-modified.png
CPU的三级缓存,即L1、L2和L3缓存,是一种位于CPU内部的高速存储器。它们主要用于存储那些频繁使用的数据和指令,从而提高CPU处理速度。
L1缓存(一级缓存):它是CPU内部最接近执行单元的缓存,速度非常快,但存储容量相对较小,通常在32KB-64KB之间。L1缓存通常分为数据缓存(用于存储操作数)和指令缓存(用于存储已经预取的指令)。
L2缓存(二级缓存):它位于L1缓存之后,速度略低于L1缓存,但具有较大的存储容量,通常在256KB到8MB之间。L2缓存通常用于存储那些不太频繁使用的数据和指令。在某些CPU设计中,L2缓存可能是共享的,这意味着它可以存储来自不同执行单元的数据和指令。
L3缓存(三级缓存):它位于L2缓存之后,速度低于L2缓存,但在容量和访问速度方面,它将较之主存储器(例如DDR RAM)有很大优势。L3缓存容量范围通常在8MB至32MB之间,但在某些高端处理器中可能更大。L3缓存主要用于在L2缓存中未命中的数据和指令的存储,它可以在多个核之间共享。
这种分级存储的方式有效地将处理器的计算能力靠近主存储器的速度。当CPU需要访问数据或指令时,它首先检查L1缓存,然后是L2缓存,最后是L3缓存。如果三级缓存都未命中,CPU将从主存储器中获取数据。由于L1、L2和L3缓存的访问速度逐级降低,但存储容量逐级增加,因此它们一起建立了一个高效的层次结构,将最频繁使用的内容存储在越来越快的缓存中,从而提高处理速度。
image.png

共享缓存带来的影响

缓存由许多缓存行组成。通常情况下,每个缓存行包含64字节,它们有效地引用了主内存中的一系列地址。在这种情况下,一个Java的long类型变量占用8字节,所以一个缓存行可以容纳8个long类型的变量。
当CPU从主存中获取数据时,相邻的数据也会被存储到相同的缓存行中。
在访问一个long类型的数组时,如果数组中的一个值被加载到缓存中,相邻的7个值也会自动被加载。这使得你可以快速地遍历该数组。实际上,在连续的内存块中分配的任何数据结构都可以被快速遍历。
以下示例展示了利用缓存行特性与不利用缓存行特性之间效果的对比。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class CacheLineEffect {
//考虑一般缓存行大小是64字节,一个 long 类型占8字节
static long[][] arr;

public static void main(String[] args) {
arr = new long[1024 * 1024][];
for (int i = 0; i < 1024 * 1024; i++) {
arr[i] = new long[8];
for (int j = 0; j < 8; j++) {
arr[i][j] = 0L;
}
}
long sum = 0L;
long marked = System.currentTimeMillis();
for (int i = 0; i < 1024 * 1024; i+=1) {
for(int j =0; j< 8;j++){
sum = arr[i][j];
}
}
System.out.println("Loop times:" + (System.currentTimeMillis() - marked) + "ms");

marked = System.currentTimeMillis();
for (int i = 0; i < 8; i+=1) {
for(int j =0; j< 1024 * 1024;j++){
sum = arr[j][i];
}
}
System.out.println("Loop times:" + (System.currentTimeMillis() - marked) + "ms");
}
}

image.png

那么缓存与ArrayBlockingQueue有什么关系呢?

ArrayBlockingQueue有三个成员变量: - takeIndex:需要被取走的元素下标 - putIndex:可被元素插入的位置的下标 - count:队列中元素的数量
这三个变量很容易放到一个缓存行中,但是之间修改没有太多的关联。所以每次修改,都会使之前缓存的数据失效,从而不能完全达到共享的效果。
3-modified.png
如上图所示,当生产者线程put一个元素到ArrayBlockingQueue时,putIndex会修改,从而导致消费者线程的缓存中的缓存行无效,需要从主存中重新读取。
这种无法充分使用缓存行特性的现象,称为伪共享。
对于伪共享,一般的解决方案是,增大数组元素的间隔使得由不同线程存取的元素位于不同的缓存行上,以空间换时间。
不知道大家有没有看过《Java并发编程艺术》,那本书也写了有无共享缓存导致的区别
而解决伪共享的最好方法,就是填充
下面是一个验证程序,去表达有填充和无填充导致的区别:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public class FalseSharing implements Runnable {
public final static long ITERATIONS = 500L * 1000L * 100L;
private static ValueNoPadding[] longs;
private int arrayIndex = 0;

public FalseSharing(final int arrayIndex) {
this.arrayIndex = arrayIndex;
}

public static void main(final String[] args) throws Exception {
for (int i = 1; i < 10; i++) {
System.gc();
final long start = System.currentTimeMillis();
runTest(i);
System.out.println("Thread num " + i + " duration = " + (System.currentTimeMillis() - start));
}

}

private static void runTest(int NUM_THREADS) throws InterruptedException {
Thread[] threads = new Thread[NUM_THREADS];
longs = new ValueNoPadding[NUM_THREADS];
for (int i = 0; i < longs.length; i++) {
longs[i] = new ValueNoPadding();
}
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(new FalseSharing(i));
}

for (Thread t : threads) {
t.start();
}

for (Thread t : threads) {
t.join();
}
}

public void run() {
long i = ITERATIONS + 1;
while (0 != --i) {
longs[arrayIndex].value = 0L;
}
}

public final static class ValuePadding {
protected long p1, p2, p3, p4, p5, p6, p7;
protected volatile long value = 0L;
protected long p9, p10, p11, p12, p13, p14;
protected long p15;
}

public final static class ValueNoPadding {
// protected long p1, p2, p3, p4, p5, p6, p7;
protected volatile long value = 0L;
// protected long p9, p10, p11, p12, p13, p14, p15;
}
}

image.png)image.png
左边是ValueNoPadding,右边是ValuePadding。

由此可见,有无共享导致的区别还是挺大的。
那么这么明显的问题,他们都没有注意到吗?那当然不是,事实上,已经有一个注解@Contended去解决了这个问题:
@Contended 注解用于在Java类及其字段上提高内存访问性能。它可以显式地向JVM表示,被修饰的元素(通常是字段)可能会产生伪共享(false sharing)问题。
伪共享是指多个线程频繁访问不同变量,但这些变量位于同一个缓存行上。这会导致缓存行频繁失效和重载,从而降低多线程程序的性能。
@Contended 注解提示JVM在分配内存时为标注元素添加额外的内存填充,以确保它们分布在不同的缓存行上。这有助于消除伪共享,从而提高线程间的并发性能。
要注意的是,@Contended 支持并不是所有JVM版本都有。在Oracle的Hotspot JVM上,默认情况下仅在Java 9及以上版本可用。 若要在低于Java 9的JVM版本上使用@Contended, 需要启动JVM时添加 -XX:-RestrictContended 参数。
示例使用方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
import jdk.internal.vm.annotation.Contended;

@Contended
public class Foo {

// Value1 & Value2 with padding to avoid False Sharing
@Contended
private volatile long value1;

@Contended
private volatile long value2;

}

上面的示例中,Foo 类及其字段 value1value2 使用了 @Contended 注解,以避免伪共享问题。

Disruptor的设计

核心设计

Disruptor是一个高性能、低延迟的并发框架,最初由LMAX公司开发。它用于解决在复杂多线程环境下数据共享和并发访问的问题。Disruptor的设计目标是使开发人员能够在高负载场景下编写出较低延迟和高吞吐量的应用程序。其的核心设计模式主要包括以下几个方面:

  1. Ring Buffer(环形缓冲区):Disruptor框架的核心数据结构。Ring Buffer用于存储和共享数据。它是一个环形的数组,长度固定且是2的整数次幂。由于是环形,当写入到达计数器尾部时,将回到数组的开头覆盖旧数据。Ring Buffer提高了数据存储和访问效率,同时降低了并发冲突的可能性。并且,由于数组的特性对处理器的缓存机制更加友好。
  2. Producer(生产者):生产者将事件数据添加到Ring Buffer中。在Disruptor框架中,可以有多个生产者同时添加事件。
  3. Sequence Barrier(序列屏障):提供了一种协调机制,以确保生产者和消费者在并行执行时可以正确访问Ring Buffer。
  4. Consumer(消费者):消费者是处理Ring Buffer中事件的实际执行者。Disruptor支持多个消费者同时处理事件,可以为消费者定义依赖关系。当多个消费者同时访问Ring Buffer时,事件将根据消费者依赖关系顺序处理。
  5. WaitStrategy(等待策略):Disruptor提供了一组内置的等待策略,用于控制生产者和消费者在等待事件处理时的行为。例如,线程执行、空轮询、阻塞等待等。

使用Disruptor可以高效地处理多线程的并发问题。相比传统的阻塞队列和锁,Disruptor通过这些设计模式实现了更低的延迟和更高的吞吐量,适用于高性能需求的场景。
image.png

特点

Disruptor还具有元素位置定位和无锁这两个优势特点,它们可以进一步提高Disruptor的性能。接下来详细介绍这三个特点:

  1. 元素位置定位:
    在Disruptor的Ring Buffer中,每个元素(事件)的位置都是通过一个序列号(Sequence)来表示的。这个序列号对应于Ring Buffer数组的一个索引位置。根据序列号计算索引位置的公式如下:
1
index = sequence % bufferSize

其中sequence表示元素的序列号(从0开始递增),bufferSize表示Ring Buffer的大小(长度)。
由于Ring Buffer的长度是2的整数次幂,这样的设计使得计算索引位置非常高效。只需要进行按位与(sequence & (bufferSize - 1))操作即可完成计算,避免了除法和取余运算,从而提高了性能。

  1. 无锁:
    Disruptor在设计时确保了多个生产者和消费者在操作Ring Buffer时都无需使用锁。利用了无锁数据结构的原子操作,例如CAS(Compare And Swap)等,确保了在高并发环境下正确、高效地访问共享数据。无锁设计相对于基于锁的同步方式减少了线程上下文切换的开销,降低了竞争冲突,从而大幅提高了应用程序的性能。
  2. 环形数组结构:为了避免垃圾回收,采用数组而非链表。同时,数组对处理器的缓存机制更加友好(CPU加载空间局部性原则)。

在Disruptor框架中,生产者和消费者都使用单调递增的序列号(Sequence)来表示所操作的元素。当生产者或消费者需要添加或处理事件时,它们会使用CAS操作更新相应的Sequence。因此,消费者之间即使在并行执行时,也能够正确地知道对方的处理进度,这样消费者之间可以互不干扰地处理事件。

使用无锁的设计

Disruptor中的主要无锁设计是通过RingBuffer来实现的。RingBuffer是Disruptor中一个循环的缓冲区,使用无锁技术实现高性能的并发访问。
在Disruptor中,生产者和消费者共同操作RingBuffer。生产者发布事件时(下面的demo有示例),会通过next()方法申请序号,并一次性地预分配多个序列号。消费者根据序列号处理相应的事件,确保消费者事件间无锁、无阻塞地运行。
Disruptor通过使用设计如SequenceSequencerCursor等类和遵循一定的规则,在高并发环境下保证了生产者和消费者间的正确交互,实现了高性能的无锁设计。
image.png

环形队列

Disruptor的RingBuffer是一个环形缓冲区,它是Disruptor框架的核心组件。它采用环形数据结构使得生产者和消费者能够高效地在无锁的情况下共享数据。下面详细讲解RingBuffer的原理:

  1. 数据结构:RingBuffer实际上是一个包含预定义大小的对象数组,这个数组的大小必须为2的幂。之所以使用2的幂,是因为这样可以优化某些计算,例如通过位运算取模运算,提高性能。数组中的每个元素对应一个事件(Event),生产者负责产生事件,消费者负责处理事件。
  2. 序列号(Sequence):在Disruptor中,生产者和消费者通过序列号(Sequence)来定位RingBuffer中的位置。序列号是一个单调递增的long值,用于表示事件的位置。生产者申请序列号时,序列号递增。消费者处理事件时,根据序列号在RingBuffer中找到相应的事件。
  3. 环形缓冲区:由于RingBuffer是环形的,当数组的末尾已经用完时,生产者可以从数组的开头开始重用位置。这样,在RingBuffer不满的情况下,生产者总是可以写入事件而不会遇到阻塞。当然,在RingBuffer满的情况下,生产者可能需要等待消费者消费事件来释放空间。
  4. 索引计算:由于RingBuffer的长度是2的幂,例如长度为8的RingBuffer,可以通过位运算得到索引:index = sequence & (bufferSize - 1)。这比取模运算效率更高。
  5. 生产者-消费者交互:在Disruptor中,生产者和消费者通过遵循一定的规则实现环形缓冲区的无锁访问,提高并发性能。主要有以下规则:a) 生产者只需要关心最慢的消费者,因为只有当所有消费者完成对当前事件的处理,才能确保生产者可以安全地覆盖之前的事件;b) 消费者之间可能存在依赖关系,可以通过屏障(Barrier)等方法确保消费者按照顺序处理事件。

下面通过一个过程示例,来表达Disruptor的解决思路:
启动时,将预先分配环形缓冲区的所有内存。环形缓冲区可以存储指向 entry 的指针数组,也可以存储表示 entry 的结构数组。这些 entry 中的每一个通常不是传递的数据本身,类似对象池机制,而是它的容器。这种 entry 的预分配消除了支持垃圾回收的语言中的问题,因为 entry 将被重用,并在整个 Disruptor 实例存活期间都有效。这些 entry 的内存是同时分配的。
一般的数据结构是像下面这样的:
5-modified.png
我们可以使用一个环状的数组结构改进成下面这样:
6-modified.png
数组的连续多个元素会一并加载到 CPU Cache 里面来,所以访问遍历的速度会更快。而链表里面各个节点的数据,多半不会出现在相邻的内存空间,自然也就享受不到整个 Cache Line 加载后数据连续从高速缓存里面被访问到的优势。遍历访问时 CPU 层面的分支预测会很准确。这可以使得我们更有效地利用了 CPU 里面的多级流水线,我们的程序就会跑得更快。

在像 Java 这样的托管运行时环境中开发低延迟系统时,垃圾收集机制可能会带来问题。分配的内存越多,给垃圾收集器带来的负担就越大。当对象的寿命很短或实际上是常驻的时候,垃圾收集器工作得最好。在环形缓冲区中预先分配 entry 意味着它对于垃圾收集器来说是常驻内存的,垃圾回收的负担就很轻。同时,数组结构对处理器的缓存机制更加友好。数组长度 2^n,通过位运算,加快定位的速度。下标采取递增的形式。不用担心 index 溢出的问题。index 是 long 类型,即使 100 万 QPS 的处理速度,也需要 30 万年才能用完。

一般的 Cache Line 大小在 64 字节左右,然后 Disruptor 在非常重要的字段前后加了很多额外的无用字段。可以让这一个字段占满一整个缓存行,这样就可以避免未共享导致的误杀。

每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据。

一个生产者

下面用非环形的结构模拟无锁读写:

  1. 申请写入m个元素
  2. 若是有m个元素可以入,则返回最大的序列号。这儿主要判断是否会覆盖未读的元素
  3. 若是返回的正确,则生产者开始写入元素

7-modified.png

多个生产者

多个生产者的情况下,会遇到“如何防止多个线程重复写同一个元素”的问题。Disruptor 的解决方法是,每个线程获取不同的一段数组空间进行操作。这个通过 CAS 很容易达到。只需要在分配元素的时候,通过 CAS 判断一下这段空间是否已经分配出去即可。

但如何防止读取的时候,读到还未写的元素。Disruptor 在多个生产者的情况下,引入了一个与 Ring Buffer 大小相同的 buffer,Available Buffer。当某个位置写入成功的时候,便把 Availble Buffer 相应的位置置位,标记为写入成功。读取的时候,会遍历 Available Buffer,来判断元素是否已经就绪。

读数据流程

生产者多线程写入的情况会复杂很多:

申请读取到序号n;
若 writer cursor >= n,这时仍然无法确定连续可读的最大下标。从 reader cursor 开始读取 available Buffer,一直查到第一个不可用的元素,然后返回最大连续可读元素的位置;
消费者读取元素。
如下图所示,读线程读到下标为 2 的元素,三个线程 Writer1/Writer2/Writer3 正在向 RingBuffer 相应位置写数据,写线程被分配到的最大元素下标是 11。

读线程申请读取到下标从3到11的元素,判断 writer cursor>=11。然后开始读取 availableBuffer,从 3 开始,往后读取,发现下标为7的元素没有生产成功,于是WaitFor(11)返回6。

然后,消费者读取下标从 3 到 6 共计 4 个元素(多个生产者情况下,消费者消费过程示意图)
8-modified.png

写数据流程

多个生产者写入的时候:

  1. 申请写入 m 个元素;
  2. 若是有 m 个元素可以写入,则返回最大的序列号。每个生产者会被分配一段独享的空间;
  3. 生产者写入元素,写入元素的同时设置 available Buffer 里面相应的位置,以标记自己哪些位置是已经写入成功的。

如下图所示,Writer1 和 Writer2 两个线程写入数组,都申请可写的数组空间。Writer1 被分配了下标 3 到下表 5 的空间,Writer2 被分配了下标 6 到下标 9 的空间。
Writer1 写入下标 3 位置的元素,同时把 available Buffer 相应位置置位,标记已经写入成功,往后移一位,开始写下标 4 位置的元素。Writer2 同样的方式。最终都写入完成。
9-modified.png

解决伪共享的问题

其中一个解决思路,就是让不同线程操作的对象处于不同的缓存行即可即缓存行填充(Padding),使一个对象占用的内存大小刚好为64bytes或它的整数倍,这样就保证了一个缓存行里不会有多个对象,这其实是一种以空间换时间的方案。

image.png

而Sequence选择继承PhsPadding:
image.png

Sequence实际value变量的左右均被填充了7个long型变量,其自身也是long型变量,一个long型变量占据8个字节,所以序号与他上一个/下一个序号之间的最小内存距离为:158=120byte,加上对象头的8个字节,可以确保sequence大小128byte=264byte(有的CPU缓存行是128byte)
这样直接的代价就是增大的15倍的内存消耗空间,这样的设计导致不可能有两个cursor出现在同一个cpu cache line中, 就解决了”伪共享”问题。

测试

最后,官网也提供了很多测试用例:
image.png

延迟性能测试

To measure latency we take the three stage pipeline and generate events at less than saturation. This is achieved by waiting 1 microsecond after injecting an event before injecting the next and repeating 50 million times. To time at this level of precision it is necessary to use time stamp counters from the CPU. We chose CPUs with an invariant TSC because older processors suffer from changing frequency due to power saving and sleep states. Intel Nehalem and later processors use an invariant TSC which can be accessed by the latest Oracle JVMs running on Ubuntu 11.04. No CPU binding has been employed for this test. For comparison we use the ArrayBlockingQueue once again. We could have used ConcurrentLinkedQueueviii which is likely to give better results but we want to use a bounded queue implementation to ensure producers do not outpace consumers by creating back pressure. The results below are for 2.2Ghz Core i7-2720QM running Java 1.6.0_25 64-bit on Ubuntu 11.04. Mean latency per hop for the Disruptor comes out at 52 nanoseconds compared to 32,757 nanoseconds for ArrayBlockingQueue. Profiling shows the use of locks and signalling via a condition variable are the main cause of latency for the ArrayBlockingQueue.

为了测量延迟,我们采用三级管道并在低于饱和度的情况下生成事件。这是通过在注入一个事件之后等待 1 微秒,然后再注入下一个事件并重复 5000 万次来实现的。为了达到这种精度水平,必须使用 CPU 的时间戳计数器。我们选择具有不变 TSC 的 CPU,因为较旧的处理器会因省电和睡眠状态而发生频率变化。Intel Nehalem 及更高版本的处理器使用不变的 TSC,可由 Ubuntu 11.04 上运行的最新 Oracle JVM 访问。此测试未使用 CPU 绑定。为了进行比较,我们再次使用 ArrayBlockingQueue。我们本可以使用 ConcurrentLinkedQueueviii,它可能会提供更好的结果,但我们希望使用有界队列实现来确保生产者不会因产生背压而超过消费者。以下结果适用于在 Ubuntu 11.04 上运行 Java 1.6.0_25 64 位的 2.2Ghz Core i7-2720QM。Disruptor 的每跳平均延迟为 52 纳秒,而 ArrayBlockingQueue 的平均每跳延迟为 32,757 纳秒。分析显示,使用锁和通过条件变量发送信号是 ArrayBlockingQueue 延迟的主要原因。
image.png

结论

Disruptor是一个高性能、低延迟的并发框架,源自LMAX公司,主要用于实现开发者在多线程程序中无锁队列的操作。ArrayBlockingQueue则是一个基于数组实现的有界阻塞队列,是Java标准库中的一个组件。两者各有优点和局限性,具体如下:

根据上面的分析,可以得出一个结论,Disruptor的性能远高于ArrayBlockingQueue。Disruptor采用一种基于环形数组的数据结构(RingBuffer),通过适当的同步策略和无锁优化技术,实现了非常高的并发性能。因此,在高并发、低延迟场景下,Disruptor的吞吐量和延迟表现显著优于ArrayBlockingQueue。相比之下,ArrayBlockingQueue基于传统的锁和条件变量机制,性能较低、延迟能力有限。
但是,相对而言,想要更高的性能吗,就不得不放弃易用。,Disruptor编写相对复杂,可读性较差。由于Disruptor采用了更为底层的原理并使用了许多高级优化技术,例如缓存行填充,预分配对象,内存屏障等,使得其代码实现较为复杂,不易上手。而ArrayBlockingQueue实现相对简单,开发者能更容易理解和使用其原理。
最后,Disruptor对依赖环境要求较高。Disruptor的优化策略需要在特定的运行环境下生效,例如,其环形数组的大小需要设置为2的次幂,以充分利用硬件缓存。此外,Disruptor优化适用于JAVA SE环境,在ANDROID等其他环境尚未被广泛验证。而ArrayBlockingQueue作为JDK内置组件,在不同平台环境下的兼容性和稳定性更优。
所以,可以得出一个结论,Disruptor与ArrayBlockingQueue在性能、易用性和兼容性方面存在差异。若应用场景追求极致的并发性能、低延迟并能承担较高的开发成本,应选择Disruptor。但若应用场景对性能要求较低,需要简单、易用且兼容性强的阻塞队列,ArrayBlockingQueue则是更好的选择。总之,开发者需要根据实际需求权衡两者的优势和劣势。
写到这里,我想起了计算机界的经典名言:*没有银弹 *。意思是没有什么东西是最好最有效而又没有任何缺点的。
在软件工程中也是一样,没有最好的技术,只有最合适的技术。面对不同的场景,应该使用更加适合这个场景的技术,这样才能使得效率提高。我想,如何将手中的知识掌握好,然后在合适的环境中发挥它的效果,不仅仅是技术中的艺术,也是软件的工程。

Demo

https://lmax-exchange.github.io/disruptor/user-guide/index.html#_getting_started

结合用例写一个demo
事件:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Data
public class LongEvent {
private long value;

public void set(long value) {
this.value = value;
}

@Override
public String toString() {
return "LongEvent{" + "value=" + value + '}';
}
}
1
2
3
4
5
6
public class LongEventFactory implements EventFactory<LongEvent> {
@Override
public LongEvent newInstance() {
return new LongEvent();
}
}

消费者:

1
2
3
4
5
6
public class LongEventHandler implements EventHandler<LongEvent> {
@Override
public void onEvent(LongEvent event, long sequence, boolean endOfBatch) {
System.out.println("Event: " + event);
}
}

生产者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class LongEventProducer {
public final RingBuffer<LongEvent> ringBuffer;

public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}

public void onData(ByteBuffer byteBuffer) {
// 1.ringBuffer 事件队列 下一个槽
long sequence = ringBuffer.next();
Long data = null;
try {
//2.取出空的事件队列
LongEvent longEvent = ringBuffer.get(sequence);
data = byteBuffer.getLong(0);
//3.获取事件队列传递的数据
longEvent.setValue(data);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} finally {
System.out.println("生产者准备发送数据:");
//4.发布事件
ringBuffer.publish(sequence);
}
}
}

测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class LongEventMain {
public static void main(String[] args) throws Exception {
// 1.创建一个可缓存的线程 提供线程来出发Consumer 的事件处理
ExecutorService executor = Executors.newCachedThreadPool();
// 2.创建工厂
EventFactory<LongEvent> eventFactory = new LongEventFactory();
// 3.创建ringBuffer 大小
int ringBufferSize = 1024 * 1024; // ringBufferSize大小一定要是2的N次方
// 4.创建Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory, ringBufferSize, executor,
ProducerType.SINGLE, new YieldingWaitStrategy());
// 5.连接消费端方法
disruptor.handleEventsWith(new LongEventHandler());
// 6.启动
disruptor.start();
// 7.创建RingBuffer容器
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
// 8.创建生产者
LongEventProducer producer = new LongEventProducer(ringBuffer);
// 9.指定缓冲区大小
ByteBuffer byteBuffer = ByteBuffer.allocate(8);
for (int i = 1; i <= 100; i++) {
byteBuffer.putLong(0, i);
producer.onData(byteBuffer);
}
//10.关闭disruptor和executor
disruptor.shutdown();
executor.shutdown();
}
}

结果:
image.png

参考

https://lmax-exchange.github.io/disruptor/disruptor.html
https://lmax-exchange.github.io/disruptor/user-guide/index.html
https://lmax-exchange.github.io/disruptor/
https://github.com/LMAX-Exchange/disruptor
https://tech.meituan.com/2016/11/18/disruptor.html
https://zhuanlan.zhihu.com/p/229338771
https://zhuanlan.zhihu.com/p/513468454
https://qin.news/disruptor/