전 과정에서는 카프카를 사용해서 서버 분산 로직을 구현해 봤다.
이번에는 요청이 들어왔을 때 모든 메시지를 카프카에 적재하는 것이 아닌
레디스를 사용해서 티켓 수량이 매진됐을 때는 메시지를 적재하지 않도록 하여 서버의 부하를 막을 것이다.
따라서 순서를 보면
- 티켓 예매 요청이 들어온다.
- 레디스에서 티켓 수량이 유효한지 확인한다.
- 유효하다면 카프카에 예매 요청 메시지 적재
- 메시지 컨슘해서 서비스 단에서 DB의 티켓 수량 변경(Redisson Lock 사용)
그리고 위 순서대로 코드를 작성해 볼 것이다.
(카프카 설정 코드는 전 편인 Step 3 참조)
코드
(요청 처리 순서대로)
TicketController
@RestController
@RequiredArgsConstructor
@RequestMapping("/ticket")
@Slf4j
public class TicketController {
private final TicketService ticketService;
private final TicketKafkaService ticketKafkaService;
// step 4) 레디스 분산 락을 사용해서 레디스에서 잔여 표 확인 및 카운팅 후 카프카 메시징 처리
@PostMapping("/kafkaAndRedis")
public ResponseEntity<String> reserveTicketWithRedisAndKafkaV2(@RequestBody TicketReserveRequest ticketReserveRequest) throws TicketSoldOutException {
log.info("POST " + ticketReserveRequest.getTicketId() + ", " + LocalDateTime.now());
ticketKafkaService.sendReserveTicketWithRedis(ticketReserveRequest.getTicketId());
return ResponseEntity.ok("티켓 예약 신청 완료");
}
// 티켓 생성
@PostMapping
private ResponseEntity<String> autoCreateTicket(@RequestBody TicketRequest ticketRequest) {
ticketService.save(ticketRequest);
return ResponseEntity.ok("티켓 생성 성공");
}
}
컨트롤러에서 예매 요청이 들어온다.
TicketKafkaService
@Service
@Transactional
@RequiredArgsConstructor
@Slf4j
public class TicketKafkaService {
private final ReserveTicketProducer reserveTicketProducer;
private final TicketRedisRepository ticketRedisRepository;
public void sendReserveTicketWithRedis(Long ticketId) {
Long remainTicketCount = ticketRedisRepository.decreaseTicketCount(ticketId);
log.info("remain count: " + remainTicketCount);
if (remainTicketCount < 0) {
throw new TicketSoldOutException();
}
reserveTicketProducer.send(new TicketReserveRequest(ticketId));
}
}
카프카로 예약 메시지를 프로듀싱하기 전에 먼저 레디스에서 티켓이 유효한지 확인해서 처리한다.
TicketRedisRepository
@Repository
@RequiredArgsConstructor
public class TicketRedisRepository {
private final RedisTemplate<String, String> redisTemplate;
public void createTicket(Long ticketId, int maxCount) {
String key = getCurrentCountKey(ticketId);
redisTemplate.opsForValue().set(key, String.valueOf(maxCount));
}
public Long decreaseTicketCount(Long ticketId) {
String key = getCurrentCountKey(ticketId);
return redisTemplate.opsForValue().decrement(key);
}
private String getCurrentCountKey(Long ticketId) {
return "ticket-currentCount-" + ticketId;
}
}
나의 경우 티켓이 생성될 때 해당 티켓의 currentCount를 지정해서 저장하고 티켓 예매 요청이 오면 마이너스 처리를 해주었다.
ReserveTicketProducer
@Component
@Transactional
@RequiredArgsConstructor
@Slf4j
public class ReserveTicketProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
private final ObjectMapper objectMapper;
public void send(TicketReserveRequest ticketRequest) {
try {
String jsonObject = objectMapper.writeValueAsString(ticketRequest);
kafkaTemplate.send("reserve_ticket", jsonObject);
} catch (Exception e) {
e.printStackTrace();
}
}
}
ReserveTicketConsumer
@Slf4j
@Component
@RequiredArgsConstructor
public class ReserveTicketConsumer {
private final TicketReserveRedissonService ticketReserveRedissonService;
private final ObjectMapper objectMapper;
@KafkaListener(topics = "reserve_ticket", groupId = "ticket")
public void consume(ConsumerRecord<String, String> record) {
try {
var ticket = objectMapper.readValue(record.value(), TicketReserveRequest.class);
ticketReserveRedissonService.reverseTicket(ticket.getTicketId());
} catch (Exception e) {
e.printStackTrace();
}
}
}
TicketReserveRedissonService
@RequiredArgsConstructor
@Slf4j
@Service
public class TicketReserveRedissonService {
private final RedissonClient redissonClient;
private final TicketService ticketService;
// step 2, 3, 4) redisson 분산 락
public void reverseTicket(Long ticketId) {
RLock lock = redissonClient.getLock("ticket-" + ticketId);
try {
boolean isLocked = lock.tryLock(3, 3, TimeUnit.SECONDS);
if (!isLocked) {
throw new LockException();
}
ticketService.reserveTicket(ticketId);
} catch (InterruptedException e) {
log.error(e.getMessage());
} finally {
lock.unlock();
}
}
}
이제 DB에 접근해서 수량을 바꿔줄 거기 때문에 Redisson Lock을 사용하여서 동시성 문제를 해결하는 것이다.
TicketService
@Service
@RequiredArgsConstructor
@Slf4j
@Transactional
public class TicketService {
private final TicketRepository ticketRepository;
private final TicketRedisRepository ticketRedisRepository;
public Long save(TicketRequest ticketRequest) {
Long ticketId = ticketRepository.save(Ticket.builder()
.ticketName(ticketRequest.getTicketName())
.ticketMaxCount(ticketRequest.getTicketMaxCount())
.build()).getId();
ticketRedisRepository.createTicket(ticketId, ticketRequest.getTicketMaxCount());
return ticketId;
}
public TicketResponse findById(Long id) {
return TicketResponse.from(ticketRepository.findById(id).orElseThrow(TicketNotFoundException::new));
}
// step 2, 3, 4) Redisson, Kafka
public void reserveTicket(Long ticketId) {
Ticket ticket = ticketRepository.findById(ticketId).orElseThrow(TicketNotFoundException::new);
ticket.currentCountIncrement();
log.info("잔여 티켓: " + (ticket.getTicketMaxCount() - ticket.getTicketCurrentCount()));
}
}
테스트 코드
@SpringBootTest
class TicketServiceTest {
@Autowired
private TicketService ticketService;
@Autowired
private TicketKafkaService ticketKafkaService;
private Long id1 = null;
private final Integer maxCount = 100;
@BeforeEach
public void before() {
TicketRequest ticket1 = new TicketRequest("ticket1", maxCount);
id1 = ticketService.save(ticket1);
}
@Test
void 동시성_문제_with_Redis_And_Kafka() throws InterruptedException {
int peopleCount = 100;
ExecutorService executorService = Executors.newFixedThreadPool(32);
CountDownLatch tt = new CountDownLatch(peopleCount);
for (int i = 0; i < peopleCount; i++) {
executorService.execute(() -> {
ticketKafkaService.sendReserveTicketWithRedis(id1);
tt.countDown();
});
}
tt.await();
Thread.sleep(3000);
TicketResponse ticket = ticketService.findById(id1);
Assertions.assertThat(ticket.getTicketCurrentCount()).isEqualTo(maxCount);
}
}
실행 결과
다음과 같이 티켓이 매진되면 예외를 던져서 더 이상 카프카에 메시지 적재를 안 해준다.
이로 인해 서버 쪽에서 과부하를 막을 수 있다.
'JAVA > Redis' 카테고리의 다른 글
[Spring/Redis] 데이터베이스 동시성 문제 해결 코드 (STEP 2. Redisson 분산 락) (0) | 2023.07.06 |
---|---|
[Redis] 도커 컴포즈로 Redis와 RedisInsight 설치해보자. (0) | 2023.07.05 |