1. 개요

이 예제에서는 JCTools  (Java Concurrency Tools) 라이브러리를 소개합니다  .

간단히 말해서, 이것은 다중 스레드 환경에서 작업하기에 적합한 여러 유틸리티 데이터 구조를 제공합니다.

2. 논블로킹 알고리즘

전통적으로 변경 가능한 공유 상태에서 작동하는 다중 스레드 코드는 잠금사용 하여 데이터 일관성 및 게시(한 스레드에서 다른 스레드가 볼 수 있는 변경 사항)를 보장합니다.

이 접근 방식에는 여러 가지 단점이 있습니다.

  • 잠금을 얻으려는 시도에서 스레드가 차단되어 다른 스레드의 작업이 완료될 때까지 진행되지 않을 수 있습니다. 이는 병렬 처리를 효과적으로 방지합니다.
  • 잠금 경합이 많을수록 JVM이 스레드 스케줄링, 경합 및 대기 스레드 대기열 관리를 처리하는 데 더 많은 시간을 소비하고 수행하는 실제 작업이 줄어듭니다.
  • 교착 상태는 둘 이상의 잠금이 관련되어 있고 잘못된 순서로 획득/해제되는 경우 가능합니다.
  • 우선 순위 반전 위험이 가능 합니다. 우선 순위 가 높은 스레드는 우선 순위가 낮은 스레드가 보유하는 잠금을 얻으려는 시도에서 잠깁니다.
  • 대부분의 경우 굵은 잠금이 사용되어 병렬 처리에 많은 피해를 줍니다. 미세 잠금을 사용하려면 보다 신중한 설계가 필요하고 잠금 오버헤드가 증가하며 오류가 발생하기 쉽습니다.

대안은 non-blocking 알고리즘 을 사용하는 것입니다 . 즉, 어떤 스레드의 실패나 중단이 다른 스레드의 실패나 중단을 일으키지 않는 알고리즘 입니다.

포함된 스레드 중 적어도 하나가 랜덤의 기간 동안 진행되도록 보장되는 경우, 즉 처리 중에 교착 상태가 발생할 수 없는 경우 비차단 알고리즘은 잠금 이 없습니다.

또한 이러한 알고리즘은  스레드당 진행률이 보장되는 경우 대기하지 않습니다.

다음 은 뛰어난 Java Concurrency in Practice  책 의 비차단 스택 예제 입니다. 기본 상태를 정의합니다.

public class ConcurrentStack<E> {

    AtomicReference<Node<E>> top = new AtomicReference<Node<E>>();

    private static class Node <E> {
        public E item;
        public Node<E> next;

        // standard constructor
    }
}

또한 몇 가지 API 메서드:

public void push(E item){
    Node<E> newHead = new Node<E>(item);
    Node<E> oldHead;
    
    do {
        oldHead = top.get();
        newHead.next = oldHead;
    } while(!top.compareAndSet(oldHead, newHead));
}

public E pop() {
    Node<E> oldHead;
    Node<E> newHead;
    do {
        oldHead = top.get();
        if (oldHead == null) {
            return null;
        }
        newHead = oldHead.next;
    } while (!top.compareAndSet(oldHead, newHead));
    
    return oldHead.item;
}

We can see that the algorithm uses fine-grained compare-and-swap (CAS) instructions and is lock-free (even if multiple threads call top.compareAndSet() simultaneously, one of them is guaranteed to be successful) but not wait-free as there's no guarantee that CAS eventually succeeds for any particular thread.

3. Dependency

First, let's add the JCTools dependency to our pom.xml:

<dependency>
    <groupId>org.jctools</groupId>
    <artifactId>jctools-core</artifactId>
    <version>2.1.2</version>
</dependency>

Please note that the latest available version is available on Maven Central.

4. JCTools Queues

The library offers a number of queues to use in a multi-threaded environment, i.e. one or more threads write to a queue and one or more threads read from it in a thread-safe lock-free manner.

The common interface for all Queue implementations is org.jctools.queues.MessagePassingQueue.

4.1. Types of Queues

All queues can be categorized on their producer/consumer policies:

  • single producer, single consumer – such classes are named using the prefix Spsc, e.g. SpscArrayQueue
  • single producer, multiple consumers – use Spmc prefix, e.g. SpmcArrayQueue
  • multiple producers, single consumer – use Mpsc prefix, e.g. MpscArrayQueue
  • multiple producers, multiple consumers – use Mpmc prefix, e.g. MpmcArrayQueue

It's important to note that there are no policy checks internally, i.e. a queue might silently misfunction in case of incorrect usage.

E.g. the test below populates a single-producer queue from two threads and passes even though the consumer is not guaranteed to see data from different producers:

SpscArrayQueue<Integer> queue = new SpscArrayQueue<>(2);

Thread producer1 = new Thread(() -> queue.offer(1));
producer1.start();
producer1.join();

Thread producer2 = new Thread(() -> queue.offer(2));
producer2.start();
producer2.join();

Set<Integer> fromQueue = new HashSet<>();
Thread consumer = new Thread(() -> queue.drain(fromQueue::add));
consumer.start();
consumer.join();

assertThat(fromQueue).containsOnly(1, 2);

4.2. Queue Implementations

Summarizing the classifications above, here is the list of JCTools queues:

  • SpscArrayQueue single producer, single consumer, uses an array internally, bound capacity
  • SpscLinkedQueue single producer, single consumer, uses linked list internally, unbound capacity
  • SpscChunkedArrayQueue single producer, single consumer, starts with initial capacity and grows up to max capacity
  • SpscGrowableArrayQueue single producer, single consumer, starts with initial capacity and grows up to max capacity. This is the same contract as SpscChunkedArrayQueue, the only difference is internal chunks management. It's recommended to use SpscChunkedArrayQueue because it has a simplified implementation
  • SpscUnboundedArrayQueue single producer, single consumer, uses an array internally, unbound capacity
  • SpmcArrayQueue single producer, multiple consumers, uses an array internally, bound capacity
  • MpscArrayQueue multiple producers, single consumer, uses an array internally, bound capacity
  • MpscLinkedQueue multiple producers, single consumer, uses a linked list internally, unbound capacity
  • MpmcArrayQueue multiple producers, multiple consumers, uses an array internally, bound capacity

4.3. Atomic Queues

All queues mentioned in the previous section use sun.misc.Unsafe. However, with the advent of Java 9 and the JEP-260 this API becomes inaccessible by default.

So, there are alternative queues which use java.util.concurrent.atomic.AtomicLongFieldUpdater (public API, less performant) instead of sun.misc.Unsafe.

위의 대기열에서 생성되며 이름  사이에 Atomic 이라는 단어가  삽입됩니다(예:  SpscChunkedAtomicArrayQueue 또는  MpmcAtomicArrayQueue ) .

가능하면 '일반' 대기열을 사용하고 Sun.misc.Unsafe 가 HotSpot Java9+ 및 JRockit과 같이 금지/비효율적인  환경에서만 AtomicQueues에 의존하는 것이 좋습니다 .

4.4. 용량

모든 JCTools 대기열은 최대 용량을 갖거나 바인딩 해제될 수도 있습니다. 대기열이 가득 차고 용량에 의해 제한되면 새 요소 수락을 중지합니다.

다음 예에서 우리는:

  • 대기열을 채우다
  • 그 후에 새 요소를 받아들이지 않는지 확인하십시오.
  • 그것을 배수하고 나중에 더 많은 요소를 추가할 수 있는지 확인하십시오.

가독성을 위해 몇 가지 코드 문을 삭제했습니다. 전체 구현은 GitHub 에서 찾을 수 있습니다 .

SpscChunkedArrayQueue<Integer> queue = new SpscChunkedArrayQueue<>(8, 16);
CountDownLatch startConsuming = new CountDownLatch(1);
CountDownLatch awakeProducer = new CountDownLatch(1);

Thread producer = new Thread(() -> {
    IntStream.range(0, queue.capacity()).forEach(i -> {
        assertThat(queue.offer(i)).isTrue();
    });
    assertThat(queue.offer(queue.capacity())).isFalse();
    startConsuming.countDown();
    awakeProducer.await();
    assertThat(queue.offer(queue.capacity())).isTrue();
});

producer.start();
startConsuming.await();

Set<Integer> fromQueue = new HashSet<>();
queue.drain(fromQueue::add);
awakeProducer.countDown();
producer.join();
queue.drain(fromQueue::add);

assertThat(fromQueue).containsAll(
  IntStream.range(0, 17).boxed().collect(toSet()));

5. 기타 JCTools 데이터 구조

JCTools는 몇 가지 비 대기열 데이터 구조도 제공합니다.

모두 아래에 나열되어 있습니다.

  • NonBlockingHashMap 더 나은 확장 속성과 일반적으로 낮은 변형 비용을 제공하는 잠금이 없는 ConcurrentHashMap 대안입니다. sun.misc.Unsafe 를 통해 구현되므로 HotSpot Java9+ 또는 JRockit 환경에서 이 클래스를 사용하지 않는 것이 좋습니다.
  • NonBlockingHashMapLong -  같은  NonBlockingHashMap 하지만 원시적 사용 키를
  • NonBlockingHashSet JDK의 java.util.Collections.newSetFromMap() 과 같은 NonBlockingHashMap 주변의 간단한 래퍼  
  • NonBlockingIdentityHashMap -  같은  NonBlockingHashMap 하지만 정체성 키를 비교합니다.
  • NonBlockingSetInt –  기본 long 배열로 구현된 다중 스레드 비트 벡터 세트입니다. 자동 자동 박싱의 경우 비효율적으로 작동

6. 성능 테스트

 JDK의 ArrayBlockingQueue 와 JCTools 대기열의 성능 을 비교하기 위해 JMH사용합시다 . JMH는 컴파일러/jvm 최적화 알고리즘의 비결정성으로부터 우리를 보호하는 Sun/Oracle JVM 전문가의 오픈 소스 마이크로 벤치마크 프레임워크입니다. 이에 대한 자세한 내용은 이 기사 에서 자유롭게 확인 하십시오 .

아래 코드 스니펫은 가독성을 높이기 위해 몇 가지 명령문을 누락했습니다. GitHub 에서 전체 소스 코드를 찾으십시오 .

public class MpmcBenchmark {

    @Param({PARAM_UNSAFE, PARAM_AFU, PARAM_JDK})
    public volatile String implementation;

    public volatile Queue<Long> queue;

    @Benchmark
    @Group(GROUP_NAME)
    @GroupThreads(PRODUCER_THREADS_NUMBER)
    public void write(Control control) {
        // noinspection StatementWithEmptyBody
        while (!control.stopMeasurement && !queue.offer(1L)) {
            // intentionally left blank
        }
    }

    @Benchmark
    @Group(GROUP_NAME)
    @GroupThreads(CONSUMER_THREADS_NUMBER)
    public void read(Control control) {
        // noinspection StatementWithEmptyBody
        while (!control.stopMeasurement && queue.poll() == null) {
            // intentionally left blank
        }
    }
}

결과(95번째 백분위수에서 발췌, 작업당 나노초):

MpmcBenchmark.MyGroup:MyGroup·p0.95 MpmcArrayQueue sample 1052.000 ns/op
MpmcBenchmark.MyGroup:MyGroup·p0.95 MpmcAtomicArrayQueue sample 1106.000 ns/op
MpmcBenchmark.MyGroup:MyGroup·p0.95 ArrayBlockingQueue sample 2364.000 ns/op

우리는 것을 알 수 있습니다 MpmcArrayQueue의 수행 단지 약간 더 이상  MpmcAtomicArrayQueue 및  ArrayBlockingQueue 등이 두 배 정도 느립니다. 

7. JCTools 사용의 단점

JCTools를 사용하는 것은 중요한 단점 이 있습니다. 라이브러리 클래스가 올바르게 사용되도록 강제하는 것은 불가능합니다. 예를 들어 크고 성숙한 프로젝트에서 MpscArrayQueue사용하기 시작하는 상황을 생각해 보십시오  (단일 소비자가 있어야 함).

불행히도 프로젝트가 크기 때문에 누군가 프로그래밍 또는 구성 오류를 만들고 대기열이 이제 둘 이상의 스레드에서 읽힐 가능성이 있습니다. 시스템은 이전과 같이 작동하는 것 같지만 이제 소비자가 일부 메시지를 놓칠 가능성이 있습니다. 이는 큰 영향을 미칠 수 있고 디버그하기 매우 어려운 실제 문제입니다.

이상적으로는 JCTools가 스레드 액세스 정책을 보장하도록 하는 특정 시스템 속성으로 시스템을 실행할 수 있어야 합니다. 예를 들어 로컬/테스트/스테이징 환경(프로덕션은 아님)이 켜져 있을 수 있습니다. 안타깝게도 JCTools는 그러한 속성을 제공하지 않습니다.

또 다른 고려 사항은 JCTools가 JDK의 대응물보다 훨씬 더 빠르다는 것을 확인했지만 사용자 지정 대기열 구현을 사용하기 시작할 때 애플리케이션이 동일한 양의 속도를 얻는다는 의미는 아니라는 것입니다. 대부분의 응용 프로그램은 스레드 간에 많은 개체를 교환하지 않으며 대부분 I/O 바인딩됩니다.

8. 결론

이제 JCTools에서 제공하는 유틸리티 클래스에 대한 기본적인 이해를 얻었고 부하가 많은 JDK의 클래스와 비교하여 성능이 얼마나 우수한지 확인했습니다.

결론적으로 쓰레드 간에 많은 객체를 교환할 때만 라이브러리를 사용하는 것이 가치가 있으며, 이 경우에도 쓰레드 접근 정책을 유지하는 데 각별한 주의가 필요하다.

항상 그렇듯이 위 샘플의 전체 소스 코드는 GitHub 에서 찾을 수 있습니다 .

Junit footer banner