Camel REST: обработка асинхронных запросов для долго работающих HTTP-сервисов

Я пытался найти актуальный пример для такого рода проблем, но, к сожалению, не нашел. Я пытаюсь реализовать веб-сервис с верблюдом, который должен вести себя следующим образом:

  • Camel получает входные данные от Rest-Endpoint либо через GET, либо через POST (api / startsearch)
  • bean-компонент обрабатывает ввод и генерирует идентификатор билета
  • тот же компонент отвечает клиенту HTTP-202 или кодом состояния перенаправления, включая URL-адрес перенаправления (api / result? ticket-id = jf3298u23).
  • bean также передает ввод в очередь запуска activemq, где маршрут Camel будет выполнять всю свою долгую обработку.
  • По завершении маршрута результат должен быть доступен по URL-адресу перенаправления (/ result? Ticket-id = jf3298u23). Если обработка еще не завершена, он должен ответить пользовательским кодом состояния, например HTTP-299-processing.

Итак, мой маршрут выглядит так:

rest().path(apiPath).produces("application/json")
            .get(searchEndpoint)
            .to("bean:requestHandler?method=processNewSearch") // generate ticket-id and reply with 202 or 3xx
            .route().inOnly("activemq:queue:start").endRest() // put the incoming message into the start-queue where the processing starts
            .get(resultEndpoint).to("bean:requestHandler?method=returnResult"); // return 299 when processing not done or 200 + result

from("activemq:queue:start")
            .setHeader("recipients").method(new ExtractRecipients(), "extractRecipients")
            .to("activemq:queue:recipientlist");

... etc, etc... until:

from("activemq:queue:output")
            .to("bean:requestHandler?method=saveFinishedSearch");

Сам bean-компонент имеет три метода:

public void processNewSearch(Exchange exchange) {
    //generate ticket and stuff and finally set Header and body of the response

    exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE, 202);
    exchange.getOut().setBody(redirectUrl);
}

public void returnResult(Exchange exchange) {
    //handle ticket related stuff, if valid fetch result and create http response:
        exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE, 200);
        exchange.getOut().setBody(searchResult);
        return;
}

public void saveFinishedSearch(Exchange exchange) {
    // get search Results from the final Message that was processing asynchronously in the background and save it
    finishedSearches.put(ticket, body);
}

Я уверен, что это неправильный способ ответа с помощью вручную установленных кодов ответа и сообщений, но я не нашел другого способа сделать это.

Таким образом, проблема в настоящее время заключается в том, что верблюд ждет, пока все сообщение будет обработано, и поэтому ответ, сгенерированный .to("bean:requestHandler?method=processNewSearch"), ничего не делает, поскольку он просто будет помещен в очередь запуска.

Как мне немедленно вернуть настраиваемый ответ с помощью верблюда и позволить маршруту обрабатывать запрос асинхронно?


person user3238620    schedule 13.06.2018    source источник


Ответы (1)


Прежде всего, вам следует придерживаться протокола HTTP и запускать только фоновые задачи с помощью POST операций. Вы, вероятно, не хотите, чтобы краулер запускал длительный фоновый процесс с помощью GET запросов, не так ли?

Таким образом, вы также должны использовать HTTP-заголовок Location для возврата URI ресурса, из которого можно получить дополнительную информацию о текущем состоянии процесса. Я бы также использовал общий URI, а не какое-то перенаправление.

В вашей настройке маршрута я обычно также сохраняю все зависящие от маршрута вещи в блоке .route(). Мы поддерживаем процесс сборки каталога и систему архива сообщений EDI, которая будет собирать сообщения, отправленные и / или полученные в определенный период времени из-за немецкого законодательства, вынуждающего клиентов создавать резервные копии своих сообщений EDI, которыми они обменивались.

Мы разделяем запуск нового запроса на архивирование или сборку и получение текущего состояния запроса.

rest("/archives")
  .post()
    .bindingMode(RestBindingMode.json)
    .type(ArchiveRequestSettings.class)
    .consumes(MediaType.APPLICATION_JSON)
    .produces(MediaType.APPLICATION_JSON)
    .description("Invokes the generation of a new message archive for 
                 "messages matching a criteria contained in the payload")

    .route().routeId("create-archives")
      // Extract the IP address of the user who invokes the service
      .bean(ExtractClientIP.class)
      // Basic Authentication
      .bean(SpringSecurityContextLoader.class).policy(authorizationPolicy)
      // check the amount of requests received within a certain time-period
      .bean(receivedRequestFilter)
      // extract specified settings
      .bean(ExtractArchiveRequestSettings.class)
      // forward the task to the archive generation queue
      .to(SomeEndpoints.ARCHIVE_GENERATION_QUEUE)
      // return 202 Accepted response
      .bean(ReturnArchiveRequestCreatedStatus.class)
    .endRest()

  .get("/{archiveId}")
    .bindingMode(RestBindingMode.json)
    .outType(ArchiveRequestEntity.class)
    .produces(MediaType.APPLICATION_JSON)
    .description("Returns the status of the message archive generation process." 
                 + " If the process has finished this operation will return the"
                 + " link to the download location of the generated archive")

    .route().routeId("archive-status")
      // Extract the IP address of the user who invokes the service
      .bean(ExtractClientIP.class)
      // Basic Authentication
      .bean(SpringSecurityContextLoader.class).policy(authorizationPolicy)
      // check the amount of requests received within a certain time-period
      .bean(receivedRequestFilter)
      // return the current state of the task to the client. If the job is done, 
      // the response will also include a download link as wel as an MD5 hash to
      // verify the correctness of the downloaded archive
      .bean(ReturnArchiveRequestStatus.class)
    .endRest();

Класс ExtractArchiveRequestSettings просто выполняет проверки корректности полученной полезной нагрузки и устанавливает значения по умолчанию для отсутствующих полей. После этого запрос сохраняется в базе данных, а его уникальный идентификатор сохраняется в заголовке.

ArchiveRequestSetting действительно похож на пример ниже (немного упрощенный)

@Getter
@Setter
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
public class ArchiveRequestSettings {

  /** Specifies if sent or received messages should be included in the artifact. Setting this field
   * to 'DELIVERED' will include only delivered documents where the companyUuid of the requesting
   * user matches the documents sender identifier. Specifying this field as 'RECEIVED' will include
   * only documents whose receiver identifier matches the companyUuid of the requesting user. **/
  private String direction;

  /** The naming schema of entries within the archive **/
  private String entryPattern;

  /** The upper timestamp bound to include messages. Entries older than this value will be omitted **/
  @JsonSerialize(using = Iso8601DateSerializer.class)
  @JsonDeserialize(using = Iso8601DateDeserializer.class)
  private Date from;
  /** The lower timestamp bound to include messages. Entries younger than this value will be
   * omitted. If left empty this will include even the most recent messages. **/
  @JsonSerialize(using = Iso8601DateSerializer.class)
  @JsonDeserialize(using = Iso8601DateDeserializer.class)
  private Date till;
}

Класс ReturnArchiveRequestCreatedStatus ищет сохраненный объект запроса и возвращает его с ответом 202 Accepted.

@Handler
public void returnStatus(Exchange exchange) {

    String archiveId = exchange.getIn().getHeader(HeaderConstants.ARCHIVES_REQUEST_ID, String.class);
    ArchiveRequestEntity archive = repository.findOne(archiveId);

    Message msg = new DefaultMessage(exchange.getContext());
    msg.setHeader(Exchange.HTTP_RESPONSE_CODE, 202); // Accepted
    msg.setHeader(Exchange.CONTENT_TYPE, "application/json; charset=\"utf-8\"");
    msg.setHeader("Location", archiveLocationUrl + "/" + archiveId);

    msg.setBody(archive);

    exchange.setOut(msg);
}

Возврат текущего состояния сохраненного запроса гарантирует, что клиент может проверить, какие параметры действительно были применены, и может обновить их, если какие-то параметры по умолчанию неудобны или необходимо применить дополнительные изменения.

Фактический процесс поддержки запускается путем отправки обмена в очередь Redis, которая используется на другом компьютере. Результатом этого процесса будет архив, содержащий запрошенные файлы, который будет загружен в общедоступное место, и только ссылка будет сохранена в объекте запроса. Обратите внимание, что у нас есть настраиваемый компонент camel, который имитирует точку входа seda только для очередей Redis. Однако использования seda должно быть достаточно, чтобы начать обработку задачи в другом потоке.

В зависимости от текущего статуса процесса поддержки сохраненный объект запроса будет обновлен процессом поддержки. При получении запроса статуса (через GET) хранилище данных запрашивает текущий статус и сопоставляется с определенными ответами:

public class ReturnArchiveRequestStatus {

  @Resource
  private ArchiveRequestRepository repository;

  @Handler
  public void returnArchiveStatus(Exchange exchange) throws JSONException {

    String archiveId = exchange.getIn().getHeader("archiveId", String.class);

    if (StringUtils.isBlank(archiveId)) {
      badRequest(exchange);
      return;
    }

    ArchiveRequestEntity archive = repository.findOne(archiveId);
    if (null == archive) {
      notFound(archiveId, exchange);
      return;
    }

    ok(archive, exchange);
  }

  private void badRequest(Exchange exchange) throws JSONException {
    Message msg = new DefaultMessage(exchange.getContext());
    msg.setHeader(Exchange.HTTP_RESPONSE_CODE, 400);
    msg.setHeader(Exchange.CONTENT_TYPE, "application/json; charset=\"utf-8\"");
    msg.setFault(false);
    JSONObject json = new JSONObject();

    json.put("status", "ERROR");
    json.put("message", "No archive identifier found");

    msg.setBody(json.toString());
    exchange.setOut(msg);
  }

  private void notFound(String archiveId, Exchange exchange) throws JSONException {
    Message msg = new DefaultMessage(exchange.getContext());
    msg.setHeader(Exchange.HTTP_RESPONSE_CODE, 403);
    msg.setHeader(Exchange.CONTENT_TYPE, "application/json; charset=\"utf-8\"");
    msg.setFault(false);
    JSONObject json = new JSONObject();

    json.put("status", "ERROR");
    json.put("message", "Could not find pending archive process with ID " + archiveId);

    msg.setBody(json.toString());
    exchange.setOut(msg);
  }

  private void ok(UserArchiveRequestEntity archive, Exchange exchange) throws JSONException {
    Message msg = new DefaultMessage(exchange.getContext());
    msg.setHeader(Exchange.HTTP_RESPONSE_CODE, 200);
    msg.setHeader(Exchange.CONTENT_TYPE, "application/json; charset=\"utf-8\"");
    msg.setFault(false);

    msg.setBody(archive);
    exchange.setOut(msg);
  }
}

Фактическая сущность, хранящаяся и обновляемая в течение всего процесса, выглядит примерно так (упрощенно):

@Getter
@Setter
@Builder
@ToString
@Document(collection = "archive")
@JsonInclude(JsonInclude.Include.NON_EMPTY)
public class ArchiveRequestEntity {

  /**
   * The current state of the archiving process
   */
  public enum State {
    /** The request to create an archive was cued but not yet processed **/
    QUEUED,
    /** The archive is currently under construction **/
    RUNNING,
    /** The archive was generated successfully. {@link #downloadUrl} should contain the link the
     * archive can be found **/
    FINISHED,
    /** Indicates that the archive generation failed. {@link #error} should indicate the actual
     * reason why the request failed **/
    FAILED
  }

  @Id
  @JsonIgnore
  private String id;

  /** Timestamp the process was triggered **/
  @JsonIgnore
  @Indexed(expireAfterSeconds = DEFAULT_EXPIRE_TIME)
  private Date timestamp = new Date();

  /** The identifier of the company to create the archive for **/
  private String companyUuid;

  /** The state this archive is currently in **/
  private State state = State.QUEUED;

  ...

  /** Marks the upper limit to include entries to the archive. Entries older then this field will
   * not be included in the archives while entries equal or younger than this timestamp will be
   * included unless they are younger than {@link #till} timestamp **/
  @JsonFormat(pattern = "yyyy-MM-dd'T'HH:mm:ssXX")
  private Date from;
  /** Marks the lower limit to include entries to the archive. Entries younger than this field will
   * not be included in the archive **/
  @JsonFormat(pattern = "yyyy-MM-dd'T'HH:mm:ssXX")
  private Date till;

  /** Information on why the archive creation failed **/
  private String error;

  /** The URL of the final archive to download **/
  private String downloadUrl;

  /** The MD5 Hash of the final artifact in order to guarantee clients an unmodified version of the
   * archive **/
  private String md5Hash;

  ...
}

Обратите внимание, что независимо от текущего состояния статуса обработки 200 OK возвращается с текущим представлением JSON статуса процессов. Клиент либо увидит состояние FINISHED с установленными свойствами downloadUrl и md5Hash, либо другой статус с другими доступными свойствами.

Разумеется, процессу резервного копирования необходимо соответствующим образом обновить статус запроса, поскольку в противном случае клиент не сможет получить правильную информацию о текущем статусе запроса.

Этот подход должен быть применим практически к любым длительно выполняющимся процессам, хотя внутреннее устройство передаваемой вами информации, вероятно, будет отличаться от нашего сценария. Надеюсь, это поможет

person Roman Vottner    schedule 13.06.2018
comment
Спасибо за ответ - это очень помогает увидеть актуальный для практики код, как используется camel. Я согласен использовать POST вместо GET. Я понимаю, что вы сохраняете запрос в БД, где он обрабатывается другой машиной, которая постоянно обновляет данные, пока процесс не будет завершен. Мой опыт состоит в том, чтобы делать больше доказательств концепции для университетской диссертации. Поэтому я стараюсь сделать это как можно проще. Желательно без БД, но вместо этого Очереди и Карты ‹› для хранения. - person user3238620; 13.06.2018
comment
# 2 ...: Ваш способ отправки ответного сообщения с определенным кодом состояния HTTP не сильно отличается от моего подхода (хотя ваш выглядит лучше). Итак, одна вещь, которую я не могу понять: как я могу отправить ответ 202 клиенту, но также поместить обмен в очередь activemq, где прослушивается весь мой маршрут обработки? - person user3238620; 13.06.2018
comment
Как уже упоминалось, мы перенаправляем обмен в очередь Redis, которая выполняется в собственном процессе seda (и, следовательно, в своем собственном потоке), где другой процесс (машина) забирает его для дальнейшей обработки. Тем временем основной поток маршрута продолжается и генерирует выходные данные для клиента. Вы также можете заглянуть в .wireTap, чтобы сделать копию обмена и обработать ее в другом потоке. Обычно это делается для выполнения определенного мониторинга / регистрации данных, содержащихся в обмене. Внутренне прослушка создает начальный поток, а затем порождает новый поток, в котором полезная нагрузка передается в - person Roman Vottner; 13.06.2018
comment
Вместо использования базы данных вы можете подумать об использовании своего собственного типа реестра (для простоты он может быть определен как Singleton), который вы используете в качестве хранилища данных и таким образом управляете состоянием объектов. Просто убедитесь, что это потокобезопасный. - person Roman Vottner; 13.06.2018
comment
WireTap - это компонент, который я искал - большое спасибо. - person user3238620; 13.06.2018