У меня есть потребитель Kinesis, чья работа заключается в отслеживании текущих активных пользователей в системе. Каждую минуту пользователи отправляют контрольный сигнал в поток Kinesis, и эта система просто хранит список всех уникальных идентификаторов GUID пользователей, которые она видела, а также время последнего получения контрольного сигнала от этого GUID. Если сердцебиение не было замечено в течение 2 минут, мы предполагаем, что пользователь больше не активен, и исключаем его из списка текущих активных пользователей. Довольно прямолинейно.
Поскольку эта система предназначена только для активных в настоящее время пользователей, нам не нужно обрабатывать старые сообщения. Если бы мы выключили этого потребителя на 2 часа, а затем снова включили бы, мы бы хотели начать обработку с ПОСЛЕДНЕГО сообщения, а не продолжать с того места, где остановились.
Наконец, это было реализовано как приложение NodeJS в соответствии с примером NodeJS клиента Amazon Kinesis использование MultiLangDaemon для связи с клиентской библиотекой Kinesis.
Я обнаружил, что при нормальном использовании лучший способ всегда возобновлять работу с ПОСЛЕДНЕГО - это никогда не использовать функцию контрольных точек KCL. Например, в нижней части моего processRecords
метода у меня есть следующее:
// We don't checkpoint with kinrta, because if we crash for some reason we
// want to immediately catch back up to live instead of wasting time
// processing expired heartbeats
// processRecordsInput.checkpointer.checkpoint(sequenceNumber,
// function(err, checkpointedSequenceNumber) {
completeCallback();
// }
// );
Таким образом, всякий раз, когда я убиваю потребителя и перезапускаю его, он просматривает файл *.properties
и видит, что initialPositionInStream является ПОСЛЕДНИМ, а затем начинает обработку оттуда.
ОДНАКО
Когда я повторно сегментирую свой поток (разделяю сегменты или объединяю сегменты), я сталкиваюсь с проблемой. Когда я повторно сегментирую, контрольная точка на новом сегменте не установлена на LATEST, а на TRIM_HORIZON. Поскольку я никогда не пересматриваю контрольную точку, это означает, что если мой потребитель будет выключен и перезапущен, мне придется обрабатывать 24 часа данных.
Я могу исправить это вручную, отредактировав таблицу Dynamo, используемую KCL для управления контрольными точками, но это, очевидно, не масштабируемое решение. Я пробовал использовать контрольную точку и передавать строку LATEST вместо порядкового номера, но это вызывает ошибку, что порядковый номер недействителен.
Как я могу сообщить KCL, что при повторном перетекании я хочу установить контрольную точку на ПОСЛЕДНЮЮ на новых шардах?
В качестве хакерского решения я рассматривал просто использование DynamoDB SDK и исправление контрольной точки в методе initialize
. Это уродливо, но я думаю, что это сработает (при условии, что Amazon не изменит способ управления таблицами KCL)
Обновлять
В соответствии с описанным хакерским решением я написал следующий небольшой вспомогательный метод:
/**
* Assumes the current shardId (available in the initialize method's
* `initializeInput.shardId`) is stored in the global "state" object,
* accessible via the "state" import
*/
import { Kinesis, DynamoDB } from "aws-sdk";
import state from "../state";
import logger from "./logger";
const kinesis = new Kinesis();
const ddb = new DynamoDB.DocumentClient();
const log = logger().getLogger("recordProcessor");
const appName = process.env.APP_NAME;
export default async function (startingCheckpoint: string) {
// We can't update any Dynamo tables if we don't know which table to update
if (!appName) return;
// Compute the name of the shard JUST BEFORE ours
// Because Kinesis uses an "exclusive" start ID...
const shardIdNum = parseInt(state.shardId.split("-")[1]) - 1;
const startShardId = "shardId-" + ("000000000000" + shardIdNum).substr(-12);
// Pull data about our current shard
const kinesisResp = await kinesis.listShards({
StreamName: process.env.KINESIS_STREAM_NAME,
MaxResults: 1,
ExclusiveStartShardId: startShardId
}).promise();
const oldestSeqNumber = kinesisResp.Shards[0].SequenceNumberRange.StartingSequenceNumber;
// Pull data about our current checkpoint
const dynamoResp = await ddb.get({
TableName: appName,
Key: {
leaseKey: state.shardId
}
}).promise();
const prevCheckpoint = dynamoResp.Item.checkpoint;
log.debug(`Oldest sequence number in Kinesis shard: ${oldestSeqNumber} vs checkpoint: ${prevCheckpoint}`);
// Determine if we need to "fix" anything
if (startingCheckpoint === "TRIM_HORIZON") {
// If our checkpoint is before the oldest sequence number, reset it to
// "TRIM_HORIZON" so we pull the oldest sequence number
if (prevCheckpoint < oldestSeqNumber) {
log.info("Updating checkpoint to TRIM_HORIZON");
await ddb.update({
TableName: appName,
Key: {
leaseKey: state.shardId
},
UpdateExpression: "SET #checkpoint = :value",
ExpressionAttributeNames: {
"#checkpoint": "checkpoint"
},
ExpressionAttributeValues: {
":value": "TRIM_HORIZON"
}
}).promise();
}
} else if (startingCheckpoint === "LATEST") {
if (prevCheckpoint !== "LATEST") {
log.info("Updating checkpoint to LATEST");
await ddb.update({
TableName: appName,
Key: {
leaseKey: state.shardId
},
UpdateExpression: "SET #checkpoint = :value",
ExpressionAttributeNames: {
"#checkpoint": "checkpoint"
},
ExpressionAttributeValues: {
":value": "LATEST"
}
}).promise();
}
} else {
log.warn("We can't 'fix' checkpoints that aren't TRIM_HORIZON or LATEST");
}
}
Я проверил, и это правильно и эффективно обновляет таблицу DynamoDB, но не сразу начинает извлекать записи из нового места. Похоже, что KCL считывает контрольную точку один раз перед вызовом метода initialize
и никогда не перечитывает ее.
На этом этапе я ищу либо способ сообщить KCL о начале использования новой контрольной точки, либо способ корректно перезапустить потребителя, чтобы он все повторно инициализировал. У меня пока нет ни того, ни другого, но я буду продолжать исследования. Может быть, я найду что-нибудь в документации MultiLangDaemon, что смогу написать в STDOUT ...