主要内容:解决部署运行遇到问题
Q1 为什么storm 部署的任务没有日志 也就是说没有采集任何数据
A:
A:step1 【知识点补充】 emit transferred ack和fail是什么概念
- emitted栏显示的数字表示的是调用OutputCollector的emit方法的次数.
- transferred栏显示的数字表示的是实际tuple发送到下一个task的计数.
- ack和failed 含义
Spout的可靠性保证
在Storm中,消息处理可靠性从Spout开始的。storm为了保证数据能正确的被处理, 对于spout产生的每一个tuple,storm都能够进行跟踪,这里面涉及到了ack/fail的处理, 如果一个tuple被处理成功,那么spout便会调用其ack方法,如果失败,则会调用fail方法。而topology中处理tuple的每一个bolt都会通过OutputCollector来告知storm,当前bolt处理是否成功
当一个tuple被创建, 不管是spout还是bolt创建的, 它会被赋予一个64位的id ,而acker就是利用这个id去跟踪所有的tuple的。 每个tuple知道它的祖宗的id(从spout发出来的那个tuple的id), 每当你新发射一个tuple, 它的祖宗id都会传给这个新的tuple。 所以当一个tuple被ack的时候,它会发一个消息给acker,告诉它这个tuple树发生了怎么样的变化
A:step2 判断flume有没有采集 kafka有没有数据
- 查看flume 日志:
tail -f /usr/local/apache-flume-1.7.0-bin/logs/flume.log
14 Jul 2017 20:46:27,733 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:177) - Shutdown Metric for type: SOURCE, name: r1. src.open-connection.count == 0
- ps -ef |grep flume
竟然没有启动 重启flume程序
flume-ng agent --conf conf -f /usr/local/apache-flume-1.7.0-bin/conf/flume-conf.properties -n agent&
-查看kafa数据搜集情况
[root@VM-10-112-179-18 logs]# kafka-console-producer.sh --broker-list 10.112.179.18:9092 --topic gome
flume启动后根本没有采集数据
修改成文件形式
agent.sinks.s1.type = file_roll
agent.sinks.s1.sink.directory =/usr/local/apache-flume-1.7.0-bin/data
Specify the channel the sink should use
agent.sinks.s1.channel = c1
- 查看kafkalog日志内容
kafka-run-class.sh kafka.tools.DumpLogSegments --files \
/usr/local/kafka_2.12-0.10.2.1/logs/gome-0/00000000000000000000.log --print-data-log
Dumping /usr/local/kafka_2.12-0.10.2.1/logs/gome-0/00000000000000000000.log
Starting offset: 0
再次查看flume 日志
org.apache.flume.EventDeliveryException: Failed to publish events
at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:252)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:686)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:449)
at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:212)
... 3 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
问题在于 这说明已经采取到 数据没有传输到kafka中去
对kafa进行监控
重新检查flume配置文件 kafa迁移其他主机 正常
估计是防火墙的原因
主机间通信:
关闭命令: service iptables stop
永久关闭防火墙:chkconfig iptables off
查看状态:service iptables status
或者topic 配置的不正确
发布storm程序出错
Caused by: java.lang.RuntimeException: java.io.NotSerializableException: org.apache.log4j.Logger
原因分析:static修饰符作用
java序列化是不能序列化static变量的
解决办法:
出错代码:
public class Customer implements Serializable{
private Logger logger = Logger.getLogger(Customer.class)
}
修正代码:
public class Customer implements Serializable{
private static final Logger logger = Logger.getLogger(Customer.class)
}
storm storm 持久化引入json格式的数据 --缺少依赖
<dependency>
<groupId>net.sf.json-lib</groupId>
<artifactId>json-lib</artifactId>
<version>2.4</version>
<classifier>jdk15</classifier>
</dependency>
1. java.lang.NoClassDefFoundError: net/sf/json/JSONObject at gome.storm.model.SrsLogServeInfo.toJsonString(SrsLogServeInfo.java:31) at gome.storm.bolt.PersistentSrsLog.getEdgeTtoSrsToClitInfo
----->json-lib-2.4-jdk15.jar
2. java.lang.NoClassDefFoundError: org/apache/commons/lang/exception/NestableRuntimeException at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:
---->commons-lang-2.5.jar
3. java.lang.NoClassDefFoundError: net/sf/ezmorph/Morpher at
----> ezmorph-1.0.6.jar
4. java.lang.NoClassDefFoundError: org/apache/commons/logging/LogFactory at net.sf.json.AbstractJSON.<clinit>(AbstractJSON.java:53)
---->commons-logging-1.1.1.jar
5. java.lang.NoClassDefFoundError: org/apache/commons/beanutils/DynaBean at
net.sf.json.JSONObject.fromObject(JSONObject.java:147) at
net.sf.json.JSONObject.fromObject(JSONObject.java:134)
------>commons-beanutils 1.8.0
A2:
json-lib官方网站
作用能把对象 map 数组 xml等转换成json结构 并且解析
http://json-lib.sourceforge.net/
综上,想用一个最简单的JSON也得导入以下的6个包:
Json-lib requires (at least) the following dependencies in your classpath:
commons-lang 2.5
commons-beanutils 1.8.0
commons-collections 3.2.1
commons-logging 1.1.1
ezmorph 1.0.6
json-lib-2.4-jdk15.jar
- strom 运行插入redis错误
Could not connect to Redis at 10.77.88.99:6379: Connection refused
估计是防火墙的原因
主机间通信:
关闭命令: service iptables stop
永久关闭防火墙:chkconfig iptables off
查看状态:service iptables status
结果不行 问题在redis服务器上查看redis.conf 说明
################################## NETWORK #####################################
# By default, if no "bind" configuration directive is specified, Redis listens
# for connections from all the network interfaces available on the server.
# It is possible to listen to just one or multiple selected interfaces using
# the "bind" configuration directive, followed by one or more IP addresses.
#
# Examples:
#
# bind 192.168.1.100 10.0.0.1
# bind 127.0.0.1 ::1
#
# ~~~ WARNING ~~~ If the computer running Redis is directly exposed to the
# internet, binding to all the interfaces is dangerous and will expose the
# instance to everybody on the internet. So by default we uncomment the
# following bind directive, that will force Redis to listen only into
# the IPv4 lookback interface address (this means Redis will be able to
# accept connections only from clients running into the same computer it
# is running).
#
# IF YOU ARE SURE YOU WANT YOUR INSTANCE TO LISTEN TO ALL THE INTERFACES
# JUST COMMENT THE FOLLOWING LINE.
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#bind 127.0.0.1
翻译:
- 如果配置成bind 127.0.0.1方式
redis服务器只能接受本地连接的请求(same computer)
Redis will be able to
accept connections only from clients running into the same computer it is running
- 如果配置成#bind 127.0.0.1 方式
redis接受任何服务器的连接(all ) 默认配置
By default
if no "bind" configuration directive is specified, Redis listens
for connections from all the network interfaces available on the server.
- bind 运行访问的远程主机 ip xxxx
bing 允许外网访问的ip(followed by one or more IP addresses)
It is possible to listen to just one or multiple selected interfaces using
the "bind" configuration directive, followed by one or more IP addresses.
Examples:
bind 192.168.1.100 10.0.0.1
对比启动
Q2 重启之后从strom仍然从头开始读取kafka记录
https://github.com/apache/storm/tree/master/external/storm-kafka
How KafkaSpout recovers in case of failures
- kafka.api.OffsetRequest.EarliestTime(): read from the beginning of the topic (i.e. from the oldest messages onwards)
- kafka.api.OffsetRequest.LatestTime(): read from the end of the topic (i.e. any new messsages that are being written to the topic)
- If you want to force the spout to ignore any consumer state information stored in ZooKeeper, then you should set the parameter KafkaConfig.ignoreZkOffsets to true. If true, the spout will always begin reading from the offset defined by KafkaConfig.startOffsetTime
意思是说:
spoutConf.ignoreZkOffsets = false; // 重启均是从offset中读取
参考
1 http://blog.csdn.net/yangyutong0506/article/details/46742601
2 https://github.com/dibbhatt/kafka-spark-consumer/issues/16
3 Storm消息可靠性与Ack机制
http://blog.jassassin.com/2014/10/22/storm/storm-ack/
4 Kafka 指南
http://wdxtub.com/2016/08/15/kafka-guide/
5 序列化
https://www.ibm.com/developerworks/cn/java/j-lo-serial/index.html