Я пишу модульные тесты для задачи сельдерея, используя django-nose. Это довольно типично; пустая тестовая база данных (REUSE_DB=0), которая предварительно заполняется через фикстуру во время тестирования.
Моя проблема заключается в том, что, хотя TestCase загружает фикстуру, и я могу получить доступ к объектам из тестового метода, тот же запрос завершается ошибкой при выполнении в рамках асинхронной задачи celery.
Я проверил, что settings.DATABASES["default"]["name"] совпадают как в тестовом методе, так и в тестируемой задаче. Я также подтвердил, что тестируемая задача ведет себя правильно, когда вызывается как обычный вызов метода.
И это о том, где я из идей.
Вот пример:
class MyTest(TestCase):
fixtures = ['test_data.json']
def setUp(self):
settings.CELERY_ALWAYS_EAGER = True # seems to be required; if not I get socket errors for Rabbit
settings.CELERY_EAGER_PROPAGATES_EXCEPTIONS = True # exposes errors in the code under test.
def test_city(self):
self.assertIsNotNone(City.objects.get(name='brisbane'))
myTask.delay(city_name='brisbane').get()
# The following works fine: myTask('brisbane')
from celery.task import task
@task()
def myTask(city_name):
c = City.objects.count() # gives 0
my_city = City.objects.get(name=city_name) # raises DoesNotExist exception
return