GRPC: сохранить ссылку на поток для отправки данных нескольким клиентам

Я начинаю с GRPC, используя go. Я прочитал официальную документацию и несколько примеров.

В большинстве примеров вы не идентифицируете клиента, а используете поток для чтения / записи данных. Я вижу, что в контексте есть API для получения информации об аутентификации, и я могу идентифицировать клиента для ChatRequest. Но что, если я хочу сохранить ссылку / индекс на Stream на основе идентификатора клиента.

Например,

скажем, у меня есть 3 пользователя в чате. Я представляю rpc как (это также может быть серверная потоковая передача)

rpc Chat(stream ChatRequest) returns (stream ChatResponse) {}

скажем, один пользователь отправляет сообщение группе, которое необходимо отправить другим 2. Итак, если мне нужно отправить его через поток, открытый в данный момент для этих пользователей, насколько безопасно хранить ссылку на поток.

Реализация будет как ...

type chatServiceServer struct {
    // keep a map of subscribers / users currently connected; protect with mutex
}

func (s *chatServiceServer) Chat(stream pb.ChatService_ChatServer) error {
    // md, ok := metadata.FromIncomingContext(stream.Context())
    // p, ok := peer.FromContext(ctx)
    // ... identify client from above

    for {
        // save the message to DB
        // find other users in the chatroom is currently connected
        // if so, stream.Send(m)
        // else notify ....
    }
}

Но я вижу предупреждения в документации API и думаю, как лучше.

https://godoc.org/google.golang.org/grpc#ServerStream
// It is safe to have a goroutine calling SendMsg and another goroutine
// calling RecvMsg on the same stream at the same time, but it is not safe
// to call SendMsg on the same stream in different goroutines.
SendMsg(m interface{}) error

Аналогичный вариант использования возникает с любой подпиской (событие, ....), и необходимо уведомлять на основе идентификатора клиента. Любой пример кода, статья тоже подойдут.

Спасибо


person bsr    schedule 14.01.2020    source источник


Ответы (1)


Сохранение потока для использования в другом месте должно быть безопасным, но вы можете вызывать SendMsg только в одной горутине (такое же ограничение справедливо и для RecvMsg, независимо). Это означает, что если вы сделаете это в своем обработчике метода:

for {
  if err := stream.Recv(req); err != nil {
    return err
  }
  for _, s := range allStreams[req.ID] {
    if err := s.Send(req.Message); err != nil { /* remove s from allStreams */ }
  }
}

Тогда вызов s.Send должен быть защищен блокировкой, потому что несколько из этих обработчиков могут работать одновременно. (Также предполагается, что allStreams является map[ID][]stream, и в этом случае он также должен быть защищен замком.)

person Doug Fawley    schedule 21.01.2020