Charles Oliver Nutter
@headius
Subprocess control would use stdio or a unicorn socket or a fifo or something...it's easier to do this sort of thing with JRuby
Not unicorn, unix
Chalupa Petr
@pitr-ch
@janko-m Hm, I was afraid the Throttling documentation would need improvement. I'll work on it. In the meantime you could try:
require 'concurrent'
require 'logger'

NUMBER_OF_CHUNKS  = 10
CHUNK_SIZE        = 10
CONCURRENCY_LEVEL = 4
NEXT_CHUNK        = Concurrent::AtomicFixnum.new 0
RESULTS           = Concurrent::Array.new
DONE              = Concurrent::CountDownLatch.new CONCURRENCY_LEVEL

Concurrent.use_simple_logger Logger::DEBUG

WORK = -> do
  chunk_index = NEXT_CHUNK.increment
  if chunk_index > NUMBER_OF_CHUNKS
    # no chunks left, this worker is done
    DONE.count_down
  else
    # chunk = File.open(path)
    # chunk.seek(chunk_index * CHUNK_SIZE)
    # do work
    RESULTS.push :result

    # run next available chunk
    Concurrent.global_io_executor.post(&WORK)
  end
end

# start CONCURRENCY_LEVEL workers; each re-posts itself until the chunks run out
CONCURRENCY_LEVEL.times do
  Concurrent.global_io_executor.post(&WORK)
end

DONE.wait
puts RESULTS
Janko Marohnić
@janko
Thanks a lot, that helps :+1:
marzdgzmn
@marzdgzmn
Will it be possible to do a
at_exit do
  puts 'sync at_exit'
  if $ERROR_INFO
    puts 'yeaah'
    error = {
      timestamp: Time.now,
      message:   $ERROR_INFO.message,
      backtrace: $ERROR_INFO.backtrace,
    }
    Rise::MirrorManager::LOG.error YAML.dump(error)
  end
end
on each Thread to log any errors?
marzdgzmn
@marzdgzmn
thread_pool = Concurrent::FixedThreadPool.new(10, max_queue: 50)
until job_list.empty?
  job = job_list.shift
  Concurrent::Promises.future(executor: thread_pool) { MyJob.run(job) }.
    rescue { |e| puts e }.result
end
The raised RuntimeError doesn't seem to be rescued. How does rescue work, though?
marzdgzmn
@marzdgzmn
Made it work, thanks anyway :)
Chalupa Petr
@pitr-ch
@marzdgzmn you can use Concurrent.use_simple_logger Logger::DEBUG to get all errors happening in pools logged.
@marzdgzmn what was your problem with rescue? Your code looks fine.
marzdgzmn
@marzdgzmn
@pitr-ch thanks. got it all resolved now.
Although my line Concurrent::Promises.future(executor: thread_pool) { MyJob.run(job) }.rescue { |e| puts e }.result seems to run the jobs one after the other rather than asynchronously
marzdgzmn
@marzdgzmn
Okay, I don't understand why, but it works if I do it like this:
tasks = jobs.map { |job| Concurrent::Promises.future(job) { |job| MyJob.run(job) }.rescue { |e| puts e } }
Concurrent::Promises.zip(*tasks).value!
How do I pass in my custom thread_pool, though? I defined thread_pool = Concurrent::FixedThreadPool.new(10, max_queue: 50)
marzdgzmn
@marzdgzmn
I'm trying to recreate the behavior of mirrors.map { |mirror| Concurrent::Future.execute(executor: thread_pool) { Sync.run(mirror) } }
with mirrors.map { |mirror| Concurrent::Promises.future(mirror) { |mirror| Sync.run(mirror) }.rescue { |e| send_error_notification(mirror_name: mirror.name, error: e) }.result }, but it doesn't seem to process tasks in parallel.
Chalupa Petr
@pitr-ch
@marzdgzmn The #result method called at the end is blocking; it waits until a result is available, so it forces the futures to run sequentially even though they run in the background on a thread pool. Using zip is the right approach. All the methods have *_on alternatives which let you pass in the default executor (which is then used by any futures created off the first future that got the custom executor).
The last example has the same problem: a blocking result call inside the loop (the map). You can use zip, or loop twice: futures = data.map { |v| future { process v } }; results = futures.map(&:value!)
I hope it helps.
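Putting that together, a minimal sketch of the non-blocking pattern, reusing the mirrors, Sync.run, and send_error_notification names from the snippets above:
require 'concurrent'

pool = Concurrent::FixedThreadPool.new(10)

# future_on starts each task on the custom pool right away; nothing blocks here
tasks = mirrors.map do |mirror|
  Concurrent::Promises.future_on(pool, mirror) { |m| Sync.run(m) }.
    rescue { |e| send_error_notification(mirror_name: mirror.name, error: e) }
end

# block once, after all the futures are already running
results = Concurrent::Promises.zip(*tasks).value!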
Sebastjan Hribar
@sebastjan-hribar

Hi all, I'm completely new to ruby concurrency, and I'm facing the following problem, which I think might be solvable by using concurrent-ruby:

I have a Hanami based web application which takes users' requests for interfacing with a certain third-party software (TPS) on the server. The TPS takes the request and runs in the foreground (background jobs are not possible). When the action in the TPS is completed, the user gets either data from the TPS or just a success/fail notification if the request was not about data retrieval but only about completing some action.

The problem: currently, each subsequent request is queued and seems to wait for the TPS to complete its predecessor. However, the TPS itself supports multithreading, and several sessions can run simultaneously outside our web application.

I've been looking at the documentation, but can't really decide on the best strategy for this problem. Can someone point me in the right direction? From what I've read so far I think the FixedThreadPool might be the way to go, but I'm not sure.
Thank you.

Chris Seaton
@chrisseaton
@sebastjan-hribar could you write some pseudo-code to help illustrate the problem?
Sebastjan Hribar
@sebastjan-hribar

@chrisseaton Please see the simplified example below, with comments showing what happens in each method:

# Method for the script to run in the TPS
def get_data(user_params)
  # check for the TPS process and start it if not running
  # open a TPS session
  # perform the task in the TPS
  # close the TPS session
  # return data to the user or just a success/failure status
end

# Hanami action
module ControlPanel::Controllers::Commands
  class Update
    include ControlPanel::Action

    before :check_for_signed_in_user

    def call(params)
      # validate and setup params
      get_data(params)

      # process data
      # redirect to target url

    end
  end
end

The problem is that every HTTP request that triggers the above get_data waits for any predecessor request that might still be running. It's as if it locks the TPS process.
This also happens in my desktop app built with Shoes.

Chris Seaton
@chrisseaton
Is this Ruby, or does the TPS itself have some kind of lock that means only one instance can run at a time?
Have you already tried to make this multi-threaded?
If the TPS process does not have a lock, and you run each get_data in a separate thread, then that should be enough to allow them to run in parallel.
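In the simplest form that could look like the sketch below; pending_requests is a hypothetical stand-in for whatever drives the calls:
# one thread per call to get_data; the TPS sessions can then run in parallel
threads = pending_requests.map do |params|
  Thread.new { get_data(params) }
end
threads.each(&:join) # wait for all sessions to finish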
Sebastjan Hribar
@sebastjan-hribar
The TPS is basically VBS, but I'm using win32ole to get around its syntax :)
The TPS doesn't have a lock, so I wanted to try what you say: each get_data should run in a thread. However, this is my first attempt at concurrency in general, and to be honest I'd need a starting point for how to handle this. From what I gather I shouldn't try to do it myself but use a gem, since that is a lot safer. And this gets me back to my initial question of whether FixedThreadPool is the way to go.
Chris Seaton
@chrisseaton
Oh right, I thought you were wrestling with a more complex problem. Yes, create a fixed-size thread pool as large as you want - probably the number of cores you have - and then submit jobs to it.
I don't know how this interacts with Hanami (or even what Hanami is), so if it isn't calling call in multiple threads then this won't make any difference.
Sebastjan Hribar
@sebastjan-hribar

@chrisseaton Hanami is a ruby web framework.

So what I need to do is wrap my get_data like so:

pool = Concurrent::FixedThreadPool.new(5) # 5 threads
pool.post do
  get_data(params)
end

However, pool should probably be initialized at the app level.
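Something like the sketch below, with the pool created once per process (reusing the ControlPanel namespace from my action above):
# e.g. in an app-level initializer
module ControlPanel
  WORKER_POOL = Concurrent::FixedThreadPool.new(5)
end

# inside the action
def call(params)
  # validate and setup params
  ControlPanel::WORKER_POOL.post { get_data(params) }
end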

Chris Seaton
@chrisseaton
Must be, yes
But if Hanami calls get_data for each request in a single thread this isn’t going to help you.
Sebastjan Hribar
@sebastjan-hribar
Thank you, I'll check with the Hanami devs as well and let you know for future reference; hopefully it can be done.
Debajit Adhikary
@debajit
I'm looking for a data structure / entity that will let me share some state across multiple threads. It looks like I could use Concurrent::Map or an Atom for this. Both reads and writes will be frequent. What is the best Concurrent Ruby solution I could use for this problem?
Christopher A. Williamson
@cawilliamson
Bit of a long shot here folks but I'm struggling to get a project called The Foreman working which relies on your RubyGem for a particular component. In short - the component relies on dynflow which relies on concurrent and concurrent-edge. The problem I'm having is - when the component initializes it says "/usr/lib/ruby/vendor_ruby/dynflow/director.rb:16:in `block in <class:Director>': uninitialized constant Concurrent::Edge::Future (NameError)" - just wondering if it's something obvious?
Chalupa Petr
@pitr-ch
@debajit What does the data look like? There are also Concurrent::Array and Concurrent::Hash.
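If it is keyed data with frequent reads and writes, a minimal sketch with Concurrent::Map (the :hits key is just an illustration):
require 'concurrent'

SHARED = Concurrent::Map.new

# atomic read-modify-write of a single key
SHARED.compute(:hits) { |old| (old || 0) + 1 }

puts SHARED[:hits] # => 1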
@cawilliamson Maybe @iNecas will know? Ivan works on the Foreman project.
Trey Matteson
@hsm3
Hello - we are using the ThreadPoolExecutor to run some slow DB tasks concurrently. I'm wondering if there is any good way to 'wrap' all execution by the futures running in the pool with some common code. In our case we have some thread-local state that adds some context to our log messages. I could imagine a subclass of the Executor where I get to override a "run" method, or some middleware-ish hook that could do some setup and then pass along control.
Or maybe the best thing is to create our own variant of Concurrent::Promises.future that wraps the block - though that may involve chasing down more entry points that all need to be treated that way
Thanks for any suggestions
Mikael Henriksson
@mhenrixon
@hsm3 it is easier to suggest something if you show some code.
Димыч
@deemytch
Hi, guys.
Just a simple stupid question: why doesn't async work for me at all?
I just copied the example from the docs.
require 'concurrent'

class Report
  include Concurrent::Async

  def initialize(req)
    super()
    @request = req
  end

  def perform
    puts @request
    1
  end
end

x = Report.new({ "query" => '123' })
ivar = x.async.perform # nil
puts ivar.class # NilClass
Pls, help.
Димыч
@deemytch
OK. I've renamed my method Report#perform to Report#arbeiten and it works now.
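For reference, Concurrent::Async proxy calls are documented to return a Concurrent::IVar rather than nil, so with the renamed method:
ivar = x.async.arbeiten # => a Concurrent::IVar
ivar.value              # blocks until the method has run, => 1
puts ivar.class         # Concurrent::IVar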
Charles Oliver Nutter
@headius
The issue here is that some thread pool in concurrent-ruby is not setting its threads to be daemon threads (on JRuby/JVM), which prevents the JVM from shutting down cleanly
Charles Oliver Nutter
@headius
I'm not sure what executor would be doing this but I'm happy to help investigate if someone can point me to the places where executors are set up
I'll open an issue
Charles Oliver Nutter
@headius
#817 has what I've been able to find
Chalupa Petr
@pitr-ch
Thanks @headius I'll have a look.
Chalupa Petr
@pitr-ch
@hsm3 my thinking was that people would create a delegating executor which wraps the tasks. The delegating executor just needs to delegate the ExecutorService methods. Then pass the executor in: Promises.future_on(DelegatingExecutor.new(:io, &wrapper), *args) { |*args| do_stuff }.then { |v| is_also_wrapped }
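A minimal sketch of that idea; DelegatingExecutor is not part of concurrent-ruby, the wrapper and log-context names are hypothetical, and this version takes the executor object directly instead of the :io symbol:
require 'concurrent'

# Wraps every task posted to an underlying executor with common setup/teardown.
class DelegatingExecutor
  include Concurrent::ExecutorService # so Promises accepts it as an executor

  def initialize(executor, &wrapper)
    @executor = executor
    @wrapper  = wrapper
  end

  def post(*args, &task)
    @executor.post(*args) { |*a| @wrapper.call { task.call(*a) } }
  end

  # delegate any remaining ExecutorService methods to @executor as needed
end

# hypothetical wrapper that sets thread-local log context around each task
wrapper = proc do |&task|
  Thread.current[:log_context] = 'db-task'
  begin
    task.call
  ensure
    Thread.current[:log_context] = nil
  end
end

logged = DelegatingExecutor.new(Concurrent.global_io_executor, &wrapper)
f = Concurrent::Promises.future_on(logged) { 21 }.then { |v| v * 2 } # both steps wrapped
puts f.value! # => 42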
Charles Oliver Nutter
@headius
@pitr-ch thank you!