启动kafka
./kafka-server-start.sh ../config/server-10.properties
./kafka-server-start.sh -daemon ../config/server-12.properties
安全关闭kafka(执行前请将 #brokerId# 替换为实际的broker id)
./kafka-run-class.sh kafka.admin.ShutdownBroker --zookeeper localhost:2181/kafka --broker #brokerId# --num.retries 3 --retry.interval.ms 60
创建topic
./kafka-topics.sh --create --zookeeper 172.31.120.10:2181/kafka --replication-factor 3 --partitions 3 --topic test
删除topic
./kafka-topics.sh --delete --zookeeper 172.31.120.10:2181,172.31.120.11:2181,172.31.120.12:2181/kafka --topic test-replicated-topic
查询topic
./kafka-topics.sh --describe --zookeeper localhost:2181/kafka --topic test-replicated-topic
启动控制台Producer,向Kafka发送消息
./kafka-console-producer.sh --broker-list localhost:9092 --topic test-replicated-topic
启动控制台Consumer,消费刚刚发送的消息
./kafka-console-consumer.sh --zookeeper localhost:2181/kafka --topic test-replicated-topic --from-beginning
控制台查看Consumer的offset
./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181/kafka --topic topic-blog --group sunck
增加分区的副本数
./kafka-reassign-partitions.sh --zookeeper localhost:2181/kafka --reassignment-json-file add-replicate.json --execute
json文件格式:
{"partitions":
[{"topic": "topic-todo",
"partition": 0,
"replicas": [10,11,12]
}],
"version":1
}
重新选举分区的leader
./kafka-preferred-replica-election.sh --zookeeper localhost:2181/kafka --path-to-json-file partition-election.json --execute
json文件格式:
{"partitions":
[{"topic": "topic-systemlog", "partition": 2},
{"topic": "test", "partition": 0}
]
}
查看ConsumerGroup信息
./kafka-consumer-groups.sh --new-consumer --bootstrap-server 172.31.120.10:9092 --group waiqin365_aiimagecheck --describe
修改Topic的分区个数(注意:修改后切记同步增加zookeeper的offset)
./kafka-topics.sh --alter --zookeeper 172.31.120.10:2181/kafka --partitions 4 --topic topic-test