Как загружать изображения асинхронно с RxJ и выполнять метод, когда все загружено

Я пытаюсь преобразовать свой код, основанный на обещаниях, в RxJ, но мне трудно понять Rx, особенно RxJ.

У меня есть массив с путями.

var paths = ["imagePath1","imagePath2"];

И мне нравится загружать изображения в Javascript

var img = new Image();
img.src = imagePath;
image.onload // <- when this callback fires I'll add them to the images array

и когда все изображения загружены, мне нравится выполнять метод.

я знаю, что есть

Rx.Observable.fromArray(imagepathes)

тоже есть что-то вроде

Rx.Observable.fromCallback(...)

и есть что-то вроде flatMapLatest(...) И Rx.Observable.interval или планировщик по времени

Основываясь на своем исследовании, я бы предположил, что это будут ингредиенты для решения этой проблемы, но я не могу заставить композицию работать.

Итак, как мне загрузить изображения из путей массива, и когда все изображения загружены, я выполняю метод, основанный на интервале?

Спасибо за любую помощь.


person silverfighter    schedule 12.07.2015    source источник
comment
Я использовал forkJoiin для аналогичной нужды (если я правильно понял вашу).   -  person PhiLho    schedule 18.02.2016


Ответы (7)


Сначала вам нужна функция, которая создаст Observable или Promise для отдельного изображения:

function loadImage(imagePath){
   return Rx.Observable.create(function(observer){
     var img = new Image();
     img.src = imagePath;
     img.onload = function(){
       observer.onNext(img);
       observer.onCompleted();
     }
     img.onError = function(err){
       observer.onError(err);
     }
   });
}

Чем вы можете использовать его для загрузки всех изображений

Rx.Observable
  .fromArray(imagepathes)
  .concatMap(loadImage) // or flatMap to get images in load order
  .toArray()
  .subscribe(function(images){
    // do something with loaded images
  })
person Bogdan Savluk    schedule 12.07.2015

function loadImage(url){
    var img = new Image;
    img.src = url;
    var o = new Rx.Subject();
    img.onload = function(){ o.onNext(img); o.onCompleted(); };
    img.onerror = function(e){ o.onError(e); }; // no fromEvent for err handling
    return o;
}

var imageUrls = ['url1', 'url2', 'url3'];
var joined = Rx.Observable.merge(imageUrls.map(loadImage));

// consume one by one:
joined.subscribe(function(item){
    // wait for item
});

joined.toArray().subscribe(function(arr){
    // access results array in arr
});

Или короче:

var imageUrls = ['url1', 'url2', 'url3'];
fromArray(imageUrls).map(url => {
    var img = new Image;
    img.src = url;
    return fromEvent(img, "load");
}).toArray().subscribe(function(arr){
    // access results here
});
person Benjamin Gruenbaum    schedule 12.07.2015
comment
Ага, так что в основном очень похоже на версию Promises. +1. - person Madara's Ghost; 12.07.2015
comment
@MadaraUchiha Я только что добавил «сахарную» версию внизу. - person Benjamin Gruenbaum; 12.07.2015
comment
Использование Subject является излишним, также в случае ошибки вы не сможете использовать оператор retry для результата loadImage. Использование merge для объединения результатов также немного неправильно - вы получите изображения в порядке загрузки, а не в порядке из исходного списка URL-адресов. - person Bogdan Savluk; 12.07.2015
comment
@BogdanSavluk Я предоставил ответ с темой и ответ без нее, используя fromEvent для обработки ошибок. Вы можете обернуть loadImage таким образом, чтобы его можно было легко повторить, но это не очень важно. На практике я бы, скорее всего, использовал код fromEvent. Порядок не важен, и если бы я был просто reduce или flatMap, это не важно, поскольку OP всегда может использовать .src для результатов и знать, что есть что. - в любом случае ключевой частью является .toArray, который преобразует последовательность из n событий в одно событие для массива n результатов. - person Benjamin Gruenbaum; 12.07.2015
comment
Похоже, fromEvent не ожидает обратного вызова, поэтому возвращает AnonymousObservable с неопределенным источником. fromCallback вроде работает, но вместо изображения возвращает обратный вызов. Можете ли вы подтвердить это или я что-то неправильно понял? - person silverfighter; 13.07.2015

Я не думаю, что вы можете легко сделать это с наблюдаемыми, поскольку там нет ничего, что указывало бы на завершение (если у вас нет начального размера). Посмотрите другие ответы для версии Rx.

Однако вы можете использовать массив промисов:

/**
 * Loads an image and returns a promise
 * @param {string} url - URL of image to load
 * @return {Promise<Image>} - Promise for an image once finished loading.
 */
function loadImageAsync(url) {
    return new Promise(function(resolve, reject) {
        var img = new Image();
        img.src = imagePath;
        image.onload = function() { resolve(img); };
        image.onerror = reject;
    });
}

И с этим вы можете легко сделать что-то вроде этого:

var imageUrls = ['url1', 'url2', 'url3'];
Promise.all(imageUrls.map(loadImageAsync))
    .then(function(arrayOfImageElements) {
        // All done!
    });
person Madara's Ghost    schedule 12.07.2015
comment
Несмотря на то, что я сказал это, я был бы очень рад, если бы кто-то доказал, что я неправ, и показал путь с наблюдаемыми Rx. - person Madara's Ghost; 12.07.2015
comment
Не могли бы вы использовать startAsync и объединить добиться того же с полным обратным вызовом? - person marekful; 12.07.2015
comment
@marekful Разве это не было бы точно так же, только с дополнительными накладными расходами на инициализацию Rx? - person Madara's Ghost; 12.07.2015
comment
ухх... У наблюдаемых RxJS есть обратные вызовы завершения. Конечно, вы можете указать конец. - person Benjamin Gruenbaum; 12.07.2015
comment
@BenjaminGruenbaum Как вы можете указать на завершение загрузки набора изображений? Если я правильно понял, в его текущем коде обработчик onload помещает новое значение в наблюдаемое. Как вы можете сказать, когда все они закончились? - person Madara's Ghost; 12.07.2015
comment
@MadaraUchiha Я добавил ответ, показывающий, как это сделать. - person Benjamin Gruenbaum; 12.07.2015
comment
Спасибо за ответ, он у меня уже работает, и Promises специально хочет преобразовать его в RX. Изображение представляет собой ‹img /› только из кода, который срабатывает при загрузке. Может быть, мне нужно написать собственный наблюдатель? - person silverfighter; 12.07.2015
comment
@silverfighter Взгляните на другие ответы для наблюдаемого примера. По сути, вы создаете небольшую наблюдаемую для каждого изображения, а затем объединяете их, чтобы сформировать большую наблюдаемую. - person Madara's Ghost; 12.07.2015
comment
@silverfighter совершенно нормально смешивать и сочетать Rx с обещаниями. Rx поддерживает это. - person Benjamin Gruenbaum; 12.07.2015

Другие решения на основе RX мне не подошли. Вариант Богдана Савлюка вообще не работал. Версия Бенджамина Грюнбаума ждет загрузки изображения, прежде чем начать загрузку следующего изображения, поэтому оно становится очень медленным (поправьте меня, если я ошибаюсь). Вот мое решение, которое просто сравнивает общее количество изображений с количеством уже загруженных изображений, и если они равны, метод onNext() возвращаемого Observable вызывается с массивом изображений в качестве аргумента:

var imagesLoaded = function (sources) {

  return Rx.Observable.create(function (observer) {

    var numImages = sources.length
    var loaded = 0
    var images = []

    function onComplete (img) {
      images.push(img)
      console.log('loaded: ', img)

      loaded += 1
      if (loaded === numImages) {
        observer.onNext(images)
        observer.onCompleted()
      }
    }

    sources.forEach(function (src) {
      var img = new Image()
      img.onload = function () {
        onComplete(img)
      }
      console.log('add src: ' + src)
      img.src = src
      if (img.complete) {
        img.onload = null
        onComplete(img)
      }

    })

  })

}

Применение:

console.time('load images'); // start measuring execution time

imagesLoaded(sources)
  // use flatMap to get the individual images
  // .flatMap(function (x) {
  //   return Rx.Observable.from(x)
  // })

  .subscribe(function (x) {
    console.timeEnd('load images'); // see how fast this was
    console.log(x)
  })
person gang    schedule 16.08.2015

Вот версия Angular/Typescript для загрузки изображения с помощью RxJS:

import { Observable, Observer } from "rxjs"; 

public loadImage(imagePath: string): Observable<HTMLImageElement> {
  return Observable.create((observer: Observer<HTMLImageElement>) => {
    var img = new Image();
    img.src = imagePath;
    img.onload = () => {
      observer.next(img);
      observer.complete();
    };
    img.onerror = err => {
      observer.error(err);
    };
  });
}
person Tilo    schedule 14.12.2018

Я думаю, вам не нужно создавать Observable самостоятельно для этого.

import { from, fromEvent } from 'rxjs';
import { mergeMap, map, scan, filter } from 'rxjs/operators';

const paths = ["imagePath1","imagePath2"];

from(paths).pipe(
   mergeMap((path) => {
      const img = new Image();

      img.src = path;
      return fromEvent(img, 'load').pipe(
          map((e) => e.target)
      );
   }),
   scan((acc, curr) => [...acc, curr], []),
   filter((images) => images.length === paths.length)
).subscribe((images) => {
   // do what you want with images
});
person DongBin Kim    schedule 06.08.2018
comment
опечатка должна быть filter((images) => images.length === paths.length) - person Kevin K.; 06.03.2019

Вот действительно лучшая реализация, которая отменяет загрузку изображения, если вы отписываетесь от Observable https://stackblitz.com/edit/rxjs-loadimage?file=index.ts

import { Observable, Subscriber } from "rxjs";

/**
 * RxJS Observable of loading image that is cancelable
 */
function loadImage(
  url: string,
  crossOrigin?: string
): Observable<HTMLImageElement> {
  return new Observable(function subscriber(subscriber) {
    let img = new Image();
    img.onload = function onload() {
      subscriber.next(img);
      subscriber.complete();
    };
    img.onerror = function onerror(err: Event | string) {
      subscriber.error(err);
    };
    // data-urls appear to be buggy with crossOrigin
    // https://github.com/kangax/fabric.js/commit/d0abb90f1cd5c5ef9d2a94d3fb21a22330da3e0a#commitcomment-4513767
    // see https://code.google.com/p/chromium/issues/detail?id=315152
    //     https://bugzilla.mozilla.org/show_bug.cgi?id=935069
    // crossOrigin null is the same as not set.
    if (
      url.indexOf("data") !== 0 &&
      crossOrigin !== undefined &&
      crossOrigin !== null
    ) {
      img.crossOrigin = crossOrigin;
    }
    // IE10 / IE11-Fix: SVG contents from data: URI
    // will only be available if the IMG is present
    // in the DOM (and visible)
    if (url.substring(0, 14) === "data:image/svg") {
      // TODO: Implement this :)
      // img.onload = null;
      // fabric.util.loadImageInDom(img, onLoadCallback);
    }
    img.src = url;
    return function unsubscribe() {
      img.onload = img.onerror = undefined;
      if (!img.complete) {
        img.src = "";
      }
      img = undefined;
    };
  });
}

// Example
const cacheBurst = new Date().getTime();
const imgUrl = `https://i.pinimg.com/originals/36/0c/62/360c628d043b2461d011d0b7f9b4d880.jpg?nocache=${cacheBurst}`;

const s = loadImage(imgUrl).subscribe(
  img => {
    console.log("Img", img);
  },
  err => {
    console.log("Err", err);
  }
);

setTimeout(() => {
  // uncomment to check how canceling works
  // s.unsubscribe();
}, 100);

person the_smoke    schedule 20.01.2021