본문 바로가기

JAVA

ParallelStream은 무엇일까?

728x90

 

JAVA8에는 람다식을 효과적으로 사용할 수 있도록 기존 API에 람다를 대폭 적용하였고,

그 대표적인 인터페이스는 Stream입니다. 스트림인터페이스는 컬렉션을 파이프 식으로 처리하도록

하면서 고차함수로 구조를 추상화시킵니다.

 

스트림을 사용하게되면, 여러줄의 코드를 간편하게 처리할 수 있고, 가독성이 쉽습니다.

또한 Parallel Stream이라는 것을 통해 병렬연산을 쉽고 간단하게 할 수 있다고합니다.

 

ForkJoinFrameWork 란?

 

ForkJoinFrameWork 는 큰 작업을 작은 작업들로 쪼개어 작업을 병렬로 처리하고 처리한 작업들을 다시 큰 작업으로 합치는 방식으로 동작합니다.
(마치 분할정복 알고리즘과 같이 동작합니다.)

  • Fork: 작업들을 작은 작업들로 분할함.
  • Join: 분할된 작업들을 큰 작업들로 병합함.

commonPool  사용시

ParallelStream 은 기본적으로 ForkJoinPool  commonPool 을 사용합니다.
 commonPool  size 는 아래와 같이 설정됩니다.

Runtime.getRuntime().availableProcessors() - 1

 

 

일반적인 순차처리

    for (int i = 0; i < dealmaxList.size(); i++) {
            double sum = 0;
            for (int j = 0; j < 1000000; j++) {
                sum += Math.sqrt(j);
            }
            System.out.println("Starting " + Thread.currentThread().getName()
                    + ", index=" + i + ", sum=" + sum + ", " + new Date());
        }

약 1초 1.5ms

일반적인 병렬처리 예제

 

  ExecutorService executor = Executors.newFixedThreadPool(5);
        for (int i = 0; i < dealmaxList.size(); i++) {
            final int index = i;
            executor.submit(() -> {
                double sum = 0;
                for (int j = 0; j < 1000000; j++) {
                    sum += Math.sqrt(j);  // CPU 부하 증가
                }
                System.out.println(Thread.currentThread().getName()
                        + ", index=" + index + ", ended at " + new Date() + ", sum=" + sum);
            });
        }
        executor.shutdown();

JAVA Parallel Stream 병렬처리

 

  dealmaxList.parallelStream().forEach(index -> {
            double sum = 0;
            for (int j = 0; j < 1000000; j++) {
                sum += Math.sqrt(j);  // 복잡한 연산 작업
            }
            System.out.println("Starting " + Thread.currentThread().getName()
                    + ", index=" + index + ", sum=" + sum + ", " + new Date());
        });

 

결과를 확인해보면 Parallel Stream은 common fork join pool을 사용하게되는데, 1개의 프로세서 당 1 thread을 사용하도록 되어 있습니다. 저의 맥북은 10core이기때문에 10개의 thread를 생성할 수 있습니다.

 

Java 8이전에는 ExecutorService를 통해 갯수를 지정할 수 있었는데...?

 

그렇다면 Parallel Stream은 어떻게 pool의 크기를 갯수를 지정해야할까 ?

 

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","6");

 

여기서 또 궁금증은 이렇게 스레드를 많이 늘리는게 가장 효율적인지가 궁금할것 같아요.

 

Parallel Stream은 내부적으로 common ForkJoinPool을 사용하기 때문에 ForkJoinPool을 사용하는 다른 thread

에 영향을 줄 수 있으며, 반대로 영향을 받을 수 있어요. 따라서 실행환경의 성능에 따라 별도로 테스트를 하면서 적절하게 

크기를 조절하는 것이 중요하다고 생각합니다.

 

https://javatechnocampus.wordpress.com/2015/10/03/544/

 

 

기본적으로는 ExecutorService의 구현체이지만, 다른 점은 각 thread들이 개별 큐를 가지게 되며, 다음 그림의 B처럼 자신에게 아무런 task가 없으면 A의 task를 가져와 처리하게 됨으로써 CPU자원이 놀지 않고 최적의 성능을 낼 수 있게 됩니다.

 

Parallel Stream을 이용한다면,
고려해야 할 사항들

 

ForkJoinPool의 특성상 나누어지는 job은 균등하게 처리가 되어야 합니다.
Parallel Stream은 작업을 분할하기 위해 Spliterator의 trySplit()을 사용하게 되는데, 이 분할되는 작업의 단위가 균등하게 나누어져야 하며, 나누어지는 작업에 대한 비용이 높지 않아야 순차적 방식보다 효율적으로 이루어질 수 있습니다. array, arrayList와 같이 정확한 전체 사이즈를 알 수 있는 경우에는 분할 처리가 빠르고 비용이 적게 들지만, linkedList의 경우라면 별다른 효과를 찾기가 어렵습니다.

 

public Spliterator<T> trySplit() {
    int lo = index, mid = (lo + fence) >>> 1;
    return (lo >= mid)
           ? null
           : new ArraySpliterator<>(array,
                                    lo, index = mid,
                                    characteristics);
}

또한, 병렬로 처리되는 작업이 독립적이지 않다면, 수행 성능에 영향이 있을 수 있습니다.
예를 들어, stream의 중간 단계 연산 중 sorted(), distinct()와 같은 작업을 수행하는 경우에는 내부적으로 상태에 대한 변수를 각 작업들이 공유(synchronized)하게 되어 있습니다. 이러한 경우에는 순차적으로 실행하는 경우가 더 효과적일 수 있습니다.

 

 

독립된 처리

ParallelStream 사용시 각 작업이 독립되어야 좋은 성능을 낼 수 있습니다.
stream() 연산 중 distinct(), sorted() 와 같은 작업들은 내부적으로 상태에 대한 변수를 공유하고 동기화하기에
병렬처리에 대한 이점을 살릴 수 없습니다.

 

테스트 시작

 

   // 2. 병렬 처리 vs 순차 처리 (sorted() 성능 비교)
        System.out.println("=== 병렬 스트림 sorted() ===");
        long startParallelSorted = System.currentTimeMillis();
        List<String> parallelSorted = arrayList.parallelStream()
                .sorted()
                .collect(Collectors.toList());
        long endParallelSorted = System.currentTimeMillis();
        System.out.println("병렬 스트림 sorted() 처리 시간: " + (endParallelSorted - startParallelSorted) + "ms");

        System.out.println("=== 순차 스트림 sorted() ===");
        long startSequentialSorted = System.currentTimeMillis();
        List<String> sequentialSorted = arrayList.stream()
                .sorted()
                .collect(Collectors.toList());
        long endSequentialSorted = System.currentTimeMillis();
        System.out.println("순차 스트림 sorted() 처리 시간: " + (endSequentialSorted - startSequentialSorted) + "ms");

 

ArrayList 의 경우 특정 인덱스에 있는 요소에 접근하는 시간 복잡도는 O(1)입니다.
분할 시에도 특정 지점에서 데이터를 빠르게 나눌 수 있기 때문에 성능이 좋습니다.

하지만 LinkedList 의 경우 특정 인덱스에 접근하기 위해 순차적으로 노드를 탐색해야 하므로 시간 복잡도는 O(n)입니다.
즉, 인덱스에 따라 요소를 찾는 데 더 많은 시간이 걸리기 때문에 분할 성능이 떨어집니다.

ArrayList, IntStream.range, LongStream.range, TreeMap, HashSet 과 같은 데이터 타입은 분할하기에 용이합니다.

 

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

public class Main {

    public static void main(String[] args) {
        // 데이터 준비
        List<Integer> arrayList = new ArrayList<>();
        List<Integer> linkedList = new LinkedList<>();
        int dataSize = 100000; // 데이터 크기 설정
        for (int i = 0; i < dataSize; i++) {
            arrayList.add(i);
            linkedList.add(i);
        }

        // ArrayList 병렬 처리
        System.out.println("=== ArrayList 병렬 처리 ===");
        long startArrayList = System.currentTimeMillis();
        arrayList.parallelStream().forEach(index -> performIndexAccess(arrayList));
        long endArrayList = System.currentTimeMillis();
        System.out.println("ArrayList 병렬 처리 시간: " + (endArrayList - startArrayList) + "ms");

        // LinkedList 병렬 처리
        System.out.println("=== LinkedList 병렬 처리 ===");
        long startLinkedList = System.currentTimeMillis();
        linkedList.parallelStream().forEach(index -> performIndexAccess(linkedList));
        long endLinkedList = System.currentTimeMillis();
        System.out.println("LinkedList 병렬 처리 시간: " + (endLinkedList - startLinkedList) + "ms");
    }

    // 인덱스를 반복적으로 접근하는 메서드
    private static void performIndexAccess(List<Integer> list) {
        int size = list.size();
        int sum = 0;
        for (int i = 0; i < 1000; i++) { // 1000번 반복
            int index = i % size; // 유효한 인덱스를 계산
            sum += list.get(index); // 인덱스를 통해 데이터 접근
        }
    }
}

 

 

성능차이가 어마어마하다 아무리 병렬 스트림을 쓴다고해도 데이터에 접근하는데 linkedlist는 O(N)이 들기 때문에 너무 ArrayList에 비해 너무 성능이 안좋게 나왔다. 오늘의 결론 - 상황에 맞게 적재적소하게 판단하는 것이 중요하다.

728x90

'JAVA' 카테고리의 다른 글

Effective java 정복기 6장  (0) 2025.03.15
Reactor Netty HTTP Client Connection  (0) 2025.02.06
Effective java 정복기 4장  (0) 2025.01.09
Effective java 정복기 3장  (1) 2025.01.06
스프링 첫요청이 처리되는데 오래 걸리는 이유  (0) 2024.11.25