The tls branch is kept separate because web browsers would show users a loud warning when they try to open the Kibana URL, since the certificate presented by Kibana wouldn't be considered "trusted". We were afraid this would be a harsh user experience, so we preferred to let users decide which certificate they want to use (and be intentional about it).
Ah cool. What I basically want to do is to assign the timestamp from a nested JSON log. If this is the file:
{
  "ip": "127.0.0.1",
  "data": {
    "ssh": {
      "status": "success",
      "protocol": "ssh",
      "result": {
        "server_id": {
          "raw": "SSH-2.0-OpenSSH_X.X",
          "version": "2.0",
          "software": "OpenSSH_X.X"
        },
        "algorithm_selection": {
          "dh_kex_algorithm": "curve25519-sha256@libssh.org",
          "host_key_algorithm": "ecdsa-sha2-nistp256",
          "client_to_server_alg_group": {
            "cipher": "aes128-ctr",
            "mac": "hmac-sha2-256",
            "compression": "none"
          },
          "server_to_client_alg_group": {
            "cipher": "aes128-ctr",
            "mac": "hmac-sha2-256",
            "compression": "none"
          }
        },
        "key_exchange": {
          "curve25519_sha256_params": {
            "server_public": "fnBsVPDLuMTsaQBUTii6/cBuG3+AUeDIFj4QlcdYqEM="
          }
        }
      },
      "timestamp": "2021-10-14T06:42:07Z"
    }
  }
}
How do I read the timestamp from inside a nested JSON?
input {
  beats {
    port => 5044
  }
  tcp {
    port => 5000
  }
}

filter {
  json {
    source => "message"
  }
  ruby {
    code => '
      t = event.get("[data]")
      event.set(event.get("[@timestamp]"), t.values[0][timestamp])
    '
  }
  mutate {
    remove_field => ["message"]
  }
}

output {
  elasticsearch {
    hosts => "elasticsearch:9200"
    user => "XXXXXXXX"
    password => "XXXXXXX"
    ecs_compatibility => disabled
  }
}
filter {
  if [data][ssh][timestamp] {
    mutate {
      replace => { "[@timestamp]" => "%{[data][ssh][timestamp]}" }
    }
  }
}
filter {
  json {
    source => "message"
  }
  date {
    match => [ "[data][ssh][timestamp]", "ISO8601" ]
  }
  mutate {
    remove_field => [ "message" ]
  }
}
Please note that it might be more convenient to set codec => json directly on the input instead of inside a filter (example: https://www.elastic.co/guide/en/logstash/current/plugins-inputs-tcp.html#plugins-inputs-tcp-codec). This way you wouldn't need to perform this deletion with remove_field, since the message would already enter the pipeline as structured data.
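For example, a rough sketch based on the tcp input from your config above:

input {
  tcp {
    port => 5000
    codec => json
  }
}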
@alt-glitch Maybe you can achieve that with a single mutate filter: https://www.elastic.co/guide/en/logstash/current/plugins-filters-mutate.html
filter { if [data][ssh][timestamp] { mutate { replace => { "[@timestamp]" => "%{[data][ssh][timestamp]}" } } } }
But the problem is that my log files are not consistent. I have different files for different protocols: SSH, Postgres, FTP, HTTP, etc.
{
  "ip": "127.0.0.1",
  "data": {
    "postgres": {
      "status": "success",
      "protocol": "postgres",
      "result": {
        "supported_versions": "FATAL: unsupported frontend protocol 0.0: server supports 1.0 to 3.0",
        "protocol_error": {
          "code": "0A000",
          "file": "postmaster.c",
          "line": "2071",
          "message": "unsupported frontend protocol 255.255: server supports 1.0 to 3.0",
          "routine": "ProcessStartupPacket",
          "severity": "FATAL",
          "severity_v": "FATAL"
        },
        "is_ssl": true
      },
      "timestamp": "2021-10-14T06:42:08Z"
    }
  }
}
^^ another example
Please note that it might be more convenient to set codec => json directly on the input instead of inside a filter (example: https://www.elastic.co/guide/en/logstash/current/plugins-inputs-tcp.html#plugins-inputs-tcp-codec). This way you wouldn't need to perform this deletion with remove_field, since the message would already enter the pipeline as structured data.
This is actually a dummy test config; I am using your entire docker-elk setup and ingesting data through this Logstash pipeline into Elasticsearch, but to troubleshoot I'm using the config I shared above. The timestamp is always located at data[protocol][timestamp].
I'm not an expert when it comes to filtering and transforming data using Logstash, but it seems like the data format you're using makes this problem tricky, because the nested object has unpredictable keys.
It would be simpler if the data looked more like
{
  "ip": "127.0.0.1",
  "protocol": "ssh",
  "data": {
    // data for the message related to the "ssh" protocol
  }
}
This way, you could always be sure the timestamp can be accessed at [data][timestamp], regardless of the protocol.
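With that shape, a single date filter would be enough for every protocol. A rough sketch (assuming the timestamps stay in ISO8601 like in your samples):

filter {
  date {
    match => [ "[data][timestamp]", "ISO8601" ]
  }
}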
Otherwise, you'll probably end up writing one conditional per protocol (or a date filter per type, which is similar in my opinion). Another option would be to add a field to each event, such as message_type: ssh. This way, you could use an expression such as "${[data][@metadata.message_type][timestamp]}". (not exactly sure about the format, but you get the idea)
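As a rough, untested sketch of that idea (it assumes something upstream already sets [@metadata][message_type], and it falls back to a ruby filter because I don't think field references can be nested inside a plain %{...} expression):

filter {
  ruby {
    code => '
      # Build the field path from the protocol name carried in @metadata.
      proto = event.get("[@metadata][message_type]")
      ts = event.get("[data][#{proto}][timestamp]") if proto
      event.set("@timestamp", LogStash::Timestamp.parse_iso8601(ts)) if ts
    '
  }
}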
Hey @antoineco, thanks for all your help again! I was able to solve my issue using Ruby to iterate over the events. I am not sure if it's optimal but it works for now :P
logstash.conf
input {
  beats {
    port => 5044
  }
  tcp {
    port => 5000
  }
}

filter {
  json {
    source => "message"
  }
  ruby {
    code => "
      event.get('[data]').each do |key, value|
        value.each do |k, v|
          if k == 'timestamp'
            event.set('@timestamp', LogStash::Timestamp.parse_iso8601(v))
          end
        end
      end
    "
  }
  mutate {
    remove_field => ["message", "@version"]
  }
}

output {
  elasticsearch {
    hosts => "elasticsearch:9200"
  }
}
The JSON files get indexed properly now!
@alt-glitch awesome! I was also going to suggest using Ruby's .each method, but I wasn't 100% sure how the JSON was being parsed under the hood. Looks like it's just a plain Ruby Hash.
You could optimize the Ruby code a bit by replacing the value.each do ... end iteration with a direct access to the timestamp key, since it's the only value you are actually interested in:
data = event.get('[data]')
if data.is_a?(Hash)
  data.each do |key, value|
    if value.is_a?(Hash)
      ts = value.fetch('timestamp', nil)
      event.set('@timestamp', LogStash::Timestamp.parse_iso8601(ts)) if ts
    end
  end
end
Also, the check on the data type with is_a? is important here, because if you receive a JSON where data does not exist, or contains at least one top-level value which isn't a Hash (like a String, Int, Bool or Array), value.each will crash with undefined method `each' for "value":String.
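For example, an event like this (made up for illustration) would crash the unguarded version, and is simply skipped by the is_a? check, since the value under data.ssh is a String instead of a Hash:

{
  "ip": "127.0.0.1",
  "data": {
    "ssh": "connection reset by peer"
  }
}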
Filters like date exist to cover common use cases, but when you need something tailored, writing a little bit of Ruby is unavoidable, and might even perform much better.
In my opinion it's better to run Beats agents (Filebeat, Metricbeat, ...) separately from the Elastic stack.
The ELK stack is the central part of the architecture, where data is stored and processed.
Whereas Beats is a suite of lightweight agents whose purpose is to collect data on various hosts and send it to the stack for further processing.
It wouldn't make much sense to run Beats and the Elastic stack as part of the same Compose deployment, unless the host running the stack is the only host you want to collect file data from.
Yeah, currently the agents are running on the same host, but that is definitely going to change in the future.