Command Dispatching
使用显示的命令调度机制具有许多优点。首先,有一个明确描述客户端意图的对象。通过记录命令,您可以存储意图和相关数据以备将来参考。命令处理也可以很容易地通过Web服务将命令处理组件开放给远程客户端。测试也变得容易多了,你可以定义命令的开始情境,命令执行(当)和预期的结果(然后)列出一些事件和命令(见测试)来定义测试脚本。最后一个主要优点是在同步和异步以及本地和分布式命令处理之间切换是非常容易的。
这并不意味着使用显式命令对象进行命令分发是唯一的方法。 Axon的目标不是规定一种特定的工作方式,而是支持你按照自己的方式来做,同时提供最佳实践作为默认行为。仍然可以使用可以调用的服务层来执行命令。该方法只需要启动一个工作单元(请参阅工作单元),并在方法结束时执行提交或回滚。
下一节将概述与使用Axon框架设置Command调度基础架构有关的任务。
命令路由
命令网关是一个方便的命令调度机制的接口。虽然不需要使用网关来发送命令,但通常这是最简单的选择。
有两种使用Command Gateway的方法。首先是使用由Axon提供的CommandGateway接口和DefaultCommandGateway实现。命令网关提供了许多方法,允许您发送命令并同步等待结果,还有超时或异步方式。
另一种选择也许是最灵活的。使用CommandGatewayFactory可以把几乎任何接口变成一个命令网关。这允许你使用强类型定义你的应用程序的接口,并声明自己的(checked)业务异常。Axon将在运行时自动为该接口生成一个实现。
配置网关路由
您的自定义网关和Axon提供的自定义网关都需要配置对Command Bus的访问。另外,Command Gateway可以配置一个RetryScheduler,CommandDispatchInterceptors和CommandCallbacks。
当命令执行失败时,RetryScheduler能够进行重试。 IntervalRetryScheduler是他的一个实现,它将以设定的时间间隔重试给定的命令,直到成功或重试次数达到最大值。当一个命令由于明确的非暂时的异常而失败时,根本不会执行重试。请注意,重试scheduler 程序仅在由于RuntimeException而导致命令失败时才会调用。如果该异常是“业务异常”,则不会触发重试。 RetryScheduler的典型用法是在分布式Command Bus上分发命令时。如果某个节点发生故障,则重试调度程序将导致将命令分派给下一个能够处理该命令的节点(请参阅分发命令总线)。
CommandDispatchInterceptors允许在将CommandMessages分发给Command Bus之前进行修改。与在CommandBus上配置的CommandDispatchInterceptors相比,只有在通过此网关发送消息时,才会调用这些拦截器。拦截器可用于将元数据附加到命令或进行验证,例如。
为每个发送的命令调用CommandCallbacks。这允许通过该网关发送的所有命令的一些通用行为,而不管它们的类型如何。
自定义网关路由
Axon允许自定义接口用作命令网关。在接口中声明的每个方法的行为都是基于参数类型,返回类型和声明的异常。使用这个网关不仅方便,而且允许你在需要的地方模拟你的接口,使得测试变得更容易。
这些参数在CommandGateway的作用:
1.第一个参数是期望要发送的实际命令对象。
2.用@MetaDataValue注解的参数将其值分配给元数据字段,其中标识符作为注解参数传递
3.MetaData类型的参数将与CommandMessage上的元数据合并。如果后者参数定义的元数据相同,则将覆盖较早参数的元数据。
4.CommandCallback类型的参数将在处理命令后调用onSuccess或onFailure。您可能会传入多个回调,并可能与返回值组合在一起。在这种情况下,回调的调用将始终与返回值(或异常)相匹配。
5.最后两个参数可以是long(或int)和TimeUnit类型。在这种情况下这个方法至多会被阻塞为这些参数声明的时间。该方法对超时作出什么响应取决于方法中声明的异常(见下文)。请注意,如果方法的其他属性完全阻止了阻塞,超时将不会发生。
方法的声明返回值也会影响其行为:
- 返回类型为void将导致方法立即返回,除非在方法上还有其他的指示要等待,比如超时或声明的异常。
- Future,CompletionStage和CompletableFuture的返回类型将使该方法立即返回。您可以使用方法返回的CompletableFuture实例来访问命令处理程序的结果。方法中声明的异常和超时将会被忽略。
- 任何其他返回类型将导致方法阻塞,直到有真实的结果。结果将转换为返回类型(如果类型不匹配,则会导致ClassCastException)。
异常对其的影响:
- 如果命令处理程序(或拦截器)抛出异常,则将抛出任何已声明的已检查异常。如果受检异常尚未声明,它被包裹在一个CommandExecutionException中,这是一个RuntimeException。
- 发生超时时,默认行为是从方法返回一个null值。这可以通过声明一个TimeoutException来改变。如果声明了这个异常,则抛出一个TimeoutException。
- 当一个线程在等待结果时被中断,默认的行为是返回null。在这种情况下,被中断的标志在线程上被重新设置。如果在方法上声明一个InterruptedException,该行为会被改为抛出异常。当抛出异常时,中断标志被移除,与java规范一致。
- 其他运行时异常可以在方法上声明,但除了对API用户进行说明之外,不会有任何影响。
最后,有可能使用的注解:
1.在参数部分指定的,用@MetaDataValue注解的参数的值作为元数据值添加。元数据项的key作为参数提供给注解。
2用@Timeout注解的方法最多会阻塞到指定的时间。如果方法声明超时参数,这个注解将被忽略。
3.用@Timeout注解的类,将导致这个类中声明的所有方法,最多阻塞到指定的时间。除非他们用自己的@Timeout注解或指定超时参数。
public interface MyGateway {
// fire and forget
void sendCommand(MyPayloadType command);
// method that attaches meta data and will wait for a result for 10 seconds
@Timeout(value = 10, unit = TimeUnit.SECONDS)
ReturnValue sendCommandAndWaitForAResult(MyPayloadType command,
@MetaDataValue("userId") String userId);
// alternative that throws exceptions on timeout
@Timeout(value = 20, unit = TimeUnit.SECONDS)
ReturnValue sendCommandAndWaitForAResult(MyPayloadType command)
throws TimeoutException, InterruptedException;
// this method will also wait, caller decides how long
void sendCommandAndWait(MyPayloadType command, long timeout, TimeUnit unit)
throws TimeoutException, InterruptedException;
}
// To configure a gateway:
CommandGatewayFactory factory = new CommandGatewayFactory(commandBus);// note that the commandBus can be obtained from the Configuration
object returned on configurer.initialize()
.
MyGateway myGateway = factory.createGateway(MyGateway.class);
命令总线
命令总线是将命令分发到它们各自的命令处理程序。每个命令总是发送到一个命令处理程序。如果分发的命令没有找到可以处理的命令处理程序,则引发NoHandlerForCommandException异常。如果多个command handler订阅了同一类型的command,那么他们会以先后顺序来覆蓋上一个,也就是最后一个订阅的会生效。
分发命令
CommandBus提供了两种方法来将命令分发到各自的处理程序:dispatch(commandMessage,callback)和dispatch(commandMessage)。第一个参数是包含实际分发的命令的消息。可选的第二个参数是一个回调接口,允许在命令处理完成时通知须要回调的方法。这个回调函数有两个方法:onSuccess()和onFailure(),分别在命令处理正常返回或抛出异常时调用。
调用组件可能不采取在分发命令的同一线程中调用回调函數。如果调用线程在继续处理时会依赖于之前结果,则可以使用FutureCallback。它是Future的一个组合(在java.concurrent包中定义)和Axon的CommandCallback。或者,考虑使用命令网关。
如果应用程序不直接对Command的结果感兴趣,则可以使用dispatch(commandMessage)方法。
SimpleCommandBus
SimpleCommandBus顾名思义就是最简单的实现。它可以直接处理调度它们的线程中的命令。处理命令后,修改后的聚合被保存,生成的事件将在同一个线程中发布。在大多数情况下,如Web应用程序,它和您的需要很匹配。 SimpleCommandBus是配置API中默认使用的实现。
像大多数CommandBus实现一样,SimpleCommandBus也可以配置拦截器。在命令总线上分发命令时,将调用CommandDispatchInterceptors。在实际的命令处理程序方法之前调用CommandHandlerInterceptors,允许您修改或阻止该命令。有关更多信息,请参阅命令拦截器。
由于所有的命令处理都是在同一个线程中完成的,所以这个实现仅限于JVM内。这个性能的表现是很不错的,但不是最叼的。要跨越JVM边界,或充分利用CPU周期,请查看其他CommandBus实现。
AsynchronousCommandBus
顾名思义,AsynchronousCommandBus使用异步线程来执行命令。它使用Executor分配新的线程来处理实际的业务邏輯.
默认情况下,AsynchronousCommandBus使用无限制的缓存线程池。这意味着一个命令分发时就会有一个线程被创建。完成处理命令的线程将继续被用于处理新命令。如果60秒线程没有处理命令,则会停止线程。
另外,可以给Executor实例配置不同的线程策略。
请注意,在停止应用程序时,应关闭AsynchronousCommandBus,以确保任何等待的线程已正确关闭。要关闭的话,请调用shutdown()方法。这也将关闭任何提供的Executor实例,如果它实现了ExecutorService接口的话。
DisruptorCommandBus
SimpleCommandBus在性能方面有自己的特性,特别是你有性能调优的经历的话就很容易理解。 事实上,SimpleCommandBus需要锁来防止多个线程并发访问同一聚合,导致处理开销和锁争用。
DisruptorCommandBus采取不同的方法来处理多线程。不是多个线程每个都执行同样的处理,而是有多个线程,每个负责一件处理。DisruptorCommandBus使用Disruptor(http://lmax-exchange.github.io/disruptor/)(一个用于并发编程的小型框架),通过采用不同的多线程方法来获得更好的性能。不是在调用线程中进行处理,而是将这些任务交给两组线程,每个线程处理一部分处理。第一组线程将执行命令处理程序,更改聚合的状态。第二组将存储事件并将其发布到事件存储。
虽然DisruptorCommandBus优于SimpleCommandBus 4倍(!),但有一些限制:
- DisruptorCommandBus仅支持事件溯源的聚合。该命令总线也充当Disruptor处理的集合的仓储。要获取对存储库的引用,请使用createRepository(AggregateFactory)。
- 命令只能导致单个聚合实例中的状态更改。
- 当使用缓存时,它只允许给定的标识符为单个聚合。这意味着它是不可能有两个具有相同的标识符的不同类型的聚合。
- 命令通常不会导致需要回滚工作单元的失败。发生回滚时,DisruptorCommandBus不能保证命令按照它们的分发顺序进行处理。此外,它需要重试一些其他命令,会导致不必要的计算。
- 在创建一个新的聚合实例时,命令更新所创建实例可能并不完全按照所提供的顺序进行。一旦创建了聚合,所有命令将按照它们被分发顺序执行。为了确保顺序,在创建命令上使用回调去等待正在创建的聚合。它不应该耗时超过几毫秒。
要构建一个DisruptorCommandBus实例,您需要一个EventStore。该组件在“存储库和事件存储”中进行了说明。
或者,你通过DisruptorConfiguration实例来配置优化在特定环境下的性能:
- 缓冲区大小:用于注册传入命令的环形缓冲区上的插槽数量。更高的值可能会增加吞吐量,但也会导致更高的延迟。其值必须始终是2的幂。默认为4096。
- 生产者类型:声明这个实体是由多个或者单个线程生成,默认为多个
- 等待策略:处理器线程(假如有三个线程负责实际处理)需要使用的策略需要彼此等待。WaitStrategy取决于机器中可用的内核数量,以及运行的其他进程的数量。如果你比较关心低延迟,并且DisruptorCommandBus可以为自己申请核心,那么可以使用BusySpinWaitStrategy。如果想命令总线获取更少的CPU资源并允许其他线程进行处理,可以使用YieldingWaitStrategy。最后,您可以使用SleepingWaitStrategy和BlockingWaitStrategy来允许其他进程公平地竞争CPU。如果命令总线不希望占用全部的时间来处理,后者更合适。默认策略为BlockingWaitStrategy。
- Executor:我们可以通过设置Executor来修改DisruptorCommandBus所须的线程数。DisruptorCommandBus使用的是Executor线程池来处理。这个Executor 必须能够提供至少4个线程。其中有3个是供DisruptorCommandBus组件使用的。另外的线程用于调用回调函数,并在检测到Aggregate的状态不正确的情况下进行重试。默认实现是一个CachedThreadPool,它提供了一个名为“DisruptorCommandBus”的线程组的线程。
- 事务管理:他作为是确保事件的存储和发布在一个事务中完成。
- InvokerInterceptors:在Command Handler被调用的时候会调用(如果已定义)CommandHandlerInterceptor
- PublisherInterceptors:在存储事件和发布事件的时候会调用(如果已定义)PublisherInterceptors
- RollbackConfiguration:表示Unit of Work什么样的异常会被回滚。默认为unchecked的异常会被回滚。
- RescheduleCommandsOnCorruptState:声明Command已经被执行,但是对于聚合而言这次操作是不是成功(例如,因为工作单元已回滚),须要进行再次重新处理。如果值为false,那么会调用onFailure()而不会进行重新处理。如果值是true(默认值),那么该命令会被重新处理。
- CoolingDownPeriod:设置等待的秒数主要是为了确保所有命令能够被处理。在这个等待时间内,他不会接收新的命令,但是现在正在处理的命令在必要的时候可以进行重新安排处理。冷却期主要是确保线程可用于重新安排处理命令和调用回调函数,他的默认值是1000(1秒)。
- Cache:设置缓存存储从Event Store中恢复的聚合实例。缓存用disruptor来存储那些不活跃的聚合实例
- InvokerThreadCount:分配给调用命令处理程序的线程数。最好的方法是先设置其值为机器内核数量的一半。
- PublisherThreadCount:用于发布事件的线程数。最好的方法是先设置其值为内核数量的一半,如果在IO上花费大量时间,可以适当增加这个值。
- SerializerThreadCount:用于预先序列化事件的线程数。默认值为1,但是如果没有配置序列化器,就不管。
- Serializer:用于执行预先序列化的序列化器。如果配置了序列化程序,DisruptorCommandBus会将所有生成的事件包装在SerializationAware消息中。payload和元数据的序列化会在事件发布到 Event Store之前处理。
命令拦截器
使用命令总线的优点之一是能够根据所有传入命令进行操作。例如日志记录或认证,无论是什么命令类型,您都可能想要执行此操作。这是使用拦截器完成的。
axon提供了不同类型的拦截器:Dispatch 拦截器和处理器(Handler )拦截器。Dispatch拦截器会在命令分发给命令处理程序之前调用。在那个时候,甚至还不能确定这个命令是否有匹配的Command Handler。Handler拦截器在调用Command Handler之前被调用。
消息分发拦截器
Message Dispatch Interceptors会在消息到达命令总线上时调用。他们可以把这些命令消息进行修改,如添加元数据,或者抛出异常来阻止命令继续传递。这些拦截器总是在调度Command的线程上调用。这些拦截器总是在分发命令的线程上被调用。
Structural validation
如果传入的命令想要被正确处理却没有包含必要的信息,则没有意义。事实上,一个不正确的命令应该尽早被阻止,最好甚至在任何事务开始之前。因此,拦截器应该检查所有传入的命令信息来判断有效性。这就是所谓的structural validation。
Axon框架支持基于JSR 303 Bean Validation的验证。您可以使用@NotEmpty和@Pattern之类的注解来注解命令上的字段。您需要在类路径中包含JSR 303实现(如Hibernate-Validator)。然后,在你的命令总线上配置一个BeanValidationInterceptor,它会自动查找并配置你的验证器实现。虽然它使用合理的默认值,但您可以根据自己的特定需求进行微调。
建议:你想花尽可能少的资源在一个无效的命令上。所以这个拦截器一般都放在拦截器链的最前面。在某些情况下,你须要将日志记录或Auditing 拦截器放置在前面,而紧接着在后面添加一个验证拦截器。
BeanValidationInterceptor也实现了MessageHandlerInterceptor,允许你配置它为Handler Interceptor。
消息处理拦截器
消息处理拦截器可以在命令处理之前和之后执行操作。拦截器甚至可以完全禁止命令处理,例如出于安全原因。
拦截器必须实现MessageHandlerInterceptor接口。这个接口只有一个方法叫handle(),它有三个参数:命令消息,当前的UnitOfWork和一个InterceptorChain。 InterceptorChain是很多Interceptor串联起来处理消息的chain。
与Dispatch 拦截器不同,处理程序拦截器是在命令处理程序的上下文中调用的。这意味着他们可以将基于正在处理的消息的关联数据附加到工作单元。这个关联数据将被附加到在该工作单元的上下文中创建的消息上。
处理程序拦截器通常也用于管理处理命令的事务。为此,注册一个TransactionManagingInterceptor,它又配置了一个TransactionManager来启动和提交(或回滚)实际的事务。
分布式命令总线
前面介绍的CommandBus实现只允许在单个JVM中分发命令消息。有时,你可以把Command Bus配置在多个JVM。在一个JVM的命令总线上分发的命令应该无缝地传送到另一个JVM中的 Command Handle,同时也能收到结果。
这就是DistributedCommandBus的功能。与其他CommandBus实现不同,DistributedCommandBus根本不调用任何处理程序。它所做的只是在不同的JVM上的命令总线实现之间形成一个“桥梁”。每个JVM上的每个DistributedCommandBus实例称为“Segment”。
注意:尽管分布式命令总线本身是Axon Framework Core模块的一部分,但它需要的组件,你可以在其中一个以axon-distributed-commandbus -* 的模块中找到。如果你使用Maven,可以使用添加dependencies来获取 ,他们的groupId和version与Core模块的相同。
DistributedCommandBus依赖于两个组件:实现JVM之间的通信协议的CommandBusConnector和为每个传入的命令选择目的地的CommandRouter。该路由器基于由路由策略计算的路由key来定义应该给予分布式命令总线的哪个segment 。具有相同路由key的两个命令将始终路由到相同的网段,只要这些网段的数量和配置没有变化。通常情况下,目标聚合的标识被用作路由key。
axon提供了两个RoutingStrategy实现:MetaDataRoutingStrategy,它使用命令消息中的元数据属性来查找路由key;以及AnnotationRoutingStrategy,它使用命令消息payload上的@TargetAggregateIdentifier注解来提取路由key。显然,你也可以提供你自己的实现。
默认情况下,如果从命令消息中没有找到key,则RoutingStrategy会抛出异常。通过在MetaDataRoutingStrategy或AnnotationRoutingStrategy的构造函数中提供UnresolvedRoutingKeyPolicy,可以更改此行为。有三个可能的政策:
ERROR:这是默认设置,当路由key不可用时将引发异常
RANDOM_KEY:当无法从命令消息解析路由key时,将返回一个随机值。这实际上意味着这些命令将被路由到命令总线的一个随机segment 。
STATIC_KEY:如果没有找到路由Key,那么他将返回一个固定key.这实际上意味着只要segments 的配置没有改变,所有这些命令将被路由到同一个segments 。
JGroupsConnector
JGroupsConnector使用JGroups作为底层的发现和调度机制(作为名称已经提供)。对于本参考指南,描述JGroups的功能集有点多,所以请参阅JGroups用户指南了解更多详细信息。
由于JGroups同时处理节点的发现和它们之间的通信,所以JGroupsConnector既充当CommandBusConnector又充当CommandRouter。
注意:您可以在axon-distributed-commandbus-jgroups模块中找到DistributedCommandBus的JGroups相关的组件。
JGroupsConnector有四个必需的配置元素:
1.第一个是JChannel,它定义了JGroups协议栈。通常,JChannel是通过引用JGroups配置文件构建的。 JGroups提供了许多默认配置,可以作为您自己配置的基础。请记住,IP组播通常不能在云服务中使用,例如Amazon。 TCP Gossip通常在这种类型的环境中是不错的选择。
2.集群里的每个segment都应该根据名称注册到对应的集群上。具有相同的集群名称的Segment最终会知道到彼此,并在彼此间分发命令。
3.“本地segment”是命令总线的实现,它分发命令到本地JVM。这些命令可能已由其他JVM或本地实例分发。
4.最后,Serializer的作用是在命令消息在发送之前进行序列化。
注意:当使用缓存时,当ConsistentHash发生改变时应该将其清除,以避免潜在的数据错误(例如,当命令未指定@TargetAggregateVersion并且新成员快速加入并离开JGroup时,修改聚合然而它还要缓存到其他地方)。
最终,JGroupsConnector需要实际连接,以便将消息发送到其他段。为此,请调用connect()方法。
JChannel channel = new JChannel("path/to/channel/config.xml");
CommandBus localSegment = new SimpleCommandBus();
Serializer serializer = new XStreamSerializer();
JGroupsConnector connector = new JGroupsConnector(channel, "myCommandBus", localSegment, serializer);
DistributedCommandBus commandBus = new DistributedCommandBus(connector, connector);
// on one node:
commandBus.subscribe(CommandType.class.getName(), handler);
connector.connect();
// on another node, with more CPU:
commandBus.subscribe(CommandType.class.getName(), handler);
commandBus.subscribe(AnotherCommandType.class.getName(), handler2);
commandBus.updateLoadFactor(150); // defaults to 100
connector.connect();
// from now on, just deal with commandBus as if it is local...
注意:
请注意,不要求所有segments 都具有用于相同类型的命令的命令处理程序。您可以为不同的命令类型使用不同的段。分布式命令总线将始终选择一个节点来分派一个命令,以支持该特定类型的命令。
如果你使用Spring,你可以考虑使用JGroupsConnectorFactoryBean。它在ApplicationContext启动时自动连接Connector,并在ApplicationContext关闭时进行断开连接。此外,它对测试环境使用合理的默认值(但不应将其视为生产就绪),并对配置进行自动装配。
Spring Cloud Connector
Spring Cloud Connector程序使用Spring Cloud描述的服务注册和发现机制来分发命令总线。因此,您可以自由选择使用哪种Spring Cloud实现来分发您的命令。一个示例实现是Eureka Discovery / Eureka服务器组合。
注意:SpringCloudCommandRouter使用Spring Cloud 里的ServiceInstance来实现.Metadata字段通知系统中所有节点的消息路由信息。因此,所选的Spring Cloud实现支持ServiceInstance.Metadata字段的使用是非常重要的。如果所需的Spring Cloud实现不支持修改ServiceInstance.Metadata(例如Consul),那么SpringCloudHttpBackupCommandRouter是一个可行的解决方案。有关SpringCloudHttpBackupCommandRouter的配置细节,请参阅本章末尾的内容。
提供每个SpringCloud实现的描述将推动本参考指南。因此,我们参考他们各自的文件以获得进一步的信息。
Spring Cloud连接器装置是一个SpringCloudCommandRouter和SpringHttpCommandBusConnector的组合,分别填充CommandRouter的地点和 DistributedCommandBus的CommandBusConnector。
注意:Spring Cloud Connector中的DistributedCommandBus相关组件可以在axon-distributed-commandbus-springcloud模块中找到。
SpringCloudCommandRouter必须通过提供以下内容来创建:
1.DiscoveryClient类型的“发现客户端”。这可以通过使用@EnableDiscoveryClient注解您的Spring Boot应用程序来提供,该应用程序将在您的类路径中查找Spring Cloud实现。
- RoutingStrategy类型的“路由策略”。axon核心模块目前提供了几个实现,但也你也可以通用调用函数来实现。例如,如果您想根据“聚合标识符”路由命令,则可以使用AnnotationRoutingStrategy并在payload上标注使用@TargetAggregateIdentifier标识聚合的字段。
SpringHttpCommandBusConnector创建所需要的三个参数:
- CommandBus类型的“本地命令总线”。这是将命令分发到本地JVM的命令总线上。这些命令可能已由其他JVM上的实例或本地实例发布。
- RestOperations对象是将执行命令消息发布到另一个实例。
- 最后是Serializer类型的“序列化器”。他的作用是在命令消息发送到命令总线之前进行序列化。
SpringCloudCommandRouter和SpringHttpCommandBusConnector都是用于创建DistributedCommandsBus的。在Spring Java配置中,他们看起来如下所示:
// Simple Spring Boot App providing the DiscoveryClient
bean@EnableDiscoveryClient@SpringBootApplicationpublic class MyApplication {
public static void main(String[] args) {
SpringApplication.run(MyApplication.class, args);
}
// Example function providing a Spring Cloud Connector
@Bean
public CommandRouter springCloudCommandRouter(DiscoveryClient discoveryClient) {
return new SpringCloudCommandRouter(discoveryClient, new AnnotationRoutingStrategy());
}
@Bean
public CommandBusConnector springHttpCommandBusConnector(@Qualifier("localSegment") CommandBus localSegment,
RestOperations restOperations,
Serializer serializer) {
return new SpringHttpCommandBusConnector(localSegment, restOperations, serializer);
}
@Primary // to make sure this CommandBus implementation is used for autowiring
@Bean
public DistributedCommandBus springCloudDistributedCommandBus(CommandRouter commandRouter,
CommandBusConnector commandBusConnector) {
return new DistributedCommandBus(commandRouter, commandBusConnector);
}
}
// if you don't use Spring Boot Autoconfiguration, you will need to explicitly define the local segment:@Bean@Qualifier("localSegment")public CommandBus localSegment() {
return new SimpleCommandBus();
}
注意:请注意,不要求所有segments 都具有用于相同类型的命令的命令处理程序。您可以为不同的命令类型使用不同的segments 。分布式命令总线将始终选择一个节点来分发一个命令,以支持该指定类型的命令。
Spring Cloud Http Back Up Command Router
在内部,SpringCloudCommandRouter使用Spring Cloud ServiceInstance中包含的Metadata映射在axon分布式环境中传递路由消息信息。如果所需的Spring Cloud实现不允许修改ServiceInstance.Metadata字段(例如Consul),则可以选择实例化SpringCloudHttpBackupCommandRouter而不是SpringCloudCommandRouter。
顾名思义,SpringCloudHttpBackupCommandRouter具有备份机制,如果ServiceInstance.Metadata字段不包含预期的路由消息信息。该备份机制是提供可以从中检索消息路由信息的HTTP端点,并且通过同时添加功能来查询集群中其他已知节点的该端点以检索其消息路由信息。因此,备份机制函数是一个Spring控制器,用于在可指定端点接收请求,并使用RestTemplate向可指定端点上的其他节点发送请求。
要使用SpringCloudHttpBackupCommandRouter而不是SpringCloudCommandRouter,添加下面的Spring Java配置(它取代了我们前面例子中的SpringCloudCommandRouter方法):
@Configurationpublic class MyApplicationConfiguration {
@Bean
public CommandRouter springCloudHttpBackupCommandRouter(DiscoveryClient discoveryClient,
RestTemplate restTemplate,
@Value("${axon.distributed.spring-cloud.fallback-url}") String messageRoutingInformationEndpoint) {
return new SpringCloudHttpBackupCommandRouter(discoveryClient, new AnnotationRoutingStrategy(), restTemplate, messageRoutingInformationEndpoint);
}
}