Как ray работает с переменными объемлющей области видимости?

Рассмотрим следующий пример:

import numpy as np
import ray
import time

A = np.array([42] * 4200)   

@ray.remote
def foo1(x):
    x[5]**2

@ray.remote
def foo2(j):
    A[j]**2

ray.init()

#
# some warmup for ray
#

start = time.perf_counter()
for _ in range(1000):
    ray.get(foo1.remote(A))
time_foo1 = time.perf_counter() - start

start = time.perf_counter()
for _ in range(1000):
    ray.get(foo2.remote(5))
time_foo2 = time.perf_counter() - start

print(time_foo1, time_foo2)

ray.shutdown()

Похоже, что time_foo2 значительно меньше, чем time_foo1. Мое наивное предположение состояло бы в том, что ray сериализует A каждый раз, когда вызывается foo1. Однако, даже если я вручную помещу A в хранилище объектов и передам ссылку на объект foo1, я не вижу никакого улучшения. Может ли кто-нибудь просветить меня, что здесь происходит за кулисами?


ray
person jimmyundjonny    schedule 03.09.2020    source источник


Ответы (1)


Когда я запускаю ваш код, я получаю 0.8745803280000004 0.672677727. Итак, foo2 меньше, но ненамного (возможно, A в исходном коде было больше?). При этом, вот объяснение того, что делает Рэй.

Когда функция аннотируется с помощью ray.remote, она сериализуется, чтобы ее можно было отправить удаленным процессам для запуска. Рэй использует cloudpickle для сериализации. Когда функция сериализуется, ее глобальные зависимости также сериализуются.

В следующем примере A является примером зависимости от глобальной переменной, которую необходимо сериализовать.

@ray.remote
def foo2(j):
    A[j]**2

Когда вызывается удаленная функция, Рэй должен передать аргументы удаленной функции. Есть оптимизации для маленьких объектов, но для больших объектов логика примерно такая:

for each arg:
    if arg is an ObjectRef,
        do nothing
    else,
        replace arg with ray.put(arg)

На удаленном работнике, когда вызывается удаленная функция, мы вызываем ray.get для всех ObjectRef перед фактическим вызовом функции (опять же, мы фокусируемся только на больших объектах). ray.get может выиграть от таких оптимизаций, как кэширование или чтение с нулевым копированием, поэтому часто это намного дешевле, чем ray.put.

На практике это означает, что следующий код

@ray.remote
def foo(arg):
    # At this point ray.get(arg_ref) has already happened

A = np.arange(1_000_000)
foo.remote(A) # This is the same as foo.remote(ray.put(A))
foo.remote(A) # Same as foo.remote(ray.put(A)), which means it has happened twice now

В то время как если мы явно вызовем ray.put, мы можем сохранить себе put

A_ref = np.put(A) 
foo.remote(A_ref) # ray.put is not called here
foo.remote(A_ref) # again, ray.put is not called

Когда я запускаю эти примеры с матрицей из 1 миллиона записей для A, я получаю следующее время (вот мой пример кода< /а>):

Time putting A every time: 3.041259899
Time passing ref of A: 0.7547513060000002
Time serializing A in function: 0.7694220469999999

Обратите внимание, что, хотя сериализация A была быстрой, это плохая практика и не рекомендуется. Это связано с тем, что объекты помещаются в хранилище объектов, а сериализованные функции помещаются в хранилище элементов управления, а хранилище элементов управления не предназначено для передачи больших объемов данных.

person Alex    schedule 06.09.2020
comment
Большое спасибо за этот исчерпывающий ответ! - person jimmyundjonny; 10.09.2020