- read commited 와 snapshot isolation은 주로 동시 쓰기가 있는 경우, 읽기전용 트랜잭션의 볼수 있는 내용을 보장하는 것이다.
- 이전 글에서는 두 트랜잭션이 동시에 쓰는것에 대해서는 다루지 않았고, 특정 유형의 쓰기-쓰기 에 대해서만 살펴봤다.
- 몇 가지 다른유형의 충돌을 살펴 볼텐데 그 중 가장 유명한 것이 Lost Updates 이다.
- 아래 그림을 살펴보자.
TimeLine
1
2
3
4
User #1
getCounter
42 + 1
setCounter 43
Data
42
42
43
43
User #2
getCounter
42 + 1
setCounter 43
- 두 Client간의 race condition이 발생하였다. 주로 App이 read - modify - write의 사이클을 가지고 있을때 발생한다.
- 즉 write가 발생하기 전에 그 사이에 일어난 modification을 포함하지 않기 때문이다.
Solutions
Atomic write Operations
- 많은 DB가 atomic update operations를 제공하기 때문에 App에서 해당 로직을 구현하지 않는 것이 좋다. 일반적인 Best Solution
예) UPDATE counters SET val = val + 1 WHERE key = 'foo';
- 일반적으로 DB내부에서 execlusive lock을 통해서 구현하기 때문에 (update할때 읽을 수 없다!) "Cursor Stability 유지"
- 다른 방법으로 all atomic operations를 single thread 에서 수행하는 방법도 있다. (성능 고려)
Explicit Locking
- DB에서 해당 기능을 제공하지 않는다면 App에서 명시적으로 update 될 object를 잠그는 방법이다.
- 잠금을 수행후 read - modify - write 를 수행할 수 있으며, 다른 트랜잭션이 같은 object 에 접근할 경우에 첫 번째 read - modify - write 가 완료될 때 까지 강제로 대기한다.
예 )
BEGIN TRANSACTION
SELECT * FROM ... WHERE ... FOR UPDATE; (해당쿼리로 반환된 all rows lock !) UPDATE ...
COMMIT;
Automatically detecting lost updates
- atomic operations & locks : read - modify - write 를 순차적으로 하여 lost updates 를 막는다.
- 대안 : 병렬 수행을 허락하고, Transaction Manager 가 lost update를 감지하면, 트랜잭션을 중단하고 강제로 read - modify - write 를 retry 한다!
- 장점 : DB가 이 검사를 효율적으로 수행할 수 있다는 것. with Snapshot Isolation
PostgreSQL
repeatable read
automatically detect and abort the offending transaction
Orale
serializable
SQL
snapshot isolation
MySQL, InnoDB
repetable read
X
Compare-and-Set
- 우리가 CAS연산이라고 부르는 방법이다. DB가 Transcations를 제공하지 않는 atomic compare-and-set을 찾는 것이다. (Single-object writes)
- 마지막으로 값을 읽은 후 값이 변경되지 않았을때에만 업데이트가 발생할 수 있도록 허용하는 것이다.
- 만약 변경이 일어났다면? read-modify-write연산을 재시도한다. 반드시!
주의! Conflict replication
- Locks and Compared and Set은 Single up-to-date, copy of the data를 가정한다.
- replicated DB에서는 여러 노드에 복사본이 존재하고, 데이터 수정이 다른 노드에서 발생할 수 있기 때문에 다른차원의 접근이 필요하다. 즉, 다시 말하면 multi leader 또는 leaderless replication에서는 write가 동시에 발생하고, 비동기 연산이 있다면 보장할 수 없다. (Linearizability)
- 대신 "Detecting Concurrent Writes" 챕터에서 살펴본 내용처럼 concurrent writes 가 충돌된 값의 버전들을 생성하고 (App 또는 별도의 자료구조활용), 이러한 충돌을 versions를 통해서 reslove , merge하는 방법이 가능하다.
- Atomic Operations는 영향을 받지 않는다. (특히 Commutative한 Actions이라면 !)
- 슬프게도.. 많은 replicated DB에서는 기본값으로 Last Write Wins 이다.
<정리>
- 개인적으로 매우 유익했던 챕터이다. 결국 두 개 이상의 동시쓰기가 발생한다면 해결방법은 아래와 같이 정리할 수 있다.
1) 해당 사이클을 통째로 묶는다.
2) 동시수행을 제한한다. ( Lock or Single Thread )
3) 일단 진행시켜! 에러나면 다시 시도
- 멀티 노드를 가지는 Database라면 여러 곳에서 동시다발적으로 데이터에 대한 복제 / 연산이 일어나기 때문에
1) Single Leader를 통해서 제어하던지(Hbase 같은)
2) 마지막에 Write한 값으로 저장
3) 별도의 Application이나 자료구조를 활용하여 충돌버전을 관리하고 resolve / merge
- 그래서 대부분의 분산병렬처리 오픈소스 진영에서 zookeeper를 사용하고 있는 듯 하다.
- 다음 글에서는 이 글에서 다루지 못한 Isolation Level ( Write Skew, Phantoms read )을 좀 더 자세히 살펴보고 분산환경의 Consistency 에 대해서 정리하도록 해야겠다.
- 신규 프로젝트에서는 기술스택을 Spring WebFlux 로 선정하였다. 그 이유는 다음과 같다.
a. 기본적으로 Spring, Java에 대한 이해도가 높다. 하지만 Legacy 코드는 없다.
b. 데이터에 대한 읽기 연산이 대부분이고, 특별한 보안처리나 트랜잭션 처리가 필요없다. (참조해야할만한 Dependecny 가 적다.)
c. 저장공간으로 Redis Cache를 활용한다. 즉, Reactive를 적극 활용할 수 있다.
d. 다수의 API 호출을 통해서 새로운 결과를 만들어 낸다.
즉, IO / Network의 병목구간을 최소화 한다면 자원활용을 극대화 할 수 있을 것으로 보인다.
<진행내용>
- 기존의 For loop 방식과 Async-non blocking 차이,그리고 Mono / Flux 를 살펴본다. (Spring WebFlux)
@ReactiveRedisCacheable
public Mono<String> rawApiCall(...) throws .Exception {
Mono<String> response = webClient
.get()
.uri(url)
.retrieve()
.onStatus(HttpStatus::is4xxClientError, clientResponse -> Mono.error(new Exception(...)))
.onStatus(HttpStatus::is5xxServerError, clientResponse -> Mono.error(new Exception(... )))
.bodyToMono(String.class)
.timeout(Duration.ofMillis(apiTimeout))
.onErrorMap(ReadTimeoutException.class, e -> new Exception(...))
.onErrorMap(WriteTimeoutException.class, e -> new Exception(...))
.onErrorMap(TimeoutException.class, e -> new Exception(...));
return response;
}
webClient를 이용해서 타 API를 호출하는 부분이다. 응답값에는 다수의 건이 포함되어 있으나 해당 데이터를 보내는 쪽에서도 병렬처리를 진행하고 있기 때문에 Collection 이나 Array 형태로 처리하는 부분을 제외하고 그냥 Raw line 형태로 제공하고 있다.
Spring MVC기반에서는 이 값을 꺼내기 위해서 결국 block하고 값에 접근하는 로직이 필요하다. 굳이 코드로 구현하자면 아마도 이렇게 만들어 질 것이다.
List<ApiResponse> ret = new ArrayList<>();
for(String value : Collection ... ){
String contents = apiService.rawApiCall(value).block();
String[] lines = contents.split("\n");
for(String data : lines){
if(StringUtils.hasText(data)){
ApiResponse apiResponse = mapper.readValue(data, ApiResponse.class);
if(populationHourApiResponse .. ){
// biz logic
FinalResponse finalResponse = new FinalResponse();
// setter
...
..
ret.add(finalReponse);
}
}
}
}
이 코드에는 여러가지 문제점이 있는데
- block()을 수행하게 되면 비동기 넌블러킹 처리의 여러 장점이 사라진다.
- 오히려 더 적은 수의 쓰레드를 사용해야 하는 구조특성상 block이 생기면 더 병목이 발생하는 경우도 있다.
- return 에 얼만큼의 데이터가 담길지 모르게 된다.
- API Call 이후 biz logic의 수행시간이 길어질 수록 전체 응답시간은 더욱 길어진다.
해당 내용을 block없이 처리하도록 Flux를 최대한 활용하여 작성해보았다.
public Flux<FinalResponse> getDataByConditionLevel1{
List<Mono<String>> monoList = new ArrayList();
for(String value : Collections ...)){
monoList.add( apiService.rawApiCall(value) );
}
return
Flux.merge(monoList)
.flatMap(s -> Flux.fromIterable(Arrays.asList(s.split("\n"))))
.filter(s -> StringUtils.hasText(s))
.map(data -> {
try {
return mapper.readValue(data, PopulationApiResponse.class);
} catch (JsonProcessingException e) {
log.error(e.getLocalizedMessage());
}
return new ApiResponse();
})
.filter(aApiResponse -> ... biz logic)
.map(apiResponse ->
new FinalResponse(...)
);
}
주요하게 바뀐부분을 살펴보면 다음과 같다.
1. API응답의 결과를 block해서 기다리지 않고 Mono를 모아서 Flux 로 변환한다.
Mono는 0..1건의 데이터, Flux는 0..N건의 데이터를 처리하도록 되어있다.
즉 개별 Mono를 대기하여 처리하는 것이 아니라 하나의 Flux로 모아서 단일 Stream처럼 처리할 수 있다,.
2. 값이 아니라 행위를 넘겨준다.
Spring WebFlux에서는 기본적으로 Controller - Service - Dao 등의 Layer간 이동을 할때 Mono / Flux 를 넘겨준다.
즉, 어떠한 값을 보내는 것이 아니라 Mono / Flux로 구성된 Publisher를 전달해주면 subscribe를 통해서 실제 데이터가 발생될 때 우리가 정의한 Action을 수행하는 형태가 된다고 이해하면 될듯 하다. (Hot / Cold 방식의 차이가 있는데 일단 Skip하도록 한다.)
위의 로직은 각 개별 데이터 간의 연산이나 관계가 없기 때문에 비교적 쉽게 변경할 수 있었다.
하지만 해당 데이터를 다시 조합하거나 Grouping 하거나 하는 경우가 있다면 약간 더 복잡해질 수 있기 때문에 고민이 필요하며 각 비지니스 케이스에 적합한 단위와 연산으로 재설계를 해주는 것이 좋다. ( -> 필수다 !)
예를 들어서 rawApiCall에 필요한 인자값이 yyyyMMdd hh:mm:ss 형태의 timeStamp라면 특정기간 내 시간대별 결과를 얻기 위해서는 다음과 같이 Call을 하고 조합해야 한다.
즉 수행해야 하는 액션은 다음과 같다.
- Flux를 응답받는 메소드를 다시 감싸서
- 응답결과를 적절하게 Biz Logic에 따라서 처리한 뒤
- aggreation 을 통하여 새로운 응답을 만들어 낸다. (e.g 그룹별 개수, 합계, 평균 등등)
- Client / Server 간의 Connection을 맺기 위해 소요되는 시간을 의미한다.
- Server에서 새로 Connection을 맺을 자원의 여유가 없다면 발생할 수 있다.
- HTTP Connection에 대한 내용이기 때문에 keep-alive 옵션역시 사용가능하다.
B. Read Timeout
- 데이터를 읽기 위해서 기다리는 시간을 의미한다.
- 내가 호출한 API 가 응답을 주지 못하면 발생할 수 있다.
- 너무 길다면? 적절히 설정해주지 않으면 응답을 받을때까지 계속 대기하게 되고, 자원이 고갈되는 현상이 발생한다.
- 너무 짧다면? 요청을 받은쪽에서는 처리가 되었으나, 응답을 기다리던 쪽에서는 Timeout이 발생하게 되어서 불일치 상태가 발생한다.
C. Write Timeout
- 데이터를 쓰기 위해서 기다리는 시간을 의미한다.
- 주어진 시간동안 Write Operation을 완료하지 못하면 발생할 수 있다.
- 즉, Connection 연결 후 데이터를 보내기 시작하였으나 해당시간보다 길어지게 되면 중단된다.
D. reactor timeout
- Reactive Stream은 Publisher, Subscriber, Subscription 을 통해서 비동기 / 넌블러킹 / back pressure 처리하는 개념이다.
- 우리가 다루는 Spring WebFlux는 reactive stream의 구현체로 reactor를 사용하고 있으며 Mono / Flux가 Publisher의 구현체이다.
- 따라서 Exception , Retry등을 처리할때도 기존 방식 대신 reactive stream의 기능을 활용해주는 것이 장점을 충분히 살릴 수 있는 방법이라고 생각한다.
- Spring WebFlux에서는 WebClient의 호출결과를 받았을때 결과 Body를 Mono로 감싸주어 데이터를 전달하는 형태가 되는데, 해당 시간동안 데이터를 전달하지 못하게 되면 timeout 이 발생하게 된다.
E. Transaction Timeout과 비교 (개인적인 생각 , 틀릴 수 있음)
- 우리가 일반적으로 DB transaction timeout을 설명할 때 Transaction Timeout > Statement Timeout > Socket Timeout 로 각 구간을 나누어서 설명하고 상위 Layer( ? 포장레이어?) 에서는 하위 Layer보다 크거나 같게 설정하는 것이 일반적이다.
- Web 호출 역시 비슷하게 살펴본다면 Publisher Timeout > Read/Write Timeout > Connection Timeout 정도로 비슷하게 정리해 볼 수 있지 않을까 생각했다.
<정리>
- MSA구조에서 각 API의 응답시간과 사이즈는 적절하게 설정해야 한다.
- 특히, 각 레벨에서의 적절한 수준의 timeout 설정은 필수이다.
- 너무 짧으면 많은 오류가 발생하게 되고, 이에 따른 side-effect (데이터 불일치, 로직의 복잡도 증가) 가 생기게 되며