AWS KCL с MultiLangDaemon: контрольная точка на ПОСЛЕДНЕМУ?

У меня есть потребитель Kinesis, чья работа заключается в отслеживании текущих активных пользователей в системе. Каждую минуту пользователи отправляют контрольный сигнал в поток Kinesis, и эта система просто хранит список всех уникальных идентификаторов GUID пользователей, которые она видела, а также время последнего получения контрольного сигнала от этого GUID. Если сердцебиение не было замечено в течение 2 минут, мы предполагаем, что пользователь больше не активен, и исключаем его из списка текущих активных пользователей. Довольно прямолинейно.

Поскольку эта система предназначена только для активных в настоящее время пользователей, нам не нужно обрабатывать старые сообщения. Если бы мы выключили этого потребителя на 2 часа, а затем снова включили бы, мы бы хотели начать обработку с ПОСЛЕДНЕГО сообщения, а не продолжать с того места, где остановились.

Наконец, это было реализовано как приложение NodeJS в соответствии с примером NodeJS клиента Amazon Kinesis использование MultiLangDaemon для связи с клиентской библиотекой Kinesis.

Я обнаружил, что при нормальном использовании лучший способ всегда возобновлять работу с ПОСЛЕДНЕГО - это никогда не использовать функцию контрольных точек KCL. Например, в нижней части моего processRecords метода у меня есть следующее:

    // We don't checkpoint with kinrta, because if we crash for some reason we
    // want to immediately catch back up to live instead of wasting time
    // processing expired heartbeats
    // processRecordsInput.checkpointer.checkpoint(sequenceNumber,
      // function(err, checkpointedSequenceNumber) {

        completeCallback();

      // }
    // );

Таким образом, всякий раз, когда я убиваю потребителя и перезапускаю его, он просматривает файл *.properties и видит, что initialPositionInStream является ПОСЛЕДНИМ, а затем начинает обработку оттуда.

ОДНАКО

Когда я повторно сегментирую свой поток (разделяю сегменты или объединяю сегменты), я сталкиваюсь с проблемой. Когда я повторно сегментирую, контрольная точка на новом сегменте не установлена ​​на LATEST, а на TRIM_HORIZON. Поскольку я никогда не пересматриваю контрольную точку, это означает, что если мой потребитель будет выключен и перезапущен, мне придется обрабатывать 24 часа данных.

Я могу исправить это вручную, отредактировав таблицу Dynamo, используемую KCL для управления контрольными точками, но это, очевидно, не масштабируемое решение. Я пробовал использовать контрольную точку и передавать строку LATEST вместо порядкового номера, но это вызывает ошибку, что порядковый номер недействителен.

Как я могу сообщить KCL, что при повторном перетекании я хочу установить контрольную точку на ПОСЛЕДНЮЮ на новых шардах?

В качестве хакерского решения я рассматривал просто использование DynamoDB SDK и исправление контрольной точки в методе initialize. Это уродливо, но я думаю, что это сработает (при условии, что Amazon не изменит способ управления таблицами KCL)

Обновлять

В соответствии с описанным хакерским решением я написал следующий небольшой вспомогательный метод:

/**
 * Assumes the current shardId (available in the initialize method's
 * `initializeInput.shardId`) is stored in the global "state" object,
 * accessible via the "state" import
 */

import { Kinesis, DynamoDB } from "aws-sdk";
import state from "../state";
import logger from "./logger";
 
const kinesis = new Kinesis();
const ddb = new DynamoDB.DocumentClient();

const log = logger().getLogger("recordProcessor");
const appName = process.env.APP_NAME;

export default async function (startingCheckpoint: string) { 
    // We can't update any Dynamo tables if we don't know which table to update
    if (!appName) return;

    // Compute the name of the shard JUST BEFORE ours
    // Because Kinesis uses an "exclusive" start ID...
    const shardIdNum = parseInt(state.shardId.split("-")[1]) - 1;
    const startShardId = "shardId-" + ("000000000000" + shardIdNum).substr(-12);

    // Pull data about our current shard
    const kinesisResp = await kinesis.listShards({
        StreamName: process.env.KINESIS_STREAM_NAME,
        MaxResults: 1,
        ExclusiveStartShardId: startShardId
    }).promise();
    const oldestSeqNumber = kinesisResp.Shards[0].SequenceNumberRange.StartingSequenceNumber;

    // Pull data about our current checkpoint
    const dynamoResp = await ddb.get({
        TableName: appName,
        Key: {
            leaseKey: state.shardId
        }
    }).promise();
    const prevCheckpoint = dynamoResp.Item.checkpoint;

    log.debug(`Oldest sequence number in Kinesis shard: ${oldestSeqNumber} vs checkpoint: ${prevCheckpoint}`);

    // Determine if we need to "fix" anything
    if (startingCheckpoint === "TRIM_HORIZON") {

        // If our checkpoint is before the oldest sequence number, reset it to
        // "TRIM_HORIZON" so we pull the oldest sequence number
        if (prevCheckpoint < oldestSeqNumber) {
            log.info("Updating checkpoint to TRIM_HORIZON");

            await ddb.update({
                TableName: appName,
                Key: {
                    leaseKey: state.shardId
                },
                UpdateExpression: "SET #checkpoint = :value",
                ExpressionAttributeNames: {
                    "#checkpoint": "checkpoint"
                },
                ExpressionAttributeValues: {
                    ":value": "TRIM_HORIZON"
                }
            }).promise();
        }

    } else if (startingCheckpoint === "LATEST") {

        if (prevCheckpoint !== "LATEST") {
            log.info("Updating checkpoint to LATEST");

            await ddb.update({
                TableName: appName,
                Key: {
                    leaseKey: state.shardId
                },
                UpdateExpression: "SET #checkpoint = :value",
                ExpressionAttributeNames: {
                    "#checkpoint": "checkpoint"
                },
                ExpressionAttributeValues: {
                    ":value": "LATEST"
                }
            }).promise();
        }

    } else {
        log.warn("We can't 'fix' checkpoints that aren't TRIM_HORIZON or LATEST");
    }
}

Я проверил, и это правильно и эффективно обновляет таблицу DynamoDB, но не сразу начинает извлекать записи из нового места. Похоже, что KCL считывает контрольную точку один раз перед вызовом метода initialize и никогда не перечитывает ее.

На этом этапе я ищу либо способ сообщить KCL о начале использования новой контрольной точки, либо способ корректно перезапустить потребителя, чтобы он все повторно инициализировал. У меня пока нет ни того, ни другого, но я буду продолжать исследования. Может быть, я найду что-нибудь в документации MultiLangDaemon, что смогу написать в STDOUT ...


person stevendesu    schedule 17.03.2021    source источник


Ответы (1)


После долгих исследований я пришел к выводу, что Amazon не предоставляет возможности запросить плавное завершение работы. Вам просто нужно остановить своего потребителя (process.exit()) и дождаться, пока Docker перезапустит его.

Однако между моим скриптом исправления контрольных точек (который я запускаю в обратном вызове initialize()) и этим методом перезапуска взломанного сбоя теперь у меня есть решение, которое обновляет мои контрольные точки соответствующим образом - так что Kinesis теперь работает для меня намного более плавно. .

person stevendesu    schedule 22.03.2021