    Lichtsinnig
    @Lichtsinnig
    @Jeffail It is not clear how to use awk functions, there are no syntax examples.
    Ashley Jeffs
    @Jeffail
    sure, I've opened Jeffail/benthos#379
    Lichtsinnig
    @Lichtsinnig

    Hi, how can I add 2 numbers in a configuration?
    As I understand it, the expression is treated as a string.

    input

    curl http://localhost:4195/t/post -d '{"fields": "ppp","EventID":"4720","ventropia":"t","summary":"test3","issuetype":"www"}' -H "Content-Type: application/json" -X POST

    Config

    input:
      type: http_server
    buffer:
      type: memory
    pipeline:
      processors:
        - json:
            path: CCC
            operator: set
            value: 0  
        - switch:
          - condition:
            json:
              path: EventID
              operator: equals
              arg: "4720" 
            processors:
              - json:
                  operator: set
                  path: CCC
                  value: ${!json_field:CCC} + 1
    output:
      type: stdout
      stdout:
        delimiter: ""

    Result obtained
    output

    {"CCC":"0 + 1","EventID":"4720","fields":"ppp","issuetype":"www","summary":"test3","ventropia":"t"}

    Desired result
    output

    {"CCC":"1","EventID":"4720","fields":"ppp","issuetype":"www","summary":"test3","ventropia":"t"}
    3 replies
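    A hedged sketch of one way to get the desired result: function interpolations such as ${!json_field:CCC} are substituted as text, which is why the output contains the literal "0 + 1". The awk processor (used later in this log) can do the addition itself; this would replace the inner json processor inside the switch case, and is unverified:

    - awk:
        program: |
          {
            json_set("CCC", json_get("CCC") + 1);
          }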
    Lichtsinnig
    @Lichtsinnig

    @Jeffail hi, how can I get the min value from an array field?

    input

    curl http://localhost:4195/t/post -d '{"list": [1,2,3,4], "EventID": 4720,"TargetUserNameT": "U","u":123}' -H "Content-Type: application/json" -X POST

    Config

    input:
      type: http_server
    buffer:
      type: memory
    pipeline:
      processors:
        - json:
            path: iS
            operator: set
            value:
              jmespath:
                query: min(list)                             
    output:
      type: stdout
      stdout:
        delimiter: ""

    Result obtained
    output

    {"EventID":4720,"TargetUserNameT":"U","iS":[{"jmespath":{"query":"min(list)"}}],"list":[1,2,3,4],"u":123}

    Desired result
    output

    {"EventID":4720,"TargetUserNameT":"U","iS":1,"list":[1,2,3,4],"u":123}

    1 reply
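    A hedged sketch of another approach: the json processor treats the nested value as a literal object, but a jmespath processor can compute the minimum and merge it back into the document (min and merge are standard JMESPath functions; this is unverified):

    pipeline:
      processors:
        - jmespath:
            query: 'merge(@, {iS: min(list)})'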
    Lichtsinnig
    @Lichtsinnig

    Hi, does Benthos have full JMESPath support, or is it limited?
    I have the feeling that JMESPath in Benthos doesn't know the length function

    input

    curl http://localhost:4195/t/post -d '{"list": [1,2,3,4], "EventID": 4720,"TargetUserNameT": "U","u":123}' -H "Content-Type: application/json" -X POST

    Config

    input:
      type: http_server
    buffer:
      type: memory
    pipeline:
      processors:
        - jmespath:
            query: length(List)                                
    output:
      type: stdout
      stdout:
        delimiter: ""

    Output:

    {"list": [1,2,3,4], "EventID": 4720,"TargetUserNameT": "U","u":123}

    Expected Result:

    4
    1 reply
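    length is a standard JMESPath function, so a likely cause here is case sensitivity: the payload field is list while the query asks for length(List). A minimal sketch with the lowercase field name (unverified):

    pipeline:
      processors:
        - jmespath:
            query: length(list)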
    Lichtsinnig
    @Lichtsinnig
    I have an array with duplicate values [1,2,3,1,10,1]. How can I sort the array keeping only unique values?
    And get [1,2,3,10]
    Is awk the only way to do this, or are there other options?
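    A hedged alternative to awk: in later Benthos versions the bloblang processor (announced further down in this log) has unique and sort methods; a sketch, assuming a field named list:

    pipeline:
      processors:
        - bloblang: |
            root = this
            root.list = this.list.unique().sort()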
    Ashley Jeffs
    @Jeffail
    Benthos lab has been migrated and upgraded to the latest version: https://lab.benthos.dev/
    Lichtsinnig
    @Lichtsinnig

    Hi. @Jeffail.

    I get an error when I access an array element.

    input

    {"issues":[{"id":123, "key":999},{"id":444, "key":555}], "rrr":"dfg"}

    Config

    input:
      type: http_server
    buffer:
      type: memory
    pipeline:
      processors:
        - process_field:
            codec: json
            path: issues
            processors:      
              - jmespath:
                  query:  "{issues: issues[0].id}"
    output:
      type: stdout
      stdout:
        delimiter: ""

    output error

    {"@timestamp":"2020-03-03T17:28:16+03:00","@service":"benthos","level":"ERROR","component":"benthos","message":"Failed to load stream configs: yaml: line 11: did not find expected ',' or '}'"}

    Expected Result:

    {"issues" : 123}

    If I don't reference an array element, the configuration is valid

    5 replies
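    A hedged guess at the YAML error: unquoted braces in a query can be parsed as a YAML flow mapping. One sketch that also drops process_field and runs the query against the whole document, with single quotes keeping YAML away from the braces (unverified):

    pipeline:
      processors:
        - jmespath:
            query: '{issues: issues[0].id}'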
    Lichtsinnig
    @Lichtsinnig

    @Jeffail , hi!
    In Benthos, is it possible to set a dynamic url in the http_client output?

    Example

    - output:
        type: http_client
        http_client:
          url: ${!json_field:issues_id}
          verb: PUT
    "message":"Sending messages via HTTP requests to: ${!json_field:issues_id}"}
    7 replies
    Lichtsinnig
    @Lichtsinnig

    Hi, when converting string -> int I get the desired result, but I am concerned about the error:

    Failed to parse interpolated value 'null' into float

    Input

    {"issues_count":"2"}

    Config

    input:
      type: http_server
    buffer:
      type: memory
    pipeline:
      processors:
        - awk:
            program: |
                      {
                        json_set("issues_count", json_get("issues_count") + 1);
                      }
        - process_field:
            path: issues_count
            result_type: int
            processors:
              - number:
                  operator: add
                  value: "${!json_field:issues_count}"        
    output:
      type: stdout
      stdout:
        delimiter: ""

    Output + error

    {"@timestamp":"2020-03-05T12:28:15+03:00","@service":"benthos","level":"ERROR","component":"benthos.t.pipeline.processor.1.0","message":"Failed to parse interpolated value 'null' into float: strconv.ParseFloat: parsing \"null\": invalid syntax"}
    {"issues_count":3}
    Ashley Jeffs
    @Jeffail
    it's because of that number processor, it's not doing anything
    Lichtsinnig
    @Lichtsinnig
    How am I better off converting string to int?
    Ashley Jeffs
    @Jeffail
    just remove the processors from process_field
    Lichtsinnig
    @Lichtsinnig
    I will try
    or, if that's the only thing you're doing with awk then do this: https://lab.benthos.dev/l/08YuUFbjN-V
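    A hedged reading of the advice above, with the inner number processor removed so that process_field only coerces the awk result to an int (a sketch, not checked against the linked lab session):

    pipeline:
      processors:
        - awk:
            program: |
              {
                json_set("issues_count", json_get("issues_count") + 1);
              }
        - process_field:
            path: issues_count
            result_type: int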
    Lichtsinnig
    @Lichtsinnig

    I got it working in my configuration.

    I am writing examples for working with Benthos that you could post in a cookbook

    Lichtsinnig
    @Lichtsinnig

    Hello!
    I don't understand the behaviour of the switch processor with the key fallthrough: true

    Expected Result

    {"a":[1],"aa":1,"bb":"time"}

    It seems that the next block is just executed without checking the condition

    Example

    Input

    {"aa": 1,"bb":"time"}

    Config

    input:
      type: benthos_lab
    buffer:
      type: none
      none: {}
    pipeline:
      processors:
      - switch:
        - condition:
            json:
              path: aa
              operator: equals
              arg: 1
          processors:
            - json:
                path: a
                operator: append
                value: 1
          fallthrough: true     
        - condition:
            json:
              path: bb
              operator: equals
              arg: "tim2e"
          processors:
            - json:
                path: b
                operator: append
                value: 1
          fallthrough: true     
    output:
      type: benthos_lab

    Result

    {"a":[1],"aa":1,"b":[1],"bb":"time"}

    Save session https://lab.benthos.dev/l/pIq2Vxhtc_a

    Ashley Jeffs
    @Jeffail
    fallthrough means the next case is executed if the current case resolves true
    if you just want to check two separate conditions then follow the switch containing the first case with a second switch containing the second case
    Lichtsinnig
    @Lichtsinnig
    Got it.
    So I need to use several conditions or separate switch processors for multiple data checks.
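    Following the advice above, a sketch of the same example rewritten as two separate switch processors so that each condition is checked independently (unverified):

    pipeline:
      processors:
      - switch:
        - condition:
            json:
              path: aa
              operator: equals
              arg: 1
          processors:
            - json:
                path: a
                operator: append
                value: 1
      - switch:
        - condition:
            json:
              path: bb
              operator: equals
              arg: "tim2e"
          processors:
            - json:
                path: b
                operator: append
                value: 1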
    Lichtsinnig
    @Lichtsinnig

    Hi, here is an example of combining arrays

    input

    {"i":[1,2,3],"b":[5,6,7]}

    Config

    - process_field:
        path: L
        result_type: object
        processors:
          - jmespath:
              query: '[i[], b[]]|[]'

    result

    [1,2,3,5,6,7]
    Lichtsinnig
    @Lichtsinnig
    Hello!
    Can you give an example of how to correctly define a constant in a Benthos configuration?
    7 replies
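    The replies to this aren't shown here, but one common approach is an environment variable with a default, which Benthos substitutes when the config is parsed; a sketch, with MY_CONSTANT and constant_field as hypothetical names:

    pipeline:
      processors:
        - json:
            operator: set
            path: constant_field
            value: "${MY_CONSTANT:42}"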
    Lichtsinnig
    @Lichtsinnig

    Hey. How do I use "ref" in stream configurations?

    I run Benthos in streams mode and point it at the folder where the configurations are located

    benthos --streams --streams-dir ./streams

    If my configuration has a reference, I get an error

                    - $ref: "./ref/severity.yaml#processors/1"

    Error

    {"@timestamp":"2020-03-13T15:48:24+03:00","@service":"benthos","level":"ERROR","component":"benthos","message":"Failed to load stream configs: failed to resolve $ref fragment 'processors/1' in config 'streams/ref/severity.yaml': failed to resolve JSON pointer: path must begin with '/'"}

    If I set the absolute path

                    - $ref: "/Users/user/refeverity.yaml#processors/1"

    Error

    {"@timestamp":"2020-03-13T15:53:06+03:00","@service":"benthos","level":"ERROR","component":"benthos","message":"Failed to load stream configs: failed to resolve $ref fragment 'processors/1' in config '/Users/user/ref/severity.yaml': failed to resolve JSON pointer: path must begin with '/'"}
    1 reply
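    The error text ("path must begin with '/'") suggests the fragment after # is a JSON pointer that needs a leading slash; a sketch of the reference with that change, an assumption since the reply isn't shown:

        - $ref: "./ref/severity.yaml#/processors/1"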
    Lichtsinnig
    @Lichtsinnig
    Hey. After updating from version 9 to version 11 I get errors
    MB:runtime user$ benthos --streams --streams-dir ./streams
    SIGILL: illegal instruction
    PC=0x106a230 m=0 sigcode=1
    
    goroutine 1 [running, locked to thread]:
    runtime.asyncPreempt()
        /usr/local/Cellar/go/1.14/libexec/src/runtime/preempt_amd64.s:8 fp=0xc0006bfec0 sp=0xc0006bfeb8 pc=0x106a230
    github.com/Jeffail/benthos/v3/lib/input.init()
        /private/tmp/benthos-20200310-60282-vti0nm/benthos-3.11.0/src/github.com/Jeffail/benthos/lib/input/broker.go:23 fp=0xc0006bfec8 sp=0xc0006bfec0 pc=0x1f49310
    runtime.doInit(0x3a779c0)
        /usr/local/Cellar/go/1.14/libexec/src/runtime/proc.go:5414 +0x8a fp=0xc0006bfef8 sp=0xc0006bfec8 pc=0x104522a
    runtime.doInit(0x3a6c5c0)
        /usr/local/Cellar/go/1.14/libexec/src/runtime/proc.go:5409 +0x57 fp=0xc0006bff28 sp=0xc0006bfef8 pc=0x10451f7
    runtime.doInit(0x3a6ebe0)
        /usr/local/Cellar/go/1.14/libexec/src/runtime/proc.go:5409 +0x57 fp=0xc0006bff58 sp=0xc0006bff28 pc=0x10451f7
    runtime.doInit(0x3a5c4a0)
        /usr/local/Cellar/go/1.14/libexec/src/runtime/proc.go:5409 +0x57 fp=0xc0006bff88 sp=0xc0006bff58 pc=0x10451f7
    runtime.main()
        /usr/local/Cellar/go/1.14/libexec/src/runtime/proc.go:190 +0x1ce fp=0xc0006bffe0 sp=0xc0006bff88 pc=0x103862e
    runtime.goexit()
        /usr/local/Cellar/go/1.14/libexec/src/runtime/asm_amd64.s:1373 +0x1 fp=0xc0006bffe8 sp=0xc0006bffe0 pc=0x10689c1
    
    goroutine 21 [select]:
    go.opencensus.io/stats/view.(*worker).start(0xc00009fb80)
        /private/tmp/benthos-20200310-60282-vti0nm/benthos-3.11.0/pkg/mod/go.opencensus.io@v0.22.2/stats/view/worker.go:154 +0x100
    created by go.opencensus.io/stats/view.init.0
        /private/tmp/benthos-20200310-60282-vti0nm/benthos-3.11.0/pkg/mod/go.opencensus.io@v0.22.2/stats/view/worker.go:32 +0x57
    
    rax    0x3a779c0
    rbx    0x33
    rcx    0x0
    rdx    0x3a77b58
    rdi    0xc0005cc7d8
    rsi    0x1f49310
    rbp    0xc0006bfee8
    rsp    0xc0006bfeb8
    r8     0xc0005cc750
    r9     0x4b
    r10    0x0
    r11    0x1
    r12    0xff
    r13    0x0
    r14    0x2c47fe4
    r15    0x0
    rip    0x106a230
    rflags 0x10206
    cs     0x2b
    fs     0x0
    gs     0x0
    MB:runtime user$ benthos --streams --streams-dir ./streams
    SIGILL: illegal instruction
    PC=0x106a230 m=5 sigcode=1
    
    goroutine 1 [running]:
    runtime.asyncPreempt()
        /usr/local/Cellar/go/1.14/libexec/src/runtime/preempt_amd64.s:8 fp=0xc000618dd0 sp=0xc000618dc8 pc=0x106a230
    encoding/json.(*decodeState).rescanLiteral(0xc0001556b0)
        /usr/local/Cellar/go/1.14/libexec/src/encoding/json/decode.go:360 +0xef fp=0xc000618e00 sp=0xc000618dd0 pc=0x110883f
    encoding/json.(*decodeState).objectInterface(0xc0001556b0, 0xc000588fd0)
        /usr/local/Cellar/go/1.14/libexec/src/encoding/json/decode.go:1104 +0x7c fp=0xc000618e88 sp=0xc000618e00 pc=0x110f52c
    encoding/json.(*decodeState).object(0xc0001556b0, 0x23b7600, 0xc000588fd0, 0x194, 0xc0001556d8, 0x7b)
        /usr/local/Cellar/go/1.14/libexec/src/encoding/json/decode.go:638 +0x1ead fp=0xc000619118 sp=0xc000618e88 pc=0x110bf3d
    encoding/json.(*decodeState).value(0xc0001556b0, 0x23b7600, 0xc000588fd0, 0x194, 0xc00006cae0, 0x94)
        /usr/local/Cellar/go/1.14/libexec/src/encoding/json/decode.go:387 +0x6d fp=0xc000619180 sp=0xc000619118 pc=0x110898d
    encoding/json.(*decodeState).object(0xc0001556b0, 0x22cada0, 0xc0000114b0, 0x16, 0xc0001556d8, 0x7b)
        /usr/local/Cellar/go/1.14/libexec/src/encoding/json/decode.go:782 +0x12e5 fp=0xc000619410 sp=0xc000619180 pc=0x110b375
    encoding/json.(*decodeState).value(0xc0001556b0, 0x22cada0, 0xc0000114b0, 0x16, 0xc0006194c0, 0x1119ac1)
        /usr/local/Cellar/go/1.14/libexec/src/encoding/json/decode.go:387 +0x6d fp=0xc000619478 sp=0xc000619410 pc=0x110898d
    encoding/json.(*decodeState).unmarshal(0xc0001556b0, 0x22cada0, 0xc0000114b0, 0xc0001556d8, 0x0)
        /usr/local/Cellar/go/1.14/libexec/src/encoding/json/decode.go:180 +0x1f0 fp=0xc000619500 sp=0xc000619478 pc=0x11080e0
    encoding/json.Unmarshal(0xc00068e000, 0x305b, 0x3500, 0x22cada0, 0xc0000114b0, 0x0, 0x0)
        /usr/local/Cellar/go/1.14/libexec/src/encoding/json/decode.go:107 +0x112 fp=0xc000619548 sp=0xc000619500 pc=0x1107a92
    github.com/Jeffail/benthos/v3/lib/output.sanitiseConfig(0x26d4a28, 0x6, 0x2706ece, 0x22, 0x1, 0x26e31b5, 0x10, 0x0, 0x26d401a, 0x6, ...)
        /private/tmp/benthos-20200310-60282-vti0nm/benthos-3.11.0/src/github.com/Jeffail/benthos/lib/output/constructor.go:214 +0x149 fp=0xc00061f2d0 sp=0xc000619548 pc=0x2128da9
    github.com/Jeffail/benthos/v3/lib/output.SanitiseConfig(...)
        /private/tmp/benthos-20200310-60282-vti0nm/benthos-3.11.0/src/github.com/Jeffail/benthos/lib/output/constructor.go:204
    github.com/Jeffail/benthos/v3/lib/config.Type.Sanitised(0x26dbf34, 0xc, 0x26d0890, 0x2, 0x26d69ad, 0x8, 0x0, 0x26d3394, 0x5, 0x2706ece, ...)
        /private/tmp/benthos-20200310-60282-vti0nm/benthos-3.11.0/src/github.com/Jeffail/benthos/lib/config/config.go:76 +0x135 fp=0xc000621bf8 sp=0xc00061f2d0 pc=0x21b3b65
    github.com/Jeffail/benthos/v3/lib/service.Run()
        /private/tmp/benthos-20200310-60282-vti0nm/benthos-3.11.0/src/github.com/Jeffail/benthos/lib/service/service.go:506 +0x716 fp=0xc00063ff78 sp=0xc000621bf8 pc=0x21ccd26
    main.main()
        /private/tmp/benthos-20200310-60282-vti0nm/benthos-3.11.0/src/github.com/Jeffail/benthos/cmd/benthos/main.go:8 +0x20 fp=0xc00063ff88 sp=0xc00063ff78 pc=0x21cf520
    runtime.main()
        /usr/local/Cellar/go/1.14/libexec/src/runtime/proc.go:203 +0x212 fp=0xc00063ffe0 sp=0xc00063ff88 pc=0x1038672
    runtime.goexit()
        /usr/local/Cellar/go/1.14/libexec/src/runtime/asm_amd64.s:1373 +0x1 fp=0xc00063ffe8 sp=0xc00063ffe0 pc=0x10689c1
    
    goroutine 5 [select]:
    go.opencensus.io/stats/view.(*worker).start(0xc0001140a0)
        /private/tmp/benthos-20200310-60282-vti0nm/benthos-3.11.0/pkg/mod/go.opencensus.io@v0.22.2/stats/view/worker.go:154 +0x100
    created by go.opencensus.io/stats/view.init.0
        /private/tmp/benthos-20200310-60282-vti0nm/benthos-3.11.0/pkg/mod/go.opencensus.io@v0.22.2/stats/view/worker.go:32 +0x57
    
    
    rax    0x3
    rbx    0x3a
    rcx    0xc0001556b0
    rdx    0xc0002275c0
    rdi    0xc0001556d8
    rsi    0x0
    rbp    0xc000618df0
    rsp    0xc000618dc8
    r8     0x22
    r9     0xbb
    r10    0x0
    r11    0x1
    r12    0xffffffffffffffff
    r13    0xb1
    r14    0xb0
    r15    0x200
    rip    0x106a230
    rflags 0x10202
    cs     0x2b
    fs     0x0
    gs     0x0

    Clean installation did not help

    Golang updated to version 1.14

    OS: macOS

    Ashley Jeffs
    @Jeffail
    yikes, looks like it's a regression in Go 1.14: golang/go#37459
    thanks for raising this, best roll back for now
    Neil Volungis
    @nvolungis
    hi all! hoping you can help me evaluate if benthos could be a good fit for an analytics pipeline we're setting up. most of our transformations are stateless, but we have a filtering component which drops messages if they come from ips on a blocklist. the messages and blocklist updates are published to the same topic, so as new blocklist messages come in, we update some state and filter messages accordingly.
    we were planning on using flink for our processors as it has built-in support for stateful processing, but our org doesn't really have much java expertise and honestly, it's not a fun ecosystem to work in.
    the question is, does this seem like something we could accomplish with benthos? the cache mechanism seems like the most likely place for this data to be stored, but there seems to be a scenario where if a consumer fails and needs to reprocess messages, state could become out of sync.
    Neil Volungis
    @nvolungis
    any guidance you could provide would be greatly appreciated!
    19 replies
    Patrick Robinson
    @patrobinson
    Seeing a weird issue, probably holding it wrong.
    When I use the json processor inside a switch and set a field, it overwrites the json blob
    But when I use it as a normal processor, set appends the field
    Patrick Robinson
    @patrobinson

    Fixed it. Changed

    json:
      operator: set
      value:
        Message: "my message"

    To:

    json:
      operator: set
      path: Message
      value: "my message"
    Patrick Robinson
    @patrobinson
    Maybe that is the default behaviour, but because I was using it inside a process_dag, it appends
    Ema
    @Tahrima

    Hi! My data is coming in this json format from Graphite, and I'm trying to use the awk processor to split the value in target. I'm unable to get this to work, and wanted help on whether I'm accessing the json correctly. Thanks!

    [
       { 
          "target": "folder.structure.name", 
          "datapoints": [
             [825516.0, 1586212620]
          ]
       }
    ]

    Here is the processor code:

     - awk:
          codec: text
          program: |
            {
              split($0, target_split, ".")
              json_set(0.target_names, target_split)
            }
    30 replies
    xiaoshui1994
    @xiaoshui1994
    Hi @Jeffail I need to write data to the same output at an uncertain time in the plugin (requires only one input and output). How do you do it?
    19 replies
    xiaoshui1994
    @xiaoshui1994
    image.png
    Ashley Jeffs
    @Jeffail
    Benthos v3.12.0 is out: https://github.com/Jeffail/benthos/releases/tag/v3.12.0, comes with a much improved CLI and a swanky new language for function interpolations, you can read more about bloblang here: https://www.benthos.dev/blog/2020/04/18/sneak-peek-at-bloblang
    Ashley Jeffs
    @Jeffail
    It now performs much more strict parsing of your configs, so if you find after an upgrade that your config doesn't run and you don't have time yet to fix it, you can skip the stricter checking with benthos --chilled -c yourconfig.yaml
    Grant Edmunds
    @grantallen7538_gitlab

    I'm having trouble getting a json document into ElasticSearch. The error I get is: Error 400 (Bad Request): failed to parse field [parameters.value] of type [text] in document with id '1-1589230096'. Preview of field's value: '{dataType=2, name=Device.Services.STBService.1.Components.VideoDecoderNumberOfEntries, value=1}' [type=mapper_parsing_exception]"}.

    The json document is too big to paste into this blog! https://jsonformatter.org/json-parser shows that the document is valid json.

    17 replies
    Alex Dempsey
    @avdempsey
    For kinesis inputs, I see I can set a limit on the number of records I receive in a request, but it's not obvious how I might limit the rate that I call kinesis. My neighbors on the stream are complaining. I see the rate_limit resource is not available for kinesis.
    14 replies
    dmoeff
    @dmoeff
    I am using Kafka input to connect to Azure Event Hub. Azure Event Hub has Kafka endpoint that allows Kafka clients to connect. I am able to get the messages, but there is a problem with metadata. Azure Event Hub headers are converted into Kafka metadata and because the headers are AMQP encoded, Kafka consumer is unable to properly decode them which results in something like this: '\xa1\aSN12345' where the real output should be: 'SN12345'. Better explanation of the issue is here: Azure/azure-event-hubs-for-kafka#56 . What is the best way in Benthos to deal with this issue?
    7 replies
    David Reno
    @dareno
    noob question: looks like the http_server input doesn't support oauth and the http processor is not designed for processing the incoming request. Am I right that this is not supported? Is this by design (e.g. it wouldn't make sense to support this use case) or is it just not done yet and maybe on the roadmap?
    1 reply
    dmoeff
    @dmoeff
    How can I persist the result of a cache get operation in message metadata?
    8 replies
    Ashley Jeffs
    @Jeffail
    has anyone found any issues or fundamental problems with bloblang? I'm putting a release out this weekend and am considering labelling it as stable
    dmoeff
    @dmoeff
    image.png
    18 replies