일단 수강 신청 시에 많은 트래픽이 몰려온다고 가정을 하고 아래와 같이 아키텍처를 구성하였다.
구조는 브로커를 복제해서 하나의 브로커가 고장이 나도 예비 브로커를 사용할 수 있도록 하였고, enrollment_create의 경우 순서 보장이 확실하게 이루어져야 하기 때문에 파티션 하나를 사용하였다.
그 외는 순서 보장이 필요 없기 때문에 파티션 3개를 사용하여 분할 적재하였다.
흐름은 아래와 같다.
- 학생은 수강 신청을 요청하면 수강 신청 정보는 학생 쪽 Producer에서 enrollment_create 토픽에 적재가 된다.
- 적재된 정보는 수강 신청 서비스에서 Consumer 해준 다음 수강 신청 로직을 돌린 후 enrollment_history_create에 수강 신청 정보를 적재한다.
- 수강 신청 로그와 알람은 해당 토픽의 레코드를 가져온다.
- 수강 신청 로그는 DB에 로그를 저장한다.
- 알림은 알림에 맞게 DB에 저장하고 해당 정보를 notification_create에 적재한다.
- 학생은 수강 신청 정보를 받는다.
KafkaProducerConfig, KafkaConsumerConfig 설정
먼저 프로듀서부터 코드를 작성해 보면
KafkaProducerConfig
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092, 복제2:9092, 복제3IP:9092...");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
config 옵션을 더 줄 수 있는데 간단하게 부트 스트랩 서버(복제 IP들)를 설정하고 데이터의 키와 값을 직렬화하는 옵션을 준다.
그리고 아래는 카프카 템플릿을 String 타입으로 메시지를 적재하겠다는 말이다.
여기서는 JSON을 쓸 것이다.
KafkaConsumerConfig
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092, ...");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "student");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
컨슈머의 경우에는 똑같이 부트 스트랩 서버와 그룹 아이디 설정(이 부분은 빼줘도 될 것 같다. 컨슈밍 하는 @KafkaListener에서 설정해 줘도 되기 때문) 등을 해주면 된다.
프로듀서랑 비슷하다.
메시지 적재할 프로듀서 클래스 (학생 서버)
EnrollmentCreateProducer
EnrollmentCreateProducer
@Component
@Transactional
@RequiredArgsConstructor
@Slf4j
public class EnrollmentCreateProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
private final ObjectMapper objectMapper;
private static final Logger logger = LoggerFactory.getLogger(EnrollmentCreateProducer.class);
public void send(Enrollment enrollment) {
try {
String jsonObject = objectMapper.writeValueAsString(enrollment);
// 로그
logger.info("producer: enrollment_create: " + jsonObject);
kafkaTemplate.send("enrollment_create", jsonObject);
} catch (Exception e) {
e.printStackTrace();
}
}
}
프로듀서에서는 위와 같이 설정해 주었는데 로그를 좀 보기 위해 Slf4j를 사용하였다.
나의 경우 컨트롤러로 수강 신청이 오면 컨트롤러 -> 서비스 -> 프로듀서로 요청 정보(Enrollment 객체)를 보내 Json으로 변환 후 로그를 찍고 kafkaTemplate를 사용하여 send(토픽명, 내용) 해주었다.
그러면 카프카 토픽에 적재가 된다.
적재된 메시지를 컨슈밍 하는 클래스 (수강 신청 서버)
EnrollmentCreateConsumer
EnrollmentCreateConsumer
@Slf4j
@Component
@RequiredArgsConstructor
public class EnrollmentCreateConsumer {
private final EnrollmentService enrollmentService;
private final ObjectMapper objectMapper;
private static final Logger logger = LoggerFactory.getLogger(EnrollmentCreateConsumer.class);
@KafkaListener(topics = "enrollment_create", groupId = "enrollment")
public void consume(ConsumerRecord<String, String> record) {
try {
var enrollment = objectMapper.readValue(record.value(), Enrollment.class);
logger.info("consumer: enrollment_create -, Partition: {}, Offset: {}: {}",
record.partition(), record.offset(), record.value());
if (enrollment.getEnrollmentStatus().equals(EnrollmentStatus.REQUEST)) {
enrollmentService.save(enrollment);
} else if (enrollment.getEnrollmentStatus().equals(EnrollmentStatus.CANCEL)) {
enrollmentService.cancel(enrollment);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
컨슈밍 하는 것은 @KafkaListener 애노테이션을 사용해서 레코드를 받을 수 있다.
topics = 컨슈밍 하는 토픽 이름이다. {enrollment_create, enrollment_history_create}와 같이 토픽을 여러 개 설정할 수도 있다.
그리고 groupId는 토픽이 메시지를 전달할 때 groupId를 설정해 주면 해당 groupId를 가진 컨슈머들에게 라운드 로빈 방식으로 메시지를 전달해 준다.
즉, 파티션 3개가 있을 때 메시지를 중복해서 보내주는 것이 아닌 서버에 분할해서 나눠준다.
이게 중요한 개념 같다. 그리고 동일 groupId의 서버가 모두 중단되었을 때도 다시 작동한다면 마지막 오프셋 이후의 메시지도 다시 받게 된다.
그리고 받은 메시지는 json이고 레코드의 정보(파티션, 오프셋, 값 등)를 얻기 위해 먼저 record로 받았다.
그리고 이걸 객체로 변환해서 서비스 단으로 보내준다.
이렇게 하면 카프카로 메시지를 적재하고 적재된 메시지가 컨슈밍 되어 해당 로직을 돌게 된다.
nGrinder로 과부하 테스트
일단 모든 카프카는 EC2에 올려두었고, 요청은 분산된 3대의 서버(PC)에서 나눠서 로직을 처리했다.
서버를 나눴으니 당연히 성능이 좋게 나온다.
서버 1개
서버 3개
테스트는 간단하게 2분만 해보았다.
TPS와 실행 테스트를 보면 약 1.7배 정도 높게 처리할 수 있게 되었다.
'JAVA > kafka' 카테고리의 다른 글
[Redis/kafka] 데이터베이스 동시성 문제 해결 코드 (STEP 3. Redisson, kafka로 과부하 줄이기) (0) | 2023.07.06 |
---|---|
[kafka/Docker] 도커로 카프카를 띄워보고, 토픽 생성 후 메시지를 보내보자. (0) | 2023.06.26 |