在这个系统之前的文章我们介绍了什么是Saga模式。也描述了Saga的几种类型。最后,我们还描述了我们将会在一些场景中使用Saga模式来实现。
下面是我们关于Saga模式实现的本系列的提纲:
第一部分: 我们学习了《Saga》的基本内容。如果你不确定什么是Saga模式,我强烈建议你去看看那篇文章,然后再回到这篇文章。
第二部分(本文):我们将首先为一个常见场景用Saga模式 解决。
第三部分: 我们将继续未完成的工作,并用Axon Server来串联起各个服务。
第四部分:我们将测试我们的Saga应用程序。
这整个应用程序可以在Github上找到。
现在让我们开始动手吧。
Saga模式实现类型问题
正如我们前面所讨论的,我们将使用基于编排方式的Saga方法来实现Saga模式。当然基本协调方式的Saga也可以实现。
换句话说,编排器就是一个管理器,它编排参与的服务执行一个或多个本地事务。
下图显示了这种方法(基于编排)的一个典型示例:
在这里,我们着眼于基于编排的Saga来解决订单管理的问题。这个订单管理问题可以应用于任何电子商务商店、食品配送应用程序或任何其他类似的用例。
当然,出于演示目的,我们将这个问题简化了很多,它比实际生产级别的复杂性要简单得多。
顶层组件
关于我们的Saga模式实现,这个应用程序有5个主要部分。各部分内容如下:
-
订单服务:
该服务暴露api,帮助在系统中创建订单。此外,该服务还管理订单聚合。订单聚合只是一个维护订单相关信息的实体。然而,订单服务还是订单管理Saga的一个子服务(维护订单内部的事务)。
-
支付服务:
支付服务根据订单管理Saga发出的创建发票命令进行操作。一旦它完成了它的工作,它就发布一个事件。这一事件将这个Saga推向了下一步流程。
-
运输服务:
该服务负责在系统中创建与订单相关的发货。它根据Saga管理器发出的命令执行对应的动作。一旦它完成了它的任务,它还会发布一个推动Saga推向下一步流程。
-
核心API( Core-APIs) :
这不是一个服务。然而,核心api充当了各种服务之间的集成粘合剂,这些服务构成了这个Saga的一部分。在我们的示例中,核心api将包含Saga实现运行所需的各种命令和事件定义。
-
Axon Server
Axon服务器是Axon平台的一部分。我们将使用Axon框架来管理我们的聚合,如订单、支付、发货。此外,我们将使用Axon服务器来处理这三个服务之间的通信。如果你想了解更多关于Axon服务器的细节,可以查看我的文章。
让我们从具体的细节开始吧
整个应用架构
下面是我们的应用程序的总体工程结构图。
.
├── core-apis
├── order-service
├── payment-service
├── pom.xml
├── saga-axon-server-spring-boot.iml
└── shipping-service
如您所见,它是一个多maven模块结构。每个服务都是maven子模块,是整个项目的一部分。
这个pom.xml文件将他们合并在一起。
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.progressivecoder.saga-pattern</groupId>
<artifactId>saga-axon-server-spring-boot</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<modules>
<module>order-service</module>
<module>payment-service</module>
<module>core-apis</module>
<module>shipping-service</module>
</modules>
</project>
订单服务的实现
Order Service(以及所有其他服务),我们将用Spring Boot来构建应用程序。如果您不知道Spring Boot或想要更新,请参阅我关于Spring Boot微服务的文章。
下面是这个应用的依赖:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Axon -->
<dependency>
<groupId>org.axonframework</groupId>
<artifactId>axon-spring-boot-starter</artifactId>
<version>4.0.3</version>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
<!-- Swagger -->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
<version>1</version>
</dependency>
<dependency>
<groupId>com.progressivecoder.saga-pattern</groupId>
<artifactId>core-apis</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
以下是一些需要考虑的重要问题:
我们使用的是Axon Spring Boot starter(版本4.0.3)。他对对Axon框架和Axon服务器提供了比较好的接入。
为了方便测试我们的应用程序,我们还包括使用了Swagger。
我们使用Spring Boot Starter Data JPA和基于内存实现的H2数据库作为持久层。
此外,我们还有另一个称为core-api的模块。我们一会再回来讲。
订单聚合根
订单聚合是Saga模式实现中最重要的部分之一。它是订单管理Saga工作的基础。让我们看看它是怎样的:
@Aggregate
public class OrderAggregate {
@AggregateIdentifier
private String orderId;
private ItemType itemType;
private BigDecimal price;
private String currency;
private OrderStatus orderStatus;
public OrderAggregate() {
}
@CommandHandler
public OrderAggregate(CreateOrderCommand createOrderCommand){
AggregateLifecycle.apply(new OrderCreatedEvent(createOrderCommand.orderId, createOrderCommand.itemType,
createOrderCommand.price, createOrderCommand.currency, createOrderCommand.orderStatus));
}
@EventSourcingHandler
protected void on(OrderCreatedEvent orderCreatedEvent){
this.orderId = orderCreatedEvent.orderId;
this.itemType = ItemType.valueOf(orderCreatedEvent.itemType);
this.price = orderCreatedEvent.price;
this.currency = orderCreatedEvent.currency;
this.orderStatus = OrderStatus.valueOf(orderCreatedEvent.orderStatus);
}
@CommandHandler
protected void on(UpdateOrderStatusCommand updateOrderStatusCommand){
AggregateLifecycle.apply(new OrderUpdatedEvent(updateOrderStatusCommand.orderId, updateOrderStatusCommand.orderStatus));
}
@EventSourcingHandler
protected void on(OrderUpdatedEvent orderUpdatedEvent){
this.orderId = orderId;
this.orderStatus = OrderStatus.valueOf(orderUpdatedEvent.orderStatus);
}
}
如您所见,这是一个典型的实体类。这里需要注意的主要事项是这里有axon自己的的注解@Aggregate和@AggregateIdentifier。这些注解允许Axon框架管理订单聚合实例。
此外,我们正在使用事件溯源来存储在此聚合上发生的事件。事件溯源是另一种微服务体系结构模式,它们存储领域事件然后构建聚合信息。如果你想了解更多的话,我有一个关于事件来源实现的详细文章
。
订单Service层
为了方便创建订单,我们还定义了一个订单服务接口及其相应的实现。
public interface OrderCommandService {
public CompletableFuture<String> createOrder(OrderCreateDTO orderCreateDTO);
}
@Service
public class OrderCommandServiceImpl implements OrderCommandService {
private final CommandGateway commandGateway;
public OrderCommandServiceImpl(CommandGateway commandGateway) {
this.commandGateway = commandGateway;
}
@Override
public CompletableFuture<String> createOrder(OrderCreateDTO orderCreateDTO) {
return commandGateway.send(new CreateOrderCommand(UUID.randomUUID().toString(), orderCreateDTO.getItemType(),
orderCreateDTO.getPrice(), orderCreateDTO.getCurrency(), String.valueOf(OrderStatus.CREATED)));
}
}
该service服务层实现使用Axon框架的命令网关向聚合发出命令。该命令在我们前面声明的聚合类中处理。
订单Controller层
Order Controller类是我们创建API端点的地方。此时,出于演示的目的,我们只有一个端点(一个api接口)。
@RestController
@RequestMapping(value = "/api/orders")
@Api(value = "Order Commands", description = "Order Commands Related Endpoints", tags = "Order Commands")
public class OrderCommandController {
private OrderCommandService orderCommandService;
public OrderCommandController(OrderCommandService orderCommandService) {
this.orderCommandService = orderCommandService;
}
@PostMapping
public CompletableFuture<String> createOrder(@RequestBody OrderCreateDTO orderCreateDTO){
return orderCommandService.createOrder(orderCreateDTO);
}
}
为了帮助Swagger找到这些端点并暴露swagger api,我们还得配置Swagger。
@Configuration
@EnableSwagger2
public class SwaggerConfig {
@Bean
public Docket apiDocket(){
return new Docket(DocumentationType.SWAGGER_2)
.select()
.apis(RequestHandlerSelectors.basePackage("com.progressivecoder.ordermanagement"))
.paths(PathSelectors.any())
.build()
.apiInfo(getApiInfo());
}
private ApiInfo getApiInfo(){
return new ApiInfo(
"Saga Pattern Implementation using Axon and Spring Boot",
"App to demonstrate Saga Pattern using Axon and Spring Boot",
"1.0.0",
"Terms of Service",
new Contact("Saurabh Dashora", "progressivecoder.com", "coder.progressive@gmail.com"),
"",
"",
Collections.emptyList());
}
}
订单管理Saga
Saga模式实现的核心是订单管理Saga。简而言之,这也是一个典型的Java类,它就是一个流程管理器,用于处理业务的流程。
@Saga
public class OrderManagementSaga {
@Inject
private transient CommandGateway commandGateway;
@StartSaga
@SagaEventHandler(associationProperty = "orderId")
public void handle(OrderCreatedEvent orderCreatedEvent){
String paymentId = UUID.randomUUID().toString();
System.out.println("Saga invoked");
//associate Saga
SagaLifecycle.associateWith("paymentId", paymentId);
System.out.println("order id" + orderCreatedEvent.orderId);
//send the commands
commandGateway.send(new CreateInvoiceCommand(paymentId, orderCreatedEvent.orderId));
}
@SagaEventHandler(associationProperty = "paymentId")
public void handle(InvoiceCreatedEvent invoiceCreatedEvent){
String shippingId = UUID.randomUUID().toString();
System.out.println("Saga continued");
//associate Saga with shipping
SagaLifecycle.associateWith("shipping", shippingId);
//send the create shipping command
commandGateway.send(new CreateShippingCommand(shippingId, invoiceCreatedEvent.orderId, invoiceCreatedEvent.paymentId));
}
@SagaEventHandler(associationProperty = "orderId")
public void handle(OrderShippedEvent orderShippedEvent){
commandGateway.send(new UpdateOrderStatusCommand(orderShippedEvent.orderId, String.valueOf(OrderStatus.SHIPPED)));
}
@SagaEventHandler(associationProperty = "orderId")
public void handle(OrderUpdatedEvent orderUpdatedEvent){
SagaLifecycle.end();
}
}
@StartSaga注解表示Saga的开始。它告诉Axon去创造一个新的Saga。Saga还与聚合的一个特定实例相关联。这是通过使用@SagaEventHandler指定的associationProperty来完成的。在本例中,我们使用属性orderId将Saga与订单聚合的实例关联起来。我们还指定了应该调用该方法的事件。在我们的例子中,它是Order Created Event。
被@SagaEventHandler注解的其他方法表示属于Saga的其他事务(处理另一种业务)。
我们还使用sagalifcycle .associate with()方法将Saga与其他业务关联起来,比如支付和运输。通过允许客户机生成唯一标识符,我们不必遵循请求-响应模型。这使我们能够轻松地将标识符与Saga关联起来。
我们使用 SagaLifecycle.end()来结束这个Saga。
Core-APIs
现在是了解core - api模块的好时机(command,event没有setter方法)。
核心api只是一堆命令和事件的类,这些命令和事件将成为Saga模式实现的一部分。
命令(Commands)
在我们的应用程序中创建新订单时,将触发Create Order Command。此命令由订单聚合处理。
public class CreateOrderCommand {
@TargetAggregateIdentifier
public final String orderId;
public final String itemType;
public final BigDecimal price;
public final String currency;
public final String orderStatus;
public CreateOrderCommand(String orderId, String itemType, BigDecimal price, String currency, String orderStatus) {
this.orderId = orderId;
this.itemType = itemType;
this.price = price;
this.currency = currency;
this.orderStatus = orderStatus;
}
}
接下来,创建订单时,订单管理Saga将触发Create Invoice Command。
public class CreateInvoiceCommand{
@TargetAggregateIdentifier
public final String paymentId;
public final String orderId;
public CreateInvoiceCommand(String paymentId, String orderId) {
this.paymentId = paymentId;
this.orderId = orderId;
}
}
当发票创建和付款处理完成时,订单管理Saga也会触发Create Shipping Command。
public class CreateShippingCommand {
@TargetAggregateIdentifier
public final String shippingId;
public final String orderId;
public final String paymentId;
public CreateShippingCommand(String shippingId, String orderId, String paymentId) {
this.shippingId = shippingId;
this.orderId = orderId;
this.paymentId = paymentId;
}
}
最后,我们有Update Order Status Command。当运输完成后,将触发此命令。
public class UpdateOrderStatusCommand {
@TargetAggregateIdentifier
public final String orderId;
public final String orderStatus;
public UpdateOrderStatusCommand(String orderId, String orderStatus) {
this.orderId = orderId;
this.orderStatus = orderStatus;
}
}
事件(Events)
整个过程中的第一个事件是订单创建事件(Order Created Event.)。就像我们之前看到的,这个事件也是开始这个Saga的原因。
public class OrderCreatedEvent {
public final String orderId;
public final String itemType;
public final BigDecimal price;
public final String currency;
public final String orderStatus;
public OrderCreatedEvent(String orderId, String itemType, BigDecimal price, String currency, String orderStatus) {
this.orderId = orderId;
this.itemType = itemType;
this.price = price;
this.currency = currency;
this.orderStatus = orderStatus;
}
}
下一个事件是Invoice Created Event。Invoice服务发布此事件。我们将在下一篇文章中看到它的实现。
public class InvoiceCreatedEvent {
public final String paymentId;
public final String orderId;
public InvoiceCreatedEvent(String paymentId, String orderId) {
this.paymentId = paymentId;
this.orderId = orderId;
}
}
在那之后,我们有Order Shipped Event。该事件由Shipping服务在完成必要的操作后发布。
public class OrderShippedEvent {
public final String shippingId;
public final String orderId;
public final String paymentId;
public OrderShippedEvent(String shippingId, String orderId, String paymentId) {
this.shippingId = shippingId;
this.orderId = orderId;
this.paymentId = paymentId;
}
}
最后,我们有Order Updated Event。此事件由订单聚合在更新订单状态后发布。
public class OrderUpdatedEvent {
public final String orderId;
public final String orderStatus;
public OrderUpdatedEvent(String orderId, String orderStatus) {
this.orderId = orderId;
this.orderStatus = orderStatus;
}
}
结论:
至此,我们已经成功实现了应用程序的两个主要部分——订单服务和核心api。
Order服务包含了我们的主要Saga模式实现代码。另一方面,核心api是我们订单管理Saga的支柱。
我们将在这里结束这篇文章,因为它已经变得相当长。然而,整个Saga实现的代码可以在Github上获得以供参考。
在下一篇文章中,我们将开始实现其他服务并将它们连接到Axon服务器。所以请继续关注。