Обновление от 2020.04.29: Micro удалила функцию задания cron в версии 2.5.0. Пример кода больше не будет работать. Но логика остается в силе, вы можете реализовать свою собственную работу cron с помощью etcd.

Это заключительная часть серии «Micro In Action».

Пройдя через темы о том, как создавать и использовать микросервисы с Micro в предыдущих статьях, мы подошли к последней теме: Cron Job.

Распределенная работа Cron - сложная задача

Практически каждая система требует работы cron. Их роль заключается в выполнении некоторой фоновой работы, возможно, один раз в определенное время или несколько раз с регулярным интервалом.

Когда в системе всего один узел, это довольно просто и просто. мы помещаем веб-службу и службу демона в один узел. Служба демона будет работать хорошо, пока работает единственный узел.

Но это редкость в современных системах, поскольку мы должны заботиться о надежности и масштабируемости. Когда номер узла превышает единицу, вы должны столкнуться с двумя проблемами: дублированное выполнение и одноточечный сбой.

Если мы просто добавим больше идентичных узлов без каких-либо изменений, это приведет к дублированию. Конфликт между двумя службами демона приходится решать вручную, что чревато ошибками.

Некоторые «умные» ребята могут изменить своего архитектора, перенеся рабочие функции из службы демона в веб-службу и выставив их в виде веб-API или gRPC API. Затем запустите выполнение через crontab ОС:

Преимущество этого архитектора - устранение дублирования выполнения. Балансировщик нагрузки пересылает запрос задания на один узел, когда задание запускается crontab ОС. Но он все же пострадал от единой точки отказа.

Большинство других альтернативных решений также не могут решить эти две проблемы одновременно. Например:

Отделите один узел демона. Проблема: сбой в одной точке.

Отдельные узлы демона умножения. Проблема: дублированное выполнение.

Итак, каково правильное решение этой дилеммы? Короче говоря, мы сталкиваемся с классической проблемой в распределенной системе: Выборы лидера.

Мы хотим, чтобы наши сервисные узлы не имели состояния и были идентичными (большую часть времени), что упрощает масштабирование системы. Но в некоторых сценариях среди этих узлов должен быть выбран единственный ведущий узел для выполнения критически важной работы.

Нам не нужно реализовывать это самостоятельно, потому что многие системы обнаружения сервисов имеют встроенную функцию выбора лидера, например etcd, zookeeper.

С помощью Leader Election мы могли убедиться, что наше задание cron выполняется на единственном ведущем узле (без дублирования выполнения). А когда ведущий узел выходит из строя, другой узел берет на себя ведущую роль и выполняет последующие задания (без единой точки отказа).

Именно так Micro выполняет задания cron.

Работа Cron в Micro

Micro предоставляет эту функцию в пакете github.com/micro/go-micro/v2/sync.

Как и многие другие функции Micro, эта функция является «скрытой». Ни документа, ни тестового примера, ни примера. Изучите его по исходному коду и используйте на свой страх и риск😂.

Возможно, эта статья пока единственное руководство в Интернете.

Ключевые компоненты

Эта функция состоит из нескольких интерфейсов и структур.

Самый важный компонент - это интерфейс sync.Cron:

// Cron is a distributed scheduler using leader election
// and distributed task runners. It uses the leader and
// task interfaces.
type Cron interface {
   Schedule(task.Schedule, task.Command) error
}

Единственный метод этого интерфейса принимает два параметра.

Первый параметр task.Schedule представляет информацию о расписании задания.

// Schedule represents a time or interval at which a task should run
type Schedule struct {
   // When to start the schedule. Zero time means immediately
   Time time.Time
   // Non zero interval dictates an ongoing schedule
   Interval time.Duration
}

В этой структуре два поля: одно - начальная точка задания, другое - интервал выполнения.

Второй параметр task.Command определяет команду задания. У него есть имя и указатель на функцию.

// Command to be executed
type Command struct {
   Name string
   Func func() error
}

Запланировать работу

С помощью компонентов, упомянутых выше, мы можем легко запланировать работу:

package main

import (
   "time"

   "github.com/micro/go-micro/v2"
   "github.com/micro/go-micro/v2/sync"
   "github.com/micro/go-micro/v2/sync/task"
   log "github.com/micro/go-micro/v2/logger"
)

func main() {
   // New Service
   service := micro.NewService(
      micro.Name("com.foo.cron.example"), // name the client service
   )
   // Initialise service
   service.Init()

   cron := sync.NewCron()
   cron.Schedule(
      task.Schedule{Interval: 10 * time.Second},
      task.Command{Name: "foo", Func: func() error {
         log.Debug("finish command foo")
         return nil
      }},
   )

   if err := service.Run(); err != nil {
      log.Fatal(err)
   }
}

Как обычно, мы создали и инициализировали сервис. Затем создайте Cron интерфейс с sync.NewCron().

После этого мы планируем простое задание с cron.Schedule, которое будет печатать строку журнала каждые десять секунд.

А теперь давайте запустим:

go run main.go
2020-04-03 18:49:59  level=info Starting [service] com.foo.cron.example
2020-04-03 18:49:59  level=info Server [grpc] Listening on [::]:60752
2020-04-03 18:49:59  level=info Registry [mdns] Registering node: com.foo.cron.example-1ae32da9-97e0-4acb-9f5a-aecdd83d6aa6

Вы можете ожидать, что что-то выводится на консоль каждые десять секунд. Но ничего. Ни ошибок, ни журнала. почему?

Как вы знаете, у Micro есть соглашение: все основные компоненты имеют облегченную реализацию по умолчанию. Например, у нас есть реализация mDNS для реестра и реализация HTTP для брокера. Мы могли использовать все функции без каких-либо дополнительных зависимостей, что упрощает локальную разработку.

Но функция Cron Job нарушает это соглашение. Перед использованием sync.Cron необходимо установить etcd. Исходный код sync.NewCron объясняет это:

func NewCron(opts ...Option) Cron {
   ...
   if options.Leader == nil {
      options.Leader = etcd.NewLeader()
   }
   ...
}

Если мы не предоставили параметр Leader (тип leader.Leader), будет создан вариант по умолчанию (etcd.NewLeader). Эта строка кода создает зависимость от etcd. Если вы покопаетесь в коде etcd.NewLeader, вы найдете основную причину:

func NewLeader(opts ...leader.Option) leader.Leader {
   ...
   if len(endpoints) == 0 {
      endpoints = []string{"http://127.0.0.1:2379"}
   }
   ...
}

Поскольку у нас нет etcd для 127.0.0.1:2379 , это задание будет зависать навсегда, без вывода ошибок.

Какой вариант leader.Leader? зачем нам это нужно? Давайте посмотрим на определение leader.Leader:

// Leader provides leadership election
type Leader interface {
   // elect leader
   Elect(id string, opts ...ElectOption) (Elected, error)
   // follow the leader
   Follow() chan string
}

Как упоминалось в предыдущем разделе, для запуска распределенного задания cron нам нужна возможность выбора лидера. leader.Leader служит для этой цели. Внутри реализации sync.Cron будет вызываться метод Leader.Elect для получения фактического лидера.

func (c *syncCron) Schedule(s task.Schedule, t task.Command) error {
   id := fmt.Sprintf("%s-%s", s.String(), t.String())

   ...
         e, err := c.opts.Leader.Elect(id)
   ...
}

Примечание. Я думаю, что Leader - пример неправильного наименования, и Elector было бы лучшим выбором.

Поэтому нам нужно предоставить Leader для инициализации Cron. Мы могли бы выбрать etcd в качестве реестра, а затем его можно было бы повторно использовать для выборов лидера:

import (
   ...
   "github.com/micro/go-micro/v2/sync/leader"
   "github.com/micro/go-micro/v2/sync/leader/etcd"
   ...
)

func main() {
   // New Service
   service := micro.NewService(
      micro.Name("com.foo.cron.example"), // name the client service
   )
   // Initialise service
   service.Init()

   // get etcd node list from registry
   etcdList := service.Options().Registry.Options().Addrs
   // build leader
   lead := etcd.NewLeader(leader.Nodes(etcdList...))
   
   cron := sync.NewCron(sync.WithLeader(lead))
   cron.Schedule(
      task.Schedule{Interval: 10 * time.Second},
      task.Command{Name: "foo", Func: func() error {
         log.Info("finish command foo")
         return nil
      }},
   )

   if err := service.Run(); err != nil {
      log.Fatal(err)
   }
}

Затем запустите программу с параметрами реестра:

go run main.go --registry=etcd --registry_address=etcd1.foo.com:2379,etcd2.foo.com:2379,etcd3.foo.com:2379
2020-04-03 18:52:46  level=info Starting [service] com.foo.cron.example
2020-04-03 18:52:46  level=info Server [grpc] Listening on [::]:61094
2020-04-03 18:52:46  level=info Registry [etcd] Registering node: com.foo.cron.example-da16c259-302a-4247-83aa-2d0fc1d3a4e2
2020-04-03 18:52:56  level=info [cron] executing command foo
2020-04-03 18:52:56  level=info finish command foo
...

Наконец, мы запускаем задание cron в Micro.

Попался: функция Cron Job в Micro по умолчанию зависит от etcd.

Давайте загрузим два дополнительных узла (узел B, узел C). На них не выполняются никакие задания, поскольку первый узел (узел A) пока является лидером.

Затем мы выключаем узел A, через несколько секунд руководство будет автоматически передано узлу B или узлу C. Следовательно, задание cron будет запущено на новом ведущем узле.

Это то, чего мы хотим добиться: без единой точки отказа и без дублирования выполнения.

Изменить начальную точку расписания работ

Иногда вам нужно, чтобы задание запускалось в фиксированное время, попробуйте изменить поле Time в task.Schedule:

// start from the next New Year's Day
startPoint, _ := time.Parse("2006-01-01", "2021-01-01")
cron.Schedule(
   task.Schedule{
      Time:     startPoint,
      Interval: 10 * time.Second,
   },
   task.Command{Name: "foo", Func: func() error {
      log.Info("finish command foo")
      return nil
   }},
)

Он будет работать так, как вы ожидаете.

Однако, если вы хотите, чтобы задание началось через минуту после расписания, вы можете попробовать следующее:

cron.Schedule(
   task.Schedule{
      Time:     time.Now().Add(time.Minute),
      Interval: 10 * time.Second,
   },
   task.Command{Name: "foo", Func: func() error {
      log.Info("finish command foo")
      return nil
   }},
)

И вы будете разочарованы, потому что на всех узлах будет выполняться дублирование. Причина кроется в реализации sync.Cron и task.Schedule.

//cron.go
func (c *syncCron) Schedule(s task.Schedule, t task.Command) error {
   id := fmt.Sprintf("%s-%s", s.String(), t.String())

   ...
         e, err := c.opts.Leader.Elect(id)
   ...
}
//task.go
func (s Schedule) String() string {
   return fmt.Sprintf("%d-%d", s.Time.Unix(), s.Interval)
}

Идентификатор выборов зависит от Schedule.String()и Schedule.String() зависит отSchedule.Time.

Когда мы выбираем относительное время в качестве начальной точки, каждый узел будет формировать свой уникальный идентификатор выборов, потому что они, вероятно, будут загружаться с разных временных меток.

Тогда каждый узел становится лидером своей избирательной группы.

Попался: никогда не назначайте относительное время в качестве времени начала работы.

Резюме

Внедрение Cron Job в распределенной системе непросто. Micro представляет собой простое, но элегантное решение. Это решение может охватывать большинство наших вариантов использования, хотя в нем все еще отсутствуют некоторые расширенные функции, такие как выражение crontab.

Будьте осторожны с ошибками, и вы, наконец, их приручите. Этот совет применим как к функции задания cron, так и ко всей платформе Micro.

Итак, я завершаю эту серию статей. Спасибо за прочтение.

Написание этой серии для меня также является процессом обучения. Это убеждает меня в том, что Micro - отличный инструмент, он заслуживает вашего времени и поможет вам создавать распределенные системы чрезвычайно простым способом.

У него есть недостатки, но они не являются неприемлемыми. И с выходом новых версий становится все лучше и лучше. Мои статьи основаны на v2.4.0, я очень рад попробовать новые версии. (Но не спешите использовать последнюю версию в производственной среде. Если вы читали эту серию, вы должны были знать причину.)

В будущем я продолжу писать статьи о Micro, но не сериями. Это будут отдельные статьи, посвященные определенным темам.

Если вы хотите получать уведомления о выходе новой статьи, подпишитесь на меня на Medium / Twitter (@ dche423).

До скорого. Спасибо.

- Дэн Че

Смотрите также: