<가상의 시나리오>

- Ingestion Layer에서 수백개의 병렬처리를 통해서 데이터를 생성하고 있으며, 해당 데이터에 접근할 수 있도록 파티션별로 Raw API 제공되고 있음

- Role, Scheduler, Auth 등등 여러 가지 문제 때문에 신규 API 를 만드는 데 시간이 필요함 (파티션별로 나누어진 데이터를 합쳐서 연산해야 함)

-  Application Layer에서 Raw API들을 반복 호출하여 결과값을 연산하도록 로직을 구성하도록 하며 최대한 처리속도를 끌어올리고 자원효율을 극대화하자.

 

<접근방법>

- 다수의 API 호출 후 결과를 조합해야 하는 경우 Web client를 활용하여 비동기 호출로 효과를 봤었다. (이전 포스트 참조)

https://icthuman.tistory.com/entry/Spring-WebClient-%EC%82%AC%EC%9A%A9%EC%8B%9C-%EC%A3%BC%EC%9D%98%EC%A0%90

 

Spring WebClient 사용 #1

- Async API Call 후 응답을 제대로 처리하지 못하는 현상이 있습니다. - 그 여파로 내부적으로 AtomicInteger를 이용하여 호출Count를 처리하는 로직이 있는데 해당 로직이 수행되지 않아서 버그가 발생

icthuman.tistory.com

https://icthuman.tistory.com/entry/Spring-WebClient-%EC%82%AC%EC%9A%A9-2-MVC-WebClient-%EA%B5%AC%EC%A1%B0

 

Spring WebClient 사용 #2 (MVC + WebClient 구조)

- Spring 이후 버전에서는 RestTemplate가 deprecated될 예정이며 WebClient 사용을 권장하고 있다. - 현재 구성 중인 시스템에는 동기/비동기 API가 혼재되어 있으면서, 다양한 Application / DB를 사용중이기 때

icthuman.tistory.com

https://icthuman.tistory.com/entry/Spring-WebClient-%EC%82%AC%EC%9A%A9-3-Configuration-Timeout

 

Spring WebClient 사용 #3 (Configuration, Timeout)

이전글 Spring WebClient 사용 #2 (MVC + WebClient 구조) Spring WebClient 사용 #2 (MVC + WebClient 구조) - Spring 이후 버전에서는 RestTemplate가 deprecated될 예정이며 WebClient 사용을 권장하고 있다. - 현재 구성 중인 시

icthuman.tistory.com

- 신규 프로젝트에서는 기술스택을 Spring WebFlux 로 선정하였다. 그 이유는 다음과 같다.

 

a. 기본적으로 Spring, Java에 대한 이해도가 높다. 하지만 Legacy 코드는 없다.

b. 데이터에 대한 읽기 연산이 대부분이고, 특별한 보안처리나 트랜잭션 처리가 필요없다. (참조해야할만한 Dependecny 가 적다.)

c. 저장공간으로 Redis Cache를 활용한다. 즉, Reactive를 적극 활용할 수 있다.

d. 다수의 API 호출을 통해서 새로운 결과를 만들어 낸다.

즉, IO / Network의 병목구간을 최소화 한다면 자원활용을 극대화 할 수 있을 것으로 보인다.

 

<진행내용>

- 기존의 For loop 방식과 Async-non blocking 차이,그리고 Mono / Flux 를 살펴본다. (Spring WebFlux) 

@ReactiveRedisCacheable
public Mono<String> rawApiCall(...) throws .Exception {

Mono<String> response = webClient
                .get()
                .uri(url)
                .retrieve()
                .onStatus(HttpStatus::is4xxClientError, clientResponse -> Mono.error(new Exception(...)))
                .onStatus(HttpStatus::is5xxServerError, clientResponse -> Mono.error(new Exception(... )))
                .bodyToMono(String.class)
                .timeout(Duration.ofMillis(apiTimeout))
                .onErrorMap(ReadTimeoutException.class, e -> new Exception(...))
                .onErrorMap(WriteTimeoutException.class, e -> new Exception(...))
                .onErrorMap(TimeoutException.class, e -> new Exception(...));
                
                return response;
}

webClient를 이용해서 타 API를 호출하는 부분이다. 응답값에는 다수의 건이 포함되어 있으나 해당 데이터를 보내는 쪽에서도 병렬처리를 진행하고 있기 때문에 Collection 이나 Array 형태로 처리하는 부분을 제외하고 그냥 Raw line 형태로 제공하고 있다.

Spring MVC기반에서는 이 값을 꺼내기 위해서 결국 block하고 값에 접근하는 로직이 필요하다. 굳이 코드로 구현하자면 아마도 이렇게 만들어 질 것이다.

List<ApiResponse> ret = new ArrayList<>();
for(String value : Collection ... ){

   String contents = apiService.rawApiCall(value).block();

   String[] lines = contents.split("\n");
   for(String data : lines){
       if(StringUtils.hasText(data)){

           ApiResponse apiResponse =  mapper.readValue(data, ApiResponse.class);

           if(populationHourApiResponse .. ){
               // biz logic
				
               FinalResponse finalResponse = new FinalResponse();
               // setter
               ...
               ..
               
               ret.add(finalReponse);
           }
       }
   }
}

이 코드에는 여러가지 문제점이 있는데

- block()을 수행하게 되면 비동기 넌블러킹 처리의 여러 장점이 사라진다.

- 오히려 더 적은 수의 쓰레드를 사용해야 하는 구조특성상  block이 생기면 더 병목이 발생하는 경우도 있다.

- return 에 얼만큼의 데이터가 담길지 모르게 된다.

- API Call 이후 biz logic의 수행시간이 길어질 수록 전체 응답시간은 더욱 길어진다.

 

해당 내용을 block없이 처리하도록 Flux를 최대한 활용하여 작성해보았다.

public Flux<FinalResponse> getDataByConditionLevel1{

    List<Mono<String>> monoList = new ArrayList();
    for(String value : Collections ...)){
        monoList.add( apiService.rawApiCall(value) );
    }

    return 
        Flux.merge(monoList)
                .flatMap(s -> Flux.fromIterable(Arrays.asList(s.split("\n"))))
                .filter(s -> StringUtils.hasText(s))
                .map(data -> {
                    try {
                        return mapper.readValue(data, PopulationApiResponse.class);
                    } catch (JsonProcessingException e) {
                        log.error(e.getLocalizedMessage());
                    }
                    return new ApiResponse();
                })                                                                      
                .filter(aApiResponse -> ... biz logic)     
                .map(apiResponse ->
                     new FinalResponse(...)
                );
  }

주요하게 바뀐부분을 살펴보면 다음과 같다.

 

1. API응답의 결과를 block해서 기다리지 않고 Mono를 모아서 Flux 로 변환한다.

Mono는 0..1건의 데이터, Flux는 0..N건의 데이터를 처리하도록 되어있다.

즉 개별 Mono를 대기하여 처리하는 것이 아니라 하나의 Flux로 모아서 단일 Stream처럼 처리할 수 있다,.

 

2.  값이 아니라 행위를 넘겨준다.

Spring WebFlux에서는 기본적으로 Controller - Service - Dao 등의 Layer간 이동을 할때 Mono / Flux 를 넘겨준다.

즉, 어떠한 값을 보내는 것이 아니라 Mono / Flux로 구성된 Publisher를 전달해주면 subscribe를 통해서 실제 데이터가 발생될 때 우리가 정의한 Action을 수행하는 형태가 된다고 이해하면 될듯 하다. (Hot / Cold 방식의 차이가 있는데 일단 Skip하도록 한다.)

 

위의 로직은 각 개별 데이터 간의 연산이나 관계가 없기 때문에 비교적 쉽게 변경할 수 있었다.

하지만 해당 데이터를 다시 조합하거나 Grouping 하거나 하는 경우가 있다면 약간 더 복잡해질 수 있기 때문에 고민이 필요하며 각 비지니스 케이스에 적합한 단위와 연산으로 재설계를 해주는 것이 좋다. ( -> 필수다 !)

예를 들어서 rawApiCall에 필요한 인자값이 yyyyMMdd hh:mm:ss 형태의 timeStamp라면 특정기간 내 시간대별 결과를 얻기 위해서는 다음과 같이 Call을 하고 조합해야 한다.

즉 수행해야 하는 액션은 다음과 같다.

- Flux를 응답받는 메소드를 다시 감싸서

- 응답결과를 적절하게 Biz Logic에 따라서 처리한 뒤

- aggreation 을 통하여 새로운 응답을 만들어 낸다. (e.g 그룹별 개수, 합계, 평균 등등)

 

코드로 작성해보면 이러한 형태가 될텐데

public Flux<NewResponse> getDataByConditionLevel2{	

    List<Flux<NewResponse>> ret = new ArrayList();
        for( ; ; ){
            ...
            // Biz Logic...
            ...
            
            Flux<NewResponse> flux = getDataByConditionLevel1( ...  )
                     .groupBy(apiSummary -> apiSummary.getKey() )
                     .flatMap(groupedFlux -> groupedFlux.reduce( (arg1, arg2) -> ApiSummary.add(arg1, arg2) )
                                                        .map(apiSummary -> NewResponse.valueOf( ...+ groupedFlux.key(), apiSummary ))
                                                      );

            ret.add(flux);
        }
        return Flux.merge(ret);

 위의 코드에서 살펴볼 부분은 세 가지이다.

- groupBy : getDataByConditionLevel1 메소드에서 받아온 결과를 Key단위로 Grouping을 수행한다.

  이때 수행결과로는 GroupedFlux가 리턴되는 데 이는 중첩된 데이터 형태로 flatMap 을 통해서 작업하는 것이 수월하다.

 

- reduce : groupBy 로 분류된 데이터들을 key 단위로 reduce 하게 되는데 ( 자주 보게되는 wordCount sample과 유사하다).

  Java내에는 Integer , Double등의 타입에서 ::sum 메소드를 제공하고 있지만 우리가 직접 작성한 Class 에 대해서는 연산메소드를 정의해주는 것이 필요하다. 위 예제에서는 ApiSummary.add(arg1, arg2) 이다.

 최종 객체변환의 편의성을 위해서 NewResponse.valueOf 메소드도 정의해서 사용하였다.

 

- Mono/Flux간 변환

getDataByConditionLevel1 메소드에서 살펴본것 처럼 여러 개의 Mono는 하나의 Flux로 변환이 가능하다.

또한 Flux에 대한 reduce 연산은 Mono로 변환이 된다.

그리고 여러 개의 Flux 를 합쳐서 하나의 Flux로 변환하는 것도 가능하다.

 순서보장이 필요한지, 병렬처리가 필요한지 등 여러가지 요건을 고려하여 적절한 연산자를 사용하도록 한다.

 

<정리>

- 처음에는 blocking 로직을 벡엔드에서 가지고 있는 것이 적합하지 않아서 FrontEnd에서 해당 API들을 호출하여 결과값을 연산하는 형태로 접근했었다. (Promise all)

- 일주일치의 데이터를 기반으로 결과값을 생성하기 위해서는 총 24 * 7 = 168 회 API 호출이 필요했고, 프론트에서 처리시간은 최악의 경우 15초를 넘어가는 케이스가 발생하였다.

- Spring Web Flux를 활용하여 Backend에서 처리하도록 개선하였으며 또한 Raw API Call을 수행하는 메소드에 별도로 개발한 Cache Aspect를 적용하였다.

그 이유는 Spring Cache Manager에서 async/non-blocking에 대한 표준 구현체가 없다보니 직접 CacheMono/Flux와  ReactiveRedisTemplate등을 사용하여 값을 처리하도록 구현하였다.

이에 대한 내용은 다음 포스트에서 좀 더 자세히 다루도록 하겠다.

 

<결과>

- 최초 호출시 약 4~5초 정도 수행시간이 소요되며, 각 Raw API 캐시 이후에는 약 1초 정도 걸리는 것을 확인할 수 있었다.

  Network 처리에 가장 많은 시간이 소요되기 때문에 사실 개별 API Call만 캐시해도 성능이 대폭 향상된다.

- 하지만 아직 몇 가지 더 살펴보고 싶은 욕심이 있는데.. 

 a. Mono / Flux 레벨에서의 캐시

 b. Raw API뿐만 아니라 최종 API에 대한 값 캐시 

 (각 Raw API 응답값이 변하기도 하고, 워낙 대상이 많다보니 캐시대상을 늘릴 경우 저장공간에 대한 우려가 있다.)

 c. Reactor에서의 병렬처리

 (Schedulers, parallel 등)

 

<참조>

Reactor에 대한 내용이 잘 정리되어 있다.

https://godekdls.github.io/Reactor%20Core/reactorcorefeatures/

 

Reactor Core Features

리액터 코어 기능 한글 번역

godekdls.github.io

 

https://icthuman.tistory.com/entry/Reactive-Programming-1-%EA%B4%80%EB%A0%A8-%EA%B0%9C%EB%85%90%EC%A0%95%EB%A6%AC

 

Reactive Programming #1 (관련 개념정리)

최근 Reactive Programing이라는 개념이 많이 사용되고 있어서 관련하여 개념들을 정리를 해보려고 한다.1. Event DrivenReactive를 알기 위해서 먼저 Event Driven을 알아볼 필요가 있다.Event Driven은 말 그대로

icthuman.tistory.com

 

<개요>

- 서비스 내에서 로직 처리후 타 서비스 연동을 위해서 Service Call을 다시 하는 구조

- Spring에서 제공하는 ApplicationEvent, ApplicationListener를 활용하여 리팩토링

 

<내용>

- 현재 서비스내에서 다음과 같이 특정 상황이 발생했을 때 Noti하기 위해서 Slack으로 메시지를 보내는 로직들이 구성되어 있다.

 @Async("executorBean")
    public CompletableFuture<Boolean> refreshCount(){

        log.info("Before Current API Call remains {} ", countCallService.getCallCount());
        if(countCallService.getCallCount() > 0){
            sendSlackMessageInfo("");
        }
        List<MetaDto> metaList = filterService.findAllMetas();

        sendSlackMessageInfo( String.format("%s %s", metaList.size(), "metas") );
        log.info(...);
        ...
        
    }
    
    private void sendSlackMessageInfo(String text){
        log.info(text);
        SlackMessage message = SlackMessage
                .builder()
                .text(text)
                .color(SlackMessage.Color.GOOD)
                .build();
        slackMessageService.postMessage(message);
    }

 

* 문제점

- 비지니스와 관계없는 SlackService와 Dependency가 발생한다.

- Slack 이외에 다른 연동이나 로직이 추가될 수록 코드가 지저분해진다. 

- 실제 비지니스 로직과 관계없는 Message 조립에 대한 로직이 포함되고 있다.

 

<Refactoring #1>

1. Service

 @Async("executorBean")
    public CompletableFuture<Boolean> refreshCount(){

        log.info("Before Current API Call{} ", countCallService.getCallCount());
        if(countCallService.getCallCount() > 0){
             eventPublisher.publishEvent(new RefreshMetaErrorEvent(""));
        }
        List<MetaDto> metaList = filterService.findAllMetas();

        eventPublisher.publishEvent(new RefreshMetaStartEvent(metaList.size()));
        log.info(...);
        ...
        
    }

Biz Service내에서는 Event만 publish 하도록 변경하였다.

 

2. EventHandler

public class RefreshMetaEventHandler {

    private final SlackMessageService slackMessageService;

    @EventListener
    public void errorMessage(final RefreshMetaErrorEvent refreshMetaErrorEvent){
        final String message = refreshMetaErrorEvent.getMessage();

        sendSlackMessageInfo(message);
    }

    @EventListener
    public void startMessage(final RefreshMetaStartEvent refreshMetaStartEvent){
        final int count = refreshMetaStartEvent.getCount();
        final String message =  String.format("%s %s", count, "filters");
        sendSlackMessageInfo(message);
    }

    private void sendSlackMessageInfo(String text){
        log.info(text);
        SlackMessage message = SlackMessage
                .builder()
                .text(text)
                .color(SlackMessage.Color.GOOD)
                .build();
        slackMessageService.postMessage(message);
    }
}
@Getter
@RequiredArgsConstructor
public class RefreshMetaStartEvent {

    private int count;
}
@Getter
@RequiredArgsConstructor
public class RefreshMetaErrorEvent {

    private String message;
}

- 상황에 따라 Event를 수신하여 처리하는 로직은 모두 EventHandler로 모아서 구성하였다. (Service Dependency정리)

- slack연동외의 다른 로직을 추가하기 간결한 구조로 정리되었다.

 

* 남은 문제점

- Event가 많아질 수록 메소드가 늘어난다.

- EventHandler나 SlackMessageService의 경우 각 Message의 전달만을 책임져야 한다.

- Message조립에 대한 로직은 분리되는 것이 맞다.

 

<Refactoring #2>

1. Abstract Class 및 Concrete Class 정의

public abstract class AbstractRefreshMetaEvent {

    public abstract String getMessage();
}
public class RefreshMetaStartEvent extends AbstractRefreshMetaEvent{

    private final int count;

    @Override
    public String getMessage() {
        return String.format("%s %s", count, "metas");
    }
}
public class RefreshMetaDemoEvent extends AbstractRefreshMetaEvent {

    private final String svcColumnName;
    private final int count;

    @Override
    public String getMessage() {
        return String.format("%s has %s data",svcColumnName, count);
    }
}
public class RefreshMetaErrorEvent extends AbstractRefreshMetaEvent{

    private final String message;
}

- 각 Event별로 Message 조립에 대한 역할을 가져가도록 분리한다.

- 필요에 따라서 세부 Event를 추가로 정의하고 각 상황에 맞게 상세구현을 진행할 수 있다.

 

2. EventHandler

public class RefreshMetaEventHandler {

    private final SlackMessageService slackMessageService;

    @Async
    @EventListener
    public void sendMessage(final AbstractRefreshMetaEvent refreshMetaEvent){
        final String message = refreshMetaEvent.getMessage();

        sendSlackMessageInfo(message);
        // 연동 및 로직 추가구성 가능 
    }

    private void sendSlackMessageInfo(String text){
        log.info(text);
        SlackMessage message = SlackMessage
                .builder()
                .text(text)
                .color(SlackMessage.Color.GOOD)
                .build();
        slackMessageService.postMessage(message);
    }
}

- AbstractRefreshMetaEvent 타입으로 정리하고 Slack으로 전송하는 로직만 남게된다.

- 추가 연동이 발생하거나 로직 수정시 타 소스에 영향도가 낮다.

 

<정리>

- Event 발행구조 : 해당 Event발생에 따른 후속 상세로직은 Biz Service에서 알 필요가 없다.

- AbstractClass 를 활용하여 추상화하고, 세부 메세지 조립에 대한 로직은 상세클래스에서 구현한다.

- 후속 로직은 요건에 따라 비동기로 처리하여 Biz Logic에 영향이 없도록 한다.

 (비동기/넌블러킹 기술스택으로 변경하여 throughput 을 증가시키는 것은 덤!)

- 구조가 기술보다 우선이다.

 

<개요>

- Spring 이후 버전에서는 RestTemplate가 deprecated될 예정이며 WebClient 사용을 권장하고 있다.

- 현재 구성 중인 시스템에는 동기/비동기 API가 혼재되어 있으면서, 다양한 Application / DB를 사용중이기 때문에 SpringMVC 구조를 유지하면서 WebClient사용을 통해서 외부 API호출부분을 개선해보고 싶었다.

 

<내용>

- RestTemplate사용법과 WebClient사용법을 비교한다.

- Spring MVC 에서 WebClient 사용방법

- WebClient 를 사용할 때 이해할 Reactive Programming개념

 

RestTemplate 과 WebClient

1. RestTemplate

일반적으로 RestTemplate을 사용하여 HTTP 호출을 할 경우 다음과 같이 작성한다.

 try {
      restTemplate.exchange(URL,
                            HttpMethod.POST,
                            new HttpEntity<>(message),
                            new ParameterizedTypeReference<ResponseEntity>() {}
      );
    } catch (HttpStatusCodeException ce) {
      log.error(String.format("Exception occurred : %s, %s, %s"),
                ce.getStatusCode(),
                ce.getResponseBodyAsString());
      throw new RuntimeException(ce.getStatusCode(), ce.getResponseBodyAsString());
    }

이 때 RestTemplate을 @Bean으로 구성할때 공통으로 해당하는 속성들을 다음과 같은 형태로 정의한다.

@Configuration
public class RestConfig {
  @Bean
  public RestTemplate restTemplate() {
    return new RestTemplateBuilder()
      .messageConverters(getHttpMessageConverters())
      .setConnectTimeout(Duration.ofSeconds(10))
      .setReadTimeout(Duration.ofSeconds(10))
      .errorHandler(new HttpResponseErrorHandler())
      .requestFactory(() -> getClientHttpRequestFactory())
      .additionalInterceptors(bearerAuthInterceptor())
      .build();
  }

 

2. WebClient

WebFlux에서 제공하는 WebClient를  사용하여 HTTP호출을 구성하면 다음과 같다.

contents = webClient
                .post()
                .uri(apiPath)
                .accept(MediaType.APPLICATION_JSON_UTF8)
                .contentType(MediaType.APPLICATION_JSON_UTF8)
                .body(BodyInserters.fromObject(message))
                .retrieve()
                .bodyToMono(String.class)
                .timeout(Duration.ofMillis(apiTimeout))
                .onErrorReturn("-1")
                .flux()
                .toStream()
                .findFirst()
                .orElse("-1");

WebClient에서는 크게 retrieve() 와 exchange()를 제공하고 있으며 exchange사용시 Response처리를 제대로 하지 않을 경우 메모리 누수등이 발생할 수 있기 때문에 되도록 retrieve()를 권장하고 있다.

 

이때 WebClient에 필요한 속성들 역시 유사하게 다음과 같이 구성할 수 있다.

HttpClient httpClient = HttpClient.create()
                .tcpConfiguration(
                        client -> client.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, webClientProperties.getConnectionTimeout()) //miliseconds
                                .doOnConnected(
                                        conn -> conn.addHandlerLast(new ReadTimeoutHandler(webClientProperties.getReadTimeout(), TimeUnit.MILLISECONDS))
                                                .addHandlerLast(new WriteTimeoutHandler(webClientProperties.getWriteTimeout(), TimeUnit.MILLISECONDS))
                                )
                );

        return WebClient.builder()
                .baseUrl(webClientProperties.getBaseUri())
                .clientConnector(new ReactorClientHttpConnector(httpClient))
                .build();

본인의 경우 다양한 Host의 API에 접근해야 하고 각각 응답시간이 다르기 때문에 타임아웃등의 설정을 Host별 Properties로 관리/생성하여 사용하고 있다.

 

3. RestTemplate vs WebClient

a. 가장 큰 차이는 동기/비동기의 차이로  볼 수 있다.

RestTemplate을 사용할 경우 기존 MVC 아키텍처를 활용하여 내부적으로 하나의 요청이 쓰레드에 종속되고, 응답대기에 대한 시간이 소요되어 전체적인 자원소모/처리시간의 지연을 가져오게 된다.

WebClient의 경우 비동기처리로 요청을 받는 쓰레드와 일하는 쓰레드를 분리하고 자원접근,응답대기에 대한 부분을 최소화 하여 효율적인 처리가 가능하다.

최근 MSA구조에서는 연계되는 API호출이 많으면서 개별 호출시간이 짧아지는 특성이 있다. 비동기처리로 효율성을 끌어올리는 경우가 많다. 물론 구조적인 복잡성이 증가하게 된다.

 

b. Spring WebFlux는 비동기처리를 기반으로 하고 있으며 그에 대한 Reactive Programming구현제로 Spring Reactor를 사용하고 있다. 따라서 WebClient호출 후 응답은 Mono/Flux 를 통해서 처리해야만 한다.

비동기 방식은 자원접근, 대기시간에 장점이 있는 대신에 오류가 발생했을때 디버깅을 하거나 추적하기에 어려운 부분이 있다.

 

Spring MVC + WebClient 사용

1. Spring WebFlux

@RestController
public class HelloController {

    @GetMapping("/")
    Flux<String> hello() {
        return Flux.just("Hello World");
    }
    
    @GetMapping("/api-call")
    Mono<String> apiCall() {
        return apiCallService.callApi();
    }

}

Spring WebFlux의 경우 Controller에서 응답처리로 Mono/Flux를 사용한다.

WebClient를 이용한 다음과 같은 처리도 가능하다.

    public Mono<String> callApi() {

        return webClient
                .get()
                .uri(apiPath)
                .accept(MediaType.APPLICATION_JSON_UTF8)
                .header("Authorization","Bearer " + token)
                .retrieve()
                .bodyToMono(String.class)
                .timeout(Duration.ofMillis(apiTimeout));

    }

 

2. Spring MVC + WebClient

그러나 기존 Spring MVC구조는 유지하면서 API호출부분만 비동기 WebClient를 사용하고 싶다면 약간의 변경이 필요하다.

@RequestMapping(value="URL", method= RequestMethod.GET)
    public @ResponseBody
    void getFiltersTotalCount(HttpServletResponse response) {

        try{

            if( apiCallService.callApi().get(10, TimeUnit.SECONDS) ){

                response.setStatus(HttpServletResponse.SC_OK);

            }else{
                response.setStatus(HttpServletResponse.SC_CONFLICT);
            }
        }catch(Exception e){
            log.error("API Exception {}", e.getMessage());
            response.setStatus(HttpServletResponse.SC_CONFLICT);
        }

    }
@Async("executorBean")
    public CompletableFuture<Boolean> callApi() {

        boolean contents=false;

        contents = webClient
                .get()
                .uri(apiPath)
                .accept(MediaType.APPLICATION_JSON_UTF8)
                .header("Authorization","Bearer " + token)
                .retrieve()
                .bodyToMono(Boolean.class)
                .timeout(Duration.ofMillis(apiTimeout))
                .onErrorReturn(false)
                .flux()
                .toStream()
                .findFirst()
                .orElse(false);

        return CompletableFuture.completedFuture(contents);

    }

- Controller Layer의 응답처리를 위해서 get()을 통해 대기하는 부분때문에 비동기처리 효과는 감소하지만 기존 아키텍처와 호환성을 유지하면서 다수의 Api Call을 할 경우 어느정도의 효율성을 기대할 수 있다.

 

3. 주의점

https://icthuman.tistory.com/entry/HHH000346-Error-during-managed-flush-null?category=568674 

 

Spring JPA, SecurityContext사용시 ThreadLocal 사용범위 관련 #1

<현상> - HHH000346: Error during managed flush [null] - Spring JPA 사용중 Transcation 처리가 정상적으로 되지 않는 현상 <원인> - 사용자 ID를 @CreatedBy 를 사용해서 관리중 (@EnableJpaAuditing) -..

icthuman.tistory.com

- 지난번 글에서 정리했던 것처럼 Spring Component 중에서는 ThreadLocal을 활용하고 있는부분들이 존재한다. 해당 부분은 @Async를 통과하면서 값이 사라지기 때문에 적절한 propagation 방법을 찾아야 한다.

- Spring MVC구조의 특성상 @Async를 수행하는 Thread Pool 이 모두 수행할 경우 오류가 발생할 수있음을 주의해야 한다.

- 동기처리 요청/응답 방식이 아니기 때문에 디버깅이나 테스트코드 작성이 어려운 부분도 있으며

- WebClient 사용시 Reactive Programming의 개념에 대해서 숙지해야 하는 부분이 있다. 특히 이 부분은 기본개념이면서 매우 중요한 부분이기 때문에 강조하고 싶었다.

 

다음 예제를 살펴보면 기존에 Response처리하던 부분을 void 타입을 활용하여 fire-forget 으로 변경하였다. 크게 문제가 없어보이지만 실제로 수행해보면 API 호출이 되지 않는다.

webClient
                .post()
                .uri(apiPath)
                .accept(MediaType.APPLICATION_JSON_UTF8)
                .contentType(MediaType.APPLICATION_JSON_UTF8)
                .body(BodyInserters.fromObject(message))
                .retrieve()
                .bodyToMono(Void.class)
                .timeout(Duration.ofMillis(apiTimeout));
                
//                .bodyToMono(String.class)
//                .timeout(Duration.ofMillis(apiTimeout))
//                .flux()
//                .toStream();
//                .findFirst();
//                .orElse("");

이유가 무엇일까?

해당 소스는 Mono를 생성한 것이지 실행한 것이 아니다. 조금 더 용어를 바꿔보자면 데이터가 아직 흘러간 것이 아니다. Reactive Programming이나 Spark를 사용해 본 사람은 이해할 수 있는 부분이다.

 

Reactive Programming

Reactive Programming

1. Publisher

끊임없이 data를 생성한다. 위에서 살펴보았던 Mono / Flux가 Spring Reactor가 구현한 Publisher의 역할로 이해할 수 있다.

 

2. Subscriber

data를 요청해서 받아간다. reactive programming의 핵심 중 하나는 back pressure 이다. data를 소비하는 쪽에서 충분히 여유가 있을때 요청하여 받아가는 형태로 이해할 수 있다.

 

3.Subscription

publisher 와 subscriber사이에 위치하여 이벤트를 통하여 data를 전달해 주는 역할로 이해할 수 있다.

Example소스들을 보면 이해가 더 빠를 듯 하다.

Flux<Integer> seq = Flux.just(1, 2, 3);

        seq.subscribe(new Subscriber<Integer>() {

            private Subscription subscription;

            @Override
            public void onSubscribe(Subscription s) {
                System.out.println("Subscriber.onSubscribe");
                this.subscription = s;
                this.subscription.request(1); //데이터 요청
            }

            @Override
            public void onNext(Integer i) {
                System.out.println("Subscriber.onNext: " + i);
                this.subscription.request(1); //데이터 요청
            }

            @Override
            public void onError(Throwable t) {
                System.out.println("Subscriber.onError: " + t.getMessage());
            }

            @Override
            public void onComplete() {
                System.out.println("Subscriber.onComplete");
            }
        });
Flux<String> flux = Flux.generate ( .... );
flux.subscribe(new BaseSubscriber<String> () {
            private int receiveCount = 0;
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                request(5);
            }
            @Override
            protected void hookOnNext(String value) {
                receiveCount++;
                if (receiveCount % 5 == 0) {
                    request(5);
                }
            }
            @Override
            protected void hookOnComplete() {
            }
        });

생각보다 표준을 지키면서 Publisher, Subscriber, Subscription을 구현하는 것이 쉽지 않기 때문에 다양한 구현체 중 선택하여 사용하는것을 권장하며 Spring Reactor가 그중에 하나인것으로 이해하면 된다.

 

Spring MVC + WebClient

결국 Mono / Flux 를 생성하는 것 자체로는 아직 데이터가 흐르지 않은 것이다. 

다시 돌아와서 아까 WebClient소스가 왜 동작하지 않는지 살펴보면 원인을 알 수 있다. subscribe 가 없기 때문이다!

webClient
                .post()
                .uri(apiPath)
                .accept(MediaType.APPLICATION_JSON_UTF8)
                .contentType(MediaType.APPLICATION_JSON_UTF8)
                .body(BodyInserters.fromObject(message))
                .retrieve()
                .bodyToMono(Void.class)
                .timeout(Duration.ofMillis(apiTimeout))
                .subscribe();
                
//                .bodyToMono(String.class)
//                .timeout(Duration.ofMillis(apiTimeout))
//                .flux()
//                .toStream();
//                .findFirst();
//                .orElse("");

이와 같이 subscribe()를 호출하도록 변경하면 정상적으로 WebClient를 통한 호출이 이루어지는 것을 확인할 수 있다.

 

그렇다면 기존에 Flux로 변경하여 처리하던 부분이 어떻게 수행이 될 수 있는 것일까?  조금 생각해보면 유추가 가능하다.

Flux는 실제 데이터의 흐름이 아니다. toStream을 통해서 Stream객체를 만들어 내려면 데이터를 흘려보내야만 한다.

Flux.class 와 BlockingIterable.class 에서 다음부분을 발견할 수 있다.

public Stream<T> stream() {
        BlockingIterable.SubscriberIterator<T> it = this.createIterator();
        this.source.subscribe(it);
        Spliterator<T> sp = Spliterators.spliteratorUnknownSize(it, 0);
        return (Stream)StreamSupport.stream(sp, false).onClose(it);
    }

 

 

 

<개선효과>

1. 대략 500~600번정도 외부 API를 호출해야하는 요건이 있었다.

RestTemplate을 활용하여 순차적으로 처리할 경우 약18분정도 걸렸으며 @Async + WebClient 로 변경하였을 때는 약 3분정도 걸렸다.

 

2. 기대한 것보다 효과가 나오지 않았는데 그 이유는 다음과 같다.

 a. API 호출 후 결과값을 참조하여 RDB에 update하는 로직이 존재함

 b. 호출되는 API중에도 상당수가 동기로 만들어져 있어서 결국 응답을 받아서 처리해야 하는 로직(a)상은 비슷한 시간이 소요될 수 있음

그럼에도 자원 사용율은 20%, 전체적인 응답시간은 1/6 로 줄어든 효과가 있었다.

 

3. 순차처리 할때 발생하지 않았던 문제가 하나 생겼는데 동시에 같은 DB Row에 접근하여 간혹 Lock이 발생하는 문제가 있었다.

이부분은 API 호출 후 결과값을 잘 정비하고 RDB에 update하는 로직을 뒷부분으로 옮겨오면서 해결할 수 있었다. 

 

+ Recent posts