These are chat archives for akkadotnet/akka.net

24th
Feb 2018
Vasily Kirichenko
@vasily-kirichenko
Feb 24 2018 06:48

I'm playing with Stream Refs (in scala). Am I right that an offered stream must be consumed in 30 seconds, after which it's cancelled? This is what I see on consumer side:

[DEBUG] [02/24/2018 09:46:02.139] [test-akka.actor.default-dispatcher-16] [SourceRefStageImpl$$anon$1(akka://test)] [SourceRef-0] Scheduled re-delivery of demand until [117]
[DEBUG] [02/24/2018 09:46:03.157] [test-akka.actor.default-dispatcher-20] [SourceRefStageImpl$$anon$1(akka://test)] [SourceRef-0] Scheduled re-delivery of demand until [117]
[DEBUG] [02/24/2018 09:46:03.792] [test-akka.actor.default-dispatcher-3] [akka.tcp://test@127.0.0.1:2552/system/remote-watcher] Watchee terminated: [akka.tcp://test@127.0.0.1:2551/user/producer/StreamSupervisor-0/$$b-SinkRef-1]
[DEBUG] [02/24/2018 09:46:03.793] [test-akka.actor.default-dispatcher-3] [akka.tcp://test@127.0.0.1:2552/system/remote-watcher] Unwatching: [akka://test/user/consumer/StreamSupervisor-0/$$a-SourceRef-0 -> akka.tcp://test@127.0.0.1:2551/user/producer/StreamSupervisor-0/$$b-SinkRef-1]
[DEBUG] [02/24/2018 09:46:03.798] [test-akka.actor.default-dispatcher-16] [akka://test/user/consumer/StreamSupervisor-0/flow-0-0-ignoreSink] stopped

I does not seem very reliable.

Vasily Kirichenko
@vasily-kirichenko
Feb 24 2018 06:53
from the docs:
each stream reference has a default timeout (of 30 seconds), after which the origin will abort the stream offer if the target has not materialized the stream ref in time
but I did materialize the stream and successfully received elements
while the elements are received, the log looks like
[DEBUG] [02/24/2018 09:56:18.815] [test-akka.actor.default-dispatcher-4] [SourceRefStageImpl$$anon$1(akka://test)] [SourceRef-0] Received seq SequencedOnNext(115,116) from Actor[akka.tcp://test@127.0.0.1:2551/user/producer/StreamSupervisor-0/$$a-SinkRef-0#1950873775]
[DEBUG] [02/24/2018 09:56:18.815] [test-akka.actor.default-dispatcher-4] [SourceRefStageImpl$$anon$1(akka://test)] [SourceRef-0] Received seq SequencedOnNext(116,117) from Actor[akka.tcp://test@127.0.0.1:2551/user/producer/StreamSupervisor-0/$$a-SinkRef-0#1950873775]
85 <---------------- this is me logging that an element received
86
87
[DEBUG] [02/24/2018 09:56:19.814] [test-akka.actor.default-dispatcher-20] [SourceRefStageImpl$$anon$1(akka://test)] [SourceRef-0] Scheduled re-delivery of demand until [117]
88
89
90
[DEBUG] [02/24/2018 09:56:20.838] [test-akka.actor.default-dispatcher-4] [SourceRefStageImpl$$anon$1(akka://test)] [SourceRef-0] Scheduled re-delivery of demand until [117]
Vasily Kirichenko
@vasily-kirichenko
Feb 24 2018 06:58
these Scheduled re-delivery of demand until look suspicious.
all these look like the consumer is unable to send messages to the publisher, any messages.
I request the source ref with:
val producer = context.actorSelection("akka.tcp://test@127.0.0.1:2551/user/producer")
producer ! RequestSource
Does the selection look ok?
Bartosz Sypytkowski
@Horusiath
Feb 24 2018 07:30
@vasily-kirichenko cannot tell if selection path is ok without code and config ;)
Vasily Kirichenko
@vasily-kirichenko
Feb 24 2018 08:43
Vasily Kirichenko
@vasily-kirichenko
Feb 24 2018 08:52
@Horusiath I asked the same question in https://gitter.im/akka/akka
Michael Dadashyan
@mebymyself
Feb 24 2018 15:22
Hello, is there a limit on message sizes? I am getting following error if the message get larger than certain size ( it is list of objects that I cannot control the size).
[ERROR][akka://MySystem/system/transports/akkaprotocolmanager.tcp.0/akkaProtocol-tcp%3A%2F%2FBetMaster%40%5B%3A%3Affff%3A127.0.0.1%5D%3A61010-2] Error while decoding incoming Akka PDU of length 31288 Cause: Akka.Remote.Transport.AkkaProtocolException: Error while decoding incoming Akka PDU of length 31288 ---> Akka.Remote.Transport.PduCodecExceptio n: Decoding PDU failed ---> Google.Protobuf.InvalidProtocolBufferException: While parsing a protocol message, the input ended unexpectedly in the midd le of a field. This could mean either that the input has been truncated or that an embedded message misreported its own length. at Google.Protobuf.CodedInputStream.ReadRawBytes(Int32 size) at Google.Protobuf.CodedInputStream.ReadBytes() at Akka.Remote.Serialization.Proto.Msg.AkkaProtocolMessage.MergeFrom(CodedInputStream input) at Google.Protobuf.MessageExtensions.MergeFrom(IMessage message, ByteString data) at Google.Protobuf.MessageParser`1.ParseFrom(ByteString data) at Akka.Remote.Transport.AkkaPduProtobuffCodec.DecodePdu(ByteString raw) --- End of inner exception stack trace --- at Akka.Remote.Transport.AkkaPduProtobuffCodec.DecodePdu(ByteString raw) at Akka.Remote.Transport.ProtocolStateActor.DecodePdu(ByteString pdu) --- End of inner exception stack trace --- at Akka.Remote.Transport.ProtocolStateActor.DecodePdu(ByteString pdu) at Akka.Remote.Transport.ProtocolStateActor.<>c__DisplayClass11_7.<InitializeFSM>b__24(InboundPayload ip) at Akka.Case.With[TMessage](Action`1 action) at Akka.Remote.Transport.ProtocolStateActor.<InitializeFSM>b__11_21(Event`1 event) at Akka.Actor.FSM`2.ProcessEvent(Event`1 fsmEvent, Object source) at Akka.Actor.FSM`2.ProcessMsg(Object any, Object source) at Akka.Actor.FSM`2.Receive(Object message) at Akka.Actor.ActorBase.AroundReceive(Receive receive, Object message) at Akka.Actor.ActorCell.ReceiveMessage(Object message) at Akka.Actor.ActorCell.Invoke(Envelope envelope) [ERROR][2/24/2018 3:13:19 PM][Thread 0013][Akka.Remote.Transport.DotNetty.TcpServerHandler] Error caught channel [[::ffff:127.0.0.1]:60997->[::ffff:12 7.0.0.1]:61010](Id=5d9a132f) Cause: DotNetty.Codecs.TooLongFrameException: Adjusted frame length exceeds 256000: 1866691429 - discarded at DotNetty.Codecs.LengthFieldBasedFrameDecoder.Fail(Int64 frameLength) at DotNetty.Codecs.LengthFieldBasedFrameDecoder.FailIfNecessary(Boolean firstDetectionOfTooLongFrame) at DotNetty.Codecs.LengthFieldBasedFrameDecoder.Decode(IChannelHandlerContext context, IByteBuffer input) at DotNetty.Codecs.LengthFieldBasedFrameDecoder.Decode(IChannelHandlerContext context, IByteBuffer input, List`1 output) at DotNetty.Codecs.ByteToMessageDecoder.CallDecode(IChannelHandlerContext context, IByteBuffer input, List`1 output) at DotNetty.Codecs.ByteToMessageDecoder.ChannelRead(IChannelHandlerContext context, Object message) at DotNetty.Transport.Channels.AbstractChannelHandlerContext.InvokeChannelRead(Object msg)
Vasily Kirichenko
@vasily-kirichenko
Feb 24 2018 15:27
akka {
    dot-netty.tcp {
        # Maximum frame size: 4 MB
        maximum-frame-size = 000000b
    }
}
Michael Dadashyan
@mebymyself
Feb 24 2018 15:33
@vasily-kirichenko Thanks for suggestion. But after applying that I am getting different error:
[Error] Transient association error (association remains live) Akka.Remote.OversizedPayloadException: Discarding oversized payload sent to [akka.tcp://MySystem@127.0.0.1:9246/user/commands#302208411]: max allowed size 128000 bytes, actual size of encoded System.Collections.Generic.List`1[MyModel] was 162353 bytes.
Vasily Kirichenko
@vasily-kirichenko
Feb 24 2018 15:34
have you add that setting to the both configs?
Michael Dadashyan
@mebymyself
Feb 24 2018 15:34
yes
Vasily Kirichenko
@vasily-kirichenko
Feb 24 2018 15:35
akka {

  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }

  remote {
    maximum-payload-bytes = 30000000 bytes
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2552
      message-frame-size =  30000000b
      send-buffer-size =  30000000b
      receive-buffer-size =  30000000b
      maximum-frame-size = 30000000b
    }
  }
}
:)
freshly googled ;)
add dot- of course
Michael Dadashyan
@mebymyself
Feb 24 2018 15:37
Can I set all sizes to 000000b to remove the limits all together? I cannon predict max size here
Michael Dadashyan
@mebymyself
Feb 24 2018 15:43
apparently not, setting to zero still give me the same error of max size of 128000 bytes
Also setting to large number (ex. 10000000b) and I am back to original error:
Error while decoding incoming Akka PDU of length 31252 Cause: Akka.Remote.Transport.AkkaProtocolException: Error while decoding incoming Akka PDU of length 31252 --->Akka.Remote.Transport.PduCodecException: Decoding PDU failed ---> Google.Protobuf.InvalidProtocolBufferException: While parsing a protocol message, the input ended unexpectedly in the middle of a field. This could mean either that the input has been truncated or that an embedded message misreported its own length.