    Antoine Cotten
    @antoineco
    Let me know if you need extra guidance!
    efftee
    @sriptorium
    Hi @antoineco 👋
    Thanks for your answers. All very clear and to the point. I’ll work with that and should be just fine! 👍
    Siddharth Balyan
    @alt-glitch
    Hi, I want to use --config.reload.automatic in this docker-elk stack so that changing the config file on my host reloads Logstash and uses the updated config.
    Do I add this in docker-compose.yml or docker-stack.yml?
    Quite new to all this so I appreciate anyone's patience and help :)
    Antoine Cotten
    @antoineco
    @alt-glitch you have to pass it as a command argument in the docker-compose.yaml file. Here is an example of an issue where a user was using that exact setting: deviantony/docker-elk#506
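    Roughly something like this under the logstash service (just a sketch; the service name and any existing command arguments are assumptions, so merge it with what you already have):
    logstash:
      # Pass the flag as a command argument so Logstash watches the pipeline
      # files and reloads them when they change on the host. Depending on the
      # image's entrypoint you may also need to repeat the default arguments here.
      command: logstash --config.reload.automatic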
    Siddharth Balyan
    @alt-glitch
    @antoineco: Brilliant! Solves my doubt fully. Thank you so much.
    Siddharth Balyan
    @alt-glitch
    Is this Gitter only for docker-elk related queries, or can we ask some ELK-related queries too :P
    Antoine Cotten
    @antoineco
    If it's not too advanced you can try here and I'll try my best, but there aren't many active people in this Gitter room (if any) besides me :smile: Otherwise, you may have better chances at discuss.elastic.co/
    Siddharth Balyan
    @alt-glitch

    Ah cool. What I basically want to do is assign the timestamp from a nested JSON log. If this is the file:

    {
      "ip": "127.0.0.1",
      "data": {
        "ssh": {
          "status": "success",
          "protocol": "ssh",
          "result": {
            "server_id": {
              "raw": "SSH-2.0-OpenSSH_X.X",
              "version": "2.0",
              "software": "OpenSSH_X.X"
            },
            "algorithm_selection": {
              "dh_kex_algorithm": "curve25519-sha256@libssh.org",
              "host_key_algorithm": "ecdsa-sha2-nistp256",
              "client_to_server_alg_group": {
                "cipher": "aes128-ctr",
                "mac": "hmac-sha2-256",
                "compression": "none"
              },
              "server_to_client_alg_group": {
                "cipher": "aes128-ctr",
                "mac": "hmac-sha2-256",
                "compression": "none"
              }
            },
            "key_exchange": {
              "curve25519_sha256_params": {
                "server_public": "fnBsVPDLuMTsaQBUTii6/cBuG3+AUeDIFj4QlcdYqEM="
              }
            }
          },
          "timestamp": "2021-10-14T06:42:07Z"
        }
      }
    }

    How do I read the timestamp from inside a nested JSON?

    Here is my Logstash config:
    input {
        beats {
            port => 5044
        }
        tcp {
            port => 5000
        }
    }
    
    filter {
        json {
            source => "message"
        }
    
        ruby {
            code => '
            t = event.get("[data]")
            event.set(event.get("[@timestamp]"), t.values[0][timestamp])
            '
        }
    
        mutate {
            remove_field => ["message"]
        }
    }
    
    output {
        elasticsearch {
            hosts => "elasticsearch:9200"
            user => "XXXXXXXX"
            password => "XXXXXXX"
            ecs_compatibility => disabled
        }
    }
    But this doesn't seem to be working :(
    Antoine Cotten
    @antoineco
    @alt-glitch Maybe you can achieve that with a single mutation: https://www.elastic.co/guide/en/logstash/current/plugins-filters-mutate.html
    filter {
        if [data][ssh][timestamp] {
            mutate {
                replace => { "[@timestamp]" => "${[data][ssh][timestamp]}" }
            }
        }
    }
    Antoine Cotten
    @antoineco
    In fact, you were right, the input first needs to be consumed as JSON. And there is a problem with my mutation: it yields the wrong data type ("wrong argument type String (expected LogStash::Timestamp)").
    [screenshot: image.png]
    I think I managed to achieve what you wanted with the following filter:
    filter {
            json {
                    source => "message"
            }
    
            date {
                    match => [ "[data][ssh][timestamp]", "ISO8601" ]
            }
    
            mutate {
                    remove_field => [ "message" ]
            }
    }
    Antoine Cotten
    @antoineco
    Please note that it might be more convenient to set codec => json directly on the input instead of inside a filter (example: https://www.elastic.co/guide/en/logstash/current/plugins-inputs-tcp.html#plugins-inputs-tcp-codec), this way you wouldn't need to perform this deletion with remove_field since the message would already enter the pipeline as structured data.
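    A minimal sketch of what that could look like on the tcp input, keeping the port from your config:
    input {
        tcp {
            port => 5000
            # decode each incoming event as JSON right away, so there is no
            # raw "message" field left to remove later
            codec => json
        }
    }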
    Siddharth Balyan
    @alt-glitch

    @alt-glitch Maybe you can achieve that with a single mutation: https://www.elastic.co/guide/en/logstash/current/plugins-filters-mutate.html

    filter {
        if [data][ssh][timestamp] {
            mutate {
                replace => { "[@timestamp]" => "${[data][ssh][timestamp]}" }
            }
        }
    }

    But the problem is that my log files are not consistent. I have different files for different protocols: SSH, Postgres, FTP, HTTP, etc.

    {
      "ip": "127.0.0.1",
      "data": {
        "postgres": {
          "status": "success",
          "protocol": "postgres",
          "result": {
            "supported_versions": "FATAL:  unsupported frontend protocol 0.0: server supports 1.0 to 3.0",
            "protocol_error": {
              "code": "0A000",
              "file": "postmaster.c",
              "line": "2071",
              "message": "unsupported frontend protocol 255.255: server supports 1.0 to 3.0",
              "routine": "ProcessStartupPacket",
              "severity": "FATAL",
              "severity_v": "FATAL"
            },
            "is_ssl": true
          },
          "timestamp": "2021-10-14T06:42:08Z"
        }
      }
    }

    ^^ another example

    Please note that it might be more convenient to set codec => json directly on the input instead of inside a filter (example: https://www.elastic.co/guide/en/logstash/current/plugins-inputs-tcp.html#plugins-inputs-tcp-codec), this way you wouldn't need to perform this deletion with remove_field since the message would already enter the pipeline as structured data.

    This is actually a dummy test config. I'm using your entire docker-elk stack and ingesting through this Logstash pipeline into Elasticsearch, but to troubleshoot I'm using the config I shared above.

    So my problem is that I want to access the timestamp from data[protocol][timestamp].
    However, I have around 12 different protocols, and more will be added.
    The timestamp is always located in the same place, i.e. the last key/value pair inside the protocol's nested JSON.
    Siddharth Balyan
    @alt-glitch
    https://discuss.elastic.co/t/getting-key-value-from-nested-event-element/107852
    This issue is kind of the same as mine? I want to get the value from a nested event element too.
    https://discuss.elastic.co/t/create-timestamp-from-nested-json-elements/287992
    Here is my issue posted on the forum, but I don't seem to have gotten any replies.
    Antoine Cotten
    @antoineco
    OK I see.
    Antoine Cotten
    @antoineco

    I'm not an expert when it comes to filtering and transforming data using Logstash, but it seems like the data format you're using makes this problem tricky, because the nested object has unpredictable keys.
    It would be simpler if the data looked more like

    {
      "ip": "127.0.0.1",
      "protocol": "ssh",
      "data": {
        // data for the message related to the "ssh" protocol
      }
    }

    This way, you could always be sure the timestamp can be accessed at [data][timestamp], regardless of the protocol.

    In case modifying the data schema isn't an option for you, you're probably going to need a complex conditional that covers all the types of data you expect to receive (or one date filter per type, which amounts to much the same thing in my opinion).
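
    For example, a rough sketch of the "one date filter per type" option, showing only the two protocols from your samples (the real thing would need one branch per protocol):

    filter {
        if [data][ssh][timestamp] {
            date { match => [ "[data][ssh][timestamp]", "ISO8601" ] }
        } else if [data][postgres][timestamp] {
            date { match => [ "[data][postgres][timestamp]", "ISO8601" ] }
        }
        # ...and so on for every other protocol you expect to receive
    }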
    Antoine Cotten
    @antoineco
    Another approach, which doesn't require modifying the actual message but involves injecting metadata, would be to let the sender "label" the payload, for example with message_type: ssh. This way, you could use an expression such as "${[data][@metadata.message_type][timestamp]}" (not exactly sure about the format, but you get the idea).
    But this is just another workaround for the impractical data schema.
    Siddharth Balyan
    @alt-glitch
    Ahh I see. I was wondering if I could access the indices numerically (like in Python?) such as data[0][timestamp], but it looks like this isn't feasible. Thanks for your help! I'll look at some of these ideas :)
    Siddharth Balyan
    @alt-glitch

    Hey @antoineco, thanks for all your help again! I was able to solve my issue using Ruby to iterate over the event's fields. I'm not sure if it's optimal, but it works for now :P
    logstash.conf

    input {
        beats {
            port => 5044
        }
        tcp {
            port => 5000
        }
    }
    
    filter {
        json {
            source => "message"
        }
    
        ruby {
            code => "
            event.get('[data]').each do |key, value|
                value.each do |k, v|
                    if k == 'timestamp'
                        event.set('@timestamp', LogStash::Timestamp.parse_iso8601(v))
                    end
                end
            end 
            "
        }
    
        mutate {
            remove_field => ["message", "@version"]
        }
    }
    
    output {
        elasticsearch {
            hosts => "elasticsearch:9200"
        }
    }

    The JSON files get indexed properly now!

    [screenshot: image.png]
    Antoine Cotten
    @antoineco

    @alt-glitch awesome! I was also going to suggest using Ruby's .each method but I wasn't 100% sure how the JSON was being parsed under the hood. Looks like it's just a plain Ruby Hash.

    You could optimize the Ruby code a bit by replacing the value.each do ... end iteration with a direct access to the timestamp key, since it's the only value you are actually interested in:

    data = event.get('[data]')
    if data.is_a?(Hash)
        data.each do |key, value|
            if value.is_a?(Hash)
                ts = value.fetch('timestamp', nil)
                event.set('@timestamp', LogStash::Timestamp.parse_iso8601(ts)) if ts
            end
        end
    end

    Also the check on the data type with is_a? is important here, because if you receive a JSON where data does not exist, or contains at least one top-level value which isn't a Hash (like a String, Int, Bool or Array), value.each will crash with an error like: undefined method `each' for "value":String.

    Siddharth Balyan
    @alt-glitch
    After a lot of googling, I found out that the JSON was being parsed as Ruby Hashes, and realised I had to iterate through the hashes to get the timestamp.
    Thanks for the optimization and the error handling! I was wondering whether this is the ideal solution or not (since this will scale up)
    Antoine Cotten
    @antoineco
    If your Ruby code is written carefully, I believe it's definitely a better approach than trying to chain multiple filters.
    Pre-made filters like date exist to cover common use-cases, but when you need something tailored, writing a little bit of Ruby is unavoidable and might even perform much better.
    Aiden Mitchell
    @aidenmitchell
    Hi there! Are there instructions on how to set up Elastic Security with this Docker image? I believe additional ports are needed for Fleet Server etc.
    Antoine Cotten
    @antoineco
    @aidenmitchell Are you referring to this product? https://www.elastic.co/security
    Isn't it only available in the Elastic Cloud?
    Antoine Cotten
    @antoineco
    Ah no, it's part of Kibana! (learned something)
    https://www.elastic.co/guide/en/security/current/sec-requirements.html
    Antoine Cotten
    @antoineco
    I believe the steps listed here should be sufficient? https://www.elastic.co/guide/en/security/current/detections-permissions-section.html
    The data ingestion part depends on the shipper(s) you want to use, but it doesn't seem like you need to open any additional port for that: https://www.elastic.co/guide/en/security/current/ingest-data.html
    Regarding the Fleet server, it communicates with Elasticsearch on the standard HTTP port (9200) if I understand the docs correctly: https://www.elastic.co/guide/en/fleet/7.15/fleet-server.html
    On this page, in the "self managed" tab, there is a description of a couple of steps you can follow in Kibana to add a Fleet server.
    Let me know if you need any assistance!
    Aiden Mitchell
    @aidenmitchell
    @antoineco Great, thank you!
    Siddharth Balyan
    @alt-glitch
    Hey @antoineco. What do you think is the optimal way of using Filebeat with docker-elk? Is it better to install Filebeat locally or to integrate it with docker-elk?
    Antoine Cotten
    @antoineco
    @alt-glitch sorry I completely forgot to respond!
    Antoine Cotten
    @antoineco

    In my opinion it's better to run Beats agents (Filebeat, Metricbeat, ...) separately from the Elastic stack.

    The ELK stack is the central part of the architecture, where data is stored and processed.
    Beats, on the other hand, is a suite of lightweight agents whose purpose is to collect data on various hosts and send it to the stack for further processing.

    It wouldn't make much sense to run Beats and the Elastic stack as part of the same Compose deployment, unless the host running the stack is the only host you want to collect file data from.
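
    For example, on each remote host a minimal filebeat.yml sketch (the log paths and the host name below are placeholders for your environment) would simply point at the stack's beats input on port 5044:

    filebeat.inputs:
      - type: log
        paths:
          - /var/log/scans/*.json

    output.logstash:
      hosts: ["<stack-host>:5044"]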

    Siddharth Balyan
    @alt-glitch

    @alt-glitch sorry I completely forgot to respond!

    No problem! :D

    It wouldn't make much sense to run Beats and the Elastic stack as part of the same Compose deployment, unless the host running the stack is the only host you want to collect file data from.

    Yeah, currently the agents are running on the same host but that is definitely going to change in the future

    To give a rough overview: I want to use the ELK stack to ingest network/security scans and make them searchable and easy to visualise. This would become even more important if we move to a continuous scanning pipeline. So the end goal is for the security people to be able to easily view this scan data (examples shown in the JSON documents I sent above) on Kibana, in a timeline.
    Since I want to make setting this up as easy for them as possible, I was considering integrating Filebeat (which ships the scan data) into the Compose deployment.
    But yeah, a few scripts have made installing and using it easy, and your point makes sense!
    Antoine Cotten
    @antoineco
    I mean, you can still use Compose to run the Filebeat agents on the hosts, either within the same Compose file as the stack or outside of it (inside its own Compose file).
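    For example, a hypothetical stand-alone Compose file for a single Filebeat agent could be as small as this (the image tag and mounted paths are assumptions, adjust to your setup):
    services:
      filebeat:
        image: docker.elastic.co/beats/filebeat:7.15.0
        # the agent's own configuration plus the directory containing the data to ship
        volumes:
          - ./filebeat.yml:/usr/share/filebeat/filebeat.yml:ro
          - ./scans:/var/log/scans:ro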