https://icthuman.tistory.com/entry/AWS-S3-Athena-%EC%82%AC%EC%9A%A9%EC%A4%91-JDBC-Driver%EB%8F%99%EC%8B%9C%EC%84%B1-%EB%AC%B8%EC%A0%9C

 

AWS S3-Athena 사용중 JDBC Driver동시성 문제

<개요> - S3보안버켓으로 파일을 올려서 Athena 작업중 - 1개의 CSV파일(2.65GB) 12개의 컬럼 - AthenaJDBC42.jar 사용 s3.amazonaws.com/athena-downloads/drivers/JDBC/SimbaAthenaJDBC-2.0.16.1000/docs/Simba+..

icthuman.tistory.com

<현상>

- AWS Athena사용시 quota제한 으로 인하여 다음과 같은 문제가 발생하였습니다.

- 동시에 많은 요청이 몰리는 것을 방지하기 위해서 다음과 같이 API호출수를 제한하도록 하였으며 object wait/notify를 활용하였습니다.

 

<로직>

- API 호출시 기본적으로 @Async호출

- API호출전에 현재 호출count를 확인하여 일정수치 이상일 경우 object.wait()로 대기

- 호출이 성공하면 count를 증가

- 비동기호출을 마치고 응답을 받으면 callback으로 호출count를 감소시키고 object.notify

CompletableFuture<Map<String, Integer>> completableFutureForRule;
    ...
    synchronized (this){
		waitBeforeCall();
		completableFutureForRule = apiCallService.call(request);
		resultCount.incrementAndGet();
	}
    
    completableFutureForRule.thenAccept(retMap -> {
    	synchronized (this){
    		notifyAfterCall();
    }
private void waitBeforeCall(){

	try {
  
		while (apiCallService.getCallCount() >= SIZE) {
			this.wait();
		}
	} catch (InterruptedException e) {
		log.error("Interruped");
	}
	apiCallService.incrementAndGet();
}

private void notifyAfterCall(){
	apiCallService.decrementAndGet();
	this.notifyAll();
}

- blocking queue를 구현할때 와 매우 유사한 로직임을 알수 있습니다.

 

<참고사항>

- caller service 와 callee service가 같은 비율로 증가한다면 이론상 문제는 없습니다. (e.g 10, 20, 30)

  그러나 실제는 두 서비스간에 연관관계가 없을 것으로 예상되며, 또한 이 로직은 한 jvm내 단일 service에서만 유용하기 때문에 ECS와 같이 scale out 이 되는 구조에서 전체 API 호출수를 제한하기 위해서는 다른 방법을 사용해야 합니다. 

e.g) A. cluster shared lock

      B. API gateway를 활용한 limit rate

      C. Redis를 활용한 global count개념

 

- wait후 notify의 경우 잠들어 있는 하나의 쓰레드만 깨우기 때문에 여러가지 예외 상황에 대응하거나 개별제어가 어려워서 일반적으로 notifyAll을 많이 사용합니다.

- while()문으로 체크해야 하는 이유는 여러 쓰레드가 다시 lock을 획득하기 위해 경쟁하기 때문입니다. 또한 Spurious wakeups 처럼 이유없이 쓰레드가 깨어난 경우에도 다시 wait로 진입하기 위해서 필요합니다.

- 이와 같이 일정시간 대기하는 로직을 작성할때 sleep을 사용하는 경우를 간혹 볼 수 있는데, sleep의 경우 wait와 다르게 lock을 반환하지 않으므로 주의해서 사용해야 합니다. (stackoverflow 참조)

https://stackoverflow.com/questions/1036754/difference-between-wait-and-sleep

- 로직이 복잡해질 경우 CountDownLatch 를 이용해서 구현하는 것을 권장합니다!

<기존>

 - 비동기 병렬처리시 SecurityContext의 범위가 ThreadLocal이기 때문에 authentication 정보가 null 인 경우 발생

 - DelegatingSecurityContextAsyncTaskExecutor 을 사용하여 SecurityContext를 전달하도록 처리함

https://icthuman.tistory.com/entry/HHH000346-Error-during-managed-flush-null?category=568674 

 

Spring JPA, SecurityContext사용시 ThreadLocal 사용범위 관련 #1

<현상> - HHH000346: Error during managed flush [null] - Spring JPA 사용중 Transcation 처리가 정상적으로 되지 않는 현상 <원인> - 사용자 ID를 @CreatedBy 를 사용해서 관리중 (@EnableJpaAuditing) -..

icthuman.tistory.com

 

<추가오류사항>

 - API 호출 시 문제가 없었으나 다음날 아침작업에서 동일한 오류 발생

 

<원인>

 - 해당 로직이 @Scheduled 처리 될 경우 인증을 통해서 들어온 Thread가 아니기 때문에 authentication 정보가 null 인 경우 발생

 

<해결방안>

 - 다음과 같이 로직을 변경하고 SecurityContext를 생성하는 부분을 추가하여 해결함

@Scheduled(cron = "0 30 9 * * *", zone = "Asia/Seoul")
    public void scheduledMethod(){
        Authentication originalAuthentication = SecurityContextHolder.getContext().getAuthentication();

        try {
            String token = loginService.createToken("admin");
            SecurityContextHolder.getContext().setAuthentication(jwtTokenProvider.getAuthentication(token));

            ...
            bizlogic();
            ...

        }catch(Throwable t){
            log.error("scheduledMethod error {}", t.getMessage());
        }finally{
            SecurityContextHolder.getContext().setAuthentication(originalAuthentication);
        }

    }

 - 해당 로직을 완료한 뒤 auth정보를 원복처리해주도록 한다. (중요!)

 - bizlogic내 비동기호출이 있으며 auth정보를 활용한다면 CompletableFuture.get()등을 활용하여 끝날때까지 대기해야합니다.

   그렇지 않을경우 비동거처리 특성상 해당로직이 종료되기 전에 originalAuthentication으로 원복처리되어 의도치않은 현상이 발생할 수 있습니다.

 - 예상외의 오류가 발생할 수 있기 때문에 finally 절로 처리하는 것을 추천합니다.

 

<참고사항>

 - 내부 @Scheduled 을 사용하지 않고 외부 스케쥴러를 사용하여 API Call하게 한다면 해당방법을 사용하지 않을 수 있습니다.

 - AuditorAware의 경우 해당 메소드에서 접근하는 Entity에 @CreatedBy컬럼여부에 관계없이 항상 동작한다.

 - 보다 안전한 처리를 위해서 AuditAware에 별도로직을 추가하는 것도 좋습니다.

   예를 들면 NPE가 발생하여 DB Transaction에 문제가 발생할 수 있는데. (특히 @Transactionl로 묶여있다면 모두 rollback됨)

   예외 userId로 처리한뒤 사후에 발견하도록 하면 서비스가 중단되지 않도록 할 수 있습니다.

 e.g)

public class LoginUserAuditorAware  implements AuditorAware<Integer> {

    @Override
    public Optional<Integer> getCurrentAuditor() {

        Authentication authentication = SecurityContextHolder.getContext().getAuthentication();
        log.debug("auth {}", authentication);
        LoginUserDetails loginUserDetails = (LoginUserDetails)authentication.getPrincipal();
        Integer userId = loginUserDetails.getUserId();
        log.debug("id{}", userId);

        if(userId == null){
            log.error("id is null {}");
            return Optional.of(0);
        }else{
            return Optional.of( userId );
        }
    }
}

 

 

 

<현상>

- HHH000346: Error during managed flush [null]

- Spring JPA 사용중 Transcation 처리가 정상적으로 되지 않는 현상

 

<원인>

- 사용자 ID를 @CreatedBy 를 사용해서 관리중  (@EnableJpaAuditing)

- AuditAware에서 SecurityContext를 꺼내보면 auth가 null로 되어있다.

- 그 이유는 Spring Security내에서 ThreadLocal로 정보를 관리하고 있으며 새로 생성된 Thread내에는 해당값이 전파되지 않기 때문.

- 현재 로직

1.  여러 API를 호출한 뒤 결과값을 바탕으로 DB 에 update 해야함

2. 해당 로직은 오랜시간이 필요하기 때문에 다음과 같이 구성함

 - 외부 API call하는 메소드 : B서비스 메소드#B + @Async

 - 메타정보에 따라서 외부 API Call 여부를 결정하는 로직 : A서비스 메소드#A

 - 서비스 메소드를 CompletableFutuer.runAsync로 호출

CompletableFuture.runAsync(() -> aService.aMethod());

 

public void aMethod(){
        
   for ( : list) {
            if ( ) {

                CompletableFuture<Map<String, Integer>> completableFutureForRule;
                synchronized (this){
                    waitBeforeCall();
                    completableFutureForRule = bService.bMethod(filterDto);
                    resultCount.incrementAndGet();
                }
                
                completableFutureForRule.thenAccept(retMap -> {
                    synchronized (this){
                        notifyAfterCall();
                    }
                    updateDB(...);
                });
    @Async("executorBean")
    public CompletableFuture<Map<String, Integer>> bMethod(...) {

        ...
        contents = webClient
                .post()
                ...


        return CompletableFuture.completedFuture(map);

 

-Spring SecurityContext의 경우 ThreadLocal로 처리하기 때문에 CompletableFuture 비동기 처리시 인증정보가 전달되지 않는다.

 

<해결방안>

https://www.baeldung.com/spring-security-async-principal-propagation
https://stackoverflow.com/questions/40345643/java-future-spring-authentication-is-null-into-auditoraware

 

Java Future - Spring Authentication is null into AuditorAware

This is my scenario: My app has Mongo Auditing enabled, with a custom AuditorAware which gets the current user from the SecurityContext. This works well with synchronous methods, and the current

stackoverflow.com

1. DelegatingSecurityContextAsyncTaskExecutor 적용

@Bean 
public DelegatingSecurityContextAsyncTaskExecutor taskExecutor(ThreadPoolTaskExecutor delegate) { 
    return new DelegatingSecurityContextAsyncTaskExecutor(delegate); 
}

2. DelegatingSecurityContextAsyncTaskExecutor 소스를 살펴보면 다음과 같다.

최상위 추상 클래스로 AbstractDelegatingSecurityContextSupport 가 구현되어 있으며 Runnable, Callable에 대하여 wrapping처리를 하게 되며,

abstract class AbstractDelegatingSecurityContextSupport {

	private final SecurityContext securityContext;

	/**
	 * Creates a new {@link AbstractDelegatingSecurityContextSupport} that uses the
	 * specified {@link SecurityContext}.
	 *
	 * @param securityContext the {@link SecurityContext} to use for each
	 * {@link DelegatingSecurityContextRunnable} and each
	 * {@link DelegatingSecurityContextCallable} or null to default to the current
	 * {@link SecurityContext}.
	 */
	AbstractDelegatingSecurityContextSupport(SecurityContext securityContext) {
		this.securityContext = securityContext;
	}

	protected final Runnable wrap(Runnable delegate) {
		return DelegatingSecurityContextRunnable.create(delegate, securityContext);
	}

	protected final <T> Callable<T> wrap(Callable<T> delegate) {
		return DelegatingSecurityContextCallable.create(delegate, securityContext);
	}
}

 

DelegatingSecurityContextRunnable 의 run 메소드를 수행할때 delegateSecurityContext를 세팅하여 수행하도록 되어있다.

수행이 완료된 뒤에는 originalSecurityContext로 원복시키고, 현재 Runnable에 세팅되어 있던 값은 null 처리 해야 ThreadPool사용시 문제가 발생하지 않는다.

public final class DelegatingSecurityContextRunnable implements Runnable {
...
	@Override
	public void run() {
		this.originalSecurityContext = SecurityContextHolder.getContext();

		try {
			SecurityContextHolder.setContext(delegateSecurityContext);
			delegate.run();
		}
		finally {
			SecurityContext emptyContext = SecurityContextHolder.createEmptyContext();
			if (emptyContext.equals(originalSecurityContext)) {
				SecurityContextHolder.clearContext();
			} else {
				SecurityContextHolder.setContext(originalSecurityContext);
			}
			this.originalSecurityContext = null;
		}
	}
    ...

 

 

<추가 문제상황>

- 해당 작업을 @Scheduled 를 통해서 수행할 경우에도 SecurityContext는 비어있게 된다. 인증정보를 가지고 수행된 것이 아니기 때문!

- 다음 글에서 추가로 확인해보도록 한다.

https://icthuman.tistory.com/entry/Spring-JPA-SecurityContext%EC%82%AC%EC%9A%A9%EC%8B%9C-ThreadLocal-%EC%82%AC%EC%9A%A9%EB%B2%94%EC%9C%84-%EA%B4%80%EB%A0%A8-2

<기존>

- Redis를 default config로 사용하면서 해당 Cache의 경우 다음과 같이 key를 정의하도록 keyGenerator를 구현하였다.

1번유형 ) String customKey = target.getClass().getSimpleName() + "_"+ method.getName();

              customKey += buildKey(params[0]);

2번유형 ) 별도의 Key지정 없음

 

- Redis내에 생성된 key값을 확인하면 다음과 같다.

1번유형 ) {methodName}::{customKey}

2번유형 ) {methodName}::SimpleKey [{argements}]

 

<Try>

- prefixKeysWith("Service Name")를 사용하여 공통 Prefix를 추가하는 것

- 의도한 결과는 기존의 Cache Name앞에 prefix를 추가하는 것이었다

- 기대값

1번유형 ) Service Name + methodName::{customKey}

2번유형 ) Service Name{methodName}::SimpleKey [{argements}]

 

- 그러나 실제 결과는 다음과 같았다.

1번유형 ) ServiceName{customKey}

2번유형 ) ServiceNameSimpleKey [{arguments}]

기존의 cache name이 사라지고 prefix부분만 남게되는것

 

<원인>

- 비슷한 이슈를 제기하신분들이 기존에 있습니다.

https://github.com/spring-projects/spring-data-redis/issues/1614

 

RedisCacheConfiguration.prefixKeysWith(…) does not consider cache name [DATAREDIS-1041] · Issue #1614 · spring-projects/spr

Mykyta Bezverkhyi opened DATAREDIS-1041 and commented Assuming I have spring-data-redis application with the following method: @Cacheable( cacheNames = "cacheName", key = "T(java.lan...

github.com

https://github.com/spring-projects/spring-data-redis/issues/1548

 

RedisCacheConfiguration prefixKeysWith override the cacheName [DATAREDIS-975] · Issue #1548 · spring-projects/spring-data-redi

Markfred Chen opened DATAREDIS-975 and commented With spring boot version 2.1.4, I found below code which doesn't serve as prefix, instead it overrides the cacheName passed in. Is it a bug? or ...

github.com

 

return this.computePrefixWith((cacheName) -> {
            return prefix;
        });

소스를 살펴보면 다음과 같습니다. cacheName을 날려버립니다...

 

<결론>

그냥 자체적으로 cache name을 생성하는 로직을 만들어서 사용하도록 하였습니다..

+ Recent posts