kafka和flume的区别与

(1)kafka和flume都是日志系统。kafka是分布式消息中间件,自带存储,提供push和pull存取数据功能。flume分为agent(数据采集器)[source channel sink]。
(2)kafka做日志缓存应该是更为合适的,但是 flume的数据采集部分做的很好,可以定制很多数据源,减少开发量。所以比较流行flume+kafka模式,如果为了利用flume写hdfs的能力,也可以采用kafka+flume的方式。

采集层 主要可以使用Flume, Kafka两种技术。
Flume:Flume 是管道流方式,提供了很多的默认实现,让用户通过参数部署,及扩展API.
Kafka:Kafka是一个可持久化的分布式的消息队列。
Kafka 是一个非常通用的系统。你可以有许多生产者和很多的消费者共享多个主题Topics。相比之下,Flume是一个专用工具被设计为旨在往HDFS,HBase发送数据。它对HDFS有特殊的优化,并且集成了Hadoop的安全特性。所以,Cloudera 建议如果数据被多个系统消费的话,使用kafka;如果数据被设计给Hadoop使用,使用Flume。
正如你们所知Flume内置很多的source和sink组件。Kafka明显有一个更小的生产消费者生态系统,使用Kafka意味着你准备好了编写你自己的生产者和消费者代码。如果已经存在的Flume Sources和Sinks满足你的需求,并且你更喜欢不需要任何开发的系统,请使用Flume。
Flume可以使用拦截器实时处理数据。这些对数据屏蔽或者过量是很有用的。Kafka需要外部的流处理系统才能做到。
Kafka和Flume都是可靠的系统,通过适当的配置能保证零数据丢失。然而,Flume不支持副本事件。于是,如果Flume代理的一个节点奔溃了,即使使用了可靠的文件管道方式,你也将丢失这些事件直到你恢复这些磁盘。如果你需要一个高可靠行的管道,那么使用Kafka是个更好的选择。
Flume和Kafka可以很好地结合起来使用。如果你的设计需要从Kafka到Hadoop的流数据,使用Flume代理并配置Kafka的Source读取数据也是可行的:你没有必要实现自己的消费者。你可以直接利用Flume与HDFS及HBase的结合的所有好处。你可以使用Cloudera Manager对消费者的监控,并且你甚至可以添加拦截器进行一些流处理。
Flume和Kafka可以结合起来使用。通常会使用Flume + Kafka的方式。其实如果为了利用Flume已有的写HDFS功能,也可以使用Kafka + Flume的方式。

flume+kafka flume接收kafka的数据 写入hive和hbase
配置参考:
Kafka Source is an Apache Kafka consumer that reads messages from Kafka topics.If you have multiple Kafka sources running, you can configure them with the same Consumer Groupso each will read a unique set of partitions for the topics.

Property Name Default Description

channels
type The component type name, needs to beorg.apache.flume.source.kafka.KafkaSource
kafka.bootstrap.servers List of brokers in the Kafka cluster used by the source
kafka.consumer.group.id flume Unique identified of consumer group. Setting the same id in multiple sources or agentsindicates that they are part of the same consumer group
kafka.topics Comma-separated list of topics the kafka consumer will read messages from.
kafka.topics.regex Regex that defines set of topics the source is subscribed on. This property has higher prioritythankafka.topics and overrideskafka.topics if exists.
batchSize 1000 Maximum number of messages written to Channel in one batch
batchDurationMillis 1000 Maximum time (in ms) before a batch will be written to ChannelThe batch will be written whenever the first of size and time will be reached.
..........
Other Kafka Consumer Properties These properties are used to configure the Kafka Consumer. Any consumer property supportedby Kafka can be used. The only requirement is to prepend the property name with the prefixkafka.consumer .eg:kafka.consumer.auto.offset.reset

flume-ng-node Application main方法解析shell命令 加载configuration 形成context上下文,执行LifecycleAware接口的start方法,kafkasource 继承了abstractsource 实现了 configurable和pollablesource(轮询),重写了LifecycleAware的start和stop方法,configurable的configure方法,以及pollablesource的process方法。执行LifecycleAware的方法时实际上运行的是kafkasource的start方法,根据kafka配置建立消费者连接和kafkastream流。在start之前加载了配置文件时已经将重载后的configure执行了一遍,更改了一些source的配置,如果没有的话会有默认配置。
之后由sourceRunner的子类PollableSourceRunner驱动kafkasource运行(PollableSourceRunner启动了一个PollingRunner线程,该线程调用了kafkasource的process方法),process方法将stream的流数据读取转换为channel所需的event,写入channel中。
当时间达到一定的时间间隔或者批处理的事件条数达到一定数目时,将eventlist一次性发往channel(由absractsource的getChannelProcessor返回的processor调用processEventBatch方法发送)。
核心代码为:

while (eventList.size() < batchUpperLimit &&
              System.currentTimeMillis() < maxBatchEndTime) {
.....
event = EventBuilder.withBody(eventBody, headers);
        eventList.add(event);
....
}
if (eventList.size() > 0) {
        counter.addToKafkaEventGetTimer((System.nanoTime() - nanoBatchStartTime) / (1000 * 1000));
        counter.addToEventReceivedCount((long) eventList.size());
        getChannelProcessor().processEventBatch(eventList);
        counter.addToEventAcceptedCount(eventList.size());
        if (log.isDebugEnabled()) {
          log.debug("Wrote {} events to channel", eventList.size());
        }
        eventList.clear();

基本配置为:

public class KafkaSourceConstants {

  public static final String KAFKA_PREFIX = "kafka.";
  public static final String KAFKA_CONSUMER_PREFIX = KAFKA_PREFIX + "consumer.";
  public static final String DEFAULT_KEY_DESERIALIZER =
      "org.apache.kafka.common.serialization.StringDeserializer";
  public static final String DEFAULT_VALUE_DESERIALIZER =
      "org.apache.kafka.common.serialization.ByteArrayDeserializer";
  public static final String BOOTSTRAP_SERVERS =
      KAFKA_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
  public static final String TOPICS = KAFKA_PREFIX + "topics";
  public static final String TOPICS_REGEX = TOPICS + "." + "regex";
  public static final String DEFAULT_AUTO_COMMIT =  "false";
  public static final String BATCH_SIZE = "batchSize";
  public static final String BATCH_DURATION_MS = "batchDurationMillis";
  public static final int DEFAULT_BATCH_SIZE = 1000;
  public static final int DEFAULT_BATCH_DURATION = 1000;
  public static final String DEFAULT_GROUP_ID = "flume";

  public static final String MIGRATE_ZOOKEEPER_OFFSETS = "migrateZookeeperOffsets";
  public static final boolean DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS = true;

  public static final String AVRO_EVENT = "useFlumeEventFormat";
  public static final boolean DEFAULT_AVRO_EVENT = false;

  /* Old Properties */
  public static final String ZOOKEEPER_CONNECT_FLUME_KEY = "zookeeperConnect";
  public static final String TOPIC = "topic";
  public static final String OLD_GROUP_ID = "groupId";

  // flume event headers
  public static final String TOPIC_HEADER = "topic";
  public static final String KEY_HEADER = "key";
  public static final String TIMESTAMP_HEADER = "timestamp";
  public static final String PARTITION_HEADER = "partition";

}

kafkasource根据消费者建立的数据流datastream读取,每隔一段批处理时间或者数据达到一定数目将会往channel写数据。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,588评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,456评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,146评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,387评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,481评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,510评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,522评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,296评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,745评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,039评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,202评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,901评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,538评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,165评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,415评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,081评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,085评论 2 352

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,646评论 18 139
  • kafka的定义:是一个分布式消息系统,由LinkedIn使用Scala编写,用作LinkedIn的活动流(Act...
    时待吾阅读 5,314评论 1 15
  • 本文转载自http://dataunion.org/?p=9307 背景介绍Kafka简介Kafka是一种分布式的...
    Bottle丶Fish阅读 5,467评论 0 34
  • Kafka入门经典教程-Kafka-about云开发 http://www.aboutyun.com/threa...
    葡萄喃喃呓语阅读 10,821评论 4 54
  • 爱是一件很美好的事儿, 很爱很爱却是一件很糟糕的事儿。 有时候,虽然能想明白, 但心里就是接受不了。 大道理人人都...
    夕子宁洋阅读 102评论 0 0