Как читать метаданные в gRPC с помощью Java на стороне клиента

Я использую компилятор Java и Protoc 3.0, и мой прототип файла упомянут ниже. https://github.com/openconfig/public/blob/master/release/models/rpc/openconfig-rpc-api.yang

syntax = "proto3";

package Telemetry;

// Interface exported by Agent
service OpenConfigTelemetry {
    // Request an inline subscription for data at the specified path.
    // The device should send telemetry data back on the same
    // connection as the subscription request.
    rpc telemetrySubscribe(SubscriptionRequest)                     returns (stream OpenConfigData) {}

    // Terminates and removes an exisiting telemetry subscription
    rpc cancelTelemetrySubscription(CancelSubscriptionRequest)      returns (CancelSubscriptionReply) {}

    // Get the list of current telemetry subscriptions from the
    // target. This command returns a list of existing subscriptions
    // not including those that are established via configuration.
    rpc getTelemetrySubscriptions(GetSubscriptionsRequest)          returns (GetSubscriptionsReply) {}

    // Get Telemetry Agent Operational States
    rpc getTelemetryOperationalState(GetOperationalStateRequest)    returns (GetOperationalStateReply) {}

    // Return the set of data encodings supported by the device for
    // telemetry data
    rpc getDataEncodings(DataEncodingRequest)                       returns (DataEncodingReply) {}
}

// Message sent for a telemetry subscription request
message SubscriptionRequest {
    // Data associated with a telemetry subscription
    SubscriptionInput input                                 = 1;

    // List of data models paths and filters
    // which are used in a telemetry operation.
    repeated Path path_list                                 = 2;

    // The below configuration is not defined in Openconfig RPC.
    // It is a proposed extension to configure additional
    // subscription request features.
    SubscriptionAdditionalConfig additional_config          = 3;
}

// Data associated with a telemetry subscription
message SubscriptionInput {
    // List of optional collector endpoints to send data for
    // this subscription.
    // If no collector destinations are specified, the collector
    // destination is assumed to be the requester on the rpc channel.
    repeated Collector  collector_list                      = 1;
}

// Collector endpoints to send data specified as an ip+port combination.
message Collector {
    // IP address of collector endpoint
    string address                                          = 1;

    // Transport protocol port number for the collector destination.
    uint32 port                                             = 2;
}

// Data model path
message Path {
    // Data model path of interest
    // Path specification for elements of OpenConfig data models
    string path                                             = 1;

    // Regular expression to be used in filtering state leaves
    string filter                                           = 2;

    // If this is set to true, the target device will only send
    // updates to the collector upon a change in data value
    bool suppress_unchanged                                 = 3;

    // Maximum time in ms the target device may go without sending
    // a message to the collector. If this time expires with
    // suppress-unchanged set, the target device must send an update
    // message regardless if the data values have changed.
    uint32 max_silent_interval                              = 4;

    // Time in ms between collection and transmission of the
    // specified data to the collector platform. The target device
    // will sample the corresponding data (e.g,. a counter) and
    // immediately send to the collector destination.
    //
    // If sample-frequency is set to 0, then the network device
    // must emit an update upon every datum change.
    uint32 sample_frequency                                 = 5;
}

// Configure subscription request additional features.
message SubscriptionAdditionalConfig {
    // limit the number of records sent in the stream
    int32 limit_records                                     = 1;

    // limit the time the stream remains open
    int32 limit_time_seconds                                = 2;
}

// Reply to inline subscription for data at the specified path is done in
// two-folds.
// 1. Reply data message sent out using out-of-band channel.
// 2. Telemetry data send back on the same connection as the
//    subscription request.

// 1. Reply data message sent out using out-of-band channel.
message SubscriptionReply {
    // Response message to a telemetry subscription creation or
    // get request.
    SubscriptionResponse response                           = 1;

    // List of data models paths and filters
    // which are used in a telemetry operation.
    repeated Path path_list                                 = 2;
}

// Response message to a telemetry subscription creation or get request.
message SubscriptionResponse {
    // Unique id for the subscription on the device. This is
    // generated by the device and returned in a subscription
    // request or when listing existing subscriptions
    uint32 subscription_id = 1;
}

// 2. Telemetry data send back on the same connection as the
//    subscription request.
message OpenConfigData {
    // router name:export IP address
    string system_id                                        = 1;

    // line card / RE (slot number)
    uint32 component_id                                     = 2;

    // PFE (if applicable)
    uint32 sub_component_id                                 = 3;

    // Path specification for elements of OpenConfig data models
    string path                                             = 4;

    // Sequence number, monotonically increasing for each
    // system_id, component_id, sub_component_id + path.
    uint64 sequence_number                                  = 5;

    // timestamp (milliseconds since epoch)
    uint64 timestamp                                        = 6;

    // List of key-value pairs
    repeated KeyValue kv                                    = 7;
}

// Simple Key-value, where value could be one of scalar types
message KeyValue {
    // Key
    string key                                              =  1;

    // One of possible values
    oneof value {
        double double_value                                 =  5;
        int64  int_value                                    =  6;
        uint64 uint_value                                   =  7;
        sint64 sint_value                                   =  8;
        bool   bool_value                                   =  9;
        string str_value                                    = 10;
        bytes  bytes_value                                  = 11;
    }
}

// Message sent for a telemetry subscription cancellation request
message CancelSubscriptionRequest {
    // Subscription identifier as returned by the device when
    // subscription was requested
    uint32 subscription_id                                  = 1;
}

// Reply to telemetry subscription cancellation request
message CancelSubscriptionReply {
    // Return code
    ReturnCode code                                         = 1;

    // Return code string
    string     code_str                                     = 2;
};

// Result of the operation
enum ReturnCode {
    SUCCESS                                                 = 0;
    NO_SUBSCRIPTION_ENTRY                                   = 1;
    UNKNOWN_ERROR                                           = 2;
}

// Message sent for a telemetry get request
message GetSubscriptionsRequest {
    // Subscription identifier as returned by the device when
    // subscription was requested
    // --- or ---
    // 0xFFFFFFFF for all subscription identifiers
    uint32 subscription_id                                  = 1;
}

// Reply to telemetry subscription get request
message GetSubscriptionsReply {
    // List of current telemetry subscriptions
    repeated SubscriptionReply subscription_list            = 1;
}

// Message sent for telemetry agent operational states request
message GetOperationalStateRequest {
    // Per-subscription_id level operational state can be requested.
    //
    // Subscription identifier as returned by the device when
    // subscription was requested
    // --- or ---
    // 0xFFFFFFFF for all subscription identifiers including agent-level
    // operational stats
    // --- or ---
    // If subscription_id is not present then sent only agent-level
    // operational stats
    uint32 subscription_id                                  = 1;

    // Control verbosity of the output
    VerbosityLevel verbosity                                = 2;
}

// Verbosity Level
enum VerbosityLevel {
    DETAIL                                                  = 0;
    TERSE                                                   = 1;
    BRIEF                                                   = 2;
}

// Reply to telemetry agent operational states request
message GetOperationalStateReply {
    // List of key-value pairs where
    //     key      = operational state definition
    //     value    = operational state value
    repeated KeyValue kv                                    = 1;
}

// Message sent for a data encoding request
message DataEncodingRequest {
}

// Reply to data encodings supported request
message DataEncodingReply {
    repeated EncodingType  encoding_list                    = 1;
}

// Encoding Type Supported
enum EncodingType {
    UNDEFINED                                               = 0;
    XML                                                     = 1;
    JSON_IETF                                               = 2;
    PROTO3                                                  = 3;
}

Чтобы выполнить вызов службы (rpc TelemetrySubscribe), сначала мне нужно прочитать заголовок с идентификатором подписки, а затем начать чтение сообщений. Теперь, используя Java, я могу подключиться к службе, я ввел перехватчик, но когда я печатаю / получаю заголовок, он равен нулю. Мой код вызова перехватчика ниже,

 ClientInterceptor interceptor = new HeaderClientInterceptor();
      originChannel = OkHttpChannelBuilder.forAddress(host, port)
        .usePlaintext(true)
        .build();
     Channel channel =  ClientInterceptors.intercept(originChannel, interceptor);
      telemetryStub = OpenConfigTelemetryGrpc.newStub(channel);

Это код перехватчика для чтения метаданных.

  @Override
  public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
      CallOptions callOptions, Channel next) {
    return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {

      @Override
      public void start(Listener<RespT> responseListener, Metadata headers) {

        super.start(new SimpleForwardingClientCallListener<RespT>(responseListener) {
          @Override
          public void onHeaders(Metadata headers) {

             Key<String> CUSTOM_HEADER_KEY = Metadata.Key.of("responseKEY", Metadata.ASCII_STRING_MARSHALLER);

            System.out.println("Contains Key?? "+headers.containsKey(CUSTOM_HEADER_KEY));

Хотите знать, есть ли другой способ прочитать метаданные или первое сообщение, в котором есть идентификатор подписки? Все, что мне нужно, - прочитать первое сообщение с идентификатором подписки и вернуть тот же идентификатор подписки на сервер, чтобы можно было запустить потоковую передачу. У меня есть эквивалентный код Python с использованием того же прото-файла, и он обменивается данными с сервером по коду, указанному ниже только для справки:

     sub_req = SubscribeRequestMsg("host",port)
     data_itr = stub.telemetrySubscribe(sub_req, _TIMEOUT_SECONDS)
     metadata = data_itr.initial_metadata()

                   if metadata[0][0] == "responseKey":
                    metainfo = metadata[0][1]
                    print metainfo

                    subreply = agent_pb2.SubscriptionReply()
                    subreply.SetInParent()
                    google.protobuf.text_format.Merge(metainfo, subreply)

                    if subreply.response.subscription_id:
                    SUB_ID = subreply.response.subscription_id

Из приведенного выше кода Python я могу легко получить объект метаданных, не знаю, как получить его с помощью Java?

После прочтения метаданных я получаю следующее: Metadata({content-type=[application/grpc], grpc-encoding=[identity], grpc-accept-encoding=[identity,deflate,gzip]})

Но я знаю, что есть еще одна строка из метаданных, которая

response {
  subscription_id: 2
}

Как я могу извлечь последний ответ из заголовка, в котором есть идентификатор подписки. Я пробовал много вариантов, и здесь я потерялся.


person Ammad    schedule 18.04.2017    source источник


Ответы (2)


Вы использовали метод для метаданных запроса, а не для метаданных ответа:

public void start(Listener<RespT> responseListener, Metadata headers) {

Для метаданных ответа вам понадобится ClientCall.Listener и дождитесь обратного вызова onHeaders:

public void onHeaders(Metadata headers)

Мне кажется, что использование упомянутых вами метаданных кажется странным. Метаданные обычно предназначены для дополнительных сведений об ошибках или сквозных функций, не относящихся к методу RPC (например, аутентификации, трассировки и т. Д.).

person Eric Anderson    schedule 18.04.2017
comment
public void start (Listener ‹RespT› responseListener, заголовки метаданных) {ClientCall.Listener ‹RespT› listener = new ClientCall.Listener ‹RespT› () {@Override public void onHeaders (заголовки метаданных) {SubscriptionReply s = System.out.println (Найден заголовок 1 + заголовки); super.onHeaders (заголовки); }}; super.start (слушатель, заголовки); }}; Установлено зависшее соединение, но данные не передаются. - person Ammad; 22.04.2017
comment
@ Аммад, вы никогда не вызываете переданный Listener (responseListener). Расширьте SimpleForwardingClientCallListener и передайте responseListener его конструктору. - person Eric Anderson; 24.04.2017
comment
Мне нужно немного помощи здесь. Куда позвонить Слушателю? Я вижу, что onHeaders () вызывается с некоторыми начальными данными. - person Ammad; 24.04.2017
comment
SimpleForwardingClientCallListener вызывает для вас слушателя, когда вы вызываете super. Итак, super.onHeaders() вызовет responseListener.onHeaders(). Так что просто расширите SimpleForwardingClientCallListener и обязательно вызовите super для каждого метода, который вы переопределяете. - person Eric Anderson; 25.04.2017
comment
public void start (Listener ‹RespT› responseListener, заголовки метаданных) {responseListener = new SimpleForwardingClientCallListener ‹RespT› (responseListener) {Переопределить public void onHeaders (заголовки метаданных) {System.out.println (найденный заголовок + заголовки); super.onHeaders (заголовки); }}; ClientCall.Listener ‹RespT› listener = new ClientCall.Listener ‹RespT› () {@Override public void onHeaders (заголовки метаданных) { - person Ammad; 28.04.2017

Часто использование ClientInterceptor неудобно, потому что вам нужно поддерживать ссылку на него, чтобы вытащить данные обратно. В вашем случае данные на самом деле являются метаданными. Один из способов упростить доступ к метаданным - поместить их в Context.

Например, вы можете создать Context.Key для идентификатора подписки. В своем клиентском перехватчике вы можете извлечь нужный заголовок Metadata и поместить его в Context, используя Context.current().withValue(key, metadata). Внутри вашего StreamObserver вы можете извлечь это This, вызвав key.get(Context.current()). Это предполагает, что вы используете Async API, а не API блокировки.

Причина, по которой это сложнее, заключается в том, что обычно метаданные представляют собой информацию о вызове, но не имеют прямого отношения к самому вызову. Это для таких вещей, как отслеживание, кодирование, статистика, отмена и тому подобное. Если что-то изменит способ обработки запроса, вероятно, потребуется перейти непосредственно в сам запрос, а не быть на стороне.

person Carl Mastrangelo    schedule 18.04.2017
comment
Привет, Карл, я только что подписался на эту услугу. Я не могу положить ключ, чтобы клиент мог его вытащить. Все, что я могу сделать, это получить данные заголовка и прочитать ключ заголовка в качестве первого шага, чтобы можно было запустить Stream. - person Ammad; 20.04.2017
comment
Как я могу создать Context.key для subscription_id? Предполагая, что subscription_id является строкой? - person Ammad; 23.04.2017
comment
Создание нового контекста для каждого вызова на стороне клиента опасно, так как может образовывать бесконечно длинную цепочку контекста с Async и Future API. Документация для ClientInterceptors теперь явно вызывает это. Более безопасный способ на стороне клиента - использовать AbstractStub.withOption и получить конфигурацию через CallOptions.getOption. - person Eric Anderson; 24.04.2017