<개요>

- Spring 이후 버전에서는 RestTemplate가 deprecated될 예정이며 WebClient 사용을 권장하고 있다.

- 현재 구성 중인 시스템에는 동기/비동기 API가 혼재되어 있으면서, 다양한 Application / DB를 사용중이기 때문에 SpringMVC 구조를 유지하면서 WebClient사용을 통해서 외부 API호출부분을 개선해보고 싶었다.

 

<내용>

- RestTemplate사용법과 WebClient사용법을 비교한다.

- Spring MVC 에서 WebClient 사용방법

- WebClient 를 사용할 때 이해할 Reactive Programming개념

 

RestTemplate 과 WebClient

1. RestTemplate

일반적으로 RestTemplate을 사용하여 HTTP 호출을 할 경우 다음과 같이 작성한다.

 try {
      restTemplate.exchange(URL,
                            HttpMethod.POST,
                            new HttpEntity<>(message),
                            new ParameterizedTypeReference<ResponseEntity>() {}
      );
    } catch (HttpStatusCodeException ce) {
      log.error(String.format("Exception occurred : %s, %s, %s"),
                ce.getStatusCode(),
                ce.getResponseBodyAsString());
      throw new RuntimeException(ce.getStatusCode(), ce.getResponseBodyAsString());
    }

이 때 RestTemplate을 @Bean으로 구성할때 공통으로 해당하는 속성들을 다음과 같은 형태로 정의한다.

@Configuration
public class RestConfig {
  @Bean
  public RestTemplate restTemplate() {
    return new RestTemplateBuilder()
      .messageConverters(getHttpMessageConverters())
      .setConnectTimeout(Duration.ofSeconds(10))
      .setReadTimeout(Duration.ofSeconds(10))
      .errorHandler(new HttpResponseErrorHandler())
      .requestFactory(() -> getClientHttpRequestFactory())
      .additionalInterceptors(bearerAuthInterceptor())
      .build();
  }

 

2. WebClient

WebFlux에서 제공하는 WebClient를  사용하여 HTTP호출을 구성하면 다음과 같다.

contents = webClient
                .post()
                .uri(apiPath)
                .accept(MediaType.APPLICATION_JSON_UTF8)
                .contentType(MediaType.APPLICATION_JSON_UTF8)
                .body(BodyInserters.fromObject(message))
                .retrieve()
                .bodyToMono(String.class)
                .timeout(Duration.ofMillis(apiTimeout))
                .onErrorReturn("-1")
                .flux()
                .toStream()
                .findFirst()
                .orElse("-1");

WebClient에서는 크게 retrieve() 와 exchange()를 제공하고 있으며 exchange사용시 Response처리를 제대로 하지 않을 경우 메모리 누수등이 발생할 수 있기 때문에 되도록 retrieve()를 권장하고 있다.

 

이때 WebClient에 필요한 속성들 역시 유사하게 다음과 같이 구성할 수 있다.

HttpClient httpClient = HttpClient.create()
                .tcpConfiguration(
                        client -> client.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, webClientProperties.getConnectionTimeout()) //miliseconds
                                .doOnConnected(
                                        conn -> conn.addHandlerLast(new ReadTimeoutHandler(webClientProperties.getReadTimeout(), TimeUnit.MILLISECONDS))
                                                .addHandlerLast(new WriteTimeoutHandler(webClientProperties.getWriteTimeout(), TimeUnit.MILLISECONDS))
                                )
                );

        return WebClient.builder()
                .baseUrl(webClientProperties.getBaseUri())
                .clientConnector(new ReactorClientHttpConnector(httpClient))
                .build();

본인의 경우 다양한 Host의 API에 접근해야 하고 각각 응답시간이 다르기 때문에 타임아웃등의 설정을 Host별 Properties로 관리/생성하여 사용하고 있다.

 

3. RestTemplate vs WebClient

a. 가장 큰 차이는 동기/비동기의 차이로  볼 수 있다.

RestTemplate을 사용할 경우 기존 MVC 아키텍처를 활용하여 내부적으로 하나의 요청이 쓰레드에 종속되고, 응답대기에 대한 시간이 소요되어 전체적인 자원소모/처리시간의 지연을 가져오게 된다.

WebClient의 경우 비동기처리로 요청을 받는 쓰레드와 일하는 쓰레드를 분리하고 자원접근,응답대기에 대한 부분을 최소화 하여 효율적인 처리가 가능하다.

최근 MSA구조에서는 연계되는 API호출이 많으면서 개별 호출시간이 짧아지는 특성이 있다. 비동기처리로 효율성을 끌어올리는 경우가 많다. 물론 구조적인 복잡성이 증가하게 된다.

 

b. Spring WebFlux는 비동기처리를 기반으로 하고 있으며 그에 대한 Reactive Programming구현제로 Spring Reactor를 사용하고 있다. 따라서 WebClient호출 후 응답은 Mono/Flux 를 통해서 처리해야만 한다.

비동기 방식은 자원접근, 대기시간에 장점이 있는 대신에 오류가 발생했을때 디버깅을 하거나 추적하기에 어려운 부분이 있다.

 

Spring MVC + WebClient 사용

1. Spring WebFlux

@RestController
public class HelloController {

    @GetMapping("/")
    Flux<String> hello() {
        return Flux.just("Hello World");
    }
    
    @GetMapping("/api-call")
    Mono<String> apiCall() {
        return apiCallService.callApi();
    }

}

Spring WebFlux의 경우 Controller에서 응답처리로 Mono/Flux를 사용한다.

WebClient를 이용한 다음과 같은 처리도 가능하다.

    public Mono<String> callApi() {

        return webClient
                .get()
                .uri(apiPath)
                .accept(MediaType.APPLICATION_JSON_UTF8)
                .header("Authorization","Bearer " + token)
                .retrieve()
                .bodyToMono(String.class)
                .timeout(Duration.ofMillis(apiTimeout));

    }

 

2. Spring MVC + WebClient

그러나 기존 Spring MVC구조는 유지하면서 API호출부분만 비동기 WebClient를 사용하고 싶다면 약간의 변경이 필요하다.

@RequestMapping(value="URL", method= RequestMethod.GET)
    public @ResponseBody
    void getFiltersTotalCount(HttpServletResponse response) {

        try{

            if( apiCallService.callApi().get(10, TimeUnit.SECONDS) ){

                response.setStatus(HttpServletResponse.SC_OK);

            }else{
                response.setStatus(HttpServletResponse.SC_CONFLICT);
            }
        }catch(Exception e){
            log.error("API Exception {}", e.getMessage());
            response.setStatus(HttpServletResponse.SC_CONFLICT);
        }

    }
@Async("executorBean")
    public CompletableFuture<Boolean> callApi() {

        boolean contents=false;

        contents = webClient
                .get()
                .uri(apiPath)
                .accept(MediaType.APPLICATION_JSON_UTF8)
                .header("Authorization","Bearer " + token)
                .retrieve()
                .bodyToMono(Boolean.class)
                .timeout(Duration.ofMillis(apiTimeout))
                .onErrorReturn(false)
                .flux()
                .toStream()
                .findFirst()
                .orElse(false);

        return CompletableFuture.completedFuture(contents);

    }

- Controller Layer의 응답처리를 위해서 get()을 통해 대기하는 부분때문에 비동기처리 효과는 감소하지만 기존 아키텍처와 호환성을 유지하면서 다수의 Api Call을 할 경우 어느정도의 효율성을 기대할 수 있다.

 

3. 주의점

https://icthuman.tistory.com/entry/HHH000346-Error-during-managed-flush-null?category=568674 

 

Spring JPA, SecurityContext사용시 ThreadLocal 사용범위 관련 #1

<현상> - HHH000346: Error during managed flush [null] - Spring JPA 사용중 Transcation 처리가 정상적으로 되지 않는 현상 <원인> - 사용자 ID를 @CreatedBy 를 사용해서 관리중 (@EnableJpaAuditing) -..

icthuman.tistory.com

- 지난번 글에서 정리했던 것처럼 Spring Component 중에서는 ThreadLocal을 활용하고 있는부분들이 존재한다. 해당 부분은 @Async를 통과하면서 값이 사라지기 때문에 적절한 propagation 방법을 찾아야 한다.

- Spring MVC구조의 특성상 @Async를 수행하는 Thread Pool 이 모두 수행할 경우 오류가 발생할 수있음을 주의해야 한다.

- 동기처리 요청/응답 방식이 아니기 때문에 디버깅이나 테스트코드 작성이 어려운 부분도 있으며

- WebClient 사용시 Reactive Programming의 개념에 대해서 숙지해야 하는 부분이 있다. 특히 이 부분은 기본개념이면서 매우 중요한 부분이기 때문에 강조하고 싶었다.

 

다음 예제를 살펴보면 기존에 Response처리하던 부분을 void 타입을 활용하여 fire-forget 으로 변경하였다. 크게 문제가 없어보이지만 실제로 수행해보면 API 호출이 되지 않는다.

webClient
                .post()
                .uri(apiPath)
                .accept(MediaType.APPLICATION_JSON_UTF8)
                .contentType(MediaType.APPLICATION_JSON_UTF8)
                .body(BodyInserters.fromObject(message))
                .retrieve()
                .bodyToMono(Void.class)
                .timeout(Duration.ofMillis(apiTimeout));
                
//                .bodyToMono(String.class)
//                .timeout(Duration.ofMillis(apiTimeout))
//                .flux()
//                .toStream();
//                .findFirst();
//                .orElse("");

이유가 무엇일까?

해당 소스는 Mono를 생성한 것이지 실행한 것이 아니다. 조금 더 용어를 바꿔보자면 데이터가 아직 흘러간 것이 아니다. Reactive Programming이나 Spark를 사용해 본 사람은 이해할 수 있는 부분이다.

 

Reactive Programming

Reactive Programming

1. Publisher

끊임없이 data를 생성한다. 위에서 살펴보았던 Mono / Flux가 Spring Reactor가 구현한 Publisher의 역할로 이해할 수 있다.

 

2. Subscriber

data를 요청해서 받아간다. reactive programming의 핵심 중 하나는 back pressure 이다. data를 소비하는 쪽에서 충분히 여유가 있을때 요청하여 받아가는 형태로 이해할 수 있다.

 

3.Subscription

publisher 와 subscriber사이에 위치하여 이벤트를 통하여 data를 전달해 주는 역할로 이해할 수 있다.

Example소스들을 보면 이해가 더 빠를 듯 하다.

Flux<Integer> seq = Flux.just(1, 2, 3);

        seq.subscribe(new Subscriber<Integer>() {

            private Subscription subscription;

            @Override
            public void onSubscribe(Subscription s) {
                System.out.println("Subscriber.onSubscribe");
                this.subscription = s;
                this.subscription.request(1); //데이터 요청
            }

            @Override
            public void onNext(Integer i) {
                System.out.println("Subscriber.onNext: " + i);
                this.subscription.request(1); //데이터 요청
            }

            @Override
            public void onError(Throwable t) {
                System.out.println("Subscriber.onError: " + t.getMessage());
            }

            @Override
            public void onComplete() {
                System.out.println("Subscriber.onComplete");
            }
        });
Flux<String> flux = Flux.generate ( .... );
flux.subscribe(new BaseSubscriber<String> () {
            private int receiveCount = 0;
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                request(5);
            }
            @Override
            protected void hookOnNext(String value) {
                receiveCount++;
                if (receiveCount % 5 == 0) {
                    request(5);
                }
            }
            @Override
            protected void hookOnComplete() {
            }
        });

생각보다 표준을 지키면서 Publisher, Subscriber, Subscription을 구현하는 것이 쉽지 않기 때문에 다양한 구현체 중 선택하여 사용하는것을 권장하며 Spring Reactor가 그중에 하나인것으로 이해하면 된다.

 

Spring MVC + WebClient

결국 Mono / Flux 를 생성하는 것 자체로는 아직 데이터가 흐르지 않은 것이다. 

다시 돌아와서 아까 WebClient소스가 왜 동작하지 않는지 살펴보면 원인을 알 수 있다. subscribe 가 없기 때문이다!

webClient
                .post()
                .uri(apiPath)
                .accept(MediaType.APPLICATION_JSON_UTF8)
                .contentType(MediaType.APPLICATION_JSON_UTF8)
                .body(BodyInserters.fromObject(message))
                .retrieve()
                .bodyToMono(Void.class)
                .timeout(Duration.ofMillis(apiTimeout))
                .subscribe();
                
//                .bodyToMono(String.class)
//                .timeout(Duration.ofMillis(apiTimeout))
//                .flux()
//                .toStream();
//                .findFirst();
//                .orElse("");

이와 같이 subscribe()를 호출하도록 변경하면 정상적으로 WebClient를 통한 호출이 이루어지는 것을 확인할 수 있다.

 

그렇다면 기존에 Flux로 변경하여 처리하던 부분이 어떻게 수행이 될 수 있는 것일까?  조금 생각해보면 유추가 가능하다.

Flux는 실제 데이터의 흐름이 아니다. toStream을 통해서 Stream객체를 만들어 내려면 데이터를 흘려보내야만 한다.

Flux.class 와 BlockingIterable.class 에서 다음부분을 발견할 수 있다.

public Stream<T> stream() {
        BlockingIterable.SubscriberIterator<T> it = this.createIterator();
        this.source.subscribe(it);
        Spliterator<T> sp = Spliterators.spliteratorUnknownSize(it, 0);
        return (Stream)StreamSupport.stream(sp, false).onClose(it);
    }

 

 

 

<개선효과>

1. 대략 500~600번정도 외부 API를 호출해야하는 요건이 있었다.

RestTemplate을 활용하여 순차적으로 처리할 경우 약18분정도 걸렸으며 @Async + WebClient 로 변경하였을 때는 약 3분정도 걸렸다.

 

2. 기대한 것보다 효과가 나오지 않았는데 그 이유는 다음과 같다.

 a. API 호출 후 결과값을 참조하여 RDB에 update하는 로직이 존재함

 b. 호출되는 API중에도 상당수가 동기로 만들어져 있어서 결국 응답을 받아서 처리해야 하는 로직(a)상은 비슷한 시간이 소요될 수 있음

그럼에도 자원 사용율은 20%, 전체적인 응답시간은 1/6 로 줄어든 효과가 있었다.

 

3. 순차처리 할때 발생하지 않았던 문제가 하나 생겼는데 동시에 같은 DB Row에 접근하여 간혹 Lock이 발생하는 문제가 있었다.

이부분은 API 호출 후 결과값을 잘 정비하고 RDB에 update하는 로직을 뒷부분으로 옮겨오면서 해결할 수 있었다. 

 

+ Recent posts