1000字范文,内容丰富有趣,学习的好帮手!
1000字范文 > 大数据日志分析系统-logstash

大数据日志分析系统-logstash

时间:2019-03-03 14:06:54

相关推荐

大数据日志分析系统-logstash

logstash简介

Logstash 是一个开源的数据收集引擎,它具备实时数据传输能力。它可以统一过滤来自不同源的数据,并按照开发者制定的规范输出到目的地。

logstash-2.2.2的配置:

从logstash-forwarder到kafka的配置

ubuntu@sp1:~/logstashBeforeChangeConf$ cat /home/ubuntu/logstash-2.2.2/config/before-kafka-access.conf

# Listen for events sent over the lumberjack protocol and label them as
# access-log events.
input {
  lumberjack {
    # Label attached to every event received on this listener.
    type            => "fc_access"
    port            => "5044"
    # Key and certificate used by the listener.
    ssl_key         => "/home/ubuntu/logstash-2.2.2/config/lumberjack.key"
    ssl_certificate => "/home/ubuntu/logstash-2.2.2/config/lumberjack.crt"
  }
}

# Publish events to the "kafka_es" topic, skipping any event that carries the
# "_grokparsefailure" tag.
output {
  if "_grokparsefailure" not in [tags] {
    # stdout { codec => rubydebug }
    kafka {
      topic_id          => "kafka_es"
      bootstrap_servers => "sp1:9092,sp2:9092,sp3:9092,sp4:9092,sp5:9092,sp6:9092,sp7:9092"
      compression_type  => "snappy"
      # The option is documented as a string ("0", "1" or "all").
      acks              => "1"
      # Must be the fully-qualified Kafka client class name; the previous value
      # "org.mon.serialization.StringSerializer" is not an existing class.
      value_serializer  => "org.apache.kafka.common.serialization.StringSerializer"
      timeout_ms        => 10000
      retries           => 5
      retry_backoff_ms  => 100
      send_buffer_bytes => 102400
      workers           => 2
    }
  }
}

从kafka到es配置

其中包括了对日志各个字段的解析,以及对异常日志的过滤(同时注意:其中过滤掉了时间戳不在当前时间前后5天范围内的日志,以防止异常日志创建过多索引导致es报红)

ubuntu@sp1:~/logstashAfterChangeConf$ cat /home/ubuntu/logstash-2.2.2/config/after-kafa-access.conf

# Read events back from the "kafka_es" topic, locating the cluster through the
# ZooKeeper ensemble listed in zk_connect.
input {
  kafka {
    # Where to connect and what to read.
    zk_connect                => "sp1:2181,sp2:2181,sp3:2181,sp4:2181,sp5:2181,sp6:2181,sp7:2181"
    topic_id                  => "kafka_es"
    group_id                  => "kafka_es"
    auto_offset_reset         => "smallest"

    # Consumer sizing and timeouts.
    consumer_threads          => 1
    queue_size                => 100
    consumer_timeout_ms       => 1000
    rebalance_max_retries     => 50

    # Restart the consumer after an error, waiting 5000 ms first.
    consumer_restart_on_error => true
    consumer_restart_sleep_ms => 5000

    # Adds a "kafka" field to each event; the filter stage removes it again.
    decorate_events           => true
  }
}

# Split each access-log line into named fields, derive @timestamp from the
# epoch value in the first column, and cancel events whose timestamp is more
# than 5 days away from the current time.
filter {
  # Copy the raw line so it can be split on double quotes without touching
  # "message", which is split on spaces further down.
  mutate {
    add_field => { "messageClone" => "%{message}" }
  }

  # Element 3 of the quote-split copy is used as the user-agent string.
  mutate {
    split     => { "messageClone" => '"' }
    add_field => { "agent" => "%{[messageClone][3]}" }
  }

  useragent {
    source => "agent"
  }

  # Split the line on spaces and map the positions onto named fields, then
  # drop the raw and transport-level fields that are no longer needed.
  mutate {
    split => { "message" => " " }
    add_field => {
      "timestamp"         => "%{[message][0]}"
      "reqtime"           => "%{[message][1]}"
      "clientIP"          => "%{[message][2]}"
      "squidCache"        => "%{[message][3]}"
      "repsize"           => "%{[message][4]}"
      "reqMethod"         => "%{[message][5]}"
      "requestURL"        => "%{[message][6]}"
      "username"          => "%{[message][7]}"
      "requestOriginSite" => "%{[message][8]}"
      "mime"              => "%{[message][9]}"
      "referer"           => "%{[message][10]}"
      "agentCheck"        => "%{[message][11]}"
      "dnsGroup"          => "%{[message][-1]}"
    }
    remove_field => ["offset", "kafka", "@version", "file", "message", "messageClone"]
  }

  # NOTE(review): "OOPS" is presumably chosen so the pattern never matches,
  # which makes grok tag these events with _grokparsefailure - confirm intent.
  if [agentCheck] =~ "ChinaCache" {
    grok { match => { "agentCheck" => "OOPS" } }
  }

  mutate {
    convert => {
      "timestamp" => "float"
      "reqtime"   => "integer"
      "repsize"   => "integer"
    }
    remove_field => ["agentCheck"]
  }

  # Render the epoch seconds as an ISO8601 string for the date filter.
  # The format ends in a literal "Z" (UTC), so the time must be converted to
  # UTC before formatting; without .utc the host's local wall-clock time was
  # labelled as UTC and @timestamp was off by the local UTC offset.
  # NOTE(review): if that offset was relied on (for example for index
  # naming), confirm before deploying.
  ruby {
    code => "event['timestamp_str'] = Time.at(event['timestamp']).utc.strftime('%Y-%m-%dT%H:%M:%S.%LZ')"
  }

  date {
    match => [ "timestamp_str", "ISO8601" ]
  }

  # Temporarily split the URL on "/" to pick element 2 as the host part,
  # then join it back into a single string.
  mutate {
    split        => { "requestURL" => '/' }
    add_field    => { "uriHost" => "%{[requestURL][2]}" }
    remove_field => ["timestamp_str"]
  }

  mutate {
    join => { "requestURL" => '/' }
  }

  # Cancel events whose @timestamp differs from now by more than 5 days.
  ruby {
    code => "event.cancel if 5 * 24 * 3600 < (event['@timestamp']-::Time.now).abs"
  }
}

# Index events into Elasticsearch, one index per hour, skipping events whose
# "agent" field contains "ChinaCache".
output {
  if "ChinaCache" not in [agent] {
    #stdout { codec => "rubydebug" }
    elasticsearch {
      hosts           => ["es-ip-1:9200","es-ip-2:9200","es-ip-3:9200","es-ip-4:9200","es-ip-5:9200","es-ip-6:9200","es-ip-7:9200"]
      # Index name is derived from the event's @timestamp, down to the hour.
      index           => "logstash-%{+YYYY.MM.dd.HH}"
      # Batching settings.
      flush_size      => 5000
      idle_flush_time => 1
      workers         => 1
    }
  }
}

启动命令:

# Redirections are applied left to right: stdout must be pointed at the log
# file first and stderr duplicated onto it afterwards ("> file 2>&1").
# With the reverse order, stderr is attached to the previous stdout, not the log file.
nohup /home/ubuntu/logstash-2.2.2/bin/logstash -f /home/ubuntu/logstash-2.2.2/config/after-kafa-access.conf > /home/ubuntu/logstash-2.2.2/logs/logstash-after-kafka-access.log 2>&1 &

nohup /home/ubuntu/logstash-2.2.2/bin/logstash -f /home/ubuntu/logstash-2.2.2/config/before-kafka-access.conf > /home/ubuntu/logstash-2.2.2/logs/logstash-before-kafka.log 2>&1 &

logstash-6.1.1配置

从filebeat到kafka的配置

ubuntu@sp26:~/apps/logstash-6.1.1$ cat filebeat5055-kafkasp26-3.conf

# Accept events from Beats clients on port 5055 and label them as "log".
input {
  beats {
    type => "log"
    port => "5055"
  }
}

# Publish every event, JSON-encoded, to the "test" Kafka topic.
output {
  # stdout { codec => rubydebug }
  kafka {
    codec             => "json"
    # NOTE(review): the host parts ("37", "38", ...) look truncated or
    # anonymised - replace with the real broker host names or IPs.
    bootstrap_servers => "37:9092,38:9092,39:9092,40:9092,41:9092"
    topic_id          => "test"
    compression_type  => "snappy"
    # Must be the fully-qualified Kafka client class name; the previous value
    # "org.mon.serialization.StringSerializer" is not an existing class.
    value_serializer  => "org.apache.kafka.common.serialization.StringSerializer"
  }
}

检测

# Check the pipeline configuration file and exit (--config.test_and_exit).
/home/ubuntu/apps/logstash-6.1.1/bin/logstash -f /home/ubuntu/apps/logstash-6.1.1/filebeat5055-kafkasp26-3.conf --config.test_and_exit

启动

# Start Logstash in the background with automatic config reload. stdout is
# redirected to the log file first and stderr is then duplicated onto it
# ("> file 2>&1"); with the reverse order, stderr is attached to the previous
# stdout, not the log file.
nohup /home/ubuntu/apps/logstash-6.1.1/bin/logstash -f /home/ubuntu/apps/logstash-6.1.1/filebeat5055-kafkasp26-3.conf --config.reload.automatic > /home/ubuntu/apps/logstash-6.1.1/logs/filebeat5055-kafkasp26-3.log 2>&1 &

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。