JAVA8에는 람다식을 효과적으로 사용할 수 있도록 기존 API에 람다를 대폭 적용하였고,
그 대표적인 인터페이스는 Stream입니다. 스트림인터페이스는 컬렉션을 파이프 식으로 처리하도록
하면서 고차함수로 구조를 추상화시킵니다.
스트림을 사용하게되면, 여러줄의 코드를 간편하게 처리할 수 있고, 가독성이 쉽습니다.
또한 Parallel Stream이라는 것을 통해 병렬연산을 쉽고 간단하게 할 수 있다고합니다.
ForkJoinFrameWork 란?
ForkJoinFrameWork 는 큰 작업을 작은 작업들로 쪼개어 작업을 병렬로 처리하고 처리한 작업들을 다시 큰 작업으로 합치는 방식으로 동작합니다.
(마치 분할정복 알고리즘과 같이 동작합니다.)
- Fork: 작업들을 작은 작업들로 분할함.
- Join: 분할된 작업들을 큰 작업들로 병합함.
commonPool 사용시
ParallelStream 은 기본적으로 ForkJoinPool 의 commonPool 을 사용합니다.
이 commonPool 의 size 는 아래와 같이 설정됩니다.
Runtime.getRuntime().availableProcessors() - 1
일반적인 순차처리
for (int i = 0; i < dealmaxList.size(); i++) {
double sum = 0;
for (int j = 0; j < 1000000; j++) {
sum += Math.sqrt(j);
}
System.out.println("Starting " + Thread.currentThread().getName()
+ ", index=" + i + ", sum=" + sum + ", " + new Date());
}
약 1초 1.5ms
일반적인 병렬처리 예제
ExecutorService executor = Executors.newFixedThreadPool(5);
for (int i = 0; i < dealmaxList.size(); i++) {
final int index = i;
executor.submit(() -> {
double sum = 0;
for (int j = 0; j < 1000000; j++) {
sum += Math.sqrt(j); // CPU 부하 증가
}
System.out.println(Thread.currentThread().getName()
+ ", index=" + index + ", ended at " + new Date() + ", sum=" + sum);
});
}
executor.shutdown();
JAVA Parallel Stream 병렬처리
dealmaxList.parallelStream().forEach(index -> {
double sum = 0;
for (int j = 0; j < 1000000; j++) {
sum += Math.sqrt(j); // 복잡한 연산 작업
}
System.out.println("Starting " + Thread.currentThread().getName()
+ ", index=" + index + ", sum=" + sum + ", " + new Date());
});
결과를 확인해보면 Parallel Stream은 common fork join pool을 사용하게되는데, 1개의 프로세서 당 1 thread을 사용하도록 되어 있습니다. 저의 맥북은 10core이기때문에 10개의 thread를 생성할 수 있습니다.
Java 8이전에는 ExecutorService를 통해 갯수를 지정할 수 있었는데...?
그렇다면 Parallel Stream은 어떻게 pool의 크기를 갯수를 지정해야할까 ?
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","6");
여기서 또 궁금증은 이렇게 스레드를 많이 늘리는게 가장 효율적인지가 궁금할것 같아요.
Parallel Stream은 내부적으로 common ForkJoinPool을 사용하기 때문에 ForkJoinPool을 사용하는 다른 thread
에 영향을 줄 수 있으며, 반대로 영향을 받을 수 있어요. 따라서 실행환경의 성능에 따라 별도로 테스트를 하면서 적절하게
크기를 조절하는 것이 중요하다고 생각합니다.
기본적으로는 ExecutorService의 구현체이지만, 다른 점은 각 thread들이 개별 큐를 가지게 되며, 다음 그림의 B처럼 자신에게 아무런 task가 없으면 A의 task를 가져와 처리하게 됨으로써 CPU자원이 놀지 않고 최적의 성능을 낼 수 있게 됩니다.
Parallel Stream을 이용한다면,
고려해야 할 사항들
ForkJoinPool의 특성상 나누어지는 job은 균등하게 처리가 되어야 합니다.
Parallel Stream은 작업을 분할하기 위해 Spliterator의 trySplit()을 사용하게 되는데, 이 분할되는 작업의 단위가 균등하게 나누어져야 하며, 나누어지는 작업에 대한 비용이 높지 않아야 순차적 방식보다 효율적으로 이루어질 수 있습니다. array, arrayList와 같이 정확한 전체 사이즈를 알 수 있는 경우에는 분할 처리가 빠르고 비용이 적게 들지만, linkedList의 경우라면 별다른 효과를 찾기가 어렵습니다.
public Spliterator<T> trySplit() {
int lo = index, mid = (lo + fence) >>> 1;
return (lo >= mid)
? null
: new ArraySpliterator<>(array,
lo, index = mid,
characteristics);
}
또한, 병렬로 처리되는 작업이 독립적이지 않다면, 수행 성능에 영향이 있을 수 있습니다.
예를 들어, stream의 중간 단계 연산 중 sorted(), distinct()와 같은 작업을 수행하는 경우에는 내부적으로 상태에 대한 변수를 각 작업들이 공유(synchronized)하게 되어 있습니다. 이러한 경우에는 순차적으로 실행하는 경우가 더 효과적일 수 있습니다.
독립된 처리
ParallelStream 사용시 각 작업이 독립되어야 좋은 성능을 낼 수 있습니다.
stream() 연산 중 distinct(), sorted() 와 같은 작업들은 내부적으로 상태에 대한 변수를 공유하고 동기화하기에
병렬처리에 대한 이점을 살릴 수 없습니다.
테스트 시작
// 2. 병렬 처리 vs 순차 처리 (sorted() 성능 비교)
System.out.println("=== 병렬 스트림 sorted() ===");
long startParallelSorted = System.currentTimeMillis();
List<String> parallelSorted = arrayList.parallelStream()
.sorted()
.collect(Collectors.toList());
long endParallelSorted = System.currentTimeMillis();
System.out.println("병렬 스트림 sorted() 처리 시간: " + (endParallelSorted - startParallelSorted) + "ms");
System.out.println("=== 순차 스트림 sorted() ===");
long startSequentialSorted = System.currentTimeMillis();
List<String> sequentialSorted = arrayList.stream()
.sorted()
.collect(Collectors.toList());
long endSequentialSorted = System.currentTimeMillis();
System.out.println("순차 스트림 sorted() 처리 시간: " + (endSequentialSorted - startSequentialSorted) + "ms");
ArrayList 의 경우 특정 인덱스에 있는 요소에 접근하는 시간 복잡도는 O(1)입니다.
분할 시에도 특정 지점에서 데이터를 빠르게 나눌 수 있기 때문에 성능이 좋습니다.
하지만 LinkedList 의 경우 특정 인덱스에 접근하기 위해 순차적으로 노드를 탐색해야 하므로 시간 복잡도는 O(n)입니다.
즉, 인덱스에 따라 요소를 찾는 데 더 많은 시간이 걸리기 때문에 분할 성능이 떨어집니다.
ArrayList, IntStream.range, LongStream.range, TreeMap, HashSet 과 같은 데이터 타입은 분할하기에 용이합니다.
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
public class Main {
public static void main(String[] args) {
// 데이터 준비
List<Integer> arrayList = new ArrayList<>();
List<Integer> linkedList = new LinkedList<>();
int dataSize = 100000; // 데이터 크기 설정
for (int i = 0; i < dataSize; i++) {
arrayList.add(i);
linkedList.add(i);
}
// ArrayList 병렬 처리
System.out.println("=== ArrayList 병렬 처리 ===");
long startArrayList = System.currentTimeMillis();
arrayList.parallelStream().forEach(index -> performIndexAccess(arrayList));
long endArrayList = System.currentTimeMillis();
System.out.println("ArrayList 병렬 처리 시간: " + (endArrayList - startArrayList) + "ms");
// LinkedList 병렬 처리
System.out.println("=== LinkedList 병렬 처리 ===");
long startLinkedList = System.currentTimeMillis();
linkedList.parallelStream().forEach(index -> performIndexAccess(linkedList));
long endLinkedList = System.currentTimeMillis();
System.out.println("LinkedList 병렬 처리 시간: " + (endLinkedList - startLinkedList) + "ms");
}
// 인덱스를 반복적으로 접근하는 메서드
private static void performIndexAccess(List<Integer> list) {
int size = list.size();
int sum = 0;
for (int i = 0; i < 1000; i++) { // 1000번 반복
int index = i % size; // 유효한 인덱스를 계산
sum += list.get(index); // 인덱스를 통해 데이터 접근
}
}
}
성능차이가 어마어마하다 아무리 병렬 스트림을 쓴다고해도 데이터에 접근하는데 linkedlist는 O(N)이 들기 때문에 너무 ArrayList에 비해 너무 성능이 안좋게 나왔다. 오늘의 결론 - 상황에 맞게 적재적소하게 판단하는 것이 중요하다.
'JAVA' 카테고리의 다른 글
Effective java 정복기 2장 (0) | 2025.01.06 |
---|---|
스프링 첫요청이 처리되는데 오래 걸리는 이유 (0) | 2024.11.25 |