이전 편은 Redisson을 사용해서 락을 걸어 DB 동시성 문제를 해결했다.
이번 편에서는 만약에 MSA 환경을 구축할 정도의 트래픽이 몰렸을 때 카프카를 추가해서 메세징을 이용해 볼 것이다.
참고로 이 예제는 MSA 환경이 아닌 한 프로젝트 내에서 적용시킬 것이다.
코드
(kafka)
먼저 카프카 설정부터 해줄 것이다.
KafkaProducerConfig
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
KafkaConsumerConfig
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
ReserveTicketProducer
@Component
@Transactional
@RequiredArgsConstructor
@Slf4j
public class ReserveTicketProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
private final ObjectMapper objectMapper;
public void send(TicketReserveRequest ticketRequest) {
try {
String jsonObject = objectMapper.writeValueAsString(ticketRequest);
kafkaTemplate.send("reserve_ticket", jsonObject);
} catch (Exception e) {
e.printStackTrace();
}
}
}
reserve_ticket이라는 토픽에 TicketRequest를 올린다.
ReserveTicketConsumer
@Slf4j
@Component
@RequiredArgsConstructor
public class ReserveTicketConsumer {
private final TicketReserveRedissonService ticketReserveRedissonService;
private final ObjectMapper objectMapper;
@KafkaListener(topics = "reserve_ticket", groupId = "ticket")
public void consume(ConsumerRecord<String, String> record) {
try {
var ticket = objectMapper.readValue(record.value(), TicketReserveRequest.class);
ticketReserveRedissonService.reverseTicket(ticket.getTicketId());
} catch (Exception e) {
e.printStackTrace();
}
}
}
reserve_ticket 토픽으로 받는데 서버를 분산했다고 가정하였기 때문에 groupId를 지정했다.
TicketKafkaService
@Service
@Transactional
@RequiredArgsConstructor
public class TicketKafkaService {
private final ReserveTicketProducer reserveTicketProducer;
public void sendReserveTicket(Long ticketId) {
reserveTicketProducer.send(new TicketReserveRequest(ticketId));
}
}
이 부분은 컨트롤러에 요청이 들어왔을 때 카프카 프로듀서로 보내주기 위한 서비스 단이다.
코드
(컨트롤러, 서비스)
TicketController
@RestController
@RequiredArgsConstructor
@RequestMapping("/ticket")
@Slf4j
public class TicketController {
private final TicketService ticketService;
private final TicketKafkaService ticketKafkaService;
// step 3) 레디스 분산락 + 카프카
@PostMapping("/kafka")
public ResponseEntity<String> reserveTicketWithRedisAndKafka(@RequestBody TicketReserveRequest ticketReserveRequest) throws TicketSoldOutException {
log.info("POST " + ticketReserveRequest.getTicketId() + ", " + LocalDateTime.now());
ticketKafkaService.sendReserveTicket(ticketReserveRequest.getTicketId());
return ResponseEntity.ok("티켓 예약 신청 완료");
}
// 티켓 생성
@PostMapping
private ResponseEntity<String> autoCreateTicket(@RequestBody TicketRequest ticketRequest) {
ticketService.save(ticketRequest);
return ResponseEntity.ok("티켓 생성 성공");
}
}
TicketReserveRedissonService
@RequiredArgsConstructor
@Slf4j
@Service
public class TicketReserveRedissonService {
private final RedissonClient redissonClient;
private final TicketService ticketService;
// step 2, 3) redisson 분산 락
public void reverseTicket(Long ticketId) {
RLock lock = redissonClient.getLock("ticket-" + ticketId);
try {
boolean isLocked = lock.tryLock(3, 3, TimeUnit.SECONDS);
if (!isLocked) {
throw new LockException();
}
ticketService.reserveTicket(ticketId);
} catch (InterruptedException e) {
log.error(e.getMessage());
} finally {
lock.unlock();
}
}
}
토픽 컨슈밍해서 Redisson을 사용하기 위해 이쪽으로 보내줄 것이다.
TicketService
@Service
@RequiredArgsConstructor
@Slf4j
@Transactional
public class TicketService {
private final TicketRepository ticketRepository;
public Long save(TicketRequest ticketRequest) {
return ticketRepository.save(Ticket.builder()
.ticketName(ticketRequest.getTicketName())
.ticketMaxCount(ticketRequest.getTicketMaxCount())
.build()).getId();
}
public TicketResponse findById(Long id) {
return TicketResponse.from(ticketRepository.findById(id).orElseThrow(TicketNotFoundException::new));
}
// step 2, 3) Redisson, Kafka
public void reserveTicket(Long ticketId) {
Ticket ticket = ticketRepository.findById(ticketId).orElseThrow(TicketNotFoundException::new);
ticket.currentCountIncrement();
log.info("잔여 티켓: " + (ticket.getTicketMaxCount() - ticket.getTicketCurrentCount()));
}
}
위 과정의 순서는 다음과 같다.
1. 컨트롤러에서 티켓 예매 요청이 들어온다.
2. 카프카 서비스 단에서 토픽에 요청 정보를 프로듀싱한다.
3. 카프카 컨슈머가 해당 토픽을 컨슈밍 하고 RedissonService로 분산 락하기 위해 티켓 ID를 보낸다.
4. 락 걸어서 데이터를 처리한다.
이렇게 해서 분산 락과 서버 분산 시 카프카를 사용해서 부담을 낮춰봤다.
다음 step)
'JAVA > kafka' 카테고리의 다른 글
[Spring] 카프카 스프링에서 토픽 프로듀싱, 컨슈밍 해보자. (feat. nGrinder) (0) | 2023.07.03 |
---|---|
[kafka/Docker] 도커로 카프카를 띄워보고, 토픽 생성 후 메시지를 보내보자. (0) | 2023.06.26 |