kafka学习二

上一期介绍了kafka的基本信息和简单使用。这一期介绍kafka的详细信息。

Kafka的Producer

1、消息和数据的生产者,向 Kafka 的 topic 发布消息。
2、Producer将消息发布到指定的Topic中,Producer可以指定将此消息归属于哪个partition,如果不指定,kafka会基于"round-robin"的方式,将消息存放到partition中去.
3、异步发送,批量发送可以很有效的提高发送效率。Kafka producer的异步发送模式允许进行批量发送,先将消息缓存在内存中,然后一次请求批量发送出去。

Kafka的Consumer

1、消息和数据的消费者,订阅 topics 并处理其订阅的消息。
2、每个consumer属于一个consumer group,每个group中可以有多个consumer,发送到Topic的消息,只会被订阅此Topic的每个group中的一个consumer消费。
3、在 kafka中,我们可以认为一个group是一个"订阅"者,一个Topic中的每个partition只会被一个"订阅者"中的一个consumer消费,不过一个 consumer可以消费多个partitions中的消息。
4、consumer group(包含多个consumer)对一个topic进行消费,不同的consumer group之间独立订阅。

Kafka的Broker

1、Broker:缓存代理,Kafka 集群中的一台或多台服务器统称为 broker。
2、为了减少磁盘写入的次数,broker会将消息暂时buffer起来(segment),当消息的个数达到一定阀值时,再flush到磁盘,这样减少了磁盘IO调用的次数。
3、Broker不保存订阅者的状态,由订阅者自己保存(低版本kafka由zookeeper保存,新版本中broker保存订阅者的状态)。
4、kafka采用基于时间的SLA(服务水平保证),消息保存一定时间(通常为7天)后会被删除。
5、消息订阅者可以rewind back到任意位置重新进行消费,当订阅者故障时,可以选择最小的offset(id)进行重新读取消费消息。

Kafka的Message

1、Message消息:是通信的基本单位,每个 producer 可以向一个 topic(主题)发布一些消息。
2、Kafka中的Message是以topic为基本单位组织的,不同的topic之间是相互独立的。每个topic又可以分成几个不同的partition(每个topic有几个partition是在创建topic时指定的),每个partition存储一部分Message。

Kafka的Partitions

1、kafka基于文件存储.通过分区,可以将日志内容分散到多个server上,来避免文件尺寸达到单机磁盘的上限,每个partiton都会被当前server(kafka实例)保存。
2、可以将一个topic切分多任意多个partitions,来消息保存/消费的效率。
3、越多的partitions意味着可以容纳更多的consumer,有效提升并发消费的能力。

Kafka的持久化

1、kafka采用日志形式来持久化,添加数据时,直接追加到日志文件上面。
2、一个Topic可以认为是一类消息,每个topic将被分成多partition(区),每个partition在存储层面是append log文件。
3、Logs文件根据broker中的配置要求,保留一定时间后删除来释放磁盘空间。
4、为数据文件建索引:稀疏存储,每隔一定字节的数据建立一条索引。

索引示意图

Kafka的分布式架构


Kafka的分布式架构,producer、broker(kafka)和consumer都可以有多个。Producer,consumer实现Kafka注册的接口,数据从producer发送到broker,broker承担一个中间缓存和分发的作用。broker分发注册到系统中的consumer。broker的作用类似于缓存,即活跃的数据和离线处理系统之间的缓存。

kafka-manager集群管理器

kafka-manager是yahoo的开源项目,通过Kafka Manager用户能够更容易地发现集群中哪些主题或者分区分布不均匀,同时能够管理多个集群,能够更容易地检查集群的状态,能够创建主题,执行首选的副本选择,能够基于集群当前的状态生成分区分配,并基于生成的分配执行分区的重分配,此外,Kafka Manager还是一个非常好的可以快速查看集群状态的工具。项目地址:https://github.com/yahoo/kafka-manager

Kafka集群配置

KafkaUtil文件:

package com.lqq.demo2;

import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;

public class KafkaUtil {
    public final static String topic = "Kaffka_demo2";
    public final static String bootstrap_servers = "localhost:9092,localhost:9093,localhost:9094";
    
    public final static String group_id = "group_demo2";
    public final static String key_serializer = "org.apache.kafka.common.serialization.StringSerializer";
    public final static String value_serializer = "org.apache.kafka.common.serialization.StringSerializer";
    public final static String key_deserializer = "org.apache.kafka.common.serialization.StringDeserializer";
    public final static String value_deserializer = "org.apache.kafka.common.serialization.StringDeserializer";
    
    private static KafkaProducer<String, String> kp;
    private static KafkaConsumer<String, String> kc;

    public static KafkaProducer<String, String> getProducer() {
        if (kp == null) {
            Properties props = new Properties();
            props.put("bootstrap.servers", bootstrap_servers);
            props.put("acks", "all");
            props.put("client.id", "demo2_producer");
            props.put("retries", 1);
            props.put("batch.size", 16384);
            props.put("key.serializer", key_serializer);
            props.put("value.serializer", value_serializer);
            kp = new KafkaProducer<String, String>(props);
        }
        return kp;
    }

    public static KafkaConsumer<String, String> getConsumer() {
        if (kc == null) {
            Properties props = new Properties();

            props.put("bootstrap.servers", bootstrap_servers);
            props.put("group.id", group_id);
            props.put("client.id", "demo2_consumer");
            props.put("heartbeat.interval.ms", "200");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            props.put("key.deserializer", key_deserializer);
            props.put("value.deserializer", value_deserializer);
            kc = new KafkaConsumer<String, String>(props);
        }

        return kc;
    }
}

生产者类:

package com.lqq.demo2;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KProducer{
    private static AtomicInteger msgNo = new AtomicInteger(1);
    
    
    public static class ProducerThread implements Runnable{
        Producer<String, String> producer=KafkaUtil.getProducer();
        String topic;
        public ProducerThread(String topic){
            this.topic=topic;
        }
        @Override
        public void run() {
            while (true) {
                int no=msgNo.getAndIncrement();
                String data = new String("hello kafka message " + no);
                String key = String.valueOf(no);
                ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, key, data);
                producer.send(record, new SendCallback());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) {
        ExecutorService producerPool = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 3; i++) {
            producerPool.execute(new ProducerThread(KafkaUtil.topic));
        }
        producerPool.shutdown();

    }
}

KConsumer类:

package com.lqq.demo2;

import java.util.Arrays;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;


public class KConsumer{
    private final String topic;
    private final Consumer<String, String> consumer=KafkaUtil.getConsumer();
    
    public KConsumer(String topic) {
        this.topic = topic;
    }
    
    public void consume() {
        consumer.subscribe(Arrays.asList(topic));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Consumer  record  offset=" + record.offset() + "  key=" + record.key() + " value="
                        + record.value());
            }
        }
    }
    public void close(){
        consumer.close();
    }
    public static void main(String[] args) {
        KConsumer consumer=new KConsumer(KafkaUtil.topic);
        consumer.consume();
        consumer.close();
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,496评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,407评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,632评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,180评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,198评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,165评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,052评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,910评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,324评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,542评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,711评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,424评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,017评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,668评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,823评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,722评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,611评论 2 353

推荐阅读更多精彩内容