지금까지 스트림 인터페이스를 이용해 데이터 컬렉션을 선언형으로,
내부 반복으로 스트림 요소의 처리를 제어하는 것을 살펴보았다.
이번장에서는 순차스트림을 병렬 스트림을 바꾸는 것을 설명한다.
7.1 병렬 스트림
: 각각의 스레드에서 처리할 수 있도록 스트림 요소를 여러 청크로 분할한 스트림
컬렉션에 parallelStream을 호출하면 병렬스트림이 생성된다.
ex) 숫자 n을 인수로 받아 1부터 n까지 모든 숫자의 합계를 반환하는 메서드를 구현해보자.
- 숫자로 이루어진 무한스트림-> 인수로 주어진 크기로 스트림을 제한->두 숫자를 더하는 리듀싱작업
public long sequentialSum(long n) {
return Stream.iterate(1L, i -> i+1) // 무한 자연수 스트림생성
.limit(n) // n개 이하로 제한
.reduce(0L, Long::sum); // 모든 숫자를 더하는 리듀싱 연산
}
//전통적 자바의 구현
public long iterativeSum(long n) {
long result = 0;
for(long i=1L; i<= n; i++){
result += i;
}
return result;
}
n이 커진다면 연산을 병렬 처리하는것이 좋겠다.
결과와 변수는 어떻게 동기화 할까?
몇개의 스레드를 사용하나? 숫자는 어떻게 생성? 생성된 숫자는 어떻게 더함?
1 - 순차 스트림을 병렬 스트림으로 변환하기
리듀싱 연산을 여러 청크에 병렬로 수행
public long parallelSum(long n) {
return Stream.iterate(1L, i -> i+1)
.limit(n)
.parallel() //스트림을 병렬스트림으로 변환
.reduce(0L, Long::sum);
}
sequential 메서드로 병렬스트림을 순차스트림으로도 바꿀수 있음.
하지만 parallel과 sequential 두 메서드중 최종적으로 호출된 메서드가 전체 파이프라인에 영향을 준다.
|
병렬스트림에서 사용하는 스레드 풀설정 병렬스트림은 내부적으로 ForkJoinPool을 사용함. 기본적으로 ForkJoinPool은 Runtime.getRuntime().availableProcessors()가 반환하는 값에 상응하는 스레드를 가짐. 일반적으로 기기 프로세서 수와 같으므로 ForkJoinPool의 기본값 그대로 사용할 것을 권장. |
2 - 스트림 성능 측정
성능을 최적화 할때 기억해야 할 규칙 - 측정
JMH 라이브러리(Java Microbenchmark Harness , 어노테이션 방식 지원)를 이용해 벤치마크를 구현하자.
몇가지 의존성을 추가해 프로젝트에서 JMH를 사용할 수 있다.
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
<version>1.23</version>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-annprocess</artifactId>
<version>1.23</version>
</dependency>
//자바 아카이브 파일을 이용해 벤치마트를 편리하게 실행가능
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<finalName>benchmarks</finalName>
<transformers>
<transformer implementation=
"org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.openjdk.jmh.Main</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
n개의 숫자를 더하는 함수의 성능 측정
@BenchmarkMode(Mode.AverageTime) //대상메서드 실행시 걸린 평균시간 측정
@OutputTimeUnit(TimeUnit.MILLISECONDS) // 밀리초 단위로 출력
@Fork(2, jemArgs={"-Xms4G", "-Xmx4G"}) //4Gb 힙공간 에서 두번 수행
public class ParallelStreamBenchmark {
private static final long N = 10_000_000L;
@Benchmark //대상메서드
public longsequentialSum() {
return Stream.iterate(1L, i -> i+1).limit(N)
.reduce(0L, Long::sum);
}
@TearDown // 가비지컬렉터 동작 시도
public void tearDown() {
System.gc();
}
}
컴파일하면 benchmarks.jar 두번째 파일을 만듬. 다음과 같이 실행
java -jar ./target/benchmarks.jar ParallelStreamBenchmark
JMH는 기본적으로 20+20회 프로그램을 반복 실행.
전통적 for 루트를 사용하는 버전의 메소드로 바꾸어서 실행하고 결과를 비교해보면
순차적스트림을 사용하는것에 비해 4배 빠르다는 것을 확인할 수 있을것이다.
(더 저수준으로 동작하며, 기본값을 박싱하거나 언박싱할 필요가 없기 때문)
병렬스트림으로 바꾸어 확인해보면 5배가 느리다. 왜일까?
- 반복결과로 박싱된 객체가 만들어지므로 숫자를 더하려면 언박싱 해야하기 때문
- 반복작업은 병렬로 수행할 수 있는 독립단위로 나누기가 어렵다.
- iterate연산은 청크로 분할하기 어렵다.
- 리듀싱 과정을 시작하는 시점에 리스트가 준비되지 않으므로 청크 분할 불가
- 결국 순차처리방식과 같으므로 스레드를 할당하는 오버헤드만 증가하게 됨.
더 특화된 메서드 사용
LongStream.rangeClosed 메서드
- 기본형 long을 직접 사용하므로 박싱과 언박싱 오버헤드가 사라짐
- 쉽게 청크 분할 가능한 숫자범위 생산
함수형 프로그래밍을 올바르게 사용해야만 병렬실행의 장점을 얻을 수 있다.
- 병렬화를 사용하려면 스트림을 재귀적으로 분할, 각 서브스트림을 서로 다른 스레드 리듀싱연산으로 할당, 이들 결과를 값으로 합쳐야 함.
- 따라서 코어간 데이터 전송시간보다 오래 걸리는 작업만 병렬로 다른 코어에서 수행하는게 낫다.
3 - 병렬스크림의 올바른 방법
잘못 사용하여 발생하는 많은 문제는
공유된 상태를 바꾸는 알고리즘을 사용하기 때문이다.
여러 스레드에서 동시에 누적자를 실행하면 문제가 발생.
따라서 병렬스트림과 병렬계산에서는 공유된 가변 상태를 피해야 한다.
4 - 병렬 스트림 효과적으로 사용하기
- 확신이 서지 않으면 직접 측정하라
- 박싱을 주의하라. 자동박싱과 언박싱은 성능을 저하시키는 요소다. 기본형 특화스트림을 사용하자.
- 순차스트림보다 병렬스트림에서 성능이 떨어지는 연산이 있다. (비정렬스트림에 사용하는 메서드 활용하자)
- 전체 파이프라인 연산 비용을 고려하자
- 소량의 데이터에는 병렬스트림이 소용없다.
- 스트림을 구성하는 자료구조가 적절한지 확인하자 ( 요소 전체를 탐색해야하는 LinkedList보다 요소 탐색하지 않고도 분할가능한 ArrayList)
- 파이프라인의 중간연산이 스트림의 특성을 어떻게 바꾸는지에 따라 분해과정의 성능이 달라질 수 있다.(filter 연산이 있으면 길이를 예측 불가 하므로 효과적으로 병렬처리 할수 있을기 알수없다.)
- 최종연산의 병합과정 비용을 살펴보자. 병합과정의 비용이 비싸다면 병렬스트림으로 얻은 이익이 상쇄됨.
스트림 소스와 분해성
| ArrayList | LinkedList | IntStream.range | Stream.iterate | HashSet | TreeSet |
| 훌륭함 | 나쁨 | 훌륭함 | 나쁨 | 좋음 | 좋음 |
7.2 포크/조인 프레임워크
포크 조임 프레임워크는 병렬화할 수 있는 작업을 재귀적으로
작은 작업으로 분할한 다음에
서브태스크 각각의 결과를 합쳐서 전체 결과를 만들도록 설계되었다.
서브테스크를 스레드 풀 (ForkJoinPool)의 작업자 스레드에 분산 할당하는 ExecutorService 인터페이스를 구현.
1- RecursiveTask 활용
스레드 풀을 이용하려면 RecursiveTask<R> 의 서브클래스를 만들어야 한다.
R은 병렬화된 태스크가 생성하는 결과 형식 또는 결과가 없을때 RecursiveAction형식이다.
RecursiveTask를 정의하려면 추상메서드 compute를 구현해야 함.
protected abstract R compute();
compute메서드는 태스크를 서브태스크로 분할하는 로직과
더이상 분할할 수 없을때 개별 서브태스크의 결과를 생산할 알고리즘을 정의
if( 태스크가 충분히 작거나 더 이상 분할할 수 없으면 ) {
순차적으로 태스크 계산
} else {
태스크를 두 서브태스크로 분할
태스크가 다시 서브테스크로 분할되도록 이 메서드를 재귀적으로 호출함
모든 서브태스크의 연산이 완료될때까지 기다림
각 서브태스크의 결과를 합침.
}
분할후정복 알고리즘의 병렬화 버전이다.
public class ForkJoinSumCalculator
extends java.util.concurrent.RecursiveTask<Long> { // RecursiveTask 상속받아 태스크생성.
private final long[] numbers;
private final int start;
private final int end;
private static final long THRESHOLD = 10_000;//값 이하 서브태스크는 더이상 분할불가.
public ForkJoinSumCalculator(long[] numbers) { //메인테스크 생성시 사용할 공개생성자.
this(numbers, 0, numbers.length);
}
// 메인태스크의 서브태스크를 재귀적으로 만들때 사용할 비공개생성자.
private ForkJoinSumCalculator(long[] numbers, int start, int end) {
this.numbers = numbers;
this.start = start;
this.end = end;
}
@Override
protected Long compute() { // RecursiveTask의 추상메서드 오버라이드
int length = end - start; // 이 태스크에서 더할 배열의 길이
if(length <= THRESHOLD){
return computeSequentially();
}
ForkJoinSumCalculator leftTask = //배열의 첫번째 절반 서브태스크
new ForkJoinSumCalculator(numbers, start, start+length/2);
leftTask.fork(); // 다른스레드로 비동기 실행
ForkJoinSumCalculator rightTask = //배열의 나머지 절반 서브태스크
new ForkJoinSumCalculator(numbers, start+length/2, end);
Long rightResult = rightTask.compute(); //두번째 태스크 동기 실행
Long leftResult = leftTask.join(); //첫번째 태스크 결과 기다려 받음.
return leftResult + rightResult; //결과 조합
}
//더 분할불가할때 서브태스크 결과를 계산하는 알고리즘
private long computeSequentially() {
long sum = 0;
for(int i=start; i<end; i++) {
sum += number[i];
}
return sum;
}
}
다음코드 처럼 ForkJoinSumCalculator의 생성자로 원하는 수의 배열을 넘길수 있음.
public static long forkJoinSum(long n) {
long[] numbers = LongStream.range(1,n).toArray();
ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
return new ForkJoinPool().invoke(task);
}
2 - 포크/조인 프레임워크를 제대로 사용하는 방법
- join 메서드를 태스크에 호출하면 태스크가 생산하는 결과가 준비될때까지 호출자를 블록시킴. 따라서 두 태스크가 모두 시작된 다음에 join을 호출해야함.
- RecursiveTask 내에서는 ForkJoinPool의 invoke 메서드를 사용 X, 순차코드에서 병렬계산을 시작할때만 invoke 사용.
- 서브태스크에 fork 메서드를 호출해서 ForkJoinPool 일정을 조절.
- 포크/조인 프레임워크 병렬계산은 디버깅이 어려움.
- 멀티코어에 포크/조인 프레임워크를 사용하는것이 순차처리보다 무조건 빠르지 않을수 있음.
3 - 작업훔치기
이 예제보다 복잡한 시나리오가 사용되는 현실에는 각 서브태스크의 작업완료 시간이 크게 달라질수 있다.
분할 기법이 효율적이지 않을수도, 디스크접근속도가 저하되었을수도, 외부서비스와 협력중 지연이 생길수도.
: 포크/조인 프레임워크에서는 작업 훔치기라는 기법으로 이 문제를 해결함.
이 기법에서는 ForkJoinPool 의 모든 스레드를 거의 공정하게 분할
- 각각의 스레드는 자신에게 할달된 태스크를 가져와 작업 처리
- 이때 한스레드가 먼저 작업이 끝날경우 이 스레드는 다른 스레드 큐의 꼬리에서 작업을 훔쳐온다.
- 모든 태스크가 작업을 끝날때까지 모든 큐가 빌때까지 이과정을 반복.
- 스레드간 작업부하를 비슷하나 수준으로 유지.
7.3 Spliterator 인터페이스 구현
자바8 에서 제공하는 Spliterator 인터페이스 : '분할할 수 있는 반복자' splitable iterator
Iterator처럼 소스의 요소 탐색 기능을 제공한다는 것은 같지만 병렬작업에 특화되어 있음.
컬렉션 프레임웤에 포함된 모든 자료구조에 사용할 수 있는 디폴트 Spliterator 구현을 제공함.
public interface Spliterator<T> { //T는 탐색하는 요소의 형식임.
boolean tryAdvance(Consumer<? super T> action);
// 요소를 순차소비하면서 탐색요소가 남아있으면 참을 반환.
Spliterator<T> trySplit();
//일부요소(자신이 반환한 요소)를 반환해 두번째 Spliterator 생성
long estimateSize();
//탐색해야할 요소 수 정보를 제공
int characteristics();
//Spliterartor 자체 특성집합을 포함하는 int 반환.
}
1 - 분할과정
스트림을 여러 스트림으로 분할하는 과정은 재귀적으로 일어남.
Spliterartor에 trySplit을 호출하면 두번째 Spliterartor가 생성
-> 두개의 Spliterartor에 trySplit를 다시 호출하면 네개의 Spliterartor가 생성
-> trySplit의 결과가 null이 될때까지 이 과정을 반복. 결과가 null이면 재귀분할 종료.
Spliterartor특성
characteristics 추상메서드 정의- 자체 특성집합을 포함하는 int 반환.
이들 특성을 참고해서 Spliterartor를 더 잘 제어하고 최적화 가능.
| ORDERED | 리스트처럼 요소에 정해진 순서가 있음 |
| DISTINCT | x,y 두요소를 방문했을때 x.equals(y)는 항상 false 반환 |
| SORTED | 탐색된 요소는 미리 정의된 정렬순서를 따름. |
| SIZED | 크기가 알려진 소스(Set)로 Spliterartor를 생성했으므로 정확한 값 반환 |
| NOT-NULL | 탐색하는 모든 요소는 null이 아님 |
| IMMUTABLE | 이 Spliterartor 소스는 불변, 요소를 탐색하는동안 수정 불가 |
| CONCURRENT | 동기화 없이 Spliterartor의 소스를 여러스레드에서 동시에 수정가능 |
| SUBSIZED | 이 Spliterartor 그리고 분할된 모든 Spliterartor는 SIZED특성을 가짐. |
2 - 커스텀 Spliterartor 구현하기
단어수를 세는 메서드를 구현해보자.
다음은 반복형으로 구현한 것이다.
public int countWordsIterarively(String s) {
int counter = 0;
boolean lastSpace = true;
for(char c : s.toCharArray()) {
if(Character.isWhitespace(c)) {
lastSpace = true;
}else{
if(lastSpace) counter++; //공백문자 만나면 탐색문자를 단어로 간주
lastSpace = false;
}
}
return counter;
}
final String SENTENCE =
"Nel mezzo del cammin di nostra vita " +
"mi ritrovai in una selva oscura " +
"ch la dritta via era smarrita ";
System.out.println("Found "+countWordsInteratively(SENTENCE)+ "words");
반복형 대신 함수형을 이용하면 직접 스레드를 동기화 않고도 병렬스트림으로 작업을 병렬화.
함수형으로 단어수를 세는 메서드 재구현
우선 String을 스트림으로 변환해야하지만 기본형만 제공하므로 Stream<Character>사용해야함.
Stream<Character> stream = IntStream.range(0, SENTENCE.length())
.mapToObj(SENTENCE::charAt);
이때 지금까지 발견한 단어 수를 계산하는 int변수와 마지막 문자가 공백이었는지 여부를 기억하는 Boolean변수 필요.
이들 변수상태를 캡슐화하는 새로운 클래스 WordCounter를 만들어야 함.
class WordCounter {
private final int counter;
private final boolean lastSpace;
public WordCounter(int counter, boolean lastSpace){
this.counter = counter;
this.lastSpace = lastSpace;
}
// 반복알고리즘처럼 문자열 문자를 하나씩 탐색
public WordCounter accumulate(Character c){
if(Character.isWhitespace(c)){
return lastSpace ? this : new WordCounter(counter, true);
}else{
return lastSpace ? new WordCounter(counter+1, false) : this;
//공백문자 만나면 단어수 증가시킴.
}
}
// 두 WordCounter의 counter 값을 더한다.
public WordCounter combine(WordCounter wordCounter){
return new WordCounter(counter + wordCounter.counter,
wordCounter.lastSpace);//마지막공백은 신경쓰지않음.
}
public int getCounter(){
return counter;
}
}
이제 문자스트림의 리듀싱 연산을 직관적으로 구현하자.
private int countWords(Stream<Character> stream) {
WordCounter wordCounter = stream.reduce(new WordCounter(0, true),
WordCounter::accumulate,
WordCounter::combine);
return wordCounter.getCounter();
}
//이제 문자열로 만든 스트림으로 메서드 호출.
Stream<Character> stream = IntStream.range(0, SENTENCE.length())
.mapToObj(SENTENCE::charAt);
System.out.println("Found "+countWords(stream)+" words");
WordCounter 병렬로 수행하기
countWords(stream.parallel()) 로 병렬스트림 하면 원하는 숫자가 나오지 않는다.
문자열을 임의의 위치에서 나누다보니 하나의 단어를 둘로 계산할수 있기 때문이다.
문자열을 단어가 끝나는 위치에서만 분할하는 방법을 구현하는 문자 Spliterator 가 필요.
class WordCounterSpliterator implements Spliterator<Character> {
private final String string;
private int currentChar = 0;
public WordCounterSpliterator(String string){
this.string = string;
}
@Override
public boolean tryAdvance(Consumer<? super Character> action){
// 현재 인덱스에 해당하는 문자를 Consumer에 제공후 인덱스 증가.
action.accept(string.charAt(currentChar++));//현재문자 소비.
return currentChar < string.length(); //소비할문자가 남아있다면 true.
}
@Override
public Spliterator<Character> trySplit(){
//반복될 자료구조를 분할하는 로직 포함. 가장 중요.
int currentSize = string.lenth() - currentChar;
if (currentSize <10){
return null; //파싱문자열이 충분히 작아져서 null리턴.
}
for(int spliltPos = currenSize/2 + currentChar; splitPos < string.length(); splitPos++){
//다음 공백이 나올때까지 분할 위치를 뒤로 이동.
if(Character.isWhitespace(string.charAt(splitPos))){
// 처음부터 분할위치까지 문자열을 파싱할 새로운 WordCounterSpliterator 생성.
Spliterator<Charactre> apliterator =
new WordCounterSpliterator(string.substring(currentChar, splitPos));
currentChar = splitPos;//이 WordCounterSpliterator의 시작위치를 분할위치로 지정.
return spliterator; // 공백을 찾아 문자열 분리했으므로 루프 종료.
}
}
return null;
}
@Override
public long estimateSize(){
// 탐색해야할 요소의 개수는 파싱할 문자열의 전체길이에서 현재 반복중인 위치를 뺀것.
return string.length() - currentChar;
}
@Override
public int characteristics(){
// 특성을 알려줌.
return ORDERED + SIZED + SUBSIZED + NON_NULL + IMMUTABLE;
}
}
WordCounterSpliterator 활용
Spliterator<Character> spliterator = new WordCounterSpliterator(SENTENCE);
Stream<Character> stream = StreamSupport.stream(spliterator, true);
StreamSupport.stream 팩토리 메서드로 전달한 두번째 불리언 인수는 병렬스트림 생성여부를 지시한다.
특히 Spliterator는 첫번째 탐색, 첫번째 분할, 첫번째 예상크기 요청시점에 요소의 소스를 바인딩 할 수 있음.- 늦은바인딩
- 내부 반복을 이용하면 명시적으로 다른 스레드를 사용하지 않고도 스트림을 병렬로 처리 가능.
- 항상 병렬처리가 빠른것은 아니므로 성능을 직접 측정해봐야 함.
- 병렬스트림은 특히 처리해야할 데이터가 아주많거나 각 요소를 처리하는데 오랜시간이 걸릴경우 성능을 높임.
- 가능하면 기본형 특화스트림 사용. 올바른 자료구조 선택하는것이 병렬처리 성능에 영향을 줌.
- 포크/조인 프레임워크에서는 태스크를 작은 태스크로 분할 후 각각의 스레드로 실행하고 합쳐 최종결과를 생산.
- Spliterator는 탐색하려는 데이터를 포함하는 스트림을 어떻게 병렬화 할것인지 정의.
'IT Book Summary > ModernJavaInAction' 카테고리의 다른 글
| Chapter 9 리팩터링, 테스팅, 디버깅 (0) | 2020.04.05 |
|---|---|
| Part 3 스트림과 람다를 이용한 효과적 프로그래밍 - Chapter 8 컬렉션API 개선 (0) | 2020.03.29 |
| Chapter6 스트림으로 데이터 수집 (2) | 2020.03.17 |
| Chapter5 스트림 활용 (0) | 2020.03.11 |
| Part 2 함수형 데이터 처리 - Chapter 4 스트림 소개 (3) | 2020.03.02 |