    Caio Guedes
    @caioguedes
    in this case, -n in submit is an option that takes a value, so we could use -cn here
    also, -a is used for --all in list-jobs, so we need another name for the addresses here
    Can Gencer
    @cangencer
    We can use -c for cluster name and -a for "addresses". It will break list-jobs, but that's probably OK, we can only have --all in list-jobs. Another option is to use something like -t ("targets")
    441061753
    @441061753
    Why do I get "IMap cannot be resolved to a type" in 4.0?
    Did com.hazelcast.core.IMap move to com.hazelcast.map.IMap?
    Can Gencer
    @cangencer
    are you using an older version? the class was moved to com.hazelcast.map.IMap
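
    For reference, the fix is just the import change; a minimal sketch below, with made-up map name and values:

        // Hazelcast 4.x: the IMap interface moved from com.hazelcast.core to com.hazelcast.map
        import com.hazelcast.core.Hazelcast;
        import com.hazelcast.core.HazelcastInstance;
        import com.hazelcast.map.IMap;   // was com.hazelcast.core.IMap in 3.x

        public class MapExample {
            public static void main(String[] args) {
                HazelcastInstance hz = Hazelcast.newHazelcastInstance();
                IMap<String, Double> temperatures = hz.getMap("temperatures"); // hypothetical map name
                temperatures.put("sensor-1", 21.5);
                hz.shutdown();
            }
        }
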
    Caio Guedes
    @caioguedes

    We can use -c for cluster name and -a for "addresses". It will break list-jobs, but that's probably OK, we can only have --all in list-jobs. Another option is to use something like -t ("targets")

    unfortunately -c is used for --class in submit too ahahah. I liked -t for the addresses, and we could also remove -a from list-jobs. Any other suggestion for the cluster name?

    Can Gencer
    @cangencer
    we can use -k like kluster :) another option is to do something like jet -t jet@127.0.0.1:5701
    Caio Guedes
    @caioguedes
    ohh jet -t jet@127.0.0.1:5701 is really good! I will stick with that, we don't use -t anywhere, and it seems logical because both parameters only make sense when provided together. It also works nicely with multiple addresses: jet -t jet@127.0.0.1:5701,127.0.0.1:5702 \o/
    Can Gencer
    @cangencer
    yeah, and you could default to jet if @ part is missing
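
    A hypothetical sketch of the parsing rule discussed here (split on @, default the cluster name to jet when it is missing, split addresses on commas); this is not the actual jet CLI code and the class name is made up:

        import java.util.Arrays;
        import java.util.List;

        // Sketch of the "-t [cluster@]addr1,addr2" rule; not the real CLI implementation.
        public final class TargetParser {

            static final String DEFAULT_CLUSTER_NAME = "jet";

            public static void main(String[] args) {
                parse("jet@127.0.0.1:5701,127.0.0.1:5702");
                parse("127.0.0.1:5701"); // cluster name defaults to "jet"
            }

            static void parse(String target) {
                String clusterName = DEFAULT_CLUSTER_NAME;
                String addressPart = target;
                int at = target.indexOf('@');
                if (at >= 0) {
                    clusterName = target.substring(0, at);
                    addressPart = target.substring(at + 1);
                }
                List<String> addresses = Arrays.asList(addressPart.split(","));
                System.out.println("cluster=" + clusterName + ", addresses=" + addresses);
            }
        }
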
    Sunil Jain
    @Sunil-Jain
    Hi @Holmistr , regarding #2085, I will move the constant to the JetConfig class, keeping in mind the KISS and YAGNI principles. Let me know if you think there is a better solution.
    Tomasz Gawęda
    @TomaszGaweda
    Hi! I've got a question for you. Our customer wants one node to be only a "front-end" node (it hosts the UI for controlling the app), while the rest of the nodes hold data and run the processing. The front-end node is a lite member. We want to do the same for the Jet module, but there's a problem: we cannot drop files on the "front-end" node because it's a lite member; the reading processor will only be sent to the other nodes. Is there a way to allow source-only processors to run on lite Jet members?
    Marko Topolnik
    @mtopolnik
    that would have to be a new feature
    Can Gencer
    @cangencer
    maybe you can set up some kind of shared folder / file system?
    or rsync
    Tomasz Gawęda
    @TomaszGaweda
    yes, that's one option we've thought about, I just wanted to know if there's some built-in feature I'm not aware of. Thank you :)
    Lucas Kinne
    @DeveloperPad

    Hey guys,

    I am facing a problem with event disorder and max lag.

    My jet job is supposed to:

    • consume data from a kafka topic (temperature values from different devices/sensors)
    • use native timestamps (timestamp of temperature measurement)
    • group by key (device/sensor from which the measurement is from)
    • do window aggregation (averaging double values with window size = 60 (1 min), window slide = 60 (1 min))
    • write window results to another kafka topic

    My problem is that the devices/sensors upload their measurements to the Kafka topic (the source for the Jet job) in parallel and only in 5-minute intervals.
    This results in event disorder. I thought I could fix this by setting withNativeTimestamps(300_000) (5 min) to counteract the upload interval.
    No events are skipped now, but I am not getting any window result either.

    What am I doing wrong? Am I using the wrong time unit for the conversion, or is my understanding of the max lag concept wrong?

    Thanks in advance!

    Sincerely,
    Pad

    Viliam Durina
    @viliam-durina
    @DeveloperPad So you have 1-minute tumbling windows, but your measurements are every 5 minutes? That means that 4 windows have no measurement and every 5th window has 1 measurement to aggregate?
    With such sparse events you probably also hit the partition idle timeout, which is 60 seconds by default. You can change it by calling StreamSource.setPartitionIdleTimeout()
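
    As a rough sketch (not the actual job), the pipeline described above might look like the following with the idle timeout raised above the 5-minute upload interval; the topic name, the Kafka properties and the assumption that the timestamps are in milliseconds are all made up for illustration:

        import com.hazelcast.jet.Jet;
        import com.hazelcast.jet.JetInstance;
        import com.hazelcast.jet.aggregate.AggregateOperations;
        import com.hazelcast.jet.kafka.KafkaSources;
        import com.hazelcast.jet.pipeline.Pipeline;
        import com.hazelcast.jet.pipeline.Sinks;
        import com.hazelcast.jet.pipeline.StreamSource;
        import com.hazelcast.jet.pipeline.WindowDefinition;

        import java.util.Map.Entry;
        import java.util.Properties;

        public class TemperaturePipeline {
            public static void main(String[] args) {
                Properties props = new Properties();
                props.setProperty("bootstrap.servers", "localhost:9092"); // assumption
                props.setProperty("key.deserializer",
                        "org.apache.kafka.common.serialization.StringDeserializer");
                props.setProperty("value.deserializer",
                        "org.apache.kafka.common.serialization.DoubleDeserializer");
                props.setProperty("auto.offset.reset", "earliest");

                // Raise the partition idle timeout above the 5-minute upload interval
                // (the default is 60 s).
                StreamSource<Entry<String, Double>> source =
                        KafkaSources.<String, Double>kafka(props, "temperatures") // topic name is an assumption
                                    .setPartitionIdleTimeout(360_000);            // 6 min, wall-clock millis

                Pipeline p = Pipeline.create();
                p.readFrom(source)
                 .withNativeTimestamps(300_000)              // allowed lag: 5 min (assumes millisecond timestamps)
                 .groupingKey(Entry::getKey)                 // device/sensor id
                 .window(WindowDefinition.tumbling(60_000))  // 1-minute tumbling window
                 .aggregate(AggregateOperations.averagingDouble(Entry::getValue))
                 .writeTo(Sinks.logger());                   // or a Kafka sink

                JetInstance jet = Jet.bootstrappedInstance();
                jet.newJob(p).join();
            }
        }
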
    Marko Topolnik
    @mtopolnik

    use native timestamps (timestamp of temperature measurement)

    These will actually be the timestamps determined by Kafka. You may want to use the timestamps from the original events for better precision.

    If they upload in 5-minute intervals but aren't aligned, I suggest using a sliding window instead. You can have it emit results every minute but aggregate over 5 or 6 minutes, so that it always contains the most recent upload from each device.
    You'll still have to set the allowed event disorder to at least 5 minutes (but probably a bit more).
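
    In terms of the sketch a few messages above (reusing the same hypothetical source and names), the windowing stage Marko suggests would look roughly like this:

        p.readFrom(source)
         .withNativeTimestamps(360_000)                      // allowed lag: 6 min
         .groupingKey(Entry::getKey)
         .window(WindowDefinition.sliding(360_000, 60_000))  // 6-min window, 1-min slide
         .aggregate(AggregateOperations.averagingDouble(Entry::getValue))
         .writeTo(Sinks.logger());
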
    Can Gencer
    @cangencer
    the event lag represents how much disorder you have within the same Kafka topic and partition. If you have items out of order within the same partition, then you need to set the lag larger.
    how many partitions do you use in Kafka, and do your messages have a key?
    Caio Guedes
    @caioguedes
    @cangencer at https://github.com/hazelcast/hazelcast-jet/pull/2276#discussion_r431671600, could you suggest how to mark the options as deprecated? I did not see a way to do it via picocli. I am thinking of just prepending "[DEPRECATED]" to the descriptions.
    Can Gencer
    @cangencer
    it should be fine just to add a note to the description
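
    In picocli terms that could look like the sketch below; the command and option names are placeholders, not the real jet CLI options:

        import picocli.CommandLine;
        import picocli.CommandLine.Command;
        import picocli.CommandLine.Option;

        // Sketch only: mark a deprecated option by prefixing its description.
        @Command(name = "list-jobs", mixinStandardHelpOptions = true)
        public class ListJobsCommand implements Runnable {

            @Option(names = {"-a", "--addresses"},
                    description = "[DEPRECATED] Use -t/--targets instead. Member addresses to connect to.")
            private String addresses;

            @Option(names = {"-t", "--targets"},
                    description = "The cluster name and addresses, e.g. jet@127.0.0.1:5701")
            private String targets;

            @Override
            public void run() {
                // command logic would go here
            }

            public static void main(String[] args) {
                new CommandLine(new ListJobsCommand()).execute(args);
            }
        }
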
    Lucas Kinne
    @DeveloperPad

    @viliam-durina @mtopolnik @cangencer
    We have around 100 (and rising) Raspberry Pis collecting measurements from various sensors, mostly in 5-minute intervals.
    The measurements are first stored in a Redis database on the Pis and uploaded to a Spring backend in 5-minute intervals.
    The backend parses the measurements (sent via HTTP request) and sends them to Kafka.
    We thereby specify the timestamp of the measurement as the Kafka timestamp, so that timestamp is what Jet uses as the "native timestamp", isn't it?

    We have 4 Kafka partitions and use a Raspberry Pi's unique ID as the key, so we probably have around 25 Pis per partition.
    Given that we have multiple Pis per partition, which upload their data independently of each other, we necessarily have event disorder at the partition level.

    The 1-minute tumbling window was probably an unfortunate choice in this case, I agree.
    It is just for testing though, and from my understanding I should still get at least one window result every 5 windows.

    So tomorrow I will test to:

    • adjust the window so that I will always have at least one measurement in each window
    • increase the partition idle timeout to more than 5 minutes (This was probably the main problem here, right?)
    • maybe increase the max lag a bit more (but I think it should be fine, otherwise Jet would have printed a "skipping event" message)
    Viliam Durina
    @viliam-durina
    We thereby specify the timestamp of the measurement as the Kafka timestamp, so that timestamp is what Jet uses as the "native timestamp", isn't it?
    yes, the native timestamp is the Kafka record timestamp
    I'm not sure, but I think the Kafka record timestamp is mostly monotonic; in edge cases it can be out of order, for example when a partition is moved to another broker after a failure or when the system clock is adjusted backwards.
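
    For reference, the producer can stamp each record with the measurement time explicitly, which is then what Jet sees as the native timestamp; a minimal sketch with made-up topic, key and values:

        import org.apache.kafka.clients.producer.KafkaProducer;
        import org.apache.kafka.clients.producer.ProducerRecord;

        import java.util.Properties;

        public class MeasurementProducer {
            public static void main(String[] args) {
                Properties props = new Properties();
                props.setProperty("bootstrap.servers", "localhost:9092"); // assumption
                props.setProperty("key.serializer",
                        "org.apache.kafka.common.serialization.StringSerializer");
                props.setProperty("value.serializer",
                        "org.apache.kafka.common.serialization.DoubleSerializer");

                try (KafkaProducer<String, Double> producer = new KafkaProducer<>(props)) {
                    long measurementTimestampMillis = System.currentTimeMillis() - 120_000; // measured 2 min ago
                    // ProducerRecord(topic, partition, timestamp, key, value);
                    // a null partition lets the key-based partitioner decide.
                    producer.send(new ProducerRecord<>(
                            "temperatures", null, measurementTimestampMillis, "pi-42", 21.5));
                }
            }
        }
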
    Marko Topolnik
    @mtopolnik
    you would have event disorder only if you observed the actual timestamps when the measurements were made, and assuming the measurements are made over the whole 5-minute timespan and not sampled just before sending to Kafka
    Lucas Kinne
    @DeveloperPad

    Hey guys, it is me again.
    I somehow cannot get the windowing to work correctly.

    In this first example I am just passing the incoming measurements (from a Kafka topic) through the pipeline and writing them to the logger sink. It clearly shows that measurements are coming in continuously: https://pastebin.com/Jua8Zvec

    In this second example, I try to do a simple counting aggregation with a 1-minute tumbling window over the incoming measurements, grouped by the record key (the unique ID of the Raspberry Pi the measurement comes from): https://pastebin.com/PYtQKWAZ

    When executing this second example, I am not getting any output (window results) at all, no matter how long I wait. What am I doing wrong here?

    Thanks in advance!

    Can Gencer
    @cangencer
    do you mean to set your window size to 60 ms?
    you say the window size is in seconds, but the input parameter here is in milliseconds: .window(WindowDefinition.sliding(WINDOW_SIZE, WINDOW_SLIDE))
    also, it seems to me that your timestamps are in epoch seconds, not milliseconds
    Can Gencer
    @cangencer
    but your lag is in milliseconds
    the lag is defined in the same unit as the input data
    so if your timestamps are in seconds, everything else should be in seconds (except partition idle timeout - that's always in milliseconds because it uses system time)
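
    To make the unit rule concrete, assuming the record timestamps really are epoch seconds (the constant names here are made up):

        // Every event-time quantity uses the same unit as the record timestamps.
        public final class WindowingUnits {
            // Timestamps are epoch *seconds*, so window size, slide and allowed lag are in seconds too.
            public static final long WINDOW_SIZE_SEC  = 60;      // 1-minute tumbling window
            public static final long WINDOW_SLIDE_SEC = 60;      // slide == size -> tumbling
            public static final long ALLOWED_LAG_SEC  = 360;     // a bit over the 5-minute upload interval
            // Only the partition idle timeout is wall-clock time and is always in milliseconds.
            public static final long IDLE_TIMEOUT_MS  = 360_000;

            private WindowingUnits() {
            }
        }
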
    Marko Topolnik
    @mtopolnik
    some substantial lag here:
    <dd60bdbe-eb31-4f6a-a3c0-549224794afe> 15.318546411028365 @ 1591371455
    <85b52bfc-5d66-43a0-8178-9ea81da3ca81> 12.134935743991809 @ 1591371035
    that's way more than 5 minutes
    Can Gencer
    @cangencer
    the way it's configured, the lag is 5000 minutes now (300,000 interpreted as seconds is 5,000 minutes)
    Marko Topolnik
    @mtopolnik
    So Lucas just wasn't patient enough :)
    but what's more relevant is that it was supposed to stay within five minutes
    Lucas Kinne
    @DeveloperPad

    Ahh... That makes a lot of sense now. Thanks guys. :)
    So to fix this, I have to either adjust all my own time units (except the source partition idle timeout) to seconds, or use milliseconds and adjust the measurement timestamps accordingly? That explains why I didn't find much information about which time unit to use for the input parameters, since it depends on the timestamp unit of the input data.

    It is possible that bigger lags occur within the measurement stream, but that is not a big deal; such events are supposed to be discarded as too late to be processed anyway.

    Can Gencer
    @cangencer
    we've set up a Slack workspace for Hazelcast users, which you can join here: https://hz-community-slack.herokuapp.com/ (right now it's experimental, but we'll likely phase out Gitter if it works just as well or better)
    peterchenadded
    @peterchenadded
    Hi, has anyone seen the error below before?
    Caused by: com.hazelcast.jet.JetException: Unable to serialize instance of class java.util.HashMap$Node: there is no suitable serializer for class java.util.HashMap$Node
    peterchenadded
    @peterchenadded
    It wasn't an issue in an older version of Jet; the error appears in Jet 4.1.1
    Viliam Durina
    @viliam-durina
    @peterchenadded Hmm, HashMap doesn't normally serialize HashMap$Node, that's an internal object. Can you share the full stack trace?
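
    One common way this particular error shows up, though it may well not be Peter's case, is emitting the live entries obtained from a HashMap's entrySet(): those are instances of the internal HashMap$Node class, which Jet cannot serialize when they travel between stages or members. Copying them into plain entries avoids it; a small illustration (not Peter's code, values made up):

        import com.hazelcast.jet.Util;

        import java.util.ArrayList;
        import java.util.HashMap;
        import java.util.List;
        import java.util.Map;
        import java.util.Map.Entry;
        import java.util.stream.Collectors;

        public class EntrySerialization {
            public static void main(String[] args) {
                Map<String, Double> readings = new HashMap<>();
                readings.put("sensor-1", 21.5);

                // Problematic: this list holds HashMap$Node instances.
                List<Entry<String, Double>> raw = new ArrayList<>(readings.entrySet());

                // Safe: copy each entry into a plain, serializable entry first.
                List<Entry<String, Double>> safe = readings.entrySet().stream()
                        .map(e -> Util.entry(e.getKey(), e.getValue()))
                        .collect(Collectors.toList());

                System.out.println(raw + " vs " + safe);
            }
        }
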