@marcingrzejszczak this is my sample code
// sender
sender = KafkaSender.create(senderOptions);
processor = DirectProcessor.<Tuple2<String,String>>create().serialize();
sink = processor.sink();
sender.send(processor.map(objects -> SenderRecord.create(new ProducerRecord<>("topic1", objects.getT1(), objects.getT2()),objects.getT2())))
.doOnNext(senderResult -> log.info(senderResult.correlationMetadata()))
.subscribe();
// receiver
receiver = KafkaReceiver.create(receiverOptions);
receiver
.receive()
.doOnNext(receiverRecord -> log.info("key: {}, value: {}", receiverRecord.key(), stringStringReceiverRecord.value()))
.subscribe();
send on rest call
sink.next(Tuples.of("key", ""+System.nanoTime()));
log:
2020-05-29 00:07:43.063 INFO [demo,,,] 31028 --- [ single-1] com.example.demo.DemoApplication : 88843267100400
2020-05-29 00:07:43.064 INFO [demo,,,] 31028 --- [ parallel-2] com.example.demo.DemoApplication : key: key, value: 88843267100400
spring.sleuth.remote-fields
vs spring.sleuth.baggage.remote-fields
baggage.
whereas in the documentation without