    Robert D. Blanchet Jr.
    @blanchet4forte
    Any help would be appreciated
    Robert D. Blanchet Jr.
    @blanchet4forte
    Also another question, as I'm new to Akka Streams in general: what's the difference between using a RestartSource vs. having a materializer setting with a decider that resumes/restarts the stream? Aside from the decider letting me configure when to resume/restart, is there anything else different about a RestartSource and a materializer restarter?
    doohan
    @doohan
    @johanandren thanks for that, that certainly helps clear things up a bit
    Mayank
    @mayankbansal93

    Hi,
    I have created 4 partitions and am consuming packets using
    Consumer.committablePartitionedSource(consumerSettings, subscription).flatMapMerge(4, Pair::second)

    Packets are being consumed sequentially (I think in round-robin order), but not in parallel. Right now the application is running on one instance only.
    How can I achieve parallelism?
    And how would it behave when the application runs on multiple instances?
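    (A minimal sketch of one common way to get per-partition parallelism with committablePartitionedSource, written in the Scala DSL rather than the Java Pair-based snippet above; processRecord, the topic name and the settings values are placeholders, not from this thread:)

        import akka.actor.ActorSystem
        import akka.kafka.scaladsl.{Committer, Consumer}
        import akka.kafka.{CommitterSettings, ConsumerSettings, Subscriptions}
        import akka.stream.ActorMaterializer
        import akka.stream.scaladsl.Sink
        import org.apache.kafka.clients.consumer.ConsumerRecord
        import org.apache.kafka.common.serialization.StringDeserializer
        import scala.concurrent.Future

        implicit val system: ActorSystem = ActorSystem("example")
        implicit val mat: ActorMaterializer = ActorMaterializer()
        import system.dispatcher

        val consumerSettings =
          ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
            .withBootstrapServers("localhost:9092")
            .withGroupId("group1")
        val committerSettings = CommitterSettings(system)
        val maxPartitions = 4

        // Placeholder for the application's own processing logic.
        def processRecord(record: ConsumerRecord[String, String]): Future[Unit] = Future.unit

        // Each partition becomes its own sub-source, so up to maxPartitions partitions
        // are processed concurrently instead of being merged into one sequential stream
        // with flatMapMerge.
        Consumer
          .committablePartitionedSource(consumerSettings, Subscriptions.topics("my-topic"))
          .mapAsyncUnordered(maxPartitions) {
            case (_, partitionSource) =>
              partitionSource
                .mapAsync(1)(msg => processRecord(msg.record).map(_ => msg.committableOffset))
                .runWith(Committer.sink(committerSettings))
          }
          .runWith(Sink.ignore)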

    Robert D. Blanchet Jr.
    @blanchet4forte
    Can anyone answer my question above? Please
    Johan Andrén
    @johanandren
    @blanchet4forte The supervision strategy attribute is up to each operator to support (or not), and works on a per-operator level; think of a .map throwing an error and resuming. The restart source/flow/sink will restart, potentially with backoff, a whole section of a stream when a failure occurs; think of restarting my streaming database client and the whole beginning of the stream where I have a buffer and some aggregation going on.
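    (A rough sketch contrasting the two; the values and the toy sources are illustrative only, not from the thread:)

        import akka.stream.{ActorAttributes, Supervision}
        import akka.stream.scaladsl.{RestartSource, Source}
        import scala.concurrent.duration._

        // Per-operator supervision: a failing element in this map is dropped (Resume)
        // and the stream keeps running; only operators that support the attribute honour it.
        val parsed = Source(List("1", "2", "oops", "3"))
          .map(_.toInt)
          .withAttributes(ActorAttributes.supervisionStrategy {
            case _: NumberFormatException => Supervision.Resume
            case _                        => Supervision.Stop
          })

        // RestartSource: when anything inside the factory fails, the whole section is
        // torn down and re-materialized with backoff, so buffers, aggregation state and
        // any client created inside the factory start from scratch.
        val restarting = RestartSource.onFailuresWithBackoff(
          minBackoff = 1.second,
          maxBackoff = 30.seconds,
          randomFactor = 0.2
        ) { () =>
          Source(List("1", "2", "3")).map(_.toInt)
        }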
    Robert D. Blanchet Jr.
    @blanchet4forte
    In our project we're using a Consumer.plainSource and tracking our own offsets. When we recreate a Consumer and give it the Subscriptions.assignmentWithOffset value we're seeing the Consumer repeat that message. Should we be doing something like lastOffsetReceived + 1?
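    (For reference: the offset given to Subscriptions.assignmentWithOffset is the position the consumer starts reading from, i.e. the first record it will deliver, so when resuming from an externally stored "last processed" offset the usual convention is to pass that offset plus one. A tiny sketch, with lastProcessedOffset as a hypothetical externally stored value:)

        import akka.kafka.Subscriptions
        import org.apache.kafka.common.TopicPartition

        val lastProcessedOffset: Long = 41L // hypothetical externally stored offset

        // Resume one past the last record that was already processed.
        val subscription =
          Subscriptions.assignmentWithOffset(new TopicPartition("my-topic", 0), lastProcessedOffset + 1)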
    Srepfler Srdan
    @schrepfler
    I'm struggling to find some good examples of a source/sink where we can pass/read Kafka headers metadata.
    Is this referenced in the docs somewhere?
    alian
    @lianfulei

    Has anyone encountered this problem? The client uses librdkafka to connect to Kafka and reports these errors:

    2019-07-18 08:33:24,418 - [ERROR] : [kafka] consumer error 192.168.1.3:9092/3: Disconnected (after 307230ms in state UP)
    2019-07-18 08:33:24,419 - [ERROR] : [kafka] consumer error 192.168.1.2:9092/2: Disconnected (after 307230ms in state UP)
    2019-07-18 08:33:24,419 - [ERROR] : [kafka] comsuming from : topic,partition 0,offset 0, Error Local: Broker transport failure.
    2019-07-18 08:33:24,419 - [ERROR] : [kafka] consumer error 192.168.1.1:9092/1: Disconnected (after 307229ms in state UP)
    2019-07-18 08:33:24,419 - [ERROR] : [kafka] consumer error 3/3 brokers are down
    2019-07-18 08:33:25,418 - [ERROR] : [kafka] consumer error 192.168.1.3:9092/3: Connect to ipv4#192.168.1.3:9092 failed: A socket operation was attempted to an unreachable host. (after 999ms in state CONNECT)
    2019-07-18 08:33:25,418 - [ERROR] : [kafka] consumer error 192.168.1.1:9092/1: Connect to ipv4#192.168.1.3:9092 failed: A socket operation was attempted to an unreachable host. (after 998ms in state CONNECT)
    2019-07-18 08:33:25,419 - [ERROR] : [kafka] consumer error 192.168.1.2:9092/2: Connect to ipv4#192.168.1.3:9092 failed: A socket operation was attempted to an unreachable host. (after 999ms in state CONNECT)

    tayvs
    @tayvs
    hi! How can I ensure event order by timestamp when consuming from a few topics? The problem is that one topic produces created events and a second produces updated events. An updated event's timestamp is always greater than the created event's, but they come into the stream out of order.
    Robert D. Blanchet Jr.
    @blanchet4forte
    I'm struggling with a particular issue. We're using dynamic topic creation instead of relying on pre-configured topics. We're also externally managing our offsets for consumers. With auto.offset.reset set to the standard value of latest, if a new consumer is created for a topic that doesn't yet exist, that topic is created, as we'd like. However, the consumer is dropping the first message that's published to it. If we change auto.offset.reset to earliest it picks up the first message, but every time that consumer is recreated it plays back the full retained log for that topic, which is not what I want.
    Robert D. Blanchet Jr.
    @blanchet4forte
    We're giving the consumer the offsets to start consuming from so I'm not even sure why this setting is overriding that.
    Robert D. Blanchet Jr.
    @blanchet4forte
    This seems to be really broken, so if I could get an answer I would greatly appreciate it. The Alpakka docs state that if we are using the plain consumer we can manage our own offsets externally from Kafka/ZooKeeper. But the offset we tell the consumer to begin consuming from seems to be completely overridden by the auto.offset.reset value.
    Martynas Mickevičius
    @2m
    @/all artifacts for Alpakka Kafka 1.0.5 are in Maven Central: https://doc.akka.io/docs/alpakka-kafka/current/release-notes/1.0.x.html#1-0-5
    Srepfler Srdan
    @schrepfler
    What’s FlowWithContext?
    tayvs
    @tayvs
    @schrepfler it's like a regular Flow but carries two values. The first is the data, as in a Flow. The second is a context that propagates with each element behind the scenes. In this case the data is a ConsumerMessage and the context is a CommittableOffset.
    Srepfler Srdan
    @schrepfler
    is this something which travels alongside the Kafka message, like to support read/write of Kafka headers, or is this something in the flow at runtime? If the latter, why just the offset? Potentially more stuff could go there (for example, mapping an Akka context value back and forth with each element to pass metadata into headers, which could be used for tracing)
    tayvs
    @tayvs
    Under the hood it's a tuple (ConsumerMessage, CommittableOffset), but the functions are applied only to the ConsumerMessage. It's an Akka Streams feature, so you can propagate your own value
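    (A minimal, generic sketch of the feature tayvs describes, not Alpakka-specific, assuming a recent Akka Streams (2.6+) where FlowWithContext and SourceWithContext.fromTuples are available; the Int context just stands in for whatever you propagate:)

        import akka.actor.ActorSystem
        import akka.stream.scaladsl.{FlowWithContext, Sink, Source, SourceWithContext}

        implicit val system: ActorSystem = ActorSystem("example")

        // Operators on a FlowWithContext only see the data element; the context value
        // (an Int here, a CommittableOffset in the Alpakka case) rides along untouched.
        val lengths = FlowWithContext[String, Int].map(_.length)

        SourceWithContext
          .fromTuples(Source(List("a" -> 1, "bb" -> 2)))
          .via(lengths)
          .runWith(Sink.foreach(println)) // prints (1,1) and (2,2)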
    Srepfler Srdan
    @schrepfler
    sure, but what's the use case for alpakka-kafka?
    Srepfler Srdan
    @schrepfler
    looking at https://github.com/akka/alpakka-kafka/blob/220022c6a5b6cb4670c1427c3e1a000844cb85f0/core/src/main/scala/akka/kafka/scaladsl/Consumer.scala, is metadataFromRecord something which maps to Kafka message headers? Why map it to a single String?
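    (For context, that parameter belongs to Consumer.commitWithMetadataSource: the String it produces is stored alongside the committed offset as commit metadata (Kafka's OffsetAndMetadata), not in record headers, which is why it is a single String. A minimal sketch with illustrative settings and topic name:)

        import akka.actor.ActorSystem
        import akka.kafka.scaladsl.Consumer
        import akka.kafka.{ConsumerSettings, Subscriptions}
        import org.apache.kafka.clients.consumer.ConsumerRecord
        import org.apache.kafka.common.serialization.StringDeserializer

        implicit val system: ActorSystem = ActorSystem("example")

        val consumerSettings =
          ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
            .withBootstrapServers("localhost:9092")
            .withGroupId("group1")

        // The returned string is attached to the offset when it is committed;
        // it is not written to, or read from, record headers.
        def metadataFromRecord(record: ConsumerRecord[String, String]): String =
          record.timestamp.toString

        val source = Consumer.commitWithMetadataSource(
          consumerSettings,
          Subscriptions.topics("my-topic"),
          metadataFromRecord
        )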
    tayvs
    @tayvs

    My English is not so good for explanation. Example:

          Consumer
            .sourceWithOffsetContext(topic, consumerSettings)
            .mapAsync(4)(someBusinessFunction)
            .to(Committer.sinkWithOffsetContext(committerSettings))

    vs

          Consumer
            .committableSource(topic, consumerSettings)
            .mapAsync(4) { case CommittableMessage(record, committableOffset) => someBusinessFunction(record).map(_ => committableOffset) }
            .to(Committer.sink(committerSettings))
    Srepfler Srdan
    @schrepfler
    what would .map(_ => committableOffset) achieve?
    commit the offset as read?
    tayvs
    @tayvs
    Committer.sink expects a CommittableOffset, so we have to provide it explicitly
    Srepfler Srdan
    @schrepfler
    I see
    Srepfler Srdan
    @schrepfler
    it feels a bit weird we haven't tapped into Kafka headers so far guys, any reason we're postponing it? It seems like something which could be trivially returned as a side value next to the value in a tuple
    MayankKhemka94
    @MayankKhemka94
    Hi, I am using Alpakka with basic config properties and my own custom dispatcher to read from Kafka. Due to high CPU utilization I analysed a thread dump and found that the streams are using both the custom dispatcher and the default dispatchers ("akka.kafka.default-dispatcher" for the Kafka source, and "default-akka.actor" with the fork-join executor). I am using a thread-pool executor in my actor system but somehow it is still using the fork-join executor. Can anyone help with this?
    Martynas Mickevičius
    @2m
    @MayankKhemka94 can you sample what code the fork-join executor is running? That would help to determine which part of your application is configured to use that executor.
    MayankKhemka94
    @MayankKhemka94

    "0x00000007aee4ecf0
    ForkJoinExecutorConfigurator$AkkaForkJoinPool

    Threads waiting for notification on lock:
    default-akka.actor.default-dispatcher-4 " This type of logs i am getting in thread dump analyzer.

    Martynas Mickevičius
    @2m
    Did you change the akka.actor.default-dispatcher.executor https://github.com/akka/akka/blob/v2.5.23/akka-actor/src/main/resources/reference.conf#L359 ?
    doohan
    @doohan
    Nope still using the default executor
    Should I change that?
    doohan
    @doohan
    Sorry, wrong window :)
    MayankKhemka94
    @MayankKhemka94
    @2m yes, I made that change in application.conf, but it is still creating a fork-join-executor dispatcher.
    Martynas Mickevičius
    @2m
    MayankKhemka94
    @MayankKhemka94
    So, can you suggest what changes I should make in the config to avoid the default executor, since it led to high CPU utilization?
    Martynas Mickevičius
    @2m
    Do you have Future transformations in your code that take an implicit execution context?
    If so, you can give them system.dispatcher instead, which will be the one that you configured for Akka to use.
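    (A small sketch of what that looks like; the lookup function is just a placeholder:)

        import akka.actor.ActorSystem
        import scala.concurrent.{ExecutionContext, Future}

        val system = ActorSystem("example")

        // Instead of importing scala.concurrent.ExecutionContext.Implicits.global
        // (which brings its own fork-join pool into the picture), use the dispatcher
        // the actor system was configured with.
        implicit val ec: ExecutionContext = system.dispatcher

        def lookup(id: String): Future[String] = Future(s"value-for-$id")

        lookup("42").map(_.toUpperCase) // runs on the configured Akka dispatcher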
    MayankKhemka94
    @MayankKhemka94
    @2m no, I don't have any Future transformations in my code.
    Rotem Fogel
    @rotemfo_twitter
    Hi All,
    Trying to stream Consumer events to Akka HTTP SSE (Server-Sent Events).
    Would appreciate any help
    Enno
    @ennru
    @rotemfo_twitter Can you be more specific about what you wonder about?
    Gabriel Raineri
    @graineri
    Hey guys, do any of you know how to combine a RestartSource with a DrainingControl? Here's a snippet of what I'm trying to achieve. Any help will be greatly appreciated.
        RestartSource
          .onFailuresWithBackoff(
            minBackoff = 3.seconds,
            maxBackoff = 30.seconds,
            randomFactor = 0.2
          ) { () =>
            Consumer
              .committableSource[String, String](consumerSettings, subscription)
              .mapAsyncUnordered(parallelism) { msg =>
                Future.successful(()).map(_ => msg.committableOffset)
              }
              .withAttributes(resumeOnExceptionsStrategy(resumeOnExceptions))
              .toMat(Committer.sink(committerSettings))(Keep.both)
              .mapMaterializedValue(DrainingControl.apply)
              .run()
          }
    tayvs
    @tayvs
    @graineri I think you can find this example useful
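    (For reference, a minimal sketch of one commonly suggested approach: keep the innermost Consumer.Control in an AtomicReference so the current materialization can be drained and shut down from outside the RestartSource; the settings, topic and business step are placeholders, and drainAndShutdown stands in for what DrainingControl would otherwise give you:)

        import java.util.concurrent.atomic.AtomicReference

        import akka.Done
        import akka.actor.ActorSystem
        import akka.kafka.scaladsl.{Committer, Consumer}
        import akka.kafka.{CommitterSettings, ConsumerSettings, Subscriptions}
        import akka.stream.ActorMaterializer
        import akka.stream.scaladsl.{RestartSource, Sink}
        import org.apache.kafka.common.serialization.StringDeserializer

        import scala.concurrent.Future
        import scala.concurrent.duration._

        implicit val system: ActorSystem = ActorSystem("example")
        implicit val mat: ActorMaterializer = ActorMaterializer()
        import system.dispatcher

        val consumerSettings =
          ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
            .withBootstrapServers("localhost:9092")
            .withGroupId("group1")
        val committerSettings = CommitterSettings(system)

        // Keep the Control of the innermost (current) materialization so it can be
        // reached from outside the RestartSource.
        val control = new AtomicReference[Consumer.Control](Consumer.NoopControl)

        val streamCompletion: Future[Done] = RestartSource
          .onFailuresWithBackoff(minBackoff = 3.seconds, maxBackoff = 30.seconds, randomFactor = 0.2) { () =>
            Consumer
              .committableSource[String, String](consumerSettings, Subscriptions.topics("my-topic"))
              .mapMaterializedValue { c => control.set(c); c } // capture the current Control
              .mapAsyncUnordered(4)(msg => Future.successful(msg.committableOffset)) // business step placeholder
              .via(Committer.flow(committerSettings))
          }
          .runWith(Sink.ignore)

        // On shutdown: drain in-flight commits, then stop, similar to what DrainingControl gives.
        // control.get.drainAndShutdown(streamCompletion)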
    Zburator
    @Zburator
    hi guys, could someone please explain to me what the consumer property auto.offset.reset does?
    Tim Moore
    @TimMoore
    @Zburator it determines whether a new consumer with no stored offset starts consuming from the beginning or the end of a topic with existing messages in it.
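    (For completeness, the property is set on the consumer settings; a sketch with illustrative values:)

        import akka.actor.ActorSystem
        import akka.kafka.ConsumerSettings
        import org.apache.kafka.clients.consumer.ConsumerConfig
        import org.apache.kafka.common.serialization.StringDeserializer

        val system = ActorSystem("example")

        // "latest" (the default): a consumer group with no committed offset starts at the
        // end of the topic; "earliest": it starts from the beginning. The setting is only
        // consulted when there is no committed offset (and no explicit seek/assignment).
        val consumerSettings =
          ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
            .withBootstrapServers("localhost:9092")
            .withGroupId("group1")
            .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")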
    Salil Kanetkar
    @salilkanetkar
    Hi,
    I had a question about using the Alpakka Kafka connector. When using the Producer API, is there a persistent connection kept open to Kafka? If yes, does the application need to have logic to reconnect if the connection breaks?