Kafka 常用命令脚本

基于 Python 3 和 kafka-python 2.0.2 实现的 Kafka 常用命令工具

转载请注明出处

import argparse
import json
import logging
import re

from kafka import KafkaAdminClient, KafkaConsumer
from kafka.admin.new_topic import NewTopic
from kafka.errors import UnknownTopicOrPartitionError
from kafka.structs import TopicPartition

# pip install kafka-python
# Connection settings; the functions in this script expand this mapping into
# keyword arguments when they build a Kafka client.
conf = dict(
    bootstrap_servers='kafka:9092',
    security_protocol='SASL_PLAINTEXT',
    sasl_mechanism='SCRAM-SHA-256',
    sasl_plain_username='admin',
    sasl_plain_password='adminsecret',
)

# Alternative one-call setup, kept for reference:
# logging.basicConfig(level=logging.ERROR, format='%(asctime)s %(levelname)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S')

# Module logger: lets everything from DEBUG upwards through, so that each
# handler below can apply its own threshold.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# One line format shared by both handlers.
formatter = logging.Formatter('%(asctime)s %(levelname)s - %(message)s')

# Console handler (INFO and above) and file handler (DEBUG and above,
# written to 'kafka-console.log').
console_handler = logging.StreamHandler()
file_handler = logging.FileHandler('kafka-console.log')

# Configure and attach the handlers: console first, then file.
for _handler, _level in ((console_handler, logging.INFO), (file_handler, logging.DEBUG)):
    _handler.setLevel(_level)
    _handler.setFormatter(formatter)
    logger.addHandler(_handler)
del _handler, _level


def filter_dict_by_regex(dictionary, regex):
    """Return the keys of ``dictionary`` whose entire name matches ``regex``.

    Args:
        dictionary: Any iterable of string keys (a dict or a list of names).
        regex: Regular-expression source, or ``None`` to disable filtering.

    Returns:
        A new list with the matching keys in iteration order. When ``regex``
        is ``None`` every key is returned.

    Raises:
        re.error: If ``regex`` is not a valid regular expression.
    """
    # Without a pattern there is nothing to filter: hand back all keys.
    if regex is None:
        return list(dictionary)

    pattern = re.compile(regex)
    # fullmatch rather than match: match() only anchors at the start, so a
    # plain name such as 'test' would also select 'test2' or 'test-prod'.
    # Prefix selection stays available through an explicit pattern ('test.*').
    return [key for key in dictionary if pattern.fullmatch(key)]


def list_topics():
    """Print every topic name known to the cluster, one per line.

    The ``__consumer_offsets`` topic is skipped. The admin client is closed
    even when the listing call raises.
    """
    print('>>>list topic')
    admin_client = KafkaAdminClient(**conf)
    try:
        for name in admin_client.list_topics():
            if name != '__consumer_offsets':
                print(name)
    finally:
        # Always release the client, also when list_topics() fails.
        admin_client.close()


def delete_topic(topic):
    """Delete the topics selected by the regular expression ``topic``.

    The candidate names come from ``KafkaAdminClient.list_topics()`` and are
    narrowed down by ``filter_dict_by_regex``. One result line is printed per
    topic reported in the deletion response.

    Args:
        topic: Regular expression selecting the topic name(s) to delete.
    """
    print(f">>>deleting topic {topic}")
    admin_client = KafkaAdminClient(**conf)
    try:
        matched = filter_dict_by_regex(admin_client.list_topics(), topic)
        if not matched:
            # Nothing selected: say so instead of sending an empty request.
            print(f"No topic matches '{topic}'.")
            return

        deletion_result = admin_client.delete_topics(matched)
        # Use a separate loop name so the ``topic`` argument is not shadowed.
        for name, error_code in deletion_result.topic_error_codes:
            if error_code == 0:
                print(f"删除主题 '{name}' 成功")
            else:
                print(f"删除主题 '{name}' 失败, error_code='{error_code}'")
    finally:
        # Close the client on every path, including when a call above raises.
        admin_client.close()

def create_topic(topic, partitions, replication):
    """Create a single topic and print a confirmation.

    Args:
        topic: Name of the topic to create.
        partitions: Number of partitions for the new topic.
        replication: Replication factor for the new topic.

    Any exception raised by ``create_topics`` propagates to the caller; the
    admin client is closed first.
    """
    print(f">>> create topic {topic} --partitions {partitions} --replication-factor {replication}")
    admin_client = KafkaAdminClient(**conf)
    try:
        new_topics = [NewTopic(topic, num_partitions=partitions, replication_factor=replication)]
        admin_client.create_topics(new_topics)
        # Reached only when create_topics() returned without raising.
        print(f"创建主题 '{topic}' 成功")
    finally:
        admin_client.close()


def show_offset(topic, group_id):
    """Print end offsets, committed offsets and the remaining message count.

    For each partition of ``topic`` the end offset and the offset committed by
    ``group_id`` are printed as sorted ``(partition, offset)`` pairs, followed
    by the difference between the two sums. Partitions without a committed
    offset (``None``) add nothing to the committed sum.

    Args:
        topic: Topic to inspect.
        group_id: Consumer group whose committed offsets are read.
    """
    print('>>> show offset', topic, group_id)
    # Build a per-call copy: writing group_id into the shared ``conf`` would
    # leak it into every client created afterwards.
    consumer = KafkaConsumer(topic, **dict(conf, group_id=group_id))
    try:
        partition_ids = consumer.partitions_for_topic(topic)
        if not partition_ids:
            # No partition metadata for the topic: report it explicitly
            # rather than failing on iteration over None.
            print(f"topic not found: {topic}")
            return
        partitions = [TopicPartition(topic, p) for p in partition_ids]

        # End offsets per partition.
        end_offsets = consumer.end_offsets(partitions)
        toff = sorted((tp.partition, end) for tp, end in end_offsets.items())
        print("total offset: {}".format(str(toff)))

        # Offsets committed by the group; partition ids are unique, so the
        # sort never has to compare a None offset.
        coff = sorted((tp.partition, consumer.committed(tp)) for tp in partitions)
        print("current offset: {}".format(str(coff)))

        # Messages still to be consumed, summed over all partitions.
        toff_sum = sum(end for _, end in toff)
        cur_sum = sum(done for _, done in coff if done is not None)
        print("kafka left: {}".format(toff_sum - cur_sum))
    except Exception as e:
        # Best-effort CLI output: report the real failure instead of
        # labelling every error as a missing topic.
        print("show offset failed! error:", e)
    finally:
        consumer.close()

def consumer_topic(topic, offset):
    """Consume ``topic`` until interrupted, logging each received message.

    The topic's existence is checked first; if it is missing a message is
    printed and the function returns without consuming. Each message's
    partition and offset are logged at INFO level and its value at DEBUG
    level (decoded as JSON when possible, otherwise logged as received).

    Args:
        topic: Topic to consume.
        offset: Value passed as ``auto_offset_reset`` to the consumer.

    Returns:
        None.
    """
    print('>>> consumer topic', topic)
    # Check that the topic exists before subscribing.
    admin_client = KafkaAdminClient(**conf)
    try:
        topic_exists = topic in admin_client.list_topics()
    except UnknownTopicOrPartitionError:
        topic_exists = False
    finally:
        admin_client.close()
    if not topic_exists:
        # Both "not listed" and "unknown topic" end here, so a missing topic
        # is never consumed.
        print(f"Topic '{topic}' does not exist.")
        return None

    # Per-call copy so the shared ``conf`` is not polluted with consumer-only
    # settings.
    consumer_conf = dict(
        conf,
        group_id='kafka-console-test-consumer',
        auto_offset_reset=offset,
    )
    consumer = KafkaConsumer(topic, **consumer_conf)
    try:
        for message in consumer:
            logger.info("receive offset:(%s,%s)", message.partition, message.offset)
            try:
                value = json.loads(message.value)
            except (TypeError, ValueError):
                # Payload is None or not valid JSON/UTF-8: log it unchanged
                # instead of aborting the consume loop.
                value = message.value
            logger.debug("value: %s", value)
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the loop.
        pass
    finally:
        consumer.close()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Topic Management Script')
    subparsers = parser.add_subparsers(dest='command')

    def _add_topic_option(sub_parser):
        # Every sub-command except 'list' takes the same required -t option.
        sub_parser.add_argument('-t', dest='topic', type=str, help='Topic name', required=True)

    # delete: remove topics
    delete_cmd = subparsers.add_parser('delete', help='Delete something')
    _add_topic_option(delete_cmd)
    delete_cmd.set_defaults(func=delete_topic)

    # list: show all topics
    list_cmd = subparsers.add_parser('list', help='List all topics')
    list_cmd.set_defaults(func=list_topics)

    # create: new topic with partition count and replication factor
    create_cmd = subparsers.add_parser('create', help='Create Topic')
    _add_topic_option(create_cmd)
    create_cmd.add_argument('-p', dest='partitions', type=int, help='Number of partitions', required=True)
    create_cmd.add_argument('-r', dest='replication', type=int, help='Replication factor', required=True)
    create_cmd.set_defaults(func=create_topic)

    # offset: show offsets of a consumer group
    offset_cmd = subparsers.add_parser('offset', help='Show Offset')
    _add_topic_option(offset_cmd)
    offset_cmd.add_argument('-g', dest='group_id', type=str, help='groupId', required=True)
    offset_cmd.set_defaults(func=show_offset)

    # consumer: consume a topic live
    consumer_cmd = subparsers.add_parser('consumer', help='Consumer Topic')
    _add_topic_option(consumer_cmd)
    # An '-o' option (earliest or latest) is not exposed; 'latest' is passed below.
    consumer_cmd.set_defaults(func=consumer_topic)

    args = parser.parse_args()
    handler = getattr(args, 'func', None)
    if handler is None:
        # No sub-command given.
        parser.print_help()
    else:
        # Parsed attributes forwarded positionally to each handler; commands
        # not listed here (i.e. 'list') are called without arguments.
        forwarded_fields = {
            'delete': ('topic',),
            'create': ('topic', 'partitions', 'replication'),
            'offset': ('topic', 'group_id'),
        }
        if args.command == 'consumer':
            call_args = (args.topic, 'latest')
        else:
            call_args = tuple(
                getattr(args, field) for field in forwarded_fields.get(args.command, ())
            )
        handler(*call_args)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容