使用 Spring Kafka 进行非阻塞重试的集成测试

Kafka的非阻塞重试是通过为主题配置重试主题来实现的。如果需要,还可以配置额外的死信主题。如果所有重试都耗尽,事件将被转发到DLT。在公共领域中有很多资源可用于了解技术细节。对于代码中的重试机制编写集成测试确实是一项具有挑战性的工作。以下是一些测试方法,可以用来验证重试机制的正确性:

  1. 验证事件已经按照所需的次数进行了重试:
  • 在测试中,模拟一个会触发重试的事件,并设置重试次数为所需的次数。

  • 使用断言来验证事件是否被重试了指定的次数。

  1. 验证只有在特定的异常发生时才进行重试,而不是其他异常:
  • 在测试中,模拟不同的异常情况,包括需要重试的异常和不需要重试的异常。

  • 使用断言来验证只有特定的异常触发了重试,而其他异常没有触发重试。

  1. 验证如果前一次重试已经解决了异常,不会进行另一次重试:
  • 在测试中,模拟一个会触发重试的事件,并在每次重试之间解决异常。

  • 使用断言来验证只有在异常没有被解决的情况下才进行重试。

  1. 验证在前面的 (n-1) 次重试失败后,第 n 次重试成功:
  • 在测试中,模拟一个会触发重试的事件,并设置重试次数为 n。

  • 使用断言来验证在前面的 (n-1) 次重试失败后,第 n 次重试成功。

  1. 验证如果所有的重试尝试都失败,事件是否已经发送到了死信队列:
  • 在测试中,模拟一个会触发重试的事件,并设置重试次数为一个较小的值。
  • 使用断言来验证当所有的重试尝试都失败后,事件是否已经发送到了死信队列。

设置可重试的消费者

@Slf4j
@Component
@RequiredArgsConstructor
public class CustomEventConsumer {

    private final CustomEventHandler handler;

    @RetryableTopic(attempts = "${retry.attempts}",
            backoff = @Backoff(
                    delayExpression = "${retry.delay}",
                    multiplierExpression = "${retry.delay.multiplier}"
            ),
            topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE,
            dltStrategy = FAIL_ON_ERROR,
            autoStartDltHandler = "true",
            autoCreateTopics = "false",
            include = {CustomRetryableException.class})
    @KafkaListener(topics = "${topic}", id = "${default-consumer-group:default}")
    public void consume(CustomEvent event, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        try {
            log.info("Received event on topic {}", topic);
            handler.handleEvent(event);
        } catch (Exception e) {
            log.error("Error occurred while processing event", e);
            throw e;
        }
    }

    @DltHandler
    public void listenOnDlt(@Payload CustomEvent event) {
        log.error("Received event on dlt.");
        handler.handleEventFromDlt(event);
    }

}

如果您注意上面的代码片段,参数@RetryableTopic中包含includes。这告诉消费者只在方法抛出CustomRetryableException时进行重试。您可以添加任意数量的异常类型。还有一个exclude参数,但一次只能使用其中一个。在将事件发布到死信队列之前,事件处理最多应重试指定的次数。

设置测试基础设施

为了编写集成测试,您需要确保拥有一个正常运行的Kafka代理(最好是嵌入式的)和一个完全运行的发布者。让我们设置我们的基础设施:

@EnableKafka
@SpringBootTest
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
@EmbeddedKafka(partitions = 1,
        brokerProperties = {"listeners=" + "${kafka.broker.listeners}", 
                            "port=" + "${kafka.broker.port}"},
        controlledShutdown = true,
        topics = {"test", "test-retry-0", "test-retry-1", "test-dlt"}
)
@ActiveProfiles("test")
class DocumentEventConsumerIntegrationTest {

  @Autowired
  private KafkaTemplate<String, CustomEvent> testKafkaTemplate;


    // tests

}

配置从application-test.yml文件中导入。当使用嵌入式Kafka代理时,重要的是要提及要创建的主题。它们不会自动创建。在这种情况下,我们将创建四个主题,分别是:

"test", "test-retry-0", "test-retry-1", "test-dlt"

我们将最大重试次数设置为三次。每个主题对应于每次重试尝试。因此,如果三次重试都耗尽,事件应该被转发到DLT(死信队列)。

测试用例

如果在第一次尝试中成功消费,就不应该进行重试。可以通过方法只被调用一次来测试这一点。还可以添加对日志语句的进一步测试。

 @Test
    void test_should_not_retry_if_consumption_is_successful() throws ExecutionException, InterruptedException {
        CustomEvent event = new CustomEvent("Hello");
        // GIVEN
        doNothing().when(customEventHandler).handleEvent(any(CustomEvent.class));

        // WHEN
        testKafkaTemplate.send("test", event).get();

        // THEN
        verify(customEventHandler, timeout(2000).times(1)).handleEvent(any(CustomEvent.class));
        verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));
    }

如果引发了不可重试的异常,就不应该进行重试。在这种情况下,方法CustomEventHandler#handleEvent应该只被调用一次。

 @Test    void test_should_not_retry_if_non_retryable_exception_raised() throws ExecutionException, InterruptedException {        CustomEvent event = new CustomEvent("Hello");        // GIVEN        doThrow(CustomNonRetryableException.class).when(customEventHandler).handleEvent(any(CustomEvent.class));        // WHEN        testKafkaTemplate.send("test", event).get();        // THEN        verify(customEventHandler, timeout(2000).times(1)).handleEvent(any(CustomEvent.class));        verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));    }

如果抛出了RetryableException,则应该按照配置的最大重试次数进行重试,当重试次数耗尽时,事件应该被发布到死信主题。在这种情况下,方法CustomEventHandler#handleEvent应该被调用三次(maxRetries次),而方法CustomEventHandler#handleEventFromDlt应该只被调用一次。

 @Test
    void test_should_not_retry_if_non_retryable_exception_raised() throws ExecutionException, InterruptedException {
        CustomEvent event = new CustomEvent("Hello");
        // GIVEN
        doThrow(CustomNonRetryableException.class).when(customEventHandler).handleEvent(any(CustomEvent.class));

        // WHEN
        testKafkaTemplate.send("test", event).get();

        // THEN
        verify(customEventHandler, timeout(2000).times(1)).handleEvent(any(CustomEvent.class));
        verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));
    }

在验证阶段添加了相当长的超时时间,以便在测试完成之前考虑指数退避延迟。这是很重要的,如果没有正确设置,可能会导致断言失败。应该重试直到RetryableException被解决,并且如果引发了不可重试的异常或者最终成功消费,就不应该继续重试。测试已经设置为首先抛出RetryableException,然后再抛出NonRetryableException,以便进行一次重试。

@Test
    void test_should_retry_until_retryable_exception_is_resolved_by_non_retryable_exception() throws ExecutionException,
            InterruptedException {
        CustomEvent event = new CustomEvent("Hello");
        // GIVEN
        doThrow(CustomRetryableException.class).doThrow(CustomNonRetryableException.class).when(customEventHandler).handleEvent(any(CustomEvent.class));

        // WHEN
        testKafkaTemplate.send("test", event).get();

        // THEN
        verify(customEventHandler, timeout(10000).times(2)).handleEvent(any(CustomEvent.class));
        verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));
    }ndleEventFromDlt(any(CustomEvent.class));    }
 @Test
    void test_should_retry_until_retryable_exception_is_resolved_by_successful_consumption() throws ExecutionException,
            InterruptedException {
        CustomEvent event = new CustomEvent("Hello");
        // GIVEN
        doThrow(CustomRetryableException.class).doNothing().when(customEventHandler).handleEvent(any(CustomEvent.class));

        // WHEN
        testKafkaTemplate.send("test", event).get();

        // THEN
        verify(customEventHandler, timeout(10000).times(2)).handleEvent(any(CustomEvent.class));
        verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));
        }

结论

因此,您可以看到集成测试是一种混合和匹配的策略,超时时间,延迟和验证,以确保您的Kafka事件驱动架构的重试机制是可靠的。

作者: Mukut Bhattacharjee

更多技术干货尽在wx“云原生数据库”

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,402评论 6 499
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,377评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,483评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,165评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,176评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,146评论 1 297
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,032评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,896评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,311评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,536评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,696评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,413评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,008评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,659评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,815评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,698评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,592评论 2 353

推荐阅读更多精彩内容