    Dominator-3000
    @Dominator-3000

    Hello.

    We are planning to try this wonder-tool. Tell me, please: is Benthos already used in real production cases? What load (events per second) has been applied to a single Benthos instance? And what about clustering? In our case we need to process ~50k EPS.

    Ashley Jeffs
    @Jeffail
    hey @Dominator-3000, I know of a lot of high volume work being done at Meltwater, there's a write up here: https://underthehood.meltwater.com/blog/2019/08/26/enriching-450m-docs-daily-with-a-boring-stream-processor/
    in terms of clustering it depends on what you're doing with it. If you're using a queue system like Kafka then clustering is as simple as using consumer groups, most behavior within benthos is stateless so they don't have anything to collide over
    so you can just run as many as you need and kill them as often as you like
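    (A sketch of that consumer-group setup, for illustration - the addresses and topic here are placeholders: every Benthos instance runs the same config, and Kafka spreads the topic's partitions across however many instances are alive.)

```yaml
input:
  kafka_balanced:
    addresses:
      - kafka-broker-1:9092     # placeholder address
    topics:
      - events                  # placeholder topic
    consumer_group: benthos_group   # identical on every instance
```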
    Dominator-3000
    @Dominator-3000
    Thanks👍🏼
    Lichtsinnig
    @Lichtsinnig

    Hi, I cannot start the stream.

    My configuration:

    input:
      type: http_server
    buffer:
      type: memory
    pipeline:
      processors:
        - switch:
          - condition:
              check_field:
                path: id
                condition:
                  text:
                    operator: enum
                    arg:
                      - "1"
                      - "2"
            processors:
            - json:
                operator: set
                path: condition
                value: "true"
    output:
      type: http_server

    Start stream command:

    benthos --streams --streams-dir ./stream

    or

    benthos --streams -c ./t.yaml

    When I execute the request to list the streams, I get no response :(

    MB:~ user$ curl http://localhost:4195/streams
    MB:~ user$
    I also used the configurations from the examples at https://docs.benthos.dev/streams/using_config_files/, but I don’t get a response about stream creation from the REST API.
    What am I doing wrong?
    Lichtsinnig
    @Lichtsinnig
    It looks fine, but I don’t get a response from the REST API
    MB:~ user$ benthos --streams --streams-dir ./stream
    {"@timestamp":"2020-02-13T11:26:49+03:00","@service":"benthos","level":"INFO","component":"benthos","message":"Launching benthos in streams mode, use CTRL+C to close."}
    {"@timestamp":"2020-02-13T11:26:49+03:00","@service":"benthos","level":"INFO","component":"benthos","message":"Listening for HTTP requests at: http://0.0.0.0:4195"}
    Ashley Jeffs
    @Jeffail
    hey @Lichtsinnig , what are the contents of ./stream?
    Lichtsinnig
    @Lichtsinnig
    Sorry, my mistake. I passed ./stream, but the folder is named ./streams
    MB:~ user$ curl http://localhost:4195/streams
    {"t":{"active":true,"uptime":35.253584282,"uptime_str":"35.253584495s"}}
    Lichtsinnig
    @Lichtsinnig

    @Jeffail, I’ve probably worn you out with my questions, but I want to understand the intricacies of your CEP.

    MB:~ user$ ls ./streams/
    t.yaml

    MB:~ user$ cat ./streams/t.yaml

    input:
      type: http_server
    buffer:
      type: memory
    pipeline:
      processors:
        - switch:
          - condition:
              json:
                path: id
                operator: equals
                arg: 1
            processors:
            - json:
                operator: set
                path: condition
                value: "true"
    output:
      type: http_server

    Start stream:

    MB:~ user$ benthos --streams --streams-dir ./streams
    {"@timestamp":"2020-02-13T12:43:11+03:00","@service":"benthos","level":"INFO","component":"benthos","message":"Launching benthos in streams mode, use CTRL+C to close."}
    {"@timestamp":"2020-02-13T12:43:11+03:00","@service":"benthos","level":"INFO","component":"benthos","message":"Created 1 streams from directory: ./streams"}
    {"@timestamp":"2020-02-13T12:43:11+03:00","@service":"benthos","level":"INFO","component":"benthos","message":"Listening for HTTP requests at: http://0.0.0.0:4195"}

    Send message:

    MB:~ user$ curl http://localhost:4195/streams/t -d '{"id":1, "key2":"value2"}' -H "Content-Type: application/json" -X POST
    Stream already exists

    And I see these log messages:

    {"@timestamp":"2020-02-13T12:43:17+03:00","@service":"benthos","level":"INFO","component":"benthos","message":"Stream 't' config: line 1: path '': Key 'id' found but is ignored"}
    {"@timestamp":"2020-02-13T12:43:17+03:00","@service":"benthos","level":"INFO","component":"benthos","message":"Stream 't' config: line 1: path '': Key 'key2' found but is ignored"}
    Ashley Jeffs
    @Jeffail
    the /streams endpoint is for creating new streams, if you want to send data to your t stream you need to post to /t/post
    I think I need to improve the logging here, Benthos should be telling you when the streams_dir doesn't exist, and the http_server input should be logging that it's listening for requests on the endpoint
    Lichtsinnig
    @Lichtsinnig
    It works, thanks
    curl http://localhost:4195/t/post -d '{"id":1, "key2":"value2"}' -H "Content-Type: application/json" -X POST
    MB:~ user$ benthos --streams --streams-dir ./streams
    {"@timestamp":"2020-02-13T13:39:50+03:00","@service":"benthos","level":"INFO","component":"benthos","message":"Launching benthos in streams mode, use CTRL+C to close."}
    {"@timestamp":"2020-02-13T13:39:50+03:00","@service":"benthos","level":"INFO","component":"benthos","message":"Created 1 streams from directory: ./streams"}
    {"@timestamp":"2020-02-13T13:39:50+03:00","@service":"benthos","level":"INFO","component":"benthos","message":"Listening for HTTP requests at: http://0.0.0.0:4195"}
    {"condition":"true","id":1,"key2":"value2"}
    Ashley Jeffs
    @Jeffail
    I've opened Jeffail/benthos#375 for improving logs around streams mode
    Lichtsinnig
    @Lichtsinnig
    Jeffail, please include my example in the documentation.
    The documentation has no information about sending data to the http listener: if you want to send data to your t stream you need to post to /t/post
    Lichtsinnig
    @Lichtsinnig
    Ashley, Hi!
    Lichtsinnig
    @Lichtsinnig

    Hi, I’m trying to understand the data join functionality,
    based on the article https://docs.benthos.dev/cookbooks/joining-streams/.
    The article has this config:

    input:
      kafka_balanced:
        addresses:
        - TODO
        topics:
        - articles
        consumer_group: benthos_articles_group
    
    pipeline:
      processors:
      # Reduce document into only fields we wish to cache.
      - jmespath:
          query: '{"article": article}'
    
      # Store reduced articles into our cache.
      - cache:
          operator: set
          cache: hydration_cache
          key: "${!json_field:article.id}"
          value: "${!content}"
    
    # Drop all articles after they are cached.
    output:
      type: drop
    
    resources:
      caches:
        hydration_cache:
          redis:
            expiration: 168h
            retries: 3
            retry_period: 500ms
            url: TODO

    I'm interested in a block with a cache

      - cache:
          operator: set
          cache: hydration_cache
          key: "${!json_field:article.id}"
          value: "${!content}"

    The article describes the data that is fed to the incoming stream.

    {
      "type": "article",
      "article": {
        "id": "123foo",
        "title": "Dope article",
        "content": "this is a totally dope article"
      },
      "user": {
        "id": "user1"
      }
    }

    key: "${!json_field:article.id}" - is this the object {"json_field": "123foo"}?

    What is the "!" symbol used for?

    How do we get content in the value variable?

    value: "${!content}"
    Ashley Jeffs
    @Jeffail
    hey @Lichtsinnig, the "${!foo}" syntax is an interpolation function, they're listed in this doc: https://www.benthos.dev/docs/configuration/interpolation
    btw it looks like you're still using https://docs.benthos.dev, there's a newer and better structured docs site at https://www.benthos.dev
    I'm leaving docs.benthos.dev open for a migration period but eventually I plan to redirect it to the new site
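    (For the example article document above, the cache block's interpolations resolve roughly like this - a sketch; the "!" marks a function interpolation, as opposed to a plain ${FOO} environment variable:)

```yaml
- cache:
    operator: set
    cache: hydration_cache
    key: "${!json_field:article.id}"   # reads a field from the JSON payload, e.g. "123foo"
    value: "${!content}"               # the entire raw message body
```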
    Lichtsinnig
    @Lichtsinnig
    Thanks, I was indeed using https://docs.benthos.dev
    Lichtsinnig
    @Lichtsinnig

    @Jeffail, hi!

    Help me please.
    I created a configuration and want to do data enrichment,
    but I keep getting a "cache not found" error.
    What am I doing wrong?

    Config

    input:
      type: http_server
    buffer:
      type: memory
    pipeline:
      processors:
      - process_map:
          premap:
            k1: id
          processors:
            - cache:
                operator: get
                cache: Mcache
                key: "${!json_field:k1}"
          postmap:
    output:
      type: stdout
      stdout:
        delimiter: ""
    resources:
      caches:
        Mcache:
          type: memory
          memory:
            ttl: 60
            init_values:
              1: "t1"
              2: "t2"

    Error

    MB:~ user$ benthos --streams --streams-dir ./streams
    {"@timestamp":"2020-02-17T11:16:03+03:00","@service":"benthos","level":"ERROR","component":"benthos","message":"Failed to create stream (t): failed to create processor 'process_map': cache not found"}
    Ashley Jeffs
    @Jeffail
    looks like broken YAML, the line postmap doesn't have any fields
    oh and also a stream config can't specify its own resources
    they need to be configured at the root level as they're shared by all streams
    create a root config called something like root.yaml with the contents:
    resources:
      caches:
        Mcache:
          type: memory
          memory:
            ttl: 60
            init_values:
              1: "t1"
              2: "t2"
    and then run benthos -c ./root.yaml --streams --streams-dir ./streams
    Lichtsinnig
    @Lichtsinnig
    Yes, it works!
    Thanks!
    @Jeffail, which sections should be moved into the root config, besides resources?
    Ashley Jeffs
    @Jeffail
    if you have settings for http like debug_endpoints those should be in the root as well
    oh and metrics and logger, a stream config should only contain input, buffer, pipeline and output
    I should probably make that clearer in the docs
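    (Sketching the split with illustrative values: root-level concerns such as http, logger, metrics and resources go in root.yaml, while each file under ./streams contains only input, buffer, pipeline and output.)

```yaml
# root.yaml - shared across all streams (values are illustrative)
http:
  address: 0.0.0.0:4195
logger:
  level: INFO
metrics:
  type: http_server
resources:
  caches:
    Mcache:
      type: memory
```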
    Lichtsinnig
    @Lichtsinnig
    Yes, that would be cool!
    I couldn’t find this information anywhere
    Lichtsinnig
    @Lichtsinnig

    @Jeffail, Hi.
    I want to get a value of the form key\param.
    For the conversion I used the JMESPath expression [key, param] | join('\\', @), which returned a valid result.
    Can I use this expression inside the process_map construct?

    When I try to execute, I get an error.

    Input

    '{"sid":"003", "key2":"value2", "key":"key000","param":"testers"}'
    - process_map:
                  premap:
                    k1: sid
                    k2: key2
                    k3: 
                      - jmespath:
                          query: "[key,param] | join('\\', @)"

    Error:

    MB:~ user$ benthos -c ./root.yaml --streams --streams-dir ./streams
    {"@timestamp":"2020-02-17T16:48:21+03:00","@service":"benthos","level":"ERROR","component":"benthos","message":"Failed to load stream configs: line 7: line 21: yaml: unmarshal errors:\n  line 26: cannot unmarshal !!seq into string"}

    Full Config

    input:
      type: http_server
    buffer:
      type: memory
    pipeline:
      processors:
        - switch:
          - condition:
              and:
                - json:
                    path: key
                    operator: equals
                    arg: "key000"
                - check_field:
                    path: param
                    condition:
                      text:
                        operator: regexp_partial
                        arg: test*
            processors:
              - process_map:
                  premap:
                    k1: sid
                    k2: key2
                    k3: 
                      - jmespath:
                          query: "[key,param] | join('\\', @)"
                  postmap:
              - jmespath:
                  query: '{"article": @}'
              - process_map:
                  premap:
                    id: article.k1
                    art: article
                  processors:
                    - cache:
                        operator: set
                        cache: Mcache
                        key: "${!json_field:id}"
                        value: "${!content}"        
    output:
      type: stdout
      stdout:
        delimiter: ""
    Ashley Jeffs
    @Jeffail
    not inside the premap args, but you can replace that process_map for just a jmespath expression mapping k1,k2 and k3
    Lichtsinnig
    @Lichtsinnig
    Do I understand correctly that I first need to prepare the values with the jmespath processor (or others), and then send the result to the process_map processor?
    Ashley Jeffs
    @Jeffail
    yeah, but it doesn't look like there are any child processors within that particular process_map, so if you're only using it to map values across you can just use jmespath for the whole lot
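    (Sketched out, that suggestion collapses the process_map into a single jmespath mapping; an underscore delimiter is used here purely to sidestep backslash escaping:)

```yaml
pipeline:
  processors:
    - jmespath:
        query: "{k1: sid, k2: key2, k3: join('_', [key, param])}"
```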
    Lichtsinnig
    @Lichtsinnig
    Thanks, I will try
    Lichtsinnig
    @Lichtsinnig

    @Jeffail, is it possible to easily create custom fields in JSON?
    For example:
    custom = key1 \ key.

    In PowerShell, for example, building strings in a custom format is very simple, using templates like:

    "{0} bla bla bla {1}" -f "param1", "param2"
    or
    custom = (key1, key2) -join

    Result:
    param1 bla bla bla param2

    Ashley Jeffs
    @Jeffail
    @Lichtsinnig, hey, not sure I follow, can you elaborate on the use case here?
    Lichtsinnig
    @Lichtsinnig

    @Jeffail, hi!
    Your product really lacks field processors, in particular:
    1) there is no simple way to join fields;
    2) no dynamic field creation.

    I will give examples to make it clear:

    Json:

    {"name": "Olympia", "state": "WA", "a": "dd"}

    jmespath expression:

    {www: ([a, state] | join ('\\', @)), p: @}

    Result:

    {
      "www": "dd \\\\ WA",
      "p": {
        "name": "Olympia",
        "state": "WA",
        "a": "dd"
      }
    }

    When we have a JSON document with more than 1000 lines and the number of formatted “www” fields becomes large, the jmespath processor is no longer a simple solution.

    It would be cool if you added 2 processors

    1) join_fields processor

    Json:

    {"name": "Olympia", "state": "WA", "a": "dd"}

    join_fields processor

      processors:
        - join_fields:
            fieldname: fff
            delimiter: '*'
            keys:
              - text
              - "${!json_field:name}"

    Result:

    {
      "fff": "text*Olympia",
      "name": "Olympia",
      "state": "WA",
      "a": "dd"
    }

    2) format processor

    Json:

    {"name": "Olympia", "state": "WA", "a": "comes", "b": "year"}

    format_fields processor

      processors:
        - format_fields:
            fieldname: fff
            expression: 'Christmas {0} but {1} a {2}'
            keys:
              - "${!json_field:a}"
              - once
              - "${!json_field:b}"

    Result:

    {
      "fff": "Christmas comes but once a year",
      "name": "Olympia",
      "state": "WA",
      "a": "comes",
      "b": "year"
    }
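    (A possible workaround with the existing json processor, assuming - as the v3 docs describe - that its value field resolves interpolation functions; a sketch, not a confirmed recipe:)

```yaml
pipeline:
  processors:
    - json:
        operator: set
        path: fff
        value: "text*${!json_field:name}"   # interpolation assumed to resolve per the docs
```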
    Lichtsinnig
    @Lichtsinnig

    @Jeffail, hi. How can I escape the " ' " character?

    input:
      type: http_server
    buffer:
      type: memory
    pipeline:
      processors:
          - jmespath:
              query: "{www:([to_string(sid),to_string(key2)]| join ('\\',@)), event:@}"
    output:
      type: stdout
      stdout:
        delimiter: ""

    Error:

    MB:~ user$ benthos --streams --streams-dir ./streams
    {"@timestamp":"2020-02-18T11:09:41+03:00","@service":"benthos","level":"ERROR","component":"benthos","message":"Failed to create stream (t): failed to create processor 'jmespath': failed to compile JMESPath query: SyntaxError: Unclosed delimiter: '"}
    input:
      type: http_server
    buffer:
      type: memory
    pipeline:
      processors:
          - jmespath:
              query: "{www:([to_string(sid),to_string(key2)]| join (''\\'',@)), event:@}"
    output:
      type: stdout
      stdout:
        delimiter: ""

    Error:

    MB:~ user$ benthos --streams --streams-dir ./streams
    {"@timestamp":"2020-02-18T11:12:30+03:00","@service":"benthos","level":"ERROR","component":"benthos","message":"Failed to create stream (t): failed to create processor 'jmespath': failed to compile JMESPath query: SyntaxError: Unknown char: '\\\\'"}
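    (The issue is double escaping: YAML's double-quoted style consumes one layer of backslashes, and JMESPath raw string literals consume another, so a single literal backslash delimiter needs four backslashes in the config - a sketch:)

```yaml
pipeline:
  processors:
    - jmespath:
        # YAML "\\\\" -> \\ , and the JMESPath raw string '\\' -> one literal \
        query: "{www: join('\\\\', [to_string(sid), to_string(key2)]), event: @}"
```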
    Dominator-3000
    @Dominator-3000
    @Jeffail 450m docs per day is really nice. In that case, is the data stream processed on a single node?
    Do you have any technical recommendations (CPU/RAM/SSD) for Benthos nodes?
    Lichtsinnig
    @Lichtsinnig

    @Jeffail, hi!
    Can you add the join_fields and format_fields functionality I described above to Benthos?
    Lichtsinnig
    @Lichtsinnig
    @Jeffail The awk processor documentation is rather sparse. Can you give some examples?
    Lichtsinnig
    @Lichtsinnig
    @Jeffail It is not clear how to use the awk functions; there are no syntax examples.
    Ashley Jeffs
    @Jeffail
    sure, I've opened Jeffail/benthos#379