Spring-Kafka(四)—— KafkaTemplate发送消息及结果回调

在前几章中,我们使用KafkaTemplate.send(String data)这个方法发送消息到Kafka中,显然这个方法并不能满足我们系统的需求,那我们需要查看一下KafkaTemplate所实现的接口,看看还提供了什么方法。当我们发送消息到Kafka后,我们又怎么去确认消息是否发送成功呢?这就涉及到KafkaTemplate的发送回调方法了。接下来我们开始正式讲解。

查看发送接口

首先我们Ctrl+鼠标左键进入KafkaTemplate的源代码中查看一下,可以看到有关发送的接口如下。这里的参数还是比较简单的,值得一提的事,方法中有个Long类型的时间戳(timestamp)参数,这是Kafka0.10版本提供的新功能,主要用来使用时间索引进行查询数据以及日志切分清除策略。还有一个ProducerRecord参数,这个类其实就是整合了topic、partition、data等数据的消费实体类。


稍微提一下这些参数都是什么意思吧:
topic:这里填写的是Topic的名字
partition:这里填写的是分区的id,其实也是就第几个分区,id从0开始。表示指定发送到该分区中
timestamp:时间戳,一般默认当前时间戳
key:消息的键
data:消息的数据
ProducerRecord:消息对应的封装类,包含上述字段
Message<?>:Spring自带的Message封装类,包含消息及消息头

ListenableFuture<SendResult<K, V>> sendDefault(V data);

ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);

ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);

ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);

ListenableFuture<SendResult<K, V>> send(String topic, V data);

ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);

ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);

ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);

ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);

ListenableFuture<SendResult<K, V>> send(Message<?> message);


使用sendDefault发送消息

首先在KafkaConfiguration编写一个带有默认Topic参数的KafkaTemplate,同时为另外一个KafkaTemplate加上@Primary注解,@Primary注解的意思是在拥有多个同类型的Bean时优先使用该Bean,到时候方便我们使用@Autowired注解自动注入。

    //这个是我们之前编写的KafkaTemplate代码,加入@Primary注解
    @Bean
    @Primary
    public KafkaTemplate<Integer, String> kafkaTemplate() {
        KafkaTemplate template = new KafkaTemplate<Integer, String>(producerFactory());
        return template;
    }

    @Bean("defaultKafkaTemplate")
    public KafkaTemplate<Integer, String> defaultKafkaTemplate() {
        KafkaTemplate template = new KafkaTemplate<Integer, String>(producerFactory());
        template.setDefaultTopic("topic.quick.default");
        return template;
    }

接着编写测试方法,可以看到我们这里调用的是sendDefault方法,而且并没有在方法参数上添加topicName,这是因为我们在声明defaultKafkaTemplate这个Bean的时候添加了这行代码 template.setDefaultTopic("topic.quick.default"),只要调用sendDefault方法,kafkaTemplate会自动把消息发送到名为"topic.quick.default"的Topic中。

    @Resource
    private KafkaTemplate defaultKafkaTemplate;

    @Test
    public void testDefaultKafkaTemplate() {
        defaultKafkaTemplate.sendDefault("I`m send msg to default topic");
    }
测试结果

这里也顺便测试一下其他几个吧。

    @Test
    public void testTemplateSend() {
        //发送带有时间戳的消息
        kafkaTemplate.send("topic.quick.demo", 0, System.currentTimeMillis(), 0, "send message with timestamp");

        //使用ProducerRecord发送消息
        ProducerRecord record = new ProducerRecord("topic.quick.demo", "use ProducerRecord to send message");
        kafkaTemplate.send(record);

        //使用Message发送消息
        Map map = new HashMap();
        map.put(KafkaHeaders.TOPIC, "topic.quick.demo");
        map.put(KafkaHeaders.PARTITION_ID, 0);
        map.put(KafkaHeaders.MESSAGE_KEY, 0);
        GenericMessage message = new GenericMessage("use Message to send message",new MessageHeaders(map));
        kafkaTemplate.send(message);
    }


消息结果回调

一般来说我们都会去获取KafkaTemplate发送消息的结果去判断消息是否发送成功,如果消息发送失败,则会重新发送或者执行对应的业务逻辑。所以这里我们去实现这个功能。

KafkaSendResultHandler

第一步还是编写一个消息结果回调类KafkaSendResultHandler。当我们使用KafkaTemplate发送消息成功的时候回调用OnSuccess方法,发送失败则会调用onError方法。

@Component
public class KafkaSendResultHandler implements ProducerListener {

    private static final Logger log = LoggerFactory.getLogger(KafkaSendResultHandler.class);

    @Override
    public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
        log.info("Message send success : " + producerRecord.toString());
    }

    @Override
    public void onError(ProducerRecord producerRecord, Exception exception) {
        log.info("Message send error : " + producerRecord.toString());
    }
}



接下来就使用KafkaSendResultHandler实现消息发送结果回调,这里为什么又要休眠,稍后进行讲解

    @Autowired
    private KafkaSendResultHandler producerListener;

    @Test
    public void testProducerListen() throws InterruptedException {
        kafkaTemplate.setProducerListener(producerListener);
        kafkaTemplate.send("topic.quick.demo", "test producer listen");
        Thread.sleep(1000);
    }



运行测试方法,我们可以看到控制台输出的日志如下

2018-09-08 15:51:39.975  INFO 10268 --- [ad | producer-1] c.v.k.handler.KafkaSendResultHandler     : Message send success : ProducerRecord(topic=topic.quick.demo, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=test producer listen, timestamp=null)

KafkaTemplate异步发送消息

上文提及了发送消息的时候需要休眠一下,否则发送时间较长的时候会导致进程提前关闭导致无法调用回调时间。主要是因为KafkaTemplate发送消息是采取异步方式发送的,我们可以看下KafkaTemplate的源代码


这是我们刚才调用的发送消息方法,可以看到KafkaTemplate会使用ProducerRecord把我们传递进来的参数再一次封装,最后调用doSend方法发送消息到Kafka中

send(String topic, V data)
    public ListenableFuture<SendResult<K, V>> send(String topic, V data) {
        ProducerRecord<K, V> producerRecord = new ProducerRecord(topic, data);
        return this.doSend(producerRecord);
    }


ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord)

doSend方法先是检测是否开启事务,紧接着使用SettableListenableFuture发送消息,然后判断是否启动自动冲洗数据到Kafka中,我们再接着看看SettableListenableFuture实现了什么接口

    protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
        if (this.transactional) {
            Assert.state(this.inTransaction(), "No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record");
        }

        final Producer<K, V> producer = this.getTheProducer();
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Sending: " + producerRecord);
        }

        final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture();
        producer.send(producerRecord, new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                try {
                    if (exception == null) {
                        future.set(new SendResult(producerRecord, metadata));
                        if (KafkaTemplate.this.producerListener != null) {
                            KafkaTemplate.this.producerListener.onSuccess(producerRecord, metadata);
                        }

                        if (KafkaTemplate.this.logger.isTraceEnabled()) {
                            KafkaTemplate.this.logger.trace("Sent ok: " + producerRecord + ", metadata: " + metadata);
                        }
                    } else {
                        future.setException(new KafkaProducerException(producerRecord, "Failed to send", exception));
                        if (KafkaTemplate.this.producerListener != null) {
                            KafkaTemplate.this.producerListener.onError(producerRecord, exception);
                        }

                        if (KafkaTemplate.this.logger.isDebugEnabled()) {
                            KafkaTemplate.this.logger.debug("Failed to send: " + producerRecord, exception);
                        }
                    }
                } finally {
                    if (!KafkaTemplate.this.transactional) {
                        KafkaTemplate.this.closeProducer(producer, false);
                    }

                }

            }
        });
        if (this.autoFlush) {
            this.flush();
        }

        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Sent: " + producerRecord);
        }

        return future;
    }



可以看到SettableListenableFuture实现了ListenableFuture接口,ListenableFuture则实现了Future接口,Future是Java自带的实现异步编程的接口,支持返回值的异步,而我们使用Thread或者Runnable都是不带返回值的。

public class SettableListenableFuture<T> implements ListenableFuture<T>
public interface ListenableFuture<T> extends Future<T> 


KafkaTemplate同步发送消息

KafkaTemplate异步发送消息大大的提升了生产者的并发能力,但某些场景下我们并不需要异步发送消息,这个时候我们可以采取同步发送方式,实现也是非常简单的,我们只需要在send方法后面调用get方法即可。Future模式中,我们采取异步执行事件,等到需要返回值得时候我们再调用get方法获取future的返回值

    @Test
    public void testSyncSend() throws ExecutionException, InterruptedException {
        kafkaTemplate.send("topic.quick.demo", "test sync send message").get();
    }



get方法还有一个比较有意思的重载方法,get(long timeout, TimeUnit unit),当send方法耗时大于get方法所设定的参数时会抛出一个超时异常,但需要注意,这里仅抛出异常,消息还是会发送成功的。这里的测试方法设置send耗时必须小于 一微秒(那必须得失败呀,嘿嘿嘿),运行后我们可以看到抛出的异常,但也发现消息能发送成功并被监听器接收了。那这功能有什么作用呢,如果还没有接触过SQL慢查询可以去了解一下,使用该方法作为SQL慢查询记录的条件。

    @Test
    public void testTimeOut() throws ExecutionException, InterruptedException, TimeoutException {
        kafkaTemplate.send("topic.quick.demo", "test send message timeout").get(1,TimeUnit.MICROSECONDS);
    }
2018-09-08 16:36:09.110  INFO 7724 --- [     demo-0-C-1] com.viu.kafka.listen.DemoListener        : demo receive : test send message timeout

java.util.concurrent.TimeoutException


更多文章请关注该 Spring-Kafka史上最强入门教程 专题

博主常驻地~ http://blog.seasedge.cn/archives/15.html

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,496评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,407评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,632评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,180评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,198评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,165评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,052评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,910评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,324评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,542评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,711评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,424评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,017评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,668评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,823评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,722评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,611评论 2 353

推荐阅读更多精彩内容

  • Spring Web MVC Spring Web MVC 是包含在 Spring 框架中的 Web 框架,建立于...
    Hsinwong阅读 22,394评论 1 92
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,651评论 18 139
  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981阅读 15,908评论 2 11
  • 到底好不好用呢…… 找个图片
    时了个光阅读 227评论 0 0
  • 也许以前是从来没有在外面过年的经历,又或许是以前的我从来就没有在意,然而今年的春节却让我深深地感受到了一个普通的农...
    小杨林阅读 288评论 0 0