1. Message Queue

Message Queue 는 기존 레거시부터 최근까지 계속 사용되고 있는 방식입니다.

지원하는 프로토콜(MQTT, AMQP, JMS)에 따라서 분류되기도 하며 메모리 기반/디스크 기반으로 나누어지기도 합니다.

또한 Message를 처리하는 방식에 따라서 Push / Pull 방식으로 나누기도 합니다.

그리고 Queue 의 정상적인 동작을 돕기 위해서 Broker가 개입하는 것이 일반적입니다.

 

그러나 다양한 제품 가운데 공통적인 것이 있으니 결국은 Queue 라는 것입니다.

FIFO(First In First Out)의 메시지 처리가 필요하다는 전제로 각 시스템의 특징과 요구사항에 따라 맞춰서 솔루션을 선택하면 됩니다.

 

2. Message Queue 를 사용하는 이유는?

특히 Queue 는 비동기식 메시지 처리에서 많이 사용되는데 그 이유는 다음과 같습니다.

 

- 어플리케이션을 구현할 때 메시지가 어디서 오는지(Source),  어디로 보내야 하는지(Target) 신경쓸 필요가 없다.

- Producer는 메시지를 작성하는 데에만 집중하고 메시지 전달에 관련된 (목적지, 순서, QoS 등) 항목들에 대해서는 MQ에 모두 위임한다.

- Consumer 역시 해당 메시지를 어떻게 소모하여 처리하는 지에 대해서만 집중하면 된다.

 

- Producer 와 Consumer가 보다 각자의 역할에 집중할 수 있게 도와준다.

 

3. 자료구조 Queue

특정 자료구조에 대한 정리나 코드연습은 Algorithm 카테고리에서 자세히 소개할 예정이기 때문에 간단히 살펴봅니다.

Queue

이해를 돕기 위해서 array를 이용해서 작성한 Queue Sample입니다.

class Queue { 
	private static int front, rear, capacity; 
	private static int queue[]; 

	Queue(int c) 
	{ 
		front = rear = 0; 
		capacity = c; 
		queue = new int[capacity]; 
	} 

	static void queueEnqueue(int data) 
	{ 
		// check queue is full or not 
		if (capacity == rear) {  
			return; 
		} 
		// insert element at the rear 
		else { 
			queue[rear] = data; 
			rear++; 
		} 
		return; 
	} 
 
	static void queueDequeue() 
	{ 
		// if queue is empty 
		if (front == rear) { 
			return; 
		} 

		else { 
			for (int i = 0; i < rear - 1; i++) { 
				queue[i] = queue[i + 1]; 
			} 
			if (rear < capacity) 
				queue[rear] = 0; 

			rear--; 
		} 
		return; 
	} 
} 

-queueEnqueue 

 rear 는 현재 데이터를 넣어야하는 위치의 index 입니다.

 만약에 capacity 만큼 꽉 차있다면 더 이상 데이터를 넣을 수 없습니다.

 데이터를 넣는 것이 가능하다면 삽입 후 index를 변경합니다.

 

-queueDequeue

 front는 데이터가 삭제되는 위치의 index입니다.

 front == rear 라면 비어있는 상태입니다.

 데이터를 제거하는 것이 가능하다면 제거 후 index를 변경합니다.

 

4. Queue 사용시 주의점

위의 sample소스를 통해서 Queue의 특징을 이해하셨다면 주의점도 파악하실 수 있습니다.

- Queue는 무한한 용량이 아니다. 적절히 소모가 이루어지지 않는다면 Queue의 모든 용량을 소모하게 되고 유실이 발생한다.

- Queue에는 많은 쓰레드들이 동시에 접근할 수 있다. enqueue/dequeue 동작은 thread-safe 해야 합니다.

  Java에서는 이를 위해서 concurrent 패키지를 제공합니다.

- 서로 다른 Queue에서는 순서보장이 되지 않습니다. (당연한 말씀)

 

Computer Science의 기초인 DS & Algorithm은 반드시 알고 있는 것이 좋습니다.

 

5. Apache Kafka

 많은 Message Queue 중에서 Apache Kafka는 대용량 실시간 처리에 특화된 아키텍처 설계를 통하여 우수한 TPS를 보장합니다.

기본적으로 pub-sub 구조로 동작하며, scale out과 high availability를 확보하는 것으로 유명합니다.

아키텍처상 다양한 특징이 있지만 이번 글에서는 자료구조 & 알고리즘 측면에서 살펴보겠습니다.

 

 

5-A. Multi Partition

 Kafka가 분산처리가 가능한 것은 Multi Partition이 가능하기 때문입니다.

대용량의 데이터가 들어올 수록 병렬처리를 통해서 속도를 확보할 수 있는 구조를 가지고 있습니다.

 

그렇다면 단점은 없을까요? 

서로 다른 Partition에 대해서는 순서를 보장하지 않습니다.(다른 큐로 보시면 됩니다.)


예를 들어서 Partition 0 내의 메시지들끼리, Partition 1 내의 메시지들끼리 순서를 보장하지만 Partition 0과 1사이에는 순서를 보장할 수 없습니다.

 

반드시 순서를 보장해야 하는 케이스가 존재한다면?

 - Partition 을 하나만 사용하던지

 - Partitioner를 Custom 하게 작성하여 순서가 보장되어야 하는 메시지들은 같은 Partition으로 보내도록 합니다.

 

Topic과 혼동하시면 안됩니다. 특정 Topic내에 여러 Partition이 존재하며 Broker를 통해서 각 Topic별 Parition 의 저장정보와 복제 등이 이루어집니다. 

 

5-B. Partitioner

Kafka의 Default Partitioner는 modulo 연산을 사용하여 파티션을 나누게 되어 있습니다.

Hash 알고리즘을 잘 작성하여 데이터를 고르게 분산하면 병렬처리 성능을 더 끌어올릴 수 있습니다.

간단한 Hash Function의 예제입니다.

private int hashFunction(K key){
    int val=0;
    for(int i=0; i<key.toString().length();i++){
        val+= (key.toString().charAt(i)) % size;
    }
    return val % size;
}

일반적으로 우리가 사용하는 HashMap에서도 같은 Hash값이 자주 발생할 경우 특정 row가 길어지게 되어 chaining hash table을 구현하게 되는데, 이 때 worst case로 특정 bucket에 데이터가 집중되어 메모리를 효율적으로 사용하지 못하는 경우가 발생합니다.

 

Kafka에서도 partition을 적절하게 분산하지 못할 경우 이러한 문제가 발생할 수 있습니다.

 

6. 정리

- Apach Kafka는 자료구조 Queue와 동일합니다.

- 병렬처리를 위해서 multi partiton을 지원합니다. 여러 개의 Queue 를 동시에 사용한다고 생각하면 됩니다.

- Partitioner를 어떻게 구현하는가에 따라서 partiton이 결정됩니다.

- FIFO보장이 필요한 단위에 대해서는 같은 partiton 이 적용되도록 구현합니다.

 

0. 참조

https://sarc.io/index.php/miscellaneous/1615-message-queue-mq

https://ko.wikipedia.org/wiki/%ED%81%90_(%EC%9E%90%EB%A3%8C_%EA%B5%AC%EC%A1%B0)

https://monsieursongsong.tistory.com/

https://epicdevs.com/17

https://en.wikipedia.org/wiki/Hash_table

https://www.geeksforgeeks.org/array-implementation-of-queue-simple/

+ Recent posts