首页 > 其他 > 详细

filebeat+logstash+es+kafka数据采集

时间:2020-08-18 16:18:39      阅读:158      评论:0      收藏:0      [点我收藏+]

初期选用fiume发送至Kafka。经过观察,每一个FlumeAgent都占用了较大的系统资源(至少会占用一颗CPU 50%以上的资源)。而另外一个业务,LogServer压力大,CPU资源尤其紧张,如果要实时收集分析日志,那么就需要一个更轻量级、占用资源更少的日志收集框架。 

 filebeat、logstash、es下载

wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.8.1-linux-x86_64.tar.gz
wget https://artifacts.elastic.co/downloads/logstash/logstash-7.8.1.tar.gz
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.8.1-linux-x86_64.tar.gz

filebeat解压后修改配置文件

启动命令:./filebeat -e -c filebeat.yml

filebeat.yml配置

filebeat.inputs:(log类型)

  - type: log
# Change to true to enable t
    enabled: true
# Paths that should be crawl
    paths:                      
      - /home/lw/test/filebeat/*.log
    fields:                 
      log_topic: lw_filebeat_t_2

kafka output:

output.kafka:
  enable: true
  #根据kafka指定对应端口和ip
    hosts: ["xxx.xxx.xxx.xxx:9092", "xxx.xxx.xxx.xxx:9092", "xxx.xxx.xxx.xxx:9092"]

    topic: %{[fields.log_topic]}
    partition.round_robin:
        reachable_only: false

    required_acks: 1
    compression: gzip #也可以none
    max_message_bytes: 1000000

    version: 0.9.0.1                                                                                                          
    codec.format:                                                              
        string: %{[host.name]}-%{[message]}

 

  • hosts是kafka集群的broker list;

  • topic: ‘%{[fields.log_topic]}’ : 这项指定了我们要写入kafka集群哪个topic, 可以看到它实现上是引用了上面test.yml配置中我们自定义的filed字段,通过这种方式我们就可以将收集的不同路径的数据写入到不同的topic中,但是这个有个限制就是只能写到一个kafka集群,因为当前版本的filebeat不允许同时配置多个output。

  • codec.format: 指定了写入kafka集群的消息格式,我们在从日志文件中读取的每行内容前面加上了当前机器的hostname。

在kafka上创建对应的topic

#查看topic
bin/kafka-topics.sh --list --zookeeper 123.321.112.42:2181
#创建
bin/kafka-topics.sh --create --zookeeper 123.321.112.42:2181 --replication-factor 3 --partitions 3 --topic test_log

 kafka其他命令

#生成数据
bin/kafka-console-producer.sh --broker-list 192.168.202.128:9094 --topic test
#查看数据
bin/kafka-console-consumer.sh --bootstrap-server 222.30.196.42:6667 --topic test_log --from-beginning
#主题描述
bin/kafka-topics.sh --zookeeper localhost:2181 --describe  --topic test
#删除主题
bin/kafka-topics.sh --zookeeper localhost:2181 --delete  --topic test

logstash解压

可直接命令启动:bin/logstash -e ‘input { stdin { } } output { stdout {codec=>rubydebug} }‘(控制台输入内容并输出内容)

根据需求修改配置文件:

Kafka输入插件配置

https://blog.csdn.net/weixin_34405354/article/details/88730394

 

kafka到es,期间可以根据需求过滤

input {
  kafka {
    ## app-log-服务名称
    topics_pattern => "app-log-.*"
    bootstrap_servers => "ip:9092"
    codec => json
    consumer_threads => 1    ## 因为只设置了一个partition,所以消费者线程数设置为1
    decorate_events => true
    #auto_offset_rest => "latest"
    group_id => "app-log-group"
   }
   kafka {
    ## error-log-服务名称
    topics_pattern => "error-log-.*"
    bootstrap_servers => "ip:9092"
    codec => json
    consumer_threads => 1
    decorate_events => true
    #auto_offset_rest => "latest"
    group_id => "error-log-group"
   }
   
}

filter {
  
  ## 时区转换
  ruby {
    code => "event.set(‘index_time‘,event.timestamp.time.localtime.strftime(‘%Y.%m.%d‘))"
  }

  if "app-log" in [fields][logtopic]{
    grok {
        ## 表达式
        match => ["message", "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\‘\‘|%{QUOTEDSTRING:throwable})"]
    }
  }

  if "error-log" in [fields][logtopic]{
    grok {
        ## 表达式
        match => ["message", "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\‘\‘|%{QUOTEDSTRING:throwable})"]
    }
  }
  
}


## elasticsearch:
output {

  if "app-log" in [fields][logtopic]{
    ## es插件
    elasticsearch {
          # es服务地址
        hosts => ["ip:9200"]
        # 用户名密码      
        user => "elastic"
        password => "123456"
        ## 索引名,+ 号开头的,就会自动认为后面是时间格式:
        ## javalog-app-service-2019.01.23 
        index => "app-log-%{[fields][logbiz]}-%{index_time}"
        # 是否嗅探集群ip:一般设置true;http://ip:9200/_nodes/http?pretty
        # 通过嗅探机制进行es集群负载均衡发日志消息
        sniffing => true
        # logstash默认自带一个mapping模板,进行模板覆盖
        template_overwrite => true
    } 
  }
  
  if "error-log" in [fields][logtopic]{
    elasticsearch {
        hosts => ["ip:9200"]    
        user => "elastic"
        password => "123456"
        index => "error-log-%{[fields][logbiz]}-%{index_time}"  #"test_log-%{+YYYY.MM.dd}"
        sniffing => true
        template_overwrite => true
    } 
  }
  

}

 

ES集群安装

ES的7版本需要jdk1.8以上

解压修改配置文件

vim ./config/elasticsearch.yml

master节点

#配置详情在另一博客
cluster.name: elasticsearch
# ------------------------------------ Node ------------------------------------

node.name: es-master
node.master: true
node.data: true
# ----------------------------------- Paths ------------------------------------

path.data: /opt/server/elasticsearch-7.8.1/data
path.logs: /opt/server/elasticsearch-7.8.1/logs
# ---------------------------------- Network -----------------------------------

network.host: 0.0.0.0

http.cors.enabled: true
http.cors.allow-origin: "*"
# --------------------------------- Discovery ----------------------------------
cluster.initial_master_nodes: ["es-master"]
discovery.seed_hosts: ["ip1:9300","ip2:9300", "ip3:9300"]

从节点1

# ---------------------------------- Cluster -----------------------------------

cluster.name: elasticsearch
#
# ------------------------------------ Node -----------------------

node.name: es-node01
node.master: false
node.data: true
# ----------------------------------- Paths ------------------------------------

path.data: /opt/server/elasticsearch-7.8.1/data
path.logs: /opt/server/elasticsearch-7.8.1/logs

# ---------------------------------- Network -----------------------------------
network.host: 0.0.0.0

http.cors.enabled: true
http.cors.allow-origin: "*"
# --------------------------------- Discovery ----------------------------------

cluster.initial_master_nodes: ["es-master"] discovery.seed_hosts: ["ip1:9300","ip2:9300", "ip3:9300"]

从节点2修改node.name:即可

root用户修改配置

vim /etc/security/limits.conf

* soft nofile 65536
* hard nofile 131072
* soft nproc 2048
* hard nproc 4096

soft nproc: 可打开的文件描述符的最大数(软限制)

hard nproc: 可打开的文件描述符的最大数(硬限制)

soft nofile:单个用户可用的最大进程数量(软限制)

hard nofile:单个用户可用的最大进程数量(硬限制)

 

vim /etc/sysctl.conf

vm.max_map_count=655360

 

启动

ES不允许使用root操作es,需要添加用户之后切换用户启动

./elasticsearch

后台启动./elasticsearch -d

访问 http://ip:9200/_cat/nodes?v会看到3个节点的信息

 

filebeat+logstash+es+kafka数据采集

原文:https://www.cnblogs.com/mergy/p/13517832.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!