首页 > 其他 > 详细

logstash数据处理及格式化功能详解

时间:2019-08-21 02:27:50      阅读:491      评论:0      收藏:0      [点我收藏+]

Grok正则提取日志

环境延续我上一篇ELK单机版的filebeat-->redis-->logstash-->elasticsearch-->kibana环境,详情请参考:

 Elasticsearch + Logstash + Kibana +Redis +Filebeat 单机版日志收集环境搭建

正则表达式

 普通正则表达式

 

. 任意一个字符

 

* 前面一个字符出现0次或者多次

 

[abc] 中括号内任意一个字符

 

[^abc] 非中括号内的字符

 

[0-9] 表示一个数字

 

[a-z]   小写字母

 

[A-Z] 大写字母

 

[a-zA-Z] 所有字母

 

[a-zA-Z0-9] 所有字母+数字

 

[^0-9] 非数字

 

^xx xx开头

 

xx$ xx结尾

 

\d 任何一个数字

 

\s 任何一个空白字符

 

扩展正则表达式,在普通正则符号再进行了扩展

 

? 前面字符出现0或者1

 

+ 前面字符出现1或者多次

 

{n} 前面字符匹配n

 

{a,b} 前面字符匹配ab

 

{,b} 前面字符匹配0次到b

 

{a,} 前面字符匹配aa+

 

(string1|string2) string1string2

在Kibana的grokdebugger上进行测试

在编写grok提取正则配置前可以在Kibana的grokdebugger上进行测试:

比如我想提取一个如下的Nginx日志:

 192.168.237.1 - - [24/Feb/2019:17:48:47 +0800] "GET /shijiange HTTP/1.1" 404 571 "-" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36"

那么可以按照正则表达式和grok语法在grokdebugger进行如下测试:

技术分享图片

可以看到我的Grok成功提取了我想要的内容,我的Grok匹配规则如下:

(?<clientip>[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}) - - \[(?<requesttime>[^ ]+ \+[0-9]+)\] "(?<requesttype>[A-Z]+) (?<requesturl>[^ ]+) HTTP/\d.\d" (?<status>[0-9]+) (?<bodysize>[0-9]+) "[^"]+" "(?<ua>[^"]+)"

(?<字段名>正则)表示将匹配的内容提取为字段,其他不用提取为字段的地方原样写上或者用正则匹配即可。

在配置文件中引入Grok提取规则

vim ./logstash_grok.conf
# logstash_grok.conf内容
input {
   redis {
        host => ‘192.168.1.4‘
        port => 6379
        key => "queue"
        data_type => "list"
  }
}
filter {
    grok {
        match => {
            "message" => ‘(?<clientip>[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}) - - \[(?<requesttime>[^ ]+ \+[0-9]+)\] "(?<requesttype>[A-Z]+) (?<requesturl>[^ ]+) HTTP/\d.\d" (?<status>[0-9]+) (?<bodysize>[0-9]+) "[^"]+" "(?<ua>[^"]+)"‘
        }
    }
}
output {
  elasticsearch {
    hosts => ["http://192.168.1.4:9200"]
  }
}

启动logstah:

/opt/es/logstash-7.2.0/bin/logstash -f ./logstash_grok.conf 

测试是否成功:

echo 192.168.237.1 - - [24/Feb/2019:17:48:47 +0800] "GET /shijiange HTTP/1.1" 404 571 "-" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36" >> example.log

在Kinaba中配置index,方便查看:

点击Management-->Kibana-->Index patterns会出现如下提示:技术分享图片

 

 点击Create Index Pattern按照如下步骤创建新的Kibana索引:技术分享图片

输入索引匹配规则,如logstash*会匹配所有logstash开头的ES索引数据,然后点击下一步选择时间字段用于按时间戳筛选数据:

技术分享图片

选择@timestamp将会根据@timestamp字段的时间过滤数据,点击Create index pattern即可在Discover页面选择你创建的Kibana索引来查看数据:

技术分享图片

可以看到我配置的Kibana起作用了。这里要注意的是Kibana采用的是UTC时间,比东八区时间要快8个小时,所以在筛选时间范围的时候我将范围往后扩大了一点,否则可能看不到数据。

展开最上方的记录可以看到我刚才插入的数据被成功提取了:

技术分享图片

去除字段

 通过上面的图可以看到Grok配置的确起作用了,但是Logstash给我们添加了很多字段,有时候这些字段是不必要的,这时候就要可以使用remove_field配置来去除一些字段。

修改配置文件如下:

# logstash_grok.conf内容
input {
   redis {
        host => 192.168.1.4
        port => 6379
        key => "queue"
        data_type => "list"
  }
}
filter {
    grok {
        match => {
            "message" => (?<clientip>[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}) - - \[(?<requesttime>[^ ]+ \+[0-9]+)\] "(?<requesttype>[A-Z]+) (?<requesturl>[^ ]+) HTTP/\d.\d" (?<status>[0-9]+) (?<bodysize>[0-9]+) "[^"]+" "(?<ua>[^"]+)"
        }
        remove_field => ["message","@version","path"]
    }
}
output {
  elasticsearch {
    hosts => ["http://192.168.1.4:9200"]
  }
}

重启重复上述测试,数秒后查看Kibana:

技术分享图片

Logstash确实为我们去除了我们不需要的字段。

使用日志时间而非插入时间

由于日志输出和消息队列同步以及ELK中的数据传输都是需要时间的,使用ES自动生成的时间戳可能和日志产生的时间并不一致,而且由于时区问题会产生更大的影响,所以需要自定义时间字段,而非使用自动生成的时间字段。

更改配置文件如下:

# logstash_grok.conf内容
input {
   redis {
        host => 192.168.1.4
        port => 6379
        key => "queue"
        data_type => "list"
  }
}
filter {
    grok {
        match => {
            "message" => (?<clientip>[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}) - - \[(?<requesttime>[^ ]+ \+[0-9]+)\] "(?<requesttype>[A-Z]+) (?<requesturl>[^ ]+) HTTP/\d.\d" (?<status>[0-9]+) (?<bodysize>[0-9]+) "[^"]+" "(?<ua>[^"]+)"
        }
        remove_field => ["message","@version","path"]
    }
date {
        match => ["requesttime", "dd/MMM/yyyy:HH:mm:ss Z"]
        target => "@timestamp"
    } } output { elasticsearch { hosts
=> ["http://192.168.1.4:9200"] } }

 

 

 

重复上述测试,扩大时间范围只2018年某月可以看到有一条2019年2月的数据

技术分享图片

 

提取json格式的数据

Grok对于提取非结构化的数据是很方便的,但是对于json格式的数据如果还用Grok来提取未免也太麻烦了点,毕竟采用json这种半结构化数据来输出日志本来就是为了方便处理。还好Logstash早就考虑到了这点,并提供了json格式数据的提取规则。

修改配置文件以提取json数据:

 

# logstash_json.conf
input {
   redis {
        host => 192.168.1.4
        port => 6379
        key => "queue"
        data_type => "list"
  }
}
filter {
  json {
    source => "message"
    remove_field => ["message","@version","path","beat","input","log","offset","prospector","source","tags"]
  } 
  date {
        match => ["requesttime", "dd/MMM/yyyy:HH:mm:ss Z"]
        target => "@timestamp"
    }
}
output {
  elasticsearch {
    hosts => ["http://192.168.1.4:9200"]
  } 
} 

 

测试是否配置成功:

logstash_json.conf启动Logstash:

 /opt/es/logstash-7.2.0/bin/logstash -f ./logstash_json.conf

向example.log追加一条json数据:

echo {"@timestamp":"24/Feb/2019:21:08:34 +0800","clientip":"192.168.1.5","status":0,"bodysize":"1m","referer":"http_referer","ua":"http_user_agent","handletime":"request_time","url":"uri"} >> example.log

数秒后在kibana查看数据:

技术分享图片

可以看到我们的配置是生效的,只是日期好像没有替换成我们自定义的日期

不早了,明天再更。。。

 

logstash数据处理及格式化功能详解

原文:https://www.cnblogs.com/yanshaoshuai/p/11386442.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!