Shawn Anderson
@shawn42
It's blocking my use of concurrent-ruby in production now (which makes me sad, because concurrent-ruby is awesome!)
Aditya C S
@adityacs
Noticed a strange behaviour: Channel.select doesn't block on channels, and when used within a loop it consumes a full CPU (single core). Created an issue here: ruby-concurrency/concurrent-ruby#883.
PikachuEXE
@PikachuEXE

Hi guys!
I have read the docs about the high-level abstractions but I still can't figure out what to use.
Here is my use case:

I have a Sidekiq worker class that sometimes runs for quite a long time.
Jobs and their states are persisted in database records.

class Worker
  include Sidekiq::Worker

  def perform
    prepare_job_record

    do_work_that_might_be_long

    mark_job_completed
  end
end

However, since worker processes are sometimes restarted on code release,
the job record state gets "stuck" on "started".
My idea is to raise an error when Sidekiq is detected to be "quiet", and rescue that error to update the job status.
Can someone provide some advice/direction on the implementation?

I tried Concurrent::TimerTask, but my sample code would not raise the error on the main thread:
timer_task = Concurrent::TimerTask.new(execution_interval: 1, run_now: false) do |task|
  task.execution_interval.to_i.times{ STDOUT.puts 'Boom! ' }
  STDOUT.print "\n"
  task.execution_interval += 1
  if task.execution_interval > 2
    STDOUT.puts 'Stopping...'
    raise RuntimeError.new("bye")
  end
end
timer_task.with_observer do |_time, _result, ex|
  STDOUT.puts "in observer"
  next unless ex
  STDOUT.puts ex
  if ex.is_a?(RuntimeError)
    STDOUT.puts "ex is Interrupt"
    timer_task.shutdown
    raise ex
  end
end.execute
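A stdlib-only sketch of one common pattern for the stuck-state problem described above (JobRecord and its helper methods are hypothetical names, not Sidekiq or concurrent-ruby APIs): update the record in a rescue clause, so that an error raised into the worker thread, e.g. on shutdown, still flips the persisted status.

```ruby
# Hypothetical JobRecord; the pattern itself is just rescue + re-raise so
# that an error delivered to the worker thread still updates the job state.
class JobRecord
  attr_reader :state

  def start!
    @state = "started"
  end

  def complete!
    @state = "completed"
  end

  def interrupt!
    @state = "interrupted"
  end
end

def perform(record)
  record.start!
  yield                  # the possibly-long work
  record.complete!
rescue Exception         # also catches errors injected via Thread#raise
  record.interrupt!
  raise
end

record = JobRecord.new
begin
  perform(record) { raise "shutting down" }
rescue RuntimeError
  # swallowed for the example
end
```

After the run, `record.state` is "interrupted" rather than stuck on "started".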
PikachuEXE
@PikachuEXE
main_thread = ::Thread.current
timer_task = Concurrent::TimerTask.new(execution_interval: 1, run_now: false) do |task|
  $sidekiq_is_quiet.tap do |quiet|
    task.shutdown if quiet
  end
end
timer_task.with_observer do |_time, quiet|
  puts "with_observer >>"
  next unless quiet

  puts "raising"
  main_thread.raise "Sidekiq is quiet!"
end.execute
Just got this and it seems to work
Slamet Kristanto (Kris)
@drselump14
ls
Ameisen
@ameisen
Hi there!
I'm wondering if it's possible to pass an additional lock/mutex to something such as Concurrent::Array, in order to more easily perform atomic operations on multiple collections at once,
such as where I need to remove something from one and add it to another, atomically
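Concurrent::Array only synchronizes individual method calls, so one approach for cross-collection atomicity is to guard both collections with your own Mutex. A minimal stdlib sketch (not a concurrent-ruby API):

```ruby
# One external lock guarding two collections: move an item from one array
# to the other atomically with respect to anyone else holding the same lock.
lock    = Mutex.new
pending = [1, 2, 3]
done    = []

lock.synchronize do
  item = pending.shift
  done << item
end
```

Every thread touching `pending` or `done` must go through the same `lock` for this to hold.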
Ameisen
@ameisen
Also, is there a proper means by which to promote a ReadWriteLock from read-lock state to write-lock (and demote)?
Alex Maslakov
@GildedHonour
Why is it not found?
irb(main):001:0> require 'concurrent'
=> true

irb(main):002:0> puts "Main thread: #{Thread.current}"
Main thread: #<Thread:0x00005581cd29b428 run>
=> nil

irb(main):004:1* Concurrent::Channel.go do
irb(main):005:1*   puts "Goroutine thread: #{Thread.current}"
irb(main):006:0> end

Traceback (most recent call last):
        4: from /home/alex/.rubies/ruby-3.0.0/bin/irb:23:in `<main>'
        3: from /home/alex/.rubies/ruby-3.0.0/bin/irb:23:in `load'
        2: from /home/alex/.rubies/ruby-3.0.0/lib/ruby/gems/3.0.0/gems/irb-1.3.0/exe/irb:11:in `<top (required)>'
        1: from (irb):3:in `<main>'
NameError (uninitialized constant Concurrent::Channel)
Ben Sheldon (he/him)
@bensheldon_twitter
If channels are still "edge", they need to be required directly, e.g. require "concurrent/channels"
Oops, that should be "concurrent/channel" (singular)
Andy Maleh
@AndyObtiva

I'd hate to bother you, but I recently used 'concurrent-ruby' inside Glimmer and blogged about an example of taking advantage of it over here, leveraging thread pools: https://andymaleh.blogspot.com/2021/02/glimmer-dsl-for-swt-mandelbrot-fractal.html

Java 8 and later offer something even superior to thread pools: parallel streams (which build on thread pools behind the scenes as a higher abstraction). They let my original pre-parallelization loop implementation (not shown in the link) work by changing a single keyword (in Java, you switch stream() to parallelStream()) instead of instantiating a thread pool, distributing work, and waiting for it to finish.

What is the closest equivalent to Java parallel streams in "concurrent-ruby"?

Thanks and Godspeed.

Ben Sheldon (he/him)
@bensheldon_twitter

@AndyObtiva that's cool! I think the closest equivalent is a Future: https://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/Future

A future will implicitly run on a global thread pool, but it's also possible to pass it an explicit thread pool.
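As a rough stdlib-only illustration of the pattern a Future-based "parallel map" boils down to (concurrent-ruby's Future adds a shared thread pool and error handling on top of this):

```ruby
# Naive "parallel map": spawn one thread per element, then join and collect.
# Thread#value joins the thread and returns its block's result.
inputs  = [1, 2, 3, 4]
results = inputs.map { |n| Thread.new { n * n } }.map(&:value)
```

One thread per element is fine for a sketch; for real workloads a pool (as Futures use) avoids unbounded thread creation.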

Ben Dean
@b-dean

I have some code that is leaking forked processes and I think maybe concurrent-ruby can help. I'm looking for advice on the best way to do this.

so I have some crappy old database system where the only way to communicate with it is via a pipe, so I'm using popen3 to get an input stream to send commands to its interactive command prompt. So it's currently something like:

require 'open3'

class ForkedConnectionThingy
  def initialize(name)
    @name = name
  end

  def self.open(name, &b)
    conn = ForkedConnectionThingy.new(name)
    conn.establish_connection

    yield conn
  ensure
    conn.close
  end

  def send_command(command)
    @input.puts command
    @output.readline
  end

  def establish_connection
    @pid = fork do
      # some stuff to switch uid/gid maybe

      @input, @output, @error = Open3.popen3("dbconsole_whatsit #{@name}")
    end
  end

  def close
    @input.puts "quit"
    @input.close
    @output.close
    @error.close
    Process.kill('TERM', @pid)
  end
end

I want to make something that is multithreaded and has a pool of these things, and maybe an at_exit hook to close them all when Ruby exits. That way I don't make so many of them and I can make sure they all get closed.

Any thoughts on what sort of stuff to use for this? I was looking at Concurrent::Map indexed by the @name, but I'm not sure how to decide when to make a new one. Also just wondering if I'm thinking about this all wrong and there's some more elegant solution to this problem.

Ben Sheldon [he/him]
@bensheldon
This sounds like a job for a connection pool (https://github.com/mperham/connection_pool) to manage the forked processes.
You could create a connection pool to manage the forks. AND then use a Future to multithread the work of checking out a fork, doing the work, and checking it back into the pool.
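The checkout/check-in core of such a pool can be sketched with just a stdlib SizedQueue (a toy version of what the connection_pool gem does; real code also needs timeouts and the at_exit cleanup mentioned above):

```ruby
# Minimal checkout/check-in pool: pop blocks when the pool is exhausted,
# and ensure guarantees the connection is always checked back in.
class TinyPool
  def initialize(size, &factory)
    @queue = SizedQueue.new(size)
    size.times { @queue << factory.call }
  end

  def with
    conn = @queue.pop          # blocks until a connection is free
    begin
      yield conn
    ensure
      @queue << conn           # always return the connection to the pool
    end
  end
end

# Object.new stands in for a real forked-connection object here.
pool   = TinyPool.new(2) { Object.new }
result = pool.with { |conn| conn.object_id }
```

`TinyPool` and the factory block are illustrative names; swap in the real connection setup/teardown.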
Ben Dean
@b-dean
that does look like it could work
Tom Lahti
@uidzip
Some years ago (4-5) it seemed like Actors in concurrent-ruby achieved parallelism even with MRI. It seems like today they do not, the GIL applies to all Actors spawned collectively. Do I remember incorrectly?
Hardik Joshi
@mrhardikjoshi
Hello all,
I have a question related to Concurrent::Promises::Future.
I have multiple on_rejection! callbacks registered on my future object. By default these callbacks get called in last-in, first-out order (like a stack).
But I need them to be called in first-in, first-out order (like a queue). I couldn't find anything in the documentation or on the web about callback ordering or how to change it.
Is there any way I can achieve this?
Thank you
Ben Sheldon [he/him]
@bensheldon
Sorry, not a solution, but a suggestion: You can walk through the code about how observers are added. Here is where the observers attribute is initialized: https://github.com/ruby-concurrency/concurrent-ruby/blob/50bc1eec4856e2495b6883ce84dd528e4b98bcbc/lib/concurrent-ruby/concurrent/ivar.rb#L157
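Not the library answer either, but one stdlib-only workaround for the LIFO ordering: register a single callback that drains an ordered list of handlers, so your own handlers always run in the order you appended them. A sketch (the `run_all` lambda would be the one block you pass to on_rejection!):

```ruby
# Hypothetical handlers; wrapping them in one dispatcher guarantees FIFO
# order among them regardless of how the library orders its callbacks.
log      = []
handlers = []
handlers << ->(reason) { log << "first: #{reason}" }
handlers << ->(reason) { log << "second: #{reason}" }

run_all = ->(reason) { handlers.each { |h| h.call(reason) } }
run_all.call("boom")
```

Since only one callback is registered with the future, the library's internal ordering no longer matters.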
Chris Seaton
@chrisseaton
Hi, I'm the new maintainer of CR. I'll try to hang around here every now and again, but it's difficult to keep yet-another-chat-app open full time. I'll be responsive to issues on GH.
Ben Sheldon [he/him]
@bensheldon
@chrisseaton congrats and happy to have you here when you can be. It's pretty quiet most of the time.
Kris Leech
@krisleech
hey, I'm trying to figure out how I might have a thread-local, instance-local var. I posted on Stack Overflow earlier, but after looking at Concurrent, this might be the place to ask.
https://stackoverflow.com/questions/71926522/thread-local-instance-local-variable-in-ruby
Ben Sheldon [he/him]
@bensheldon
If you're in Rails, I think you can simply use thread_cattr_accessor in your class, which creates a thread-local class variable that you'd assign in a before_action, and it gets auto-cleaned-up by Rails after the request completes.
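Outside Rails, a per-instance and per-thread value can be sketched with a Hash keyed by the current thread (`PerThreadSlot` is a hypothetical class, not a concurrent-ruby or Rails API):

```ruby
# Each slot instance keeps one value per thread; the mutex guards the Hash
# itself, since plain Hashes are not safe for concurrent mutation.
class PerThreadSlot
  def initialize
    @values = {}
    @mutex  = Mutex.new
  end

  def value
    @mutex.synchronize { @values[Thread.current] }
  end

  def value=(v)
    @mutex.synchronize { @values[Thread.current] = v }
  end
end

slot = PerThreadSlot.new
slot.value = "main"
child = Thread.new { slot.value = "child"; slot.value }.value
```

The child thread sees its own "child" value while the main thread still reads "main". Note this keys on Thread objects, so long-lived slots should clean up entries for dead threads.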
5 replies