본문 바로가기
Spring WebFlux

[Spring WebFlux] Hot vs Cold

by 코리늬 2022. 3. 18.

webflux 역시 간단한 개념정도만 알고있고, 실제로 고민해서 써본적은 없었었다.

앞으로 restTemplate이 Deprecated 됨에 따라서 우리팀은 WebClient를 사용하는데, 이때 약간의 해프닝이 있었다.

역시 책으로 보는것은 한계가 있다는 사실을 다시 한 번 느끼게 해주는 계기였다.

 

Spring Webflux?

설명하기에 앞서 스프링 웹플럭스는 왜 만들어졌을까??

적은 수의 쓰레드와 보다 적은 하드웨어 자원으로 동시성을 처리하기 위해서이다. 즉, 극한의 효율

보다 이러한 세밀한 처리가 필요한 이유는 아무래도 페이스북이나 유튜브처럼 대량의 트래픽을 처리해야하는 서비스가 늘고있기 때문이다.

서블릿 3.1에서도 이미 N-I/O를 다루기 위한 API를 제공하지만, 논블로킹을 구현하려면 다른 동기 처리나(Filter, Servlet) 블로킹 방식(getParameter, getPart)을 쓰는 API를 사용하기 어렵다. 그리하여 새로운 공통 API를 만들게 되었고 그것이 webFlux인것이다.

또 하나는 자바 8부터 도입된 람다식 덕분에 자바에서도 함수형 API를 작성할 수 있게 되었다. 그리하여 continuation-style-API (CompletableFutureReactiveX)로 비동기 로직을 선언적으로 작성할 수 있다. 웹플럭스에서 어노테이션을 선언한 컨트롤러와 더불어 함수형 웹 엔드포인트를 사옹할 수 있는 건 자바8 덕분이다.

개인적으로 어떤 새로운 기술이 나오는 이유가 상당히 중요하다고 생각한다.

기존의 부족한 점은 무엇이였으며, 이를 어떻게 보완해서 더 좋게 나왔을지에 대해 알아보는 일은 항상 재미있다.

 

사건의 발단

본론으로 들어가서 외부 솔루션으로 API 를 쏴서 데이터를 적재해야하는데, 응답값을 기다릴 필요가 없어서 동기 방식을 비동기 방식으로 바꾸기로 했다.

기존의 동기 방식의 코드가 잘 동작하고 있었기에, 동기 코드만 제거 해주면 비동기로 잘 나갈것 같았다(큰 착각ㅎㅎ)

public void send(String id, Params params) {
    try {
        Mono<String> result = apiResponseHandler.request(() -> webClient.post()
            .uri("/url" + id)
            .header("header", "headervalud")
            .contentType(MediaType.APPLICATION_JSON)
            .bodyValue(params)
            .retrieve()
            .bodyToMono(String.class)
            .retry(2)
        );

    } catch (ApiResponseException e) {
        ...
    } catch (Exception e) {
        ...
    }
}

외 않나가?? 

마무리할 생각에 신나게 테스트를 해봤지만, 에러도 발생하지 않고 발송만 되지않고 있었다.

한참을 구글링 해봤지만, 찾을 수 없었고 주변에 물어보고나서야 원인을 알게되었다.

 

결론

이 글을 읽으시는 분들을 위해 결론부터 말하자면, 구독을 하지 않아서였다. 글을 올리기가 민망할정도다.

원인

Mono와 Flux의 경우 발행자 역할을 하는 Publisher의 구현체이다.

위 reactor 객체는 subscribe 메소드를 호출해 구독해야만 동작한다.

public abstract class Flux<T> implements Publisher<T> {
}

 

Hot, Cold

좀 더 추가적인 설명을 위해 Webflux Hot, Cold에 대해 알아보자.

Cold의 경우 reactor객체를 구독할 때마다 독립적으로 새로운 데이터를 생성해 동작한다. 구독이 없으면 아무런 동작을 하지 않는다. 기본적으로 Hot 연산자가 아닌 경우 reactor 객체는 Cold로 동작한다.

그렇기 때문에 구독여부와 상관없이 값을 생성하기 위해서는 Cold를 Hot으로 바꾸는 작업이 필요하게 된다.

꼭 Hot으로 바꾸지 않아도 Hot 연산자를 사용하면 구독 없이도 사용할 수 있다. (ex. Mono.just())

Hot의 경우 구독 전에 데이터를 생성할 수 있고, 한 번 생성한 데이터를 구독할 때마다 생성해 놓은 데이터를 통해 동작할 수 있다.

 

Connectable Flux

Cold에서 Hot으로 바꾸기 위해서는 기본적으로 publish 연산자를 호출하면 바꿀 수 있다.

변환된 reactor 객체에 connect() 메소드를 호출하면, 해당 메소드가 여러 구독자들이 Connectable Flux를 구독한 후 값을 생성해 각 구독자에게 보내기 시작한다.

publish.autoConnect(2) 와 같이 사용하면 Hot으로 변환된 ConnectableFlux를 최소 2개의 구독자가 구독을 하면 자동으로 구독하는 Flux를 리턴한다.

 

왜 이런일이 발생했는가???

내가 완전히 알지 못하는 코드였지만, 바쁘다는 핑계로 어떤식으로 동작하는지 찾아보지 않은 잘못이였다.

반성하자.

 

완성된 코드

public void send(String id, Params params) {
    try {
        Mono<String> result = apiResponseHandler.request(() -> webClient.post()
            .uri("/url" + id)
            .header("header", "headervalud")
            .contentType(MediaType.APPLICATION_JSON)
            .bodyValue(params)
            .retrieve()
            .bodyToMono(String.class)
            .retry(2)
        );

        result.subscribe(response -> {
            log.info("response : {}", response);
        });
    } catch (ApiResponseException e) {
        ...
    } catch (Exception e) {
        ...
    }
}

 

참고

reactor 공식문서

웹플럭스 공식문서

https://tech.kakao.com/2018/05/29/reactor-programming/

댓글