본문 바로가기

IT Book Summary/ModernJavaInAction

Chapter 16 CompletableFuture : 안정적 비동기 프로그래밍

병렬을 구현하는 두가지 방식

- 병렬 스트림과 포크/조인 기법을 이용해 컬렉션을 반복

- 분할, 정복 알고리즘을 활용하는 프로그램에서 높은 수준의 병렬을 적용

 

자바8, 자바9 에서는 CompletableFuture와 리액티브 프로그래밍 패러다임 두가지 API를 제공한다.

자바8에서 제공하는 Future의 구현 CompatableFuture이 비동기 프로그램에 도움을 준다.

 

16.1 Future의 단순 활용

 

비동기 계산을 모델링하는데 Future를 이용할 수 있고 Future는 계산이 끝났을때 결과에 접근할 수 있는 참조를 제공한다.

시간이 걸리는 작업을 Future 내부로 설정하면 호출자 스레드가 결과를 기다리는 동안 다른 유용한 작업을 수행할 수 있다.

오래걸리는 작업을 Callable 객체 내부로 감싼후 다음에 ExecutorService에 제출해야 한다.

 

ExecutorService executor = Executors.newCachedThreadPool();
Future<Double> future = executor.submit(new Callable<Double>() { //Callable을 ExecutorService로 제출
  public Double call() {
    return doSomeLongComputation(); //오래걸리는 작업을 비동기 스레드로 실행
   }});
  doSomethingElse(); //비동기 작업동안 다른작업 수행
  try {
      Double result = future.get(1, TimeUnit.SECONDS); //비동기작업의 결과를 가져옴.
      //결과가 준비되지않으면 1초까지 기다리며 호출스레드 블록
  } catch (ExecutionException ee) {
      //계산 중 예외
  } catch (InterruptedException ie) {
      //현재 스레드 대기중 인터럽트 발생
  } catch (TimeoutException te) {
      //Future가 완료되기 전 타임아웃 발생
}

Future로 오래걸리는 작업을 비동기적으로 실행

오래걸리는 작업이 끝나지 않는다면?

-> get 메서드를 오버로드해서 스레드 대기 최대시간을 설정하는 것이 좋다.

 

1 - Future 제한

 

Future에 있으면 유용할 선언형 기능

  • 두개의 비동기 계산을 하나로 합침
  • Future 집합이 실행하는 모든 태스크의 완료를 기다림
  • Future 집합에서 가장 빨리 완료되는 태스크를 기다렸다가 결과를 얻음.
  • 프로그램적으로 Future를 완료시킴
  • Future 완료동작에 반응. (Future의 결과로 원하는 추가동작을 수행)

Future와 CompletableFuture의 관계를 Collection과 Stream 관계에 비유할 수 있다.

CompletableFuture는 람다 표현식과 파이프라이닝을 활용한다.

 

2 - CompletableFuture로 비동기 애플리케이션 만들기

온라인상점 중 가장 저렴한 가격을 제시하는 상점을 찾는 애플리케이션을 완성해가는 예제.

앱을 만들면서 사용할 수 있는 CompletableFuture 의 기능 

  • 고객에게 비동기 API를 제공하는 방법
  • 동기 API를 사용해야 할 때 코드를 비블록으로 만드는 방법
  • 비동기 동작의 완료에 대응하는 방법

16.2 비동기 API 구현

 

public class Shop {
  public static void delay() {
    try {
      Thread.sleep(1000L);
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
  }
  //메서드 지연 흉내내기
  public double getPrice(String product) {
    return calculatePrice(product);
  }
  private double calculatePrice(String product) {
    delay();
    return random.nextDouble() * product.charAt(0) + product.charAt(1);
  }
}

 

1 - 동기 메서드를 비동기 메서드로 변환

 

getPrice를 비동기 메서드로 변환하기전 이름과 반환값을 특성에 맞게 바꾸자

 

비동기 getPriceAsync 메서드 구현

public Future<Double> getPriceAsync(String product) {
      CompletableFuture<Double> futurePrice = new CompletableFuture<>(); //계산결과 포함할 futurePrice
      new Thread( () -> {
        double price = calculatePrice(product); //비동기 계산 스레드
        futurePrice.complete(price); //오래걸리는 작업 값 Future에 설정
     }).start();
    return futurePrice; //계산결과를 기다리지 않고 Future 반환
}

 

비동기 API 사용

Shop shop = new Shop("BestShop");
long start = System.nanoTime();
Future<Double> futurePrice = shop.getPriceAsync("my favorite product"); //상정에 가격정보요청
long invocationTime = ((System.nanoTime() - start) / 1_000_000);
System.out.println("Invocation returned after " + invocationTime + " msecs");
//가격계산하는 동안 다른작업 수행
doSomethingElse();

try {
    double price = futurePrice.get(); //가격정보 받을때까지 블럭
    System.out.printf("Price is %.2f%n", price);
} catch (Exception e) {
    throw new RuntimeException(e);
}
long retrievalTime = ((System.nanoTime() - start) / 1_000_000);
System.out.println("Price returned after " + retrievalTime + " msecs");

Future를 이용해 나중에 결과를 얻을 수 있다.

 

 

2 - 에러 처리 방법

가격을 계산하는동안 에러가 발생한다면? get() 반환을 계속 기다리게 될것이다.

이처럼 블록 문제가 발생하는 상황에서는 타임아웃을 활용해야 한다.

completeExceptionally 메서드를 이용해 내부발생 예외를 전달하고 종료할 수 있다.

 

public Future<Double> getPriceAsync(String product) {
    CompletableFuture<Double> futurePrice = new CompletableFuture<>();
    new Thread( () -> {
      try {
        double price = calculatePrice(product);
        futurePrice.complete(price); //계산이 정상리턴되면 Future에 가격정보를 저장하고 종료.
      } catch (Exception ex) {
         futurePrice.completeExceptionally(ex); //발생한 에러를 포함시켜 Future종료.
      }
    }).start();
    return futurePrice;
}

 

팩토리 메서드 supplyAsync로 CompletableFuture 만들기

 

좀더 간단하게 getPriceAsync 메서드 만들기

supplyAsync 메서드는 Supplier를 인수로 받아 CompletableFuture를 반환한다.

CompletableFuture는 Supplier를 실행해 비동기적으로 결과를 생성

public Future<Double> getPriceAsync(String product) {
  return CompletableFuture.supplyAsync(() -> calculatePrice(product));
}

 


16.3 비블록 코드 만들기

 

모든상점에 정보를 요청하는 findPrice

List<Shop> shops = List.of(new Shop("BestPrice"),
                           new Shop("LetsSaveBig"),
                           new Shop("MyFavoriteShop"),
                           new Shop("BuyItAll"));
public List<String> findPrices(String product) {
  return shops.stream()
  .map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))
  .collect(toList()); 
}       
// 결과와 성능 확인
long start = System.nanoTime();
System.out.println(findPrices("myPhone27S"));
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.println("Done in " + duration + " msecs");

네개의 상점에서 가격을 검색하는동안 1초의 대기시간이 있어 전체 검색결과는 4초가 더 걸린다

 

1 - 병렬 스트림으로 요청 병렬화하기

stream() 을 parallelStream()으로 바꾸어 병렬처리해 성능을 높일 수 있다.

 

2 - CompletableFuture로 비동기 호출 구현하기

CompletableFuture 기능을 활용해 findPrices 메서드의 동기 호출을 비동기로 바꾸자.

 

public List<String> findPrices(String product) {
  List<CompletableFuture<String>> priceFutures = 
  shops.stream()
        .map(shop -> CompletableFuture.supplyAsync( //비동기로 계산
          () -> shop.getName() + " price is " +
                shop.getPrice(product)))
        .collect(Collectors.toList());
  return priceFutures.stream()
                     .map(CompletableFuture::join) //비동기 동작이 끝나길 기다림
                     .collect(toList());
 }

두 map 연산을 하나의 스트림처리 파이프라인으로 처리하지 않고 두 개의 스트림 파이프라인으로 처리 했다.

스트림연산은 게으른 특성이 있어서 하나의 파이프라인으로 연산을 처리하면 모든 정보요청 동작이

동기적, 순차적으로 이루어진다.

스트림의 게으른 특성으로 순차계산이 일어나는 이유와 순차계산을 회피하는 방법

위쪽은 단일 스트림으로 했을때의 과정. 밑쪽은 두개의 스트림으로 나누어 했을때의 과정을 보여준다.

 

3 - 더 확장성이 좋은 해결 방법

검색해야할 다섯번째 상점이 추가되었다면?

CompletableFuture는 병렬스트림 버전에 비해 작업에 이용할 수 있는 다양한 Executor를 지정할 수 있다는 장점이 있다.

Executor로 스레드 풀 크기를 조정할수도 있고 애플리케이션에 맞는 최적화 설정이 가능하다.

 

4 - 커스텀 Executor 사용하기

 

풀에서 관리하는 스레드 수를 결정하는 방법은?

'자바 병렬 프로그래밍' 에서는 스레드풀 최적값을 찾는법을 제한함.
풀이 너무크면 CPU메모리 자원을 서로 경쟁하느라 낭비. 너무 작으면 일부 코어가 활용되지 않음.

Nthreads = Ncpu* Ucpu * (1+W/C)

- Ncpu 는 Runtime.getRuntime().availableProcessors() 가 반환하는 코어 수
- Ucpu 는 0과 1 사이의 값을 갖는 CPU활용 비율
- W/C는 대기시간과 계산시간의 비율 

상점수보다 많은 스레드 수는 낭비이므로, 한 상점에 하나의 스레드가 할당되도록 하지만 최대개수는 100이하로 설정하자.

 

커스텀 Executor 만들기

private final Executor executor =
  Executors.newFixedThreadPool(Math.min(shops.size(), 100), //스레드풀 사이즈 1-100사이
    new ThreadFactory() {
      public Thread newThread(Runnable r) { 
        Thread t = new Thread(r);
        t.setDaemon(true); //프로그램 종료 방해않는 데몬 스레드 사용.
        return t;
      }
});

 

데몬스레드는 자바 프로그램이 종료될 때 강제로 실행이 종료될 수 있다.

이제 Executor를 팩토리 메서드 supplyAsync 의 두번째 인수로 전달 가능.

 

CompletableFuture.supplyAsync(() -> shop.getName() + " price is " + shop.getPrice(product), executor);

 

상황에 맞는 Executor를 만들어 CompletableFuture를 활용하는것이 좋다.

- I/O가 아닌 계산 중심 작업일경우 스트림 인터페이스가 구현하기 간단하며 더 효율적일 수 있음.

- I/O를 기다리는 작업을 병렬로 실행할 때는 CompletableFuture가 더 많은 유연성을 제공함.


16.4 비동기 작업 파이프라인 만들기

 

할인서비스에서 더로 다른 할인율을 제공하는 다섯가지 코드 제공

enum으로 할인코드 정의.

public class Discount {
  public enum Code {
    NONE(0), SILVER(5), GOLD(10), PLATINUM(15), DIAMOND(20);
    private final int percentage;
    Code(int percentage) {
      this.percentage = percentage;
    }
  }
  //Discount 클래스 구현
  
  public String getPrice(String product) {
    double price = calculatePrice(product);
    Discount.Code code = Discount.Code.values()[random.nextInt(Discount.Code.values().length)];
    return String.format("%s:%.2f:%s", name, price, code);
  }
  
  private double calculatePrice(String product) {
    delay();
    return random.nextDouble() * product.charAt(0) + product.charAt(1);
  }
  
}

 

 

1 - 할인 서비스 구현

여러 상점에서 가격 정보를 얻고, 결과 문자열을 파싱하고, 

할인 서버에서 할인율을 확인해서 최종가격을 계산할 수 있다.

 

상점에서 제공한 문자열 파싱은 Quote클래스로 만들자

public class Quote {
  private final String shopName;
  private final double price;
  private final Discount.Code discountCode;
  public Quote(String shopName, double price, Discount.Code code) {
    this.shopName = shopName;
    this.price = price;
    this.discountCode = code;
  }
  
  public static Quote parse(String s) {
    String[] split = s.split(":");
    String shopName = split[0];
    double price = Double.parseDouble(split[1]);
    Discount.Code discountCode = Discount.Code.valueOf(split[2]);
    return new Quote(shopName, price, discountCode);
 }
  public String getShopName() { return shopName; }
  public double getPrice() { return price; }
  public Discount.Code getDiscountCode() { return discountCode; }
}

 

Discount 서비스에서는 Quote 객체를 인수로 받아 할인된 가격문자열을 반환하는 applyDiscount 메서드도 제공.

public class Discount {
  public enum Code {
  }
  
  public static String applyDiscount(Quote quote) {
    return quote.getShopName() + " price is " +
               Discount.apply(quote.getPrice(), quote.getDiscountCode());
  }
  private static double apply(double price, Code code) { //기존가격에 할인가격 적용
    delay(); //서비스 응답지연 흉내
    return format(price * (100 - code.percentage) / 100);
  }
}

 

 

2 - 할인 서비스 사용

 

Discount 서비스를 이용하는 간단한 findPrices 구현

public List<String> findPrices(String product) {
  return shops.stream()
               .map(shop -> shop.getPrice(product)) //할인전 가격얻기.
               .map(Quote::parse) //반환한 문자열을 Quote 객체로 변환
               .map(Discount::applyDiscount) //각 Quote에 할인 적용
               .collect(toList());
}

 

3 - 동기 작업과 비동기 작업 조합하기

 

CompletableFuture에서 제공하는 기능으로 findPrices 메서드를 비동기적으로 재구현

public List<String> findPrices(String product) {
  List<CompletableFuture<String>> priceFutures =
         shops.stream()
              .map(shop -> CompletableFuture.supplyAsync(
                         () -> shop.getPrice(product), executor)) //가격정보 얻기
              .map(future -> future.thenApply(Quote::parse)) //Quote파싱하기
              .map(future -> future.thenCompose(quote ->
                             CompletableFuture.supplyAsync(
                             () -> Discount.applyDiscount(quote), executor))) 
                             //CompletableFuture를 조합해서 할인된 가격 계산하기
              .collect(toList());
              
  return priceFutures.stream()
                     .map(CompletableFuture::join)
                     .collect(toList());
}

CompletableFuture 클래스 기능을 이용해 비동기로 만듬.

 

동기작업과 비동기작업 조합하기

 

첫번째 map. 가격정보 얻기

변환의 결과 Stream<CompletableFuture<String>>

CompletableFuture는 작업이 끝났을때 해당 상점에서 반환하는 문자열 정보를 포함.

 

두번째 map. Quote 파싱하기

첫번째 결과 문자열을 Quote로 변환

 

세번째 map. CompletableFuture를 조합해서 할인된 가격 계산하기

상점에서 받은 할인전 가격에 Discount 서비스에서 제공하는 할인율 적용.

세개의 연산 결과 요소를 리스트로 수집하면 List<CompletableFuture<String>> 을 얻음.

마지막으로 CompletableFuture가 완료되길 기다렸다가 join으로 값 추출.

 

4 - 독립 CompletableFuture와 비독립 CompletableFuture 합치기

첫번째 CompletableFuture 의 완료와 상관없이 두번째 CompletableFuture를 실행할 수 있어야 한다.

이럴때 thenCombine 메서드를 사용한다.

thenCombineAsync 메서드에서는 BiFunction이 정의하는 조합동작이  별도의 태스크에서 비동기 적으로 수행된다

CompletableFuture의 결과가 생성되고 BiFunction으로 합쳐진 다음 세번째 CompletableFuture를 얻을 수 있다.

 

Future<Double> futurePriceInUSD =
  CompletableFuture.supplyAsync(() -> shop.getPrice(product)) //가격정보 요청
                   .thenCombine(CompletableFuture.supplyAsync(
                   () ->  exchangeService.getRate(Money.EUR, Money.USD)), //환율정보 요청
                   (price, rate) -> price * rate //두 결과를 곱해 가격과 환율정보 합침
  ));

 

5 - Future의 리플랙션과 CompletableFuture의 리플렉션

CompletableFuture는 람다표현식을 사용하며, 복잡한 연산수행 방법을 쉽게 선언형 API를 만들수있다.

 

자바7로 두 Future 합치기

ExecutorService executor = Executors.newCachedThreadPool(); //태스크를 스레드풀에 제출할수있도록 생성.
final Future<Double> futureRate = executor.submit(new Callable<Double>() {
  public Double call() {
    return exchangeService.getRate(Money.EUR, Money.USD); //환율정보 가져올 Future생성
  }
});
Future<Double> futurePriceInUSD = executor.submit(new Callable<Double>() {
  public Double call() {
    double priceInEUR = shop.getPrice(product); //두번째 Future로 상정에서 제품가격을 검색
    return priceInEUR * futureRate.get(); //가격 검색한 Future이용해 가격환율을 곱함.
  }
});

 

6 - 타임아웃 효과적으로 사용하기

자바9의 orTimeout 메서드는 지정된 시간이 지난후 CompletableFuture를 TimeoutException으로 완료하면서 

다른 CompletableFuture를 반환할 수 있도록 내부적으로 ScheculedThreadExecutor를 활용한다.

 

// 타임아웃 추가해 exception 발생시킴.
Future<Double> futurePriceInUSD =
     CompletableFuture.supplyAsync(() -> shop.getPrice(product))
                .thenCombine(
                  CompletableFuture.supplyAsync(
                    () ->  exchangeService.getRate(Money.EUR, Money.USD)),
                  (price, rate) -> price * rate))
                .orTimeout(3, TimeUnit.SECONDS); 
                //3초뒤 작업이 완료되지 않으면 Future가 TimeoutException발생시키도록 설정.


//CompletableFuture에 타임아웃이 발생하면 기본값으로 처리하는 방식
Future<Double> futurePriceInUSD =
     CompletableFuture.supplyAsync(() -> shop.getPrice(product))
                      .thenCombine(CompletableFuture.supplyAsync(
                        () ->  exchangeService.getRate(Money.EUR, Money.USD))
                      .completeOnTimeout(DEFAULT_RATE, 1, TimeUnit.SECONDS), //1초안에 결과를 제공하지 않으면 기본환율값 사용 
                        (price, rate) -> price * rate))               
                      .orTimeout(3, TimeUnit.SECONDS);

 


16.5 CompletableFuture의 종료에 대응하는 방법

 

기다리지 않고 각 상점에서 가격정보를 제공할때마다 즉시 보여줄 수 있는

최저가격 검색 애플리케이션을 만들어보다. 

 

1 - 최저가격 검색 애플리케이션 리팩터링

가격정보를 포함할 때까지 리스트 생성을 기다리지 않도록 하자

 

findPrices메서드 리팩터링

public Stream<CompletableFuture<String>> findPricesStream(String product) {
  return shops.stream()
              .map(shop -> CompletableFuture.supplyAsync(
                () -> shop.getPrice(product), executor))
              .map(future -> future.thenApply(Quote::parse))
              .map(future -> future.thenCompose(quote ->
                CompletableFuture.supplyAsync(
                  () -> Discount.applyDiscount(quote), executor)));
}

thenAccept 메서드는 연산 결과를 소비하는 Consumer를 인수로 받는다.

thenAcceptAsync 메서드는 CompletableFuture가 새로운 스레드를 이용해서 Consumer를 실행.

하지만 완료즉시 응답하는것이 좋으므로 thenAcceptAsync는 사용하지 않는다.

네번째 map 연산을 <CompletableFuture<Void>>를 반환한다

 

//CompletableFuture 종료에 반응
CompletableFuture[] futures = findPricesStream("myPhone")
  .map(f -> f.thenAccept(System.out::println))
  .toArray(size -> new CompletableFuture[size]);
CompletableFuture.allOf(futures).join();

CompletableFuture 배열을 받아 allOf 메서드가 반환하는 CompletableFuture<Void> 에

join을 호출하면 스트림의 모든 CompletableFuture의 실행 완료를 기다린다.

만약 배열의 CompletableFuture 중 하나의 작업이 끝나기를 기다리는 상황에서는

allOf 메소드 대신 anyOf 를 사용하면 된다.

 

2 -  응용

 

0.5-2.5초 임의의 지연을 발생시켜 원격 서비스 호출을 흉내 

각각의 계산에 소요된 시간을 출력하는 부분을 추가함.

long start = System.nanoTime();
CompletableFuture[] futures = findPricesStream("myPhone27S")
        .map(f -> f.thenAccept(
           s -> System.out.println(s + " (done in " +
             ((System.nanoTime() - start) / 1_000_000) + " msecs)")))
        .toArray(size -> new CompletableFuture[size]);
CompletableFuture.allOf(futures).join();
System.out.println("All shops have now responded in "
                   + ((System.nanoTime() - start) / 1_000_000) + " msecs");

 

임의의 지연이 추가되면 마지막 가격정보에 비해 처음가격정보를 두배 빨리 얻을 수 있다.

 

출력결과

BuyItAll price is 184.74 (done in 2005 msecs)
MyFavoriteShop price is 192.72 (done in 2157 msecs)
LetsSaveBig price is 135.58 (done in 3301 msecs)
ShopEasy price is 167.28 (done in 3869 msecs)
BestPrice price is 110.93 (done in 4188 msecs)
All shops have now responded in 4188 msecs

 


  •  한 개 이상의 원격 외부서비스를 사용하는 긴 동작을 실행할 때는
    비동기 방식으로 애플리케이션의 성능과 반응성을 향상시킬수 있음.
  • 우리 고객에게 비동기 API를 제공하는것을 고려해야 한다.
    CompletableFuture의 기능을 이용하면 쉽게 비동기 API를 구현할 수 있음.
  • CompletableFuture를 이용할 때 비동기 태스크에서 발생한 에러를 관리하고 전달 가능.
  • 동기 API를 CompletableFuture로 감싸서 비동기적으로 소비할 수 있음.
  • 여러 비동기 동작을 조립하고 조합가능.
  • CompletableFuture에 콜백을 등록해서 Future가 동작을 끝내고 결과를 생산했을때 어떤 코드를 실행하도록 지정가능.
  • CompletableFuture 리스트의 모든 값이 완료될 때까지 기다릴지
    아니면 첫값만 완료되길 기다릴지 선택 가능.
  • 자바9에서는 orTimeout, completeOnTimeout 메서드로 CompletableFuture에 비동기 타임아웃 기능 추가됨.