https://github.com/hgs-study/payment-saga-pattern
위의 레포지토리를 보면서 대략의 SAGA패턴에 대해서 감을 잡을 수 있었다.
그러나 stock , payment 서비스는 로그출력으로 대신하고 빈약한 부분이 많이 존재하였기에 이를 보완하고 개선하는 프로젝트를 진행해보고자 했다!
Saga 패턴에서의 Orchestration 방식과 Choreography 방식은 분산 트랜잭션을 관리하는 데 두 가지 상반된 접근 방식이다.
Orchestration 방식에서는 중앙에서 트랜잭션 흐름을 제어하는 서비스(또는 오케스트레이터)가 존재합니다. 이 서비스가 각 마이크로서비스에 필요한 작업을 순차적으로 호출하고, 상태를 모니터링하여 각 단계가 성공 또는 실패했는지를 확인합니다.
예시: 주문 트랜잭션에서 오케스트레이터가 "주문 생성 -> 결제 처리 -> 재고 확인" 순서로 서비스를 호출하며 각 서비스의 상태를 확인하고, 실패 시 롤백을 관리합니다.
Choreography 방식에서는 각 마이크로서비스가 자체적으로 트랜잭션을 관리하며 이벤트를 통해 서로 상태를 통지하는 방식입니다. 한 서비스에서 발생한 이벤트가 다음 서비스에 의해 처리되며 트랜잭션이 이어집니다.
예시: 주문 트랜잭션에서 "주문 생성 -> 결제 처리 -> 재고 확인" 순으로 각 단계가 이벤트로 전달되어 이어집니다. 결제 처리 후 결제 완료 이벤트가 발생하며 재고 서비스가 이를 받아 처리합니다.
레퍼런스의 개발자는 프로젝트가 간단하므로 구현하기 쉬운 Chreography 방식을 채택한것 같다.
각설하고 레퍼런스를 Fork 하여 실행해보았다.
각 서비스들을 실행시키전에 Event들을 관리할 Kafka를 다음 docker-compose 파일로 실행시킨다.
version: "3"
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_CREATE_TOPICS: "order-create:1:1, order-rollback:1:1, stock-decrease:1:1, stock-rollback:1:1"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
Order Application
주문에 대한 서비스를 관리하며 H2의 인메모리 데이터베이스로 간단하게 동작하는 구조였다.
Stock Application
Payment Application
우선 Mysql 과 연동하기 위해서 Mysql에 대한 의존성과 JPA 의존성을 추가해준다!
이후 Stock Entity를 생성하였다. _ id, productId, stock(재고)
Event로 날려온 orderId를 기반으로 상품 재고의 증감 서비스 레이어를 작성하고 간단하게 테스트하기 위한 용도로 재고(Stock)을 생성하는 API를 생성했다
Stock 서비스는 Order Id가 아니라 Product Id를 기반으로 재고가 증감되어야 하는데 Event로 보내는 정보는 Order Id밖에 없었다…! → Order Id로 Stock 서비스를 증감하다 보니 6번 주문 발생하면 6번재고가 없어지는 이상한 프로그램이 되어버렸다. (6번 주문의 productId 가 1번이여도.. )
이를 해결하기 위해서는 크게 2가지 정도의 방법이 있다.
나는 2번 방식을 선택하였다. 왜냐하면 1번방식을 하기 위해서는 kafka에 대한 serializer/deserializer를 설정해줘야 하는데 내가 이 분야에 대해서 깊은 지식을 갖고 있지 않아서 오래걸릴 것 같았고, 기존에도 Spring 서버에서 서버간 통신을 해본 경험이 있기 때문에 성공할 수 있을 것이란 생각이 들었다..!
이때 서버간에 통신을 Webflux(비동기)를 활용해서 최대한 MSA의 효율을 올려보고자 했다.
Webflux의 WebCleint를 사용하기 위해서는 Configuration을 작성해줘서 Bean을 생성한다.
Order 서비스가 8080 포트에서 실행되고 있었기 떄문에 다음과 같이 작성해주었다.
@Configuration
public class WebClientConfig {
// 이 BASEURL은 통신할 서버 URL
public static final String BASE_URL = "<http://localhost:8080>";
@Bean
public WebClient webClient() {
return WebClient.builder()
.baseUrl(BASE_URL)
.build();
}
}
이후 Order 서비스와 비동기 통신을 해서 OrderId를 기반으로 ProductId를 흭득한다!!
public Mono<Long> getProductIdByOrderService(Long orderId) {
return webClient.get()
.uri("/order/{orderId}", orderId)
.retrieve()
.bodyToMono(Long.class);
}
이떄, Mono라는 클래스에 담겨져서 오게 되는데 Mono에 대해서 알아보자.
Mono란, Reactive Streams의 Publisher 인터페이스를 구현하는 구현체이며, 0..1 개의 데이터를 처리한다.
정의는 다음과 같고 비동기로 처리하다보니까 있을 수도 없을 수도 있습니다. 라는 느낌이다.
stockService.getProductIdByOrderService(orderId)
.subscribe(
stockService::decrease,
error -> log.error("error : {}", error.getMessage())
);
우선 기존의 stockservice.decrease(orderId) 로 호출하던 비즈니스 로직을 → orderId로 Order 서비스와 비동기 통신을 통해 ProductId를 가져와 stockservice.decrease 함수로 넘겨주는것을 볼 수 있다.
stockService.getProductIdByOrderService(orderId) 까지가 Mono<Long> 즉 ProductId가 담긴 부분이라고 볼 수 있고 subcribe 메소드를 사용해서 정상적으로 값이 있다면(Order Service와의 정상적인 통신) stockService 로직을 처리하고, error 가 발생하면 error 로그를 출력하도록 하였다.
( 이때 error가 발생한 다는것은 Order 서비스와 제대로 통신이 되지 않았거나, 그럴 일은 없겠지만 OrderId로 ProductId를 찾지 못했거나 그런 경우 이겠지만 그런 가능성은 배제한다.)
Order Service
Stock Service
1,2번 주문에 대해서 둘다 1번 상품으로 시도했고 정상적으로 동작하는 것을 볼 수 있다.
물론 Rollback도 정상적으로 동작한다.
https://velog.io/@hgs-study/saga-1
https://seongwon.dev/Spring-WebFlux/20230219-WebFlux란/
https://yeongchan1228.tistory.com/47
https://hoons-dev.tistory.com/120
https://hoons-dev.tistory.com/119
https://velog.io/@zenon8485/Reactor-Java-1.-Mono와-Flux를-생성하는-방법