Kafka介绍之安装配置API

Kafka集群安装步骤:

    1、解压

   2、修改server.properties

             broker.id=1

             zookeeper.connect=weekend05:2181,weekend06:2181,weekend07:2181

  3、将zookeeper集群启动

  4、在每一台节点上启动broker

            bin/kafka-server-start.sh config/server.properties

   5、在kafka集群中创建一个topic

           bin/kafka-topics.sh --create --zookeeper zk_hostname:2181 --replication-factor 3 --partitions 1 --topic topic_name

   6、用一个producer向某一个topic中写入消息

          bin/kafka-console-producer.sh --broker-list broker_hostname:9092 --topic topic_name

    7、用一个comsumer从某一个topic中读取信息

         bin/kafka-console-consumer.sh --zookeeper zk_hostname:2181 --from-beginning --topic topic-name

    8、查看一个topic的分区及副本状态信息

         bin/kafka-topics.sh --describe --zookeeper zk_hostname:2181 --topic topic_name


Java API

     Producer端:


               public class ProducerDemo{

                       public static void main(String[] args){

                               //创建Properties实例,设置属性

                              Properties properties = new Properties();

                              //声明zk                            

                             properties.put("zk.connect","zk_hostname:2181,....");

                               //声明broker                           

                              properties.put("metadata.broker.list","broker_hostname:9092,.....");

                              properties.put("serializer.class","kafka.serializer.StringEncoder");

                               //创建ProducerConfig配置实例                             

                              ProducerConfig config = new ProducerConfig(properties);

                               //创建Producer实例                            

                              Producer<String,String> producer = new Producer<String,String>(config);

                              //发送消息                         

                              producer.send(new KeyedMessage<String,String>(topic_name,content));

             }

}


Consumer端:




         public class ConsumerDemo{

                      public static void main(String[] args){

                             //创建Properties实例,设置属性

                          Properties properties = new Properties();

                          properties.put("zookeeper.connect","zk_hostname:2181,....");

                              //如果生产者和消费者在同一个组中,则不能访问同一组Topic内的数据

                          properties.put("group.id","id_name");

                           //声明ConsumerConnector

                        ConsumerConnector consumer =  Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));

                        MaptopicCountMap = new HashMap();

                        //一次从一个主题中读取数据

                       topicCountMap.put(topic, 1);

                        topicCountMap.put(“topic_name", 1);

                        topicCountMap.put("topic_name1", 1);

                        Map<String, List<KafkaStream<byte[],byte[]>>> consumerMap =

                                         consumer.createMessageStreams(topicCountMap);

                      List<KafkaStream<byte[],byte[]>> streams = consumerMap.get("topic_name");

                        for(final KafkaStreamkafkaStream : streams){

                                       new Thread(new Runnable() {

                                                 @Override

                                         public void run() {

                                                for(MessageAndMetadatamm : kafkaStream){

                                                        String msg = new String(mm.message());

                                                         System.out.println(msg);

                                                   }

                                    }

                       }).start();

               }

         }


最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 135,027评论 19 139
  • Kafka入门经典教程-Kafka-about云开发 http://www.aboutyun.com/threa...
    葡萄喃喃呓语阅读 10,890评论 4 54
  • 本文转载自http://dataunion.org/?p=9307 背景介绍Kafka简介Kafka是一种分布式的...
    Bottle丶Fish阅读 5,505评论 0 34
  • ** 今天看了一下kafka官网,尝试着在自己电脑上安装和配置,然后学一下官方document。** Introd...
    RainChang阅读 5,053评论 1 30
  • kafka的定义:是一个分布式消息系统,由LinkedIn使用Scala编写,用作LinkedIn的活动流(Act...
    时待吾阅读 5,373评论 1 15