[Java] CompletableFuture 란?

자바에서의 비동기 처리

 

저번 글에서 동기 / 비동기, 블로킹 / 논블로킹에 대해 알아보았는데, 이번 글에서는 Java에서 비동기 작업을 처리하기 위한 강력한 클래스인 CompletableFutue에 대해 알아보겠습니다.

CompletableFuturejava.util.concurrent 패키지에 포함되어 있으며, Future 인터페이스와 CompletionStage 인터페이스를 구현하고 있는 클래스로 Java 8에서 추가된 기능입니다. 이로 인해 Lambda, Method reference 등 다양한 기능을 지원합니다.

 

이 글에서는 기존의 비동기 처리를 위해 사용하던 Future 인터페이스가 가지는 한계와 CompletableFuture가 그것을 어떻게 해결했는지, 그리고 CompletableFuture에 추가된 효율적인 메서드들에 대해 살펴보겠습니다.

 

 

Future 란?

 

Future라는 이름에서 알 수 있듯이, 이는 미래의 값을 의미하는 객체로, 비동기 작업의 결과를 받아오는 인터페이스입니다. 이 인터페이스를 공부하면서 저는 슈뢰딩거의 고양이 사고 실험이 떠올랐습니다. 슈뢰딩거의 고양이는 상자를 열어보기 전까지 고양이가 죽었는지 살았는지 알 수 없다는 실험인데, Future도 비슷한 점이 있습니다. 비동기 작업이 완료되었는지 여부와 상관없이, Future로 반환받은 값을 실제로 확인하기 전까지는 해당 작업이 완료되었는지 알 수 없기 때문입니다.

 

하지만 Future 객체는 isDone(), isCancelled() 와 같은 메서드를 제공하여 비동기 작업의 완료 여부를 확인할 수 있고, get()을 통해 결과를 대기하거나 예외 처리를 할 수 있다는 점에서, 단순히 알 수 없는 상태만을 표현하는 것이 아닌 작업의 결과를 기다리고 확인할 수 있는 방식도 제공합니다.

public Future<Integer> getFuture() {
    var executor = Executors.newSingleThreadExecutor();

    try {
        return executor.submit(() -> {
            // 비동기 작업
            return 0;
        });
    } finally {
        executor.shutdown();
    }
}

 

Future는 인터페이스이기 때문에 직접 인스턴스를 생성하여 사용할 수는 없고, 위와 같이 java.util.concurrent 패키지의 Executors 클래스의 submit() 메서드를 사용하면 Future 객체로 비동기 작업의 결과를 받아 올 수 있습니다.

 

 

Future의 메서드와 한계

  • get() : 결과를 반환하는 메서드로 비동기 작업의 결과를 구할 때까지 thread가 계속 blocking 된다.
    • 인자로 시간을 넣어주면 timeout 시간까지 결과를 구하다가 시간이 넘어가면 TimeoutException을 발생시킨다.
  • isDone() : 비동기 작업이 완료 또는 종료되었는지 확인한다.
  • isCancelled() : 비동기 작업이 명시적으로 취소되었는지 확인한다.
  • cancel() : 비동기 작업을 명시적으로 취소하는 메서드로 이미 완료 또는 종료된 상태라면 취소되지 않는다.

 

이렇게 Future의 메서드들의 대해 알아보았는데, Future에는 몇 가지 한계점이 존재합니다.

  1. cancel() 을 제외하고는 외부에서 Future를 컨트롤할 수 없다.
  2. 반환된 결과를 get()으로 접근해야 되기 때문에 비동기 처리가 어렵다.
  3. complete(완료)와 exception(종료)을 구분하기 어렵다.

 

 

CompletionStage 란?

 

CompletionStage라는 이름에서 알 수 있듯이, 이는 완료 단계를 의미하는 객체로, 작업이 완료된 후에 수행될 수 있는 여러 단계의 후속 작업들을 다루는 인터페이스입니다.

 

CompletionStage 인터페이스를 구현한 CompletableFuture는 내부적으로 비동기 함수들을 실행하기 위해 ForkJoinPool을 사용하는데, 이는 다수의 작업을 병렬로 실행할 수 있도록 설계된 풀로서, ForkJoinPool의 기본 사이즈는 ** 할당된 cpu 코어 - 1 ** 입니다. 또한 ForkJoinPool은 기본적으로 데몬 쓰레드를 사용하는데, 이는 main 쓰레드가 종료되면 즉각적으로 종료되는 쓰레드입니다.

 

public CompletionStage<Integer> getStage() {
    return CompletionFuture.supplyAsync(() -> {
        // 비동기 작업
        return 0;
    });
}

 

CompletionStage는 인터페이스이기 때문에 직접 인스턴스를 생성하여 사용할 수는 없고, 위와 같이 java.util.concurrent 패키지의 CompletableFuture 클래스의 메서드를 사용하면 CompletionStage 객체로 비동기 작업의 결과를 받아 올 수 있습니다.

 

 

CompletionStage의 메서드

 

CompletionStage의 메서드들을 살펴보면, Java 8에 추가된 함수형 인터페이스에서 사용되는 익숙한 메서드명들이 사용된 것을 알 수 있습니다. 이러한 메서드들은 비동기 작업을 체이닝 할 때 매우 유용하게 활용되며, 주로 두 가지 카테고리로 나눌 수 있습니다.

  1. thenXxx() 메서드
  2. thenXxxAsync() 메서드

 

thenXxx() 와 thenXxxAsync()

 

이 두 가지 메서드가 어떤 차이점을 가지며 어떻게 동작하는지 알아보기 위해 직접 테스트 코드를 작성해 확인해 보겠습니다. 

@SneakyThrows
public static CompletionStage<Integer> finishedStage() {
    var future = CompletableFuture.supplyAsync(() -> {
        log.info("return in future!");
        return 1;
    });
    Thread.sleep(100);
    return future;
}

public static CompletionStage<Integer> runningStage() {
    return CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1000);
            log.info("I'm running!");
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return 1;
    });
}

 

위 두 개의 메서드는 CompletionStage의 상태를 DoneRunning으로 구분하기 위한 테스트 메서드입니다.

이를 간단히 메인 함수에서 테스트하기 위해 static 예약어와 @SneakyThrows 어노테이션을 추가해서 작성했습니다.

 

》 CompletionStage가 Done 상태일 때

1. thenXxx()

@SneakyThrows
public static void main(String[] args) {
    log.info("start main");
    CompletionStage<Integer> stage = finishedStage();
    stage.thenAccept(i -> {
    	log.info("{} in thenAccept", i);
    }).thenAccept(i -> {
    	log.info("{} in thenAccept2", i);
    });
    log.info("after thenAccept");

    Thread.sleep(100);
}

 

 

CompletionStage가 Done 상태일 때 thenXxx()가 호출된다면 Caller의 쓰레드(main)에서 실행되기 때문에, 후속 작업이 진행되는 동안 main 쓰레드가 Blocking 될 수 있습니다.

 

2. thenXxxAsync()

@SneakyThrows
public static void main(String[] args) {
    log.info("start main");
    CompletionStage<Integer> stage = finishedStage();
    stage.thenAcceptAsync(i -> {
    	log.info("{} in thenAcceptAsync", i);
    }).thenAcceptAsync(i -> {
    	log.info("{} in thenAcceptAsync2", i);
    });
    log.info("after thenAcceptAsync");

    Thread.sleep(100);
}

 

CompletionStage가 Done 상태일 때 thenXxxAsync()가 호출된다면 Thread Pool(ForkJoinPool)에 있는 쓰레드에서 실행되기 때문에, 후속 작업이 진행되는 동안 main 쓰레드는 Non-Blocking 합니다.

 

 

》 CompletionStage가 Running 상태일 때

1. thenXxx()

@SneakyThrows
public static void main(String[] args) {
    log.info("start main");
    CompletionStage<Integer> stage = runningStage();
    stage.thenAccept(i -> {
    	log.info("{} in thenAccept", i);
    }).thenAccept(i -> {
    	log.info("{} in thenAccept2", i);
    });
    log.info("after thenAccept");

    Thread.sleep(2000);
}

 

CompletionStage가 Running 상태일 때 thenXxx()가 호출된다면 Thread Pool(ForkJoinPool)에 있는 쓰레드에서 실행되기 때문에, 후속 작업이 진행되는 동안 main 쓰레드는 Non-Blocking 합니다.

 

 

2. thenXxxAsync()

@SneakyThrows
public static void main(String[] args) {
    log.info("start main");
    CompletionStage<Integer> stage = runningStage();
    stage.thenAcceptAsync(i -> {
    	log.info("{} in thenAcceptAsync", i);
    }).thenAcceptAsync(i -> {
    	log.info("{} in thenAcceptAsync2", i);
    });
    log.info("after thenAcceptAsync");

    Thread.sleep(2000);
}

 

CompletionStage가 Running 상태일 때 thenXxxAsync()가 호출된다면 Thread Pool(ForkJoinPool)에 있는 쓰레드에서 실행되기 때문에, 후속 작업이 진행되는 동안 main 쓰레드는 Non-Blocking 합니다.

 

 

thenXxxAsync()CompletionStage의 상태가 Done, Running 에 상관없이 항상 쓰레드 풀에서 실행되므로 main 쓰레드가 항상 Non-Blocking 하게 동작할 수 있다

 

 

테스트 결과를 도식화하면 위와 같이 표현할 수 있습니다.

이 테스트를 통해 thenXxx() 메서드는 Stage 상태에 따라 어떤 쓰레드에서 실행될지 예상하기 힘들기 때문에 지양해야 한다는 결론을 얻을 수 있었습니다.

 

 

CompletableFuture 란?

 

CompletableFuture라는 이름에서 알 수 있듯이, 이는 Future 인터페이스와 CompletionStage 인터페이스를 구현한 클래스로, 여러 비동기 작업을 쉽게 관리하고 조합할 수 있는 클래스입니다.

 

 

 

CompletableFuture의 메서드와 Future의 한계점 극복

 

기본적으로 CompletableFuture는 Future와 CompletionStage를 구현한 클래스기 때문에 부모 인터페이스의 메서드들을 모두 구현하고 있어, 추가적으로 살펴볼 메서드만 나열해 보았습니다.

 

  • supplyAsync() : Supplier를 비동기적으로 시작하고, 결과를 CompletableFuture<T>로 반환하는 메서드
  • runAsync() : Runnable을 비동기적으로 시작하고, 결과를 CompletableFuture<Void>로 반환하는 메서드
  • complete() : CompletableFuture의 결과를 명시적으로 완료시키는 메서드
  • isCompletedExceptionally() : CompletableFuture가 Exception으로 인해 종료되었는지 확인하는 메서드
  • allOf() : 여러 CompletableFuture 인스턴스를 결합하여, 모든 CompletableFuture가 완료될 때까지 기다리고 결과를 CompletableFuture<Void>로 반환하는 메서드
  • anyOf() : 여러 CompletableFuture 중 하나라도 완료될 때까지 기다리고, 완료된 첫 번째 결과를 CompletableFuture<Object>로 반환하는 메서드

 

CompletableFuture는 기존의 Future 객체의 3가지 한계점을 모두 극복하였습니다.

  1. cancel() 외에도 complete()를 통해 외부에서 직접 컨트롤이 가능하다.
  2. 반환된 결과를 get()이 아닌 CompletionStage의 체이닝 메서드를 통해 접근할 수 있다.
  3. complete(완료), exception(종료), cancel(취소)를 모두 구분할 수 있다.

 

위 3번 내용은 아래와 같이 도식화하여 비교할 수 있습니다.

기존의 Future의 경우 isDone(), Cancelled() 메서드만 제공되었기 때문에, complete(완료)와 exception(종료)를 구분할 수 없었습니다.

 

그러나 CompletableFuture의 경우 isDone(), Cancelled() 메서드 외에 isCompletedExceptionally() 라는 메서드도 제공되어, complete(완료), exception(종료), cancel(취소)를 모두 구분할 수 있습니다.

 

 

CompletableFuture의 한계

 

그러나 물론 CompletableFuture에도 한계점이 있습니다.

  1. CompletableFuture를 반환하는 함수를 호출 시 즉시 작업이 실행되기 때문에 지연 로딩을 지원하지 않는다.
  2. CompltatbleFuture에서 데이터를 반환하고 나서 지속적으로 다른 값을 전달하기 어렵다.

 

 

마치며

 

이번 글에서는 Future, CompletionStage, CompletableFuture에 대해 공부해 보았습니다.

비록 CompletableFuture에서 Future의 한계점을 극복하긴 했지만, 여전히 한계점이 존재한다는 것을 알게 되었습니다.

이러한 한계점은 추후 다뤄볼 Reactive Stream 에서 극복할 수 있는데 이는 나중에 Spring Boot - Reactive Stack 카테고리에 포스팅하겠습니다.