создание демона с использованием Python libtorrent для получения метаданных более 100 тыс. торрентов

Я пытаюсь получить метаданные около 10 000 торрентов в день, используя python libtorrent.

Это текущий поток кода

  1. Запустите сеанс libtorrent.
  2. Получите общее количество торрентов, которые нам нужны метаданные для загрузки за последний 1 день.
  3. получить хеши торрентов из БД кусками
  4. создайте магнитную ссылку, используя эти хэши, и добавьте эти магнитные URI в сеанс, создав дескриптор для каждого магнитного URI.
  5. засыпайте на секунду, пока извлекаются метаданные, и продолжайте проверять, найдены ли метаданные или нет.
  6. Если метаданные получены, добавьте их в БД, иначе проверьте, искали ли мы метаданные около 10 минут, если да, то удалите дескриптор, т.е. больше не ищите метаданные.
  7. делать выше бесконечно. и сохранить состояние сеанса на будущее.

до сих пор я пробовал это.

#!/usr/bin/env python
# this file will run as client or daemon and fetch torrent meta data i.e. torrent files from magnet uri

import libtorrent as lt # libtorrent library
import tempfile # for settings parameters while fetching metadata as temp dir
import sys #getting arguiments from shell or exit script
from time import sleep #sleep
import shutil # removing directory tree from temp directory 
import os.path # for getting pwd and other things
from pprint import pprint # for debugging, showing object data
import MySQLdb # DB connectivity 
import os
from datetime import date, timedelta

session = lt.session(lt.fingerprint("UT", 3, 4, 5, 0), flags=0)
session.listen_on(6881, 6891)
session.add_extension('ut_metadata')
session.add_extension('ut_pex')
session.add_extension('smart_ban')
session.add_extension('metadata_transfer')

session_save_filename = "/magnet2torrent/magnet_to_torrent_daemon.save_state"

if(os.path.isfile(session_save_filename)):

    fileread = open(session_save_filename, 'rb')
    session.load_state(lt.bdecode(fileread.read()))
    fileread.close()
    print('session loaded from file')
else:
    print('new session started')

session.add_dht_router("router.utorrent.com", 6881)
session.add_dht_router("router.bittorrent.com", 6881)
session.add_dht_router("dht.transmissionbt.com", 6881)
session.add_dht_router("dht.aelitis.com", 6881)

session.start_dht()
session.start_lsd()
session.start_upnp()
session.start_natpmp()

alive = True
while alive:

    db_conn = MySQLdb.connect(  host = '',  user = '',  passwd = '',    db = '',    unix_socket='/mysql/mysql.sock') # Open database connection
    #print('reconnecting')
    #get all records where enabled = 0 and uploaded within yesterday 
    subset_count = 100 ;

    yesterday = date.today() - timedelta(1)
    yesterday = yesterday.strftime('%Y-%m-%d %H:%M:%S')
    #print(yesterday)

    total_count_query = ("SELECT COUNT(*) as total_count FROM content WHERE upload_date > '"+ yesterday +"' AND enabled = '0' ")
    #print(total_count_query)
    try:
        total_count_cursor = db_conn.cursor()# prepare a cursor object using cursor() method
        total_count_cursor.execute(total_count_query) # Execute the SQL command
        total_count_results = total_count_cursor.fetchone() # Fetch all the rows in a list of lists.
        total_count = total_count_results[0]
        print(total_count)
    except:
            print "Error: unable to select data"

    total_pages = total_count/subset_count
    #print(total_pages)

    current_page = 1
    while(current_page <= total_pages):
        from_count = (current_page * subset_count) - subset_count

        #print(current_page)
        #print(from_count)

        hashes = []

        get_mysql_data_query = ("SELECT hash FROM content WHERE upload_date > '" + yesterday +"' AND enabled = '0' ORDER BY record_num DESC LIMIT "+ str(from_count) +" , " + str(subset_count) +" ")
        #print(get_mysql_data_query)
        try:
            get_mysql_data_cursor = db_conn.cursor()# prepare a cursor object using cursor() method
            get_mysql_data_cursor.execute(get_mysql_data_query) # Execute the SQL command
            get_mysql_data_results = get_mysql_data_cursor.fetchall() # Fetch all the rows in a list of lists.
            for row in get_mysql_data_results:
                hashes.append(row[0].upper())
        except:
            print "Error: unable to select data"

        #print(hashes)

        handles = []

        for hash in hashes:
            tempdir = tempfile.mkdtemp()
            add_magnet_uri_params = {
                'save_path': tempdir,
                'duplicate_is_error': True,
                'storage_mode': lt.storage_mode_t(2),
                'paused': False,
                'auto_managed': True,
                'duplicate_is_error': True
            }
            magnet_uri = "magnet:?xt=urn:btih:" + hash.upper() + "&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Ftracker.publicbt.com%3A80&tr=udp%3A%2F%2Ftracker.ccc.de%3A80"
            #print(magnet_uri)
            handle = lt.add_magnet_uri(session, magnet_uri, add_magnet_uri_params)
            handles.append(handle) #push handle in handles list

        #print("handles length is :")
        #print(len(handles))

        while(len(handles) != 0):
            for h in handles:
                #print("inside handles for each loop")
                if h.has_metadata():
                    torinfo = h.get_torrent_info()
                    final_info_hash = str(torinfo.info_hash())
                    final_info_hash = final_info_hash.upper()
                    torfile = lt.create_torrent(torinfo)
                    torcontent = lt.bencode(torfile.generate())
                    tfile_size = len(torcontent)
                    try:
                        insert_cursor = db_conn.cursor()# prepare a cursor object using cursor() method
                        insert_cursor.execute("""INSERT INTO dht_tfiles (hash, tdata) VALUES (%s, %s)""",  [final_info_hash , torcontent] )
                        db_conn.commit()
                        #print "data inserted in DB"
                    except MySQLdb.Error, e:
                        try:
                            print "MySQL Error [%d]: %s" % (e.args[0], e.args[1])
                        except IndexError:
                            print "MySQL Error: %s" % str(e)    


                    shutil.rmtree(h.save_path())    #   remove temp data directory
                    session.remove_torrent(h) # remove torrnt handle from session   
                    handles.remove(h) #remove handle from list

                else:
                    if(h.status().active_time > 600):   # check if handle is more than 10 minutes old i.e. 600 seconds
                        #print('remove_torrent')
                        shutil.rmtree(h.save_path())    #   remove temp data directory
                        session.remove_torrent(h) # remove torrnt handle from session   
                        handles.remove(h) #remove handle from list
                sleep(1)        
                #print('sleep1')

        #print('sleep10')
        #sleep(10)
        current_page = current_page + 1

        #save session state
        filewrite = open(session_save_filename, "wb")
        filewrite.write(lt.bencode(session.save_state()))
        filewrite.close()


    print('sleep60')
    sleep(60)

    #save session state
    filewrite = open(session_save_filename, "wb")
    filewrite.write(lt.bencode(session.save_state()))
    filewrite.close()

Я попытался сохранить вышеприведенный скрипт, работающий на ночь, и обнаружил, что только метаданные 1200 торрентов найдены в ночном сеансе. поэтому я ищу улучшение производительности скрипта.

Я даже пытался расшифровать файл save_state и заметил, что есть более 700 DHT nodes, к которым я подключен. так что дело не в том, что DHT не работает,

Что я планирую сделать, так это keep the handles active в сеансе на неопределенный срок, пока метаданные не будут получены. и не собираюсь удалять дескрипторы через 10 минут, если в течение 10 минут не будут получены метаданные, как я сейчас это делаю.

У меня есть несколько вопросов относительно привязок Python к lib-torrent.

  1. Сколько ручек я могу продолжать работать? есть ли ограничения на запуск дескрипторов?
  2. Замедлит ли запуск 10k+ или 100k дескрипторов мою систему? или жрать ресурсы? если да то какие ресурсы? Я имею в виду оперативную память, сеть?
  3. Я нахожусь за брандмауэром, может ли быть заблокированный входящий порт, вызывающий медленную скорость выборки метаданных?
  4. может ли сервер DHT, такой как router.bittorrent.com или любой другой, заблокировать мой IP-адрес за отправку слишком большого количества запросов?
  5. Могут ли другие одноранговые узлы заблокировать мой IP-адрес, если узнают, что я делаю слишком много запросов только для получения метаданных?
  6. могу ли я запустить несколько экземпляров этого скрипта? или может быть многопоточности? даст ли это лучшую производительность?
  7. при использовании нескольких экземпляров одного и того же скрипта каждый скрипт получит уникальный идентификатор узла в зависимости от используемого IP-адреса и порта, является ли это жизнеспособным решением?

Есть ли лучший подход? для достижения того, что я пытаюсь?


person Community    schedule 30.09.2015    source источник


Ответы (1)


Я не могу ответить на вопросы, касающиеся API libtorrent, но некоторые из ваших вопросов относятся к bittorrent в целом.

Замедлит ли запуск 10k+ или 100k дескрипторов мою систему? или жрать ресурсы? если да то какие ресурсы? я имею в виду оперативную память, сеть?

Загрузки метаданных не должны использовать много ресурсов, поскольку они еще не являются полными загрузками торрентов, то есть они не могут выделить фактические файлы или что-то в этом роде. Но им потребуется некоторое пространство на оперативной памяти/диске для самих метаданных, как только они получат первую их часть.

Я нахожусь за брандмауэром, может ли быть заблокированный входящий порт, вызывающий медленную скорость выборки метаданных?

да, за счет уменьшения количества пиров, которые могут устанавливать соединения, становится сложнее получать метаданные (или вообще устанавливать какое-либо соединение) в роях с небольшим количеством пиров.

NAT может вызвать ту же проблему.

может ли сервер DHT, такой как router.bittorrent.com или любой другой, заблокировать мой IP-адрес за отправку слишком большого количества запросов?

router.bittorrent.com — это загрузочный узел, а не сервер как таковой. Поиски не запрашивают один узел, они запрашивают множество разных (среди миллионов). Но да, отдельные узлы могут заблокировать вас или, что более вероятно, ограничить скорость.

Это можно смягчить, ища случайно распределенные идентификаторы, чтобы распределить нагрузку по пространству ключей DHT.

Могу ли я запустить несколько экземпляров этого скрипта? или может быть многопоточности? даст ли это лучшую производительность?

AIUI libtorrent достаточно неблокирующий или многопоточный, поэтому вы можете запланировать несколько торрентов одновременно.

Я не знаю, есть ли у libtorrent ограничение на скорость исходящих запросов DHT.

при использовании нескольких экземпляров одного и того же скрипта каждый скрипт получит уникальный идентификатор узла в зависимости от используемого IP-адреса и порта, является ли это жизнеспособным решением?

Если вы имеете в виду идентификатор узла DHT, то он получен из IP-адреса (согласно BEP 42). ), а не порт. Хотя включен некоторый случайный элемент, поэтому для каждого IP можно получить ограниченное количество идентификаторов.

Кое-что из этого также может быть применимо к вашему сценарию: http://blog.libtorrent.org/2012/01/seeding-a-million-torrents/

И еще один вариант — моя собственная реализация DHT, которая включает интерфейс командной строки для массовой загрузки торрентов.

person the8472    schedule 03.10.2015