지원하는 프로토콜(MQTT, AMQP, JMS)에 따라서 분류되기도 하며 메모리 기반/디스크 기반으로 나누어지기도 합니다.
또한 Message를 처리하는 방식에 따라서 Push / Pull 방식으로 나누기도 합니다.
그리고 Queue 의 정상적인 동작을 돕기 위해서 Broker가 개입하는 것이 일반적입니다.
그러나 다양한 제품 가운데 공통적인 것이 있으니 결국은 Queue 라는 것입니다.
FIFO(First In First Out)의 메시지 처리가 필요하다는 전제로 각 시스템의 특징과 요구사항에 따라 맞춰서 솔루션을 선택하면 됩니다.
2. Message Queue 를 사용하는 이유는?
특히 Queue 는 비동기식 메시지 처리에서 많이 사용되는데 그 이유는 다음과 같습니다.
- 어플리케이션을 구현할 때 메시지가 어디서 오는지(Source), 어디로 보내야 하는지(Target) 신경쓸 필요가 없다.
- Producer는 메시지를 작성하는 데에만 집중하고 메시지 전달에 관련된 (목적지, 순서, QoS 등) 항목들에 대해서는 MQ에 모두 위임한다.
- Consumer 역시 해당 메시지를 어떻게 소모하여 처리하는 지에 대해서만 집중하면 된다.
- Producer 와 Consumer가 보다 각자의 역할에 집중할 수 있게 도와준다.
3. 자료구조 Queue
특정 자료구조에 대한 정리나 코드연습은 Algorithm 카테고리에서 자세히 소개할 예정이기 때문에 간단히 살펴봅니다.
이해를 돕기 위해서 array를 이용해서 작성한 Queue Sample입니다.
class Queue {
private static int front, rear, capacity;
private static int queue[];
Queue(int c)
{
front = rear = 0;
capacity = c;
queue = new int[capacity];
}
static void queueEnqueue(int data)
{
// check queue is full or not
if (capacity == rear) {
return;
}
// insert element at the rear
else {
queue[rear] = data;
rear++;
}
return;
}
static void queueDequeue()
{
// if queue is empty
if (front == rear) {
return;
}
else {
for (int i = 0; i < rear - 1; i++) {
queue[i] = queue[i + 1];
}
if (rear < capacity)
queue[rear] = 0;
rear--;
}
return;
}
}
-queueEnqueue
rear 는 현재 데이터를 넣어야하는 위치의 index 입니다.
만약에 capacity 만큼 꽉 차있다면 더 이상 데이터를 넣을 수 없습니다.
데이터를 넣는 것이 가능하다면 삽입 후 index를 변경합니다.
-queueDequeue
front는 데이터가 삭제되는 위치의 index입니다.
front == rear 라면 비어있는 상태입니다.
데이터를 제거하는 것이 가능하다면 제거 후 index를 변경합니다.
4. Queue 사용시 주의점
위의 sample소스를 통해서 Queue의 특징을 이해하셨다면 주의점도 파악하실 수 있습니다.
- Queue는 무한한 용량이 아니다. 적절히 소모가 이루어지지 않는다면 Queue의 모든 용량을 소모하게 되고 유실이 발생한다.
- Queue에는 많은 쓰레드들이 동시에 접근할 수 있다. enqueue/dequeue 동작은 thread-safe 해야 합니다.
Java에서는 이를 위해서 concurrent 패키지를 제공합니다.
- 서로 다른 Queue에서는 순서보장이 되지 않습니다. (당연한 말씀)
Computer Science의 기초인 DS & Algorithm은 반드시 알고 있는 것이 좋습니다.
5. Apache Kafka
많은 Message Queue 중에서 Apache Kafka는 대용량 실시간 처리에 특화된 아키텍처 설계를 통하여 우수한 TPS를 보장합니다.
기본적으로 pub-sub 구조로 동작하며, scale out과 high availability를 확보하는 것으로 유명합니다.
아키텍처상 다양한 특징이 있지만 이번 글에서는 자료구조 & 알고리즘 측면에서 살펴보겠습니다.
5-A. Multi Partition
Kafka가 분산처리가 가능한 것은 Multi Partition이 가능하기 때문입니다.
대용량의 데이터가 들어올 수록 병렬처리를 통해서 속도를 확보할 수 있는 구조를 가지고 있습니다.
그렇다면 단점은 없을까요?
서로 다른 Partition에 대해서는 순서를 보장하지 않습니다.(다른 큐로 보시면 됩니다.)
예를 들어서 Partition 0 내의 메시지들끼리, Partition 1 내의 메시지들끼리 순서를 보장하지만 Partition 0과 1사이에는 순서를 보장할 수 없습니다.
반드시 순서를 보장해야 하는 케이스가 존재한다면?
- Partition 을 하나만 사용하던지
- Partitioner를 Custom 하게 작성하여 순서가 보장되어야 하는 메시지들은 같은 Partition으로 보내도록 합니다.
Topic과 혼동하시면 안됩니다. 특정 Topic내에 여러 Partition이 존재하며 Broker를 통해서 각 Topic별 Parition 의 저장정보와 복제 등이 이루어집니다.
5-B. Partitioner
Kafka의 Default Partitioner는 modulo 연산을 사용하여 파티션을 나누게 되어 있습니다.
Hash 알고리즘을 잘 작성하여 데이터를 고르게 분산하면 병렬처리 성능을 더 끌어올릴 수 있습니다.
간단한 Hash Function의 예제입니다.
private int hashFunction(K key){
int val=0;
for(int i=0; i<key.toString().length();i++){
val+= (key.toString().charAt(i)) % size;
}
return val % size;
}
일반적으로 우리가 사용하는 HashMap에서도 같은 Hash값이 자주 발생할 경우 특정 row가 길어지게 되어 chaining hash table을 구현하게 되는데, 이 때 worst case로 특정 bucket에 데이터가 집중되어 메모리를 효율적으로 사용하지 못하는 경우가 발생합니다.
Kafka에서도 partition을 적절하게 분산하지 못할 경우 이러한 문제가 발생할 수 있습니다.
6. 정리
- Apach Kafka는 자료구조 Queue와 동일합니다.
- 병렬처리를 위해서 multi partiton을 지원합니다. 여러 개의 Queue 를 동시에 사용한다고 생각하면 됩니다.
@RestController
@RequestMapping(path="/api")
public class ServiceController {
@RequestMapping(value="/services/{serviceCode}", method= RequestMethod.GET)
public @ResponseBody
List<DeviceModelDto> findServiceByServiceCode (@AuthenticationPrincipal LoginUserDetails loginUserDetails,
@PathVariable("serviceCode") String serviceCode) throws ServiceNotFoundException {
// @ResponseBody means the returned String is the response, not a view name
// @RequestParam means it is a parameter from the GET or POST request
ServiceDto serviceDto = serviceService.findServiceByServiceCode(serviceCode);
if((serviceDto == null) ||
loginUserDetails.checkNotAvailableService( serviceDto.getServiceId()) ||
StringUtils.isEmpty(serviceDto.getServiceId())
){
throw new ServiceNotFoundException(String.valueOf(serviceCode));
}
기존에 만들었던 getServiceByServiceId는 사용할 수가 없습니다. serviceId를 인자값으로 하고 있었는데 이제는 serviceCode를 사용해야하기 때문입니다.
Service Layer에도 작업이 필요합니다.
public ServiceDto findServiceByServiceCode(String serviceCode) throws DataFormatViolationException {
Pattern codePattern = ValidationPattern.serviceCodePattern;
Matcher matcher = codePattern.matcher(serviceCode);
if(!matcher.matches()){
throw new DataFormatViolationException("Code value should be consist of alphabet lowercase, number and '-', (length is from 2 to 20)");
}
ServiceEntity serviceEntity = serviceRepository.findByServiceCode(serviceCode).orElse(new ServiceEntity());
return modelMapper.map(serviceEntity, ServiceDto.class);
}
이때 주의해야 할 점은 입력으로 받는 serviceCode에 대해서도 기존과 동일한 검증로직을 적용해주는 것이 좋습니다. 없어도 상관은 없습니다. 그러나 불필요한 요청이 Repository Layer까지 전달될 필요는 없을 것 같습니다. (Repository Layer는 언제나 비용이 가장 비쌉니다.)
- Null 처리를 Service Layer에서 해주었기 때문에 Controller Layer에서는 삭제가 가능합니다.
- serviceCode로 조회되는 경우도 재사용을 할 수 있도록 getServiceByServiceCode로 묶어서 private 메소드로 구현하였습니다.
기본적인 내용이지만 잠깐 짚고 넘어가야할 부분이 있습니다. 많은 분들이 개발을 할때 습관적으로 메소드의 기본을 public 으로 작성합니다.
왜그럴까요? 일단 다 사용할 수 있게 해주는 것이 편리하기 때문입니다. getter, setter 역시 습관적으로 모든 필드값에 만들어 놓고 시작하는 경우를 많이 봅니다.
하지만 이러한 습관은 설계의 기본원칙을 무시하는 위험한 행동입니다. 저는 개인적으로 private을 기본으로 하고 필요한 경우에만 public 메소드를 통해서 열어주는 것을 권장합니다. 메소드와 필드값 모두 동일한 원칙으로 적용합니다.
첫번째 시간에 LoginUserDetails내에서 service Id를 외부로 노출하지 않았던 것을 기억하시기 바랍니다. 현재 사용자의 serviceId를 가지고 작업해야 경우가 생긴다면 해당 객체의 method call을 하는 것이 맞습니다. 교과서적인 용어로는 객체간의 Interaction이라고 합니다.
public ServiceDto createService(ServiceDto serviceDto) throws DataFormatViolationException {
Pattern codePattern = ValidationPattern.serviceCodePattern;
Matcher matcher = codePattern.matcher(serviceDto.getServiceCode());
if(!matcher.matches()){
throw new DataFormatViolationException("Code value should be consist of alphabet lowercase, number and '-', (length is from 2 to 20)");
}
ServiceEntity serviceEntity =modelMapper.map(serviceDto, ServiceEntity.class);
serviceRepository.save(serviceEntity);
return modelMapper.map(serviceEntity, ServiceDto.class);
}
}
이와 같이 로직을 구성하고 값을 저장합니다. Repository Layer에서는 serviceCode 컬럼 값에 대해서 신경쓰지 않고 데이터 저장에만 집중할 수 있습니다.
UI에서도 이와 같은 validation을 동일하게 구현할 수 있지만 보다 시스템을 튼튼하게 만들기 위해서는 Service Layer에서 반드시 체크해야 합니다. 나중에 다른 비지니스 프로세스를 개발할 때 createService를 재사용할 수도 있기 때문입니다.
ServiceCode를 검증하는 중 발생하는 오류에 대해서는 별도 Exception으로 처리하였습니다.
되도록이면 Raw Exception을 사용하는 것은 지양합니다. 그리고 어떤 Exception을 어느 Layer까지 전파시킬 것인가에 대해서도 사전에 정의하는 것이 좋습니다.
- 일반적인 Web MVC구조에 따라서 Service 등록/수정/삭제/조회 하는 REST API를 만든다고 가정합니다.
<내용>
가장 단순한 건당 조회를 살펴봅니다.
Controller 클래스 입니다.
@RestController
@RequestMapping(path="/api")
public class ServiceController {
@RequestMapping(value="/services/{serviceId}", method= RequestMethod.GET)
public @ResponseBody
ServiceDto findService (@AuthenticationPrincipal LoginUserDetails loginUserDetails,
@PathVariable("serviceId") int serviceId) throws ServiceNotFoundException {
// @ResponseBody means the returned String is the response, not a view name
// @RequestParam means it is a parameter from the GET or POST request
ServiceDto serviceDto = getServiceByServiceId(serviceId, loginUserDetails);
return serviceDto;
}
private ServiceDto getServiceByServiceId(int serviceId, LoginUserDetails loginUserDetails) throws ServiceNotFoundException {
ServiceDto serviceDto = serviceService.findServiceById(serviceId);
if( (serviceDto == null) ||
loginUserDetails.checkNotAvailableService( serviceDto.getServiceId()) ||
StringUtils.isEmpty(serviceDto.getServiceId())){
throw new ServiceNotFoundException(String.valueOf(serviceId));
}
return serviceDto;
}
- LoginUserDetails 의 경우 로그인한 사용자의 정보를 저장합니다. - serviceId를 이용하여 데이터를 조회하고 결과값을 간단하게 검증하는 로직입니다.
- 결과값이 다음 중 하나와 같으면 현재 Service가 존재하지 않는 것으로 판단합니다.
1. 객체가 null 인 경우
2. ID필드의 값이 없는 경우
3. 값이 존재하나 로그인한 사용자의 정보에 해당하지 않는(본인의 서비스가 아닌 경우) 경우
이와 같이 Service 를 조회하고 값을 검증하는 로직은 조회 이외에 등록/수정에서도 필요하기 때문에 별도 메소드를 작성하여 재사용하였습니다.
그리고 현재 ServiceId와 로그인 사용자의 ServiceId를 비교하는 로직의 경우 Controller에서 구현하는 것보다는 정보은닉화와 설계원칙에 적합해 보여서 LoginUserDetails 내부에 구현하였습니다.
이와 같이 구현하면 현재 사용자의 serviceId등은 외부로 노출시키지 않아도 되며 (getter를 작성하지 않아도 됩니다.)
값의 검증이 필요한 모듈은 LoginUserDetails 객체에 요청하기만 하면 됩니다.
Controller layer의 경우 web과 바로 연결되어 있는 부분들을 담당하기 때문에 login관련정보나 인자값을 주로 처리하며 비지니스 로직은 Service Layer에 존재하게 됩니다.
다음으로 Service Layer를 살펴보겠습니다.
@Service
public class ServiceService {
@Autowired
private ServiceRepository serviceRepository;
@Autowired
private ModelMapper modelMapper;
public ServiceDto findServiceById(int id){
ServiceEntity serviceEntity = serviceRepository.findById(id);
return modelMapper.map(serviceEntity, ServiceDto.class);
}
지금은 아무 로직이 없기 때문에 단순히 Repository 로부터 값을 조회하여 객체의 값을 매핑만 합니다.
현재 프로젝트에서는 Spring JPA를 사용하여 시스템을 구축중인데, Entity클래스의 변경은 되도록 줄이는 것이 좋습니다. 만약 Entity 클래스 하나의 유형으로 화면 - 서비스 - 데이터를 모두 처리하게 될 경우 객체지향에서 말하는 대표적인 anti pattern이 될 수 있기 때문에 별도 DTO 클래스를 사용합니다.
var Registry = require('azure-iothub').Registry;
const chalk = require('chalk');
var connectionString = process.argv[2];
var fwVersion = '2.8.5';
var fwPackageURI = 'https://secureuri/package.bin';
var fwPackageCheckValue = '123456abcde';
var sampleConfigId = 'firmware285';
2. Update할 Configuration 을 정의한다.
// <configuration>
var firmwareConfig = {
id: sampleConfigId,
content: {
deviceContent: {
'properties.desired.firmware': {
fwVersion: fwVersion,
fwPackageURI: fwPackageURI,
fwPackageCheckValue: fwPackageCheckValue
}
}
},
// Maximum of 5 metrics per configuration
metrics: {
queries: {
current: 'SELECT deviceId FROM devices WHERE configurations.[[firmware285]].status=\'Applied\' AND properties.reported.firmware.fwUpdateStatus=\'current\'',
applying: 'SELECT deviceId FROM devices WHERE configurations.[[firmware285]].status=\'Applied\' AND ( properties.reported.firmware.fwUpdateStatus=\'downloading\' OR properties.reported.firmware.fwUpdateStatus=\'verifying\' OR properties.reported.firmware.fwUpdateStatus=\'applying\')',
rebooting: 'SELECT deviceId FROM devices WHERE configurations.[[firmware285]].status=\'Applied\' AND properties.reported.firmware.fwUpdateStatus=\'rebooting\'',
error: 'SELECT deviceId FROM devices WHERE configurations.[[firmware285]].status=\'Applied\' AND properties.reported.firmware.fwUpdateStatus=\'error\'',
rolledback: 'SELECT deviceId FROM devices WHERE configurations.[[firmware285]].status=\'Applied\' AND properties.reported.firmware.fwUpdateStatus=\'rolledback\''
}
},
// Specify the devices the firmware update applies to
targetCondition: 'tags.devicetype = \'chiller\'',
priority: 20
};
// </configuration>
deviceContent에는 Device가 참조할 값들이 들어가고, metrics에는 작업 수행 중 Device 가 기록하는 값(DeviceTwin)을 조회하여 모니터링 할 수 있는 쿼리가 들어간다.
3. RegistryManager를 통해서 해당 Configuration 정보를 등록한다.
해당 값을 String으로만 처리하는 경우에는 유효하지 않는 코드값이 들어오거나, 오타가 발생하는등의 오류를 잡아내기가 힘들며 또한 코드값이 변경/추가되었을때 유지보수가 용이하지 않기 때문에 되도록 enum type사용을 권장한다.
또한, enum type을 사용하더라도 코드값에 따른 로직분기의 경우 if - else if문의 반복을 통해서 수행하는 경우가 많은데 추후 요구사항의 변경이 발생하였을 때 코드의 가독성을 떨어뜨리고, 버그를 만드는 원인이 된다. 이부분 역시 enum type을 활용하여 소스를 깔끔하게 유지할 필요가 있다.
Firmware Update의 모니터링쿼리를 살펴보면 각 Status에 따라 쿼리의 조건문이 다르고, 인자의 수에 따라서도 미묘하게 다르게 생성되어야 하지만 공통으로 공유하는 조건도 있다.
예를 들어서 CURRENT인 경우 상태가 'current'인 device에 대해서 조회가 이루어져야 하며, APPLYING의 경우는 'downloading', 'verifying', 'applying' 의 3가지 경우에 대해서 조회가 이루져야 한다. 그러나 조회조건의 configurationId는 동일하다.
해당 로직을 외부에서 각각 구현한다면 중복로직이 존재하게 되고 추후 변경사항이 발생할 경우 영향을 받는 범위도 넓다. 이는 OCP원칙에 위배되기 때문에 확장에는 열려있고 변경에는 닫히도록 작성할 필요가 있다.
public enum FirmwareUpdate{
CURRENT(Arrays.asList(FirmwareStatus.CURRENT)),
REBOOTING(Arrays.asList(FirmwareStatus.REBOOTING)),
ERROR(Arrays.asList(FirmwareStatus.ERROR)),
ROLLEDBACK(Arrays.asList(FirmwareStatus.ROLLEDBACK)),
APPLYING(Arrays.asList(FirmwareStatus.APPLYING, FirmwareStatus.DOWNLOADING, FirmwareStatus.VERIFYING));
List<FirmwareStatus> statusList;
FirmwareUpdate(List<FirmwareStatus> statusList){
this.statusList=statusList;
}
String query(String configurationId){
String temp = "SELECT deviceId FROM devices WHERE configurations.[[" + configurationId + "]].status=\'Applied\' AND ";
if (statusList.size() == 0) {
throw new ArrayIndexOutOfBoundsException("the size of status list should be positive number");
} else if (statusList.size() == 1) {
temp += "properties.reported.firmware.fwUpdateStatus=\'" + statusList.get(0).label + "\'";
} else {
temp += "(";
int count=0;
for (FirmwareStatus status : statusList) {
temp += "properties.reported.firmware.fwUpdateStatus=\'" + status.label + "\'";
count++;
if(count < statusList.size()){
temp+=" OR ";
}
}
temp += ")";
}
return temp;
}
}
위와 같이 각 Status에 따라서 동작하는 고유의 로직은 enum형태로 가지고 있도록 하며, 외부에는 query 메소드만 노출하도록 한다.
만약 Status 에 따라서 더욱 구체적인 구현이 필요하다면 query 메소드를 abstract로 정의하고 type별로 개별구현하는 것도 가능하다.