==> 01-local-ftp.conf <==
input {
  file {
    # start_position => "beginning"
    path => "/var/log/xferlog"
    type => "xferlog"
    # add_field => { "host" => "oven02" }
  }
}
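# Note: with start_position left at its default ("end"), the file input tails
# /var/log/xferlog; uncommenting "beginning" only affects files that have no
# sincedb entry yet, since Logstash remembers its read position across restarts.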
==> 02-local-sftp.conf <==
input {
  file {
    # start_position => "beginning"
    path => "/var/log/sftp-server.log"
    type => "sftp"
    # add_field => { "host" => "oven02" }
  }
}
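# Note (assumption): these lines come from OpenSSH's sftp-server, which only
# logs open/close/session events at log level INFO or higher, e.g. via
# "Subsystem sftp internal-sftp -l INFO" in sshd_config. Writing them to
# /var/log/sftp-server.log rather than the default auth log presumably relies
# on a local syslog rule.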
==> 20-xfer-grok.conf <==
filter {
  if [type] == "xferlog" {
    grok {
      match => [ "message" , "(?<atime>%{DAY} %{MONTH} [ ]?%{MONTHDAY} %{TIME} %{YEAR}) %{NUMBER:xferTime} %{IP:remoteHost} %{NUMBER:fileSize} %{UNIXPATH:dirName}?/(?<fileName>(?>[\w_%!$@:.,-]+|\\.)*) %{WORD:transferType} %{WORD:specialActionFlag} %{WORD:direction} %{WORD:accessmode} %{WORD:username} %{WORD:serviceName} %{WORD:authenticationMethod} %{DATA:aut} %{WORD:completionStatus}" ]
      add_tag => [ "xfer_grokked" , "grokked" ]
    }
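    # Example (hypothetical) xferlog line this pattern matches:
    #   Mon Apr  4 12:34:56 2016 3 192.0.2.10 12345 /home/ftp/probes/alice/report.csv b _ i r alice ftp 0 * c
    # yielding atime, xferTime=3, remoteHost=192.0.2.10, fileSize=12345,
    # dirName=/home/ftp/probes/alice, fileName=report.csv, direction=i (incoming
    # upload), username=alice and completionStatus=c (complete).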
    # Handle the case where a file is dropped in the root dir,
    # i.e. /filename.csv, which leaves the grok pattern with no dirName.
    if ! [dirName] {
      mutate {
        add_field => { "dirName" => "" }
      }
    }
    geoip {
      source => "remoteHost"
      database => "/etc/logstash/geoip.db"
      target => "geoip"
    }
    date {
      match => [ "atime", "EEE MMM dd HH:mm:ss YYYY",
                          "EEE MMM d HH:mm:ss YYYY",
                          "MMM dd HH:mm:ss YYYY" ]
      # timezone => "Europe/London" # change to your local timezone
    }
    mutate {
      add_field => {
        "source_file" => "/home/ftp/probes/%{username}/%{dirName}/%{fileName}"
        "dest_file"   => "/parserhome/wave/in/%{username}-%{fileName}"
      }
      add_tag => [ "source_dest" ]
    }
  }
}
==> 21-sftp-grok.conf <==
filter {
  if [type] == "sftp" {
    grok {
      match => [
        "message", "%{SYSLOGBASE2} (?<action>close) \"%{UNIXPATH:dirName}/(?<fileName>(?>[\s\w_%!$@:.,-]+|\\.)*)\" bytes read %{BASE10NUM:read} written %{BASE10NUM:written}" ,
        "message", "%{SYSLOGBASE2} (?<action>open) \"%{UNIXPATH:dirName}/(?<fileName>(?>[\s\w_%!$@:.,-]+|\\.)*)\" flags %{NOTSPACE:flags} mode %{NUMBER:mode}" ,
        "message", "%{SYSLOGBASE2} (?<action>sent status) %{GREEDYDATA:response}" ,
        "message", "%{SYSLOGBASE2} (?<action>(open|close)dir) \"%{UNIXPATH:dirName}\"" ,
        "message", "%{SYSLOGBASE2} (?<action>session (closed|opened)) for %{NOTSPACE:userType} user %{USERNAME:user} from \[%{IPORHOST:remoteHost}]" ,
        "message", "%{SYSLOGBASE2} (?<action>remove name) \"%{UNIXPATH:dirName}/(?<fileName>(?>[\s\w_%!$@:.,-]+|\\.)*)\""
      ]
      add_tag => [ "grokked", "sftp_grokked", "%{action}" ]
      add_field => { "sftp_id" => "%{logsource}-%{pid}" }
    }
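    # Example (hypothetical) sftp-server syslog lines these patterns match:
    #   Apr  4 12:34:56 oven02 sftp-server[2816]: session opened for local user alice from [192.0.2.10]
    #   Apr  4 12:35:02 oven02 sftp-server[2816]: open "/home/alice/report.csv" flags WRITE,CREATE,TRUNCATE mode 0644
    #   Apr  4 12:35:09 oven02 sftp-server[2816]: close "/home/alice/report.csv" bytes read 0 written 12345
    # sftp_id becomes "oven02-2816", tying the session's lines together.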
    if [fileName] {
      if [dirName] {
        mutate {
          add_field => { "sftp_fid" => "%{logsource}-%{pid}-%{dirName}/%{fileName}" }
        }
      } else {
        mutate {
          add_field => { "sftp_fid" => "%{logsource}-%{pid}-/%{fileName}" }
        }
      }
    }
- if "sftp_grokked" in [tags] {
- date{
- match => [ "timestamp" , "MMM dd HH:mm:ss","MMM d HH:mm:ss" ]
- }
- elapsed {
- start_tag => "session opened"
- end_tag => "session closed"
- unique_id_field => "sftp_id"
- timeout => 1800
- }
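      # The grok above adds "%{action}" to [tags], so "session opened" and
      # "session closed" become tags that this elapsed filter (and the
      # open/close one below) can pair up, keyed on sftp_id / sftp_fid, to
      # compute per-session and per-file durations.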
      if [action] == "session opened" {
        geoip {
          source => "remoteHost"
          database => "/etc/logstash/geoip.db"
          target => "geoip"
        }
        aggregate {
          task_id => "%{sftp_id}"
          map_action => "create"
          # %{@timestamp} is not sprintf-expanded inside "code"; read the
          # timestamp from the event instead
          code => "map['user'] = event['user']; map['remoteHost'] = event['remoteHost']; map['stime'] = event['@timestamp']; map['transactions'] = 0"
          remove_field => [ "sftp_fid" ]
        }
      } else if ( [action] == "close" ) or ( [action] == "open" ) {
        elapsed {
          start_tag => "open"
          end_tag => "close"
          unique_id_field => "sftp_fid"
        }
        aggregate {
          task_id => "%{sftp_id}"
          map_action => "update"
          code => "event['user'] = map['user']; event['remoteHost'] = map['remoteHost']; map['transactions'] += 1"
        }
        mutate {
          # command => "mv %{dirName}/%{fileName} /parserhome/ranch/in/%{user}-%{fileName}"
          add_field => {
            "source_file" => "%{dirName}/%{fileName}"
            "dest_file"   => "/parserhome/ranch/in/%{user}-%{fileName}"
          }
          add_tag => [ "source_dest" ]
        }
      } else if [action] == "session closed" {
        aggregate {
          task_id => "%{sftp_id}"
          map_action => "update"
          code => "event['transactions'] = map['transactions']; event['stime'] = map['stime']"
          end_of_task => true
          timeout => 1800
          remove_field => [ "sftp_fid" ]
        }
      }
    }
  }
}
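# Caveat: the aggregate and elapsed filters keep shared in-memory state across
# events, so this pipeline should run with a single filter worker
# (logstash -w 1), as the aggregate plugin documentation requires.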
==> 40-mv-command.conf <==
filter {
  if "source_dest" in [tags] {
    mutate {
      add_field => {
        "mv_command" => "/usr/local/bin/mv-log -v --backup=numbered %{source_file} %{dest_file}"
      }
      add_tag => [ "mv_set" ]
    }
  }
}
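# Assumption: /usr/local/bin/mv-log is a site-local wrapper, presumably
# mv(1) plus logging; --backup=numbered suggests it passes its options
# through to GNU mv so an existing destination file is kept as a numbered
# backup instead of being overwritten.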
==> 90-mv.conf <==
output {
  # stdout { codec => rubydebug }
  if "grokked" in [tags] {
    file {
      codec => rubydebug { }
      flush_interval => 0
      path => "/var/log/logstash/%{type}-%{+YYYY-MM-dd-HH}.out"
    }
    if "mv_set" in [tags] {
      exec {
        # command => "mv /home/ftp/probes/%{username}/%{dirName}/%{fileName} /parserhome/wave/in/%{username}-%{fileName}"
        # command => "mv %{dirName}/%{fileName} /parserhome/ranch/in/%{user}-%{fileName}"
        command => "%{mv_command}"
      }
    }
    file {
      flush_interval => 60
      # gzip => true
      path => "/var/lib/logstash/%{type}.proc"
    }
  }
  if ( "_grokparsefailure" in [tags] ) or ( "_jsonparsefailure" in [tags] ) {
    file {
      codec => json { }
      path => "/var/log/logstash/grok_failures.json"
    }
  }
}
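# Caution: the exec output runs %{mv_command} with event-derived values
# (usernames, file names) interpolated in, so a crafted file name could in
# principle inject shell syntax; the grok character classes upstream limit
# this somewhat, but quoting the arguments inside mv-log would be safer.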
==> 92-logstash.conf <==
output {
  if [type] == "sftp" or [type] == "xferlog" {
    elasticsearch {
      hosts => "aqua-kib.envisage.ovh:9200"
      index => "ftptransfers-%{+YYYY.MM.dd}"
      template => "/etc/logstash/templates/elasticsearch-transfers.json"
      template_name => "ftptransfers"
    }
  } else {
    elasticsearch {
      hosts => "aqua-kib.envisage.ovh:9200"
      # index => "ftptransfers-%{+YYYY.MM.dd}"
      # template => "/etc/logstash/templates/elasticsearch-transfers.json"
      # template_name => "ftptransfers"
    }
  }
}
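# Note (assumption): elasticsearch-transfers.json is not shown here; for the
# geoip data to be usable on a Kibana tile map it presumably maps
# geoip.location as a geo_point, since the stock Logstash template only does
# that for indices named logstash-*.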