kafka

一 kafka官网

https://kafka.apache.org/

二 安装Kafka

1 安装zookeeper集群

2 安装kafka

1. 解压

[root@lihl01 software]# tar -zxvf kafka_2.11-1.1.1.tgz -C /opt/apps/
[root@lihl01 apps]# mv kafka_2.11-1.1.1/ kafka-2.11/

2. 修改$KAFKA_HOME/config/server.properties

The id of the broker. This must be set to a unique integer for each broker.

broker.id=1 ## 当前kafka实例的id,必须为整数,一个集群中不可重复
log.dirs=/opt/apps/kafka-2.11/data/kafka ## 生产到kafka中的数据存储的目录,目录需要手动创建
zookeeper.connect=lihl01,lihl02,lihl03/kafka ## kafka数据在zk中的存储目录

3. 配置环境变量

envrioment

export JAVA_HOME=/opt/apps/jdk1.8.0_45
export HADOOP_HOME=/opt/apps/hadoop-2.6.0-cdh5.7.6
export SCALA_HOME=/opt/apps/scala-2.11.8
export SPARK_HOME=/opt/apps/spark-2.2.0
export HIVE_HOME=/opt/apps/hive-1.1.0-cdh5.7.6
export ZOOKEEPER_HOME=/opt/apps/zookeeper-3.4.5-cdh5.7.6
export KAFKA_HOME=/opt/apps/kafka-2.11
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$SCALA_HOME/bin:$HIVE_HOME/bin
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin:$ZOOKEEPER_HOME/bin:$KAFKA_HOME/bin

4. 分发,并修改其他的borker.id

[root@lihl01 apps]# scp -r kafka-2.11/ lihl02:/opt/apps/
[root@lihl01 apps]# scp -r kafka-2.11/ lihl03:/opt/apps/

5. 启动zk

[root@lihl01 apps]# zkServer.sh start
[root@lihl02 apps]# zkServer.sh start
[root@lihl03 apps]# zkServer.sh start

6. 启动kafka

[root@lihl01 apps]# kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

[root@lihl02 apps]# kafka-server-start.sh -daemon KAFKA_HOME/config/server.properties [root@lihl03 apps]# kafka-server-start.sh -daemonKAFKA_HOME/config/server.properties

三 Kafka基本操作-命令行

1 创建主题

./kafka-topics.sh \
--create \
--topic lihltest \
--if-not-exists \
--partitions 3 \
--replication-factor 2 \
--zookeeper lihl01,lihl02,lihl03/kafka

Created topic "lihltest".

tip:
副本因子的个数应该小于等于broker的个数

2 查询所有的主题列表

./kafka-topics.sh \
--list \
--zookeeper lihl01,lihl02,lihl03/kafka

lihltest

3 查询主题详情

./kafka-topics.sh
--describe
--topic lihltest
--zookeeper lihl01,lihl02,lihl03/kafka

Topic:lihltest PartitionCount:3 ReplicationFactor:2 Configs:
Topic: lihltest Partition: 0 Leader: 3 Replicas: 3,1 Isr: 3,1
Topic: lihltest Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: lihltest Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2,3

Partition: 当前topic对应的分区编号
Replicas : 副本因子,当前kafka对应的partition所在的broker实例的broker.id的列表
Leader : 该partition的所有副本中的leader领导者,处理所有kafka该partition读写请求
ISR : 该partition的存活的副本对应的broker实例的broker.id的列表

4 修改主题

./kafka-topics.sh
--alter
--topic lihltest
--partitions 2
--zookeeper lihl01,lihl02,lihl03/kafka

WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

tip:
修改主题的时候,主要是修改主题的分区数,但是分区数只能扩大不能减少

5 删除主题

./kafka-topics.sh
--delete
--topic flink_test1
--zookeeper lihl01,lihl02,lihl03/kafka

6 测试生产与消费

6.1 生产端

./kafka-console-producer.sh
--topic lihltest
--broker-list lihl01:9092,lihl02:9092,lihl03:9092

6.2 消费端

kafka-console-consumer.sh
--topic flink_test1
--bootstrap-server lihl01:9092,lihl02:9092,lihl03:9092

6.3 消费者组

kafka-console-consumer.sh
--topic lihltest
--bootstrap-server lihl01:9092,lihl02:9092,lihl03:9092
--group lihltest
--offset latest \ # 从什么位置(消息的偏移量)开始消费。数字、latest、earlist
--partition 0 # 消费者对应的分区

tip:
当前的消费者属于lihltest的消费者组,每次从0分区的最新的位置开始消费
我的主题的并行度由分区和消费者决定,消费者组内的消费者至多可以同时消费数据量最多和分区数相同
我们的kafka中的主题的偏移量默认是保存在zk中,我们需要读取这个主题中间的消息就必须要先获取到偏移量才可以

四 Kafka的JavaAPI

1 生产者生产数据

1.1 初版代码

public class Demo1_Kafka_Producer {
    public static void main(String[] args) {
        //0. 申明连接到kafka的配置的url
        Properties props = new Properties();
        props.put("bootstrap.servers", "lihl01:9092, lihl02:9092, lihl03:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //1. 创建生产者对象
        Producer<String, String> producer = new KafkaProducer<>(props);

        //2. 创建你想要发送的记录对象
        ProducerRecord<String, String> record = new ProducerRecord<String, String>("lihltest", "hello");
        producer.send(record);

        //3. 释放
        producer.close();
    }
}

1.2 优化之后

package cn.lihl.spark.kafka.day1;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.IOException;
import java.util.Properties;

public class Demo1_Kafka_Producer {
    public static void main(String[] args) throws IOException {
        //0. 申明连接到kafka的配置的url
        Properties props = new Properties();
        props.load(Demo1_Kafka_Producer.class.getClassLoader().getResourceAsStream("producer.properties"));

        //1. 创建生产者对象
        Producer<String, String> producer = new KafkaProducer<>(props);

        //2. 创建你想要发送的记录对象
        ProducerRecord<String, String> record = new ProducerRecord<String, String>("hzbigdata2002", "hello");
        producer.send(record);

        //3. 释放
        producer.close();
    }
}

1.3 producer.properties常见参数的说明

bootstrap.servers=lihl01:9092,lihl02:9092,lihl03:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer # key的序列器
value.serializer=org.apache.kafka.common.serialization.StringSerializer # value的序列器
acks=[0|-1|1|all] ## 消息确认机制
0: 不做确认,直管发送消息即可
-1|all: 不仅leader需要将数据写入本地磁盘,并确认,还需要同步的等待其它followers进行确认
1:只需要leader进行消息确认即可,后期follower可以从leader进行同步
batch.size=1024 #每个分区内的用户缓存未发送record记录的空间大小

如果缓存区中的数据,没有沾满,也就是仍然有未用的空间,那么也会将请求发送出去,为了较少请求次数,我们可以配置linger.ms大于0,

linger.ms=10 ## 不管缓冲区是否被占满,延迟10ms发送request
buffer.memory=10240 #控制的是一个producer中的所有的缓存空间
retries=0 #发送消息失败之后的重试次数

1.4 producer.properties

############################# Producer Basics #############################

bootstrap.servers=lihl01:9092,lihl02:9092,lihl03:9092
compression.type=gzip
max.block.ms=3000
linger.ms=1
batch.size=16384
buffer.memory=33554432
acks=1
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer

2 消费者消费数据

2.1 consumer.properties

bootstrap.servers=lihl01:9092,lihl02:9092,lihl03:9092
group.id=hzbigdata2002
auto.offset.reset=earliest
enable.auto.commit=true
auto.commit.interval.ms=1000
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

2.2 消费者代码

package cn.lihl.spark.kafka.day2;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.IOException;
import java.util.Arrays;
import java.util.Properties;

public class Demo1_Kafka_Consumer {
    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.load(Demo1_Kafka_Consumer.class.getClassLoader().getResourceAsStream("consumer.properties"));

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("lihltest"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s ,partition = %d%n", record.offset(), record.key(), record.value(), record.partition());
        }
    }
}

3 操作Topic

3.1 创建主题

public class Demo2_Kafka_Admin {
    public static void main(String[] args) {
        //1. 创建配置文件
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "lihl01:9092,lihl02:9092,lihl03:9092");
        //2. 创建对象
        AdminClient client = AdminClient.create(props);
        //3. 创建主题
        client.createTopics(Arrays.asList(new NewTopic("chengzhiyuan", 3, (short)1)));
        //4. 释放
        client.close();
    }
}

3.2 打印所有的主题列表

public class Demo3_Kafka_Admin_list {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //1. 创建配置文件
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "lihl01:9092,lihl02:9092,lihl03:9092");
        //2. 创建对象
        AdminClient client = AdminClient.create(props);
        //3. list
        ListTopicsResult listTopicsResult = client.listTopics();
        //4. 获取到所有的主题名称
        KafkaFuture<Set<String>> names = listTopicsResult.names();
        //5. 获取到所有的名字字符串
        Set<String> topicNames = names.get();
        //6. 遍历
        for (String topicName : topicNames) {
            System.out.println(topicName);
        }
        client.close();
    }
}

4 自定义分区

4.1 默认分区策略

每一条producerRecord有,topic名称、可选的partition分区编号,以及一对可选的key和value组成。
三种策略进入分区
1、如果指定的partition,那么直接进入该partition
2、如果没有指定partition,但是指定了key,使用key的hash选择partition
3、如果既没有指定partition,也没有指定key,使用轮询的方式进入partition

4.2 随机分区器

4.2.1 代码

package cn.lihl.spark.kafka.day2;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;
import java.util.Random;

public class Demo4_Kafka_RandomPartitioner implements Partitioner{

    private Random random = new Random();

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //1. 获取我的分区个数
        int partitionCount = cluster.partitionCountForTopic(topic);
        int partition = random.nextInt(partitionCount);
        return partition;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

4.2.2 修改配置文件:producer.properties

partitioner.class=cn.lihl.spark.kafka.day2.Demo4_Kafka_RandomPartitioner

五 Flume整合Kafka

1 安装Flume

[root@lihl01 software]# tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /opt/apps/
[root@lihl01 apps]# mv apache-flume-1.9.0-bin/ flume-1.9.0

envrioment

export JAVA_HOME=/opt/apps/jdk1.8.0_45
export HADOOP_HOME=/opt/apps/hadoop-2.6.0-cdh5.7.6
export SCALA_HOME=/opt/apps/scala-2.11.8
export SPARK_HOME=/opt/apps/spark-2.2.0
export HIVE_HOME=/opt/apps/hive-1.1.0-cdh5.7.6
export ZOOKEEPER_HOME=/opt/apps/zookeeper-3.4.5-cdh5.7.6
export KAFKA_HOME=/opt/apps/kafka-2.11
export FLUME_HOME=/opt/apps/flume-1.9.0
export CLASSPATH=.:JAVA_HOME/lib/dt.jar:JAVA_HOME/lib/tools.jar
export PATH=PATH:JAVA_HOME/bin:HADOOP_HOME/bin:HADOOP_HOME/sbin:SCALA_HOME/bin:HIVE_HOME/bin
export PATH=PATH:SPARK_HOME/bin:SPARK_HOME/sbin:ZOOKEEPER_HOME/bin:KAFKA_HOME/bin:FLUME_HOME/bin

2 新建一个主题

kafka-topics.sh --create
--topic flume-kafka
--zookeeper lihl01,lihl02,lihl03/kafka
--partitions 3
--replication-factor 1

Created topic "flume-kafka".

3 配置flume:netcat_kafka.conf

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = netcat
a1.sources.r1.bind = 192.168.49.111
a1.sources.r1.port = 6666

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = flume-kafka
a1.sinks.k1.kafka.bootstrap.servers = lihl01:9092,lihl02:9092,lihl03:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

4 启动测试

1 start-yarn.sh
2 zkServer.sh start
3 kafka-server-start.sh

4 启动flume,后台启动

  1. 前台启动
    flume-ng agent -n a1 -c /opt/apps/flume-1.9.0/conf/ -f /home/netcat_kafka.conf -Dflume.root.logger=INFO,console

  2. 后台启动
    nohup flume-ng agent -n a1 -c /opt/apps/flume-1.9.0/conf/ -f /home/netcat_kafka.conf > /dev/null 2>&1 &

5 启动消费者脚本
kafka-console-consumer.sh
--topic flume-kafka
--bootstrap-server lihl01:9092,lihl02:9092,lihl03:9092

6 安装一个web服务器
[root@lihl01 home]# yum -y install telnet

7 启动telnet
telnet lihl01 6666

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,743评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,296评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,285评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,485评论 1 283
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,581评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,821评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,960评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,719评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,186评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,516评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,650评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,329评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,936评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,757评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,991评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,370评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,527评论 2 349

推荐阅读更多精彩内容