Пул соединений Cassandra Pycassa, как правильно использовать?

Чтобы ускорить вставку Cassandra, я использую многопоточность, она работает нормально, но если я добавлю больше потоков, это не будет иметь никакого значения, я думаю, что не генерирую больше соединений, я думаю, может быть, мне следует использовать пул. execute(f, *args, **kwargs) но я не знаю как им пользоваться, документация довольно скудная. Вот мой код до сих пор..

import connect_to_ks_bp
from connect_to_ks_bp import ks_refs
import time
import pycassa
from datetime import datetime 
import json
import threadpool
pool = threadpool.ThreadPool(20)
count = 1
bench = open("benchCassp20_100000.txt", "w")

def process_tasks(lines):

    #let threadpool format your requests into a list
    requests = threadpool.makeRequests(insert_into_cfs, lines)

    #insert the requests into the threadpool
    for req in requests:
        pool.putRequest(req) 

    pool.wait()

def read(file):
    """read data from json and insert into keyspace"""
    json_data=open(file)
    lines = []
    for line in json_data:
        lines.append(line)
    print len(lines)
    process_tasks(lines)


def insert_into_cfs(line):
    global count
    count +=1
    if count > 5000:
            bench.write(str(datetime.now())+"\n")
            count = 1
    #print count
    #print kspool.checkedout()
    """
    user_tweet_cf = pycassa.ColumnFamily(kspool, 'UserTweet')
    user_name_cf = pycassa.ColumnFamily(kspool, 'UserName')
    tweet_cf = pycassa.ColumnFamily(kspool, 'Tweet')
    user_follower_cf = pycassa.ColumnFamily(kspool, 'UserFollower')
    """
    tweet_data = json.loads(line)
    """Format the tweet time as an epoch seconds int value"""
    tweet_time = time.strptime(tweet_data['created_at'],"%a, %d %b %Y %H:%M:%S +0000")
    tweet_time  = int(time.mktime(tweet_time))

    new_user_tweet(tweet_data['from_user_id'],tweet_time,tweet_data['id'])
    new_user_name(tweet_data['from_user_id'],tweet_data['from_user_name'])
    new_tweet(tweet_data['id'],tweet_data['text'],tweet_data['to_user_id'])

    if tweet_data['to_user_id'] != 0:
        new_user_follower(tweet_data['from_user_id'],tweet_data['to_user_id'])


""""4 functions below carry out the inserts into specific column families"""        
def new_user_tweet(from_user_id,tweet_time,id):
    ks_refs.user_tweet_cf.insert(from_user_id,{(tweet_time): id})

def new_user_name(from_user_id,user_name):
    ks_refs.user_name_cf.insert(from_user_id,{'username': user_name})

def new_tweet(id,text,to_user_id):
    ks_refs.tweet_cf.insert(id,{
    'text': text
    ,'to_user_id': to_user_id
    })  

def new_user_follower(from_user_id,to_user_id):
    ks_refs.user_follower_cf.insert(from_user_id,{to_user_id: 0})   

    read('tweets.json')
if __name__ == '__main__':

Это просто другой файл..

import pycassa
from pycassa.pool import ConnectionPool
from pycassa.columnfamily import ColumnFamily

"""This is a static class I set up to hold the global database connection stuff,
I only want to connect once and then the various insert functions will use these fields a lot"""
class ks_refs():
    pool = ConnectionPool('TweetsKS',use_threadlocal = True,max_overflow = -1)

    @classmethod
    def cf_connect(cls, column_family):
        cf = pycassa.ColumnFamily(cls.pool, column_family)
        return cf

ks_refs.user_name_cfo = ks_refs.cf_connect('UserName')
ks_refs.user_tweet_cfo = ks_refs.cf_connect('UserTweet')
ks_refs.tweet_cfo = ks_refs.cf_connect('Tweet')
ks_refs.user_follower_cfo = ks_refs.cf_connect('UserFollower')

#trying out a batch mutator whihc is supposed to increase performance
ks_refs.user_name_cf = ks_refs.user_name_cfo.batch(queue_size=10000)
ks_refs.user_tweet_cf = ks_refs.user_tweet_cfo.batch(queue_size=10000)
ks_refs.tweet_cf = ks_refs.tweet_cfo.batch(queue_size=10000)
ks_refs.user_follower_cf = ks_refs.user_follower_cfo.batch(queue_size=10000)

person Guye Incognito    schedule 12.12.2012    source источник


Ответы (1)


Несколько мыслей:

  • Размер партии в 10 000 слишком велик. Попробуйте 100.
  • Сделайте размер пула подключений как минимум равным количеству потоков, используя параметр pool_size. Значение по умолчанию — 5. Переполнение пула следует использовать только в том случае, если количество активных потоков может меняться с течением времени, а не при фиксированном количестве потоков. Причина в том, что это приведет к большому количеству ненужных открытий и закрытий новых соединений, что является довольно дорогостоящим процессом.

После того, как вы решили эти проблемы, изучите следующие:

  • Я не знаком с библиотекой потоков, которую вы используете. Убедитесь, что если вы уберете вставки в Cassandra из картины, вы увидите увеличение производительности при увеличении количества потоков.
  • Сам Python имеет ограничение на количество полезных потоков из-за GIL. Обычно он не должен превышать 20, но может быть, если вы делаете что-то с интенсивным использованием процессора или что-то, что требует много интерпретации Python. Тест, который я описал в предыдущем пункте, также покроет это. Возможно, вам следует рассмотреть возможность использования модуля multiprocessing, но для этого вам потребуются некоторые изменения кода (а именно, отказ от совместного использования ConnectionPools, CF или чего-либо еще между процессами).
person Tyler Hobbs    schedule 31.12.2012