Быстрый или массовый Upsert в pymongo

Как я могу сделать массовое обновление в pymongo? Я хочу обновить кучу записей, а делать их по одной очень медленно.

Ответ на почти идентичный вопрос находится здесь: Массовое обновление/upsert в MongoDB?

Принятый ответ на самом деле не отвечает на вопрос. Он просто дает ссылку на интерфейс командной строки mongo для импорта/экспорта.

Я также был бы открыт для кого-то, кто объяснит, почему массовая рассылка невозможна / не является лучшей практикой, но, пожалуйста, объясните, какое решение является предпочтительным для такого рода проблем.


person ComputationalSocialScience    schedule 13.03.2011    source источник


Ответы (6)


Современные выпуски pymongo (выше 3.x) заключают массовые операции в согласованный интерфейс, который понижает версию там, где версия сервера не поддерживает массовые операции. Теперь это согласовано с официально поддерживаемыми драйверами MongoDB.

Поэтому предпочтительным методом кодирования является использование bulk_write() вместо этого, где вы используете UpdateOne другое другое соответствующее действие операции вместо этого. И теперь, конечно, предпочтительнее использовать списки на естественном языке, а не конкретный конструктор

Прямой перевод старой документации:

from pymongo import UpdateOne

operations = [
    UpdateOne({ "field1": 1},{ "$push": { "vals": 1 } },upsert=True),
    UpdateOne({ "field1": 1},{ "$push": { "vals": 2 } },upsert=True),
    UpdateOne({ "field1": 1},{ "$push": { "vals": 3 } },upsert=True)
]

result = collection.bulk_write(operations)

Или классический цикл трансформации документа:

import random
from pymongo import UpdateOne

random.seed()

operations = []

for doc in collection.find():
    # Set a random number on every document update
    operations.append(
        UpdateOne({ "_id": doc["_id"] },{ "$set": { "random": random.randint(0,10) } })
    )

    # Send once every 1000 in batch
    if ( len(operations) == 1000 ):
        collection.bulk_write(operations,ordered=False)
        operations = []

if ( len(operations) > 0 ):
    collection.bulk_write(operations,ordered=False)

Возвращаемый результат имеет вид BulkWriteResult, который будет содержать счетчики совпадающих и обновленных документов, а также возвращаемые значения _id для любых происходящих "upserts".

Существует некоторое неправильное представление о размере массива массовых операций. Фактический запрос, отправленный на сервер, не может превышать ограничение BSON в 16 МБ, поскольку это ограничение также применяется к «запросу», отправляемому на сервер, который также использует формат BSON.

Однако это не влияет на размер массива запросов, который вы можете создать, поскольку фактические операции в любом случае будут отправляться и обрабатываться только партиями по 1000. Единственное реальное ограничение состоит в том, что сами по себе эти 1000 рабочих инструкций фактически не создают документ BSON размером более 16 МБ. Что действительно является довольно высоким заказом.

Общая концепция массовых методов — «меньше трафика» в результате одновременной отправки многих вещей и обработки только одного ответа сервера. Сокращение накладных расходов, связанных с каждым отдельным запросом на обновление, экономит много времени.

person Neil Lunn    schedule 25.03.2016
comment
Извините, но я не понимаю, зачем вы делите свои партии в своем цикле, если вы знаете, что ваш драйвер делает это сам. Я имею в виду, что в вашем примере 1000 документов UpdateOne({ "_id": doc["_id"] },{ "$set": { "random": random.randint(0,10) } }) имеют размер менее 16 МБ, и очевидно, что pymongo может разделить ваши данные, и запрос будет успешным. Зачем вы делите свои данные руками? - person Greg Eremeev; 27.04.2017
comment
@Budulianin, потому что он итерирует курсор, и это очень много и анти-шаблон для загрузки всей коллекции в память. Вот в основном, почему они упакованы в разумные размеры. Кроме того, это должно проходить через сеть, и особенно с учетом асинхронных сред (где базовая техника остается прежней), вы можете в основном отправлять данные по сети и создавать другой пакет, ожидая подтверждения. Если вы думаете, что вы что-то выиграете, сделав одну большую партию, а не несколько разумных, то в большинстве случаев я бы сказал, что это маловероятно. - person Neil Lunn; 30.06.2017
comment
Я знаю, что этому ответу уже около 4 лет, но будет ли update_many подходящей альтернативой? api.mongodb.com/python/ текущий/апи/пимонго/ - person Tomiwa; 29.10.2020

MongoDB 2.6+ поддерживает массовые операции. Это включает в себя массовые вставки, обновления, обновления и т. д. Смысл этого в том, чтобы уменьшить/устранить задержки из-за задержки в оба конца при выполнении операций «запись за записью» («документ за документом», чтобы быть правильным).

Так, как это работает? Пример на Python, потому что это то, над чем я работаю.

>>> import pymongo
>>> pymongo.version
'2.7rc0'

Чтобы использовать эту функцию, мы создаем «массовый» объект, добавляем к нему документы, затем вызываем для него выполнение, и он сразу отправляет все обновления. Предостережения: BSONsize собранных операций (сумма bsonsize) не может превышать предельный размер документа в 16 МБ. Конечно, количество операций при этом может существенно различаться, Ваш пробег может варьироваться.

Пример в Pymongo массовой операции upsert:

import pymongo
conn = pymongo.MongoClient('myserver', 8839)
db = conn['mydbname']
coll = db.myCollection
bulkop = coll.initialize_ordered_bulk_op()
retval = bulkop.find({'field1':1}).upsert().update({'$push':{'vals':1}})
retval = bulkop.find({'field1':1}).upsert().update({'$push':{'vals':2}})
retval = bulkop.find({'field1':1}).upsert().update({'$push':{'vals':3}})
retval = bulkop.execute()

Это основной метод. Дополнительная информация доступна по адресу:

http://api.mongodb.org/python/2.7rc1/examples/bulk.html

Изменить: - начиная с версии 3.5 драйвера Python, initialize_ordered_bulk_op устарел. Вместо этого используйте bulk_write(). [ http://api.mongodb.com/python/current/api/pymongo/collection.html#pymongo.collection.Collection.bulk_write ]

person Kevin J. Rice    schedule 25.03.2014
comment
Нет ли способа избежать выполнения поиска для каждой операции? Учтите, что я хочу обновить список объектов json, встроенных в документ, таким образом, я должен каждый раз находить этот документ и обновлять его, это не кажется эффективным. - person SpiXel; 11.07.2016
comment
Это просто способ записи MongoDB, а не моя идея. В любом upsert или update вы должны найти документы, которые хотите обновить, и эта находка указывает, с какими из них нужно действовать. ТАКЖЕ, он должен быть эффективным для больших групп действий, но не обязательно для вашего конкретного приложения, поэтому, как говорится, ваш пробег может варьироваться. - person Kevin J. Rice; 21.07.2016
comment
Синтаксис initialize_ordered_bulk_op() теперь устарел: api.mongodb.com/python/current / - person duhaime; 21.01.2018

если у вас много данных и вы хотите использовать "_id" для оценки существования данных,

можешь попробовать...

import pymongo
from pymongo import UpdateOne
client = pymongo.MongoClient('localhost', 27017)
db=client['sampleDB']

collectionInfo = db.sample

#sample data
datas=[
    {"_id":123456,"name":"aaa","N":1,"comment":"first sample","lat":22,"lng":33},
    {"_id":234567,"name":"aaa","N":1,"comment":"second sample","lat":22,"lng":33},
    {"_id":345678,"name":"aaa","N":1,"comment":"xxx sample","lat":22,"lng":33},
    {"_id":456789,"name":"aaa","N":1,"comment":"yyy sample","lat":22,"lng":33},
    {"_id":123456,"name":"aaaaaaaaaaaaaaaaaa","N":1,"comment":"zzz sample","lat":22,"lng":33},
    {"_id":11111111,"name":"aaa","N":1,"comment":"zzz sample","lat":22,"lng":33}
]

#you should split judge item and other data 
ids=[data.pop("_id") for data in datas]

operations=[UpdateOne({"_id":idn},{'$set':data},upsert=True) for idn ,data in zip(ids,datas)]

collectionInfo.bulk_write(operations)

Мой английский очень плохой, если вы не понимаете, что я говорю, извините

person 袁華谷-Andy    schedule 25.01.2018

Ответ остается прежним: нет поддержки массовых upserts.

person Andreas Jung    schedule 13.03.2011
comment
Это больше не так. Смотрите ответ Кевина. - person Jordan Feldstein; 23.04.2016
comment
Bulk.find.upsert() docs.mongodb.com /manual/reference/method/Bulk.find.upsert/ - person Tanuj; 25.08.2017

Вы можете обновить все документы, соответствующие спецификации вашего запроса, используя multi=True.

Здесь есть ошибка о выполнении пакета команд так, как вы хотите.

person Bernie Hackett    schedule 14.03.2011

Самое быстрое массовое обновление с Python 3.5+, мотором и асинхронностью:

import asyncio
import datetime
import logging
import random
import time

import motor.motor_asyncio
import pymongo.errors


async def execute_bulk(bulk):
    try:
        await bulk.execute()
    except pymongo.errors.BulkWriteError as err:
        logging.error(err.details)


async def main():
    cnt = 0
    bulk = db.initialize_unordered_bulk_op()
    tasks = []
    async for document in db.find({}, {}, no_cursor_timeout=True):
        cnt += 1
        bulk.find({'_id': document['_id']}).update({'$set': {"random": random.randint(0,10)}})
        if not cnt % 1000:
            task = asyncio.ensure_future(execute_bulk(bulk))
            tasks.append(task)
            bulk = db.initialize_unordered_bulk_op()
    if cnt % 1000:
        task = asyncio.ensure_future(bulk.execute(bulk))
        tasks.append(task)
    logging.info('%s processed', cnt)
    await asyncio.gather(*tasks)


logging.basicConfig(level='INFO')    
db = motor.motor_asyncio.AsyncIOMotorClient()['database']['collection']
start_time = time.time()
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    execution_time = time.time() - start_time
    logging.info('Execution time: %s', datetime.timedelta(seconds=execution_time))
person El Ruso    schedule 07.12.2016
comment
Почему по модулю 1000? Я думал, что массовые операции монго «автоматически» выполняются по 2000 операций за раз... - person duhaime; 16.03.2017
comment
@duhaime Хороший вопрос. Я могу сослаться на документацию, но выглядит как Ограничение 16 МБ для BSON будет более точным значением. К сожалению, я понятия не имею, как измерить размер запросов - person El Ruso; 16.03.2017
comment
@duhaime Кстати, в соответствии с документами последнего двигателя, мы можно вообще об этом не беспокоиться - person El Ruso; 16.03.2017