librdkafka带鉴权认证访问kafka服务器

librdkafka简介

librdkafka是用c语言实现的一个高性能的kafka客户端,因为性能强大,开发者们基于librdkafka开发了各种语言的kafka客户端,比如librdkafkad(c++),, node-rdkafka(Node.js), confulent-kafka-python(Python)等。
librdkafka的高性能主要体现在其多线程的设计以及尽可能的降低内存拷贝。

librdkakfa API 简介

librdkafka github地址:https://github.com/edenhill/librdkafka ,
其中,C语言API可以参考src/rdkafka.h头文件,简要介绍几个关键的对象

  • rd_kafka_t: kafka客户端对象
  • rd_kafka_conf_t: kafka客户端配置对象
  • rd_kafka_topic_t: kafka topic对象

创建这几个对象所使用的函数:

  • rd_kafka_new()
  • rd_kafka_conf_new()
  • rd_kafka_topic_new()

librdkafka支持多种协议以控制kafka服务器的访问权限,如SASL_PALIN, PLAINTEXT, SASL_SSL等,在使用librdkafka时需要通过security.protocol参数指定协议类型,再辅以相应协议所需的其它参数完成权限认证。

如果使用SASL协议进行权限认证,需要对librdkafka添加SASL库依赖并重新编译。例如:在CentOS下安装如下依赖包:

yum -y install cyrus-sasl cyrus-sasl-devel

经过重新编译librdkafka后,进入examples目录下,执行

./rdkafka_example -X builtin.features

结果为:

 builtin.features = gzip,snappy,ssl,sasl,regex

可以看到librdkafka已经有了sasl特性,后续可以通过sasl协议进行访问认证。

producer 代码示例

初始化producer


int KafkaApi::init_producer(const std::string &brokers,
                            const std::string &username,
                            const std::string &password) {
  char errstr[512];
  /* Kafka configuration */
  if (NULL == conf_) {
    conf_ = rd_kafka_conf_new();
  }

  rd_kafka_conf_set(conf_, "queued.min.messages", "20", NULL, 0);
  rd_kafka_conf_set(conf_, "bootstrap.servers", brokers.c_str(), errstr,
                    sizeof(errstr));
  rd_kafka_conf_set(conf_, "security.protocol", "sasl_plaintext", errstr,
                    sizeof(errstr));
  rd_kafka_conf_set(conf_, "sasl.mechanisms", "PLAIN", errstr, sizeof(errstr));
  rd_kafka_conf_set(conf_, "sasl.username", username.c_str(), errstr,
                    sizeof(errstr));
  rd_kafka_conf_set(conf_, "sasl.password", password.c_str(), errstr,
                    sizeof(errstr));
  rd_kafka_conf_set(conf_, "api.version.request", "true", errstr,
                    sizeof(errstr));
  rd_kafka_conf_set_dr_msg_cb(conf_, dr_msg_cb_trampoline);

  /* Create Kafka handle */
  if (!(rk_ = rd_kafka_new(RD_KAFKA_PRODUCER, conf_, errstr, sizeof(errstr)))) {
    fprintf(stderr, "%% Failed to init producer: %s\n", errstr);
    exit(1);
  }

  return 0;
}

初始化过程介绍:

  1. 首先通过rd_kafka_conf_new()函数创建rd_kafka_conf_t对象

  2. 设置rd_kafka_conf_t对象,设置kafka客户端参数,示例参数为:

    • bootstrap.servers: broker地址列表
    • security.protocol: 安全协议类型,示例为SASL_PLAINTEXT
    • sasl.mechanisms: sasl协议机制,示例为PLAIN, 表示普通文本
    • sasl.username: 认证用户名
    • sasl.password: 认证密码
    • api.version.request: 可选,librdkafka与kafka服务器版本适配参数,该参数为true表示允许librdkafka向broker发送请求询问broker支持的API版本列表(Apache Kafka v0.10.0版本后支持),以完成版本适配,更多版本适配要点见https://github.com/edenhill/librdkafka/wiki/Broker-version-compatibility
    • 设置发送消息的回调函数,因为librdkafka发送消息为非阻塞的,需要通过rd_kafka_poll()方法轮询消息是否发送成功,并设置响应的回调函数确认消息是否发送成功
  3. 调用rd_kafka_new()函数创建rd_kafka_t对象

发送消息

int KafkaApi::send_message(const std::string &topic, const char *data,
                           const int &data_len) {
  rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk_, topic.c_str(), NULL);

  if (!rkt) {
    COMMLIB_LOG_ERR("kafka: create topic failed, err:%s",
                    rd_kafka_err2str(rd_kafka_errno2err(errno)));
    return rt::KDFKA_PRODUCE_ERR;
  }

  int ret = rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
                             const_cast<char *>(data), data_len, NULL, 0, NULL);
  if (ret == -1) {
    COMMLIB_LOG_ERR("kafka: send message failed, err:%s",
                    rd_kafka_err2str(rd_kafka_errno2err(errno)));
    return rt::KDFKA_PRODUCE_ERR;
  }

  COMMLIB_LOG_DEBUG("produce message [%s]", data);
  rd_kafka_poll(rk_, 0);
  return rt::SUCCESS;
}

发送消息过程介绍:

  1. 通过rd_kafka_topic_new()方法创建rd_kafka_topic_t对象,注意topic是自动创建的(需要broker端设置能否自动创建topic的参数:auto.create.topics.enable=true), 除此之外,topic能否创建成功还与认证用户的权限有关,如果认证用户在broker端为super.users,则topic能够自动创建成功,否则则会报错: 用户无权限,需要先给用户添加ACL权限才行;最后一点,对于已经存在的topic, rd_kafka_topic_new()方法仍然返回的是旧的对象

  2. 发送消息通过调用rd_kafka_produce()函数完成,该函数的参数为:

    • rd_kafka_topic_t对象
    • partition: RD_KAFKA_PARTITION_UA表示为不设置
    • msgflags: 可设置为0或RD_KAFKA_MSG_F_COPY, RD_KAFKA_MSG_F_FREE, RD_KAFKA_MSG_F_BLOCK, RD_KAFKA_MSG_F_COPY表示发送的消息内容参数为值传递,rd_kafka_produce()函数返回之后将不会仍持有消息内容的引用
    • payload, 消息内容指针
    • len, 消息长度
    • key, 消息的key
    • msg_opaque: 每条消息的透明度指针,在消息发送的回调函数中使用
  3. 调用rd_kafka_poll()函数,使得消息发送的回调函数能够触发, 该函数第一个参数为rd_kafka_t对象,第二个参数为timeout_ms,设置为0表示为非阻塞

注意事项

在使用librdkafka带鉴权认证访问kafka服务器的过程中,解决消息发送失败问题的关键点有:

  • librdkafka的SASL依赖有没有添加
  • SASL认证的参数配置有没有正确,需要确认用户在broker端是否已经添加,以及确认用户拥有的权限
  • api.version.request参数,该参数设置不正确,将直接导致消息发送失败,使用过程中需要注意librdkafka的版本与broker的版本
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,723评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,003评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,512评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,825评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,874评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,841评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,812评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,582评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,033评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,309评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,450评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,158评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,789评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,409评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,609评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,440评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,357评论 2 352

推荐阅读更多精彩内容