8

I am reading a checkpoint log file in CSV format with Logstash, and some fields have null values.

I want to remove all fields that have a null value.

I cannot foresee exactly which fields (keys) will be null, because the CSV file has 150 columns and I don't want to check each one of them.

Is it possible to write a dynamic filter in Logstash that removes any field with a null value?

My Logstash configuration file looks like this:

input {
  stdin { tags => "checkpoint" } 
   file {
   type => "file-input"
   path =>  "D:\Browser Downloads\logstash\logstash-1.4.2\bin\checkpoint.csv"
   sincedb_path => "D:\Browser Downloads\logstash\logstash-1.4.2\bin\sincedb-access2"
   start_position => "beginning"
   tags => ["checkpoint","offline"]
  }
}
filter {
 if "checkpoint" in [tags] {
        csv {
        columns => ["num","date","time","orig","type","action","alert","i/f_name","i/f_dir","product","Internal_CA:","serial_num:","dn:","sys_message:","inzone","outzone","rule","rule_uid","rule_name","service_id","src","dst","proto","service","s_port","dynamic object","change type","message_info","StormAgentName","StormAgentAction","TCP packet out of state","tcp_flags","xlatesrc","xlatedst","NAT_rulenum","NAT_addtnl_rulenum","xlatedport","xlatesport","fw_message","ICMP","ICMP Type","ICMP Code","DCE-RPC Interface UUID","rpc_prog","log_sys_message","scheme:","Validation log:","Reason:","Serial num:","Instruction:","fw_subproduct","vpn_feature_name","srckeyid","dstkeyid","user","methods:","peer gateway","IKE:","CookieI","CookieR","msgid","IKE notification:","Certificate DN:","IKE IDs:","partner","community","Session:","L2TP:","PPP:","MAC:","OM:","om_method:","assigned_IP:","machine:","reject_category","message:","VPN internal source IP","start_time","connection_uid","encryption failure:","vpn_user","Log ID","message","old IP","old port","new IP","new port","elapsed","connectivity_state","ctrl_category","description","description ","severity","auth_status","identity_src","snid","src_user_name","endpoint_ip","src_machine_name","src_user_group","src_machine_group","auth_method","identity_type","Authentication trial","roles","dst_user_name","dst_machine_name","spi","encryption fail reason:","information","error_description","domain_name","termination_reason","duration"]
      #  remove_field => [ any fields with null value] how to do it please 
        separator => "|"
        }
        # drop csv header
        if [num] == "num" and [date] == "date" and [time] == "time" and [orig] == "orig" {
            drop { }
        }
 }
}
output {
   stdout {
    codec => rubydebug 
  }
   file {
      path => "output.txt"
   }
}

Here are some example log lines:

num|date|time|orig|type|action|alert|i/f_name|i/f_dir|product|Internal_CA:|serial_num:|dn:|sys_message:|inzone|outzone|rule|rule_uid|rule_name|service_id|src|dst|proto|service|s_port|dynamic object|change type|message_info|StormAgentName|StormAgentAction|TCP packet out of state|tcp_flags|xlatesrc|xlatedst|NAT_rulenum|NAT_addtnl_rulenum|xlatedport|xlatesport|fw_message|ICMP|ICMP Type|ICMP Code|DCE-RPC Interface UUID|rpc_prog|log_sys_message|scheme:|Validation log:|Reason:|Serial num:|Instruction:|fw_subproduct|vpn_feature_name|srckeyid|dstkeyid|user|methods:|peer gateway|IKE:|CookieI|CookieR|msgid|IKE notification:|Certificate DN:|IKE IDs:|partner|community|Session:|L2TP:|PPP:|MAC:|OM:|om_method:|assigned_IP:|machine:|reject_category|message:|VPN internal source IP|start_time|connection_uid|encryption failure:|vpn_user|Log ID|message|old IP|old port|new IP|new port|elapsed|connectivity_state|ctrl_category|description|description |severity|auth_status|identity_src|snid|src_user_name|endpoint_ip|src_machine_name|src_user_group|src_machine_group|auth_method|identity_type|Authentication trial|roles|dst_user_name|dst_machine_name|spi|encryption fail reason:|information|error_description|domain_name|termination_reason|duration
0|8Jun2012|16:33:35|10.0.0.1|log|keyinst||daemon|inbound|VPN-1 & FireWall-1|started|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
 1|8Jun2012|16:36:34|10.0.0.1|log|keyinst||daemon|inbound|VPN-1 & FireWall-1|started|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
 2|8Jun2012|16:52:39|10.0.0.1|log|keyinst||daemon|inbound|VPN-1 & FireWall-1|Certificate initialized|86232|CN=fw-KO,O=sc-KO.KO.dc.obn8cx|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
 3|8Jun2012|16:52:39|10.0.0.1|log|keyinst||daemon|inbound|VPN-1 & FireWall-1|Initiated certificate is now valid|86232|CN=fw-KO,O=sc-KO.KO.dc.obn8cx|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
 4|8Jun2012|16:55:44|10.0.0.1|log|keyinst||daemon|inbound|VPN-1 & FireWall-1|Issued empty CRL 1|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
20|8Jun2012|16:58:28|10.0.0.1|log|accept||eth1|inbound|VPN-1 & FireWall-1|||||Internal|External|1|{2A42C8CD-148D-4809-A480-3171108AD6C7}||domain-udp|192.168.100.1|198.32.64.12|udp|53|1036|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
tomer

5 Answers

10

A ruby filter can do this:

input {
        stdin {
        }
}

filter {
        csv {
                columns => ["num","date","time","orig","type","action","alert","i/f_name","i/f_dir","product","Internal_CA:","serial_num:","dn:","sys_message:","inzone","outzone","rule","rule_uid","rule_name","service_id","src","dst","proto","service","s_port","dynamic object","change type","message_info","StormAgentName","StormAgentAction","TCP packet out of state","tcp_flags","xlatesrc","xlatedst","NAT_rulenum","NAT_addtnl_rulenum","xlatedport","xlatesport","fw_message","ICMP","ICMP Type","ICMP Code","DCE-RPC Interface UUID","rpc_prog","log_sys_message","scheme:","Validation log:","Reason:","Serial num:","Instruction:","fw_subproduct","vpn_feature_name","srckeyid","dstkeyid","user","methods:","peer gateway","IKE:","CookieI","CookieR","msgid","IKE notification:","Certificate DN:","IKE IDs:","partner","community","Session:","L2TP:","PPP:","MAC:","OM:","om_method:","assigned_IP:","machine:","reject_category","message:","VPN internal source IP","start_time","connection_uid","encryption failure:","vpn_user","Log ID","message","old IP","old port","new IP","new port","elapsed","connectivity_state","ctrl_category","description","description ","severity","auth_status","identity_src","snid","src_user_name","endpoint_ip","src_machine_name","src_user_group","src_machine_group","auth_method","identity_type","Authentication trial","roles","dst_user_name","dst_machine_name","spi","encryption fail reason:","information","error_description","domain_name","termination_reason","duration"]
                separator => "|"
        }
        ruby {
                code => "
                        hash = event.to_hash
                        hash.each do |k,v|
                                if v == nil
                                        event.remove(k)
                                end
                        end
                "
        }
}

output {
    stdout { codec => rubydebug }
}

The ruby filter above removes every field whose value is nil (Ruby's equivalent of null).
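
Why does comparing against nil work at all? As far as I know, the csv filter parses lines with Ruby's CSV library, and that library returns nil (not an empty string) for an empty unquoted column. Here is a minimal stand-alone sketch, outside Logstash, using the first eight columns of the sample log. The `row` and `kept` names are only for illustration.

```ruby
require 'csv'

# One pipe-separated line, shortened to the first eight columns of the sample.
line    = '2|8Jun2012|16:52:39|10.0.0.1|log|keyinst||daemon'
columns = %w[num date time orig type action alert i/f_name]

# Parse the line and pair each value with its column name.
values = CSV.parse_line(line, col_sep: '|')
row    = columns.zip(values).to_h

# The empty "alert" column is parsed as nil, so a nil check catches it.
kept = row.reject { |_key, value| value.nil? }

puts kept.keys.join(',')
```

If your fields hold empty strings or the literal text "null" instead, a nil check alone will not match them.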

Update:

My environment is Windows Server 2008 with Logstash 1.4.1, and your log sample works for me. I have updated the configuration, input, and output.

Input:

2|8Jun2012|16:52:39|10.0.0.1|log|keyinst||daemon|inbound|VPN-1 & FireWall-1|Certificate initialized|86232|CN=fw-KO,O=sc-KO.KO.dc.obn8cx|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||

Output:

{
        "@version" => "1",
      "@timestamp" => "2015-03-12T00:30:34.123Z",
            "host" => "BENLIM",
             "num" => "2",
            "date" => "8Jun2012",
            "time" => "16:52:39",
            "orig" => "10.0.0.1",
            "type" => "log",
          "action" => "keyinst",
        "i/f_name" => "daemon",
         "i/f_dir" => "inbound",
         "product" => "VPN-1 & FireWall-1",
    "Internal_CA:" => "Certificate initialized",
     "serial_num:" => "86232",
             "dn:" => "CN=fw-KO,O=sc-KO.KO.dc.obn8cx"
}
Ban-Chuan Lim
  • Hello Ben, thanks for helping me. I am testing your ruby answer, but the null fields still exist in the file output... – tomer Mar 10 '15 at 10:22
  • Is the field value "nil" or "null"? If you want to remove fields with the string value "null", use "if v == 'null'" in the ruby code. – Ban-Chuan Lim Mar 10 '15 at 11:27
  • This does not work when the Logstash output is file or elasticsearch. – tomer Mar 10 '15 at 13:53
  • But it works for me with both stdout and file. Can you provide a log sample? – Ban-Chuan Lim Mar 11 '15 at 01:31
  • Hi Ben, and thanks for helping me. I updated my question and provided a log example. – tomer Mar 11 '15 at 17:05
  • I have updated the answer. Your logs work for me, so the problem may be somewhere else. You can use my config and test again. – Ban-Chuan Lim Mar 12 '15 at 00:35
  • Can you try adding a file output to the output section? Please tell me if there are null values in the output file. – tomer Mar 12 '15 at 21:31
  • Yes! I also use the file output. The fields with null values are eliminated. – Ban-Chuan Lim Mar 13 '15 at 01:01
  • OK Ben, your answer works great. I had a mistake in my config file: in my filter section I used remove_field => [ ] on the csv plugin, so the removals made by the ruby plugin were ignored. – tomer Mar 13 '15 at 08:46
  • There is also an option in the csv filter named skip_empty_columns (at least there is now) - but the ruby was inventive. :) – st2rseeker Mar 24 '16 at 16:40
1

Check the skip_empty_columns option of the csv filter - was really helpful in my use case. :)

Usage:

skip_empty_columns => true
st2rseeker
1

If you need to remove all null, blank, and empty fields recursively (0 and false remain), this function might be able to help. It uses the Ruby filter in Logstash. It's by no means elegant, but seems to work pretty effectively.

filter {
    ruby {
        init => "
            def Compact(key)
                modifiedKey = nil
                parentKey = nil

                if key.kind_of?(String)
                    if key.start_with?('[')
                        modifiedKey = key
                    else
                        modifiedKey = key.sub( /([^\[^\]]*)/, '[\1]')
                    end

                parentKey = modifiedKey.sub(/\[[^\[]+\]$/, '') unless modifiedKey.sub(/\[[^\[]+\]$/, '').strip.empty?
                end

                unless modifiedKey.nil?
                    if event.get(modifiedKey).is_a?(Enumerable) &&
                    (event.get(modifiedKey).nil? || event.get(modifiedKey).empty?)
                         event.remove(modifiedKey)
                    elsif event.get(modifiedKey).to_s.strip.empty? || event.get(modifiedKey).nil?
                         event.remove(modifiedKey)
                     end

                    if !parentKey.nil? && event.get(parentKey).is_a?(Enumerable) &&
                    (event.get(parentKey).nil? || event.get(parentKey).empty?)
                        event.remove(parentKey)
                    end
                end

               if key == event.to_hash ||
               event.get((modifiedKey ? modifiedKey : '')).is_a?(Enumerable)
                   key = event.get(modifiedKey) unless modifiedKey.nil?
                   key.each{ |k|
                      Compact(%{#{modifiedKey ? modifiedKey : ''}[#{k.first}]}) if k.is_a?(Enumerable)
                   }
               end

               rescue Exception => e
                   puts %{ruby_exception_#{__method__.to_s} - #{e}}
           end
      "

     code => "
         Compact(event.to_hash)
     "
    }
}
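
The recursive idea is easier to follow outside Logstash. Below is a minimal plain-Ruby sketch that builds a cleaned copy of a nested hash instead of calling event.remove on each path. It is not the filter above: `compact_deep`, `blank_value?`, and the sample data are hypothetical names and values chosen for illustration.

```ruby
# True for nil, whitespace-only strings, and empty hashes or arrays.
# 0 and false are deliberately not treated as blank.
def blank_value?(value)
  return true if value.nil?
  return value.strip.empty? if value.is_a?(String)
  return value.empty? if value.is_a?(Hash) || value.is_a?(Array)
  false
end

# Walk hashes and arrays depth-first, dropping blank children.
# A container that becomes empty after cleaning is dropped as well.
def compact_deep(value)
  case value
  when Hash
    value.each_with_object({}) do |(key, child), out|
      cleaned = compact_deep(child)
      out[key] = cleaned unless blank_value?(cleaned)
    end
  when Array
    value.map { |child| compact_deep(child) }.reject { |child| blank_value?(child) }
  else
    value
  end
end

event_data = {
  'num'   => '2',
  'alert' => '',
  'count' => 0,
  'ok'    => false,
  'geo'   => { 'city' => nil, 'tags' => [] },
  'rule'  => { 'name' => '  ', 'id' => 1 }
}

cleaned = compact_deep(event_data)
p cleaned
```

Building a copy keeps the sketch free of the Logstash event API. Inside a ruby filter you would still have to remove the corresponding field paths from the event.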
StephenSolace
1
ruby {
    init => "
        def removeEmptyField(event,h,name)
            h.each do |k,v|
                if (v.is_a?(Hash) || v.is_a?(Array)) && v.to_s != '{}'
                    removeEmptyField(event,v,String.new(name.to_s) << '[' << k.to_s << ']')
                else
                    if v == '' || v.to_s == '{}'
                        event.remove(String.new(name.to_s) << '[' << k.to_s << ']')
                    end
                end
            end
        end
    "
    code => "
        removeEmptyField event,event.to_hash,''
    "
}
dounine
  • I can confirm that the code above does what is expected – remove fields with null value. Using logstash 6.8.1. I've enhanced it a bit to remove fields with "-" value as well. Just make the second `if` line to look like: `if v == '' || v.to_s == '{}' || v == '-'` – Yalok Iy Jul 31 '19 at 07:46
  • Actually, to remove fields with `null` value, one needs to add `|| v == nil` as well. – Yalok Iy Aug 08 '19 at 12:00
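
The two comments above amount to widening the "is this value empty?" test. A minimal sketch of that test in plain Ruby, outside Logstash, might look like the following. `removable?`, its `placeholders` parameter, and the sample `fields` hash are hypothetical names for illustration, and the placeholder list is an assumption you should adapt to your own logs.

```ruby
# A value is removable when it is nil or when its string form is one of the
# placeholder strings: empty string, '-', or '{}' (an empty hash printed as text).
def removable?(value, placeholders = ['', '-', '{}'])
  value.nil? || placeholders.include?(value.to_s)
end

fields = {
  'src'     => '10.0.0.1',
  'user'    => '-',
  'rule'    => nil,
  'proto'   => '',
  'elapsed' => 0
}

# Keep only the fields that carry real data. 0 survives because '0' is not a placeholder.
kept = fields.reject { |_name, value| removable?(value) }
puts kept.inspect
```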
0

To do things this dynamically, you'll want to use the ruby{} filter. There's some good sample code in this answer.

Community
Alain Collins
  • Hi Alain, thanks for helping me. I am trying to use the ruby filter, but I am new to Ruby, and the output in my file still has null fields. I would be glad to get a code example for my specific problem that I can test. – tomer Mar 10 '15 at 10:33