    Viliam Durina
    @viliam-durina
    @DeveloperPad So you have 1-minute tumbling windows, but your measurements are every 5 minutes? That means that 4 windows have no measurement and every 5th window has 1 measurement to aggregate?
    With events this sparse you probably also hit the idle timeout, which is 60 seconds by default. You can set it by calling StreamSource.setPartitionIdleTimeout().
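    For illustration, a minimal sketch of that call on a Kafka source; the properties, topic name, and value type below are placeholders, not taken from the conversation:

        import java.util.Map.Entry;
        import java.util.Properties;
        import java.util.concurrent.TimeUnit;
        import com.hazelcast.jet.kafka.KafkaSources;
        import com.hazelcast.jet.pipeline.StreamSource;

        Properties kafkaProps = new Properties(); // broker address, deserializers, ...

        // Uploads arrive only every ~5 minutes, so the default 60 s idle timeout
        // would mark partitions idle between uploads; give it some headroom.
        StreamSource<Entry<String, String>> source =
                KafkaSources.<String, String>kafka(kafkaProps, "measurements")
                            .setPartitionIdleTimeout(TimeUnit.MINUTES.toMillis(6));
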
    Marko Topolnik
    @mtopolnik

    use native timestamps (timestamp of temperature measurement)

    These will actually be the timestamps determined by Kafka. You may want to use the timestamps from the original events for better precision.

    if they upload in 5-minute intervals but aren't aligned, I suggest using a sliding window instead. You can have it emit results every minute, but aggregate over 5 or 6 minutes so that it always contains the most recent upload from each device
    you'll still have to set the event disorder to at least 5 minutes (but probably a bit more)
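    A sketch of this suggestion; the Measurement type, its getTemperature() accessor, and the source variable are assumed names for illustration:

        import java.util.Map.Entry;
        import java.util.concurrent.TimeUnit;
        import com.hazelcast.jet.aggregate.AggregateOperations;
        import com.hazelcast.jet.pipeline.Pipeline;
        import com.hazelcast.jet.pipeline.Sinks;
        import com.hazelcast.jet.pipeline.WindowDefinition;

        Pipeline p = Pipeline.create();
        p.readFrom(source)                                    // Kafka source of Entry<String, Measurement>
         .withNativeTimestamps(TimeUnit.MINUTES.toMillis(6))  // allowed disorder: a bit over 5 minutes
         .groupingKey(Entry::getKey)                          // one group per device
         .window(WindowDefinition.sliding(
                 TimeUnit.MINUTES.toMillis(6),                // aggregate over the last 6 minutes...
                 TimeUnit.MINUTES.toMillis(1)))               // ...and emit a result every minute
         .aggregate(AggregateOperations.averagingDouble(e -> e.getValue().getTemperature()))
         .writeTo(Sinks.logger());
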
    Can Gencer
    @cangencer
    event lag represents how much disorder you can have within the same Kafka topic and partition. If you have items out of order within the same partition, then you need to set the lag larger.
    how many partitions do you use in Kafka, and do your messages have a key?
    Caio Guedes
    @caioguedes
    @cangencer at https://github.com/hazelcast/hazelcast-jet/pull/2276#discussion_r431671600, could you suggest how to mark the options as deprecated? I did not see a way via picocli. I am thinking of just prepending "[DEPRECATED]" to the descriptions.
    Can Gencer
    @cangencer
    it should be fine just to add a note to the description
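    For example, with picocli's @Option annotation (the option names here are illustrative):

        import picocli.CommandLine;

        @CommandLine.Option(names = "--old-option",
                description = "[DEPRECATED] Use --new-option instead.")
        private boolean oldOption;
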
    Lucas Kinne
    @DeveloperPad

    @viliam-durina @mtopolnik @cangencer
    We have around 100 (and rising) Raspberry Pis collecting measurements from various sensors, mostly in 5-minute intervals.
    The measurements are first stored in a Redis database on the Pis and then uploaded in 5-minute intervals to a Spring backend.
    The backend parses the measurements (sent via HTTP request) and sends them to Kafka.
    We thereby specify the timestamp of the measurement as the Kafka timestamp, so that timestamp is what Jet uses as the "native timestamp", isn't it?

    We have 4 Kafka partitions and use a unique Raspberry Pi ID as the key, so we probably have around 25 Pis in each partition.
    Given that we have multiple Pis per partition, which upload their data independently of each other, we necessarily have event disorder at the partition level.

    The 1-minute tumbling window was probably an unfortunate choice in this case, I agree.
    It is just for testing, though, and my understanding was that I should then get at least one window result for every 5 windows.

    So tomorrow I will try the following:

    • adjust the window so that I will always have at least one measurement in each window
    • increase the partition idle timeout to more than 5 minutes (This was probably the main problem here, right?)
    • maybe increase the max lag a bit more (but I think it should be fine, otherwise Jet would have printed a "skipping event" message)
    Viliam Durina
    @viliam-durina
    We thereby specify the timestamp of the measurement as the Kafka timestamp, so that timestamp is what Jet uses as the "native timestamp", isn't it?
    yes, the native timestamp is the Kafka record timestamp
    I'm not sure, but I think the Kafka record timestamp is mostly monotonic; in edge cases it can be out of order, such as when a partition is moved to another broker after a failure, or when the system clock is adjusted backwards.
    Marko Topolnik
    @mtopolnik
    you would have event disorder only if you used the actual timestamps of when the measurements were made, and assuming the measurements are made over the whole 5-minute timespan rather than sampled just before sending to Kafka
    Lucas Kinne
    @DeveloperPad

    Hey guys, it is me again.
    I somehow cannot get the windowing to work correctly.

    In this first example, I am just passing the incoming measurements (from a Kafka topic) through the pipeline and sinking them into the logger. It clearly shows that measurements are coming in continuously: https://pastebin.com/Jua8Zvec

    In this second example, I try to do a simple counting aggregation with a 1-minute tumbling window over the incoming measurements, grouped by the record key (the unique ID of the Raspberry Pi the measurement comes from): https://pastebin.com/PYtQKWAZ

    When executing this second example, I am not getting any output (window results) at all, no matter how long I wait. What am I doing wrong here?

    Thanks in advance!

    Can Gencer
    @cangencer
    do you mean to set your window size as 60 ms?
    you say the window size is in seconds, but the input parameter here is in milliseconds: .window(WindowDefinition.sliding(WINDOW_SIZE, WINDOW_SLIDE))
    also to me it seems like your timestamp unit is in epoch seconds, not milliseconds
    Can Gencer
    @cangencer
    but your lag is in milliseconds
    the lag is defined in the same unit as the input data
    so if your timestamps are in seconds, everything else should be in seconds (except partition idle timeout - that's always in milliseconds because it uses system time)
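    In other words, with epoch-second timestamps a 1-minute window takes 60, not 60_000. A sketch, reusing the assumed names from the snippets above:

        p.readFrom(source)
         .withNativeTimestamps(330)              // allowed lag: 330 s, same unit as the timestamps
         .groupingKey(Entry::getKey)
         .window(WindowDefinition.tumbling(60))  // a 60 s tumbling window, in seconds
         .aggregate(AggregateOperations.counting())
         .writeTo(Sinks.logger());
        // Only setPartitionIdleTimeout() stays in milliseconds: it is based on system time.
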
    Marko Topolnik
    @mtopolnik
    some substantial lag here:
    <dd60bdbe-eb31-4f6a-a3c0-549224794afe> 15.318546411028365 @ 1591371455
    <85b52bfc-5d66-43a0-8178-9ea81da3ca81> 12.134935743991809 @ 1591371035
    that's way more than 5 minutes
    Can Gencer
    @cangencer
    the way it's configured, it's 5000 minutes now
    Marko Topolnik
    @mtopolnik
    So Lucas just wasn't patient enough :)
    but more relevant is that it was supposed to stay within five minutes
    Lucas Kinne
    @DeveloperPad

    Ahh... That makes a lot of sense now. Thanks guys. :)
    So to fix this, I either have to adjust all my own time units (except the source partition idle timeout) to seconds, or use milliseconds and adjust the measurement timestamps accordingly? That explains why I didn't find much information about which time unit to use for the input parameters: it depends on the timestamp unit of the input data.

    It is possible that bigger lags occur within the measurement stream, but that is not a big deal. Such events are supposed to be discarded anyway, as they are too late to be processed.

    Can Gencer
    @cangencer
    we've set up a Slack workspace for Hazelcast users, which you can join here: https://hz-community-slack.herokuapp.com/ (right now it's experimental, but we'll likely phase out Gitter if it works just as well or better)
    peterchenadded
    @peterchenadded
    Hi, has anyone seen the error below before?
    Caused by: com.hazelcast.jet.JetException: Unable to serialize instance of class java.util.HashMap$Node: there is no suitable serializer for class java.util.HashMap$Node
    peterchenadded
    @peterchenadded
    This wasn't an issue in older versions of Jet; the error appears in Jet 4.1.1
    Viliam Durina
    @viliam-durina
    @peterchenadded Hmm, HashMap doesn't normally serialize HashMap$Node, that's an internal object. Can you share the full stack trace?
    peterchenadded
    @peterchenadded
    2020-08-14 17:02:25,220 [hz.dazzling_noyce.cached.thread-4] ERROR c.h.jet.impl.MasterJobContext - Execution of job '04cf-3091-ef80-0002', execution 04cf-2fb3-7ad4-0001 failed
    Start time: 2020-08-14T17:02:24.022
    Duration: 1,194 ms
    For further details enable JobConfig.storeMetricsAfterJobCompletion
    com.hazelcast.jet.JetException: Exception in SenderTasklet{ordinal=0, destinationAddress=[localhost]:5701, sourceVertexName='accumulate-properties'}: com.hazelcast.nio.serialization.HazelcastSerializationException: Failed to serialize 'java.util.HashMap$Node'
    at com.hazelcast.jet.impl.execution.TaskletExecutionService$CooperativeWorker.runTasklet(TaskletExecutionService.java:373)
    at java.util.concurrent.CopyOnWriteArrayList.forEach(CopyOnWriteArrayList.java:891)
    at com.hazelcast.jet.impl.execution.TaskletExecutionService$CooperativeWorker.run(TaskletExecutionService.java:346)
    at java.lang.Thread.run(Thread.java:748)
    Caused by: com.hazelcast.nio.serialization.HazelcastSerializationException: Failed to serialize 'java.util.HashMap$Node'
    at com.hazelcast.internal.serialization.impl.SerializationUtil.handleSerializeException(SerializationUtil.java:115)
    at com.hazelcast.internal.serialization.impl.AbstractSerializationService.writeObject(AbstractSerializationService.java:269)
    at com.hazelcast.internal.serialization.impl.ByteArrayObjectDataOutput.writeObject(ByteArrayObjectDataOutput.java:378)
    at com.hazelcast.jet.impl.execution.SenderTasklet.tryFillOutputBuffer(SenderTasklet.java:152)
    at com.hazelcast.jet.impl.execution.SenderTasklet.call(SenderTasklet.java:112)
    at com.hazelcast.jet.impl.execution.TaskletExecutionService$CooperativeWorker.runTasklet(TaskletExecutionService.java:366)
    ... 3 common frames omitted
    Caused by: com.hazelcast.jet.JetException: Unable to serialize instance of class java.util.HashMap$Node: There is no suitable serializer for class java.util.HashMap$Node - Note: You can register a serializer using JobConfig.registerSerializer()
    at com.hazelcast.jet.impl.serialization.DelegatingSerializationService.serializationException(DelegatingSerializationService.java:138)
    at com.hazelcast.jet.impl.serialization.DelegatingSerializationService.serializerFor(DelegatingSerializationService.java:127)
    at com.hazelcast.internal.serialization.impl.AbstractSerializationService.writeObject(AbstractSerializationService.java:265)
    ... 7 common frames omitted
    Caused by: com.hazelcast.nio.serialization.HazelcastSerializationException: There is no suitable serializer for class java.util.HashMap$Node
    at com.hazelcast.internal.serialization.impl.AbstractSerializationService.serializerFor(AbstractSerializationService.java:507)
    at com.hazelcast.jet.impl.serialization.DelegatingSerializationService.serializerFor(DelegatingSerializationService.java:125)
    ... 8 common frames omitted
    HazelcastGitter
    @HazelcastGitter
    [Marko Topolnik, Hazelcast] if you have something like traverseIterable(hashMap.entrySet()) and try to emit that as the output in a flatmap stage, you'd get this error, I think
    peterchenadded
    @peterchenadded
    @mtopolnik return Traversers.traverseIterable(distinctCorrelationIds.entrySet());
    @mtopolnik seems you're spot on
    peterchenadded
    @peterchenadded
    @mtopolnik I guess there's no default serializer for this and I would have to create my own? It was working without issues in an older version of Jet (0.7)
    HazelcastGitter
    @HazelcastGitter
    [Marko Topolnik, Hazelcast] HashMap.entrySet() has an optimized, zero-allocation implementation, which means it exposes its internal map nodes to you. You must not let them escape the method.
    [Marko Topolnik, Hazelcast] the way to approach it is to map right away to another type, like Jet's entry()
    [Marko Topolnik, Hazelcast] traverseIterable(map.entrySet()).map(e -> Util.entry(e.getKey(), e.getValue()));
    peterchenadded
    @peterchenadded
    @mtopolnik ok will give that a try
    HazelcastGitter
    @HazelcastGitter
    [Marko Topolnik, Hazelcast] if you keep using and updating the correlationIds map after emitting it, then you should transform it entirely before passing it on to the traverser
    [Marko Topolnik, Hazelcast] return traverseIterable(correlationIds.entrySet().stream().map(e -> entry(e.getKey(), e.getValue())).collect(Collectors.toList()));
    peterchenadded
    @peterchenadded
    @mtopolnik are there benefits to converting to Util.entry, as there are quite a lot of places I would have to update? Is it simpler to add a HashMap$Node serializer?
    HazelcastGitter
    @HazelcastGitter
    [Marko Topolnik, Hazelcast] if you keep updating the map that you emit, you'll have to copy the contents anyway
    [Marko Topolnik, Hazelcast] you can always write a helper method that will be very simple to use in all places
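    One way such a helper could look; the method name is invented, and the eager copy mirrors Marko's snippet above:

        import java.util.Map;
        import java.util.Map.Entry;
        import java.util.stream.Collectors;
        import com.hazelcast.jet.Traverser;
        import com.hazelcast.jet.Traversers;
        import com.hazelcast.jet.Util;

        static <K, V> Traverser<Entry<K, V>> traverseEntriesCopied(Map<K, V> map) {
            // Copy every entry eagerly, so no HashMap$Node escapes and later
            // updates to the map can't affect what the traverser emits.
            return Traversers.traverseIterable(map.entrySet().stream()
                    .map(e -> Util.entry(e.getKey(), e.getValue()))
                    .collect(Collectors.toList()));
        }
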
    peterchenadded
    @peterchenadded
    @mtopolnik I checked; it was too many places to update manually, so I added a simple HashMap$Node serializer and the errors are gone now
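    The actual serializer isn't shown in the chat; below is a minimal sketch of one possibility, using the JobConfig.registerSerializer() route the error message suggests. The type ID and the choice to deserialize into a SimpleImmutableEntry are assumptions:

        import java.io.IOException;
        import java.util.AbstractMap.SimpleImmutableEntry;
        import java.util.Map;
        import com.hazelcast.jet.config.JobConfig;
        import com.hazelcast.nio.ObjectDataInput;
        import com.hazelcast.nio.ObjectDataOutput;
        import com.hazelcast.nio.serialization.StreamSerializer;

        public final class MapNodeSerializer implements StreamSerializer<Map.Entry<Object, Object>> {
            @Override
            public int getTypeId() {
                return 10_000; // any positive ID not used by another custom serializer
            }

            @Override
            public void write(ObjectDataOutput out, Map.Entry<Object, Object> e) throws IOException {
                out.writeObject(e.getKey());
                out.writeObject(e.getValue());
            }

            @Override
            public Map.Entry<Object, Object> read(ObjectDataInput in) throws IOException {
                // HashMap$Node can't be instantiated directly, so deserialize into a
                // plain entry; downstream code only relies on the Map.Entry interface.
                return new SimpleImmutableEntry<>(in.readObject(), in.readObject());
            }
        }

        // Registration (job-scoped serializers exist since Jet 4.1). The raw cast is
        // needed because HashMap$Node is a private class; Class.forName also throws
        // the checked ClassNotFoundException, which must be handled or declared.
        JobConfig config = new JobConfig();
        config.registerSerializer((Class) Class.forName("java.util.HashMap$Node"),
                MapNodeSerializer.class);
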
    peterchenadded
    @peterchenadded
    @mtopolnik thanks for the awesome help, will do some performance testing to make sure there are no issues
    HazelcastGitter
    @HazelcastGitter
    [Marko Topolnik, Hazelcast] OK, good luck :)
    Can Gencer
    @cangencer
    Hi, we'll be decommissioning this channel soon - please join us at https://slack.hazelcast.com