Невозможно отменить цепочку Twisted Deferred (AlreadyCalledError)

Я пытаюсь отменить цепочку Deferred (основную) всякий раз, когда какая-либо отложенная цепочка (дочерняя) вызывает ошибку. Но получаю AlreadyCalledError и цепочка продолжает свою работу.

Вот код:

from twisted.internet import defer

def asyncText(s, okCallback, errCallback):
  deferred = defer.Deferred()
  deferred.addCallbacks(okCallback, errCallback)
  if s.find('STOP')>=0:
    deferred.errback(ValueError('You are trying to print the unprintable :('))
  else:
    deferred.callback(s)
  return deferred

def asyncChain():
  texts = ['Hello StackOverflow,', 
           'this is an example of Twisted.chainDeferred()',
           'which is raising an AlreadyCalledError',
           'when I try to cancel() it.',
           'STOP => I will raise error and should stop the main deferred',
           'Best regards'
           ]

  mainDeferred= defer.Deferred()

  for text in texts: 
    def onSuccess(res):
      print('>> {}'.format(res))

    def onError(err):
      print('Error!! {}'.format(err))
      mainDeferred.cancel()

    d = asyncText(text, onSuccess, onError)
    mainDeferred.chainDeferred(d)

И вот результат:

>>> asyncChain()
- Hello StackOverflow,
- this is an example of Twisted.chainDeferred()
- which is raising an AlreadyCalledError
- when I try to cancel() it.
Error!! [Failure instance: Traceback (failure with no frames): <class 'ValueError'>: You are trying to print the unprintable :(
]
- Best regards
Unhandled error in Deferred:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File ".../test.py", line 1, in asyncChain
    mainDeferred.chainDeferred(d)
  File ".../.venvs/p3/lib/python3.6/site-packages/twisted/internet/defer.py", line 435, in chainDeferred
    return self.addCallbacks(d.callback, d.errback)
  File ".../.venvs/p3/lib/python3.6/site-packages/twisted/internet/defer.py", line 311, in addCallbacks
    self._runCallbacks()
--- <exception caught here> ---
  File ".../.venvs/p3/lib/python3.6/site-packages/twisted/internet/defer.py", line 654, in _runCallbacks
    current.result = callback(current.result, *args, **kw)
  File ".../.venvs/p3/lib/python3.6/site-packages/twisted/internet/defer.py", line 501, in errback
    self._startRunCallbacks(fail)
  File ".../.venvs/p3/lib/python3.6/site-packages/twisted/internet/defer.py", line 561, in _startRunCallbacks
    raise AlreadyCalledError
twisted.internet.defer.AlreadyCalledError: 

Я также пытался использовать canceller, например:

def asyncOnCancel(d):
  print('------ cancel() called ------')
  d.errback(ValueError('chain seems to be cancelled!'))
def asyncChainOnError(err):
  print('------ ERROR ON Chain {} ------'.format(err))

...
  mainDeferred= defer.Deferred(canceller= asyncOnCancel)
  mainDeferred.addErrback(asyncChainOnError)
...

Но результат тот же.

Я также пробовал откладывать дочерние вызовы .callback(s) или вызывать их после .chainDeferred(). Но я всегда получаю одно и то же поведение.

  • Действительно ли возможно отменить цепочку Deferred (и добиться отмены связанных дочерних отсрочек)?
  • Почему я получаю это AlreadyCalledError?

Я использую Python 3.6.6 и Twisted 18.9.0.

Спасибо!

******* РЕДАКТИРОВАТЬ *******

После ответа Jean-Paul и заметив, что .chainDeferred() это не то, что мне нужно, я яснее изложу здесь, что Я хочу (и как я наконец это сделал).

То, что я хочу, было довольно простым: запустить несколько отложенных "синхронно" (они должны дождаться завершения предыдущего), хотя им не нужно делиться своими результатами. Если один терпит неудачу, остальные не выполняются.

Оказывается, это довольно легко сделать с помощью @defer.inlineCallbacks и yield. Вот пример:

def asyncText(s):
  deferred = defer.Deferred()
  if s.find('STOP') >= 0:
    deferred.callback(True)
  else:
    deferred.callback(False)
  return deferred

@defer.inlineCallbacks
def asyncChain():
  texts = ['Hello StackOverflow,',
           'this is a simpler way to explain the question.',
           'I need no chainDeferred(). I need no .cancel().',
           'I just need inlineCallbacks decorator.',
           'STOP',
           'Yeah I will not be printed'
           ]
  for text in texts:
    stopHere = yield asyncText(text)
    if stopHere:
      break
    print('- {}'.format(text))

deferred= asyncChain()
>>> asyncChain()
- Hello StackOverflow,
- this is a simpler way to explain the question.
- I need no chainDeferred(). I need no .cancel().
- I just need inlineCallbacks decorator.

person Afialapis    schedule 31.05.2019    source источник


Ответы (1)


Непонятно, что вы на самом деле пытаетесь сделать, но я думаю, что ваша цепочка отличается от того, что вы ожидаете:

    d = asyncText(text, onSuccess, onError)
    mainDeferred.chainDeferred(d)

d уже сработал, когда asyncText возвращается. Но mainDeferred.chainDeferred(d) означает, что "когда mainDeferred срабатывает, передать результат в d". Поскольку d уже сработало, это неверно. Deferred может сработать только один раз.

Возможно, вы имели в виду d.chainDeferred(mainDeferred). Затем «когда срабатывает d, передать его результат в mainDeferred».

Тем не менее, проблема все еще существует, так как если вы связываете d с mainDeferred, то нет смысла отменять mainDeferred в обратном вызове на d. Результат будет распространяться от d к mainDeferred, потому что они связаны. Отмена не нужна и не полезна.

person Jean-Paul Calderone    schedule 31.05.2019
comment
Понял, теперь вижу chainDeferred не то, что мне нужно. Я хочу запустить несколько Deferred синхронно (они должны дождаться завершения предыдущего), хотя им не нужно делиться своими результатами. Я видел здесь, что, возможно, мне нужно просто от addCallback() до mainDeferred , но я еще не заставил его работать. - person Afialapis; 31.05.2019
comment
вероятно f1().addCallback(lambda ignored: f2()) ... но, возможно, примите этот ответ и откройте новый вопрос. - person Jean-Paul Calderone; 31.05.2019