простая потоковая реализация и быстрая реализация пула памяти?

После переосмысления дизайна и некоторого вклада от Пэдди я придумал что-то вроде этого, но мне интересно, правильно ли это, когда я запускаю его, кажется, что все в порядке... Идея состоит в том, что предварительно распределенные объекты наследуются от следующего:

struct Node
{
    void* pool;
};

Таким образом, мы вводим в каждый выделенный объект указатель на его пул для последующего его освобождения. Тогда у нас есть:

template<class T, int thesize>
struct MemPool
{
    T* getNext();
    void free(T* ptr);

    struct ThreadLocalMemPool
    {
        T* getNextTL();
        void freeTL();

        int size;
        vector<T*> buffer;
        vector<int> freeList;
        int freeListIdx;
        int bufferIdx;
        ThreadLocalMemPool* nextTlPool; //within a thread's context a linked list
    };

    int size;
    threadlocal ThreadLocalMemPool* tlPool; //one of these per thread
};

Итак, в основном я говорю MemPool<Cat, 100>, и это дает мне мемпул, который для каждого потока, который его getNexts, будет создавать экземпляр локального мемпула потока. Размеры я округляю до ближайшей степени двойки для удобства по модулю (что для простоты я опускаю). Поскольку getNext() является локальным для каждого потока, он не требует блокировки, и я пытаюсь использовать атомарные методы для освобождающей части следующим образом:

T* ThreadLocalMemPool::getNextTL()
{
    int iHead = ++bufferIdx % size;
    int iTail = freeListIdx % size;

    if (iHead != iTail)  // If head reaches tail, the free list is empty.
    {
        int & idx = freeList[iHead];
        while (idx == DIRTY) {}
        return buffer[idx];
    }
    else
    {
        bufferIdx--; //we will recheck next time
        if (nextTLPool)
            return nextTLPool->getNextTL();
        else
            //set nextTLPool to a new ThreadLocalMemPool and return getNextTL() from it..
    }
}

void ThreadLocalMemPool::free(T* ptr)
{
    //the outer struct handles calling this in the right ThreadLocalMemPool

    //we compute the index in the pool from which this pool came from by subtracting from
    //its address the address of the first pointer in this guys buffer
    int idx = computeAsInComment(ptr);

    int oldListIdx = atomic_increment_returns_old_value(freeListIdx);
    freeList[oldListIdx % size] = idx;
}

Теперь идея состоит в том, что freeListIdx всегда будет следовать за bufferIdx в пуле, потому что вы не можете (я предполагаю правильное использование) освободить больше, чем вы выделили. Вызовы free синхронизируют порядок, в котором они возвращают буферные индексы в свободный список, и getNext уловит это при циклическом возврате. Я немного подумал об этом и не вижу ничего семантически неправильного в логике, кажется ли это правильным или есть что-то тонкое, что могло бы ее сломать?


person Palace Chan    schedule 17.09.2012    source источник
comment
простой, потокобезопасный и быстрый. Разве это не было бы здорово?   -  person Ed S.    schedule 18.09.2012
comment
Простой, потокобезопасный и быстрый — выберите любые два.   -  person Pete Becker    schedule 18.09.2012
comment
В целом преимущество использования локального потока заключается в том, что вы можете использовать его в одном потоке без блокировки. Если вы собираетесь перемещать память из одного потока в другой, преимущества почти нет.   -  person David Rodríguez - dribeas    schedule 18.09.2012


Ответы (1)


Потокобезопасная проблема требует блокировки. Если вы хотите ослабить это, вам нужно ограничение, что только один поток когда-либо использует пул. Вы можете расширить это до двух потоков, если будете использовать круговой список свободных ресурсов, который я опишу ниже, с условием, что один поток отвечает за выделение, а другой — за освобождение.

Что касается использования вектора без какого-либо другого управления, это плохая идея... Как только вы начинаете фрагментироваться, ваши распределения получают удар.

Хороший способ реализовать это — просто выделить большой блок T. Затем создать циклическую очередь, достаточно большую, чтобы она указывала на каждый из этих блоков. Это ваш «бесплатный список». Вы можете просто использовать индексы. Если вы ограничиваете каждый пул до 65536 элементов, вы можете выбрать неподписанный шорт, чтобы сэкономить место (на самом деле это 65535, чтобы обеспечить эффективное управление циклической очередью)

Используя циклическую очередь, вы разрешаете выделение и освобождение с постоянным временем независимо от фрагментации. Вы также знаете, когда ваш пул заполнен (т. е. список свободных ресурсов пуст), и вы можете создать другой пул. Очевидно, что при создании пула необходимо заполнить свободный список.

Итак, ваш класс будет выглядеть примерно так:

template<class T, size_t initSize>
class MemPool
{
    vector<T> poolBuffer;              // The memory pool
    vector<unsigned short> freeList;   // Ring-buffer (indices of free items)
    unsigned short nHead, nTail;       // Ring-buffer management
    int nCount;                        // Number of elements in ring-buffer
    MemPool<T,initSize> *nextPool;     // For expanding memory pool

    // etc...
};

Теперь о блокировке. Если у вас есть доступ к инструкциям атомарного увеличения и уменьшения и вы достаточно осторожны, вы можете поддерживать свободный список с потокобезопасностью. Единственная требуемая блокировка в стиле мьютекса — это когда вам нужно выделить новый пул памяти.

Я изменил свое первоначальное мышление. Вам как бы нужны две атомарные операции, и вам нужно зарезервированное значение индекса (0xffff), чтобы вращаться для неатомарных операций в очереди:

// I'll have the functions atomic_incr() and atomic_decr().  The assumption here
// is that they do the operation and return the value as it was prior to the
// increment/decrement.  I'll also assume they work correctly for both int and
// unsigned short types.
unsigned short atomic_incr( unsigned short & );
int atomic_incr( int & );
int atomic_decr( int & );

Таким образом, распределение происходит примерно так:

T* alloc()
{
    // Check the queue size.  If it's zero (or less) we need to pass on
    // to the next pool and/or allocate a new one.
    if( nCount <= 0 ) {
        return alloc_extend();
    }

    int count = atomic_decr(nCount);
    if( count <= 0 ) {
        T *mem = alloc_extend();
        atomic_incr(nCount);     // undo
        return mem;
    }

    // We are guaranteed that there is at least 1 element in the list for us.
    // This will overflow naturally to achieve modulo by 65536.  You can only
    // deal with queue sizes that are a power of 2.  If you want 32768 values,
    // for example, you must do this: head &= 0x7fff;
    unsigned short head = atomic_incr(nHead);

    // Spin until the element is valid (use a reference)
    unsigned short & idx = freeList[head];
    while( idx == 0xffff );

    // Grab the pool item, and blitz the index from the queue
    T * mem = &poolBuffer[idx];
    idx = 0xffff;

    return mem;
};

В приведенном выше примере используется новая частная функция-член:

T * alloc_extend()
{
    if( nextPool == NULL ) {
        acquire_mutex_here();
        if( nextPool == NULL ) nextPool = new MemPool<T>;
        release_mutex_here();
        if( nextPool == NULL ) return NULL;
    }
    return nextPool->alloc();
}

Когда вы хотите освободиться:

void free(T* mem)
{
    // Find the right pool to free from.
    if( mem < &poolBuffer.front() || mem > &poolBuffer.back() )
    {
        if( nextPool ) nextPool->free(mem);
        return;
    }

    // You might want to maintain a bitset that indicates whether the memory has
    // actually been allocated so you don't corrupt your pool here, but I won't
    // do that in this example...

    // Work out the index.  Hope my pointer arithmetic is correct here.
    unsigned short idx = (unsigned short)(mem - &poolBuffer.front());

    // Push index back onto the queue.  As with alloc(), you might want to
    // use a mask on the tail to achieve modulo.
    int tail = atomic_incr(nTail);
    freeList[tail] = idx;

    // Don't need to check the list size.  We assume everything is sane. =)
    atomic_incr(nCount);
}

Обратите внимание, что я использовал значение 0xffff, фактически как NULL. Установка, очистка и вращение этого значения предназначены для предотвращения ситуации гонки. Вы не можете гарантировать, что безопасно оставлять старые данные в очереди, когда несколько потоков могут вызывать free, в то время как у вас есть другие потоки, вызывающие alloc. Ваша очередь будет проходить циклически, но данные в ней могут быть еще не установлены.

Вместо индексов, конечно, можно было бы просто использовать указатели. Но это 4 байта (или 8 байтов в 64-разрядном приложении), и накладные расходы памяти могут не стоить того, в зависимости от размера данных, которые вы объединяете. Лично я бы использовал указатели, но почему-то в этом ответе мне показалось проще использовать индексы.

person paddy    schedule 17.09.2012
comment
Мои извинения, если я промахнулся. Я как бы проигнорировал вещь «threadlocal». На самом деле, я раньше не встречал это ключевое слово =/ Надеюсь, мой ответ все еще полезен. - person paddy; 18.09.2012
comment
Таким образом, круговой буфер просто имеет индексы освобожденных элементов, которые я могу получить, я вижу ... и если это не удастся, я бы увеличил poolBuffer и, предположительно, изменил размер кругового буфера ... у меня есть атомарные приращения и декременты, каков общий механизм использования их здесь? О, и threadlocal я написал для ясности псевдокода, в коде я фактически написал __thread - person Palace Chan; 18.09.2012
comment
Нет, вы бы не увеличили круговой буфер. Вы просто выделяете новый MemPool и передаете ему запросы на выделение. При дальнейшем размышлении круговой буфер на самом деле не так прост, как просто атомарное приращение для указателя на начало и конец. Во-первых, вы не можете сделать это по модулю, если не установите размер пула в степень двойки. Во-вторых, вам нужно проверить голову и хвост одновременно с атомарным приращением. Если вы не можете этого сделать, вам также необходимо поддерживать размер списка (в виде подписанного целого числа) и выполнять для него атомарное incr/decr. Если он упадет ниже нуля... Что ж, я немного отредактирую свой ответ... - person paddy; 18.09.2012
comment
Сделано... э-э... как я уже сказал, вы должны быть очень осторожны. может я не правильно понял. Надеюсь, что это поможет в любом случае. - person paddy; 18.09.2012
comment
Мне нравится общая идея, с локальным потоком он может (я думаю) упростить часть распределения (каждый поток будет иметь свой собственный пул, но освобождение может произойти из любого потока). Я отредактирую свой ответ позже, когда перенесу то, что я думаю, в сочетании с некоторыми вашими предложениями в код! - person Palace Chan; 18.09.2012
comment
Мне кажется, все в порядке. Я новичок в этом локальном синтаксисе потока, но, похоже, это немного упрощает работу. Я полагаю, что недостатком этого является то, что если вам нужен огромный размер пула по умолчанию, и только один поток выполняет распределение, у вас будет много больших пустых пулов в других потоках. Вы сделали пару ошибок. Я отредактирую ваш вопрос, чтобы исправить. - person paddy; 19.09.2012
comment
Я отредактировал. Посмотрите, есть ли в этом смысл. Я понял, что bufferIdx — это твоя голова, а freeListIdx — твой хвост. Также предполагается, что когда они равны, список пуст. Вы не можете предполагать, что список пуст только потому, что свободный индекс ГРЯЗЕН (это просто означает, что в настоящее время он освобождается другим потоком). Поэтому я добавил ГРЯЗНУЮ петлю вращения. Если вы предпочитаете просто привязать к следующему пулу (или создать новый) в этом случае, затем измените, верните этот тест и выньте цикл... Но убедитесь, что вы сохранили сравнение головы и хвоста, потому что это будет указать, когда ваш бассейн исчерпан. - person paddy; 19.09.2012
comment
Локальные мемпулы потока создаются только при первом запросе к getNext из потока... поэтому только поток, который выделяет, будет иметь пулы памяти. Я немного запутался с добавлением спина. Ход моих мыслей был примерно таким: вы выделяете любой индекс в freeList, проходя по нему с помощью bufferIdx++; Парни, которые освобождают, делают это, записывая в freeList также с помощью атомарного freeListIdx++ (атомарного, потому что многие потоки могут освобождаться в один и тот же мемпул, но только один может выделять). - person Palace Chan; 19.09.2012
comment
В любой момент, когда освобождающий поток выполняет запись в freeList, сохраняется инвариант freeListIdx ‹ bufferIdx (по модулю). Это связано с тем, что вы можете освободить только то, что было выделено (и поэтому bufferIdx всегда будет как минимум на единицу впереди вас при проверке выделения). Это правда, что если вы зацикливаетесь, а освобождающий поток находится в середине освобождения грязной позиции, которую вы можете прочитать грязной, тогда освобождающий поток быстро записывает туда какой-то действительный idx... и вы выделяете новый пул... но это хорошо, в следующий раз поймаешь.. по крайней мере мне кажется безобидной гонкой.. или есть худший сценарий? - person Palace Chan; 19.09.2012
comment
Дело в том, что если у вас есть один или несколько потоков, освобождающих память в пуле другого потока, и этот пул в основном используется, вполне возможно, что, пока вы пытаетесь выделить память, может показаться, что в свободном списке есть элементы, но индексы не таки восстановлен. В этом случае выбор состоит в том, чтобы вращаться по значению (это будет очень небольшое количество вращения) или, как вы говорите, просто использовать следующий пул и знать, что в следующий раз индекс, вероятно, будет восстановлен. Вы, вероятно, имеете право выбрать последний вариант. Чем больше растет ваш пул, тем реже это будет происходить. - person paddy; 20.09.2012
comment
Вам не нужно делить на sizeof(T) в этой строке: unsigned short idx = (unsigned short)(mem - &poolBuffer.front()); ? - person Lennart Rolland; 02.03.2014
comment
@LennartRolland Нет, это вычисление индекса в векторе, тип данных элемента которого равен T. Арифметика указателя правильная. Возможно, вам нужно освежиться, написав себе тестовую программу, которая выводит результат арифметических операций с указателями для разных типов данных. - person paddy; 03.03.2014