给EFK日志系统添加kafka队列中遇到的time问题解决
以下是我在生产环境中的EFK 架构

在添加kafka队列中的一个问题解决过程
在fluentd中使用fluent-plugin-kafka收集日志到kafka中,日志正常;
然后使用 elasticsearch-river-kafka 来把kafka中的数据写入到ES中,查看正常;
再使用kibana 来读取ES中的数据来展示,问题来了,怎么调试也不能成功的展示数据;
问题在于 fluent-plugin-kafka 默认没有收集日志中的time字段,打开 output_include_time 选项后
就可出现time字段,但并不是日志中的格式,而是转换成了 unix time(例如:1444961965)
而在es中date字段,首先尝试dateOptionalTime,失败的话会尝试UNIX-MS(例如:1444961965000),但是收集到的日志时间是UNIX time,少了毫秒(少了000),所以计算出的时间是在1970年左右,而我在Kibana默认的time tickper是 最近15分钟,于是乎我把时间拉到了最近50年,看到了数据。
查找网上并没找到解决方法,于是自己动手:
修改 out_kafka.rb 如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| config_param :client_id, :string, :default => 'kafka' config_param :output_data_type, :string, :default => 'json' config_param :output_include_tag, :bool, :default => false - config_param :output_include_time, :bool, :default => false + + config_param :logstash_format, :bool, :default => false
config_param :max_send_retries, :integer, :default => 3 @@ -130,7 +131,17 @@ def emit(tag, es, chain) begin chain.next es.each do |time,record| - record['time'] = time if @output_include_time + if @logstash_format + if record.has_key?("@timestamp") + time = Time.parse record["@timestamp"] + elsif record.has_key?(@time_key) + time = Time.parse record[@time_key] + record['@timestamp'] = record[@time_key] + else + record.merge!({"@timestamp" => Time.at(time).to_datetime.to_s}) + end + end + record['tag'] = tag if @output_include_tag topic = record['topic'] || self.default_topic || tag partition_key = record['partition_key'] || @default_partition_key
|
有同样需求的同学,请对比修改,完成后在td-agent.conf 的相应位置 添加
1 2 3 4 5
| type kafka
:
logstash_format true
|
github:https://github.com/elain/fluent-plugin-kafka/commit/2ca46cd00f61e0dc63717b20a6c70e90510ac731