Я написал простую демонстрацию производителя-потребителя с использованием Thread (ов) и Queue, и он работает.
Затем я переписал ее с помощью
Async, однако он не работает из-за deadlock. Пытался использовать Async::Reactor вместо цикла while, и это тоже не помогло.
I, [2021-03-27T14:36:31.695540 #30532] INFO -- : Consumer
Traceback (most recent call last):
2: from /home/user/Apps/rbenv/versions/2.5.5/lib/ruby/gems/2.5.0/gems/async-1.28.9/lib/async/task.rb:265:in `block in make_fiber'
1: from /home/user/Devel/Ruby/async_queue_test.rb:25:in `block (4 levels) in run'
/home/user/Devel/Ruby/async_queue_test.rb:25:in `pop': No live threads left. Deadlock? (fatal)
1 threads, 1 sleeps current:0x0000555f03693840 main thread:0x0000555f03693840
* #<Thread:0x0000555f036f4b60 sleep_forever>
rb_thread_t:0x0000555f03693840 native:0x00007f9440267740 int:0
/home/user/Devel/Ruby/async_queue_test.rb:25:in `pop'
/home/user/Devel/Ruby/async_queue_test.rb:25:in `block (4 levels) in run'
/home/user/Apps/rbenv/versions/2.5.5/lib/ruby/gems/2.5.0/gems/async-1.28.9/lib/async/task.rb:265:in `block in make_fiber'
код с Thread - рабочийrequire 'logger'
def has_jobs(item)
!item.nil?
end
def run(log)
delay = 0.2
times = 3
q = Queue.new
# consumer
t1 = Thread.new do
log.info('Consumer')
while (has_jobs(job = q.deq))
log.info("consume #{job}")
end
log.info('Consumer exited')
end
# producer
t2 = Thread.new do
log.info('Producer')
(1..times).each do |i|
log.info("produce #{i}")
q.enq(i)
sleep(delay)
end
q.enq(nil)
q.close
log.info('Producer exited')
end
# Ensure we wait for all tasks to complete before continuing:
[t1, t2].each(&:join)
end
log = Logger.new(STDOUT)
t = Time.now
run(log)
puts(Time.now-t)
log.info('Done')
код с Async - нерабочийrequire 'async'
require 'async/barrier'
require 'async/semaphore'
require 'logger'
def has_jobs(item)
!item.nil?
end
def run(log)
delay = 0.2
times = 3
q = Queue.new
Async do
barrier = Async::Barrier.new
semaphore = Async::Semaphore.new(2, parent: barrier)
# consumer
semaphore.async do
Async do |task|
log.info('Consumer')
while (has_jobs(job = q.deq))
log.info("consume #{job}")
end
# Async::Reactor.run do
# job = q.deq
# if has_jobs(job)
# log.info("consume #{job}")
# end
# end
log.info('Consumer exited')
end
end
# producer
semaphore.async do
Async do |task|
log.info('Producer')
(1..times).each do |i|
log.info("produce #{i}")
q.enq(i)
task.sleep(delay)
end
q.enq(nil)
q.close
log.info('Producer exited')
end
end
# Ensure we wait for all tasks to complete before continuing:
barrier.wait
end
end
log = Logger.new(STDOUT)
t = Time.now
run(log)
puts(Time.now-t)
log.info('Done')
Как исправить, не подскажете?
Добавлено
Похоже, проблема не воспроизводится для версии Ruby 3.0