使用spring boot和axon实现saga模式-- 中

在这个系统之前的文章我们介绍了什么是Saga模式。也描述了Saga的几种类型。最后,我们还描述了我们将会在一些场景中使用Saga模式来实现。

下面是我们关于Saga模式实现的本系列的提纲:
第一部分: 我们学习了《Saga》的基本内容。如果你不确定什么是Saga模式,我强烈建议你去看看那篇文章,然后再回到这篇文章。

第二部分(本文):我们将首先为一个常见场景用Saga模式 解决。

第三部分: 我们将继续未完成的工作,并用Axon Server来串联起各个服务。

第四部分:我们将测试我们的Saga应用程序。

这整个应用程序可以在Github上找到。

现在让我们开始动手吧。

Saga模式实现类型问题

正如我们前面所讨论的,我们将使用基于编排方式的Saga方法来实现Saga模式。当然基本协调方式的Saga也可以实现。

换句话说,编排器就是一个管理器,它编排参与的服务执行一个或多个本地事务。

下图显示了这种方法(基于编排)的一个典型示例:


图片.png

在这里,我们着眼于基于编排的Saga来解决订单管理的问题。这个订单管理问题可以应用于任何电子商务商店、食品配送应用程序或任何其他类似的用例。

当然,出于演示目的,我们将这个问题简化了很多,它比实际生产级别的复杂性要简单得多。

顶层组件

关于我们的Saga模式实现,这个应用程序有5个主要部分。各部分内容如下:

  • 订单服务:

该服务暴露api,帮助在系统中创建订单。此外,该服务还管理订单聚合。订单聚合只是一个维护订单相关信息的实体。然而,订单服务还是订单管理Saga的一个子服务(维护订单内部的事务)。

  • 支付服务:

支付服务根据订单管理Saga发出的创建发票命令进行操作。一旦它完成了它的工作,它就发布一个事件。这一事件将这个Saga推向了下一步流程。

  • 运输服务:

该服务负责在系统中创建与订单相关的发货。它根据Saga管理器发出的命令执行对应的动作。一旦它完成了它的任务,它还会发布一个推动Saga推向下一步流程。

  • 核心API( Core-APIs) :

这不是一个服务。然而,核心api充当了各种服务之间的集成粘合剂,这些服务构成了这个Saga的一部分。在我们的示例中,核心api将包含Saga实现运行所需的各种命令和事件定义。

  • Axon Server

Axon服务器是Axon平台的一部分。我们将使用Axon框架来管理我们的聚合,如订单、支付、发货。此外,我们将使用Axon服务器来处理这三个服务之间的通信。如果你想了解更多关于Axon服务器的细节,可以查看我的文章

让我们从具体的细节开始吧

整个应用架构

下面是我们的应用程序的总体工程结构图。

.
├── core-apis
├── order-service
├── payment-service
├── pom.xml
├── saga-axon-server-spring-boot.iml
└── shipping-service

如您所见,它是一个多maven模块结构。每个服务都是maven子模块,是整个项目的一部分。

这个pom.xml文件将他们合并在一起。

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.progressivecoder.saga-pattern</groupId>
    <artifactId>saga-axon-server-spring-boot</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>

    <modules>
        <module>order-service</module>
        <module>payment-service</module>
        <module>core-apis</module>
        <module>shipping-service</module>
    </modules>

</project>

订单服务的实现

Order Service(以及所有其他服务),我们将用Spring Boot来构建应用程序。如果您不知道Spring Boot或想要更新,请参阅我关于Spring Boot微服务的文章

下面是这个应用的依赖:

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Axon -->
        <dependency>
            <groupId>org.axonframework</groupId>
            <artifactId>axon-spring-boot-starter</artifactId>
            <version>4.0.3</version>
        </dependency>

        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
            <scope>runtime</scope>
        </dependency>

        <!-- Swagger -->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>

        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.9.2</version>
        </dependency>

        <dependency>
            <groupId>javax.inject</groupId>
            <artifactId>javax.inject</artifactId>
            <version>1</version>
        </dependency>

        <dependency>
            <groupId>com.progressivecoder.saga-pattern</groupId>
            <artifactId>core-apis</artifactId>
            <version>${project.version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
</dependencies>

以下是一些需要考虑的重要问题:

  • 我们使用的是Axon Spring Boot starter(版本4.0.3)。他对对Axon框架和Axon服务器提供了比较好的接入。

  • 为了方便测试我们的应用程序,我们还包括使用了Swagger。

  • 我们使用Spring Boot Starter Data JPA和基于内存实现的H2数据库作为持久层。

  • 此外,我们还有另一个称为core-api的模块。我们一会再回来讲。

订单聚合根

订单聚合是Saga模式实现中最重要的部分之一。它是订单管理Saga工作的基础。让我们看看它是怎样的:

@Aggregate
public class OrderAggregate {

    @AggregateIdentifier
    private String orderId;

    private ItemType itemType;

    private BigDecimal price;

    private String currency;

    private OrderStatus orderStatus;

    public OrderAggregate() {
    }

    @CommandHandler
    public OrderAggregate(CreateOrderCommand createOrderCommand){
        AggregateLifecycle.apply(new OrderCreatedEvent(createOrderCommand.orderId, createOrderCommand.itemType,
                createOrderCommand.price, createOrderCommand.currency, createOrderCommand.orderStatus));
    }

    @EventSourcingHandler
    protected void on(OrderCreatedEvent orderCreatedEvent){
        this.orderId = orderCreatedEvent.orderId;
        this.itemType = ItemType.valueOf(orderCreatedEvent.itemType);
        this.price = orderCreatedEvent.price;
        this.currency = orderCreatedEvent.currency;
        this.orderStatus = OrderStatus.valueOf(orderCreatedEvent.orderStatus);
    }

    @CommandHandler
    protected void on(UpdateOrderStatusCommand updateOrderStatusCommand){
        AggregateLifecycle.apply(new OrderUpdatedEvent(updateOrderStatusCommand.orderId, updateOrderStatusCommand.orderStatus));
    }

    @EventSourcingHandler
    protected void on(OrderUpdatedEvent orderUpdatedEvent){
        this.orderId = orderId;
        this.orderStatus = OrderStatus.valueOf(orderUpdatedEvent.orderStatus);
    }
}

如您所见,这是一个典型的实体类。这里需要注意的主要事项是这里有axon自己的的注解@Aggregate和@AggregateIdentifier。这些注解允许Axon框架管理订单聚合实例。

此外,我们正在使用事件溯源来存储在此聚合上发生的事件。事件溯源是另一种微服务体系结构模式,它们存储领域事件然后构建聚合信息。如果你想了解更多的话,我有一个关于事件来源实现的详细文章

订单Service层

为了方便创建订单,我们还定义了一个订单服务接口及其相应的实现。

public interface OrderCommandService {

    public CompletableFuture<String> createOrder(OrderCreateDTO orderCreateDTO);

}
@Service
public class OrderCommandServiceImpl implements OrderCommandService {

    private final CommandGateway commandGateway;

    public OrderCommandServiceImpl(CommandGateway commandGateway) {
        this.commandGateway = commandGateway;
    }

    @Override
    public CompletableFuture<String> createOrder(OrderCreateDTO orderCreateDTO) {
        return commandGateway.send(new CreateOrderCommand(UUID.randomUUID().toString(), orderCreateDTO.getItemType(),
                orderCreateDTO.getPrice(), orderCreateDTO.getCurrency(), String.valueOf(OrderStatus.CREATED)));
    }
}

该service服务层实现使用Axon框架的命令网关向聚合发出命令。该命令在我们前面声明的聚合类中处理。

订单Controller层

Order Controller类是我们创建API端点的地方。此时,出于演示的目的,我们只有一个端点(一个api接口)。

@RestController
@RequestMapping(value = "/api/orders")
@Api(value = "Order Commands", description = "Order Commands Related Endpoints", tags = "Order Commands")
public class OrderCommandController {

    private OrderCommandService orderCommandService;

    public OrderCommandController(OrderCommandService orderCommandService) {
        this.orderCommandService = orderCommandService;
    }

    @PostMapping
    public CompletableFuture<String> createOrder(@RequestBody OrderCreateDTO orderCreateDTO){
        return orderCommandService.createOrder(orderCreateDTO);
    }
}

为了帮助Swagger找到这些端点并暴露swagger api,我们还得配置Swagger。

@Configuration
@EnableSwagger2
public class SwaggerConfig {

    @Bean
    public Docket apiDocket(){
        return new Docket(DocumentationType.SWAGGER_2)
                .select()
                .apis(RequestHandlerSelectors.basePackage("com.progressivecoder.ordermanagement"))
                .paths(PathSelectors.any())
                .build()
                .apiInfo(getApiInfo());
    }

    private ApiInfo getApiInfo(){
        return new ApiInfo(
                "Saga Pattern Implementation using Axon and Spring Boot",
                "App to demonstrate Saga Pattern using Axon and Spring Boot",
                "1.0.0",
                "Terms of Service",
                new Contact("Saurabh Dashora", "progressivecoder.com", "coder.progressive@gmail.com"),
                "",
                "",
                Collections.emptyList());
    }

}

订单管理Saga

Saga模式实现的核心是订单管理Saga。简而言之,这也是一个典型的Java类,它就是一个流程管理器,用于处理业务的流程。

@Saga
public class OrderManagementSaga {

    @Inject
    private transient CommandGateway commandGateway;

    @StartSaga
    @SagaEventHandler(associationProperty = "orderId")
    public void handle(OrderCreatedEvent orderCreatedEvent){
        String paymentId = UUID.randomUUID().toString();
        System.out.println("Saga invoked");

        //associate Saga
        SagaLifecycle.associateWith("paymentId", paymentId);

        System.out.println("order id" + orderCreatedEvent.orderId);

        //send the commands
        commandGateway.send(new CreateInvoiceCommand(paymentId, orderCreatedEvent.orderId));
    }

    @SagaEventHandler(associationProperty = "paymentId")
    public void handle(InvoiceCreatedEvent invoiceCreatedEvent){
        String shippingId = UUID.randomUUID().toString();

        System.out.println("Saga continued");

        //associate Saga with shipping
        SagaLifecycle.associateWith("shipping", shippingId);

        //send the create shipping command
        commandGateway.send(new CreateShippingCommand(shippingId, invoiceCreatedEvent.orderId, invoiceCreatedEvent.paymentId));
    }

    @SagaEventHandler(associationProperty = "orderId")
    public void handle(OrderShippedEvent orderShippedEvent){
        commandGateway.send(new UpdateOrderStatusCommand(orderShippedEvent.orderId, String.valueOf(OrderStatus.SHIPPED)));
    }

    @SagaEventHandler(associationProperty = "orderId")
    public void handle(OrderUpdatedEvent orderUpdatedEvent){
        SagaLifecycle.end();
    }
}
  • @StartSaga注解表示Saga的开始。它告诉Axon去创造一个新的Saga。Saga还与聚合的一个特定实例相关联。这是通过使用@SagaEventHandler指定的associationProperty来完成的。在本例中,我们使用属性orderId将Saga与订单聚合的实例关联起来。我们还指定了应该调用该方法的事件。在我们的例子中,它是Order Created Event。

  • 被@SagaEventHandler注解的其他方法表示属于Saga的其他事务(处理另一种业务)。

  • 我们还使用sagalifcycle .associate with()方法将Saga与其他业务关联起来,比如支付和运输。通过允许客户机生成唯一标识符,我们不必遵循请求-响应模型。这使我们能够轻松地将标识符与Saga关联起来。

  • 我们使用 SagaLifecycle.end()来结束这个Saga。

Core-APIs

现在是了解core - api模块的好时机(command,event没有setter方法)。

核心api只是一堆命令和事件的类,这些命令和事件将成为Saga模式实现的一部分。

命令(Commands)

在我们的应用程序中创建新订单时,将触发Create Order Command。此命令由订单聚合处理。

public class CreateOrderCommand {

    @TargetAggregateIdentifier
    public final String orderId;

    public final String itemType;

    public final BigDecimal price;

    public final String currency;

    public final String orderStatus;

    public CreateOrderCommand(String orderId, String itemType, BigDecimal price, String currency, String orderStatus) {
        this.orderId = orderId;
        this.itemType = itemType;
        this.price = price;
        this.currency = currency;
        this.orderStatus = orderStatus;
    }
}

接下来,创建订单时,订单管理Saga将触发Create Invoice Command。

public class CreateInvoiceCommand{

    @TargetAggregateIdentifier
    public final String paymentId;

    public final String orderId;

    public CreateInvoiceCommand(String paymentId, String orderId) {
        this.paymentId = paymentId;
        this.orderId = orderId;
    }
}

当发票创建和付款处理完成时,订单管理Saga也会触发Create Shipping Command。

public class CreateShippingCommand {

    @TargetAggregateIdentifier
    public final String shippingId;

    public final String orderId;

    public final String paymentId;

    public CreateShippingCommand(String shippingId, String orderId, String paymentId) {
        this.shippingId = shippingId;
        this.orderId = orderId;
        this.paymentId = paymentId;
    }
}

最后,我们有Update Order Status Command。当运输完成后,将触发此命令。

public class UpdateOrderStatusCommand {

    @TargetAggregateIdentifier
    public final String orderId;

    public final String orderStatus;

    public UpdateOrderStatusCommand(String orderId, String orderStatus) {
        this.orderId = orderId;
        this.orderStatus = orderStatus;
    }
}

事件(Events)

整个过程中的第一个事件是订单创建事件(Order Created Event.)。就像我们之前看到的,这个事件也是开始这个Saga的原因。

public class OrderCreatedEvent {

    public final String orderId;

    public final String itemType;

    public final BigDecimal price;

    public final String currency;

    public final String orderStatus;

    public OrderCreatedEvent(String orderId, String itemType, BigDecimal price, String currency, String orderStatus) {
        this.orderId = orderId;
        this.itemType = itemType;
        this.price = price;
        this.currency = currency;
        this.orderStatus = orderStatus;
    }
}

下一个事件是Invoice Created Event。Invoice服务发布此事件。我们将在下一篇文章中看到它的实现。

public class InvoiceCreatedEvent  {

    public final String paymentId;

    public final String orderId;

    public InvoiceCreatedEvent(String paymentId, String orderId) {
        this.paymentId = paymentId;
        this.orderId = orderId;
    }
}

在那之后,我们有Order Shipped Event。该事件由Shipping服务在完成必要的操作后发布。

public class OrderShippedEvent {

    public final String shippingId;

    public final String orderId;

    public final String paymentId;

    public OrderShippedEvent(String shippingId, String orderId, String paymentId) {
        this.shippingId = shippingId;
        this.orderId = orderId;
        this.paymentId = paymentId;
    }
}

最后,我们有Order Updated Event。此事件由订单聚合在更新订单状态后发布。

public class OrderUpdatedEvent {

    public final String orderId;

    public final String orderStatus;

    public OrderUpdatedEvent(String orderId, String orderStatus) {
        this.orderId = orderId;
        this.orderStatus = orderStatus;
    }
}

结论:

至此,我们已经成功实现了应用程序的两个主要部分——订单服务和核心api。

Order服务包含了我们的主要Saga模式实现代码。另一方面,核心api是我们订单管理Saga的支柱。

我们将在这里结束这篇文章,因为它已经变得相当长。然而,整个Saga实现的代码可以在Github上获得以供参考。

在下一篇文章中,我们将开始实现其他服务并将它们连接到Axon服务器。所以请继续关注。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,123评论 6 490
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,031评论 2 384
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 156,723评论 0 345
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,357评论 1 283
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,412评论 5 384
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,760评论 1 289
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,904评论 3 405
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,672评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,118评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,456评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,599评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,264评论 4 328
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,857评论 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,731评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,956评论 1 264
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,286评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,465评论 2 348

推荐阅读更多精彩内容