给EFK日志系统添加kafka队列中遇到的time问题解决

以下是我在生产环境中的EFK 架构

efk

在添加kafka队列中的一个问题解决过程

在fluentd中使用fluent-plugin-kafka收集日志到kafka中,日志正常;
然后使用 elasticsearch-river-kafka 来把kafka中的数据写入到ES中,查看正常;
再使用kibana 来读取ES中的数据来展示,问题来了,怎么调试也不能成功的展示数据;

问题在于 fluent-plugin-kafka 默认没有收集日志中的time字段,打开 output_include_time 选项后
就可出现time字段,但并不是日志中的格式,而是转换成了 unix time(例如:1444961965)
而在es中date字段,首先尝试dateOptionalTime,失败的话会尝试UNIX-MS(例如:1444961965000),但是收集到的日志时间是UNIX time,少了毫秒(少了000),所以计算出的时间是在1970年左右,而我在Kibana默认的time tickper是 最近15分钟,于是乎我把时间拉到了最近50年,看到了数据。

查找网上并没找到解决方法,于是自己动手:

修改 out_kafka.rb 如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

config_param :client_id, :string, :default => 'kafka'
config_param :output_data_type, :string, :default => 'json'
config_param :output_include_tag, :bool, :default => false
- config_param :output_include_time, :bool, :default => false
+ #config_param :output_include_time, :bool, :default => false
+ config_param :logstash_format, :bool, :default => false

# poseidon producer options
config_param :max_send_retries, :integer, :default => 3
@@ -130,7 +131,17 @@ def emit(tag, es, chain)
begin
chain.next
es.each do |time,record|
- record['time'] = time if @output_include_time
+ if @logstash_format
+ if record.has_key?("@timestamp")
+ time = Time.parse record["@timestamp"]
+ elsif record.has_key?(@time_key)
+ time = Time.parse record[@time_key]
+ record['@timestamp'] = record[@time_key]
+ else
+ record.merge!({"@timestamp" => Time.at(time).to_datetime.to_s})
+ end
+ end
+ #record['time'] = time if @output_include_time
record['tag'] = tag if @output_include_tag
topic = record['topic'] || self.default_topic || tag
partition_key = record['partition_key'] || @default_partition_key

有同样需求的同学,请对比修改,完成后在td-agent.conf 的相应位置 添加

1
2
3
4
5
type kafka

: # Omit configuration about fluent-plugin-kafka

logstash_format true

github:https://github.com/elain/fluent-plugin-kafka/commit/2ca46cd00f61e0dc63717b20a6c70e90510ac731

文章目录
,