Как предотвратить дедлок с использованием Async?

Я написал простую демонстрацию производителя-потребителя с использованием Thread (ов) и Queue, и он работает.
Затем я переписал ее с помощью Async, однако он не работает из-за deadlock. Пытался использовать Async::Reactor вместо цикла while, и это тоже не помогло.

I, [2021-03-27T14:36:31.695540 #30532]  INFO -- : Consumer
Traceback (most recent call last):
	2: from /home/user/Apps/rbenv/versions/2.5.5/lib/ruby/gems/2.5.0/gems/async-1.28.9/lib/async/task.rb:265:in `block in make_fiber'
	1: from /home/user/Devel/Ruby/async_queue_test.rb:25:in `block (4 levels) in run'
/home/user/Devel/Ruby/async_queue_test.rb:25:in `pop': No live threads left. Deadlock? (fatal)
1 threads, 1 sleeps current:0x0000555f03693840 main thread:0x0000555f03693840
* #<Thread:0x0000555f036f4b60 sleep_forever>
   rb_thread_t:0x0000555f03693840 native:0x00007f9440267740 int:0
   /home/user/Devel/Ruby/async_queue_test.rb:25:in `pop'
   /home/user/Devel/Ruby/async_queue_test.rb:25:in `block (4 levels) in run'
   /home/user/Apps/rbenv/versions/2.5.5/lib/ruby/gems/2.5.0/gems/async-1.28.9/lib/async/task.rb:265:in `block in make_fiber'


код с Thread - рабочий
require 'logger'

def has_jobs(item)
    !item.nil?
end

def run(log)
    delay = 0.2
    times = 3

    q = Queue.new

    # consumer
    t1 = Thread.new do
        log.info('Consumer')
        while (has_jobs(job = q.deq))
            log.info("consume #{job}")
        end
        log.info('Consumer exited')
    end

    # producer
    t2 = Thread.new do
        log.info('Producer')
        (1..times).each do |i|
            log.info("produce #{i}")
            q.enq(i)
            sleep(delay)
        end
        q.enq(nil)
        q.close
        log.info('Producer exited')
    end

    # Ensure we wait for all tasks to complete before continuing:
    [t1, t2].each(&:join)
end

log = Logger.new(STDOUT)

t = Time.now
run(log)
puts(Time.now-t)

log.info('Done')

код с Async - нерабочий
require 'async'
require 'async/barrier'
require 'async/semaphore'
require 'logger'

def has_jobs(item)
    !item.nil?
end

def run(log)
    delay = 0.2
    times = 3

    q = Queue.new

    Async do
        barrier = Async::Barrier.new
        semaphore = Async::Semaphore.new(2, parent: barrier)

        # consumer
        semaphore.async do
            Async do |task|
                log.info('Consumer')
                while (has_jobs(job = q.deq))
                    log.info("consume #{job}")
                end
                # Async::Reactor.run do
                #     job = q.deq
                #     if has_jobs(job)
                #         log.info("consume #{job}")
                #     end
                # end
                log.info('Consumer exited')
            end
        end

        # producer
        semaphore.async do
            Async do |task|
                log.info('Producer')
                (1..times).each do |i|
                    log.info("produce #{i}")
                    q.enq(i)
                    task.sleep(delay)
                end
                q.enq(nil)
                q.close
                log.info('Producer exited')
            end
        end
    
        # Ensure we wait for all tasks to complete before continuing:
        barrier.wait
    end
end

log = Logger.new(STDOUT)

t = Time.now
run(log)
puts(Time.now-t)

log.info('Done')

Как исправить, не подскажете?

Добавлено
Похоже, проблема не воспроизводится для версии Ruby 3.0
  • Вопрос задан
  • 76 просмотров
Решения вопроса 1
2ord
@2ord Автор вопроса
Процитирую автора gem async:
Ruby 3.0 exposes hooks for blocking operations and Async can handle them.

If you want to write something which is based on async, use Async::Queue. Otherwise, if you want to use thread primtives, use them. But if you mix it, and expect concurrency, you better use Ruby 3.

In other words, if you use thread primitives with async on Ruby 2, you will block the reactor.
Ответ написан
Комментировать
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы