Странное поведение clojure ref

У меня есть 100 рабочих (агентов), которые совместно используют один ref, содержащий набор задач. Пока в этой коллекции есть задачи, каждый воркер получает одну задачу из этой коллекции (в блоке dosync), распечатывает ее и иногда кладет обратно в коллекцию (в блоке dosync):

(defn have-tasks?
  [tasks]
  (not (empty? @tasks)))

(defn get-task
  [tasks]
  (dosync
    (let [task (first @tasks)]
      (alter tasks rest)
      task)))

(defn put-task
  [tasks task]
  (dosync (alter tasks conj task))
  nil)

(defn worker
  [& {:keys [tasks]}]
  (agent {:tasks tasks}))

(defn worker-loop
  [{:keys [tasks] :as state}]
  (while (have-tasks? tasks)
    (let [task (get-task tasks)]
      (println "Task: " task)
      (when (< (rand) 0.1)
        (put-task tasks task))))
  state)

(defn create-workers
  [count & options]
  (->> (range 0 count)
       (map (fn [_] (apply worker options)))
       (into [])))

(defn start-workers
  [workers]
  (doseq [worker workers] (send-off worker worker-loop)))

(def tasks (ref (range 1 10000000)))

(def workers (create-workers 100 :tasks tasks))

(start-workers workers)
(apply await workers)

Когда я запускаю этот код, последнее значение, напечатанное агентами (после нескольких попыток): 435445, 4556294, 1322061, 3950017. Но никогда не 9999999 то, что я ожидаю. И каждый раз коллекция действительно пуста в конце. Что я делаю неправильно?

Изменить:

Я переписал рабочий цикл как можно проще:

(defn worker-loop
  [{:keys [tasks] :as state}]
  (loop []
    (when-let [task (get-task tasks)]
      (println "Task: " task)
      (recur)))
  state)

Но проблема все еще существует. Этот код ведет себя так, как ожидалось, при создании одного и только одного рабочего.


person Sergei Koledov    schedule 20.08.2016    source источник
comment
Является ли println потокобезопасным?   -  person Shannon Severance    schedule 23.08.2016
comment
@ShannonSeverance Нет. Его нужно использовать, например, как (locking :out (println "...")), чтобы получить разборчивый вывод.   -  person Terje D.    schedule 23.08.2016


Ответы (3)


Проблема здесь не в агентах и ​​почти не в лени. Вот несколько сокращенная версия исходного кода, которая все еще демонстрирует проблему:

(defn f [init]
  (let [state (ref init)
        task (fn []
               (loop [last-n nil]
                 (if-let [n (dosync
                              (let [n (first @state)]
                                (alter state rest)
                                n))]
                   (recur n)
                   (locking :out
                     (println "Last seen:" last-n)))))
        workers (->> (range 0 5)
                     (mapv (fn [_] (Thread. task))))]
    (doseq [w workers] (.start w))
    (doseq [w workers] (.join w))))

(defn r []
  (f (range 1 100000)))

(defn i [] (f (->> (iterate inc 1)
                   (take 100000))))

(defn t []
  (f (->> (range 1 100000)
          (take Integer/MAX_VALUE))))

Запуск этого кода показывает, что и i, и t, обе ленивые, надежно работают, тогда как r надежно не работает. На самом деле проблема заключается в ошибке параллелизма в классе, возвращаемом вызовом range. Действительно, эта ошибка описана в этом билете Clojure и исправлена ​​начиная с Clojure. версия 1.9.0-alpha11.

Краткое описание бага на случай, если тикет по какой-то причине недоступен: во внутренностях rest вызова результата range была небольшая возможность для условия гонки: "флаг", в котором указано "следующее значение уже вычислено" было устанавливается перед самим фактическим значением, что означает, что второй поток может видеть этот флаг как истинный, даже если "следующее значение" по-прежнему равно nil. Затем вызов alter зафиксирует это значение nil в ссылке. Это было исправлено путем перестановки двух строк назначения.

В тех случаях, когда результат range либо принудительно реализовывался в одном потоке, либо заключался в другую ленивую последовательность, эта ошибка не появлялась.

person Gary Verhaegen    schedule 28.08.2017

Я задал этот вопрос на Clojure Google Group, и это помогло мне найти ответ.

Проблема в том, что я использовал ленивую последовательность внутри транзакции STM.

Когда я заменил этот код:

(def tasks (ref (range 1 10000000)))

этим:

(def tasks (ref (into [] (range 1 10000000))))

это сработало, как и ожидалось!

В моем производственном коде, где возникла проблема, я использовал платформу Korma, которая также возвращает ленивую коллекцию кортежей, как в моем примере.

Вывод. Избегайте использования ленивых структур данных в транзакции STM.

person Sergei Koledov    schedule 25.08.2016
comment
Поскольку вы используете только одну ссылку для своего состояния: вы пробовали использовать атом вместо ссылки? Кажется, это дает тот же результат и значительное (ca 10 x) сокращение времени выполнения. - person Terje D.; 26.08.2016
comment
Да, я думал об этом, но я не знаю, как написать согласованную атомарную функцию get-task в этом случае. Какую функцию я должен передать в своп! функция? - person Sergei Koledov; 26.08.2016
comment
Либо (defn get-taks [tasks] (let [my-tasks @tasks] (if (compare-and-set! tasks my-tasks (rest my-tasks)) (first my-tasks) (recur tasks)))), либо запускать задачи с фиктивным 0 и делать (defn get-tasks [tasks] (first (swap! tasks rest))) - person Terje D.; 26.08.2016
comment
Да, оба ваших варианта работают стабильно и намного быстрее! Спасибо. Однако проблема с ленивыми последовательностями все еще остается. Будь осторожен. - person Sergei Koledov; 26.08.2016

Когда достигается последнее число в диапазоне, у рабочих остаются еще более старые числа. Некоторые из них будут возвращены в очередь для повторной обработки.

Чтобы лучше видеть, что происходит, вы можете изменить worker-loop, чтобы вывести последнюю задачу, обработанную каждым воркером:

(defn worker-loop
  [{:keys [tasks] :as state}]
  (loop [last-task nil]
    (if (have-tasks? tasks)
      (let [task (get-task tasks)]
        ;; (when (< (rand) 0.1)
        ;;   (put-task tasks task)
        (recur task))
      (when last-task
        (println "Last task:" last-task))))
  state)

Это также показывает состояние гонки в коде, когда задачи, видимые have-tasks?, часто берутся другими, когда get-task вызывается ближе к концу обработки задач.

Состояние гонки можно решить, удалив have-tasks? и вместо этого используя возвращаемое значение nil из get-task в качестве сигнала о том, что больше нет доступных задач (на данный момент).

Обновлено:

Как видно, эти условия гонки не объясняют проблему.

Также проблема не решается удалением возможного состояния гонки в get-task следующим образом:

(defn get-task [tasks]
  (dosync
   (first (alter tasks rest))))

Однако изменение get-task для использования явной блокировки, похоже, решает проблему:

 (defn get-task [tasks]  
   (locking :lock
     (dosync
       (let [task (first @tasks)]
         (alter tasks rest)
         task))))
person Terje D.    schedule 20.08.2016
comment
Я не думаю, что это причина. Я могу закомментировать выражение (when (< (rand)... и не возвращать никакие задачи обратно в очередь, и все равно обрабатывается только часть. Кроме того, в среднем он возвращает только 10% всех задач, а последние номера задач, которые нужно напечатать перед остановкой, иногда не составляют даже половины всей очереди, поэтому теория на самом деле не имеет смысла. Я посмотрел на это сегодня и надеюсь, что смогу найти ответ, или что кто-то может. Это очень хороший вопрос. - person Josh; 21.08.2016
comment
Да, вы правы насчет состояния гонки в моем коде, спасибо. Я переписал свой код как можно проще, но проблема осталась. - person Sergei Koledov; 22.08.2016