지금까지 스트림 인터페이스를 이용해 데이터 컬렉션을 선언형으로,
내부 반복으로 스트림 요소의 처리를 제어하는 것을 살펴보았다.
이번장에서는 순차스트림을 병렬 스트림을 바꾸는 것을 설명한다.
7.1 병렬 스트림
: 각각의 스레드에서 처리할 수 있도록 스트림 요소를 여러 청크로 분할한 스트림
컬렉션에 parallelStream을 호출하면 병렬스트림이 생성된다.
ex) 숫자 n을 인수로 받아 1부터 n까지 모든 숫자의 합계를 반환하는 메서드를 구현해보자.
- 숫자로 이루어진 무한스트림-> 인수로 주어진 크기로 스트림을 제한->두 숫자를 더하는 리듀싱작업
public long sequentialSum(long n) {
return Stream.iterate(1L, i -> i+1) // 무한 자연수 스트림생성
.limit(n) // n개 이하로 제한
.reduce(0L, Long::sum); // 모든 숫자를 더하는 리듀싱 연산
}
//전통적 자바의 구현
public long iterativeSum(long n) {
long result = 0;
for(long i=1L; i<= n; i++){
result += i;
}
return result;
}
n이 커진다면 연산을 병렬 처리하는것이 좋겠다.
결과와 변수는 어떻게 동기화 할까?
몇개의 스레드를 사용하나? 숫자는 어떻게 생성? 생성된 숫자는 어떻게 더함?
1 - 순차 스트림을 병렬 스트림으로 변환하기
리듀싱 연산을 여러 청크에 병렬로 수행
public long parallelSum(long n) {
return Stream.iterate(1L, i -> i+1)
.limit(n)
.parallel() //스트림을 병렬스트림으로 변환
.reduce(0L, Long::sum);
}
sequential 메서드로 병렬스트림을 순차스트림으로도 바꿀수 있음.
하지만 parallel과 sequential 두 메서드중 최종적으로 호출된 메서드가 전체 파이프라인에 영향을 준다.
병렬스트림에서 사용하는 스레드 풀설정 병렬스트림은 내부적으로 ForkJoinPool을 사용함. 기본적으로 ForkJoinPool은 Runtime.getRuntime().availableProcessors()가 반환하는 값에 상응하는 스레드를 가짐. 일반적으로 기기 프로세서 수와 같으므로 ForkJoinPool의 기본값 그대로 사용할 것을 권장. |
2 - 스트림 성능 측정
성능을 최적화 할때 기억해야 할 규칙 - 측정
JMH 라이브러리(Java Microbenchmark Harness , 어노테이션 방식 지원)를 이용해 벤치마크를 구현하자.
몇가지 의존성을 추가해 프로젝트에서 JMH를 사용할 수 있다.
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
<version>1.23</version>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-annprocess</artifactId>
<version>1.23</version>
</dependency>
//자바 아카이브 파일을 이용해 벤치마트를 편리하게 실행가능
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<finalName>benchmarks</finalName>
<transformers>
<transformer implementation=
"org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.openjdk.jmh.Main</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
n개의 숫자를 더하는 함수의 성능 측정
@BenchmarkMode(Mode.AverageTime) //대상메서드 실행시 걸린 평균시간 측정
@OutputTimeUnit(TimeUnit.MILLISECONDS) // 밀리초 단위로 출력
@Fork(2, jemArgs={"-Xms4G", "-Xmx4G"}) //4Gb 힙공간 에서 두번 수행
public class ParallelStreamBenchmark {
private static final long N = 10_000_000L;
@Benchmark //대상메서드
public longsequentialSum() {
return Stream.iterate(1L, i -> i+1).limit(N)
.reduce(0L, Long::sum);
}
@TearDown // 가비지컬렉터 동작 시도
public void tearDown() {
System.gc();
}
}
컴파일하면 benchmarks.jar 두번째 파일을 만듬. 다음과 같이 실행
java -jar ./target/benchmarks.jar ParallelStreamBenchmark
JMH는 기본적으로 20+20회 프로그램을 반복 실행.
전통적 for 루트를 사용하는 버전의 메소드로 바꾸어서 실행하고 결과를 비교해보면
순차적스트림을 사용하는것에 비해 4배 빠르다는 것을 확인할 수 있을것이다.
(더 저수준으로 동작하며, 기본값을 박싱하거나 언박싱할 필요가 없기 때문)
병렬스트림으로 바꾸어 확인해보면 5배가 느리다. 왜일까?
- 반복결과로 박싱된 객체가 만들어지므로 숫자를 더하려면 언박싱 해야하기 때문
- 반복작업은 병렬로 수행할 수 있는 독립단위로 나누기가 어렵다.
- iterate연산은 청크로 분할하기 어렵다.
- 리듀싱 과정을 시작하는 시점에 리스트가 준비되지 않으므로 청크 분할 불가
- 결국 순차처리방식과 같으므로 스레드를 할당하는 오버헤드만 증가하게 됨.
더 특화된 메서드 사용
LongStream.rangeClosed 메서드
- 기본형 long을 직접 사용하므로 박싱과 언박싱 오버헤드가 사라짐
- 쉽게 청크 분할 가능한 숫자범위 생산
함수형 프로그래밍을 올바르게 사용해야만 병렬실행의 장점을 얻을 수 있다.
- 병렬화를 사용하려면 스트림을 재귀적으로 분할, 각 서브스트림을 서로 다른 스레드 리듀싱연산으로 할당, 이들 결과를 값으로 합쳐야 함.
- 따라서 코어간 데이터 전송시간보다 오래 걸리는 작업만 병렬로 다른 코어에서 수행하는게 낫다.
3 - 병렬스크림의 올바른 방법
잘못 사용하여 발생하는 많은 문제는
공유된 상태를 바꾸는 알고리즘을 사용하기 때문이다.
여러 스레드에서 동시에 누적자를 실행하면 문제가 발생.
따라서 병렬스트림과 병렬계산에서는 공유된 가변 상태를 피해야 한다.
4 - 병렬 스트림 효과적으로 사용하기
- 확신이 서지 않으면 직접 측정하라
- 박싱을 주의하라. 자동박싱과 언박싱은 성능을 저하시키는 요소다. 기본형 특화스트림을 사용하자.
- 순차스트림보다 병렬스트림에서 성능이 떨어지는 연산이 있다. (비정렬스트림에 사용하는 메서드 활용하자)
- 전체 파이프라인 연산 비용을 고려하자
- 소량의 데이터에는 병렬스트림이 소용없다.
- 스트림을 구성하는 자료구조가 적절한지 확인하자 ( 요소 전체를 탐색해야하는 LinkedList보다 요소 탐색하지 않고도 분할가능한 ArrayList)
- 파이프라인의 중간연산이 스트림의 특성을 어떻게 바꾸는지에 따라 분해과정의 성능이 달라질 수 있다.(filter 연산이 있으면 길이를 예측 불가 하므로 효과적으로 병렬처리 할수 있을기 알수없다.)
- 최종연산의 병합과정 비용을 살펴보자. 병합과정의 비용이 비싸다면 병렬스트림으로 얻은 이익이 상쇄됨.
스트림 소스와 분해성
ArrayList | LinkedList | IntStream.range | Stream.iterate | HashSet | TreeSet |
훌륭함 | 나쁨 | 훌륭함 | 나쁨 | 좋음 | 좋음 |
7.2 포크/조인 프레임워크
포크 조임 프레임워크는 병렬화할 수 있는 작업을 재귀적으로
작은 작업으로 분할한 다음에
서브태스크 각각의 결과를 합쳐서 전체 결과를 만들도록 설계되었다.
서브테스크를 스레드 풀 (ForkJoinPool)의 작업자 스레드에 분산 할당하는 ExecutorService 인터페이스를 구현.
1- RecursiveTask 활용
스레드 풀을 이용하려면 RecursiveTask<R> 의 서브클래스를 만들어야 한다.
R은 병렬화된 태스크가 생성하는 결과 형식 또는 결과가 없을때 RecursiveAction형식이다.
RecursiveTask를 정의하려면 추상메서드 compute를 구현해야 함.
protected abstract R compute();
compute메서드는 태스크를 서브태스크로 분할하는 로직과
더이상 분할할 수 없을때 개별 서브태스크의 결과를 생산할 알고리즘을 정의
if( 태스크가 충분히 작거나 더 이상 분할할 수 없으면 ) {
순차적으로 태스크 계산
} else {
태스크를 두 서브태스크로 분할
태스크가 다시 서브테스크로 분할되도록 이 메서드를 재귀적으로 호출함
모든 서브태스크의 연산이 완료될때까지 기다림
각 서브태스크의 결과를 합침.
}
분할후정복 알고리즘의 병렬화 버전이다.
public class ForkJoinSumCalculator
extends java.util.concurrent.RecursiveTask<Long> { // RecursiveTask 상속받아 태스크생성.
private final long[] numbers;
private final int start;
private final int end;
private static final long THRESHOLD = 10_000;//값 이하 서브태스크는 더이상 분할불가.
public ForkJoinSumCalculator(long[] numbers) { //메인테스크 생성시 사용할 공개생성자.
this(numbers, 0, numbers.length);
}
// 메인태스크의 서브태스크를 재귀적으로 만들때 사용할 비공개생성자.
private ForkJoinSumCalculator(long[] numbers, int start, int end) {
this.numbers = numbers;
this.start = start;
this.end = end;
}
@Override
protected Long compute() { // RecursiveTask의 추상메서드 오버라이드
int length = end - start; // 이 태스크에서 더할 배열의 길이
if(length <= THRESHOLD){
return computeSequentially();
}
ForkJoinSumCalculator leftTask = //배열의 첫번째 절반 서브태스크
new ForkJoinSumCalculator(numbers, start, start+length/2);
leftTask.fork(); // 다른스레드로 비동기 실행
ForkJoinSumCalculator rightTask = //배열의 나머지 절반 서브태스크
new ForkJoinSumCalculator(numbers, start+length/2, end);
Long rightResult = rightTask.compute(); //두번째 태스크 동기 실행
Long leftResult = leftTask.join(); //첫번째 태스크 결과 기다려 받음.
return leftResult + rightResult; //결과 조합
}
//더 분할불가할때 서브태스크 결과를 계산하는 알고리즘
private long computeSequentially() {
long sum = 0;
for(int i=start; i<end; i++) {
sum += number[i];
}
return sum;
}
}
다음코드 처럼 ForkJoinSumCalculator의 생성자로 원하는 수의 배열을 넘길수 있음.
public static long forkJoinSum(long n) {
long[] numbers = LongStream.range(1,n).toArray();
ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
return new ForkJoinPool().invoke(task);
}
2 - 포크/조인 프레임워크를 제대로 사용하는 방법
- join 메서드를 태스크에 호출하면 태스크가 생산하는 결과가 준비될때까지 호출자를 블록시킴. 따라서 두 태스크가 모두 시작된 다음에 join을 호출해야함.
- RecursiveTask 내에서는 ForkJoinPool의 invoke 메서드를 사용 X, 순차코드에서 병렬계산을 시작할때만 invoke 사용.
- 서브태스크에 fork 메서드를 호출해서 ForkJoinPool 일정을 조절.
- 포크/조인 프레임워크 병렬계산은 디버깅이 어려움.
- 멀티코어에 포크/조인 프레임워크를 사용하는것이 순차처리보다 무조건 빠르지 않을수 있음.
3 - 작업훔치기
이 예제보다 복잡한 시나리오가 사용되는 현실에는 각 서브태스크의 작업완료 시간이 크게 달라질수 있다.
분할 기법이 효율적이지 않을수도, 디스크접근속도가 저하되었을수도, 외부서비스와 협력중 지연이 생길수도.
: 포크/조인 프레임워크에서는 작업 훔치기라는 기법으로 이 문제를 해결함.
이 기법에서는 ForkJoinPool 의 모든 스레드를 거의 공정하게 분할
- 각각의 스레드는 자신에게 할달된 태스크를 가져와 작업 처리
- 이때 한스레드가 먼저 작업이 끝날경우 이 스레드는 다른 스레드 큐의 꼬리에서 작업을 훔쳐온다.
- 모든 태스크가 작업을 끝날때까지 모든 큐가 빌때까지 이과정을 반복.
- 스레드간 작업부하를 비슷하나 수준으로 유지.
7.3 Spliterator 인터페이스 구현
자바8 에서 제공하는 Spliterator 인터페이스 : '분할할 수 있는 반복자' splitable iterator
Iterator처럼 소스의 요소 탐색 기능을 제공한다는 것은 같지만 병렬작업에 특화되어 있음.
컬렉션 프레임웤에 포함된 모든 자료구조에 사용할 수 있는 디폴트 Spliterator 구현을 제공함.
public interface Spliterator<T> { //T는 탐색하는 요소의 형식임.
boolean tryAdvance(Consumer<? super T> action);
// 요소를 순차소비하면서 탐색요소가 남아있으면 참을 반환.
Spliterator<T> trySplit();
//일부요소(자신이 반환한 요소)를 반환해 두번째 Spliterator 생성
long estimateSize();
//탐색해야할 요소 수 정보를 제공
int characteristics();
//Spliterartor 자체 특성집합을 포함하는 int 반환.
}
1 - 분할과정
스트림을 여러 스트림으로 분할하는 과정은 재귀적으로 일어남.
Spliterartor에 trySplit을 호출하면 두번째 Spliterartor가 생성
-> 두개의 Spliterartor에 trySplit를 다시 호출하면 네개의 Spliterartor가 생성
-> trySplit의 결과가 null이 될때까지 이 과정을 반복. 결과가 null이면 재귀분할 종료.
Spliterartor특성
characteristics 추상메서드 정의- 자체 특성집합을 포함하는 int 반환.
이들 특성을 참고해서 Spliterartor를 더 잘 제어하고 최적화 가능.
ORDERED | 리스트처럼 요소에 정해진 순서가 있음 |
DISTINCT | x,y 두요소를 방문했을때 x.equals(y)는 항상 false 반환 |
SORTED | 탐색된 요소는 미리 정의된 정렬순서를 따름. |
SIZED | 크기가 알려진 소스(Set)로 Spliterartor를 생성했으므로 정확한 값 반환 |
NOT-NULL | 탐색하는 모든 요소는 null이 아님 |
IMMUTABLE | 이 Spliterartor 소스는 불변, 요소를 탐색하는동안 수정 불가 |
CONCURRENT | 동기화 없이 Spliterartor의 소스를 여러스레드에서 동시에 수정가능 |
SUBSIZED | 이 Spliterartor 그리고 분할된 모든 Spliterartor는 SIZED특성을 가짐. |
2 - 커스텀 Spliterartor 구현하기
단어수를 세는 메서드를 구현해보자.
다음은 반복형으로 구현한 것이다.
public int countWordsIterarively(String s) {
int counter = 0;
boolean lastSpace = true;
for(char c : s.toCharArray()) {
if(Character.isWhitespace(c)) {
lastSpace = true;
}else{
if(lastSpace) counter++; //공백문자 만나면 탐색문자를 단어로 간주
lastSpace = false;
}
}
return counter;
}
final String SENTENCE =
"Nel mezzo del cammin di nostra vita " +
"mi ritrovai in una selva oscura " +
"ch la dritta via era smarrita ";
System.out.println("Found "+countWordsInteratively(SENTENCE)+ "words");
반복형 대신 함수형을 이용하면 직접 스레드를 동기화 않고도 병렬스트림으로 작업을 병렬화.
함수형으로 단어수를 세는 메서드 재구현
우선 String을 스트림으로 변환해야하지만 기본형만 제공하므로 Stream<Character>사용해야함.
Stream<Character> stream = IntStream.range(0, SENTENCE.length())
.mapToObj(SENTENCE::charAt);
이때 지금까지 발견한 단어 수를 계산하는 int변수와 마지막 문자가 공백이었는지 여부를 기억하는 Boolean변수 필요.
이들 변수상태를 캡슐화하는 새로운 클래스 WordCounter를 만들어야 함.
class WordCounter {
private final int counter;
private final boolean lastSpace;
public WordCounter(int counter, boolean lastSpace){
this.counter = counter;
this.lastSpace = lastSpace;
}
// 반복알고리즘처럼 문자열 문자를 하나씩 탐색
public WordCounter accumulate(Character c){
if(Character.isWhitespace(c)){
return lastSpace ? this : new WordCounter(counter, true);
}else{
return lastSpace ? new WordCounter(counter+1, false) : this;
//공백문자 만나면 단어수 증가시킴.
}
}
// 두 WordCounter의 counter 값을 더한다.
public WordCounter combine(WordCounter wordCounter){
return new WordCounter(counter + wordCounter.counter,
wordCounter.lastSpace);//마지막공백은 신경쓰지않음.
}
public int getCounter(){
return counter;
}
}
이제 문자스트림의 리듀싱 연산을 직관적으로 구현하자.
private int countWords(Stream<Character> stream) {
WordCounter wordCounter = stream.reduce(new WordCounter(0, true),
WordCounter::accumulate,
WordCounter::combine);
return wordCounter.getCounter();
}
//이제 문자열로 만든 스트림으로 메서드 호출.
Stream<Character> stream = IntStream.range(0, SENTENCE.length())
.mapToObj(SENTENCE::charAt);
System.out.println("Found "+countWords(stream)+" words");
WordCounter 병렬로 수행하기
countWords(stream.parallel()) 로 병렬스트림 하면 원하는 숫자가 나오지 않는다.
문자열을 임의의 위치에서 나누다보니 하나의 단어를 둘로 계산할수 있기 때문이다.
문자열을 단어가 끝나는 위치에서만 분할하는 방법을 구현하는 문자 Spliterator 가 필요.
class WordCounterSpliterator implements Spliterator<Character> {
private final String string;
private int currentChar = 0;
public WordCounterSpliterator(String string){
this.string = string;
}
@Override
public boolean tryAdvance(Consumer<? super Character> action){
// 현재 인덱스에 해당하는 문자를 Consumer에 제공후 인덱스 증가.
action.accept(string.charAt(currentChar++));//현재문자 소비.
return currentChar < string.length(); //소비할문자가 남아있다면 true.
}
@Override
public Spliterator<Character> trySplit(){
//반복될 자료구조를 분할하는 로직 포함. 가장 중요.
int currentSize = string.lenth() - currentChar;
if (currentSize <10){
return null; //파싱문자열이 충분히 작아져서 null리턴.
}
for(int spliltPos = currenSize/2 + currentChar; splitPos < string.length(); splitPos++){
//다음 공백이 나올때까지 분할 위치를 뒤로 이동.
if(Character.isWhitespace(string.charAt(splitPos))){
// 처음부터 분할위치까지 문자열을 파싱할 새로운 WordCounterSpliterator 생성.
Spliterator<Charactre> apliterator =
new WordCounterSpliterator(string.substring(currentChar, splitPos));
currentChar = splitPos;//이 WordCounterSpliterator의 시작위치를 분할위치로 지정.
return spliterator; // 공백을 찾아 문자열 분리했으므로 루프 종료.
}
}
return null;
}
@Override
public long estimateSize(){
// 탐색해야할 요소의 개수는 파싱할 문자열의 전체길이에서 현재 반복중인 위치를 뺀것.
return string.length() - currentChar;
}
@Override
public int characteristics(){
// 특성을 알려줌.
return ORDERED + SIZED + SUBSIZED + NON_NULL + IMMUTABLE;
}
}
WordCounterSpliterator 활용
Spliterator<Character> spliterator = new WordCounterSpliterator(SENTENCE);
Stream<Character> stream = StreamSupport.stream(spliterator, true);
StreamSupport.stream 팩토리 메서드로 전달한 두번째 불리언 인수는 병렬스트림 생성여부를 지시한다.
특히 Spliterator는 첫번째 탐색, 첫번째 분할, 첫번째 예상크기 요청시점에 요소의 소스를 바인딩 할 수 있음.- 늦은바인딩
- 내부 반복을 이용하면 명시적으로 다른 스레드를 사용하지 않고도 스트림을 병렬로 처리 가능.
- 항상 병렬처리가 빠른것은 아니므로 성능을 직접 측정해봐야 함.
- 병렬스트림은 특히 처리해야할 데이터가 아주많거나 각 요소를 처리하는데 오랜시간이 걸릴경우 성능을 높임.
- 가능하면 기본형 특화스트림 사용. 올바른 자료구조 선택하는것이 병렬처리 성능에 영향을 줌.
- 포크/조인 프레임워크에서는 태스크를 작은 태스크로 분할 후 각각의 스레드로 실행하고 합쳐 최종결과를 생산.
- Spliterator는 탐색하려는 데이터를 포함하는 스트림을 어떻게 병렬화 할것인지 정의.
'IT Book Summary > ModernJavaInAction' 카테고리의 다른 글
Chapter 9 리팩터링, 테스팅, 디버깅 (0) | 2020.04.05 |
---|---|
Part 3 스트림과 람다를 이용한 효과적 프로그래밍 - Chapter 8 컬렉션API 개선 (0) | 2020.03.29 |
Chapter6 스트림으로 데이터 수집 (2) | 2020.03.17 |
Chapter5 스트림 활용 (0) | 2020.03.11 |
Part 2 함수형 데이터 처리 - Chapter 4 스트림 소개 (3) | 2020.03.02 |