<필요사항>

- AWS ECS상 워크로드 중 특정서비스의 로그를 수집/가공 하여 통계 정보를 제공

- 기존에 사용중이던 AWS SDK S3 와 호환성 

 

<개요> 

- https://docs.aws.amazon.com/cloudwatch/index.html

 

https://docs.aws.amazon.com/cloudwatch/index.html

 

docs.aws.amazon.com

- AWS는 Cloudwatch 내에서 특정 로그그룹에 대하여 쿼리할 수 있는 툴 (Insight) 를 제공하고 있다.

- 해당 기능을 Web, SDK등 다양한 형태로 제공하고 있으며 Java SDK를 사용했다.

 

<내용>

- Java용 AWS SDK는 크게 버전 1.x , 2.x 가 있으며 특히 2.x 부터 비동기 NonBlocking 및 CompletableFuture를 제공하고 있기 때문에 신규 개발의 경우 가급적 2.x 를 권장하고 있다.

- 현재 서비스에서 S3용  1.x SDK를 이미 사용중이고 CloudWatchLogs는 2.x SDK를 사용할 예정이기 때문에 Mig 를 하던지 두 가지 버전을 병행하던지 선택한다.

- 기존 기능에 큰 문제가 없기 때문에 일단 병행하기로 결정

 

<환경설정 및 변경사항>

1. pom.xml

- groupId 변경

ver 1.x ver 2.x
com.amazonaws software.amazon.aws.sdk
<dependency>
	<groupId>com.amazonaws</groupId>
	<artifactId>aws-java-sdk-s3</artifactId>
	<version>1.11.762</version>
</dependency>

<dependency>
	<groupId>software.amazon.awssdk</groupId>
	<artifactId>cloudwatchlogs</artifactId>
</dependency>

<dependencyManagement>
  <dependencies>
      <dependency>
          <groupId>com.amazonaws</groupId>
          <artifactId>aws-java-sdk-bom</artifactId>
          <version>1.12.1</version>
          <type>pom</type>
          <scope>import</scope>
      </dependency>
      <dependency>
          <groupId>software.amazon.awssdk</groupId>
          <artifactId>bom</artifactId>
          <version>2.16.1</version>
          <type>pom</type>
          <scope>import</scope>
      </dependency>
  </dependencies>
</dependencyManagement>

 

2. @Configuration

public BasicAWSCredentials awsCredentials(){
        BasicAWSCredentials awsCreds = new BasicAWSCredentials(accessKey, secretKey);
        return awsCreds;
    }

    @Bean
    public AmazonS3 amazonS3Client(){
        AmazonS3 amazonS3 = AmazonS3ClientBuilder.standard()
                .withRegion("ap-northeast-2")
                .withCredentials(new AWSStaticCredentialsProvider(this.awsCredentials()))
                .build();
        return amazonS3;
    }

 - 기존에는 BasicAWSCredentials 를 사용하고 있었으며 Region 이 String 으로 사용되고 있어서 오타의 위험이 있다.

@Configuration
public class AmazonCloudWatchLogsConfig {

    @Value("${spring.cloud.aws.credentials.access-key}")
    private String accessKey;

    @Value("${spring.cloud.aws.credentials.secret-key}")
    private String secretKey;

    public AwsBasicCredentials awsBasicCredentials(){
        AwsBasicCredentials awsCreds = AwsBasicCredentials.create(accessKey, secretKey);
        return awsCreds;
    }

    @Bean
    public CloudWatchLogsAsyncClient cloudWatchLogsAsyncClient(){

        CloudWatchLogsAsyncClient cloudWatchLogsAsyncClient = CloudWatchLogsAsyncClient.builder()
                .region(Region.AP_NORTHEAST_2)
                .credentialsProvider(StaticCredentialsProvider.create(this.awsBasicCredentials()))
                .build();
        return cloudWatchLogsAsyncClient;
    }

- Ver2에서는 AwsBasicCredentials로 변경이 되었으며, new 를 사용하지 않고 create를 사용한다.

- Region 설정이 enum type으로 변경되어 사용자의 실수를 막아준다.

- 또한 필요에 따라서 HttpClient를 Netty로 변경할 수 도 있다.

 

<AWS SDK를 사용하여 AWSCloudWatchLog를 활용하는 방법>

- SDK에서 사용할 IAM을 사전에 생성할 필요가 있다.

- Log를 조회하는 것은 크게 2단계로 나누어진다.

 a. startQuery

StartQueryResponse ret = cloudWatchLogsAsyncClient.startQuery(StartQueryRequest.builder()
                                                .endTime(endTime)
                                                .startTime(startTime)
                                                .limit(n)
                                                .logGroupName("")
                                                .queryString(query)
                                                .build()
                                  ).join();

 쿼리 수행을 마치고 나서 결과값으로 unique한 queryId를 돌려주는데 이 값을 사용하여 쿼리결과를 조회한다. 

 

b. getQueryResults

GetQueryResultsResponse queryReponse = cloudWatchLogsAsyncClient.getQueryResults(GetQueryResultsRequest.builder().queryId(queryId).build()).join();

 queryId를 통해서 GetQueryResultsResponse 를 얻어올 수 있다.

여기서 주의해야 할 것은 GetQueryResultsResponse 내에는 QueryStatus가 존재하며 Running, Scheduled등 일 경우 전체결과가 조회되지 않을 수 있다는 점이다.

처음 작업할때 ComletableFuture의  join을 호출하는 시점에 모든 결과가 있을것으로 예상하여 원하는 값이 나오지 않아서 고민했었다.

 

c. Async방식

 위에서 언급한 것 처럼 AsyncClient를 사용할 경우 CompletableFuture를 return하도록 되어있기 때문에 callback을 작성하여 불필요한 대기를 최소화할 수 것으로 예상했다.

 하지만 구조자체가 응답으로 queryId를 받아와야 하고, 또 쿼리 결과를 조회할때에도 Complete 상태에 이르러야 완벽한 결과값이 세팅되는 점을 감안한다면 해당 API를 사용하는 유저시나리오는 Async-blocking에 가깝다.

 

<결론>

- 해당 기능을 통해서 AWS Cloudwatch 에 수집되고 있는 로그그룹에 접근하여 일정시간 동안 많이 입력된 키워드, 로그인한 사용자, requestUri정보 등을 집계하여 API로 제공중이다.

- 쿼리 사용시 주의해야할 점은 Scan하는 Log의 양만큼 비용을 지불하기 때문에 startTime, endTime을 적절하게 조정해야 한다.

 

<현상>

- Async API Call 후 응답을 제대로 처리하지 못하는 현상이 있습니다.

- 그 여파로 내부적으로 AtomicInteger를 이용하여 호출Count를 처리하는 로직이 있는데 해당 로직이 수행되지 않아서 버그가 발생하고 있었습니다.

 

<원인>

contents = webClient
                .post()
                .uri(multiCountApiPath)
                .accept(MediaType.APPLICATION_JSON_UTF8)
                .contentType(MediaType.APPLICATION_JSON_UTF8)
                .header("Authorization","Bearer " + token)
                .body(BodyInserters.fromObject(inputs))
                .retrieve()
                .bodyToMono(String.class)
                .timeout(Duration.ofMillis(200000))
                .onErrorReturn(null)
                .flux()
                .toStream()
                .findFirst()
                .orElse(null);

        try {
            ...
        } catch (IOException e) {
            ...
        }
        callCount.decrementAndGet();
    
        return CompletableFuture.completedFuture(ret);

- 원래의 의도는 API Call 오류가 발생하였을 경우 null 로 처리하여 빈값을 가져가도록 하는 것이었습니다.

- 그러나 실제로 테스트해보면 try 이후 구문이 수행되지 않고 있습니다.

- API Call에서 오류가 발생했을 경우 null 처리를 잘못하여 flux() 이하 로직이 수행되지 않고 있습니다.

 

<수정사항>

contents = webClient
                .post()
                .uri(multiCountApiPath)
                .accept(MediaType.APPLICATION_JSON_UTF8)
                .contentType(MediaType.APPLICATION_JSON_UTF8)
                .header("Authorization","Bearer " + token)
                .body(BodyInserters.fromObject(inputs))
                .retrieve()
                .bodyToMono(String.class)
                .timeout(Duration.ofMillis(200000))
                .onErrorReturn("")
                .flux()
                .toStream()
                .findFirst()
                .orElse("");

        try {
            ...
        } catch (IOException e) {
            ...
        }
        callCount.decrementAndGet();
    
        return CompletableFuture.completedFuture(ret);

     

- 위와 같이 bodyToMono에서 우리가 사용하고자 했던 타입에 맞는 값(e.g String "" )으로 처리해주면 onErrorReturn 이후 로직이 정상적으로 수행되는 것을 확인할 수 있습니다.

 

<추가로 확인해야 할 사항>

- timeout 이 발생했을 때 특정 로직을 수행하도록 handler 가 가능하다면 decrement 로직을 그쪽으로 옮길 수 있을지 검토가 필요합니다.

- onErrorReturn 이후 로직이 Spring내부에서 어떻게 동작하는지 상세한 확인이 필요합니다.

 

<현상>

com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: 발생

<코드>

RangeCount rangeCount = new RangeCount(map);
filterDto.setRangeCount(rangeCount);

 

@AllArgsConstructor
@NoArgsConstructor
public class RangeCount {
    public static final int RANGE_SIZE = 10;
    private java.util.Map<Integer, Integer> rangeCount;

    public boolean validateRangeCount(){

        Iterator<Integer> iterator = rangeCount.keySet().iterator();

        while(iterator.hasNext()){
            if( rangeCount.get(iterator.next()) < 0 ){
                return false;
            }
        }

        return true;
    }
}

 

<원인>

- getter 가 없을 경우 해당 멤버변수에 접근할 수 없어서 없는 것으로 판단한다.

개인적으로 머리속의 지식들을 정리하면서 작성해보고 있습니다.
IT개발을 시작한지도 대학교시절부터 합치면 20년이 훌쩍 넘었습니다.

가술트렌드라는 것이 새로운 것이 빠르게 나오지만
개선과 보완의 연속이기 때문에 알아야 하는 지식은 계속 늘어나네요.
서로간의 연결고리도 중요합니다.

IT는 결국 비지니스를 만들어내기 위해서 다양한 영역을 결합시켜야 하고
각 기술들은 만들어진 목적과 지향하는 바가 분명하기 때문이죠.
(물론 아닌 경우도 있습니다; 만들어진 목적보다 더 적합한 경우가 발견되는 경우죠)

단기간에 욕심내면 지칠수 있기때문에
천천히 포기하지 않고 업무에 필요한 부분부터 차근차근 학습하시면 될 듯합니다.

요즘 개발자에 관심이 많아지는 것 같아서 주니어분들에게 도움이 될수 있는 방법이 있을까 생각해보다가 그려봤습니다.
초기버전이라 빠진부분이 있으면 또 보완해야겠습니다.



'개인연결사이트' 카테고리의 다른 글

네이버블로그, 링크드인  (0) 2017.09.27

<개요>

- S3보안버켓으로 파일을 올려서 Athena 작업중

- 1개의 CSV파일(2.65GB) 12개의 컬럼

- AthenaJDBC42.jar 사용
s3.amazonaws.com/athena-downloads/drivers/JDBC/SimbaAthenaJDBC-2.0.16.1000/docs/Simba+Athena+JDBC+Driver+Install+and+Configuration+Guide.pdf

- Query수행메소드 AOP로깅작업

- Spring boot -> Hikari max pool size = 20

 

<현상>

- Application 에 동시에 쿼리작업 요청시 아래와 같이 수행시간이 점차 늘어나는 현상

 (마치 순차적으로 수행되는 것처럼 보이는..)

쿼리수행 메소드의 시간이 점점 증가되고 있음

 mysql이나 다른 database에서는 이러한 현상이 없기 때문에 Application의 문제는 아닐 것으로 추정

-Athena의 History메뉴에서 쿼리수행시간을 조회했을 때 개별쿼리의 수행시간에는 이상한 부분이 보이지 않음

개별 쿼리의 수행시간에는 큰 문제가 없어보임

 

<분석중>

-Athena의 하나의 테이블에서 동시에 수행할 수 있는 쿼리 수에 제한이 있다.

-Athena JDBC Driver에서 동시수행에 제한을 둔다.

-SimbaAthenaJDBC-2.0.16.1000 소스 분석중

<확인결과>

 - Athena의 경우 Service 용도가 아니며 quota제한이 있어서 동시에 많은 요청이 올경우 내부적으로 순차처리를 하게 된다.

 

<해결방안>

1. wait/notify 로직구성

- 데이터분석API 호출시 기본적으로 @Async호출한다.

- API호출전에 현재 호출count를 확인하여 일정수치 이상일 경우 object.wait()로 대기하도록 한다.

- 호출이 성공하면 count를 증가시킨다.

- 비동기호출을 마치고 응답을 받으면 callback으로 호출count를 감소시키고 object.notify하도록 한다.

https://icthuman.tistory.com/entry/object-wait-notify-%EB%A5%BC-%ED%99%9C%EC%9A%A9%ED%95%9C-API%ED%98%B8%EC%B6%9C-%EC%88%98-%EC%A0%9C%ED%95%9C

 

2. Athena로 많은 수의 요청이 집중되지 않도록 하여 timeout 및 오류현상을 해결할 수 있었으나 여전히 병목구간으로 남아있기 때문에 다음과 같은 추가조치를 진행중이다.

 - 분산처리 가능한 대안 검토

 - Redis를 이용한 캐시적용

 

 

 

<문제> 

leetcode.com/problems/set-matrix-zeroes/

 

Set Matrix Zeroes - LeetCode

Level up your coding skills and quickly land a job. This is the best place to expand your knowledge and get prepared for your next interview.

leetcode.com

Given an m x n matrix. If an element is 0, set its entire row and column to 0. Do it in-place.

Follow up:

  • A straight forward solution using O(mn) space is probably a bad idea.
  • A simple improvement uses O(m + n) space, but still not the best solution.
  • Could you devise a constant space solution?

 

Example 1:

Input: matrix = [[1,1,1],[1,0,1],[1,1,1]] Output: [[1,0,1],[0,0,0],[1,0,1]]

Example 2:

Input: matrix = [[0,1,2,0],[3,4,5,2],[1,3,1,5]] Output: [[0,0,0,0],[0,4,5,0],[0,3,1,0]]

 

Constraints:

  • m == matrix.length
  • n == matrix[0].length
  • 1 <= m, n <= 200
  • -231 <= matrix[i][j] <= 231 - 1

<풀이>

- 가장 쉽게 접근하는 방법은 원본과 똑같은 공간을 하나 더 만들어서 순차적으로 탐색하는 방법이다. 이 경우 공간복잡도가 O(mn)이며 문제에서는 권장하지 않는 방법이다.

- 그 다음의 접근하는 방법은 0이 있는 위치를 기억하는 것이다. row 와 column 을 각각 체크할 수 있는 공간을 만든다면 공간복잡도는 O(m) + O(n)이 된다. 간단히 살펴보면 다음과 같다.

Row, Column에 대해서 0인 위치를 기억

// o(m+n)
    public void setZeroes(int[][] matrix) {
        int n = matrix.length;
        int m = matrix[0].length;

        int[] rowCheck = new int[n];
        int[] columnCheck = new int[m];

        for(int i=0; i<n; i++){
            for(int j=0; j< m ; j++){
                if(matrix[i][j] == 0) {rowCheck[i]=1; columnCheck[j]=1;}
            }
        }

        for(int i=0; i<n; i++){
            if(rowCheck[i]==1){
                for(int j=0;j<m;j++){
                    matrix[i][j]=0;
                }
            }
        }
        for(int j=0; j<m ; j++){
            if(columnCheck[j]==1){
                for( int i=0; i<n; i++){
                    matrix[i][j]=0;
                }
            }
        }
    }

- 최초 입력받은 matrix에서 row와 column에 '0' 인 위치를 기억해두었다가 해당 위치의 값을 변경하는 방식이다.

- 그렇다면 문제에서 제안하는 Best Solution은 무엇일까? 제공되는 힌트를 보면

# Hint 1 (추가 메모리 공간이 없도록 manipulate)

If any cell of the matrix has a zero we can record its row and column number using additional memory. But if you don't want to use extra memory then you can manipulate the array instead. i.e. simulating exactly what the question says.

# Hint 2 (불일치가 일어나지 않도록 market관리)

Setting cell values to zero on the fly while iterating might lead to discrepancies. What if you use some other integer value as your marker? There is still a better approach for this problem with 0(1) space.

# Hint 3 (row/columns를 분리해서 생각하기 보다는 하나로 가져갈수 있는방법)

We could have used 2 sets to keep a record of rows/columns which need to be set to zero. But for an O(1) space solution, you can use one of the rows and and one of the columns to keep track of this information.

# Hint 4 (! 모든 행과열의 첫번째 셀을 flag로 사용할 수 있다.)

We can use the first cell of every row and column as a flag. This flag would determine whether a row or column has been set to zero.

이와 같은 힌트를 동해 다음과 같이 접근해보자.

첫번째 행과 첫번째 열을 flag marker로 사용한다.

1. 첫번째 행과 첫번째 열을 우리가 위에서 마련했던 O(m+n) 의 공간으로 사용한다.

2. 단 이 때 기존에 세팅되어 있던 값을 보존해야 한다. 이후 matrix의 값을 "0"을 치환하는 과정에서 첫번째 행과 열의 값을 참조해야 하는데 값이 구분되지 않아서 불일치가 일어날 수 있다. 

3. 따라서 최초 첫번째 행과 열에 있던 값을 기억하여 나중에 일괄처리 할 수 있도록 하며, 2번의 변환과정의 범위에서 첫번째 행과 열은 제외해야 한다.

// constant
    public void setZeroes(int[][] matrix) {
        int m = matrix.length;
        int n = matrix[0].length;

        boolean isFirstColumnHasZero=false;
        boolean isFirstRowHasZero=false;

        //1. first column
        for(int i=0; i<m; i++){
            if(matrix[i][0] == 0){
                isFirstColumnHasZero=true;
                break;
            }
        }
        //2. first row
        for(int j=0; j<n; j++){
            if(matrix[0][j] == 0){
                isFirstRowHasZero=true;
                break;
            }
        }
        //3. check 0
        for(int i=1; i<m ; i++){
            for(int j=1; j<n ; j++){
                if(matrix[i][j]==0){
                    matrix[i][0]=0;
                    matrix[0][j]=0;
                }
            }
        }
        //4. Process 0
        for(int i=1; i<m ; i++){
            for(int j=1; j<n ; j++){
                if(matrix[i][0]==0 || matrix[0][j]==0){
                    matrix[i][j]=0;
                }
            }
        }
        //5. first row
        if(isFirstRowHasZero){
            for(int j=0;j<n;j++){
                matrix[0][j]=0;
            }
        }
        //6. first column
        if(isFirstColumnHasZero){
            for(int i=0; i<m; i++){
                matrix[i][0]=0;
            }
        }
    }

 

 

<참조>

leetcode.com/problems/set-matrix-zeroes/

 

Set Matrix Zeroes - LeetCode

Level up your coding skills and quickly land a job. This is the best place to expand your knowledge and get prepared for your next interview.

leetcode.com

 

'Algorithm' 카테고리의 다른 글

LeetCode - ReorganizeString  (0) 2020.09.07

+ Recent posts