Modern concurrency tools including agents, futures, promises, thread pools, supervisors, and more. Inspired by Erlang, Clojure, Scala, Go, Java, JavaScript, and classic concurrency patterns.
Hi all. I have a problem converting a JS function to Ruby using the concurrent-ruby gem.
JS code:
async function test_js(redis) {
  const arrPromises = [];
  arrPromises.push(redis.delAsync("key1"));
  arrPromises.push(redis.delAsync("key2"));
  await Promise.all(arrPromises);
  logger.info('remove success');
}
Ruby code:
promises = []
promises << Concurrent::Promise.new do
  redis.del("key1")
end
promises << Concurrent::Promise.new do
  redis.del("key2")
end
promises.each { |j| j.execute }
Concurrent::Promise.all?(*promises).
  then { |result| puts "remove success" }.
  rescue { |reason| puts reason }.execute
I'm quite confused because Promise.all(arrPromises) starts all the promises in the array at the same time, while promises.each { |j| j.execute } introduces a little delay, since they're kicked off one by one in a loop.
Is there a better way to convert this JS function to Ruby?
Thank you (bow)
Concurrent::Promises.zip(*tasks).value!.flatten.compact.uniq
def tasks
  files.map do |original_file|
    Concurrent::Promises.future(original_file) { |file| process_file(file) }.rescue do |exception|
      Rails.logger.error(exception)
      Bugsnag.notify(exception)
      []
    end
  end
end

def process_file(file)
  ProcessFile.perform(file: file, dir: dir, declaration_uuid: declaration_uuid)
end
The newer Concurrent::Promises API was added to provide better support for exactly the thing you are trying to achieve.
Channel.select doesn't block on the channels, so when it is used inside a loop it consumes a full CPU core. Created an issue here: ruby-concurrency/concurrent-ruby#883.
Hi guys!
I have read the doc about the high-level abstractions but I am still unable to figure out what to use.
Here is my use case:
I have a Sidekiq worker class that sometimes runs for quite a long time.
Jobs and their states are persisted in database records.
class Worker
  include Sidekiq::Worker

  def perform
    prepare_job_record
    do_work_that_might_be_long
    mark_job_completed
  end
end
However, since worker processes are sometimes restarted due to a code release,
the job record's state gets "stuck" on "started".
My idea is to raise an error when Sidekiq is detected to be "quiet", and rescue that error to update the job status.
Can someone provide some advice/direction on the implementation?
I tried Concurrent::TimerTask, but my sample code would not raise the error on the main thread:

timer_task = Concurrent::TimerTask.new(execution_interval: 1, run_now: false) do |task|
  task.execution_interval.to_i.times { STDOUT.puts 'Boom! ' }
  STDOUT.print "\n"
  task.execution_interval += 1
  if task.execution_interval > 2
    STDOUT.puts 'Stopping...'
    raise RuntimeError.new("bye")
  end
end
timer_task.with_observer do |_time, _result, ex|
  STDOUT.puts "in observer"
  next unless ex
  STDOUT.puts ex
  if ex.is_a?(RuntimeError)
    STDOUT.puts "ex is RuntimeError"
    timer_task.shutdown
    raise ex
  end
end.execute
main_thread = ::Thread.current

timer_task = Concurrent::TimerTask.new(execution_interval: 1, run_now: false) do |task|
  $sidekiq_is_quiet.tap do |quiet|
    task.shutdown if quiet
  end
end

timer_task.with_observer do |_time, quiet|
  puts "with_observer >>"
  next unless quiet
  puts "raising"
  main_thread.raise "Sidekiq is quiet!"
end.execute
Just got this and it seems to work.
Is there something like Concurrent::Array I could use in order to more easily perform atomic tasks on multiple collections at once?
irb(main):001:0> require 'concurrent'
=> true
irb(main):002:0> puts "Main thread: #{Thread.current}"
Main thread: #<Thread:0x00005581cd29b428 run>
=> nil
irb(main):004:1* Concurrent::Channel.go do
irb(main):005:1* puts "Goroutine thread: #{Thread.current}"
irb(main):006:0> end
Traceback (most recent call last):
4: from /home/alex/.rubies/ruby-3.0.0/bin/irb:23:in `<main>'
3: from /home/alex/.rubies/ruby-3.0.0/bin/irb:23:in `load'
2: from /home/alex/.rubies/ruby-3.0.0/lib/ruby/gems/3.0.0/gems/irb-1.3.0/exe/irb:11:in `<top (required)>'
1: from (irb):3:in `<main>'
NameError (uninitialized constant Concurrent::Channel)
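As far as I know, Concurrent::Channel lives in the separate concurrent-ruby-edge gem, so it isn't defined after a plain require 'concurrent'. Something along these lines should make it available (Gemfile sketch):

```ruby
# Gemfile
gem 'concurrent-ruby-edge'
```

and then require 'concurrent-edge' instead of (or in addition to) require 'concurrent'.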
I'd hate to bother you, but I recently used 'concurrent-ruby' inside Glimmer and blogged about an example of taking advantage of it over here, leveraging thread pools: https://andymaleh.blogspot.com/2021/02/glimmer-dsl-for-swt-mandelbrot-fractal.html
Java 8 and later offer something even superior to thread pools called parallel streams (which build on thread pools behind the scenes as a higher abstraction). They let my original pre-parallelization loop implementation (not shown in the link) work by changing a single keyword (in Java, you switch stream() to parallelStream()) instead of instantiating a thread pool, distributing work, and waiting for it to finish.
What is the closest equivalent to Java parallel streams in "concurrent-ruby"?
Thanks and Godspeed.
@AndyObtiva that's cool! I think the closest equivalent is a Future: https://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/Future
A future will implicitly run on a global thread pool, but it's also possible to pass it an explicit thread pool.
I have some code that is leaking forked processes and I think maybe concurrent-ruby can help. I'm looking for advice on the best way to do this.
So I have some crappy old database system whose only communication channel is a pipe, so I'm using popen3 to get an input stream for sending commands to its interactive command prompt. It's currently something like:
require 'open3'

class ForkedConnectionThingy
  def initialize(name)
    @name = name
  end

  def self.open(name)
    conn = ForkedConnectionThingy.new(name)
    conn.establish_connection
    yield conn
  ensure
    conn&.close
  end

  def send_command(command)
    @input.puts command
    @output.readline
  end

  def establish_connection
    # some stuff to switch uid/gid maybe
    # popen3 forks the child itself, so no extra fork is needed
    @input, @output, @error, @wait_thr = Open3.popen3("dbconsole_whatsit #{@name}")
  end

  def close
    @input.puts "quit"
    @input.close
    @output.close
    @error.close
    Process.kill('TERM', @wait_thr.pid)
  end
end
I want to make something that is multithreaded, keeps a pool of these things, and maybe has an at_exit
hook to close them all when Ruby exits. That way I don't create so many of them and I can make sure they all get closed.
Any thoughts on what sort of stuff to use for this? I was looking at Concurrent::Map
indexed by the @name
but I'm not sure how to decide when to make a new one or not. Also just wondering if I'm thinking of this all wrong and there's some more elegant solution to this problem.
I have a Concurrent::Promises::Future with several
.on_rejection!
callbacks registered on my future object. By default these callbacks get called in last-in, first-out order (like a stack).