    or, if that's the only thing you're doing with awk then do this: https://lab.benthos.dev/l/08YuUFbjN-V
    Lichtsinnig
    @Lichtsinnig

    It turned out to work with that configuration

    I'm writing up examples of working with Benthos that you could post in a cookbook

    Lichtsinnig
    @Lichtsinnig

    Hello!
    I don't understand the behaviour of the switch processor when the key fallthrough: true is set

    Expected Result

    {"a":[1],"aa":1,"bb":"time"}

    It seems that the next block is just executed without checking the condition

    Example

    Input

    {"aa": 1,"bb":"time"}

    Config

    input:
      type: benthos_lab
    buffer:
      type: none
      none: {}
    pipeline:
      processors:
      - switch:
        - condition:
            json:
              path: aa
              operator: equals
              arg: 1
          processors:
            - json:
                path: a
                operator: append
                value: 1
          fallthrough: true     
        - condition:
            json:
              path: bb
              operator: equals
              arg: "tim2e"
          processors:
            - json:
                path: b
                operator: append
                value: 1
          fallthrough: true     
    output:
      type: benthos_lab

    Result

    {"a":[1],"aa":1,"b":[1],"bb":"time"}

    Save session https://lab.benthos.dev/l/pIq2Vxhtc_a

    Ashley Jeffs
    @Jeffail
    fallthrough means the next case is executed if the current case resolves true
    if you just want to check two separate conditions then follow switch with the first case with a second switch of the second case
    Lichtsinnig
    @Lichtsinnig
    Got it.
    So I need to use several separate conditions or switch processors for multiple data checks.
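    As a sketch of that suggestion (reusing the json condition syntax from the config above; untested, so treat it as illustrative), two independent checks become two consecutive switch processors, each with a single case and no fallthrough:

```yaml
pipeline:
  processors:
    # First independent check: append to "a" when aa == 1
    - switch:
        - condition:
            json:
              path: aa
              operator: equals
              arg: 1
          processors:
            - json:
                path: a
                operator: append
                value: 1
    # Second independent check: append to "b" when bb == "time"
    - switch:
        - condition:
            json:
              path: bb
              operator: equals
              arg: "time"
          processors:
            - json:
                path: b
                operator: append
                value: 1
```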
    Lichtsinnig
    @Lichtsinnig

    Hi, here is an example of combining arrays

    input

    {"i":[1,2,3],"b":[5,6,7]}

    Config

    - process_field:
        path: L
        result_type: object
        processors:
          - jmespath:
              query: '[i[], b[]]|[]'

    result

    [1,2,3,5,6,7]
    Lichtsinnig
    @Lichtsinnig
    Hello!
    Can you give an example of how to correctly define a constant in a Benthos configuration?
    7 replies
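    One common pattern (this uses Benthos's standard environment variable interpolation; the variable name APP_VERSION here is a made-up example) is to inject the constant as an environment variable with a default:

```yaml
# Any config field may embed ${VAR} or ${VAR:default}, resolved at start-up.
pipeline:
  processors:
    - json:
        path: app_version
        operator: set
        value: "${APP_VERSION:unknown}"
```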
    Lichtsinnig
    @Lichtsinnig

    Hey. How do I use "$ref" in stream configurations?

    I run Benthos in streams mode and point it at the folder where the configurations are located

    benthos --streams --streams-dir ./streams

    If my configuration has a reference, I get an error

                    - $ref: "./ref/severity.yaml#processors/1"

    Error

    {"@timestamp":"2020-03-13T15:48:24+03:00","@service":"benthos","level":"ERROR","component":"benthos","message":"Failed to load stream configs: failed to resolve $ref fragment 'processors/1' in config 'streams/ref/severity.yaml': failed to resolve JSON pointer: path must begin with '/'"}

    If I set the absolute path

                    - $ref: "/Users/user/ref/severity.yaml#processors/1"

    Error

    {"@timestamp":"2020-03-13T15:53:06+03:00","@service":"benthos","level":"ERROR","component":"benthos","message":"Failed to load stream configs: failed to resolve $ref fragment 'processors/1' in config '/Users/user/ref/severity.yaml': failed to resolve JSON pointer: path must begin with '/'"}
    1 reply
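    The error text itself suggests the likely fix: a JSON Pointer fragment (RFC 6901) must begin with '/', so the fragment part of the reference would presumably be written as:

```yaml
- $ref: "./ref/severity.yaml#/processors/1"
```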
    Lichtsinnig
    @Lichtsinnig
    Hey. After updating from version 3.9 to version 3.11 I get errors
    MB:runtime user$ benthos --streams --streams-dir ./streams
    SIGILL: illegal instruction
    PC=0x106a230 m=0 sigcode=1
    
    goroutine 1 [running, locked to thread]:
    runtime.asyncPreempt()
        /usr/local/Cellar/go/1.14/libexec/src/runtime/preempt_amd64.s:8 fp=0xc0006bfec0 sp=0xc0006bfeb8 pc=0x106a230
    github.com/Jeffail/benthos/v3/lib/input.init()
        /private/tmp/benthos-20200310-60282-vti0nm/benthos-3.11.0/src/github.com/Jeffail/benthos/lib/input/broker.go:23 fp=0xc0006bfec8 sp=0xc0006bfec0 pc=0x1f49310
    runtime.doInit(0x3a779c0)
        /usr/local/Cellar/go/1.14/libexec/src/runtime/proc.go:5414 +0x8a fp=0xc0006bfef8 sp=0xc0006bfec8 pc=0x104522a
    runtime.doInit(0x3a6c5c0)
        /usr/local/Cellar/go/1.14/libexec/src/runtime/proc.go:5409 +0x57 fp=0xc0006bff28 sp=0xc0006bfef8 pc=0x10451f7
    runtime.doInit(0x3a6ebe0)
        /usr/local/Cellar/go/1.14/libexec/src/runtime/proc.go:5409 +0x57 fp=0xc0006bff58 sp=0xc0006bff28 pc=0x10451f7
    runtime.doInit(0x3a5c4a0)
        /usr/local/Cellar/go/1.14/libexec/src/runtime/proc.go:5409 +0x57 fp=0xc0006bff88 sp=0xc0006bff58 pc=0x10451f7
    runtime.main()
        /usr/local/Cellar/go/1.14/libexec/src/runtime/proc.go:190 +0x1ce fp=0xc0006bffe0 sp=0xc0006bff88 pc=0x103862e
    runtime.goexit()
        /usr/local/Cellar/go/1.14/libexec/src/runtime/asm_amd64.s:1373 +0x1 fp=0xc0006bffe8 sp=0xc0006bffe0 pc=0x10689c1
    
    goroutine 21 [select]:
    go.opencensus.io/stats/view.(*worker).start(0xc00009fb80)
        /private/tmp/benthos-20200310-60282-vti0nm/benthos-3.11.0/pkg/mod/go.opencensus.io@v0.22.2/stats/view/worker.go:154 +0x100
    created by go.opencensus.io/stats/view.init.0
        /private/tmp/benthos-20200310-60282-vti0nm/benthos-3.11.0/pkg/mod/go.opencensus.io@v0.22.2/stats/view/worker.go:32 +0x57
    
    rax    0x3a779c0
    rbx    0x33
    rcx    0x0
    rdx    0x3a77b58
    rdi    0xc0005cc7d8
    rsi    0x1f49310
    rbp    0xc0006bfee8
    rsp    0xc0006bfeb8
    r8     0xc0005cc750
    r9     0x4b
    r10    0x0
    r11    0x1
    r12    0xff
    r13    0x0
    r14    0x2c47fe4
    r15    0x0
    rip    0x106a230
    rflags 0x10206
    cs     0x2b
    fs     0x0
    gs     0x0
    MB:runtime user$ benthos --streams --streams-dir ./streams
    SIGILL: illegal instruction
    PC=0x106a230 m=5 sigcode=1
    
    goroutine 1 [running]:
    runtime.asyncPreempt()
        /usr/local/Cellar/go/1.14/libexec/src/runtime/preempt_amd64.s:8 fp=0xc000618dd0 sp=0xc000618dc8 pc=0x106a230
    encoding/json.(*decodeState).rescanLiteral(0xc0001556b0)
        /usr/local/Cellar/go/1.14/libexec/src/encoding/json/decode.go:360 +0xef fp=0xc000618e00 sp=0xc000618dd0 pc=0x110883f
    encoding/json.(*decodeState).objectInterface(0xc0001556b0, 0xc000588fd0)
        /usr/local/Cellar/go/1.14/libexec/src/encoding/json/decode.go:1104 +0x7c fp=0xc000618e88 sp=0xc000618e00 pc=0x110f52c
    encoding/json.(*decodeState).object(0xc0001556b0, 0x23b7600, 0xc000588fd0, 0x194, 0xc0001556d8, 0x7b)
        /usr/local/Cellar/go/1.14/libexec/src/encoding/json/decode.go:638 +0x1ead fp=0xc000619118 sp=0xc000618e88 pc=0x110bf3d
    encoding/json.(*decodeState).value(0xc0001556b0, 0x23b7600, 0xc000588fd0, 0x194, 0xc00006cae0, 0x94)
        /usr/local/Cellar/go/1.14/libexec/src/encoding/json/decode.go:387 +0x6d fp=0xc000619180 sp=0xc000619118 pc=0x110898d
    encoding/json.(*decodeState).object(0xc0001556b0, 0x22cada0, 0xc0000114b0, 0x16, 0xc0001556d8, 0x7b)
        /usr/local/Cellar/go/1.14/libexec/src/encoding/json/decode.go:782 +0x12e5 fp=0xc000619410 sp=0xc000619180 pc=0x110b375
    encoding/json.(*decodeState).value(0xc0001556b0, 0x22cada0, 0xc0000114b0, 0x16, 0xc0006194c0, 0x1119ac1)
        /usr/local/Cellar/go/1.14/libexec/src/encoding/json/decode.go:387 +0x6d fp=0xc000619478 sp=0xc000619410 pc=0x110898d
    encoding/json.(*decodeState).unmarshal(0xc0001556b0, 0x22cada0, 0xc0000114b0, 0xc0001556d8, 0x0)
        /usr/local/Cellar/go/1.14/libexec/src/encoding/json/decode.go:180 +0x1f0 fp=0xc000619500 sp=0xc000619478 pc=0x11080e0
    encoding/json.Unmarshal(0xc00068e000, 0x305b, 0x3500, 0x22cada0, 0xc0000114b0, 0x0, 0x0)
        /usr/local/Cellar/go/1.14/libexec/src/encoding/json/decode.go:107 +0x112 fp=0xc000619548 sp=0xc000619500 pc=0x1107a92
    github.com/Jeffail/benthos/v3/lib/output.sanitiseConfig(0x26d4a28, 0x6, 0x2706ece, 0x22, 0x1, 0x26e31b5, 0x10, 0x0, 0x26d401a, 0x6, ...)
        /private/tmp/benthos-20200310-60282-vti0nm/benthos-3.11.0/src/github.com/Jeffail/benthos/lib/output/constructor.go:214 +0x149 fp=0xc00061f2d0 sp=0xc000619548 pc=0x2128da9
    github.com/Jeffail/benthos/v3/lib/output.SanitiseConfig(...)
        /private/tmp/benthos-20200310-60282-vti0nm/benthos-3.11.0/src/github.com/Jeffail/benthos/lib/output/constructor.go:204
    github.com/Jeffail/benthos/v3/lib/config.Type.Sanitised(0x26dbf34, 0xc, 0x26d0890, 0x2, 0x26d69ad, 0x8, 0x0, 0x26d3394, 0x5, 0x2706ece, ...)
        /private/tmp/benthos-20200310-60282-vti0nm/benthos-3.11.0/src/github.com/Jeffail/benthos/lib/config/config.go:76 +0x135 fp=0xc000621bf8 sp=0xc00061f2d0 pc=0x21b3b65
    github.com/Jeffail/benthos/v3/lib/service.Run()
        /private/tmp/benthos-20200310-60282-vti0nm/benthos-3.11.0/src/github.com/Jeffail/benthos/lib/service/service.go:506 +0x716 fp=0xc00063ff78 sp=0xc000621bf8 pc=0x21ccd26
    main.main()
        /private/tmp/benthos-20200310-60282-vti0nm/benthos-3.11.0/src/github.com/Jeffail/benthos/cmd/benthos/main.go:8 +0x20 fp=0xc00063ff88 sp=0xc00063ff78 pc=0x21cf520
    runtime.main()
        /usr/local/Cellar/go/1.14/libexec/src/runtime/proc.go:203 +0x212 fp=0xc00063ffe0 sp=0xc00063ff88 pc=0x1038672
    runtime.goexit()
        /usr/local/Cellar/go/1.14/libexec/src/runtime/asm_amd64.s:1373 +0x1 fp=0xc00063ffe8 sp=0xc00063ffe0 pc=0x10689c1
    
    goroutine 5 [select]:
    go.opencensus.io/stats/view.(*worker).start(0xc0001140a0)
        /private/tmp/benthos-20200310-60282-vti0nm/benthos-3.11.0/pkg/mod/go.opencensus.io@v0.22.2/stats/view/worker.go:154 +0x100
    created by go.opencensus.io/stats/view.init.0
        /private/tmp/benthos-20200310-60282-vti0nm/benthos-3.11.0/pkg/mod/go.opencensus.io@v0.22.2/stats/view/worker.go:32 +0x57
    
    
    rax    0x3
    rbx    0x3a
    rcx    0xc0001556b0
    rdx    0xc0002275c0
    rdi    0xc0001556d8
    rsi    0x0
    rbp    0xc000618df0
    rsp    0xc000618dc8
    r8     0x22
    r9     0xbb
    r10    0x0
    r11    0x1
    r12    0xffffffffffffffff
    r13    0xb1
    r14    0xb0
    r15    0x200
    rip    0x106a230
    rflags 0x10202
    cs     0x2b
    fs     0x0
    gs     0x0

    A clean installation did not help

    Go was updated to version 1.14

    OS: macOS

    Ashley Jeffs
    @Jeffail
    yikes, looks like it's a regression in Go 1.14: golang/go#37459
    thanks for raising this, best roll back for now
    Neil Volungis
    @nvolungis
    hi all! hoping you can help me evaluate if benthos could be a good fit for an analytics pipeline we're setting up. most of our transformations are stateless, but we have a filtering component which drops messages if they come from ips on a blocklist. the messages and blocklist updates are published to the same topic, so as new blocklist messages come in, we update some state and filter messages accordingly.
    we were planning on using flink for our processors as it has built-in support for stateful processing, but our org doesn't really have much java expertise and honestly, it's not a fun ecosystem to work in.
    the question is, does this seem like something we could accomplish with benthos? the cache mechanism seems like the most likely place for this data to be stored, but there seems to be a scenario where if a consumer fails and needs to reprocess messages, state could become out of sync.
    Neil Volungis
    @nvolungis
    any guidance you could provide would be greatly appreciated!
    19 replies
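    For what it's worth, a very rough sketch of the cache-based shape Neil describes (the type and ip fields are invented for illustration, and this only shows the blocklist-update side; the lookup/filter side and the replay caveat he raises still apply):

```yaml
pipeline:
  processors:
    - switch:
        # When a blocklist update arrives, record the IP in the cache
        - condition:
            json:
              path: type
              operator: equals
              arg: "blocklist_update"
          processors:
            - cache:
                cache: blocklist
                operator: set
                key: "${!json_field:ip}"
                value: "blocked"

resources:
  caches:
    blocklist:
      memory: {}
```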
    Patrick Robinson
    @patrobinson
    Seeing a weird issue, probably holding it wrong.
    When I use the json processor inside a switch and set a field, it overwrites the json blob
    But when I use it as a normal processor, set appends the field
    Patrick Robinson
    @patrobinson

    Fixed it. Changed

    json:
      operator: set
      value:
        Message: "my message"

    To:

    json:
      operator: set
      path: Message
      value: "my message"
    Patrick Robinson
    @patrobinson
    Maybe that is the default behaviour, but because I was using it inside a process_dag it appends
    Ema
    @Tahrima

    Hi! My data is coming in this json format from Graphite, and I'm trying to use the awk processor to split the value in target. I'm unable to get this to work, and wanted help on whether I'm accessing the json correctly. Thanks!

    [
       { 
          "target": "folder.structure.name", 
          "datapoints": [
             [825516.0, 1586212620]
          ]
       }
    ]

    Here is the processor code:

     - awk:
          codec: text
          program: |
            {
              split($0, target_split, ".")
              json_set(0.target_names, target_split)
            }
    30 replies
    xiaoshui1994
    @xiaoshui1994
    Hi @Jeffail, in my plugin I need to write data to the same output at arbitrary times (it requires only one input and one output). How do I do that?
    19 replies
    xiaoshui1994
    @xiaoshui1994
    image.png
    Ashley Jeffs
    @Jeffail
    Benthos v3.12.0 is out: https://github.com/Jeffail/benthos/releases/tag/v3.12.0, comes with a much improved CLI and a swanky new language for function interpolations, you can read more about bloblang here: https://www.benthos.dev/blog/2020/04/18/sneak-peek-at-bloblang
    Ashley Jeffs
    @Jeffail
    It now performs much more strict parsing of your configs, so if you find after an upgrade that your config doesn't run and you don't have time yet to fix it you can skip the stricter checking with benthos --chilled -c yourconfig.yaml
    Grant Edmunds
    @grantallen7538_gitlab

    I'm having trouble getting a json document into ElasticSearch. The error I get is: Error 400 (Bad Request): failed to parse field [parameters.value] of type [text] in document with id '1-1589230096'. Preview of field's value: '{dataType=2, name=Device.Services.STBService.1.Components.VideoDecoderNumberOfEntries, value=1}' [type=mapper_parsing_exception]"}.

    The json document is too big to paste into this chat! https://jsonformatter.org/json-parser shows that the document is valid json.

    17 replies
    Alex Dempsey
    @avdempsey
    For kinesis inputs, I see I can set a limit on the number of records I receive in a request, but it's not obvious how I might limit the rate that I call kinesis. My neighbors on the stream are complaining. I see the rate_limit resource is not available for kinesis.
    14 replies
    dmoeff
    @dmoeff
    I am using Kafka input to connect to Azure Event Hub. Azure Event Hub has Kafka endpoint that allows Kafka clients to connect. I am able to get the messages, but there is a problem with metadata. Azure Event Hub headers are converted into Kafka metadata and because the headers are AMQP encoded, Kafka consumer is unable to properly decode them which results in something like this: '\xa1\aSN12345' where the real output should be: 'SN12345'. Better explanation of the issue is here: Azure/azure-event-hubs-for-kafka#56 . What is the best way in Benthos to deal with this issue?
    7 replies
    David Reno
    @dareno
    noob question: looks like the http_server input doesn't support oauth and the http processor is not designed for processing the incoming request. Am I right that this is not supported? Is this by design (e.g. it wouldn't make sense to support this use case) or is it just not done yet and maybe on the roadmap?
    1 reply
    dmoeff
    @dmoeff
    How can I persist the result of a cache Get operation in message metadata?
    8 replies
    Ashley Jeffs
    @Jeffail
    has anyone found any issues or fundamental problems with bloblang? I'm putting a release out this weekend and am considering labelling it as stable
    dmoeff
    @dmoeff
    image.png
    18 replies
    Amin Alid
    @aminalid_gitlab
    Hi all, i am trying to use http_client as an input for an API resource, this API on success returns an empty body, code 200 and some needed values in the headers. My issue is that it looks like in case of an empty body payload input never flushes the response down to the pipeline. I have tried to do several things including playing with all http_client variables (stream, successful_on) but always get the same result where input is just waiting. Appreciate any thoughts on this )
    4 replies
    dmoeff
    @dmoeff

    How to convert the following json:

    {
      "MeasurementRecords": {
        "HistoryRecords": [
          {
            "List": {
              "HistoryRecord": {
                "State": "Valid",
                "TimeStamp": "2020-06-09T14:31:52Z",
                "Value": "5610.47"
              }
            },
            "ValueItemId": "100_IND-000000000001_000000000001_Resource-1.WHr"
          }
        ]
      }
    }

    into the following json

    {
      "State": "Valid",
      "TimeStamp": "2020-06-09T14:31:52Z",
      "Value": "5610.47",
      "ValueItemId": "100_IND-000000000001_000000000001_Resource-1.WHr"
    }

    8 replies
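    Since v3.12 this kind of reshaping can be expressed with a bloblang mapping; a sketch (assuming only the first element of HistoryRecords matters, which may not hold for real data):

```yaml
pipeline:
  processors:
    - bloblang: |
        # Grab the first history record, then lift its fields to the root
        let rec = this.MeasurementRecords.HistoryRecords.index(0)
        root = $rec.List.HistoryRecord
        root.ValueItemId = $rec.ValueItemId
```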
    Andrew Wang
    @awangc
    Is there TLS config support for output to elasticsearch (i.e. to an Elasticsearch https endpoint)?
    3 replies
    Cameron Braid
    @cameronbraid
    Hi, I have just discovered Benthos, it looks really cool. A question on correlating messages: I have a single kafka stream that has foo and bar messages, and they both have an id field. I want to correlate each foo with its bar based on the id. They may arrive in either order and are typically very close to each other in time. Can this be done?
    10 replies
    Cameron Braid
    @cameronbraid
    Looking for alternatives to kafka streams for writing pipelines. I understand that kafka streams does exactly-once processing (https://www.confluent.io/blog/enabling-exactly-once-kafka-streams/), so just trying to understand how this differs from Benthos. From what I have read it's 'at least once' if you don't use a buffer
    If I use a kafka input and kafka output, can Benthos also do exactly once?
    2 replies
    Raghu Vamshi Challa
    @raghu999
    Hi Team,
    I am new to benthos and I am trying to parse key/values from a json event, can someone help me with the config?
    Log Event:
    {
      "_index": "ingest",
      "_type": "_doc",
      "_id": "4-96813136",
      "_version": 1,
      "_score": 0,
      "_source": {
        "container_id": "5c8ae598bb7ba825aee730f691127a9c26ee74348785bddb927534ed07bbf9b862",
        "container_name": "/ecs-api-ea8bdc89e0e4b3c56b00",
        "date": 1596813131,
        "ecs_cluster": "arn:aws:ecs:us-east-2:example:cluster/api",
        "ecs_task_arn": "arn:aws:ecs:us-east-2:example:cluster/api/ea8bdc89e0e4b3c56b00",
        "ecs_task_definition": "example-api-cw-test:39",
        "log": "{\"@timestamp\":\"2020-08-07T15:12:11.333Z\", \"log.level\":\"DEBUG\", \"message\":\"Autowiring by type from bean name 'upstreamHealthIndicator' via constructor to bean named 'service-discovery-com.config.ServiceDiscovery'\", \"service.name\":\"example\",\"event.dataset\":\"example.log\",\"process.thread.name\":\"main\",\"log.logger\":\"org.springframework.beans.factory.support.DefaultListableBeanFactory\"}",
        "source": "stdout"
      }
    }
    Benthos config:
        # Common config fields, showing default values
        input:
          http_server:
            address: ""
            path: /post
            ws_path: /post/ws
            timeout: 5s
            rate_limit: ""
        pipeline:
          threads: 2
          processors:
            - unarchive:
                format: json_array
            - type: split
            - parse_log:
                format: syslog_rfc5424
                codec: json
        # Config fields, showing default values
        output:
          broker:
            pattern: greedy
            outputs:
            - stdout:
                delimiter: ""
    8 replies
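    One observation on the config above: the inner log field is an escaped JSON string, not syslog, so parse_log with syslog_rfc5424 won't match it. A hedged alternative sketch (untested) parses that field with bloblang's parse_json instead:

```yaml
pipeline:
  processors:
    - unarchive:
        format: json_array
    - split: {}
    - bloblang: |
        # Replace the escaped JSON string in _source.log with a parsed object
        root = this
        root._source.log = this._source.log.parse_json()
```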
    Andrew Wang
    @awangc

    A few questions on benthos metrics, I am reading this page https://www.benthos.dev/docs/components/metrics/about/ but:

    • I cannot see some metrics, e.g., input_latency, while I can see other input metrics. Is there any configuration I'm missing?

    • I cannot see buffer nor pipeline types of metrics, but I do see some metrics named output_broker_outputs_1_batch_sent. Is there any correlation between them? If not, what are the broker type of metrics? Any configuration I'm missing for the former two types?

    I configured benthos to export Prometheus style metrics. Thanks in advance!

    11 replies
    Maurizio Abba
    @AxelrodG_twitter

    Hey All! I just wanted to let the community know that WhiteOps (https://www.whiteops.com) started sponsoring this great project! We do bot detection for advertising, marketing, and applications. Feel free to reach out for more information!

    We are using Benthos as part of our data pipeline to deliver notifications to our customers.

    Keep up the good work! :tada:

    Ashley Jeffs
    @Jeffail
    :heart: thanks @AxelrodG_twitter
    Maurizio Abba
    @AxelrodG_twitter

    Hey Ashley, I saw 3.26.0rc1 is out! Thank you for adding max_in_flight to the dynamic output, we will use that a lot!!

    What is your take on pre-releases and official releases? What is the checklist that needs to be ticked before making a release candidate official?

    1 reply
    Ashley Jeffs
    @Jeffail
    v3.26.0 officially released: https://github.com/Jeffail/benthos/releases/tag/v3.26.0, I've put a blog post up outlining some cool updates around workflows: https://www.benthos.dev/blog/2020/08/30/improved-workflows
    Ashley Jeffs
    @Jeffail
    Hey everyone, I've updated the community page to include a link to a discord server I've set up: https://www.benthos.dev/community, I'll still be active on gitter but discord seems to be easier for people to access and works a lot better
    nishantkarve
    @nishantkarve
    Hey everyone!
    I am trying to connect to a Kafka broker that is SSL enabled. What should I include (which file types: .pem, .key, .cer, etc.) in the following fields when using Kafka as an input source?
    root_cas_file: ""
    I think client_certs is described in the docs as a .pem and a .key file for cert_file and key_file respectively
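    For reference, the Benthos kafka input exposes a tls block; a sketch with placeholder file paths (root_cas_file takes a PEM-encoded CA bundle, and each client_certs entry pairs a PEM certificate with its key):

```yaml
input:
  kafka:
    addresses:
      - broker.example.com:9093
    topics:
      - my_topic
    tls:
      enabled: true
      root_cas_file: ./ca.pem
      client_certs:
        - cert_file: ./client.pem
          key_file: ./client.key
```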