본문 바로가기

IT Book Summary/ModernJavaInAction

Chapter 15 CompletableFuture와 리액티브 프로그래밍 컨셉의 기초

최근 개발방법을 획기적으로 변화하게 만든 두가지 추세가 있다.

 

하나는 병렬실행 두번째는 마이크로서비스 아키텍처 이다.

 

멀티코어 프로세서가 발전하면서 애플리케이션 속도

멀티코어 프로세서를 활용할 수 있도록 소프트웨어를 개발하는가에 따라 달라질 수 있다.

 

한개의 큰 태스크를 개별 하위태스크로 분리해 병렬로 실행할 수 있고, 

포크/조인 프레임워크 나 병렬스트림으로 병렬실행을 달성할 수 있다.

 

마이크로서비스로 인해 서비스가 작아진 대신 네트워크 통신이 증가했다.

공개 API를 통해 더 많은 인터넷 서비스를 접할수 있게 되었고,

독립적으로 동작하는 웹사이트나 네트워크 애플리케이션을 찾아보기 힘들다.

앞으로는 다양한 소스의 콘텐츠를 가져와 합쳐서 만드는 메시업 mashup 형태의 웹 애플리케이션이 많이 등장할 것이다.

 

여러 웹서비스에 접근해서 데이터를 가져오며 기다리는 동안 다른 웹서비스 데이터를 처리하려면 어떻게 해야할까?

동시성을 필요로 하는 상황에서 연관된 작업을 같은 CPU에서 동작하도록 하려면?

 

동시성은 단일 코어 머신에서 발생할 수 있는 속성으로 실행이 겹칠수 있으나

병렬성은 병렬 실행을 하드웨어 수준에서 지원한다.

 


15.1 동시성을 구현하는 자바 지원의 진화

 

자바 초반 Runnable과 Thread를 동기화된 클래스와 메서드 잠금을 이용했다.

이후 스레드 실행과 태스크 제출을 분리하는 ExecutorService 인터페이스,

Runnable, Thread의 변형을 반환하는 Callable<T>, Future<T>, 제네릭 등을 지원했다.

자바 7에서는 포크/조인 프레임워크를 이용하거나 

자바8에서는 스트림과 새로 추가된 람다에 기반한 병렬 프로세싱이 추가되었다.

 

자바9에서는 분산 비동기 프로그래밍을 지원하는데, 매쉬업 어플리케이션을 개발하는데 기초 모델과 툴킷을 제공한다.

이 과정을 리액티브 프로그래밍이라 부른다. 궁극적인 목표는 동시에 실행할 수 있는 독립적인 태스크를 가능하게 만들며, 멀티코어 또는 기기를 통해 제공되는 병렬성을 쉽게 이용하는 것이다.

 

1 -  스레드와 높은 수준의 추상화

프로세스는 운영체제에 한 개 이상의 스레드

즉, 본인이 가진 프로세스와 같은 주소 공간을 공유하는 프로세스를 요청해 태스크를 동시에 또는 협력적으로 실행할 수 있다.

각 코어는 한개 이상의 프로세스나 스레드에 할당될 수 있지만 프로그램이 스레드를 사용하지 않으면 효율을 고려해 여러 프로세서 코어 중 한개만을 사용할 것이다.

 

병렬 스트림 반복은 명시적 스레드 사용보다 높은 수준의 개념이다.

스트림을 이용해 스레드 사용패턴을 추상화 할 수 있다.

쓸모없는 코드가 내부로 구현되면서 복잡성도 줄어든다.

 

2 - Executor와 스레드 풀

스레드의 문제

자바 스레드는 직접 운영체제 스레드에 접근한다. 운영체제가 지원하는 스레드 수를 초과해 사용하면 자바 애플리케이션이 크래시될수 있어 기존 스레드가 실행되는 상태에서 계속 새로 만드는 상황을 주의해야한다.

 

스레드풀 그리고 스레드 풀이 더 좋은 이유

자바 ExecutorService는 태스크를 제출하고 나중에 결과를 수집할 수 있는 인터페이스를 제공함.

newFixedThreadPool 같은 팩토리 메서드 이용해 스레드풀 만들수 있다.

 

ExecutorService newFixedThreadPool(int nThreads)

 

스레드 풀에서 사용하지 않은 스레드로 제출된 태스크를 먼저 온 순서대로 실행하고 풀로 반환한다.

이방식은 다양한 설정을 할 수 있고, 하드웨어 수에 맞는 태스크 수를 유지하고 오버헤드없이 스레드 풀에 제출할 수있다.

프로그래서가 태스크(Runnable 이나 Callable) 제공하면 스레드가 이를 실행한다.

 

스레드 풀 그리고 스레드 풀이 나쁜 이유

주의해야할 두가지

  1. 스레드 수가 k로 정해진경우 초과 제출된 테스크는 큐에 저장되어 기다리는데 이때 스레드에서 실행중인 테스크가 IO요청을 기다리면서 블록 상황이 생기면 스레드 수가 줄게 된다. 블록 할 수 있는 태스크는 스레드 풀에 제출ㄹ하지 말아야 한다.
  2. 프로그램을 종료하기 전에 모든 스레드 풀을 종료하는 습관이 중요하다. 

 

3 - 스레드의 다른 추상화 :  중첩되지 않은 메서드 호출

7장에서 설명한 동시성은 

태스크나 스레드가 메서드 호출 안에서 시작되면 그 메서드 호출은 반환하지 않고 작업이 끝나기를 기다렸다.

15장에서는 사용자의 메서드 호출에 의해 스레드가 생성되고 메서드를 벗어나 계속 실행되는 동시성 형태이다

 

엄격한 포크/조인 vs 여유로운 포크/조인

 

4 - 스레드에 무엇을 바라는가?

모든 하드웨어 스레드를 활용해 병렬성의 장점을 극대화하도록 프로그램 구조를 만드는 것,

프로그램을 작은 태스크 단위로 구조화 하는것이 목표다.


15.2 동기 API 와 비동기 API

 

외부반복(명시적 for 루프) 을 내부반복(스트림 메서드 사용)으로 바꿔야 한다.

루프가 실행될때 런타임 시스템은 사용할 수 있는 스레드를 더 정확하게 알고 있다는게 내부반복의 장점이다.

 

ex) 다음과 같은 시그니처를 갖는 f,g 두 메서드의 호출을 합하는 예제

int f(int x);

int g(int x);

 

별도의 스레드로 f와 g를 실행해 구현했지만 코드가 복잡해졌다.

class ThreadExample {
    public static void main(String[] args) throws InterruptedException {
        int x = 1337;
        Result result = new Result();
        Thread t1 = new Thread(() -> { result.left = f(x); } );
        Thread t2 = new Thread(() -> { result.right = g(x); });
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(result.left + result.right);
    }
    
    private static class Result {
        private int left;
        private int right;
    }
}

 

Runnable 대시 Future API 인터페이스를 이용해 코드를 단순화 할 수 있다.

public class ExecutorServiceExample {
    public static void main(String[] args)
    throws ExecutionException, InterruptedException {
        int x = 1337;
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        Future<Integer> y = executorService.submit(() -> f(x));
        Future<Integer> z = executorService.submit(() -> g(x));
        System.out.println(y.get() + z.get());
        executorService.shutdown();
    }
}

하지만 submit 메서드 호출같은 불필요한 메서드 처리가 남았다.

 

1 - Future 형식 API

시그니처를 바꾸고

Future<Integer> f(int x);

Future<Integer> g(int x);

 

호출을 바꿀수 있다.

Future<Integer> y = f(x);

Future<Integer> z = g(x);

System.out.println(y.get() + z.get());

 

2 - 리액티브 형식 API

 

f,g 의 시그니처를 바꿔서 콜백형식 프로그래밍  

void f(int x, IntConsumer dealWithResult);

 

결과가 준비되면 이를 람다로 호출하는 태스크를 만드는 것이 비결

public class CallbackStyleExample {
    public static void main(String[] args) {
        int x = 1337;
        Result result = new Result();
        f(x, (int y) -> {
            result.left = y;
            System.out.println((result.left + result.right));
        } );
        g(x, (int z) -> {
            result.right = z;
            System.out.println((result.left + result.right));
        });
    }
}

 

- 리액티브 형식의 API는 보통 한 결과가 아니라 일련의 이벤트에 반응하도록 설계되었으므로 Future를 이용하는것이 적절

 

3 - 잠자기(그리고 기타 블로킹 동작)는 해로운 것으로 간주

스레드는 sleep()메서드로 잠들어도 여전히 시스템자원을 점유한다.

블록동작은 다른 태스크가 어떤 동작을 완료하기를 기다리는 동작과 외부 상호작용을 기다리는 동작 두가지로 구분.

-> 태스크 앞과 뒤 두 부분으로 나누고 블록되지 않을때만 뒷부분을 자바가 스케줄링하도록 요청

 

태스크를 블록하는것보다는 다음 작업을 태스크로 제출하고 현재 태스크는 종료하는것이 좋다.

스레드에는 제한이 있고 저렴하지 않으므로 잠을 자거나 블록해야하는 여러 태스크가 있을때 가능하면 다음의 형식을 따르자.

public class ScheduledExecutorServiceExample {
    public static void main(String[] args) {
        ScheduledExecutorService scheduledExecutorService
        = Executors.newScheduledThreadPool(1);
        work1();
        scheduledExecutorService.schedule(
        ScheduledExecutorServiceExample::work2, 10, TimeUnit.SECONDS);
        scheduledExecutorService.shutdown();
    }
    public static void work1(){
        System.out.println("Hello from Work1!");
    }
    public static void work2(){
        System.out.println("Hello from Work2!");
    }
}

 

 

4 - 현실성 확인

시스템을 많은 작은 동시 실행되는 태스크로 설계해서 블록할 수 있는 모든 동작을 비동기 호출로 구현한다면 병렬 하드웨어를 최대한 활용할 수 있다. 하지만 현식적으로 '모든것은 비동기' 설계원칙을 어겨야 한다.

 

개선된 동시성 API 사용을 권장한다.

 

네트워크 서버의 블록/비블록 API를 일관적으로 제공하는 Netty같은 라이브러리 사용도 도움이 된다.

 

5 - 비동기 API에서 예외는 어떻게 처리하는가?

Futrue나 리액티브 형식의 비동기 API에서 호출된 메서드의 실제 바디는 별도의 스레드에서 호출되며 

이때 발생하는 에러는 실행범위와 관계없는 상황이 되버린다.

 

Future를 구현한 CompletableFuture에서는 런타임 get() 메서드에 예외를 처리할 수 있는 기능을 제공하고

예외에서 회복할 수 있게 exceptionally() 메서드도 제공한다.

 

리액티브 형식의 비동기 API의 경우 호출된 콜백에서 예외발생시 실행할 추가 콜백도 만들어주자.

void f(int x, Consumer<Integer> dealWithResult, Consumer<Throwable> dealWithException);

 

콜백이 여러개면 이를 따로 제공하는것보다 한 객체로 이 메서드를 감싸는것이 좋다.

자바9 API에서 여러 콜백을 한 객체 Subscriber<T>클래스 를 만들었다.

void onComplete()
void onError(Throwable throwable)
void onNext(T item) 

 

바뀐 시그니처

void f(int x, Subscriber<Integer> s);


15.3 박스와 채널 모델

박스와 채널모델로 생각과 코드를 구조화 할 수 있고, 대규모 시스템 구현의 추상화 수준을 높일 수 있다.

 

박스와 채널 다이어그램

p 함수에 인수 x 를 이용해 호출하고 그 결과를 q1과 q2에 전달하며 다시 이 두 호출의 결과로 함수 r을 호출한다음 결과 출력.

 

// 하드웨어 병렬성과 거리가 먼 코드
int t = p(x);
System.out.println( r(q1(t), q2(t)) );

// Future를 이용해 f,g를 병렬로 평가하는 방법
int t = p(x);
Future<Integer> a1 = executorService.submit(() -> q1(t));
Future<Integer> a2 = executorService.submit(() -> q2(t));
System.out.println( r(a1.get(),a2.get()));

 

많은 태스크가 get() 메서드를 호출해 Future가 끝나기를 기다리는 상태에 놓일수 있다.

시스템 구조가 얼마나 많은 수의 get()을 감당할 수 있을지 예측하기 어려움.

자바8에서는 CompletableFuture와 콤비네이터를 이용해 해결할 수 있다.

 

두 Function이 있을때 compose(), andThen() 등을 이용해 다른 Function을 얻을수 있다.

Function p, q1, q2, BiFunction r로 간단하게 구현

p.thenBoth(q1,q2).thenCombine(r)
// 안타깝게 thenBoth, thenCombine은 자바 Function, BiFunction 클래스의 일부가 아님 

 

 


15.4 CompletableFuture와 콤비네이터를 이용한 동시성

 

일반적으로 Future는 실행해서 get()으로 결과를 얻을 수 있는 Callable로 만들어진다.

CompletableFuture는 실행할 코드 없이 Future를 만들 수 있게 허용하며 complete() 메서드를 이용해 나중에 어떤 값을 이용해 다른 스레드가 이를 완료할 수 있고 get()으로 값을 얻을 수 있게 허용한다.

 

public class CFComplete {
    public static void main(String[] args)
    throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        int x = 1337;
        CompletableFuture<Integer> a = new CompletableFuture<>();
        executorService.submit(() -> a.complete(f(x)));
        int b = g(x);
        System.out.println(a.get() + b);
        executorService.shutdown();
    }
}

// 또는 다음과 같이 구현
public class CFComplete {
    public static void main(String[] args)
    throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        int x = 1337;
        CompletableFuture<Integer> a = new CompletableFuture<>();
        executorService.submit(() -> b.complete(g(x)));
        int a = f(x);
        System.out.println(a + b.get());
        executorService.shutdown();
    }
}

 

하지만 f(x)나 g(x)의 실행이 끝나지 않으면 get()을 기다려야 하므로 프로세싱 자원 낭비할 수 있다.

 

CompletableFuture<V> thenCombine(CompletableFuture<U> other, BiFunction<T, U, V> fn)

 

두개 CompletableFuture 값을 받아 새 값을 만든다

thenComine 메서드를 사용해 두 연산 결과를 효과적으로 연결

처음 두 작업이 끝나면 두 결과 모두에 fn을 적용하고 블록하지 않은 상태로 결과 Future를 반환.

 

public class CFCombine {
    public static void main(String[] args) throws ExecutionException,
                            InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        int x = 1337;
        CompletableFuture<Integer> a = new CompletableFuture<>();
        CompletableFuture<Integer> b = new CompletableFuture<>();
        CompletableFuture<Integer> c = a.thenCombine(b, (y, z)-> y + z);
        executorService.submit(() -> a.complete(f(x)));
        executorService.submit(() -> b.complete(g(x)));
        System.out.println(c.get());
        executorService.shutdown();
    }
}

 

세 연산의 타이밍 다이어그램

 


15.5 발생-구독 그리고 리액티브 프로그래밍

Future와 CompletableFuture는 독립적 실행과 병렬성이라는 정식적 모델에 기반한다.

따라서 Future는 한번만 실행해 결과를 제공.

 

스트림은 선형적 파이프라인 처리 기법에 알맞다.

 

자바9 에서는 java.util.concurrent.Flow 의 인터페이스에 발행-구독 모델을 적용해 리액티브 프로그래밍을 제공한다.

  • 구독자가 구독할 수 있는 발행자
  • 이 연결을 구독 subscription 이라 한다.
  • 이 연결을 이용해 메세지를 전송

발행자-구독자 모델

 

1 - 두 플로를 합치는 예제

ex) 두 정보 소스로부터 발생하는 이벤트를 합쳐서 다른 구독자가 볼 수 있도록 발행하는 예

수식을 포함하는 스프레드 시트의 셀에서 흔히 제공하는 동작

"=C1+C2" 공식을 포함하는 스프레드시트 셀 C3

 

private class SimpleCell {
    private int value = 0;
    private String name;
    public SimpleCell(String name) {
        this.name = name;
    }
}

c1,c2에 이벤트가 발생했을때 c3를 구독하도록 하는 인터페이스 Publisher<T>

 

// 통신할 수독자 수를 인수로 받는다
interface Publisher<T> {
    void subscribe(Subscriber<? super T> subscriber);
}
// 정보 전달할 단순메서드 포함
interface Subscriber<T> {
    void onNext(T t);
}

 

Cell 은 Publisher이면서 Subscriber이다

 

private class SimpleCell implements Publisher<Integer>, Subscriber<Integer> {
 private int value = 0;
 private String name;
 private List<Subscriber> subscribers = new ArrayList<>();

 public SimpleCell(String name) {
  this.name = name;
 }
 @Override
 public void subscribe(Subscriber<? super Integer> subscriber) {
  subscribers.add(subscriber);
 }
 private void notifyAllSubscribers() { //새 값이 있음을 알림
  subscribers.forEach(subscriber -> subscriber.onNext(this.value));
 }
 @Override
 public void onNext(Integer newValue) {
  this.value = newValue; //값을 갱신
  System.out.println(this.name + ":" + this.value);
  notifyAllSubscribers(); // 구독자에 갱신되었음을 알림
 }
}

 

다음과 같이 시도

Simplecell c3 = new SimpleCell("C3");
SimpleCell c2 = new SimpleCell("C2");
SimpleCell c1 = new SimpleCell("C1");
c1.subscribe(c3);
c1.onNext(10); // C1의 값 10으로 갱신
c2.onNext(20); 

 

계산 구현 클래스 

 public class ArithmeticCell extends SimpleCell {
     private int left;
      private int right;
      public ArithmeticCell(String name) {
           super(name);
           }
      public void setLeft(int left) {
           this.left = left;
           onNext(left + this.right);
      }
 
      public void setRight(int right) {
           this.right = right;
           onNext(right + this.left);
      }
 }

다음과 같이 시도

ArithmeticCell c3 = new ArithmeticCell("C3");
SimpleCell c2 = new SimpleCell("C2");
SimpleCell c1 = new SimpleCell("C1");
c1.subscribe(c3::setLeft);
c2.subscribe(c3::setRight);
c1.onNext(10);
c2.onNext(20);
c1.onNext(15);

 

출력결과

C1: 10

C3: 10

C2: 20

C3: 30

C1: 15

C3: 35

 

// C5=C3+C4, 의존하는 새로운 셀 C5

ArithmeticCell c5 = new ArithmeticCell("C5");
ArithmeticCell c3 = new ArithmeticCell("C3");
SimpleCell c4 = new SimpleCell("C4");
SimpleCell c2 = new SimpleCell("C2");
SimpleCell c1 = new SimpleCell("C1");
c1.subscribe(c3::setLeft);
c2.subscribe(c3::setRight);
c3.subscribe(c5::setLeft);
c4.subscribe(c5::setRight);

c1.onNext(10);
c2.onNext(20);
c1.onNext(15);
c4.onNext(1);
c4.onNext(3);

출력결과

C1:10

C3:10

C5:10

C2:20

C3:30

C5:30

C1:15

C3:35

C5:35

C4:1

C5:36

C4:3

C5:38

 

2 - 역압력

정보의 흐름 속도를 역압력(흐름제어)으로 제어

Subscriber 에서 Publisher로 정보를 요청해야 할 필요가 있다.

Publisher는 여러 Subscriber를 갖고 있어 역압력 요청이 한 연결에만 영향을 미쳐야 한다는게 문제가 될수 있다.

 

자바9 플로 API의 Subscriber 인터페이스의 네번째 메서드

void onSubscribe(Subscription subscription);

 

둘 사이 채널이 연결되면 첫이벤트로 이 메서드가 호출된다.

Subscrption객체는 다음처럼 서로 통신할 수 있는 메서드를 포함한다.

interface Subscription {
    void cancel();
    void request(long n);
}

 

콜백을 통한 '역방향' 소통으로

Publisher는 Subscription 객체를 만들어 Subscriber로 전달하면 

Subscriber는 이를 이용해 Publisher로 정보를 보낼 수 있다.

 

3 - 실제 역압력의 간단한 형태

한번에 한개의 이벤트를 처리하도록 발행-구독 연결을 구성하려면?

  • Subscriber가 OnSubscribe 로 전달된 Subscription 객체를 subscription 같은 필드에 로컬로 저장
  • Subscriber가 수많은 이벤트를 받지 않도록 onSubscribe, onNext, onError의 마지막 동작에
    channel, request(1)을 추가해 오직 한 이벤트만 요청
  • 요청을 보낸 채널에만 onNext, onError 이벤트를 보내도록 Publisher의 notifyAllSubscribers 코드를 바꿈
    (보통 여러 Subscriber가 자신만의 속도를 유지할 수 있도록 Publisher는 새 Subscription을 만들어 각 Subscriber와 연결)

역압력을 구현하는데 고려해야할 장단점

  • 여러 Subscriber가 있을때 이벤트를 가장 느린속도로 보낼 것인가?
    각 Subscriber에게 보내지 않은 데이터를 저장할 별도의 큐를 가질 것인가?
  • 큐가 너무 커지면 어떻게 해야할까?
  • Subscriber가 준비가 안 되었다면 큐의 데이터를 폐기할 것인가?

15.6 리액티브 시스템 vs 리액티브 프로그래밍

리액티브 시스템이란

런타임 환경이 변화에 대응하도록 전체 아키텍처가 설계된 프로그램

리액티브 시스템이 가져야할 공식적인 속성

- 반응성(실시간으로 입력에 반응하는 것)

- 회복성(한 컴포넌트의 실패로 전체 시스템이 실패하지 않음)

- 탄력성 (시스템이 자신의 작업 부하에 맞게 적응하며 작업을 효율적으로 처리함)

 

이런 속성을 구현하는 방법중의 하나인 리액티브 프로그래밍 

- java.util.concurrent.Flow 관련된 자바 인터페이스에서 제공.

- 메시지 주도 message-dreven 속성을 반영

- 박스와 채널모델에 기반한 내부 API를 가짐

- 컴포넌트는 처리할 입력을 기다리고 결과를 다른 컴포넌트로 보내면서 시스템이 반응

 

 


  • 자바의 스레드 풀은 유용하지만 블록되는 태스크가 많아지면 문제가 발생함. 
  • 메서드를 비동기로 만들면 병렬성을 추가할 수 있으며 루프를 최적화 시킴.
  • 박스와 채널 모델을 이용해 비동기 시스템을 시각화
  • 자바8 CompletableFutrue 클래스와 자바9 플로API 모두 박스와 채널 다이어그램으로 표현할 수 있다.
  • CompletableFutrue 클래스는 한번의 비동기 연산을 표현. 콤비네이커로 비동기 연산을 조합함으로써 Future를 이용할 때 발생했던 기존의 블로킹 문제를 해결할 수 있다.
  • 플로 API는 발생-구독 프로토콜, 역압력을 이용하면 자바의 리액티브 프로그래밍의 기초를 제공
  • 리액티브 프로그래밍을 이용해 리액티브 시스템을 구현할 수 있다.