Guest User

grok-troublesome-filter

a guest
Dec 2nd, 2015
164
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 6.14 KB | None | 0 0
  1. ==> 01-local-ftp.conf <==
  2. input {
  3. file { # Reads /var/log/xferlog; the type "xferlog" set here is what the 20-xfer-grok.conf conditional tests.
  4. # start_position=>"beginning"
  5. path => "/var/log/xferlog"
  6. type => "xferlog"
  7. # add_field => { host => "oven02" }
  8. }
  9.  
  10. }
  11.  
  12. ==> 02-local-sftp.conf <==
  13. input {
  14. file { # Reads /var/log/sftp-server.log; the type "sftp" set here is what the 21-sftp-grok.conf conditional tests.
  15. # start_position=>"beginning"
  16. path => "/var/log/sftp-server.log"
  17. type => "sftp"
  18. # add_field => { host => "oven02" }
  19. }
  20.  
  21. }
  22.  
  23. ==> 20-xfer-grok.conf <==
  24. filter {
  25. if [type] == "xferlog" {
  26. grok {
  27. match => [ "message" , "(?<atime>%{DAY} %{MONTH} [ ]?%{MONTHDAY} %{TIME} %{YEAR}) %{NUMBER:xferTime} %{IP:remoteHost} %{NUMBER:fileSize} %{UNIXPATH:dirName}?/(?<fileName>(?>[\w_%!$@:.,-]+|\\.)*) %{WORD:transferType} %{WORD:specialActionFlag} %{WORD:direction} %{WORD:accessmode} %{WORD:username} %{WORD:serviceName} %{WORD:authenticationMethod} %{DATA:aut} %{WORD:completionStatus}" ]
  28. add_tag => [ "xfer_grokked" , "grokked" ]
  29. }
  30.  
  31. # Handle the case where the file is dropped in the root dir,
  32. # i.e. /filename.csv, where the grok pattern leaves dirName empty.
  33. if ! [dirName] {
  34. mutate {
  35. add_field => {
  36. "dirName" => ""
  37. }
  38. }
  39. }
  40.  
  41. geoip {
  42. source => "remoteHost"
  43. database => "/etc/logstash/geoip.db"
  44. target => "geoip"
  45. }
  46.  
  47. date { # Parse the grok-captured "atime" into @timestamp.
  48. match => [ "atime", "EEE MMM dd HH:mm:ss YYYY",
  49. "EEE MMM d HH:mm:ss YYYY", "EEE MMM  d HH:mm:ss YYYY", # two-space form: the grok allows a padded day ("[ ]?" before MONTHDAY)
  50. "MMM dd HH:mm:ss YYYY"]
  51. # timezone => [ "Europe/London" ] # Change to your local timezone
  52. }
  53.  
  54. mutate {
  55. add_field => {
  56. "source_file" =>"/home/ftp/probes/%{username}/%{dirName}/%{fileName}"
  57. "dest_file" => "/parserhome/wave/in/%{username}-%{fileName}"
  58. }
  59. add_tag => [ "source_dest" ]
  60. }
  61. }
  62. }
  63.  
  64. ==> 21-sftp-grok.conf <==
  65. filter {
  66.  
  67. if [type] == "sftp" {
  68. grok { # Classify sftp-server log lines; the first matching pattern sets "action" and its fields.
  69. match => { "message" => [ # one field, several candidate patterns (documented hash-of-array form)
  70. "%{SYSLOGBASE2} (?<action>close) \"%{UNIXPATH:dirName}/(?<fileName>(?>[\s\w_%!$@:.,-]+|\\.)*)\" bytes read %{BASE10NUM:read} written %{BASE10NUM:written}" ,
  71. "%{SYSLOGBASE2} (?<action>open) \"%{UNIXPATH:dirName}/(?<fileName>(?>[\s\w_%!$@:.,-]+|\\.)*)\" flags %{NOTSPACE:flags} mode %{NUMBER:mode}" ,
  72. "%{SYSLOGBASE2} (?<action>sent status) %{GREEDYDATA:response}" ,
  73. "%{SYSLOGBASE2} (?<action>(open|close)dir) \"%{UNIXPATH:dirName}\"" ,
  74. "%{SYSLOGBASE2} (?<action>session (closed|opened)) for %{NOTSPACE:userType} user %{USERNAME:user} from \[%{IPORHOST:remoteHost}\]",
  75. "%{SYSLOGBASE2} (?<action>remove name) \"%{UNIXPATH:dirName}/(?<fileName>(?>[\s\w_%!$@:.,-]+|\\.)*)\""
  76. ] } # opendir/closedir now capture "dirName" like every other pattern (was "dirname"); closing "]" is escaped
  77. add_tag => [ "grokked" , "sftp_grokked", "%{action}" ]
  78. add_field => { "sftp_id" => "%{logsource}-%{pid}" }
  79. }
  80.  
  81. if [fileName]{
  82. if [dirName] {
  83. mutate {
  84. add_field => { "sftp_fid" => "%{logsource}-%{pid}-%{dirName}/%{fileName}"}
  85. }
  86. }
  87. else {
  88. mutate {
  89. add_field => { "sftp_fid" => "%{logsource}-%{pid}-/%{fileName}"}
  90. }
  91. }
  92. }
  93.  
  94. if "sftp_grokked" in [tags] {
  95. date{ # Parse the syslog "timestamp" field into @timestamp.
  96. match => [ "timestamp" , "MMM dd HH:mm:ss","MMM d HH:mm:ss", "MMM  d HH:mm:ss" ] # last form: syslog pads single-digit days with a second space
  97. }
  98. elapsed {
  99. start_tag => "session opened"
  100. end_tag => "session closed"
  101. unique_id_field => "sftp_id"
  102. timeout => 1800
  103. }
  104. if [action] == "session opened" {
  105. geoip {
  106. source => "remoteHost"
  107. database => "/etc/logstash/geoip.db"
  108. target => "geoip"
  109. }
  110. aggregate { # On "session opened": create the per-session map keyed by sftp_id.
  111. task_id => "%{sftp_id}"
  112. map_action => "create"
  113. code => "map['user']=event['user'];map['remoteHost']=event['remoteHost'];map['stime']=event['@timestamp'];map['transactions']=0" # code is Ruby, not a sprintf template: %{@timestamp} would be a literal string, so read the field via event[...]
  114. remove_field => [ "sftp_fid" ]
  115. }
  116. } else if ( [action] == "close" ) or ( [action] == "open") {
  117. elapsed{
  118. start_tag => "open"
  119. end_tag => "close"
  120. unique_id_field => "sftp_fid"
  121. }
  122. aggregate { # For file open/close events: copy the session's user/remoteHost from the map onto the event and bump the counter.
  123. task_id => "%{sftp_id}"
  124. map_action => "update"
  125. code => "event['user']=map['user'];event['remoteHost']=map['remoteHost'];map['transactions']+=1" # NOTE(review): this branch runs for both "open" and "close", so each file presumably counts twice — confirm intent
  126. }
  127. mutate {
  128. # command => "mv %{dirName}/%{fileName} /parserhome/ranch/in/%{user}-%{fileName}"
  129. add_field => {
  130. "source_file" => "%{dirName}/%{fileName}"
  131. "dest_file" => "/parserhome/ranch/in/%{user}-%{fileName}"
  132. }
  133. add_tag => [ "source_dest" ]
  134. }
  135. } else if [action] == "session closed" {
  136. aggregate {
  137. task_id => "%{sftp_id}"
  138. map_action => "update"
  139. code => "event['transactions']=map['transactions'];event['stime']=map['stime']"
  140. end_of_task => true
  141. timeout => 1800
  142. remove_field => [ "sftp_fid" ]
  143. }
  144. }
  145.  
  146. }
  147. }
  148. }
  149.  
  150. ==> 40-mv-command.conf <==
  151. filter {
  152.  
  153. if "source_dest" in [tags] {
  154. mutate { # Build the move command for events tagged "source_dest".
  155. add_field => {
  156. "mv_command" => "/usr/local/bin/mv-log -v --backup=numbered '%{source_file}' '%{dest_file}'" # quoted: the sftp fileName pattern accepts whitespace and "$"; NOTE(review): a name containing a single quote would still break out — confirm whether stricter escaping is needed
  157. }
  158. add_tag => [ "mv_set" ]
  159. }
  160. }
  161. }
  162.  
  163. ==> 90-mv.conf <==
  164.  
  165. output {
  166. # stdout { codec => rubydebug }
  167.  
  168. if "grokked" in [tags] {
  169. file {
  170. codec => rubydebug { }
  171. flush_interval => 0
  172. path => "/var/log/logstash/%{type}-%{+YYYY-MM-dd-HH}.out"
  173. }
  174. if "mv_set" in [tags] {
  175. exec { # Runs the mv_command string assembled in 40-mv-command.conf; only reached for events tagged both "grokked" and "mv_set".
  176. # command => "mv /home/ftp/probes/%{username}/%{dirName}/%{fileName} /parserhome/wave/in/%{username}-%{fileName}"
  177. # command => "mv %{dirName}/%{fileName} /parserhome/ranch/in/%{user}-%{fileName}"
  178. command => "%{mv_command}" # NOTE(review): the string embeds file names captured from log lines and is presumably handed to a shell — confirm quoting/escaping
  179. }
  180. }
  181. file {
  182. flush_interval => 60
  183. # gzip => true
  184. path => "/var/lib/logstash/%{type}.proc"
  185. }
  186. }
  187.  
  188. if ( "_grokparsefailure" in [tags] ) or ( "_jsonparsefailure" in [tags] ) {
  189. file {
  190. codec => json { }
  191. path => "/var/log/logstash/grok_failures.json"
  192. }
  193. }
  194. }
  195.  
  196. ==> 92-logstash.conf <==
  197. output {
  198.  
  199. if [type] == "sftp" or [type] == "xferlog" {
  200. elasticsearch {
  201.  
  202. hosts => "aqua-kib.envisage.ovh:9200"
  203. index => "ftptransfers-%{+YYYY.MM.dd}"
  204. template => "/etc/logstash/templates/elasticsearch-transfers.json"
  205. template_name => "ftptransfers"
  206. }
  207. } else {
  208. elasticsearch {
  209.  
  210. hosts => "aqua-kib.envisage.ovh:9200"
  211. # index => "ftptransfers-%{+YYYY.MM.dd}"
  212. # template => "/etc/logstash/templates/elasticsearch-transfers.json"
  213. # template_name => "ftptransfers"
  214. }
  215. }
  216.  
  217.  
  218. }
Advertisement
Add Comment
Please, Sign In to add comment