본문 바로가기

JAVA

ParallelStream은 무엇일까?

728x90

 

JAVA8에는 람다식을 효과적으로 사용할 수 있도록 기존 API에 람다를 대폭 적용하였고,

그 대표적인 인터페이스는 Stream입니다. 스트림인터페이스는 컬렉션을 파이프 식으로 처리하도록

하면서 고차함수로 구조를 추상화시킵니다.

 

스트림을 사용하게되면, 여러줄의 코드를 간편하게 처리할 수 있고, 가독성이 쉽습니다.

또한 Parallel Stream이라는 것을 통해 병렬연산을 쉽고 간단하게 할 수 있다고합니다.

 

ForkJoinFrameWork 란?

 

ForkJoinFrameWork 는 큰 작업을 작은 작업들로 쪼개어 작업을 병렬로 처리하고 처리한 작업들을 다시 큰 작업으로 합치는 방식으로 동작합니다.
(마치 분할정복 알고리즘과 같이 동작합니다.)

  • Fork: 작업들을 작은 작업들로 분할함.
  • Join: 분할된 작업들을 큰 작업들로 병합함.

commonPool  사용시

ParallelStream 은 기본적으로 ForkJoinPool  commonPool 을 사용합니다.
 commonPool  size 는 아래와 같이 설정됩니다.

Runtime.getRuntime().availableProcessors() - 1

 

 

일반적인 순차처리

    for (int i = 0; i < dealmaxList.size(); i++) {
            double sum = 0;
            for (int j = 0; j < 1000000; j++) {
                sum += Math.sqrt(j);
            }
            System.out.println("Starting " + Thread.currentThread().getName()
                    + ", index=" + i + ", sum=" + sum + ", " + new Date());
        }

약 1초 1.5ms

일반적인 병렬처리 예제

 

  ExecutorService executor = Executors.newFixedThreadPool(5);
        for (int i = 0; i < dealmaxList.size(); i++) {
            final int index = i;
            executor.submit(() -> {
                double sum = 0;
                for (int j = 0; j < 1000000; j++) {
                    sum += Math.sqrt(j);  // CPU 부하 증가
                }
                System.out.println(Thread.currentThread().getName()
                        + ", index=" + index + ", ended at " + new Date() + ", sum=" + sum);
            });
        }
        executor.shutdown();

JAVA Parallel Stream 병렬처리

 

  dealmaxList.parallelStream().forEach(index -> {
            double sum = 0;
            for (int j = 0; j < 1000000; j++) {
                sum += Math.sqrt(j);  // 복잡한 연산 작업
            }
            System.out.println("Starting " + Thread.currentThread().getName()
                    + ", index=" + index + ", sum=" + sum + ", " + new Date());
        });

 

결과를 확인해보면 Parallel Stream은 common fork join pool을 사용하게되는데, 1개의 프로세서 당 1 thread을 사용하도록 되어 있습니다. 저의 맥북은 10core이기때문에 10개의 thread를 생성할 수 있습니다.

 

Java 8이전에는 ExecutorService를 통해 갯수를 지정할 수 있었는데...?

 

그렇다면 Parallel Stream은 어떻게 pool의 크기를 갯수를 지정해야할까 ?

 

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","6");

 

여기서 또 궁금증은 이렇게 스레드를 많이 늘리는게 가장 효율적인지가 궁금할것 같아요.

 

Parallel Stream은 내부적으로 common ForkJoinPool을 사용하기 때문에 ForkJoinPool을 사용하는 다른 thread

에 영향을 줄 수 있으며, 반대로 영향을 받을 수 있어요. 따라서 실행환경의 성능에 따라 별도로 테스트를 하면서 적절하게 

크기를 조절하는 것이 중요하다고 생각합니다.

 

https://javatechnocampus.wordpress.com/2015/10/03/544/

 

 

기본적으로는 ExecutorService의 구현체이지만, 다른 점은 각 thread들이 개별 큐를 가지게 되며, 다음 그림의 B처럼 자신에게 아무런 task가 없으면 A의 task를 가져와 처리하게 됨으로써 CPU자원이 놀지 않고 최적의 성능을 낼 수 있게 됩니다.

 

Parallel Stream을 이용한다면,
고려해야 할 사항들

 

ForkJoinPool의 특성상 나누어지는 job은 균등하게 처리가 되어야 합니다.
Parallel Stream은 작업을 분할하기 위해 Spliterator의 trySplit()을 사용하게 되는데, 이 분할되는 작업의 단위가 균등하게 나누어져야 하며, 나누어지는 작업에 대한 비용이 높지 않아야 순차적 방식보다 효율적으로 이루어질 수 있습니다. array, arrayList와 같이 정확한 전체 사이즈를 알 수 있는 경우에는 분할 처리가 빠르고 비용이 적게 들지만, linkedList의 경우라면 별다른 효과를 찾기가 어렵습니다.

 

public Spliterator<T> trySplit() {
    int lo = index, mid = (lo + fence) >>> 1;
    return (lo >= mid)
           ? null
           : new ArraySpliterator<>(array,
                                    lo, index = mid,
                                    characteristics);
}

또한, 병렬로 처리되는 작업이 독립적이지 않다면, 수행 성능에 영향이 있을 수 있습니다.
예를 들어, stream의 중간 단계 연산 중 sorted(), distinct()와 같은 작업을 수행하는 경우에는 내부적으로 상태에 대한 변수를 각 작업들이 공유(synchronized)하게 되어 있습니다. 이러한 경우에는 순차적으로 실행하는 경우가 더 효과적일 수 있습니다.

 

 

독립된 처리

ParallelStream 사용시 각 작업이 독립되어야 좋은 성능을 낼 수 있습니다.
stream() 연산 중 distinct(), sorted() 와 같은 작업들은 내부적으로 상태에 대한 변수를 공유하고 동기화하기에
병렬처리에 대한 이점을 살릴 수 없습니다.

 

테스트 시작

 

   // 2. 병렬 처리 vs 순차 처리 (sorted() 성능 비교)
        System.out.println("=== 병렬 스트림 sorted() ===");
        long startParallelSorted = System.currentTimeMillis();
        List<String> parallelSorted = arrayList.parallelStream()
                .sorted()
                .collect(Collectors.toList());
        long endParallelSorted = System.currentTimeMillis();
        System.out.println("병렬 스트림 sorted() 처리 시간: " + (endParallelSorted - startParallelSorted) + "ms");

        System.out.println("=== 순차 스트림 sorted() ===");
        long startSequentialSorted = System.currentTimeMillis();
        List<String> sequentialSorted = arrayList.stream()
                .sorted()
                .collect(Collectors.toList());
        long endSequentialSorted = System.currentTimeMillis();
        System.out.println("순차 스트림 sorted() 처리 시간: " + (endSequentialSorted - startSequentialSorted) + "ms");

 

ArrayList 의 경우 특정 인덱스에 있는 요소에 접근하는 시간 복잡도는 O(1)입니다.
분할 시에도 특정 지점에서 데이터를 빠르게 나눌 수 있기 때문에 성능이 좋습니다.

하지만 LinkedList 의 경우 특정 인덱스에 접근하기 위해 순차적으로 노드를 탐색해야 하므로 시간 복잡도는 O(n)입니다.
즉, 인덱스에 따라 요소를 찾는 데 더 많은 시간이 걸리기 때문에 분할 성능이 떨어집니다.

ArrayList, IntStream.range, LongStream.range, TreeMap, HashSet 과 같은 데이터 타입은 분할하기에 용이합니다.

 

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

public class Main {

    public static void main(String[] args) {
        // 데이터 준비
        List<Integer> arrayList = new ArrayList<>();
        List<Integer> linkedList = new LinkedList<>();
        int dataSize = 100000; // 데이터 크기 설정
        for (int i = 0; i < dataSize; i++) {
            arrayList.add(i);
            linkedList.add(i);
        }

        // ArrayList 병렬 처리
        System.out.println("=== ArrayList 병렬 처리 ===");
        long startArrayList = System.currentTimeMillis();
        arrayList.parallelStream().forEach(index -> performIndexAccess(arrayList));
        long endArrayList = System.currentTimeMillis();
        System.out.println("ArrayList 병렬 처리 시간: " + (endArrayList - startArrayList) + "ms");

        // LinkedList 병렬 처리
        System.out.println("=== LinkedList 병렬 처리 ===");
        long startLinkedList = System.currentTimeMillis();
        linkedList.parallelStream().forEach(index -> performIndexAccess(linkedList));
        long endLinkedList = System.currentTimeMillis();
        System.out.println("LinkedList 병렬 처리 시간: " + (endLinkedList - startLinkedList) + "ms");
    }

    // 인덱스를 반복적으로 접근하는 메서드
    private static void performIndexAccess(List<Integer> list) {
        int size = list.size();
        int sum = 0;
        for (int i = 0; i < 1000; i++) { // 1000번 반복
            int index = i % size; // 유효한 인덱스를 계산
            sum += list.get(index); // 인덱스를 통해 데이터 접근
        }
    }
}

 

 

성능차이가 어마어마하다 아무리 병렬 스트림을 쓴다고해도 데이터에 접근하는데 linkedlist는 O(N)이 들기 때문에 너무 ArrayList에 비해 너무 성능이 안좋게 나왔다. 오늘의 결론 - 상황에 맞게 적재적소하게 판단하는 것이 중요하다.

728x90

'JAVA' 카테고리의 다른 글

Effective java 정복기 2장  (0) 2025.01.06
스프링 첫요청이 처리되는데 오래 걸리는 이유  (0) 2024.11.25