<개요>

- Designing Data-Intensive Applications 를 읽고 그 중 분산시스템의 오류처리에 대한 부분 정리

https://icthuman.tistory.com/entry/The-Trouble-with-Distributed-Systems-1

 

<내용>

6. Timeouts and Unbounded Delays

- Timeout이 Fault를 감지하는 확실한 방법이라면 얼마로 설정해야 할까?

- Long timeout : Node가 죽었다는 것을 인지하기 위해서는 오래 기다려한다. (사용자는 기다리거나 에러메시지를 확인한다.)

- Short timeout : fault를 빠르게 감시할 수 있지만 잘못 인식할 있는 위험이 있다. (spike같이 일시적인 현상도 있기 때문에)

 

* 문제점

- 작업이 살아있고 수행하는 중이었는데 Node를 죽은 것으로 간주한다면, 작업이 종료되기 전에 다시 수행해서 중복 수행될 수 있다.

- 만약에 노드가 죽었다면 다른 노드에 이 사실을 전달해야하고 이것은 다른 노드나 네트워크에 추가적인 부하상황으로 이어질 수 있다.

  이미 시스템이 고부하상황이었고 노드가 죽었다고 잘못 판단할 경우 상황은 더 악화될 수 있다.

  특히 죽은 것이 아니라 overload로 인해서 응답이 지연되고 있었다면 (죽은게 아니었다면) 에러가 계속 전파되어서 모든 노드가 죽었다고 판단하면서 모든 작업이 멈춰버릴 수도 있는 극단의 상황도..

 

*아름다운 상상으로 접근 (fictitious system)

- 모든 패킷이 d 시간내에 전달된다고 하고, 살아있는 노드는 해당 request를 처리할때 r 시간내에 가능하다면 

- 모든 성공적인 request는 response time이 2d + r내로 들어올 것이고

- 해당 시간동안 응답을 받지 못한다면 network 나 node 가 동작하지 않는 것으로 간주할 수 있다.

- 그렇다면 2d + r 은 reasonable timeout 으로 사용할 수 있다.

 

*현실

- 불행하게도 대부분의 시스템은 이를 보장할 수가 없다.

- Asynchronous network 는 unbounded dealy를 가지고 있다.( 최대한 빨리 도착하도록 노력은 하지만.. upper limit이 존재하지 않는다는 점)

- 대부분의 서버 구현에서는 maximum time을 보장할 수가 없다. (Response time guarantees)

- Failure Detection을 위해서는 시스템이 빠르다는 것만으로는 충분하지 않다. Timeout이 너무 짧으면 위에서 살펴본것처럼 spike등이 발생하였을 때 system off-balance

 

* Network congestion and queueing

- 네트워크의 패킷 지연현상은 대부분 queueing 때문이다.

a. 여러 노드에서 동시에 한 곳으로 패킷을 보내면, 네트워크 스위치는 Queue에 채우고 Destination network link에 하나씩 넣어줘야 하는데, 패킷을 얻기 위해서 잠시 기다려야할 수도 있고 만약 Queue가 가득 차게되면 packet 이 drop되어서 다시 보내야한다.

b. 패킷이 Desination 머신에 도착했을 때 Cpu core가 모두 사용중이면 request처리준비를 할때까지 OS에서 queued된다.

c. 가상환경을 사용중이라면 OS가 종종 중지된다. 이 시간동안 VM은 network로부터 데이터를 소비할 수 없기 때문에 VM monitor에 의해서 queued (buffered) 된다.

d. TCP는 flow control을 수행하여 과부하를 방지하도록 속도를 제어하기도 한다. 또 TCP는 손실되는 패킷에 대해서 재전송을 해야하기 때문에 Delay를 두면서 timeout to expired 나 retransmitted packet을 기다린다.

(그래서 우리에게 이러한 기능이 필요없다면, 즉 유실방지,유량제어가 필요없고 지연된 데이터는 가치가 없는 상황이라면 UDP를 사용하는 것이 더 좋은 선택이 된다. 예를 들어서 VoIP call)

 

* 환경적인 문제

 Public Cloud 같이 여러 고객들이 같이 사용하는 네트워크 자원 (link, switch) ,각 NIC, CPU 등은 공유가 된다. 또 MapReduce같은 작업들은 병렬처리를 진행하면서 네트워크를 사용하기도 한다.

* 네트워크의 round trip시간의 분포를 적절하게 측정하여 예상되는 Delay 변동성을 결정하고,  Application의 특성을 고려하여 Failure detection delay  과  Risk of premature timeouts 간의 적절한 Trade Off를 결정할 수 있습니다.

* 더 좋은 방법은 상수값의 timeout보다는 Response time 과 Jitter를 지속적으로 측정하고 관찰된 응답시간의 분포에 따라서 Timeouts을 자동으로 조정하는 것입니다.

-  Phi Accrual failure detector (for example, Akka and Cassandra)

- TCP retransmission timeouts

<개요>

- Designing Data-Intensive Applications 를 읽고 그 중 분산시스템의 오류처리에 대한 부분 정리

- 언제나 늘 그렇듯이 새로운 개념이라기 보다는 얼마나 체계적으로 잘 정리해서 핵심을 간직하는가에 집중

 

<내용>

1. Faults and Partial Failures

- Single Computer에서 작업을 할 경우 same operation은 same result를 만들어낸다. (deterministic)

- 우리가 수 대의 컴퓨터에서 동작하는 소프트웨어를 개발할 때 (즉, 네트워크로 연결되어 있는 상태)는 이와는 다르다.

- 분산처리 시스템에서는 예측할 수 없니 특정 부분에서 문제가 발생하는 경우가 있는데 이를 partial failure 라고 부른다.

- partial failure의 어려운 점은 non-deterministic이라는 점이다. 여러 노드와 네트워크에 걸쳐서 무언가를 했을때 예상과는 다르게 성공하기도 하고 실패하기도 한다는 점인데 이러한 부분이 분산처리 시스템을 어렵게 만든다.

 

2. Building a Reliable System from Unreliable Components

- unreliable한 구성요소를 가지고 어떻게 하면 reliable한 시스템을 만들수 있을까? 몇 가지 아이디어를 소개한다.

a. error-correcting code를 가지고 어느부분이 오류가 발생했는지 체크하고 정확하게 전송할 수 있다.

b. TCP/IP : IP는 unreliable하다. 사라지거나 늦게 도착하거나 혹은 중복되거나.. 순서보장도 안된다. 여기에 TCP가 Transpory layer의 역할을 상위에서 해줌으로 사라진것은 다시 보내주고, 중복된것은 재거해주며, 발송순서에 맞게 재조립을 해준다.

물론 언제나 한계점은 존재한다. 데이터의 양이나, 혹은 네트워크 자체의 지연현상등은 커버하기 어렵다. 하지만 까다로운 일반적인 문제를 제거하고 나면 훨씬 쉬워지는것도 사실이다.

 

3. Unreliable Networks

- 분산처리에서는 shared-nothing구조를 유지하는 것이 일반적이다. 왜냐하면 다른 machine끼리는 네트워크로 연결이 되며 각자의 disk나 메모리등에는 접근할 수 없기 때문이다.

- 인터넷 환경에서 각 데이터 센터는 멀리 떨어져 있기 때문에 더욱 이러한 구조인데 대부분 비동기처리로 진행이 된다.(asynchronous packet networks)

- 메시지를 보내기는 하지만 언제 도착할지, 도착이 하기는 할지.. 보장할 수가 없다. 

 a. request가 사라질 수 있다.
 b. request가 늦게 도착할 수 있다.
 c. 원격 node가 죽을 수도 있고
 d. 원격 node가 잠깐 멈출수도 있고. (gc)
 e. 원격 node가 request처리를 했지만 response가 사라질수 있다.
 f. 원격 node가 request처리를 하고 reponse를 보냈지만 늦게 도착할수도 있다.
- 종합해 보면 메시지를 보낸쪽에서는 무엇이 문제인지 알수 있는 방법이 없다.

* 이러한 문제를 처리하는 방법이 일반적으로 "Timeout" 이다. 기다리는 것을 포기하는 것이다.

수신 node가 받았는지, 메시지가 사라졌는지는 여전히 알 수 없지만...

 

4. Network Faults in Practice

- 위에서 살펴본것처럼 reliable한 시스템을 만드는 법은 완벽한것은 없다. (왜? 네트워크는 여전히 불안하기 때문에..)

- 결국 소프트웨어에서 이를 처리할 수 있도록 해야한다. (이것을 지속적으로 테스트하도록 만든 프레임워크가 바로 Chaos Monkey)

 

5. Detecting Faults

- Faults를 감지해야 이후의 처리를 할수 있으니 감지하는 법을 살펴보자.

- 불확실성을 통해서 작동여부를 판단하는 것은 어려우니, 거꾸로 특정상황에서 작동하지 않는다는 것을 명시적으로 알려주는 FeedBack !

 a. 대상 포트에서 수신 프로세스가 없는 경우 OS에서 RST 또는 FIN을 전송한다.

 b. 노드 프로세스가 죽었지만 OS가 여전히 실행중이면 script로 다른 노드에 알릴 수 있다. (e.g Hbase)

 c. 데이터 센터에서 NIC 관리 기능을 사용중이면 하드웨어 수준으로 감지 할 수 있다.

 d. 라우터가 해당 IP에 연결할 수 없다고 확신하면 ICMP destination unreachable 패킷으로 응답할 수 있다. 

 

- 이렇게 빠르게 feedback처리를 하면 매우 유용함을 알 수 있다. 하지만 이것도 역시 신뢰할 수는 없다! (네트워크이니까)

- 결국 request가 성공적으로 처리되었는지는 application레벨에서의 positive response를 받는 것이 필요하다.

 

 

이전글
 
 - 기존에 Spring boot starter 2.1.x 버전에서는 발생하지 않았던 Exception이 2.3.x 로 오면서 발생하였다.

Caused by: org.springframework.core.io.buffer.DataBufferLimitException: Exceeded limit on max bytes to buffer 

- 해당 오류는 WebClient를 통해서 API Call후 받아오는 응답의 크기가 일정이상이 될 경우 발생한다. 

 현재 사용중인 API의 응답사이즈가 1~3MB 수준으로 조정이 필요한 상태이다.

 

a. 주의해야할 점이 yml 파일내에서 다음 옵션을 사용하여 조정이 가능한 버전이 있으나

spring.codec.max-in-memory-size=20MB

 

 b. 지금 사용중인 버전에서는 동작하지 않아서 별도 설정을 적용하도록 하였다.

ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder()
                .codecs(clientCodecConfigurer -> clientCodecConfigurer.defaultCodecs().maxInMemorySize(3* 1024 * 1024))
                .build();

        return WebClient.builder()
                .exchangeStrategies(exchangeStrategies)
                .baseUrl( )
                .clientConnector(new ReactorClientHttpConnector(httpClient))
                .build();

 이와 같이 WebClient builder내에 추가하여 정상동작 하는 것을 확인하였다.

 

2.  Timeout 설정

- Spring WebClient를 사용할때 여러종류의 Timeout이 있다.

HttpClient httpClient = HttpClient.create()
                .tcpConfiguration(
                        client -> client.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, -A- ) //miliseconds
                                .doOnConnected(
                                        conn -> conn.addHandlerLast(new ReadTimeoutHandler( -B- , TimeUnit.MILLISECONDS))
                                                .addHandlerLast(new WriteTimeoutHandler( -C- , TimeUnit.MILLISECONDS))
                                )
                );


        return WebClient.builder()
                .exchangeStrategies(exchangeStrategies)
                .baseUrl(webClientProperties.getBaseUri())
                .clientConnector(new ReactorClientHttpConnector(httpClient))
                .build();
Mono<String> response = webClient
                .get()
                .uri(" ")
                .retrieve()
                .onStatus(HttpStatus::is4xxClientError, clientResponse -> Mono.error(new Exception())
                .onStatus(HttpStatus::is5xxServerError, clientResponse -> Mono.error(new Exception())
                .bodyToMono(String.class)
                .timeout(Duration.ofMillis( -D- ))
                .onErrorMap(ReadTimeoutException.class, e -> ))
                .onErrorMap(WriteTimeoutException.class, e -> ))

A. Connection Timeout

- Client / Server 간의 Connection을 맺기 위해 소요되는 시간을 의미한다.

- Server에서 새로 Connection을 맺을 자원의 여유가 없다면 발생할 수 있다.

- HTTP Connection에 대한 내용이기 때문에 keep-alive 옵션역시 사용가능하다.

 

B. Read Timeout

 - 데이터를 읽기 위해서 기다리는 시간을 의미한다.

 - 내가 호출한 API 가 응답을 주지 못하면 발생할 수 있다.

 - 너무 길다면? 적절히 설정해주지 않으면 응답을 받을때까지 계속 대기하게 되고, 자원이 고갈되는 현상이 발생한다.

 - 너무 짧다면? 요청을 받은쪽에서는 처리가 되었으나, 응답을 기다리던 쪽에서는 Timeout이 발생하게 되어서 불일치 상태가 발생한다. 

 

C. Write Timeout

 - 데이터를 쓰기 위해서 기다리는 시간을 의미한다.

 - 주어진 시간동안 Write Operation을 완료하지 못하면 발생할 수 있다.

 - 즉, Connection 연결 후 데이터를 보내기 시작하였으나 해당시간보다 길어지게 되면 중단된다.

 

D. reactor timeout

 - Reactive Stream은 Publisher, Subscriber, Subscription 을 통해서 비동기 / 넌블러킹 / back pressure 처리하는 개념이다.

 - 우리가 다루는 Spring WebFlux는 reactive stream의 구현체로 reactor를 사용하고 있으며 Mono / Flux가 Publisher의 구현체이다.

 - 따라서 Exception , Retry등을 처리할때도 기존 방식 대신 reactive stream의 기능을 활용해주는 것이 장점을 충분히 살릴 수 있는 방법이라고 생각한다.

 - Spring WebFlux에서는 WebClient의 호출결과를 받았을때 결과 Body를 Mono로 감싸주어 데이터를 전달하는 형태가 되는데, 해당 시간동안 데이터를 전달하지 못하게 되면 timeout 이 발생하게 된다.

 

E. Transaction Timeout과 비교 (개인적인 생각 , 틀릴 수 있음)

 - 우리가 일반적으로 DB transaction timeout을 설명할 때 Transaction Timeout > Statement Timeout > Socket Timeout 로 각 구간을 나누어서 설명하고 상위 Layer( ? 포장레이어?) 에서는 하위 Layer보다 크거나 같게 설정하는 것이 일반적이다.

 - Web 호출 역시 비슷하게 살펴본다면 Publisher Timeout > Read/Write Timeout > Connection Timeout 정도로 비슷하게 정리해 볼 수 있지 않을까 생각했다.

 

<정리>

- MSA구조에서 각 API의 응답시간과 사이즈는 적절하게 설정해야 한다.

- 특히, 각 레벨에서의 적절한 수준의 timeout 설정은 필수이다. 

- 너무 짧으면 많은 오류가 발생하게 되고, 이에 따른 side-effect (데이터 불일치, 로직의 복잡도 증가) 가 생기게 되며

- 너무 길거나 무제한으로 설정하게 되면 리소스 자원의 낭비가 발생한다.

<현상>

- Async API Call 후 응답을 제대로 처리하지 못하는 현상이 있습니다.

- 그 여파로 내부적으로 AtomicInteger를 이용하여 호출Count를 처리하는 로직이 있는데 해당 로직이 수행되지 않아서 버그가 발생하고 있었습니다.

 

<원인>

contents = webClient
                .post()
                .uri(multiCountApiPath)
                .accept(MediaType.APPLICATION_JSON_UTF8)
                .contentType(MediaType.APPLICATION_JSON_UTF8)
                .header("Authorization","Bearer " + token)
                .body(BodyInserters.fromObject(inputs))
                .retrieve()
                .bodyToMono(String.class)
                .timeout(Duration.ofMillis(200000))
                .onErrorReturn(null)
                .flux()
                .toStream()
                .findFirst()
                .orElse(null);

        try {
            ...
        } catch (IOException e) {
            ...
        }
        callCount.decrementAndGet();
    
        return CompletableFuture.completedFuture(ret);

- 원래의 의도는 API Call 오류가 발생하였을 경우 null 로 처리하여 빈값을 가져가도록 하는 것이었습니다.

- 그러나 실제로 테스트해보면 try 이후 구문이 수행되지 않고 있습니다.

- API Call에서 오류가 발생했을 경우 null 처리를 잘못하여 flux() 이하 로직이 수행되지 않고 있습니다.

 

<수정사항>

contents = webClient
                .post()
                .uri(multiCountApiPath)
                .accept(MediaType.APPLICATION_JSON_UTF8)
                .contentType(MediaType.APPLICATION_JSON_UTF8)
                .header("Authorization","Bearer " + token)
                .body(BodyInserters.fromObject(inputs))
                .retrieve()
                .bodyToMono(String.class)
                .timeout(Duration.ofMillis(200000))
                .onErrorReturn("")
                .flux()
                .toStream()
                .findFirst()
                .orElse("");

        try {
            ...
        } catch (IOException e) {
            ...
        }
        callCount.decrementAndGet();
    
        return CompletableFuture.completedFuture(ret);

     

- 위와 같이 bodyToMono에서 우리가 사용하고자 했던 타입에 맞는 값(e.g String "" )으로 처리해주면 onErrorReturn 이후 로직이 정상적으로 수행되는 것을 확인할 수 있습니다.

 

<추가로 확인해야 할 사항>

- timeout 이 발생했을 때 특정 로직을 수행하도록 handler 가 가능하다면 decrement 로직을 그쪽으로 옮길 수 있을지 검토가 필요합니다.

- onErrorReturn 이후 로직이 Spring내부에서 어떻게 동작하는지 상세한 확인이 필요합니다.

 

+ Recent posts