<개요>

- WebSecurityConfigurerAdapter 사용

- Annotation기반의 설정

 

<내용>

 - 기본적인 세팅 방법 및 제공메소드를 알아본다.

 - 특정 IP 의 호출만 가능하도록 해본다. (white list)

 - hasIpAddress 활용을 위한 API, SpEL을 사용해본다.

 - IP subnet , IPv4/v6 차이점 확인

 

1.  WebSecurityConfigurerAdapter

 - 우리가 일반적으로 가장 많이 사용하는 방법이다.

@Configuration
public class SecurityConfig extends WebSecurityConfigurerAdapter {

	@Override
    public void configure(HttpSecurity http) throws Exception
    {
        http
                .httpBasic().disable()
                .csrf().disable()
                .sessionManagement().sessionCreationPolicy(SessionCreationPolicy.STATELESS)
                .and()

                .authorizeRequests()

                // swagger-ui 관련 접속 url - permitAll 추가
                .antMatchers("/v2/api-docs/**").permitAll()
                .antMatchers("/swagger.json").permitAll()
                .antMatchers("/swagger-ui.html").permitAll()
                .antMatchers("/swagger-resources/**").permitAll()
                .antMatchers("/webjars/**").permitAll()

- Session은 STATELESS를 기본으로 하며 각 url에 permitAll() 권한을 부여하는 것으로 시작하였다. 

- 상세한 정책은 antMatchers() 를 사용하며 permitAll(), anonymous(), denyAll(), authenticated(), hasAuthority()등 다양한 방법을 통해서 정의가 가능하다.

  • hasRole() or hasAnyRole()
    특정 역할을 가지는 사용자
  • hasAuthority() or hasAnyAuthority()
    특정 권한을 가지는 사용자
  • hasIpAddress()
    특정 아이피 주소를 가지는 사용자
  • permitAll() or denyAll()
    접근을 전부 허용하거나 제한
  • rememberMe()
    리멤버 기능을 통해 로그인한 사용자
  • anonymous()
    인증되지 않은 사용자
  • authenticated()
    인증된 사용자

- 내부 구현방식을 알아보기 위해서 Spring 내부소스를 보면 다음과 같다.

public final class ExpressionUrlAuthorizationConfigurer<H extends HttpSecurityBuilder<H>>
		extends
		AbstractInterceptUrlConfigurer<ExpressionUrlAuthorizationConfigurer<H>, H> {
	static final String permitAll = "permitAll";
	private static final String denyAll = "denyAll";
	private static final String anonymous = "anonymous";
	private static final String authenticated = "authenticated";
	private static final String fullyAuthenticated = "fullyAuthenticated";
	private static final String rememberMe = "rememberMe";

public ExpressionInterceptUrlRegistry permitAll() {
			return access(permitAll);
		}

		public ExpressionInterceptUrlRegistry anonymous() {
			return access(anonymous);
		}

		public ExpressionInterceptUrlRegistry rememberMe() {
			return access(rememberMe);
		}

		public ExpressionInterceptUrlRegistry denyAll() {
			return access(denyAll);
		}

 - 내부적으로 SpEL을 활용하고 access메소드를 통해서 처리하는 것을 볼 수 있다.

 

2. Annotation을 통한 관리

 - @PreAuthorize, @PostAuthorize, @Secured 를 사용하여 Controller레벨에서 메소드별 관리도 가능하다.

@RestController
public class FilterController {

	@PreAuthorize("hasRole('ROLE_ADMIN')")
	@RequestMapping(value = "/api/a", method = RequestMethod.POST)
    public @ResponseBody
    List<String> testMethod()throws Exception {
		...
    }

 - SpEL을 사용하는 등 내부적인 처리는 1번방식과 동일하다.

 - 다음과 같은 전역설정을 필요로 하며 되도록 @Configuration 과 같은 성격들은 한곳에 모아두는 것을 추천한다.

  @EnableGlobalMethodSecurity(securedEnabled = true, prePostEnabled = true

 

3. hasIpAddress 예제

- SecurityConfig에 다음과 같이 추가를 하게되면 로컬에서만 /login 으로 접근하는 것이 가능하다.

.authorizeRequests()
.antMatchers("/login").hasIpAddress("127.0.0.1")

- 이 때 IP, Subnet등의 다양한 방법이 가능하다.

 e.g) 192.168.1.0/24, 172.16.0.0/16

- 여러 개의 ip를 list로 처리하고 싶다면? hasIpAddress()로는 처리가 어렵다. 힌트는 1번에서 살펴보았던 access()메소드이다.

  hasIpAddress()도 내부적으로 access()를 통해서 처리하고 있다.

    public ExpressionInterceptUrlRegistry hasIpAddress(String ipaddressExpression) {
			return access(ExpressionUrlAuthorizationConfigurer
					.hasIpAddress(ipaddressExpression));
    }

- 즉, hasIpAdress("ip #1") or hasIpAdress("ip #2") 의 형태로 SpEL을 작성하여 access()메소드를 호출하면 될 것 같다.

 

4. 구현소스

@Configuration
public class SecurityConfig extends WebSecurityConfigurerAdapter {


	@Override
    public void configure(HttpSecurity http) throws Exception
    {
        http
                .httpBasic().disable()
                .csrf().disable()
                .sessionManagement().sessionCreationPolicy(SessionCreationPolicy.STATELESS)
                .and()

                .authorizeRequests()
                .antMatchers("/login").access( buildSpEL(ssoProperties.getAllowedIps()) )
                
                ...
                
                
    private String buildSpEL(List<String> ipLists){

        if(ipLists.isEmpty()){
            return "hasIpAddress('0.0.0.0/0')";
        }

        return ipLists.stream()
                .map(s -> String.format("hasIpAddress('%s')",s) )
                .collect(Collectors.joining(" or "));

    }

- 특정 ip 목록이 추가될 경우 SpEL처리를 하며, 빈 값이 넘어올 경우 전체 ip에 대해서 허용해주도록 한다.

@ConfigurationProperties(prefix = "1.2.3")
@Getter
@Setter
@ToString
public class Properties {
	private SsoProperties ssoProperties;
}


public class SsoProperties {
    private List<String> allowedIps = new ArrayList<>();
}

- ip목록의 경우 수시로 바뀔수 있기 때문에 별도 properties를 활용하도록 한다.

 

<수행결과>

- FilterSecurityInterceptor에 의해서 처리되었으며 처리 결과는 다음과 같다.

localhost주소 127.0.0.1

<주의사항>

- IPv6를 사용하고 있을 경우 ip형식이 다음과 같이 처리가 되기 때문에 허용주소를 "127.0.0.1"로 한 경우 403오류가 발생한다.

IPv6

- IPv6로 주소를 사용하거나 JVM 기동시 -Djava.net.preferIPv4Stack=true 옵션을 사용하면 된다.

 

<참조>

https://namu.wiki/w/%EC%84%9C%EB%B8%8C%EB%84%B7%20%EB%A7%88%EC%8A%A4%ED%81%AC

https://ko.wikipedia.org/wiki/IPv6

 

IPv6 - 위키백과, 우리 모두의 백과사전

IPv6(Internet Protocol version 6)는 인터넷 프로토콜 스택 중 네트워크 계층의 프로토콜로서 버전 6 인터넷 프로토콜(version 6 Internet Protocol)로 제정된 차세대 인터넷 프로토콜을 말한다. 인터넷(Internet)은

ko.wikipedia.org

 

<개요>

- Spring Security 의 경우 Filter Chain 의 형태로 기능을 제공하고 있으며 필요에 따라서 추가/수정/삭제가 가능하다.

 

<내용>

1. Spring Security - Filter Chain

-  Spring 에서 모든 호출은 DispatcherServlet을 통과하게 되고 이후에 각 요청을 담당하는 Controller 로 분배된다.

-  이 때 각 Request 에 대해서 공통적으로 처리해야할 필요가 있을 때 DispatcherServlet 이전에 단계가 필요하며 이것이 Filter이다.

Filter 와 Interceptor의 차이

- Spring Security는 FilterChainProxy를 통해서 상세로직을 구현하고 있다.

Spring Security Filters

 

- 현재 Application에서 사용중인 Filter Chain은 Debug를 통해서 쉽게 확인할 수 있다.

11개의 Filter를 사용중이다.

- FilterChain이름이 의미하듯 Filter는 순서가 매우 중요하다.

 

2. 우리가 가장 많이 사용하는 UserNamePassword 구조에 대해서 좀 더 살펴보겠다.

(적당한 그림을 찾지 못해서 손으로 그려보았다.)

AuthenticationFilter

- Request가 들어왔을때 Filter를 거치게 되고, 적절한 AuthenticationToken이 존재하지 않는다면 AuthenticationProviders를 거쳐서 생성하게 되며 UserDeatilsService를 입맛에 맞게 구현하여 사용자 정보를 가져오는 부분을 구현할 수 있다.

- Spring Securty에서는 사람들이 가장 많이 사용하는 DB인증을 다음과 같이 미리 구현해 두었다. (오른쪽 파란색 박스)

 

3.  인증방식 변경 ( JWT Token)

최근에는 MSA구조의 효율성을 높이기 위해서 JWT Token 방식을 많이 사용하고 있다.  Request에 대한 인증을 별도 서버를 거치지 않고 검증가능하고 기본로직에 필요한 내용을 담을 수 있어서 편리하다.

현재 서비스에 적용중인 대략적인 구조이다.

JWT Token Filter

- JWT AuthTokenFilter에서 해당 처리를 마친 후 나머지 FilterChain을 수행하는 구조이다.

- Token을 통해서 인증 및 인가를 위한 정보를 생성하여 SecurityContextHolder를 통해서 세팅한다.

- JWT Filter의 내용을 간단히 살펴보면 아래와 같다. (보안상 TokenProvider 로직은 개별구현하는 것을 권장한다.)

public class JwtTokenFilter extends GenericFilterBean {
    private JwtTokenProvider jwtTokenProvider;
    public JwtTokenFilter(JwtTokenProvider jwtTokenProvider) {
        this.jwtTokenProvider = jwtTokenProvider;
    }
    @Override
    public void doFilter(ServletRequest req, ServletResponse res, FilterChain filterChain)
            throws IOException, ServletException {
        String token = jwtTokenProvider.resolveToken((HttpServletRequest) req);
        try {
            if (null == token) {
                filterChain.doFilter(req, res);
            } else {
                if(jwtTokenProvider.validateToken(token)) {
                    Authentication auth = token != null ? jwtTokenProvider.getAuthentication(token) : null;
                    SecurityContextHolder.getContext().setAuthentication(auth);
                    filterChain.doFilter(req, res);
                } else {
                    HttpServletResponse httpResponse = (HttpServletResponse) res;
                    httpResponse.sendError(HttpServletResponse.SC_UNAUTHORIZED, "Invalid jwt token");
                }
            }
        } catch(IllegalArgumentException | JwtException e) {
            HttpServletResponse httpResponse = (HttpServletResponse) res;
            httpResponse.sendError(HttpServletResponse.SC_UNAUTHORIZED, "Invalid jwt token");
        }
    }

 

- JwtTokenFilter를 작성 후 해당 Filter를 적절한 위치에 configure하는 것이 중요하다.

public class JwtConfigurer extends SecurityConfigurerAdapter<DefaultSecurityFilterChain, HttpSecurity> {
    private JwtTokenProvider jwtTokenProvider;
    public JwtConfigurer(JwtTokenProvider jwtTokenProvider) {
        this.jwtTokenProvider = jwtTokenProvider;
    }
    @Override
    public void configure(HttpSecurity http) throws Exception {
        JwtTokenFilter customFilter = new JwtTokenFilter(jwtTokenProvider);
        http.addFilterBefore(customFilter, UsernamePasswordAuthenticationFilter.class);
	}
}

 

<정리>

- Spring Security는 분량이 많고 내용이 복잡하기 때문에 기본개념을 이해하고 요건에 따라서 적절히 추가/삭제/수정하는 것이 필요하다.

전체적인 흐름을 알지 못하고 사용하는 경우 의도치 않게 동작할 수 있기 때문에 아래 내용은 반드시 기억하는 것이 좋다.

 1. Filter 는 DispatcherServlet 앞에 존재한다.

 2. 여러 개의 Filter를 동시에 적용할 수 있으며 순서에 주의해야 한다. 

 3. Custom Filter를 개발하였다면 로직 처리 후 FilterChain.doFilter 를 호출하여 이후단계를 수행해야 한다. (AOP @Around와 동일)

 4.Filter / Interceptor 를 필요이상으로 넣을 경우 성능에 영향을 줄 수 있다.

 

<참조>

https://devlog-wjdrbs96.tistory.com/352

https://www.fatalerrors.org/a/in-depth-understanding-of-filterchainproxy.html

<개요>

- Spring 이후 버전에서는 RestTemplate가 deprecated될 예정이며 WebClient 사용을 권장하고 있다.

- 현재 구성 중인 시스템에는 동기/비동기 API가 혼재되어 있으면서, 다양한 Application / DB를 사용중이기 때문에 SpringMVC 구조를 유지하면서 WebClient사용을 통해서 외부 API호출부분을 개선해보고 싶었다.

 

<내용>

- RestTemplate사용법과 WebClient사용법을 비교한다.

- Spring MVC 에서 WebClient 사용방법

- WebClient 를 사용할 때 이해할 Reactive Programming개념

 

RestTemplate 과 WebClient

1. RestTemplate

일반적으로 RestTemplate을 사용하여 HTTP 호출을 할 경우 다음과 같이 작성한다.

 try {
      restTemplate.exchange(URL,
                            HttpMethod.POST,
                            new HttpEntity<>(message),
                            new ParameterizedTypeReference<ResponseEntity>() {}
      );
    } catch (HttpStatusCodeException ce) {
      log.error(String.format("Exception occurred : %s, %s, %s"),
                ce.getStatusCode(),
                ce.getResponseBodyAsString());
      throw new RuntimeException(ce.getStatusCode(), ce.getResponseBodyAsString());
    }

이 때 RestTemplate을 @Bean으로 구성할때 공통으로 해당하는 속성들을 다음과 같은 형태로 정의한다.

@Configuration
public class RestConfig {
  @Bean
  public RestTemplate restTemplate() {
    return new RestTemplateBuilder()
      .messageConverters(getHttpMessageConverters())
      .setConnectTimeout(Duration.ofSeconds(10))
      .setReadTimeout(Duration.ofSeconds(10))
      .errorHandler(new HttpResponseErrorHandler())
      .requestFactory(() -> getClientHttpRequestFactory())
      .additionalInterceptors(bearerAuthInterceptor())
      .build();
  }

 

2. WebClient

WebFlux에서 제공하는 WebClient를  사용하여 HTTP호출을 구성하면 다음과 같다.

contents = webClient
                .post()
                .uri(apiPath)
                .accept(MediaType.APPLICATION_JSON_UTF8)
                .contentType(MediaType.APPLICATION_JSON_UTF8)
                .body(BodyInserters.fromObject(message))
                .retrieve()
                .bodyToMono(String.class)
                .timeout(Duration.ofMillis(apiTimeout))
                .onErrorReturn("-1")
                .flux()
                .toStream()
                .findFirst()
                .orElse("-1");

WebClient에서는 크게 retrieve() 와 exchange()를 제공하고 있으며 exchange사용시 Response처리를 제대로 하지 않을 경우 메모리 누수등이 발생할 수 있기 때문에 되도록 retrieve()를 권장하고 있다.

 

이때 WebClient에 필요한 속성들 역시 유사하게 다음과 같이 구성할 수 있다.

HttpClient httpClient = HttpClient.create()
                .tcpConfiguration(
                        client -> client.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, webClientProperties.getConnectionTimeout()) //miliseconds
                                .doOnConnected(
                                        conn -> conn.addHandlerLast(new ReadTimeoutHandler(webClientProperties.getReadTimeout(), TimeUnit.MILLISECONDS))
                                                .addHandlerLast(new WriteTimeoutHandler(webClientProperties.getWriteTimeout(), TimeUnit.MILLISECONDS))
                                )
                );

        return WebClient.builder()
                .baseUrl(webClientProperties.getBaseUri())
                .clientConnector(new ReactorClientHttpConnector(httpClient))
                .build();

본인의 경우 다양한 Host의 API에 접근해야 하고 각각 응답시간이 다르기 때문에 타임아웃등의 설정을 Host별 Properties로 관리/생성하여 사용하고 있다.

 

3. RestTemplate vs WebClient

a. 가장 큰 차이는 동기/비동기의 차이로  볼 수 있다.

RestTemplate을 사용할 경우 기존 MVC 아키텍처를 활용하여 내부적으로 하나의 요청이 쓰레드에 종속되고, 응답대기에 대한 시간이 소요되어 전체적인 자원소모/처리시간의 지연을 가져오게 된다.

WebClient의 경우 비동기처리로 요청을 받는 쓰레드와 일하는 쓰레드를 분리하고 자원접근,응답대기에 대한 부분을 최소화 하여 효율적인 처리가 가능하다.

최근 MSA구조에서는 연계되는 API호출이 많으면서 개별 호출시간이 짧아지는 특성이 있다. 비동기처리로 효율성을 끌어올리는 경우가 많다. 물론 구조적인 복잡성이 증가하게 된다.

 

b. Spring WebFlux는 비동기처리를 기반으로 하고 있으며 그에 대한 Reactive Programming구현제로 Spring Reactor를 사용하고 있다. 따라서 WebClient호출 후 응답은 Mono/Flux 를 통해서 처리해야만 한다.

비동기 방식은 자원접근, 대기시간에 장점이 있는 대신에 오류가 발생했을때 디버깅을 하거나 추적하기에 어려운 부분이 있다.

 

Spring MVC + WebClient 사용

1. Spring WebFlux

@RestController
public class HelloController {

    @GetMapping("/")
    Flux<String> hello() {
        return Flux.just("Hello World");
    }
    
    @GetMapping("/api-call")
    Mono<String> apiCall() {
        return apiCallService.callApi();
    }

}

Spring WebFlux의 경우 Controller에서 응답처리로 Mono/Flux를 사용한다.

WebClient를 이용한 다음과 같은 처리도 가능하다.

    public Mono<String> callApi() {

        return webClient
                .get()
                .uri(apiPath)
                .accept(MediaType.APPLICATION_JSON_UTF8)
                .header("Authorization","Bearer " + token)
                .retrieve()
                .bodyToMono(String.class)
                .timeout(Duration.ofMillis(apiTimeout));

    }

 

2. Spring MVC + WebClient

그러나 기존 Spring MVC구조는 유지하면서 API호출부분만 비동기 WebClient를 사용하고 싶다면 약간의 변경이 필요하다.

@RequestMapping(value="URL", method= RequestMethod.GET)
    public @ResponseBody
    void getFiltersTotalCount(HttpServletResponse response) {

        try{

            if( apiCallService.callApi().get(10, TimeUnit.SECONDS) ){

                response.setStatus(HttpServletResponse.SC_OK);

            }else{
                response.setStatus(HttpServletResponse.SC_CONFLICT);
            }
        }catch(Exception e){
            log.error("API Exception {}", e.getMessage());
            response.setStatus(HttpServletResponse.SC_CONFLICT);
        }

    }
@Async("executorBean")
    public CompletableFuture<Boolean> callApi() {

        boolean contents=false;

        contents = webClient
                .get()
                .uri(apiPath)
                .accept(MediaType.APPLICATION_JSON_UTF8)
                .header("Authorization","Bearer " + token)
                .retrieve()
                .bodyToMono(Boolean.class)
                .timeout(Duration.ofMillis(apiTimeout))
                .onErrorReturn(false)
                .flux()
                .toStream()
                .findFirst()
                .orElse(false);

        return CompletableFuture.completedFuture(contents);

    }

- Controller Layer의 응답처리를 위해서 get()을 통해 대기하는 부분때문에 비동기처리 효과는 감소하지만 기존 아키텍처와 호환성을 유지하면서 다수의 Api Call을 할 경우 어느정도의 효율성을 기대할 수 있다.

 

3. 주의점

https://icthuman.tistory.com/entry/HHH000346-Error-during-managed-flush-null?category=568674 

 

Spring JPA, SecurityContext사용시 ThreadLocal 사용범위 관련 #1

<현상> - HHH000346: Error during managed flush [null] - Spring JPA 사용중 Transcation 처리가 정상적으로 되지 않는 현상 <원인> - 사용자 ID를 @CreatedBy 를 사용해서 관리중 (@EnableJpaAuditing) -..

icthuman.tistory.com

- 지난번 글에서 정리했던 것처럼 Spring Component 중에서는 ThreadLocal을 활용하고 있는부분들이 존재한다. 해당 부분은 @Async를 통과하면서 값이 사라지기 때문에 적절한 propagation 방법을 찾아야 한다.

- Spring MVC구조의 특성상 @Async를 수행하는 Thread Pool 이 모두 수행할 경우 오류가 발생할 수있음을 주의해야 한다.

- 동기처리 요청/응답 방식이 아니기 때문에 디버깅이나 테스트코드 작성이 어려운 부분도 있으며

- WebClient 사용시 Reactive Programming의 개념에 대해서 숙지해야 하는 부분이 있다. 특히 이 부분은 기본개념이면서 매우 중요한 부분이기 때문에 강조하고 싶었다.

 

다음 예제를 살펴보면 기존에 Response처리하던 부분을 void 타입을 활용하여 fire-forget 으로 변경하였다. 크게 문제가 없어보이지만 실제로 수행해보면 API 호출이 되지 않는다.

webClient
                .post()
                .uri(apiPath)
                .accept(MediaType.APPLICATION_JSON_UTF8)
                .contentType(MediaType.APPLICATION_JSON_UTF8)
                .body(BodyInserters.fromObject(message))
                .retrieve()
                .bodyToMono(Void.class)
                .timeout(Duration.ofMillis(apiTimeout));
                
//                .bodyToMono(String.class)
//                .timeout(Duration.ofMillis(apiTimeout))
//                .flux()
//                .toStream();
//                .findFirst();
//                .orElse("");

이유가 무엇일까?

해당 소스는 Mono를 생성한 것이지 실행한 것이 아니다. 조금 더 용어를 바꿔보자면 데이터가 아직 흘러간 것이 아니다. Reactive Programming이나 Spark를 사용해 본 사람은 이해할 수 있는 부분이다.

 

Reactive Programming

Reactive Programming

1. Publisher

끊임없이 data를 생성한다. 위에서 살펴보았던 Mono / Flux가 Spring Reactor가 구현한 Publisher의 역할로 이해할 수 있다.

 

2. Subscriber

data를 요청해서 받아간다. reactive programming의 핵심 중 하나는 back pressure 이다. data를 소비하는 쪽에서 충분히 여유가 있을때 요청하여 받아가는 형태로 이해할 수 있다.

 

3.Subscription

publisher 와 subscriber사이에 위치하여 이벤트를 통하여 data를 전달해 주는 역할로 이해할 수 있다.

Example소스들을 보면 이해가 더 빠를 듯 하다.

Flux<Integer> seq = Flux.just(1, 2, 3);

        seq.subscribe(new Subscriber<Integer>() {

            private Subscription subscription;

            @Override
            public void onSubscribe(Subscription s) {
                System.out.println("Subscriber.onSubscribe");
                this.subscription = s;
                this.subscription.request(1); //데이터 요청
            }

            @Override
            public void onNext(Integer i) {
                System.out.println("Subscriber.onNext: " + i);
                this.subscription.request(1); //데이터 요청
            }

            @Override
            public void onError(Throwable t) {
                System.out.println("Subscriber.onError: " + t.getMessage());
            }

            @Override
            public void onComplete() {
                System.out.println("Subscriber.onComplete");
            }
        });
Flux<String> flux = Flux.generate ( .... );
flux.subscribe(new BaseSubscriber<String> () {
            private int receiveCount = 0;
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                request(5);
            }
            @Override
            protected void hookOnNext(String value) {
                receiveCount++;
                if (receiveCount % 5 == 0) {
                    request(5);
                }
            }
            @Override
            protected void hookOnComplete() {
            }
        });

생각보다 표준을 지키면서 Publisher, Subscriber, Subscription을 구현하는 것이 쉽지 않기 때문에 다양한 구현체 중 선택하여 사용하는것을 권장하며 Spring Reactor가 그중에 하나인것으로 이해하면 된다.

 

Spring MVC + WebClient

결국 Mono / Flux 를 생성하는 것 자체로는 아직 데이터가 흐르지 않은 것이다. 

다시 돌아와서 아까 WebClient소스가 왜 동작하지 않는지 살펴보면 원인을 알 수 있다. subscribe 가 없기 때문이다!

webClient
                .post()
                .uri(apiPath)
                .accept(MediaType.APPLICATION_JSON_UTF8)
                .contentType(MediaType.APPLICATION_JSON_UTF8)
                .body(BodyInserters.fromObject(message))
                .retrieve()
                .bodyToMono(Void.class)
                .timeout(Duration.ofMillis(apiTimeout))
                .subscribe();
                
//                .bodyToMono(String.class)
//                .timeout(Duration.ofMillis(apiTimeout))
//                .flux()
//                .toStream();
//                .findFirst();
//                .orElse("");

이와 같이 subscribe()를 호출하도록 변경하면 정상적으로 WebClient를 통한 호출이 이루어지는 것을 확인할 수 있다.

 

그렇다면 기존에 Flux로 변경하여 처리하던 부분이 어떻게 수행이 될 수 있는 것일까?  조금 생각해보면 유추가 가능하다.

Flux는 실제 데이터의 흐름이 아니다. toStream을 통해서 Stream객체를 만들어 내려면 데이터를 흘려보내야만 한다.

Flux.class 와 BlockingIterable.class 에서 다음부분을 발견할 수 있다.

public Stream<T> stream() {
        BlockingIterable.SubscriberIterator<T> it = this.createIterator();
        this.source.subscribe(it);
        Spliterator<T> sp = Spliterators.spliteratorUnknownSize(it, 0);
        return (Stream)StreamSupport.stream(sp, false).onClose(it);
    }

 

 

 

<개선효과>

1. 대략 500~600번정도 외부 API를 호출해야하는 요건이 있었다.

RestTemplate을 활용하여 순차적으로 처리할 경우 약18분정도 걸렸으며 @Async + WebClient 로 변경하였을 때는 약 3분정도 걸렸다.

 

2. 기대한 것보다 효과가 나오지 않았는데 그 이유는 다음과 같다.

 a. API 호출 후 결과값을 참조하여 RDB에 update하는 로직이 존재함

 b. 호출되는 API중에도 상당수가 동기로 만들어져 있어서 결국 응답을 받아서 처리해야 하는 로직(a)상은 비슷한 시간이 소요될 수 있음

그럼에도 자원 사용율은 20%, 전체적인 응답시간은 1/6 로 줄어든 효과가 있었다.

 

3. 순차처리 할때 발생하지 않았던 문제가 하나 생겼는데 동시에 같은 DB Row에 접근하여 간혹 Lock이 발생하는 문제가 있었다.

이부분은 API 호출 후 결과값을 잘 정비하고 RDB에 update하는 로직을 뒷부분으로 옮겨오면서 해결할 수 있었다. 

 

<개요>
- 다음과 같이 Service #A 에서 Service #B로 데이터 조회 API를 요청하고 값을 받아오는 로직이 있다.
- Service #B에서는 AWS Athena를 저장소로 사용하고 있으며 Athena JDBC42 드라이버를 사용 중 이다.

API 호출 후 응답


<현상>
- Service #B에서 JdbcTemplate을 통하여 쿼리가 수행된 시간은 11:13:13 이고,
2021-11-04 11:13:13.482 DEBUG 9668 --- [http-nio-8200-exec-9] o.s.jdbc.core.JdbcTemplate : Executing SQL query
2021-11-04 11:13:13.482 DEBUG 9668 --- [http-nio-8200-exec-9] o.s.jdbc.datasource.DataSourceUtils : Fetching JDBC Connection from DataSource
- 실제 쿼리 수행결과를 받아온 시간은 11:15:57 로 약 2분44초 가 소요되었다.
2021-11-04 11:15:57.998 INFO 9668 --- [http-nio-8200-exec-9] ...

- Athena 의 경우 동시에 다수의 쿼리가 수행되면 Queue에 의하여 순차적으로 수행될 수 있기 때문에 쿼리 히스토리를 조회하였다.

11:13:13.542초 시작, 수행시간 0.555초
대기열시간 1분21초

- 대기열 시간 1분21초 + 수행시간 0.555초를 제외하고 꽤 오랜시간이 소요되었다.

<소스분석>
- AthenaJDBC42의경우 일반적인 JDBC드라이버처럼 커넥션을 맺고 Resultset을 처리하는 형태가 아니라 AWS Athena로 Http를 통해서 수행요청을 하고, 리턴값으로 ID를 받아온 뒤 일정시간 Thread Sleep하면서 조회 polling을 요청하고 Status가 Completed가 되었을때 후속처리를 하는 형태로 구성되어 있다.

- 또한 위에도 언급한것처럼 동시에 다수의 요청이 집중될경우 자체적으로 큐에 보관하여 처리하게 된다.

- 부수적으로 Athena JDBC드라이버의 SStatement내 execute, getResultSet등의 메소드를 살펴보면 대부분 synchronized로 선언이 되어있기 때문에 이에 따른 delay도 있지 않을까 예상한다.

 

<Thread Dump>

10개의 Thread가 같은 위치에서 대기중이다.

"http-nio-8200-exec-9" #44 daemon prio=5 os_prio=31 tid=0x00007ffcc655f800 nid=0x8c03 waiting on condition [0x000070000c638000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
at java.lang.Thread.sleep(Native Method)
at com.simba.athena.athena.api.AJClient.executeQuery(Unknown Source)
at com.simba.athena.athena.dataengine.AJQueryExecutor.execute(Unknown Source)
at com.simba.athena.jdbc.common.SStatement.executeNoParams(Unknown Source)
at com.simba.athena.jdbc.common.SStatement.executeNoParams(Unknown Source)
at com.simba.athena.jdbc.common.SStatement.executeQuery(Unknown Source)
- locked <0x000000078740ccf8> (a com.simba.athena.athena.jdbc42.AJ42Statement)
at com.zaxxer.hikari.pool.ProxyStatement.executeQuery(ProxyStatement.java:111)
at com.zaxxer.hikari.pool.HikariProxyStatement.executeQuery(HikariProxyStatement.java)
at org.springframework.jdbc.core.JdbcTemplate$1QueryStatementCallback.doInStatement(JdbcTemplate.java:439)
at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:376)
at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:452)
at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:462)
at org.springframework.jdbc.core.JdbcTemplate.queryForObject(JdbcTemplate.java:473)
at org.springframework.jdbc.core.JdbcTemplate.queryForObject(JdbcTemplate.java:480)


<정리>
- 다수의 사용자에게서 발생하는 ad-hoc형태 처리는 적합하지 않다.(hive와 동일함)

- Global cache(Redis)를 적절히 활용하여 Service #B Layer에서 처리를 하도록 하면 효율성을 증가시킬수 있다.(일반적인 캐시전략)

- Red Shift등의 빠른대안도 있으나 가성비가 매우 떨어진다.


<현상>
- Spring @Async를 사용하여 비동기 메소드를 작성하고 기존에 CompletableFuture를 사용해서 값을 넘겨받던 부분이 어느순간부터 null 로 넘어오는 현상

기존에 retMap이 넘어오던 부분이
null 로 처리가 되기 시작한다.


<원인>
- 디버깅중에 원인을 찾았다. 기존에 Logging Aspect를 이용하여 @Async 메소드에 대해서 로깅처리하던 부분이 있었다.
- 기존에 @After advice를 활용하던부분을 @Around advice로 변경하면서 발생하는 문제였다.
- @Around를 사용할 경우 proceed 를 수행하여 다음단계로 넘어가게 되는데, 이때 잊지말고 리턴값을 전달해주어야 한다.
(이런 초보적인 실수를 ㅠㅠ)

 @Around("...PointCut()") public Object logger(ProceedingJoinPoint pjp) throws Throwable { log.debug("before {}", ...); Object obj = pjp.proceed(); log.debug("after {}", ...); return obj; }

https://icthuman.tistory.com/entry/AWS-S3-Athena-%EC%82%AC%EC%9A%A9%EC%A4%91-JDBC-Driver%EB%8F%99%EC%8B%9C%EC%84%B1-%EB%AC%B8%EC%A0%9C

 

AWS S3-Athena 사용중 JDBC Driver동시성 문제

<개요> - S3보안버켓으로 파일을 올려서 Athena 작업중 - 1개의 CSV파일(2.65GB) 12개의 컬럼 - AthenaJDBC42.jar 사용 s3.amazonaws.com/athena-downloads/drivers/JDBC/SimbaAthenaJDBC-2.0.16.1000/docs/Simba+..

icthuman.tistory.com

<현상>

- AWS Athena사용시 quota제한 으로 인하여 다음과 같은 문제가 발생하였습니다.

- 동시에 많은 요청이 몰리는 것을 방지하기 위해서 다음과 같이 API호출수를 제한하도록 하였으며 object wait/notify를 활용하였습니다.

 

<로직>

- API 호출시 기본적으로 @Async호출

- API호출전에 현재 호출count를 확인하여 일정수치 이상일 경우 object.wait()로 대기

- 호출이 성공하면 count를 증가

- 비동기호출을 마치고 응답을 받으면 callback으로 호출count를 감소시키고 object.notify

CompletableFuture<Map<String, Integer>> completableFutureForRule;
    ...
    synchronized (this){
		waitBeforeCall();
		completableFutureForRule = apiCallService.call(request);
		resultCount.incrementAndGet();
	}
    
    completableFutureForRule.thenAccept(retMap -> {
    	synchronized (this){
    		notifyAfterCall();
    }
private void waitBeforeCall(){

	try {
  
		while (apiCallService.getCallCount() >= SIZE) {
			this.wait();
		}
	} catch (InterruptedException e) {
		log.error("Interruped");
	}
	apiCallService.incrementAndGet();
}

private void notifyAfterCall(){
	apiCallService.decrementAndGet();
	this.notifyAll();
}

- blocking queue를 구현할때 와 매우 유사한 로직임을 알수 있습니다.

 

<참고사항>

- caller service 와 callee service가 같은 비율로 증가한다면 이론상 문제는 없습니다. (e.g 10, 20, 30)

  그러나 실제는 두 서비스간에 연관관계가 없을 것으로 예상되며, 또한 이 로직은 한 jvm내 단일 service에서만 유용하기 때문에 ECS와 같이 scale out 이 되는 구조에서 전체 API 호출수를 제한하기 위해서는 다른 방법을 사용해야 합니다. 

e.g) A. cluster shared lock

      B. API gateway를 활용한 limit rate

      C. Redis를 활용한 global count개념

 

- wait후 notify의 경우 잠들어 있는 하나의 쓰레드만 깨우기 때문에 여러가지 예외 상황에 대응하거나 개별제어가 어려워서 일반적으로 notifyAll을 많이 사용합니다.

- while()문으로 체크해야 하는 이유는 여러 쓰레드가 다시 lock을 획득하기 위해 경쟁하기 때문입니다. 또한 Spurious wakeups 처럼 이유없이 쓰레드가 깨어난 경우에도 다시 wait로 진입하기 위해서 필요합니다.

- 이와 같이 일정시간 대기하는 로직을 작성할때 sleep을 사용하는 경우를 간혹 볼 수 있는데, sleep의 경우 wait와 다르게 lock을 반환하지 않으므로 주의해서 사용해야 합니다. (stackoverflow 참조)

https://stackoverflow.com/questions/1036754/difference-between-wait-and-sleep

- 로직이 복잡해질 경우 CountDownLatch 를 이용해서 구현하는 것을 권장합니다!

+ Recent posts