Python: объект thread._local не имеет атрибута todo

В настоящее время я программирую сервер дейтаграмм на основе python, используя потоки и все такое.

Я столкнулся со следующей проблемой: я использую несколько потоков распределения для распределения входящих пакетов между различными потоками обработки. Внутри потоков обработки я использую threading.local() для отслеживания локальных переменных потока.

В настоящее время я тестирую, как мой сервер реагирует на высокую нагрузку (2000 пакетов за ~ 2 секунды), и столкнулся со странным поведением объекта local().

Кажется, какое-то время он работает нормально, а затем в какой-то момент выдает исключение:

Exception in thread 192.168.1.102: # <-- This is the Processing Thread
Traceback (most recent call last):
  File "/opt/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/volume1/home/Max/Python/MyThread/pyProcThread.py", line 37, in run
    print self.loc.father
AttributeError: 'thread._local' object has no attribute 'father'

# The following three lines are debug
# This is the Allocation thread that has called the Processing thread
<Thread(192.168.1.102, started 1106023568)> 
# This confirms that the queue it tries to access exists
<Queue.Queue instance at 0x40662b48>
# This is the Processing thread, which has stopped executing on exception
<Thread(192.168.1.102, stopped 1106023568)> 

Exception in thread pyAlloc-0: # <-- This is the Allocation thread
Traceback (most recent call last):
  File "/opt/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/volume1/home/Max/Python/MyThread/pyAllocThread.py", line 60, in run
    foundThread.addTask(str(data))
  File "/volume1/home/Max/Python/MyThread/pyProcThread.py", line 58, in addTask
    print self.loc.todo
AttributeError: 'thread._local' object has no attribute 'todo'

Часть потока обработки

# Imports
import threading
import time
import Queue

class Thread(threading.Thread):
    def __init__(self,pThread):
        threading.Thread.__init__(self)
        self.loc = threading.local()
        self.loc.todo = Queue.Queue()
        self.loc.father = pThread
        print "Konstruktor ausgefuehrt"
        print self.loc.todo

    def run(self):
        self.loc.run = True;
        print self.loc.father
        while (self.loc.run):
        try:
            task = self.loc.todo.get(True, 1)
            print "processing..."
            if(task == self):
                self.loc.run=False
            else:
                print task
        except Queue.Empty:
            pass
        self.loc.father.threadTerminated(self)
        print self.name, "terminating..."

    def addTask(self, pTask):
        print self
        print self.loc.todo
        self.loc.todo.put(pTask)

И поток распределения:

import threading
import pyProcThread # My processing Thread
import Queue
import time
class Thread(threading.Thread):
    # Lock-Objects
    threadListLock = threading.Lock()
    waitListLock = threading.Lock()

    alive = True;

    taskQueue = Queue.Queue()
    # Lists
    # List of all running threads
    threads = []

    def threadExists(self,pIP):
        """Checks if there is already a thread with the given Name"""
        for x in self.threads:
            if x.name == pIP:
                return x
        return None

    def threadTerminated(self,pThread):
        """Called when a Processing Thread terminates"""
        with self.threadListLock:
            self.threads.remove(pThread)
            print "Thread removed"

    def threadRegistered(self,pThread):
        """Registers a new Thread"""
        self.threads.append(pThread)

    def killThread(self):
        self.alive = False

    def run(self):
        while(self.alive):
            # print "Verarbeite Nachricht ", self.Message
            # print "Von ", self.IP
            try:
                data, addtemp = self.taskQueue.get(True, 1)
                addr, _junk = addtemp
                with self.threadListLock:
                    foundThread=self.threadExists(str(addr))
                    # print "Thread " + self.name + " verarbeitet " + data
                    if (foundThread!=None):
                        #print "recycling thread"
                        foundThread.addTask(str(data))
                    else:
                        print "running new Thread"
                        t = pyProcThread.Thread(self)
                        t.name = str(addr)
                        t.addTask(str(data))
                        t.start()
                        self.threadRegistered(t)
                self.taskQueue.task_done()
            except Queue.Empty:
                pass
        print self.name, "terminating..."
        with self.threadListLock:
            for thread in self.threads:
                thread.addTask(thread)

Полный вывод, включая все отладочные отпечатки:

running new Thread
Konstruktor ausgefuehrt
<Queue.Queue instance at 0x40662b48>
<Thread(192.168.1.102, initial)>
<Queue.Queue instance at 0x40662b48>
<Thread(192.168.1.102, started 1106023568)>
<Queue.Queue instance at 0x40662b48>
<Thread(192.168.1.102, started 1106023568)>
<Queue.Queue instance at 0x40662b48>
<Thread(192.168.1.102, started 1106023568)>
<Queue.Queue instance at 0x40662b48>
<Thread(192.168.1.102, started 1106023568)>
<Queue.Queue instance at 0x40662b48>
<Thread(192.168.1.102, started 1106023568)>
<Queue.Queue instance at 0x40662b48>
<Thread(192.168.1.102, started 1106023568)>
<Queue.Queue instance at 0x40662b48>
<Thread(192.168.1.102, started 1106023568)>
<Queue.Queue instance at 0x40662b48>
Exception in thread 192.168.1.102:
Traceback (most recent call last):
  File "/opt/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/volume1/home/Max/Python/MyThread/pyProcThread.py", line 37, in run
    print self.loc.father
AttributeError: 'thread._local' object has no attribute 'father'

<Thread(192.168.1.102, started 1106023568)>
<Queue.Queue instance at 0x40662b48>
<Thread(192.168.1.102, stopped 1106023568)>
Exception in thread pyAlloc-0:
Traceback (most recent call last):
  File "/opt/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/volume1/home/Max/Python/MyThread/pyAllocThread.py", line 60, in run
    foundThread.addTask(str(data))
  File "/volume1/home/Max/Python/MyThread/pyProcThread.py", line 58, in addTask
    print self.loc.todo
AttributeError: 'thread._local' object has no attribute 'todo'

Terminating main
192.168.1.102
DEINEMUDDA  sent to  192.168.1.102 : 8082
pyAlloc-1 terminating...
<Thread(192.168.1.102, stopped 1106023568)>
<Queue.Queue instance at 0x40662b48>

Как вы можете видеть в логе, какое-то время все работает нормально, хотя никогда не достигает print "processing в основной функции потока обработки.

Я искал в Интернете и StackOverflow похожие проблемы, но не смог их найти. Заранее спасибо за любую помощь и извините за мой стиль кодирования, я программирую Python всего несколько дней и все еще изучаю некоторые его функции.

РЕДАКТИРОВАТЬ: Конечно, на этом сервере есть нечто большее, чем это. Существует основной поток, который получает пакеты и отправляет их в поток распределения, а также куча других вещей в фоновом режиме. Кроме того, сервер далек от завершения, но я хочу, чтобы получение работало, прежде чем я начну другие вещи.


person malexmave    schedule 25.02.2012    source источник


Ответы (1)


threading.local — это класс, представляющий локальные данные потока. Локальные данные потока — это данные, значения которых зависят от потока и поэтому доступны только для локального потока (потока, создавшего объект local()). Другими словами, только поток, установивший значение, видит это значение.

Поскольку вы устанавливаете значение loc.parent в методе __init__ вашего потока обработки, но выполняется внутри потока распределителя, локальные параметры потоков обработки будут доступны только в потоке распределителя. Таким образом, он не будет работать в run() потоке обработки, так как он будет выполняться другим потоком (не распределителем).

person Mariusz Jamro    schedule 25.02.2012
comment
Это интересно, спасибо. Если я уберу print self.loc.parent, у него тоже будет проблема с self.loc.todo. Я предполагаю, что это имеет ту же причину. Как я могу заставить это работать (получить атрибуты parent и todo в loc, которые оба доступны из потока обработки)? И есть ли справочная страница python, которая подробно объясняет всю эту механику? Спасибо еще раз. - person malexmave; 25.02.2012
comment
Вы можете установить значения как обычные атрибуты объекта Thread, поэтому self.father = pThread. Я считаю, что отец не выполняет конкретный поток, а конкретный экземпляр потока. - person Mariusz Jamro; 25.02.2012
comment
Что ж, на самом деле это было мое рабочее решение, прежде чем я попытался сделать это правильно с локальными объектами. Но поскольку мне кажется, что self.attribute кажется правильным способом, я могу продолжать его использовать (мне это показалось грязным обходным путем, так как я не мог заставить локальные объекты работать с первой попытки) . Спасибо еще раз. - person malexmave; 25.02.2012