개요

- AWS Managed Flink에서는 사용자가 정의할 수 있는 설정이 제한적입니다.

- 사용자가 Parallelism 과 ParallelismPerKPU를 입력하면 이에 맞춰서 필요한 KPU가 계산되고

- 1 KPU 당 1 vCPU, 4GB메모리를 기준으로 AWS 에서 적당하게 컨테이너를 기동합니다.

 

내용

-  Flink는 TaskManager 와 slot을 기반으로 병렬처리를 제어하는데

- AWS Managed Flink는 KPU를 중심으로 제어하게 됩니다. (과금단위 이기 때문에)

- 이를 좀 더 자세히 살펴보고 원하는 세팅을 만들기 위해서 수치를 어떻게 제어해야 하는지 이해하는 것이 목적입니다.

 

Amazon MSF 리소스 구성 요소 간의 관계

I. KPU (Kinesis Processing Unit): 물리적 리소스 단위

KPU는 Amazon Managed Service for Apache Flink가 애플리케이션 호스팅 환경을 위해 프로비저닝하는 기본 컴퓨팅 리소스 단위입니다.
요소
사양
세부 정보 (출처)
CPU
1 vCPU
컴퓨팅 코어 1개.
메모리
4 GB
총 4 GB의 메모리를 제공합니다.
- 힙 메모리
3 GB/KPU
JVM 힙 메모리 할당에 사용됩니다.
- 네이티브 메모리
1 GB/KPU
네이티브 코드 할당을 위해 예약되며, Managed Memory (RocksDB State Backend와 같은 네이티브 프로세스에 사용)의 기반이 됩니다.
디스크 공간
50 GB/KPU
실행 중인 애플리케이션 상태 저장을 위한 디스크 공간이 제공됩니다.
오케스트레이션
추가 1 KPU
애플리케이션 당 오케스트레이션 목적으로 추가 KPU 1개가 부과됩니다.
Managed Memory의 총 용량은 할당된 총 KPU 수에 비례하여 증가합니다. 1 KPU가 1 GB의 네이티브 메모리를 제공하므로, KPU가 늘어날수록 애플리케이션 전체의 Managed Memory 풀이 커집니다.
- 예) 16 KPU에서 20 KPU로 증가했을 때 Managed Memory 용량 5G -> 6G 
 

II. Parallelism (병렬 처리 수준): 논리적 작업 수

Parallelism 속성은 Apache Flink 애플리케이션의 기본 병렬 처리 수준(Parallelism)을 설정합니다.
요소
설명
제어 방식 및 기본값
Parallelism
애플리케이션 전체의 기본 병렬 실행 수준을 정의합니다.
별도로 재정의하지 않는 한, 모든 연산자(Operator), 소스(Source), 싱크(Sink)는 이 병렬 처리 수준으로 실행됩니다
.
기본값은 1이며, 기본 최대 한도는 256입니다.
MaxParallelism
상태 저장 애플리케이션이 상태 손실 없이 확장할 수 있는 최대 병렬 처리 수준을 정의합니다.
상태가 처음 생성될 때 설정되며, 일반적으로 Parallelism 일 때 기본값은 128입니다
Parallelism 값은 전체 작업(Task)의 수를 결정합니다. 이는 Task Slot의 총 수와 동일하게 간주됩니다.
 

III. ParallelismPerKPU: KPU 당 슬롯 밀도 (자원 활용도)

ParallelismPerKPU단일 KPU 내에 할당될 수 있는 Task Slot의 수를 정의하는 속성입니다.
요소
설명
제어 방식 및 영향
ParallelismPerKPU
단일 KPU에 할당되는 병렬 태스크(Task Slot)의 수.
기본값은 1이며, 최대값은 8입니다.
용도
I/O Bound 작업에 유용합니다.
이 값을 높게 설정하면 동일한 KPU 내에서 더 많은 병렬성을 제공하여 KPU 리소스를 완전히 활용할 수 있습니다
.
예를 들어, I/O Bound 애플리케이션에서 ParallelismPerKPU를 1에서 4로 증가시키면, 동일한 KPU 리소스로 4배의 병렬성까지 확장 가능합니다.
- I/O Bound 작업이 아닐경우 ParallelismPerKPU를 높이게 되면 할당받는 CPU , Memory 가 부족해서 오류가 발생할 수 있으니 작업을 모니터링하면서 적정한 수치로 조정해야 합니다.
 
 

IV. Apache Flink 의 Task Manager 및 Task Slot

Apache Flink는 애플리케이션 실행을 Job Manager가 관리하며, 이는 다시 Task Manager가 관리하는 태스크(tasks)로 분리됩니다.
요소
설명
관계
Task Manager
애플리케이션의 실제 연산을 실행하는 주체이며, 각 Task Manager는 KPU 리소스 내에서 실행됩니다.
Task Manager의 성능은
CPU UtilizationHeap Memory Utilization과 같은 CloudWatch 지표를 통해 모니터링할 수 있습니다.
Task Manager의 수는 할당된 총 KPU 수에 따라 결정됩니다
(AWS 서비스가 자동 프로비저닝).
Task Slot
Apache Flink가 리소스를 할당하는 단위입니다.
Task Slot은
subtask (실행 스레드와 유사한 개념)를 수용합니다.
Task Manager 당 할당되는 슬롯의 수로 병렬도 제어.
 

- 이부분에서 주의해야 할점이 있습니다 !!!

Amazon MSF 환경의 리소스 할당 모델은 Flink 자체적인 개념과 약간 다릅니다.
 

1. ParallelismPerKPU와 Task Slot의 정의

Amazon MSF 환경에서 리소스 할당의 핵심 규칙은 다음과 같습니다:
1. Task Slot (태스크 슬롯): Apache Flink 환경은 Task Slot이라는 단위로 애플리케이션에 리소스를 할당합니다.
2. KPU와 슬롯 할당: Managed Service for Apache Flink는 애플리케이션에 리소스를 할당할 때, 하나 이상의 Apache Flink Task Slot을 단일 KPU에 할당합니다.
3. ParallelismPerKPU의 역할: 단일 KPU에 할당되는 슬롯의 수는 애플리케이션의 ParallelismPerKPU 설정값과 동일합니다
 
 

2. Task Manager와 KPU의 관계

Apache Flink의 Job Manager는 애플리케이션 실행을 태스크(tasks)로 분리하며, 각 태스크는 Task Manager에 의해 관리됩니다. MSF 환경에서 Task Manager는 KPU가 제공하는 컴퓨팅 리소스(1 vCPU, 4 GB 메모리) 내에서 실행됩니다.
 
일반적으로, Task Manager는 컴퓨팅 자원(KPU)의 경계를 따라 실행되며, 각 Task Manager는 해당 KPU에 할당된 Task Slot을 사용합니다.
따라서, 만약 시스템이 개의 KPU를 할당하고 각 KPU가 독립적인 Task Manager 인스턴스를 실행한다면 , "Task Manager 당 슬롯 수 = ParallelismPerKPU"라는 관계가 성립합니다.
 

3. ParallelismPerKPU를 1로 설정했는데도 Task Manager에 슬롯이 하나 이상 할당된 것처럼 보이거나, 혹은 여러 개의 Task Manager가 확인된 이유는 애플리케이션의 총 병렬 처리 수준(Parallelism)과 KPU 개수 때문입니다.

 
예시: ParallelismPerKPU가 1이고, 할당된 KPU가 20
    ◦ 할당된 KPU: 20개.
    ◦ Task Manager 수: 20개의 Task Manager가 실행된다면
    ◦ Task Slot 할당: 각 KPU(Task Manager)는 ParallelismPerKPU 설정에 따라 1개의 Task Slot을 가집니다.
    ◦ 총 Task Slot: 개.
 
 
하지만 일반적으로 20개보다 적은 수의 Task Manager 가 실행됩니다.
 
AWS가 내부적으로 여러 KPU 리소스를 하나의 논리적/물리적 Task Manager 프로세스에 통합하기 때문입니다
 
즉, 단일 Task Manager 인스턴스에 할당된 슬롯 수가 ParallelismPerKPU 값보다 많게 보입니다.
 
 

정리

구분
순수 Apache Flink 환경
Amazon Managed Service for Apache Flink (MSF) 환경
기본 리소스 단위
Task Manager (작업 실행 및 슬롯 호스팅)
KPU (Kinesis Processing Unit) (과금 및 용량 프로비저닝 단위)
병렬도 정의 기준
Task Manager 내의 Task Slot
KPU당 할당된 Task Slot 수 (ParallelismPerKPU)
실제 실행 컨테이너
Task Manager 프로세스
Task Manager (AWS가 여러 KPU 리소스를 통합하여 실행 가능)
메모리 확장
Task Manager 메모리 설정(taskmanager.memory.managed.fraction) 조정
총 KPU 수 증가에 따라 Managed Memory 총량 증가
 사용자는 ParallelismParallelismPerKPU 두 가지 주요 속성을 설정하여, Managed Service for Apache Flink가 프로비저닝할 최종 리소스 크기와 내부 실행 구조를 간접적으로 제어합니다. 일반적인 Flink가 TaskManager 단위로 관리하는 부분과 약간의 개념차이가 있음을 이해하면 됩니다.
 

1. 총 할당된 KPU 계산 (리소스 및 메모리 결정)

Amazon MSF 애플리케이션에 할당되는 총 KPU 수는 다음 공식에 의해 결정됩니다.
 
{KPU} = {Parallelism} / {ParallelismPerKPU}
 
 
 Managed Memory: 총 Managed Memory 용량은 이 할당된 KPU 수에 정비례하여 증가합니다
(1 KPU당 약 1 GB의 네이티브 메모리 제공).
 

2. 총 Task Slot 수 (실행 능력)

 
 
Task Slot의 역할: 각 Task Slot은 연산자(Operator)의 병렬 인스턴스(Subtask)를 실행하는 데 사용될 수 있습니다.
Operator Parallelism: 각 연산자별 병렬 처리 수준(Operator Parallelism)은 기본적으로 애플리케이션의 Parallelism 값과 동일합니다. 이는 필요하다면 각 연산자가 사용 가능한 모든 Subtask(Task Slot)를 사용할 수 있음을 의미합니다.
 

3. Task Manager의 개수/메모리 제어 정리

항목
사용자가 직접 제어하는가?
제어 방법 및 결과
KPU 개수
아니요.
ParallelismParallelismPerKPU 설정을 통해 간접적으로 계산되어 할당됩니다.
KPU 수를 늘리려면 Parallelism을 늘리거나 ParallelismPerKPU를 낮춰야 합니다.
Task Manager 개수
아니요.
KPU 할당에 따라 AWS 서비스에 의해 자동 관리됩니다
.
Task Manager는 할당된 KPU 리소스 내에서 애플리케이션의 태스크를 실행합니다.
총 Managed Memory
아니요.
할당된 총
KPU 수에 비례하여 크기가 결정됩니다.
Managed Memory 용량을 늘리려면 KPU 수를 늘려야 합니다.
Task Slot 개수
예.
ParallelismPerKPU를 통해 Task Manager당 슬롯 수를 제어하고, Parallelism을 통해 총 슬롯 수를 제어합니다.
Parallelism을 늘리는 것은 총 Task Slot 수를 늘리는 가장 직접적인 방법입니다.

 

실제 예시

- Parallelism 20, ParallelismPerKPU 1 = 총 20 KPU

1. 리소스 할당 (KPU 기준)

Amazon MSF에서 1 KPU1 vCPU4 GB의 메모리를 제공합니다. 이 4 GB 메모리 중 1 GB는 RocksDB State Backend와 같은 네이티브 프로세스를 위한 네이티브 메모리(Managed Memory의 기반)로 예약됩니다.
항목
20 KPU가 제공하는 총 예상 리소스
생성 결과 (3 TM 합산)
총 vCPU
vCPU
CPU
총 Physical Memory
GB
GB
총 Managed Memory
GB (네이티브 메모리 할당 기준)
GB
 

2. Task Manager 개수 및 Slot 할당 검증

A. 총 Task Slot 수

애플리케이션의 총 병렬 처리 수준(Total Task Slots)은 설정된 Parallelism 값과 같습니다.
설정: 20 KPU, ParallelismPerKPU = 1.
계산된 Parallelism (총 슬롯): .
생성 결과: 3개 Task Manager에   개의 활성 슬롯
 

B. Task Manager당 슬롯 개수와 ParallelismPerKPU의 차이

생성 결과 (3개의 Task Manager가 각각 7개의 슬롯을 가짐)가 다른 이유는
 
1. KPU는 리소스 단위, Task Manager는 실행 컨테이너
AWS Managed Flink는 KPU를 리소스를 할당하는 기본 단위로 사용하지만, 실제 Task Manager 프로세스(컨테이너)는 여러 KPU의 리소스를 하나로 묶어(Grouping) 실행
2. 그룹화:
    ◦ 총 리소스: 20 KPU
    ◦ 실행 Task Manager: 3개
    ◦ Task Manager당 할당된 평균 KPU: .
3. 슬롯 계산
    ◦ ParallelismPerKPU = 1 이므로, Task Manager당 예상되는 슬롯 수는 Task Manager가 포함하는 KPU 수와 같아야 합니다.
    ◦ Task Manager당 슬롯 수 KPU에 해당하며, 개의 TM이 KPU (총 20 KPU)로 나뉘어 할당되었다고 해석할 수 있습니다.
    ◦ 즉, 하나의 Task Manager 인스턴스가 7개의 KPU 리소스를 관리하며, 이 때문에 7개의 슬롯을 가지게 된 것
 
  20 KPU 리소스를 3개의 Task Manager 인스턴스에 통합하여 실행하는 AWS Managed Flink의 내부적인 리소스 그룹화 정책

AWS Managed Flink 네트워크 오류 원인 분석 및 튜닝 가이드

 
오늘은 AWS MSF(Managed Service for Apache Flink) 애플리케이션에서 발생한 오류(예: RemoteTransportException: Connection unexpectedly closed by remote task manager)의 원인과 해결 방안을 분석합니다.


애플리케이션 로그를 보면 10월 16일 오후 10시 21분 24초에 remote task manager error가 발생했고, 이후 partial recovery 후 다시 TaskManager 연결 오류가 반복되는 모습을 확인할 수 있었습니다.

이러한 로그는 TaskManager 간 통신이 불안정함을 나타내며, 주된 원인으로 메모리/리소스 부족, GC 지연, Heartbeat 타임아웃 등이 의심됩니다.

로그

closed by task manager

 

원인 분석

  • 메모리 부족 및 리소스 고갈: AWS MSF 문서에 따르면, TaskManager가 할당된 메모리를 초과하거나 CPU/메모리 과부하에 걸리면 TaskManager 프로세스가 비정상 종료되고, 이로 인해 RemoteTransportException이 발생할 수 있습니다 .
    • 예를 들어, KPU(Kinesis Processing Unit)당 메모리(4GiB, 이 중 3GiB는 힙 메모리)가 부족하면 OOM(OutOfMemory)으로 TaskManager가 죽을 수 있습니다  . 이 경우 Flink가 TaskManager 연결이 끊어졌다고 보고 오류를 출력합니다.
    • 현재 처리하고 있는 내부 Key 값 및 State의 용량이 크기 때문에 heapMemory사용량이 90%를 유지하고 있어서 원인 중 하나일 것으로 예상합니다.
  • GC(Garbage Collection) 지연: 장시간 Full GC가 발생하면 TaskManager가 일정 시간 응답하지 못해 JobManager에 Heartbeat를 제때 보내지 못합니다. Flink 커뮤니티의 해결책에 따르면, 이러한 경우 TaskManager가 ‘unresponsive’ 상태로 간주되어 슬롯이 해제되고, 결국 연결이 끊어집니다.
    • 이를 방지하려면 G1GC와 같은 빠른 GC를 사용하거나 TaskManager 메모리 설정을 조정해 GC 시간을 단축해야 합니다 . 또한, 필요시 akka.watch.heartbeat.pause와 같이 Heartbeat 시간을 늘려 JobManager의 타임아웃을 늘리는 방안도 고려할 수 있습니다.
    • AWS MSF는 이미 G1GC 적용이 되어 있었습니다.
  • Data Skew (데이터 불균형)
    • 특정 TaskManager 또는 Subtask에 비정상적으로 많은 데이터가 집중되어 처리 부하가 쏠릴 수 있습니다.
    • 이 현상은 다음과 같은 이유로 RemoteTransportException과 같은 네트워크 오류로 이어질 수 있습니다.
      • 특정 TaskManager만 메모리 과부하
        • 동일한 데이터 분포를 가정하고 병렬 처리를 수행하지만, 실제로는 일부 키(key)나 파티션에 데이터가 몰리면 해당 TaskManager만 힙 메모리가 1GB 이상까지 치솟습니다.
      • 백프레셔(Backpressure) 전파:
        • 특정 Subtask가 느려지면 상류(upstream)의 데이터 송신 속도가 제한되고, 네트워크 버퍼가 포화되어 Task 간 통신 오류Partial Recovery가 반복됩니다.
      • 네트워크 셔플 불균형:
        • Flink는 keyBy, rebalance, broadcast 연산 시 네트워크를 통해 데이터를 재분배합니다. 이때 데이터 스큐가 있으면 특정 TaskManager의 네트워크 I/O가 폭증하며, 네트워크 버퍼 부족과 연결 끊김이 함께 발생할 수 있습니다.
    • 내부적으로 각 이벤트를 시나리오와 매핑하는 처리를 하고 있는데 특정 시나리오의 이벤트가 많이 발생하고 있어서 역시 의심이 되는 부분입니다.
    • 시나리오 매핑이후는 개별Key를 통해서 keyBy연산으로 하고 있기 때문에 체이닝이 적절하게 동작하고 있습니다.
  • 네트워크 버퍼 부족: 병렬처리가 높아지거나 연산자 체이닝이 해제되면 셔플(shuffle)이 증가해 네트워크 버퍼 소모가 커질 수 있습니다. AWS 문서에는 네트워크 버퍼가 부족해지면 Insufficient number of network buffers 오류가 발생할 수 있으며, 이를 해결하기 위해 parallelismPerKPU를 낮춰 TaskManager당 메모리(및 버퍼)를 늘리거나 연산자 체이닝을 활용할 것을 권장하고 있습니다 
    • 참고로 한 플로우에서 필요한 버퍼 개수는 (slots_per_TM)^2 * number_of_TMs * 4 공식으로 대략 산출할 수 있습니다.
    • 즉, TaskManager의 슬롯 수나 셔플 횟수가 많을수록 네트워크 버퍼가 대폭 늘어납니다.
  • Heartbeat 타임아웃: Flink의 Heartbeat 설정이 너무 짧거나 네트워크 지연이 있으면 TaskManager와 JobManager 간 연결이 끊길 수 있습니다.
    • 이 경우 AWS MSF에서는 heartbeat.timeout 설정을 지원 티켓으로 늘릴 수 있으며 , 위에서 언급한 GC 지연 대응책(Heartbeat 지연 허용)도 도움이 됩니다.
  • 기타 원인: TaskManager 노드의 일시적 네트워크 장애, 컨테이너 리소스 제한(예: CPU 쿼터 초과), 코드 레벨 메모리 누수 등도 원인이 될 수 있습니다.
    • TaskManager 로그에서 OOM 오류나 비정상 종료 메시지를 확인하고, 메모리 누수 가능성을 코드 리뷰로 점검해야 합니다.

 

해결 방안 및 튜닝 가이드

    • KPU 및 병렬성 조정: KPU 리소스를 늘리거나 TaskManager당 할당되는 작업 수(ParallelismPerKPU)를 줄여 각 작업에 더 많은 메모리를 할당합니다.
      • AWS 문서에 따르면 하나의 KPU는 1 vCPU와 4GiB 메모리(힙 3GiB, 네이티브 1GiB), 50GiB 스토리지를 제공합니다 .
      • 예를 들어, ParallelismPerKPU=4(기본)인 경우 KPU당 4개의 슬롯에 약 1GiB씩 메모리가 분할되지만, ParallelismPerKPU=2로 낮추면 슬롯당 약 2GiB를 확보할 수 있습니다.
      • 전체 병렬도(Parallelism)가 고정되어 있다면 ParallelismPerKPU를 낮추면 필요한 KPU수가 증가하여 리소스가 더 할당됩니다 .
      • AWS도 PPK를 낮춰 메모리당 할당량을 높이는 방안을 제시하고 있으며, 이를 통해 메모리 부족으로 인한 OOM 재시작을 줄일 수 있습니다. (단, PPK를 낮추면 비용이 늘어나므로 병렬도를 적절히 조정하거나 오토스케일링을 활용해야 합니다.)
    • Data Skew 완화
      • Key 균등화 전략 적용 (keyBy 연산 시 key 값의 불균형이 발생하는 경우)
        • 해시 기반 샘플링 후 파티션 재조정
        • “composite key” (예: userId_mod_10) 기반 가상 key 확장
        • rebalance() → map() 단계 삽입을 통해 균등 분배 가능 
    • Aggregation 단계 분리
      • 데이터가 한 TaskManager로 몰리는 경우, 1차 로컬 집계(Local aggregation) → 2차 글로벌 집계(Global aggregation) 구조로 바꾸면, 네트워크 트래픽 및 skew 부담을 크게 줄일 수 있음
    • Metrics 기반 모니터링 : 특정 TaskManager만 유독 높아서 key 분포 불균형 확인함
      • numRecordsIn, busyTimeMsPerSecond, heapMemoryUsed 메트릭을 TaskManager 단위로 비교하여 Skew 구간을 확인
  • Operator 체이닝 활용 및 네트워크 최적화: 가능한 Operator들을 체이닝하여 별도 서브태스크로 분리될 필요가 없도록 합니다.
    • MSF 문서에 따르면 체이닝은 동일 서브태스크 내에서 연산자들을 묶어 실행함으로써 네트워크 호출을 줄이고 리소스를 절약합니다.
    • 반대로 체이닝을 해제하거나 인위적으로 새 체인을 시작하면 불필요한 셔플이 증가합니다. 실제 사례에서도 체이닝 해제가 네트워크 부담을 크게 늘려 오류 원인이 되므로, 가능하면 체이닝을 활성화할 것을 권장합니다.
    • 또한, 앞서 언급한 네트워크 버퍼 부족을 피하기 위해 플로우를 단순화하고 셔플 단계를 최소화하는 것도 중요합니다.
  • I/O 바운드 워크로드 고려: 외부 서비스와의 블로킹 I/O가 많은 경우, 오히려 ParallelismPerKPU를 높여 KPU 리소스를 최대한 활용하는 것이 효율적일 수 있습니다 .
    • 즉, 네트워크 지연이 주요 병목이라면 병렬 슬롯을 늘려 I/O 대기 동안 더 많은 작업을 병렬 처리하도록 조정합니다.
    • 하지만 현재는 TaskManager 재시작 문제가 우려되므로, 우선은 안정적인 메모리 확보에 중점을 두고 PPK를 낮추었습니다.
  • 모니터링 강화 및 추가 점검: CloudWatch의 주요 메트릭을 면밀히 모니터링합니다.
    • cpuUtilization, heapMemoryUtilization 등은 전체 리소스 사용률을 보여주며 문제 지표가 될 수 있습니다.
    • 또한 oldGenerationGCTime과 같은 GC 메트릭을 살펴보면 GC 지연 여부를 판단할 수 있습니다. 장애가 발생할 때는 CloudWatch Downtime, FullRestarts 값도 확인합니다.
    • TaskManager JVM 로그에서 GC 타임, 힙 사용량, OOM 에러를 찾아보고, 코드에 메모리 누수 가능성은 없는지 검토해야 합니다.
      • TaskManager 로그를 보고 싶으면 Flink구성에서 로그항목 [Application]으로 설정해야 합니다.
    • Heartbeat 타임아웃이 잦다면 heartbeat.timeout 설정을 AWS 지원 티켓으로 늘려보는 것도 고려할 수 있습니다 .
  • 추가 적인 점검 사항
    • 일시적인 AWS 네트워크 장애 가능성도 염두에 두고, 다른 애플리케이션이나 Kinesis 스트림의 상태(예: 지연, 스로틀링)도 확인해 봅니다.
    • 코드 레벨에서는 대용량 데이터 처리 시 메모리 소비가 급증하지 않는지, Checkpointing 지연으로 백프레셔가 걸리지는 않는지도 함께 살펴봅니다.

 

결론

 
이번 오류는 단순한 네트워크 불안정이 아니라,

  • TaskManager 메모리 부족
  • Data Skew로 인한 특정 TaskManager 과부하
  • GC 지연 및 Heartbeat 누락
  • 복합적으로 작용한 결과입니다.

 
현재 발생한 오류를 해결하기 위해서는 메모리 확보, DataSkew 해소 및 네트워크 통신 부하 완화가 주요 내용입니다.

다음과 같은 액션을 수행했습니다.

  1. ParallelismPerKPU 재조정
    1. TaskManager 슬롯 수를 줄여 각 슬롯이 더 많은 메모리를 갖도록 합니다.
    2. PPK를 4에서 2로 낮추어 슬롯당 메모리 할당량을 늘림 (1G -> 2G)
  2. 전체 병렬도 검토
    1. PPK를 낮춘 뒤에 처리량이 부족하다면, 전체 병렬도(Parallelism) 자체를 늘려 필요한 KPU 수를 확장
  3. Data Skew 해소
    1.  DataStream에서 .rebalance() 수행 후 이후 map() 하도록 파이프라인 수정
  4. 비용 대비 효율 검토
    1. 리소스 조정으로 비용이 상승할 수 있으므로, 변경 전후의 KPU 사용량과 애플리케이션 처리율을 비교하여 최적점을 찾습니다
  5. 모니터링 강화
    1. heapMemoryUtilization, oldGenerationGCTime, cpuUtilization, fullRestarts 주요 메트릭을 CloudWatch에서 계속 감시하고, 설정 변경 후 리소스 압박이 해소되었는지 검증이 필요 !!

 
이상으로 발생한 TaskManager 관련 오류 원인을 분석하고 해결 방안을 공유합니다.

다음 글에서는 AWS Managed Flink에서 KPU, Parallelim 의 관계와 TaskManager에 할당되는 관계를 알아보도록 하겠습니다.
https://icthuman.tistory.com/entry/%EC%8B%A4%EC%8B%9C%EA%B0%84-%EB%8D%B0%EC%9D%B4%ED%84%B0%EC%B2%98%EB%A6%AC-%EA%B5%AC%ED%98%84-4-Parallelism-ParallelismPerKPU


 
참고: AWS MSF 공식 문서 및 Flink 자료
- https://docs.aws.amazon.com/managed-flink/

 

https://docs.aws.amazon.com/managed-flink/

docs.aws.amazon.com

- https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/deployment/memory/mem_setup_tm/

 

Set up TaskManager Memory

Set up TaskManager Memory # The TaskManager runs user code in Flink. Configuring memory usage for your needs can greatly reduce Flink’s resource footprint and improve Job stability. The further described memory configuration is applicable starting with t

nightlies.apache.org

 

<이전글>

https://icthuman.tistory.com/entry/AWS-Cognito-1-%EC%82%AC%EC%9A%A9%EC%9E%90%EA%B4%80%EB%A6%ACOAuth-20

 

AWS Cognito (1) - 사용자관리/OAuth 2.0

- 대부분의 서비스에서 공통적으로 필요한 기능이 사용자 관리/로그인 기능이다. 단순한 것 같지만 생각보다 많은 리소스가 필요한 시스템이다. (특히 권한연계까지 들어갈 경우) - 기존에는 자

icthuman.tistory.com

 

<개요>

- 가장 많이 사용되는 Authorization Code Grant방식을 이용해서 JWT Token 을 발급받는다.

- 토큰으로부터 claims를 얻고 유효한지 검증한다.

- 기존방식 토큰과의 호환성을 검토한다. (JWT / JWS, Secret Key / RSA)

 

<내용>

1. Authorization Code Request

- Cognito의 설정을 마쳤으니 호스팅UI를 통해서 로그인을 시도한다.

- 반드시 다음값을 파라미터로 넘겨주어야 한다.

 a. client_id : Cognito에서 생성한 클라이언트 ID

 b. response_type : code

 c. scope : resource에 접근할 수 있는 범위

 d. redirect_uri : authorization_code를 받아서 리다이렉트할 주소

 

2. Authorization Code Redirect ( redirect_uri )

- 해당 코드값을 통해서 /oauth2/token에 접근한다.

- access_token, id_token 등이 발급되면 응답을 처리한다.

- 코드로 살펴보면 다음과 같다.

CognitoTokenRequest cognitoTokenRequest = cognitoTokenRequestFactory.newCognitoTokenRequest(code);

        Mono<String> ret = webClient
                .post()
                .uri("/oauth2/token")
                .headers(h -> h.setBasicAuth(cognitoTokenRequest.getBasicAuth()))
                .contentType(MediaType.APPLICATION_FORM_URLENCODED)
                .body(BodyInserters.fromObject(cognitoTokenRequest.getMap()))
                .retrieve()
                .onStatus(HttpStatus::is4xxClientError, clientResponse -> Mono.error(new RuntimeException(clientResponse.bodyToMono(String.class).block())))
                .bodyToMono(String.class)
                .onErrorReturn(null);
        ;

        CognitoTokenResponse cognitoTokenResponse = null;
        String temp = ret.block(Duration.ofMillis(cognitoTokenApiTimeout));
        try {
            cognitoTokenResponse = objectMapper.readValue(temp , CognitoTokenResponse.class );
        } catch (JsonProcessingException e) {
            log.error("error cognito's response is not valid", e.getLocalizedMessage());
        }
        return CompletableFuture.completedFuture(cognitoTokenResponse);

- 일반적으로 MediaType.APPLICATION_FORM_URLENCODED 방식으로 호출할 경우 MultiValueMap 을 사용하게 되는데 해당 값을 FirstCollection 형태로 처리하고 Factory를 통해서 만들도록 하여 소스를 간결하게 처리했다.

- 또한 /oauth2/token에 접근하기 위해서는 basicAuth로 인증을 해야 하는데 이 때 필요한 Secret값은 다음과 같은 형태로 생성한다.

Base64.getEncoder().encodeToString( (clientId+":" + clientSecret).getBytes() );

- Response는 다음과 같은 형태로 확인할 수 있다.

{
"access_token":
    "id_token":
    "tokenType":
    "expiresIn":
}

- 이 때도 주의해야 하는 점이 있는데 AWS Cognito의 응답구조는 cameCase가 아니라 snakeCase로 넘어온다

ObjectMapper objectMapper = new ObjectMapper().setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE);

- Mono,CompletableFuture에 대해 부연설명을 하자면 Spring WebFlux에서는 비동기/논블로킹을 일반적인 처리형태로 가져간다.

 다만 이번 소스처럼 일부 동기화 구간이 필요할 경우 block() 을 사용하게 되고 이때 별도의 쓰레드로 대기하도록 ComletableFuture를 활용하여 @Async 처리를 하는 것이 좋다. 과거글 참조

https://icthuman.tistory.com/entry/Spring-WebClient-%EC%82%AC%EC%9A%A9-2-MVC-WebClient-%EA%B5%AC%EC%A1%B0

 

Spring WebClient 사용 #2 (MVC + WebClient 구조)

- Spring 이후 버전에서는 RestTemplate가 deprecated될 예정이며 WebClient 사용을 권장하고 있다. - 현재 구성 중인 시스템에는 동기/비동기 API가 혼재되어 있으면서, 다양한 Application / DB를 사용중이기 때

icthuman.tistory.com

 

3. JWT Token 

- JWT Token은 {header}.{body}.{signature} 의 형태로 구성되어 있으며 Cognito에서 응답받은 accessToken토큰의 내용을 확인해보면 다음과 같다.

- Header부분에 해당 토큰을 생성할 때 사용된 알고리즘과 공개키(비대칭)방식일 경우 kid가 포함된다.

  이를 통해서 서명을 검증할 수 있으며 공개키는 발급자가 제공하는 uri에서 확인할 수 있고, 일반적으로 다음과 같다.

   {issuer_uri}/.well-known/jwks.json

 

- 알고리즘의 대칭 / 비대칭 알고리즘에 대해서는 따로 설명해야 할 만큼 긴데 단순히 요약하면

 a. 대칭 알고리즘 : 서로 같은 키를 사용

 b. 비대칭 알고리즘 : 서로 다른 키를 사용

  e.g) 개인키로 암호화하여 서명을 첨부하고, 공개키를 통해서 서명을 검증할 수 있다.

 

4. JWT Token Encode / Decode (Java)

- io.jsonwebtoken 의 DefaultJwtParser를 살펴보면 세부로직을 좀더 잘 파악할 수 있다.

-  기본 내부에서 사용하던 토큰은 이와 같은 방식으로 되어 있으며 secretKey를 사용하는 대칭 알고리즘 방식이었다.

Jws<Claims> claims = Jwts.parser().setSigningKey(secretKey).parseClaimsJws(refreshToken);

(변수)  = claims.getBody().get("userId");
...
...

그러다 보니 같은 방식으로는 Cogtino의 토큰을 처리할 수 없어서 일단 다음과 같은 방식으로 변경하였다.

Jwt<Header, Claims> preclaims = Jwts.parser().parseClaimsJwt(getUnsignedTokenFromOriginalToken(token));


private static String getUnsignedTokenFromOriginalToken(String token){
        String[] splitToken = token.split("\\.");
        String unsignedToken = splitToken[0] + "." + splitToken[1] + ".";
        return unsignedToken;
} // {[0]}.{[1]}.

- 이와 같이 {signature} 부분을 제외하고 body부분을 읽어올 수 있으며 Header에 대한 정보도 같이 처리가 가능하다.

 다만 누군가 토큰을 위조할 수 있는 가능성이 있어서 아래 Verifier에서 약간 보완을 해보았다.

 

5. JWT / JWS / JWE / JWK

- JWT Token에 대해서 살펴보면 관련된 용어들이 자주 나오는데 간단히 살펴보면 다음과 같다.

 

 A. JWT : 인증을 위한 일반적인 메커니즘. JWS 나 JWE 로 구현된다. (implements)

 [header].[payload].[signature]

 

 B. JWS(JSON Web Signature)

 Claim의 내용이 노출되고 디지털 서명을 하는 방식이다. (Client 가 Claim을 사용하기 위해서 일반적으로 JWS를 사용한다.)

 다음 Signature 에서 일반적으로 사용되는 알고리즘이다.

  • HMAC using SHA-256 or SHA-512 hash algorithms (HS256, HS512)
  • RSA using SHA-256 or SHA-512 hash algorithms (RS256, RS512)

 

C. JWE(JSON Web Encryption)

Claim 자체를 암호화시키는 방식

"header":
{
    "alg" : "RSA-OAEP",                --------------------> For content encryption 
    "enc" : "A256GCM"                  --------------------> For content encryption algorithm
},
 "encrypted_key" : "qtF60gW8O8cXKiYyDsBPX8OL0GQfhOxwGWUmYtHOds7FJWTNoSFnv5E6A_Bgn_2W"
"iv" : "HRhA5nn8HLsvYf8F-BzQew",       --------------------> initialization vector
"ciphertext" : "ai5j5Kk43skqPLwR0Cu1ZIyWOTUpLFKCN5cuZzxHdp0eXQjYLGpj8jYvU8yTu9rwZQeN9EY0_81hQHXEzMQgfCsRm0HXjcEwXInywYcVLUls8Yik",
"tag" : "thh69dp0Pz73kycQ"             --------------------> Authentication tag
}

 

D. JWK(JSON Web Key)

private key로 만들어진 signature 를 검증하기 위해서 public key를 제공하는 구조이다.

{
"alg":"RSA",

"mod": "0vx7agoebGcQSuuPiLJXZptN9nndrQmbXEps2aiAFbWhM78LhWx4cbbfAAtVT86zwu1RK7aPFFxuhDR1L6tSoc_BJECPebWKRXjBZCiFV4n3oknjhMs
tn64tZ_2W-5JsGY4Hc5n9yBXArwl93lqt7_RN5w6Cf0h4QyQ5v-65YGjQR0_FDW2QvzqY368QQMicAtaSqzs8KJZgnYb9c7d0zgdAZHzu6qMQvRL5hajrn1n91CbOpbI
SD08qNLyrdkt-bFTWhAI4vMQFh6WeZu0fM4lFd2NcRwr3XPksINHaQ-G_xBniIqbw0Ls1jF44-csFCur-kEgU8awapJzKnqDKgw",

"exp":"AQAB",

"kid":"2011-04-29"
}

 

 

6. Customize Token Verifier

private LoginUserDetails makeUserDetailsFromToken(String token) throws TimeoutException, ExecutionException, InterruptedException {
        Jwt<Header, Claims> preclaims = Jwts.parser().parseClaimsJwt(getUnsignedTokenFromOriginalToken(token));
        String iss = (String)preclaims.getBody().get("iss");

        if(StringUtils.hasText(iss) && iss.startsWith("https://cognito-idp.ap-northeast-2.amazonaws.com")){  // 1. Cognito Token

            CognitoUserInfoResponse cognitoUserInfoResponse = cognitoService.getAuthInfoFromAccessToken(token).get(3, TimeUnit.SECONDS);
            if(cognitoUserInfoResponse== null || cognitoUserInfoResponse.getSub().isEmpty()){
                return null;
            }
            Jwt<Header, Claims> claims = Jwts.parser().parseClaimsJwt(getUnsignedTokenFromOriginalToken(token));
        	...
            Collection<GrantedAuthority> authorityList = new ArrayList<>();
            List<String> cognitoGroups = (List<String>)claims.getBody().get("cognito:groups");
            if(null != cognitoGroups) {
                for (String group : cognitoGroups) {
                    authorityList.add(new SimpleGrantedAuthority(group));
                }
            }
            return new LoginUserDetails(......, authorityList);
        }else{                                                                                                // 2. Legacy Token
            Jws<Claims> claims = Jwts.parser().setSigningKey(secretKey).parseClaimsJws(token);
            ...
            Collection<GrantedAuthority> authorityList = new ArrayList<>();
            List<String> authorities = (List<String>)claims.getBody().get("roles");
            if(null != authorities) {
                for (String auth : authorities) {
                    authorityList.add(new SimpleGrantedAuthority(auth));
                }
            }
            return new LoginUserDetails(..., authorityList);
        }
    }

 

 A. 발급자가 Cognito라면

 -- 발급된 토큰이 정상인지 확인요청을 한다. (/oauth/userInfo)

   => 이부분은 사실 원격요청을 하지 않고 제공되는 공개키, pem을 통해서 로컬에서 검증이 가능하지만 현재 java로는 직접 구현해야 하는 부분이 많고, 자주는 아니지만 공개키는 변경될 수 있기 때문에 키값을 확인하기 위해서는 결국 원격요청이 필요해서 쉽게 가기로 했다. 

 -- "cognito:groups" 정보를 이용해서 권한을 세팅한다.

 B. 기존 토큰이라면

 - secretKey를 이용해서 검증하고

 - "roles" 정보를 이용해서 권한을 세팅한다.

 

<정리>

이렇게 만들면 기존의 JWT Token(HS256) 와 신규 Cognito Token (RSA256) 를 모두 소화할 수 있다.

그러나 만들어 놓고보니 소스가 지저분하다. ResourceServer에서 권한을 확인하여 API접근제어를 해야하는데 좋은 구조는 아니다.

  - 로직이 복잡하고(구조 특성상 해당모듈을 그대로 copy해가는 방식이 될텐데 유지보수성이...영), secretKey 공유의 문제도 있다.

  - 꼭 두 가지 토큰을 소화할 필요가 있을까?

  - 기존 방식은 외부 사용자가 고려되지 않은 방식이고, OAuth 2.0은 출발자체가 3rd Party연계를 간편하게 하기 위한 방법이다.

 

다양한 확장을 위해서 인증은 각각의 방식을 유지하고 서비스를 사용자에 따라서 분리하는 것이 해결책이라는 판단을 내렸다.

- 내부 사용자가 이용하며 Client, Resource Server가 같은 Boundary에 있는 레거시의 경우 기존 토큰을 사용하고

- 외부 사용자가 이용해야 하는 API 서비스는 별도로 분리 구성하며, 이 서비스에서는 OAuth 2.0만을 활용하여 인가/인증을 하도록 한다.

  (이부분은 AWS API Gateway내에서 자격증명으로 Cognito를 연계하는 방식과 시너지를 낼 수 있을 것 같다.)

 

- 이를 위해서 보다 쉽게 Resource Server를 세팅하는 법을 Spring Security에서 제공하고 있는데 다음글에서 좀 더 자세히 적용해보고 서비스를 어떻게 분리하는 것이 보다 효과적일지 살펴본다.

 

<참조>

https://aws.amazon.com/ko/premiumsupport/knowledge-center/decode-verify-cognito-json-token/

 

Cognito JSON 웹 토큰의 서명 디코딩 및 확인

Amazon Cognito JSON 웹 토큰의 서명을 디코딩 및 확인하려면 어떻게 해야 합니까? 최종 업데이트 날짜: 2022년 9월 6일 Amazon Cognito 사용자 풀을 애플리케이션의 인증 방법으로 사용하고 싶습니다. 클라

aws.amazon.com

https://docs.aws.amazon.com/cognito/latest/developerguide/amazon-cognito-user-pools-using-tokens-verifying-a-jwt.html

 

Verifying a JSON web token - Amazon Cognito

Amazon Cognito might rotate signing keys in your user pool. As a best practice, cache public keys in your app, using the kid as a cache key, and refresh the cache periodically. Compare the kid in the tokens that your app receives to your cache. If you rece

docs.aws.amazon.com

https://www.loginradius.com/blog/engineering/guest-post/what-are-jwt-jws-jwe-jwk-jwa/

 

What are JWT, JWS, JWE, JWK, and JWA? | LoginRadius Blog

Learn about the JOSE framework and its specifications, including JSON Web Token (JWT), JSON Web Signature (JWS), JSON Web Encryption (JWE), JSON Web Key (JWK), and JSON Web Algorithms (JWA). For easier reference, bookmark this article.

www.loginradius.com

 

<개요>

- 대부분의 서비스에서 공통적으로 필요한 기능이 사용자 관리/로그인 기능이다. 단순한 것 같지만 생각보다 많은 리소스가 필요한 시스템이다. (특히 권한연계까지 들어갈 경우)

- 기존에는 자체적으로 사용자 관리 시스템을 운영하고 있었다. (Password Grant방식)

 그 이유는 특별한 사용자의 정보를 담지 않고 있으며 인증, Resource Server, Client가 모두 같은 System Boundary에 포함되어 있었기 때문에 가장 간편한 방식으로 개발하였다.

- 이제부터는 관리기능을 확장시키고 추가 정보를 담아야 하는 요건이 생겼으며, 이를 위해서 사용자 인증에 관련된 부분을 별도 시스템으로 분리하여 구성하는 것이 좋다고 판단하였다.

- AWS Cognito의 경우 외부연동, MFA인증등 편리한 기능을 기본적으로 가지고 있으며 JWT Token, IAM 연동도 가능한 것으로 보여서 가능성을 검토해본다.

 a. 기존 시스템을 유지하면서 개선할 것인지 (Spring Boot / Security / JWT Token)

 b. 새로 개발할 것인지 (e.g AWS Cognito)

 c. 혹은 a,b를 혼합하여 가져갈 수 있을지 (커스터마이징의 범위)

 

<내용>

1. 사용자 풀

사용자 풀

- 사용자를 생성하고 그룹을 할당할 수 있다.

- 이메일 확인 및 비밀번호 초기화 등의 기능을 기본으로 제공한다.

- 그룹은 개별적으로 추가 가능하다.

- AWS Cognito user pool은 기본적으로 대소문자를 구분하지 않도록 되어있다. (권장사항)

  만약에 대소문자를 구별하고 싶다면 Make user name case sensitive 옵션을 활성화 해야한다.

https://docs.aws.amazon.com/cognito/latest/developerguide/user-pool-case-sensitivity.html

 

User pool case sensitivity - Amazon Cognito

User pool case sensitivity Amazon Cognito user pools that you create in the AWS Management Console are case insensitive by default. When a user pool is case insensitive, user@example.com and User@example.com refer to the same user. When user names in a use

docs.aws.amazon.com

 

2. 연동자격증명 공급자

- 우리가 SNS연동을 하기 위해서는 일반적으로 App을 생성하고 할당받은 Client ID, Secret을 이용하여 API호출 및 연계개발을 하게 되는데 이부분이 쉽게 설정으로 가능하도록 되어있다. (많이 사용하는 Google / Apple / Facebook)

- 그 외에도 표준 OIDC를 준수한다면 쉽게 추가하고 속성을 매핑할 수 있는 구조로 되어있다.

자격 증명 공급자 : Google

 

3. SMS / SNS

-  스타트업에서는 내부에 문자 / 이메일 발송기능이 없는 경우도 많은데 AWS SES / SNS 등과 쉽게 연동하여 활용이 가능하다.

- 단, SMS는 현재 도쿄 리전에서만 사용이 가능하며 샌드박스 내에서는 등록된 번호로만 발신이 가능하다.

 

4. App 구성

- AWS Cognito도 결국 우리가 만든 서비스와 연결되기 위해서 여러가지 요소가 필요한데 이에 대한 부분도 제공하고 있다.

- 도메인 : Cognito 도메인 or 사용자 지정도메인 가능

- 리소스 서버 구성 (OAuth 2.0)

- 호스팅 UI, 앱 클라이언트

 

5. OAuth 2.0 

- 앱 유형을 선택하기 전에 OAuth 2.0에 대한 기본적인 이해도가 있으면 도움이 된다.

https://icthuman.tistory.com/entry/OAuth-20-Flow

 

OAuth 2.0 Flow

1. Authorization Code - 권한 부여 승인을 위해서 자체생성한 Authorization Code를 전달하는 방식 - 기본이 되는 방식 - Refresh Token 사용이 가능 2. Client Credentials - 클라이언트의 자격증명만으로 Access Token을

icthuman.tistory.com

 

6. 앱 클라이언트

- 앱 클아이언트 부분을 좀 더 자세히 살펴보면 다음과 같다.

- 클라이언트 보안키 생성유무 선택에 따라서 추후 토큰발급 엔드포인트를 호출할 때 방법에 차이가 있다.

- 세션, 토큰( Id/Access/Refresh )별 만료시간을 선택할 수 있으며 Revoke Token에 대해서 선택사항으로 정할 수 있다.

- 사용자를 찾을 수 없다고 응답을 하면 해킹에 취약해질 수 있기 때문에 최근 로그인 시스템에서는 이름 또는 암호가 잘못되었다고 묶어서 표현하고 있다.

 

6. 호스팅 UI

- 자체적으로 로그인 UI 를 제공해준다.

- 주의할 점은 사용자 풀에서 연동자격증명을 선택하였다면 여기서도 선택을 해주어야 UI 상에서 버튼이 활성화 된다.

- 해당 호스팅UI는 내가 등록한 App에서만 사용하는 것이다. 따라서 호출할 때 다음 파라미터들을 전달해주어야 정상적으로 동작한다.

- client_id, response_type, scope, redirect_uri

 

<정리>

- AWS Cognito에서 User Pool을 생성하고 Identity Providers를 추가한 뒤 호스팅 UI 를 통해서 로그인/가입까지 완료했다.

- 다음 글에서는 OAuth 2.0 기본 Flow를 따라서 Cognito 사용자 풀과 호스팅 UI를 연동하여 Authorization Code를 발급받고

- 이를 기반으로 엔드포인트에 접근하여 Token발급, UserInfo 조회하는 기능을 살펴본다.

 

<참고>

- https://aws.amazon.com/ko/cognito/details/

 

기능 | Amazon Cognito | Amazon Web Services(AWS)

 

aws.amazon.com

- https://docs.aws.amazon.com/ko_kr/cognito/latest/developerguide/what-is-amazon-cognito.html

 

Amazon Cognito란 무엇입니까? - Amazon Cognito

Amazon Cognito란 무엇입니까? Amazon Cognito는 웹 및 모바일 앱에 대한 인증, 권한 부여 및 사용자 관리를 제공합니다. 사용자는 사용자 이름과 암호를 사용하여 직접 로그인하거나 Facebook, Amazon, Google

docs.aws.amazon.com

 

<개요>

이전글

https://icthuman.tistory.com/entry/AWS-Java-SDK-S3-File-upload

 

AWS Java SDK - S3 File upload

<개요> - 기본적인 AWS Java SDK S3 사용예제 - 사용시 주의사항 <내용> 현재 버전(2022.05.19기준) 샘플 소스이며, 공식 가이드문서를 참고하는 것이 정확합니다. 1. Configuration @Configuration public class..

icthuman.tistory.com

 

<내용>

- 기존 버전에서는 ObjectMetadata를 null로 처리하였으나 추가요건을 처리하다보니 개선해야 할 부분이 있었습니다.

- CacheControl, ContentType

 

1. CacheControl

- 해당 파일을 이용하여 서비스할 경우 CacheControl 속성으로 max-age값을 주어서 캐시를 활용할 수 있습니다.

 

2. ContentType

- contentType을 지정하지 않을 경우 file upload시 octet-stream으로 동작하여 특정 이미지포멧의 경우 에러가 발생하는 경우를 볼수 있습니다. 

 

그 외에도 필요에 따라서 추가할 수 있는 값들을 AWS Document에 따라서 적절하게 사용하면 됩니다.

public FileDto upload(MultipartFile file, String prefix) throws IOException {
    SimpleDateFormat date = new SimpleDateFormat("yyyyMMddHHmmss");
    String fileName = prefix +"-"+date.format(new Date())+"-"+file.getOriginalFilename();
    String s3location = bucket +"/"+ prefix;

    ObjectMetadata objectMetadata = new ObjectMetadata();
    objectMetadata.setCacheControl(cacheControl);
    objectMetadata.setContentType(file.getContentType());

    amazonS3.putObject(new PutObjectRequest(s3location, fileName, file.getInputStream(), objectMetadata));

    FileDto fileDto = new FileDto();
    fileDto.setS3Location(amazonS3.getUrl(s3location,fileName).toString());
    fileDto.setS3Key(fileCategory.toString() +"/" + fileName);
    return fileDto;
}

 

<참조>

https://docs.aws.amazon.com/ko_kr/AmazonS3/latest/userguide/UsingMetadata.html#object-key-guidelines

 

객체 메타데이터 작업 - Amazon Simple Storage Service

PUT 요청 헤더는 크기가 8KB 이하여야 합니다. PUT 요청 헤더에 포함되는 시스템 정의 메타데이터의 크기는 2KB 이하여야 합니다. 시스템 정의 메타데이터의 크기는 US-ASCII로 인코딩된 각 키와 값의

docs.aws.amazon.com

 

<개요>

- 기본적인 AWS Java SDK S3 사용예제

- 사용시 주의사항

 

<내용>

현재 버전(2022.05.19기준) 샘플 소스이며, 공식 가이드문서를 참고하는 것이 정확합니다.

1. Configuration

@Configuration
public class AmazonS3Config {
    
    @Value("${aws.credentials.access-key}")
    private String accessKey;

    @Value("${aws.credentials.secret-key}")
    private String secretKey;

    @Value("${aws.region.static}")
    private String region;

    public BasicAWSCredentials awsCredentials(){
        BasicAWSCredentials awsCreds = new BasicAWSCredentials(accessKey, secretKey);
        return awsCreds;
    }

    @Bean
    public AmazonS3 amazonS3Client(){
        AmazonS3 amazonS3 = AmazonS3ClientBuilder.standard()
                .withRegion(this.region)
                .withCredentials(new AWSStaticCredentialsProvider(this.awsCredentials()))
                .build();
        return amazonS3;
    }
}

- 이와 같이 accessKey, secretKey, region정보를 세팅하여 기본적인 인증을 생성하고 이를 기반으로 S3 ClientBuilder를 통해서 객체를 생성 후 Bean으로 사용한다.

- dev / stg / prod 환경에 따라서 달라질 수 있는 정보들은 외부에 관리한다.

- 주의 : 해당 키값이 탈취당할 경우 매우 위험하니 (보안,비용 등등), Public 공간에 올리는것은 주의하고 올라가게 된다면 반드시 암호화를 한다!

 

2. Repository

@Repository
public class AwsFileRepository {

    private AmazonS3 amazonS3;

    @Value("${aws.s3.bucket}")
    private String bucket;

    @Autowired
    public AwsFileRepository(AmazonS3 amazonS3){
        this.amazonS3 = amazonS3;
    }

    public FileDto upload(MultipartFile file, String prefix) throws IOException {
        SimpleDateFormat date = new SimpleDateFormat("yyyyMMddHHmmss");
        String fileName = prefix +"-"+date.format(new Date())+"-"+file.getOriginalFilename();
        String s3location = bucket +"/"+ prefix;

        amazonS3.putObject(new PutObjectRequest(s3location, fileName, file.getInputStream(), null));

        FileDto fileDto = new FileDto();
        fileDto.setS3Location(amazonS3.getUrl(s3location,fileName).toString());
        fileDto.setS3Key(prefix +"/" + fileName);
        return fileDto;
    }

    public String delete(String s3Key){
        amazonS3.deleteObject(bucket, s3Key);
        return s3Key;
    }

}

- S3 역시 외부의 저장소에 접근하는 것이므로 Repository Layer로 정의하는 것을 추천한다.

- S3 는 버킷단위로 정책을 정의할 수 있으며, 내부에 별도 폴더로 분리 저장이 가능하다. (예제 소스에서 prefix부분)

- 저장할 때의 값 bucketName "/" 이하 부분을 해당 파일에 대한 Key로 관리하면 삭제 시 편리하게 활용할 수 있다.

- 예제에서는 ObjectMetadata를 null로 사용하였으며 필요에 따라서 공식가이드에 제공되는 값을 세팅할 수 있다.

https://docs.aws.amazon.com/ko_kr/AmazonS3/latest/userguide/UsingMetadata.html#object-key-guidelines

 

객체 메타데이터 작업 - Amazon Simple Storage Service

PUT 요청 헤더는 크기가 8KB 이하여야 합니다. PUT 요청 헤더에 포함되는 시스템 정의 메타데이터의 크기는 2KB 이하여야 합니다. 시스템 정의 메타데이터의 크기는 US-ASCII로 인코딩된 각 키와 값의

docs.aws.amazon.com

 

<주의사항>

- 대부분의 Public Cloud는 Endpoint와 HTTP API를 제공하고 있으며, SDK는 이것을 감싸는 구조로 되어있다. 

- 따라서 url 인코딩을 주의하여 사용해야 한다.

- 예를 들어서 얼마전에 테스트하다가 파일명에 특이한 문자들을 넣게 되었는데 아래와 같은 오류가 발생하였다.

2022-05-19 10:15:15.447 DEBUG 30235 --- [nio-8900-exec-6] com.amazonaws.request                    : Received error response: com.amazonaws.services.s3.model.AmazonS3Exception: The request signature we calculated does not match the signature you provided. Check your key and signing method. (Service: Amazon S3; Status Code: 403; Error Code: SignatureDoesNotMatch; Request ID: S80BNXP50DTW9SJM; S3 Extended Request ID: , S3 Extended Request ID: 
2022-05-19 10:15:15.449 DEBUG 30235 --- [nio-8900-exec-6] com.amazonaws.retry.ClockSkewAdjuster    : Reported server date (from 'Date' header): Thu, 19 May 2022 01:15:15 GMT
.
.
.
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5054) [aws-java-sdk-s3-1.11.762.jar:na]
	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5000) [aws-java-sdk-s3-1.11.762.jar:na]
	at com.amazonaws.services.s3.AmazonS3Client.access$300(AmazonS3Client.java:394) [aws-java-sdk-s3-1.11.762.jar:na]
	at com.amazonaws.services.s3.AmazonS3Client$PutObjectStrategy.invokeServiceCall(AmazonS3Client.java:5942) [aws-java-sdk-s3-1.11.762.jar:na]
	at com.amazonaws.services.s3.AmazonS3Client.uploadObject(AmazonS3Client.java:1808) [aws-java-sdk-s3-1.11.762.jar:na]
	at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1768) [aws-java-sdk-s3-1.11.762.jar:na]

- 인증토큰이나 이러한 부분에 문제인줄 알았는데 유사한 문제를 겪은 사람들이 상당히 많았고

https://stackoverflow.com/questions/30518899/amazon-s3-how-to-fix-the-request-signature-we-calculated-does-not-match-the-s

 

Amazon S3 - How to fix 'The request signature we calculated does not match the signature' error?

I have searched on the web for over two days now, and probably have looked through most of the online documented scenarios and workarounds, but nothing worked for me so far. I am on AWS SDK for PHP...

stackoverflow.com

- 결국 encoding과정의 문제로 발생한 것이었다.(에러메시지를 잘 주었다면 덜 고생했을텐데;)

https://docs.aws.amazon.com/ko_kr/AmazonS3/latest/userguide/object-keys.html

 

객체 키 이름 생성 - Amazon Simple Storage Service

Amazon S3 콘솔을 사용하여 키 이름이 마침표 '.'로 끝나는 객체의 경우 다운로드한 객체의 키 이름에서 마침표 '.'가 제거됩니다. 다운로드한 객체에 보존된, 키 이름이 마침표 '.'로 끝나는 객체를

docs.aws.amazon.com

 

<개요>
- 다음과 같이 Service #A 에서 Service #B로 데이터 조회 API를 요청하고 값을 받아오는 로직이 있다.
- Service #B에서는 AWS Athena를 저장소로 사용하고 있으며 Athena JDBC42 드라이버를 사용 중 이다.

API 호출 후 응답


<현상>
- Service #B에서 JdbcTemplate을 통하여 쿼리가 수행된 시간은 11:13:13 이고,
2021-11-04 11:13:13.482 DEBUG 9668 --- [http-nio-8200-exec-9] o.s.jdbc.core.JdbcTemplate : Executing SQL query
2021-11-04 11:13:13.482 DEBUG 9668 --- [http-nio-8200-exec-9] o.s.jdbc.datasource.DataSourceUtils : Fetching JDBC Connection from DataSource
- 실제 쿼리 수행결과를 받아온 시간은 11:15:57 로 약 2분44초 가 소요되었다.
2021-11-04 11:15:57.998 INFO 9668 --- [http-nio-8200-exec-9] ...

- Athena 의 경우 동시에 다수의 쿼리가 수행되면 Queue에 의하여 순차적으로 수행될 수 있기 때문에 쿼리 히스토리를 조회하였다.

11:13:13.542초 시작, 수행시간 0.555초
대기열시간 1분21초

- 대기열 시간 1분21초 + 수행시간 0.555초를 제외하고 꽤 오랜시간이 소요되었다.

<소스분석>
- AthenaJDBC42의경우 일반적인 JDBC드라이버처럼 커넥션을 맺고 Resultset을 처리하는 형태가 아니라 AWS Athena로 Http를 통해서 수행요청을 하고, 리턴값으로 ID를 받아온 뒤 일정시간 Thread Sleep하면서 조회 polling을 요청하고 Status가 Completed가 되었을때 후속처리를 하는 형태로 구성되어 있다.

- 또한 위에도 언급한것처럼 동시에 다수의 요청이 집중될경우 자체적으로 큐에 보관하여 처리하게 된다.

- 부수적으로 Athena JDBC드라이버의 SStatement내 execute, getResultSet등의 메소드를 살펴보면 대부분 synchronized로 선언이 되어있기 때문에 이에 따른 delay도 있지 않을까 예상한다.

 

<Thread Dump>

10개의 Thread가 같은 위치에서 대기중이다.

"http-nio-8200-exec-9" #44 daemon prio=5 os_prio=31 tid=0x00007ffcc655f800 nid=0x8c03 waiting on condition [0x000070000c638000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
at java.lang.Thread.sleep(Native Method)
at com.simba.athena.athena.api.AJClient.executeQuery(Unknown Source)
at com.simba.athena.athena.dataengine.AJQueryExecutor.execute(Unknown Source)
at com.simba.athena.jdbc.common.SStatement.executeNoParams(Unknown Source)
at com.simba.athena.jdbc.common.SStatement.executeNoParams(Unknown Source)
at com.simba.athena.jdbc.common.SStatement.executeQuery(Unknown Source)
- locked <0x000000078740ccf8> (a com.simba.athena.athena.jdbc42.AJ42Statement)
at com.zaxxer.hikari.pool.ProxyStatement.executeQuery(ProxyStatement.java:111)
at com.zaxxer.hikari.pool.HikariProxyStatement.executeQuery(HikariProxyStatement.java)
at org.springframework.jdbc.core.JdbcTemplate$1QueryStatementCallback.doInStatement(JdbcTemplate.java:439)
at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:376)
at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:452)
at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:462)
at org.springframework.jdbc.core.JdbcTemplate.queryForObject(JdbcTemplate.java:473)
at org.springframework.jdbc.core.JdbcTemplate.queryForObject(JdbcTemplate.java:480)


<정리>
- 다수의 사용자에게서 발생하는 ad-hoc형태 처리는 적합하지 않다.(hive와 동일함)

- Global cache(Redis)를 적절히 활용하여 Service #B Layer에서 처리를 하도록 하면 효율성을 증가시킬수 있다.(일반적인 캐시전략)

- Red Shift등의 빠른대안도 있으나 가성비가 매우 떨어진다.


<필요사항>

- AWS ECS상 워크로드 중 특정서비스의 로그를 수집/가공 하여 통계 정보를 제공

- 기존에 사용중이던 AWS SDK S3 와 호환성 

 

<개요> 

- https://docs.aws.amazon.com/cloudwatch/index.html

 

https://docs.aws.amazon.com/cloudwatch/index.html

 

docs.aws.amazon.com

- AWS는 Cloudwatch 내에서 특정 로그그룹에 대하여 쿼리할 수 있는 툴 (Insight) 를 제공하고 있다.

- 해당 기능을 Web, SDK등 다양한 형태로 제공하고 있으며 Java SDK를 사용했다.

 

<내용>

- Java용 AWS SDK는 크게 버전 1.x , 2.x 가 있으며 특히 2.x 부터 비동기 NonBlocking 및 CompletableFuture를 제공하고 있기 때문에 신규 개발의 경우 가급적 2.x 를 권장하고 있다.

- 현재 서비스에서 S3용  1.x SDK를 이미 사용중이고 CloudWatchLogs는 2.x SDK를 사용할 예정이기 때문에 Mig 를 하던지 두 가지 버전을 병행하던지 선택한다.

- 기존 기능에 큰 문제가 없기 때문에 일단 병행하기로 결정

 

<환경설정 및 변경사항>

1. pom.xml

- groupId 변경

ver 1.x ver 2.x
com.amazonaws software.amazon.aws.sdk
<dependency>
	<groupId>com.amazonaws</groupId>
	<artifactId>aws-java-sdk-s3</artifactId>
	<version>1.11.762</version>
</dependency>

<dependency>
	<groupId>software.amazon.awssdk</groupId>
	<artifactId>cloudwatchlogs</artifactId>
</dependency>

<dependencyManagement>
  <dependencies>
      <dependency>
          <groupId>com.amazonaws</groupId>
          <artifactId>aws-java-sdk-bom</artifactId>
          <version>1.12.1</version>
          <type>pom</type>
          <scope>import</scope>
      </dependency>
      <dependency>
          <groupId>software.amazon.awssdk</groupId>
          <artifactId>bom</artifactId>
          <version>2.16.1</version>
          <type>pom</type>
          <scope>import</scope>
      </dependency>
  </dependencies>
</dependencyManagement>

 

2. @Configuration

public BasicAWSCredentials awsCredentials(){
        BasicAWSCredentials awsCreds = new BasicAWSCredentials(accessKey, secretKey);
        return awsCreds;
    }

    @Bean
    public AmazonS3 amazonS3Client(){
        AmazonS3 amazonS3 = AmazonS3ClientBuilder.standard()
                .withRegion("ap-northeast-2")
                .withCredentials(new AWSStaticCredentialsProvider(this.awsCredentials()))
                .build();
        return amazonS3;
    }

 - 기존에는 BasicAWSCredentials 를 사용하고 있었으며 Region 이 String 으로 사용되고 있어서 오타의 위험이 있다.

@Configuration
public class AmazonCloudWatchLogsConfig {

    @Value("${spring.cloud.aws.credentials.access-key}")
    private String accessKey;

    @Value("${spring.cloud.aws.credentials.secret-key}")
    private String secretKey;

    public AwsBasicCredentials awsBasicCredentials(){
        AwsBasicCredentials awsCreds = AwsBasicCredentials.create(accessKey, secretKey);
        return awsCreds;
    }

    @Bean
    public CloudWatchLogsAsyncClient cloudWatchLogsAsyncClient(){

        CloudWatchLogsAsyncClient cloudWatchLogsAsyncClient = CloudWatchLogsAsyncClient.builder()
                .region(Region.AP_NORTHEAST_2)
                .credentialsProvider(StaticCredentialsProvider.create(this.awsBasicCredentials()))
                .build();
        return cloudWatchLogsAsyncClient;
    }

- Ver2에서는 AwsBasicCredentials로 변경이 되었으며, new 를 사용하지 않고 create를 사용한다.

- Region 설정이 enum type으로 변경되어 사용자의 실수를 막아준다.

- 또한 필요에 따라서 HttpClient를 Netty로 변경할 수 도 있다.

 

<AWS SDK를 사용하여 AWSCloudWatchLog를 활용하는 방법>

- SDK에서 사용할 IAM을 사전에 생성할 필요가 있다.

- Log를 조회하는 것은 크게 2단계로 나누어진다.

 a. startQuery

StartQueryResponse ret = cloudWatchLogsAsyncClient.startQuery(StartQueryRequest.builder()
                                                .endTime(endTime)
                                                .startTime(startTime)
                                                .limit(n)
                                                .logGroupName("")
                                                .queryString(query)
                                                .build()
                                  ).join();

 쿼리 수행을 마치고 나서 결과값으로 unique한 queryId를 돌려주는데 이 값을 사용하여 쿼리결과를 조회한다. 

 

b. getQueryResults

GetQueryResultsResponse queryReponse = cloudWatchLogsAsyncClient.getQueryResults(GetQueryResultsRequest.builder().queryId(queryId).build()).join();

 queryId를 통해서 GetQueryResultsResponse 를 얻어올 수 있다.

여기서 주의해야 할 것은 GetQueryResultsResponse 내에는 QueryStatus가 존재하며 Running, Scheduled등 일 경우 전체결과가 조회되지 않을 수 있다는 점이다.

처음 작업할때 ComletableFuture의  join을 호출하는 시점에 모든 결과가 있을것으로 예상하여 원하는 값이 나오지 않아서 고민했었다.

 

c. Async방식

 위에서 언급한 것 처럼 AsyncClient를 사용할 경우 CompletableFuture를 return하도록 되어있기 때문에 callback을 작성하여 불필요한 대기를 최소화할 수 것으로 예상했다.

 하지만 구조자체가 응답으로 queryId를 받아와야 하고, 또 쿼리 결과를 조회할때에도 Complete 상태에 이르러야 완벽한 결과값이 세팅되는 점을 감안한다면 해당 API를 사용하는 유저시나리오는 Async-blocking에 가깝다.

 

<결론>

- 해당 기능을 통해서 AWS Cloudwatch 에 수집되고 있는 로그그룹에 접근하여 일정시간 동안 많이 입력된 키워드, 로그인한 사용자, requestUri정보 등을 집계하여 API로 제공중이다.

- 쿼리 사용시 주의해야할 점은 Scan하는 Log의 양만큼 비용을 지불하기 때문에 startTime, endTime을 적절하게 조정해야 한다.

 

+ Recent posts