Синие/зеленые развертывания структурированной потоковой передачи Spark

Мы хотели бы иметь возможность развертывать наши задания Spark таким образом, чтобы не было простоев при обработке данных во время развертывания (в настоящее время окно составляет около 2-3 минут). На мой взгляд, самый простой способ сделать это — смоделировать философию «сине-зеленого развертывания», которая заключается в том, чтобы запустить новую версию задания Spark, дать ему прогреться, а затем закрыть старое задание. Однако со структурированной потоковой передачей и контрольными точками мы не можем этого сделать, потому что новое задание Spark видит, что последний файл контрольной точки уже существует (из старого задания). Я прикрепил образец ошибки ниже. У кого-нибудь есть мысли о потенциальном обходном пути?

Я подумал о том, чтобы скопировать существующий каталог контрольных точек в другой каталог контрольных точек для вновь созданного задания — хотя это должно работать как обходной путь (некоторые данные могут быть повторно обработаны, но наша БД должна дедуплицироваться), это кажется супер хакерским, и я бы предпочел не преследовать.

Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: rename destination /user/checkpoint/job/offsets/3472939 already exists
    at org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.validateOverwrite(FSDirRenameOp.java:520)
    at org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.unprotectedRenameTo(FSDirRenameOp.java:364)
    at org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.renameTo(FSDirRenameOp.java:282)
    at org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.renameToInt(FSDirRenameOp.java:247)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.renameTo(FSNamesystem.java:3677)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.rename2(NameNodeRpcServer.java:914)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.rename2(ClientNamenodeProtocolServerSideTranslatorPB.java:587)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2045)

    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
    at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
    at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1991)
    at org.apache.hadoop.fs.Hdfs.renameInternal(Hdfs.java:335)
    at org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:678)
    at org.apache.hadoop.fs.FileContext.rename(FileContext.java:958)
    at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$FileContextManager.rename(HDFSMetadataLog.scala:356)
    at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.org$apache$spark$sql$execution$streaming$HDFSMetadataLog$$writeBatch(HDFSMetadataLog.scala:160)
    ... 20 more
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.fs.FileAlreadyExistsException): rename destination /user/checkpoint/job/offsets/3472939 already exists

person techalicious    schedule 04.04.2018    source источник


Ответы (1)


Это возможно, но это усложнит ваше приложение. Запуск потоков в целом быстрый, поэтому справедливо предположить, что задержка вызвана инициализацией статических объектов и зависимостей. В этом случае вам понадобятся только SparkContext / SparkSession и никаких потоковых зависимостей, поэтому процесс можно описать так:

  • Запустите новое приложение Spark.
  • Инициализировать пакетно-ориентированные объекты.
  • Передайте сообщение предыдущему приложению, чтобы уйти в отставку.
  • Дождитесь подтверждения.
  • Запускайте потоки.

На очень высоком уровне счастливый путь можно представить так:

введите описание изображения здесь

Поскольку это очень общий шаблон, его можно реализовать по-разному, в зависимости от языка и инфраструктуры:

  • Облегченная очередь сообщений, такая как ØMQ.
  • Передача сообщений через распределенную файловую систему.
  • Размещение приложений в интерактивном контексте (Apache Toree, Apache Livy) и использование внешнего клиента для оркестровки.
person Alper t. Turker    schedule 18.04.2018
comment
Вау, это было невероятно подробно, спасибо. Я попробую и дам вам знать, что произойдет - person techalicious; 19.04.2018