Parallelism 속성은 Apache Flink 애플리케이션의 기본 병렬 처리 수준(Parallelism)을 설정합니다.
요소
설명
제어 방식 및 기본값
Parallelism
애플리케이션 전체의 기본 병렬 실행 수준을 정의합니다. 별도로 재정의하지 않는 한, 모든 연산자(Operator), 소스(Source), 싱크(Sink)는 이 병렬 처리 수준으로 실행됩니다.
기본값은 1이며, 기본 최대 한도는 256입니다.
MaxParallelism
상태 저장 애플리케이션이 상태 손실 없이 확장할 수 있는 최대 병렬 처리 수준을 정의합니다.
상태가 처음 생성될 때 설정되며, 일반적으로 Parallelism≤128일 때 기본값은 128입니다.
• Parallelism 값은 전체 작업(Task)의 수를 결정합니다. 이는 Task Slot의 총 수와 동일하게 간주됩니다.
III. ParallelismPerKPU: KPU 당 슬롯 밀도 (자원 활용도)
ParallelismPerKPU는 단일 KPU 내에 할당될 수 있는 Task Slot의 수를 정의하는 속성입니다.
요소
설명
제어 방식 및 영향
ParallelismPerKPU
단일 KPU에 할당되는 병렬 태스크(Task Slot)의 수.
기본값은 1이며, 최대값은 8입니다.
용도
I/O Bound 작업에 유용합니다. 이 값을 높게 설정하면 동일한 KPU 내에서 더 많은 병렬성을 제공하여 KPU 리소스를 완전히 활용할 수 있습니다.
예를 들어, I/O Bound 애플리케이션에서 ParallelismPerKPU를 1에서 4로 증가시키면, 동일한 KPU 리소스로 4배의 병렬성까지 확장 가능합니다.
- I/O Bound 작업이 아닐경우 ParallelismPerKPU를 높이게 되면 할당받는 CPU , Memory 가 부족해서 오류가 발생할 수 있으니 작업을 모니터링하면서 적정한 수치로 조정해야 합니다.
IV. Apache Flink 의 Task Manager 및 Task Slot
Apache Flink는 애플리케이션 실행을 Job Manager가 관리하며, 이는 다시 Task Manager가 관리하는 태스크(tasks)로 분리됩니다.
요소
설명
관계
Task Manager
애플리케이션의 실제 연산을 실행하는 주체이며, 각 Task Manager는 KPU 리소스 내에서 실행됩니다. Task Manager의 성능은 CPU Utilization 및 Heap Memory Utilization과 같은 CloudWatch 지표를 통해 모니터링할 수 있습니다.
Task Manager의 수는 할당된 총 KPU 수에 따라 결정됩니다 (AWS 서비스가 자동 프로비저닝).
즉, 단일 Task Manager 인스턴스에 할당된 슬롯 수가 ParallelismPerKPU 값보다 많게 보입니다.
정리
구분
순수 Apache Flink 환경
Amazon Managed Service for Apache Flink (MSF) 환경
기본 리소스 단위
Task Manager (작업 실행 및 슬롯 호스팅)
KPU (Kinesis Processing Unit) (과금 및 용량 프로비저닝 단위)
병렬도 정의 기준
Task Manager 내의 Task Slot 수
KPU당 할당된 Task Slot 수 (ParallelismPerKPU)
실제 실행 컨테이너
Task Manager 프로세스
Task Manager (AWS가 여러 KPU 리소스를 통합하여 실행 가능)
메모리 확장
Task Manager 메모리 설정(taskmanager.memory.managed.fraction) 조정
총 KPU 수 증가에 따라 Managed Memory 총량 증가
사용자는 Parallelism과 ParallelismPerKPU 두 가지 주요 속성을 설정하여, Managed Service for Apache Flink가 프로비저닝할 최종 리소스 크기와 내부 실행 구조를 간접적으로 제어합니다. 일반적인 Flink가 TaskManager 단위로 관리하는 부분과 약간의 개념차이가 있음을 이해하면 됩니다.
1. 총 할당된 KPU 계산 (리소스 및 메모리 결정)
Amazon MSF 애플리케이션에 할당되는 총 KPU 수는 다음 공식에 의해 결정됩니다.
{KPU} = {Parallelism} / {ParallelismPerKPU}
Managed Memory: 총 Managed Memory 용량은 이 할당된 KPU 수에 정비례하여 증가합니다
(1 KPU당 약 1 GB의 네이티브 메모리 제공).
2. 총 Task Slot 수 (실행 능력)
총 Task Slot 수=할당된 KPU×ParallelismPerKPU=Parallelism
• Task Slot의 역할: 각 Task Slot은 연산자(Operator)의 병렬 인스턴스(Subtask)를 실행하는 데 사용될 수 있습니다.
• Operator Parallelism: 각 연산자별 병렬 처리 수준(Operator Parallelism)은 기본적으로 애플리케이션의 Parallelism 값과 동일합니다. 이는 필요하다면 각 연산자가 사용 가능한 모든 Subtask(Task Slot)를 사용할 수 있음을 의미합니다.
3. Task Manager의 개수/메모리 제어 정리
항목
사용자가 직접 제어하는가?
제어 방법 및 결과
KPU 개수
아니요. Parallelism과 ParallelismPerKPU 설정을 통해 간접적으로 계산되어 할당됩니다.
KPU 수를 늘리려면 Parallelism을 늘리거나 ParallelismPerKPU를 낮춰야 합니다.
Task Manager 개수
아니요. KPU 할당에 따라 AWS 서비스에 의해 자동 관리됩니다.
Task Manager는 할당된 KPU 리소스 내에서 애플리케이션의 태스크를 실행합니다.
총 Managed Memory
아니요. 할당된 총 KPU 수에 비례하여 크기가 결정됩니다.
Managed Memory 용량을 늘리려면 KPU 수를 늘려야 합니다.
Task Slot 개수
예. ParallelismPerKPU를 통해 Task Manager당 슬롯 수를 제어하고, Parallelism을 통해 총 슬롯 수를 제어합니다.
Parallelism을 늘리는 것은 총 Task Slot 수를 늘리는 가장 직접적인 방법입니다.
실제 예시
- Parallelism 20, ParallelismPerKPU 1 = 총 20 KPU
1. 리소스 할당 (KPU 기준)
Amazon MSF에서 1 KPU는 1 vCPU와 4 GB의 메모리를 제공합니다. 이 4 GB 메모리 중 1 GB는 RocksDB State Backend와 같은 네이티브 프로세스를 위한 네이티브 메모리(Managed Memory의 기반)로 예약됩니다.
항목
20 KPU가 제공하는 총 예상 리소스
생성 결과 (3 TM 합산)
총 vCPU
20×1=20 vCPU
3×7=21 CPU
총 Physical Memory
20×4 GB=80 GB
3×26.7 GB=80.1 GB
총 Managed Memory
20×1 GB=20 GB (네이티브 메모리 할당 기준)
3×6.34 GB=19.02 GB
2. Task Manager 개수 및 Slot 할당 검증
A. 총 Task Slot 수
애플리케이션의 총 병렬 처리 수준(Total Task Slots)은 설정된 Parallelism 값과 같습니다.
• 설정: 20 KPU, ParallelismPerKPU = 1.
• 계산된 Parallelism (총 슬롯):20×1=20.
• 생성 결과: 3개 Task Manager에 7+7+6=20 개의 활성 슬롯
B. Task Manager당 슬롯 개수와 ParallelismPerKPU의 차이
생성 결과 (3개의 Task Manager가 각각 7개의 슬롯을 가짐)가 다른 이유는
1. KPU는 리소스 단위, Task Manager는 실행 컨테이너
AWS Managed Flink는 KPU를 리소스를 할당하는 기본 단위로 사용하지만, 실제 Task Manager 프로세스(컨테이너)는 여러 KPU의 리소스를 하나로 묶어(Grouping) 실행
2. 그룹화:
◦ 총 리소스: 20 KPU
◦ 실행 Task Manager: 3개
◦ Task Manager당 할당된 평균 KPU: 20/3≈6.67 KPU/TM.
3. 슬롯 계산
◦ ParallelismPerKPU = 1 이므로, Task Manager당 예상되는 슬롯 수는 Task Manager가 포함하는 KPU 수와 같아야 합니다.
◦ Task Manager당 슬롯 수 7은 7 KPU에 해당하며, 3개의 TM이 7,7,6 KPU (총 20 KPU)로 나뉘어 할당되었다고 해석할 수 있습니다.
◦ 즉, 하나의 Task Manager 인스턴스가 7개의 KPU 리소스를 관리하며, 이 때문에 7개의 슬롯을 가지게 된 것
20 KPU 리소스를 3개의 Task Manager 인스턴스에 통합하여 실행하는 AWS Managed Flink의 내부적인 리소스 그룹화 정책
오늘은 AWS MSF(Managed Service for Apache Flink) 애플리케이션에서 발생한 오류(예: RemoteTransportException: Connection unexpectedly closed by remote task manager)의 원인과 해결 방안을 분석합니다.
애플리케이션 로그를 보면 10월 16일 오후 10시 21분 24초에 remote task manager error가 발생했고, 이후 partial recovery 후 다시 TaskManager 연결 오류가 반복되는 모습을 확인할 수 있었습니다.
이러한 로그는 TaskManager 간 통신이 불안정함을 나타내며, 주된 원인으로 메모리/리소스 부족, GC 지연, Heartbeat 타임아웃 등이 의심됩니다.
로그
closed by task manager
원인 분석
메모리 부족 및 리소스 고갈: AWS MSF 문서에 따르면, TaskManager가 할당된 메모리를 초과하거나 CPU/메모리 과부하에 걸리면 TaskManager 프로세스가 비정상 종료되고, 이로 인해 RemoteTransportException이 발생할 수 있습니다 .
예를 들어, KPU(Kinesis Processing Unit)당 메모리(4GiB, 이 중 3GiB는 힙 메모리)가 부족하면 OOM(OutOfMemory)으로 TaskManager가 죽을 수 있습니다 . 이 경우 Flink가 TaskManager 연결이 끊어졌다고 보고 오류를 출력합니다.
현재 처리하고 있는 내부 Key 값 및 State의 용량이 크기 때문에 heapMemory사용량이 90%를 유지하고 있어서 원인 중 하나일 것으로 예상합니다.
GC(Garbage Collection) 지연: 장시간 Full GC가 발생하면 TaskManager가 일정 시간 응답하지 못해 JobManager에 Heartbeat를 제때 보내지 못합니다. Flink 커뮤니티의 해결책에 따르면, 이러한 경우 TaskManager가 ‘unresponsive’ 상태로 간주되어 슬롯이 해제되고, 결국 연결이 끊어집니다.
이를 방지하려면 G1GC와 같은 빠른 GC를 사용하거나 TaskManager 메모리 설정을 조정해 GC 시간을 단축해야 합니다 . 또한, 필요시 akka.watch.heartbeat.pause와 같이 Heartbeat 시간을 늘려 JobManager의 타임아웃을 늘리는 방안도 고려할 수 있습니다.
AWS MSF는 이미 G1GC 적용이 되어 있었습니다.
Data Skew (데이터 불균형)
특정 TaskManager 또는 Subtask에 비정상적으로 많은 데이터가 집중되어 처리 부하가 쏠릴 수 있습니다.
이 현상은 다음과 같은 이유로 RemoteTransportException과 같은 네트워크 오류로 이어질 수 있습니다.
특정 TaskManager만 메모리 과부하
동일한 데이터 분포를 가정하고 병렬 처리를 수행하지만, 실제로는 일부 키(key)나 파티션에 데이터가 몰리면 해당 TaskManager만 힙 메모리가 1GB 이상까지 치솟습니다.
백프레셔(Backpressure) 전파:
특정 Subtask가 느려지면 상류(upstream)의 데이터 송신 속도가 제한되고, 네트워크 버퍼가 포화되어 Task 간 통신 오류나 Partial Recovery가 반복됩니다.
네트워크 셔플 불균형:
Flink는 keyBy, rebalance, broadcast 연산 시 네트워크를 통해 데이터를 재분배합니다. 이때 데이터 스큐가 있으면 특정 TaskManager의 네트워크 I/O가 폭증하며, 네트워크 버퍼 부족과 연결 끊김이 함께 발생할 수 있습니다.
내부적으로 각 이벤트를 시나리오와 매핑하는 처리를 하고 있는데 특정 시나리오의 이벤트가 많이 발생하고 있어서 역시 의심이 되는 부분입니다.
시나리오 매핑이후는 개별Key를 통해서 keyBy연산으로 하고 있기 때문에 체이닝이 적절하게 동작하고 있습니다.
네트워크 버퍼 부족: 병렬처리가 높아지거나 연산자 체이닝이 해제되면 셔플(shuffle)이 증가해 네트워크 버퍼 소모가 커질 수 있습니다. AWS 문서에는 네트워크 버퍼가 부족해지면 Insufficient number of network buffers 오류가 발생할 수 있으며, 이를 해결하기 위해 parallelismPerKPU를 낮춰 TaskManager당 메모리(및 버퍼)를 늘리거나 연산자 체이닝을 활용할 것을 권장하고 있습니다
참고로 한 플로우에서 필요한 버퍼 개수는 (slots_per_TM)^2 * number_of_TMs * 4 공식으로 대략 산출할 수 있습니다.
즉, TaskManager의 슬롯 수나 셔플 횟수가 많을수록 네트워크 버퍼가 대폭 늘어납니다.
Heartbeat 타임아웃: Flink의 Heartbeat 설정이 너무 짧거나 네트워크 지연이 있으면 TaskManager와 JobManager 간 연결이 끊길 수 있습니다.
이 경우 AWS MSF에서는 heartbeat.timeout 설정을 지원 티켓으로 늘릴 수 있으며 , 위에서 언급한 GC 지연 대응책(Heartbeat 지연 허용)도 도움이 됩니다.
기타 원인: TaskManager 노드의 일시적 네트워크 장애, 컨테이너 리소스 제한(예: CPU 쿼터 초과), 코드 레벨 메모리 누수 등도 원인이 될 수 있습니다.
TaskManager 로그에서 OOM 오류나 비정상 종료 메시지를 확인하고, 메모리 누수 가능성을 코드 리뷰로 점검해야 합니다.
해결 방안 및 튜닝 가이드
KPU 및 병렬성 조정: KPU 리소스를 늘리거나 TaskManager당 할당되는 작업 수(ParallelismPerKPU)를 줄여 각 작업에 더 많은 메모리를 할당합니다.
AWS 문서에 따르면 하나의 KPU는 1 vCPU와 4GiB 메모리(힙 3GiB, 네이티브 1GiB), 50GiB 스토리지를 제공합니다 .
예를 들어, ParallelismPerKPU=4(기본)인 경우 KPU당 4개의 슬롯에 약 1GiB씩 메모리가 분할되지만, ParallelismPerKPU=2로 낮추면 슬롯당 약 2GiB를 확보할 수 있습니다.
전체 병렬도(Parallelism)가 고정되어 있다면 ParallelismPerKPU를 낮추면 필요한 KPU수가 증가하여 리소스가 더 할당됩니다 .
AWS도 PPK를 낮춰 메모리당 할당량을 높이는 방안을 제시하고 있으며, 이를 통해 메모리 부족으로 인한 OOM 재시작을 줄일 수 있습니다. (단, PPK를 낮추면 비용이 늘어나므로 병렬도를 적절히 조정하거나 오토스케일링을 활용해야 합니다.)
Data Skew 완화
Key 균등화 전략 적용 (keyBy 연산 시 key 값의 불균형이 발생하는 경우)
해시 기반 샘플링 후 파티션 재조정
“composite key” (예: userId_mod_10) 기반 가상 key 확장
rebalance() → map() 단계 삽입을 통해 균등 분배 가능
Aggregation 단계 분리
데이터가 한 TaskManager로 몰리는 경우, 1차 로컬 집계(Local aggregation) → 2차 글로벌 집계(Global aggregation) 구조로 바꾸면, 네트워크 트래픽 및 skew 부담을 크게 줄일 수 있음
Metrics 기반 모니터링 : 특정 TaskManager만 유독 높아서 key 분포 불균형 확인함
numRecordsIn, busyTimeMsPerSecond, heapMemoryUsed 메트릭을 TaskManager 단위로 비교하여 Skew 구간을 확인
Operator 체이닝 활용 및 네트워크 최적화: 가능한 Operator들을 체이닝하여 별도 서브태스크로 분리될 필요가 없도록 합니다.
MSF 문서에 따르면 체이닝은 동일 서브태스크 내에서 연산자들을 묶어 실행함으로써 네트워크 호출을 줄이고 리소스를 절약합니다.
반대로 체이닝을 해제하거나 인위적으로 새 체인을 시작하면 불필요한 셔플이 증가합니다. 실제 사례에서도 체이닝 해제가 네트워크 부담을 크게 늘려 오류 원인이 되므로, 가능하면 체이닝을 활성화할 것을 권장합니다.
또한, 앞서 언급한 네트워크 버퍼 부족을 피하기 위해 플로우를 단순화하고 셔플 단계를 최소화하는 것도 중요합니다.
I/O 바운드 워크로드 고려: 외부 서비스와의 블로킹 I/O가 많은 경우, 오히려 ParallelismPerKPU를 높여 KPU 리소스를 최대한 활용하는 것이 효율적일 수 있습니다 .
즉, 네트워크 지연이 주요 병목이라면 병렬 슬롯을 늘려 I/O 대기 동안 더 많은 작업을 병렬 처리하도록 조정합니다.
하지만 현재는 TaskManager 재시작 문제가 우려되므로, 우선은 안정적인 메모리 확보에 중점을 두고 PPK를 낮추었습니다.
모니터링 강화 및 추가 점검: CloudWatch의 주요 메트릭을 면밀히 모니터링합니다.
cpuUtilization, heapMemoryUtilization 등은 전체 리소스 사용률을 보여주며 문제 지표가 될 수 있습니다.
또한 oldGenerationGCTime과 같은 GC 메트릭을 살펴보면 GC 지연 여부를 판단할 수 있습니다. 장애가 발생할 때는 CloudWatch Downtime, FullRestarts 값도 확인합니다.
TaskManager JVM 로그에서 GC 타임, 힙 사용량, OOM 에러를 찾아보고, 코드에 메모리 누수 가능성은 없는지 검토해야 합니다.
TaskManager 로그를 보고 싶으면 Flink구성에서 로그항목 [Application]으로 설정해야 합니다.
Heartbeat 타임아웃이 잦다면 heartbeat.timeout 설정을 AWS 지원 티켓으로 늘려보는 것도 고려할 수 있습니다 .
추가 적인 점검 사항
일시적인 AWS 네트워크 장애 가능성도 염두에 두고, 다른 애플리케이션이나 Kinesis 스트림의 상태(예: 지연, 스로틀링)도 확인해 봅니다.
코드 레벨에서는 대용량 데이터 처리 시 메모리 소비가 급증하지 않는지, Checkpointing 지연으로 백프레셔가 걸리지는 않는지도 함께 살펴봅니다.
결론
이번 오류는 단순한 네트워크 불안정이 아니라,
TaskManager 메모리 부족
Data Skew로 인한 특정 TaskManager 과부하
GC 지연 및 Heartbeat 누락
복합적으로 작용한 결과입니다.
현재 발생한 오류를 해결하기 위해서는 메모리 확보, DataSkew 해소 및 네트워크 통신 부하 완화가 주요 내용입니다.
다음과 같은 액션을 수행했습니다.
ParallelismPerKPU 재조정
TaskManager 슬롯 수를 줄여 각 슬롯이 더 많은 메모리를 갖도록 합니다.
PPK를 4에서 2로 낮추어 슬롯당 메모리 할당량을 늘림 (1G -> 2G)
전체 병렬도 검토
PPK를 낮춘 뒤에 처리량이 부족하다면, 전체 병렬도(Parallelism) 자체를 늘려 필요한 KPU 수를 확장
Data Skew 해소
DataStream에서 .rebalance() 수행 후 이후 map() 하도록 파이프라인 수정
비용 대비 효율 검토
리소스 조정으로 비용이 상승할 수 있으므로, 변경 전후의KPU 사용량과 애플리케이션 처리율을 비교하여 최적점을 찾습니다
모니터링 강화
heapMemoryUtilization, oldGenerationGCTime, cpuUtilization, fullRestarts 등 주요 메트릭을 CloudWatch에서 계속 감시하고, 설정 변경 후 리소스 압박이 해소되었는지 검증이 필요 !!
=> 이부분은 사실 원격요청을 하지 않고 제공되는 공개키, pem을 통해서 로컬에서 검증이 가능하지만 현재 java로는 직접 구현해야 하는 부분이 많고, 자주는 아니지만 공개키는 변경될 수 있기 때문에 키값을 확인하기 위해서는 결국 원격요청이 필요해서 쉽게 가기로 했다.
-- "cognito:groups" 정보를 이용해서 권한을 세팅한다.
B. 기존 토큰이라면
- secretKey를 이용해서 검증하고
- "roles" 정보를 이용해서 권한을 세팅한다.
<정리>
이렇게 만들면 기존의 JWT Token(HS256) 와 신규 Cognito Token (RSA256) 를 모두 소화할 수 있다.
그러나 만들어 놓고보니 소스가 지저분하다. ResourceServer에서 권한을 확인하여 API접근제어를 해야하는데 좋은 구조는 아니다.
- 로직이 복잡하고(구조 특성상 해당모듈을 그대로 copy해가는 방식이 될텐데 유지보수성이...영), secretKey 공유의 문제도 있다.
- 꼭 두 가지 토큰을 소화할 필요가 있을까?
- 기존 방식은 외부 사용자가 고려되지 않은 방식이고, OAuth 2.0은 출발자체가 3rd Party연계를 간편하게 하기 위한 방법이다.
다양한 확장을 위해서 인증은 각각의 방식을 유지하고 서비스를 사용자에 따라서 분리하는 것이 해결책이라는 판단을 내렸다.
- 내부 사용자가 이용하며 Client, Resource Server가 같은 Boundary에 있는 레거시의 경우 기존 토큰을 사용하고
- 외부 사용자가 이용해야 하는 API 서비스는 별도로 분리 구성하며, 이 서비스에서는 OAuth 2.0만을 활용하여 인가/인증을 하도록 한다.
(이부분은 AWS API Gateway내에서 자격증명으로 Cognito를 연계하는 방식과 시너지를 낼 수 있을 것 같다.)
- 이를 위해서 보다 쉽게 Resource Server를 세팅하는 법을 Spring Security에서 제공하고 있는데 다음글에서 좀 더 자세히 적용해보고 서비스를 어떻게 분리하는 것이 보다 효과적일지 살펴본다.
- 대부분의 Public Cloud는 Endpoint와 HTTP API를 제공하고 있으며, SDK는 이것을 감싸는 구조로 되어있다.
- 따라서 url 인코딩을 주의하여 사용해야 한다.
- 예를 들어서 얼마전에 테스트하다가 파일명에 특이한 문자들을 넣게 되었는데 아래와 같은 오류가 발생하였다.
2022-05-19 10:15:15.447 DEBUG 30235 --- [nio-8900-exec-6] com.amazonaws.request : Received error response: com.amazonaws.services.s3.model.AmazonS3Exception: The request signature we calculated does not match the signature you provided. Check your key and signing method. (Service: Amazon S3; Status Code: 403; Error Code: SignatureDoesNotMatch; Request ID: S80BNXP50DTW9SJM; S3 Extended Request ID: , S3 Extended Request ID:
2022-05-19 10:15:15.449 DEBUG 30235 --- [nio-8900-exec-6] com.amazonaws.retry.ClockSkewAdjuster : Reported server date (from 'Date' header): Thu, 19 May 2022 01:15:15 GMT
.
.
.
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5054) [aws-java-sdk-s3-1.11.762.jar:na]
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5000) [aws-java-sdk-s3-1.11.762.jar:na]
at com.amazonaws.services.s3.AmazonS3Client.access$300(AmazonS3Client.java:394) [aws-java-sdk-s3-1.11.762.jar:na]
at com.amazonaws.services.s3.AmazonS3Client$PutObjectStrategy.invokeServiceCall(AmazonS3Client.java:5942) [aws-java-sdk-s3-1.11.762.jar:na]
at com.amazonaws.services.s3.AmazonS3Client.uploadObject(AmazonS3Client.java:1808) [aws-java-sdk-s3-1.11.762.jar:na]
at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1768) [aws-java-sdk-s3-1.11.762.jar:na]
- 인증토큰이나 이러한 부분에 문제인줄 알았는데 유사한 문제를 겪은 사람들이 상당히 많았고
<개요> - 다음과 같이 Service #A 에서 Service #B로 데이터 조회 API를 요청하고 값을 받아오는 로직이 있다. - Service #B에서는 AWS Athena를 저장소로 사용하고 있으며 Athena JDBC42 드라이버를 사용 중 이다.
API 호출 후 응답
<현상> - Service #B에서 JdbcTemplate을 통하여 쿼리가 수행된 시간은 11:13:13 이고, 2021-11-04 11:13:13.482 DEBUG 9668 --- [http-nio-8200-exec-9] o.s.jdbc.core.JdbcTemplate : Executing SQL query 2021-11-04 11:13:13.482 DEBUG 9668 --- [http-nio-8200-exec-9] o.s.jdbc.datasource.DataSourceUtils : Fetching JDBC Connection from DataSource - 실제 쿼리 수행결과를 받아온 시간은 11:15:57 로 약 2분44초 가 소요되었다. 2021-11-04 11:15:57.998 INFO 9668 --- [http-nio-8200-exec-9] ...
- Athena 의 경우 동시에 다수의 쿼리가 수행되면 Queue에 의하여 순차적으로 수행될 수 있기 때문에 쿼리 히스토리를 조회하였다.
11:13:13.542초 시작, 수행시간 0.555초대기열시간 1분21초
- 대기열 시간 1분21초 + 수행시간 0.555초를 제외하고 꽤 오랜시간이 소요되었다.
<소스분석> - AthenaJDBC42의경우 일반적인 JDBC드라이버처럼 커넥션을 맺고 Resultset을 처리하는 형태가 아니라 AWS Athena로 Http를 통해서 수행요청을 하고, 리턴값으로 ID를 받아온 뒤 일정시간 Thread Sleep하면서 조회 polling을 요청하고 Status가 Completed가 되었을때 후속처리를 하는 형태로 구성되어 있다.
- 또한 위에도 언급한것처럼 동시에 다수의 요청이 집중될경우 자체적으로 큐에 보관하여 처리하게 된다.
- 부수적으로 Athena JDBC드라이버의 SStatement내 execute, getResultSet등의 메소드를 살펴보면 대부분 synchronized로 선언이 되어있기 때문에 이에 따른 delay도 있지 않을까 예상한다.
<Thread Dump>
10개의 Thread가 같은 위치에서 대기중이다.
"http-nio-8200-exec-9" #44 daemon prio=5 os_prio=31 tid=0x00007ffcc655f800 nid=0x8c03 waiting on condition [0x000070000c638000] java.lang.Thread.State: TIMED_WAITING (sleeping) at java.lang.Thread.sleep(Native Method) at com.simba.athena.athena.api.AJClient.executeQuery(Unknown Source) at com.simba.athena.athena.dataengine.AJQueryExecutor.execute(Unknown Source) at com.simba.athena.jdbc.common.SStatement.executeNoParams(Unknown Source) at com.simba.athena.jdbc.common.SStatement.executeNoParams(Unknown Source) at com.simba.athena.jdbc.common.SStatement.executeQuery(Unknown Source) - locked <0x000000078740ccf8> (a com.simba.athena.athena.jdbc42.AJ42Statement) at com.zaxxer.hikari.pool.ProxyStatement.executeQuery(ProxyStatement.java:111) at com.zaxxer.hikari.pool.HikariProxyStatement.executeQuery(HikariProxyStatement.java) at org.springframework.jdbc.core.JdbcTemplate$1QueryStatementCallback.doInStatement(JdbcTemplate.java:439) at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:376) at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:452) at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:462) at org.springframework.jdbc.core.JdbcTemplate.queryForObject(JdbcTemplate.java:473) at org.springframework.jdbc.core.JdbcTemplate.queryForObject(JdbcTemplate.java:480)
<정리> - 다수의 사용자에게서 발생하는 ad-hoc형태 처리는 적합하지 않다.(hive와 동일함)
- Global cache(Redis)를 적절히 활용하여 Service #B Layer에서 처리를 하도록 하면 효율성을 증가시킬수 있다.(일반적인 캐시전략)