Hey guys,
I am facing a problem with event disorder and max lag.
My Jet job is supposed to:
My problem is that the devices/sensors upload their measurements to the Kafka topic (the source for the Jet job) in parallel and only at 5-minute intervals.
This results in event disorder. I thought I could fix this by calling withNativeTimestamps(300_000) (5 min) to compensate for the upload interval.
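For illustration, here is a minimal plain-Java sketch (no Jet dependency) of the usual "limiting lag" idea. The watermark trails the highest timestamp seen so far by the allowed lag. A window [start, end) can only be emitted once the watermark reaches its end, and events behind the watermark count as late. The LagWatermark class is hypothetical, not Jet's internal code, and it assumes the lag is in the same unit as the timestamps.

```java
/** Hypothetical sketch of "limiting lag" watermark semantics; not Jet's internal code. */
final class LagWatermark {
    private final long allowedLag;               // same unit as the event timestamps
    private long topTimestamp = Long.MIN_VALUE;  // highest timestamp observed so far

    LagWatermark(long allowedLag) {
        this.allowedLag = allowedLag;
    }

    /** Records an event timestamp and returns the resulting watermark. */
    long observe(long timestamp) {
        topTimestamp = Math.max(topTimestamp, timestamp);
        return watermark();
    }

    /** The watermark trails the highest timestamp seen so far by the allowed lag. */
    long watermark() {
        return topTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : topTimestamp - allowedLag;
    }

    /** A window [start, end) can be emitted once the watermark has reached its end. */
    boolean canEmit(long windowEnd) {
        return watermark() >= windowEnd;
    }

    /** An event whose timestamp is already behind the watermark counts as late. */
    boolean isLate(long timestamp) {
        return timestamp < watermark();
    }

    public static void main(String[] args) {
        LagWatermark wm = new LagWatermark(300_000);  // 5 minutes if timestamps are in milliseconds
        wm.observe(1_000_000);
        System.out.println(wm.watermark());           // 700000
        System.out.println(wm.canEmit(660_000));      // true
        System.out.println(wm.isLate(650_000));       // true
    }
}
```

With a lag of 300_000, a window therefore closes only after an event at least 300_000 timestamp units past its end has arrived.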
No events are skipped now, but I am not getting any window result either.
What am I doing wrong? Am I using the wrong time unit for the conversion, or is my understanding of the max lag concept wrong?
Thanks in advance!
Sincerely,
Pad
StreamSource.setPartitionIdleTimeout()
@viliam-durina @mtopolnik @cangencer
We have around 100 (and rising) Raspberry Pis collecting measurements from various sensors, mostly at 5-minute intervals.
The measurements are first stored in a Redis database on the Pis and then uploaded at 5-minute intervals to a Spring backend.
The backend parses the measurements (sent via HTTP requests) and sends them to Kafka.
We set the measurement's timestamp as the Kafka record timestamp, so that timestamp is what Jet uses as the "native timestamp", isn't it?
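Kafka record timestamps are conventionally milliseconds since the Unix epoch, and Jet takes the native timestamp as it is. If the backend has measurement times in epoch seconds (an assumption here), a hypothetical helper like KafkaTimestamps below could normalize them before the record is built:

```java
import java.time.Instant;
import java.util.concurrent.TimeUnit;

/** Hypothetical backend helper: normalize a measurement time to epoch milliseconds. */
final class KafkaTimestamps {
    private KafkaTimestamps() {}

    /** Converts an epoch-seconds measurement time to the epoch-milliseconds value Kafka expects. */
    static long fromEpochSeconds(long epochSeconds) {
        return TimeUnit.SECONDS.toMillis(epochSeconds);
    }

    public static void main(String[] args) {
        long measuredAtSeconds = 1_600_000_000L;                  // example epoch-seconds value
        long kafkaTimestamp = fromEpochSeconds(measuredAtSeconds);
        System.out.println(kafkaTimestamp);                       // 1600000000000
        System.out.println(Instant.ofEpochMilli(kafkaTimestamp)); // 2020-09-13T12:26:40Z
    }
}
```

The result would then go into the timestamp argument of Kafka's ProducerRecord(topic, partition, timestamp, key, value) constructor.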
We have 4 Kafka partitions and use the Raspberry Pi's unique ID as the key, so there are probably about 25 Pis per partition.
Since each partition receives data from multiple Pis that upload independently of each other, we necessarily have event disorder at the partition level.
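One way to size the allowed lag is to measure how far each event trails the highest timestamp seen before it on the same partition. The hypothetical DisorderProbe below does that for a list of timestamps in arrival order; the two-Pi scenario in main is made up for illustration.

```java
import java.util.List;

/** Hypothetical diagnostic: how much allowed lag would a given arrival order need? */
final class DisorderProbe {
    private DisorderProbe() {}

    /**
     * Returns the largest amount by which any event trails the highest timestamp seen before it.
     * An allowed lag at least this large would let every event in the sequence arrive on time.
     */
    static long requiredLag(List<Long> timestampsInArrivalOrder) {
        long top = Long.MIN_VALUE;
        long worst = 0;
        for (long ts : timestampsInArrivalOrder) {
            if (top != Long.MIN_VALUE) {
                worst = Math.max(worst, top - ts);
            }
            top = Math.max(top, ts);
        }
        return worst;
    }

    public static void main(String[] args) {
        // Two Pis on one partition, timestamps in seconds: Pi A's 5-minute batch
        // arrives first, then Pi B's batch with older measurements.
        List<Long> arrivals = List.of(300L, 360L, 420L, 480L, 540L, 0L, 60L, 120L);
        System.out.println(requiredLag(arrivals)); // 540
    }
}
```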
I agree that a 1-minute tumbling window was probably an unfortunate choice in this case.
It is just for testing, though, and from my understanding I should still get at least one window result out of every five windows.
So tomorrow I will test the following:
Hey guys, it is me again.
I still can't get windowing to work correctly.
In this first example, I just pass the incoming measurements (from a Kafka topic) through the pipeline and write them to the logger sink. The output clearly shows that measurements are arriving continuously: https://pastebin.com/Jua8Zvec
In this second example, I try to do a simple count aggregation over a 1-minute tumbling window on the incoming measurements, grouped by the record key (the unique ID of the Raspberry Pi the measurement comes from): https://pastebin.com/PYtQKWAZ
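For comparison, this is what a 1-minute tumbling-window count grouped by Pi ID computes, as a plain in-memory Java sketch. The TumblingCount class, the Measurement type and the "pi-1" IDs are hypothetical, and timestamps are assumed to be in milliseconds. A streaming job produces the same groups, but only emits a window once the watermark has passed its end.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory illustration of a tumbling-window count grouped by key (not Jet code). */
final class TumblingCount {
    /** A measurement reduced to what the count needs: the Pi ID (record key) and its timestamp. */
    static final class Measurement {
        final String piId;
        final long timestamp;

        Measurement(String piId, long timestamp) {
            this.piId = piId;
            this.timestamp = timestamp;
        }
    }

    private TumblingCount() {}

    /** Start of the tumbling window of the given size that contains the timestamp. */
    static long windowStart(long timestamp, long windowSize) {
        return Math.floorDiv(timestamp, windowSize) * windowSize;
    }

    /** Counts measurements per window start and Pi ID; both map levels are sorted. */
    static Map<Long, Map<String, Long>> count(List<Measurement> items, long windowSize) {
        Map<Long, Map<String, Long>> result = new TreeMap<>();
        for (Measurement m : items) {
            result.computeIfAbsent(windowStart(m.timestamp, windowSize), start -> new TreeMap<>())
                  .merge(m.piId, 1L, Long::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        long oneMinuteMs = 60_000;
        List<Measurement> items = List.of(
                new Measurement("pi-1", 10_000),
                new Measurement("pi-2", 20_000),
                new Measurement("pi-1", 50_000),
                new Measurement("pi-1", 70_000));
        System.out.println(count(items, oneMinuteMs)); // {0={pi-1=2, pi-2=1}, 60000={pi-1=1}}
    }
}
```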
When executing this second example, I am not getting any output (window results) at all, no matter how long I wait. What am I doing wrong here?
Thanks in advance!
.window(WindowDefinition.sliding(WINDOW_SIZE, WINDOW_SLIDE))
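For a sliding window, each event falls into size / slide overlapping windows. This hypothetical plain-Java sketch lists the window starts that contain a given timestamp; like the window definition itself, size and slide only make sense in the unit of the event timestamps. The sketch requires size to be a multiple of slide.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of sliding-window assignment; not Jet's implementation. */
final class SlidingWindows {
    private SlidingWindows() {}

    /**
     * Returns the start of every window [start, start + size) that contains the timestamp,
     * oldest first, for windows that begin every {@code slide} units.
     */
    static List<Long> windowStartsFor(long timestamp, long size, long slide) {
        if (size <= 0 || slide <= 0 || size % slide != 0) {
            throw new IllegalArgumentException("size and slide must be positive, size a multiple of slide");
        }
        List<Long> starts = new ArrayList<>();
        long latest = Math.floorDiv(timestamp, slide) * slide; // newest window containing the timestamp
        for (long start = latest - size + slide; start <= latest; start += slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // 1-minute windows sliding every 30 seconds, timestamps in milliseconds
        System.out.println(windowStartsFor(130_000, 60_000, 30_000)); // [90000, 120000]
    }
}
```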
Ahh... That makes a lot of sense now. Thanks guys. :)
So to fix this, I have to either express all my own time values (except the source partition idle timeout) in seconds, or keep milliseconds and convert the measurement timestamps accordingly? That also explains why I couldn't find much information about which time unit to use for the input parameters: it depends on the unit of the input timestamps.
It is possible that larger lags occur in the measurement stream, but that is not a big deal. Those events are supposed to be discarded anyway, since they arrive too late to be processed.
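The unit mix-up can be made concrete with a little arithmetic. A lag of 300_000 meant as 5 minutes in milliseconds becomes 300_000 seconds (about 83 hours) when the timestamps are in seconds; if the 1-minute window was written as 60_000, it likewise becomes about 16.7 hours. The hypothetical TimestampUnits helper below expresses each duration in the timestamps' unit using only java.time and java.util.concurrent.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: express every time constant in the same unit as the event timestamps. */
final class TimestampUnits {
    private TimestampUnits() {}

    /** Converts a duration to the unit the event timestamps use. */
    static long inTimestampUnit(Duration duration, TimeUnit timestampUnit) {
        return timestampUnit.convert(duration.toNanos(), TimeUnit.NANOSECONDS);
    }

    public static void main(String[] args) {
        // A lag written as 300_000 but read against timestamps in seconds:
        System.out.println(Duration.ofSeconds(300_000).toHours()); // 83

        // Option 1: keep timestamps in seconds, express lag and window size in seconds.
        System.out.println(inTimestampUnit(Duration.ofMinutes(5), TimeUnit.SECONDS));      // 300
        System.out.println(inTimestampUnit(Duration.ofMinutes(1), TimeUnit.SECONDS));      // 60

        // Option 2: convert timestamps to milliseconds, keep millisecond constants.
        System.out.println(inTimestampUnit(Duration.ofMinutes(5), TimeUnit.MILLISECONDS)); // 300000
    }
}
```

Option 2 has the side benefit of matching the milliseconds convention Kafka uses for record timestamps.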
entry()
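import com.hazelcast.jet.Util;
import static com.hazelcast.jet.Traversers.traverseIterable;
// Copy each map entry into a standalone Map.Entry created by Util.entry()
traverseIterable(map.entrySet()).map(e -> Util.entry(e.getKey(), e.getValue()));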