Получение предупреждения `` Получено неожиданное уведомление об отправке сообщения '' при использовании метода `SendAsync` на производителе Pulsar

У меня есть тема Pulsar, сообщения по которой я публикую через нескольких продюсеров. Эти производители - просто горутины, и я использую SendAsync метод производителя для отправки сообщений, но продолжаю получать Got unexpected send receipt for message предупреждение. Я изучил код библиотеки клиента (клиент golang для Pulsar) и обнаружил, что это предупреждение может появиться только тогда, когда производитель не зарегистрирован в качестве слушателя на этой карте map[uint64]ConnectionListener. Но как я могу публиковать, если продюсер не зарегистрирован в качестве слушателя?

func benchmarkConcurrentProducers(b *testing.B, pool []pulsar.Producer, concur int, payload []byte) {
    wg := &sync.WaitGroup{}
    wg.Add(concur)

    for i := 0; i < concur; i++ {
        go Produce(pool[i], payload, wg)
    }
    wg.Wait()
}

func BenchmarkProducer(b *testing.B) {
    type config struct {
        Pulsarcfg struct {
            Url string `required: "true"`
        }
    }
    cfg := &config{}
    err := configor.Load(cfg, "../config.yml")
    require.Nil(b, err)

    client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL:               cfg.Pulsarcfg.Url,
        OperationTimeout:  30 * time.Second,
        ConnectionTimeout: 30 * time.Second,
    })
    require.Nil(b, err)
    defer client.Close()

    concur := 4

    producerPool := make([]pulsar.Producer, concur)

    for i := 0; i < concur; i++ {
        p, err := New(client, string(i))
        require.Nil(b, err)
        defer p.Close()

        producerPool[i] = p
    }

    path, err := filepath.Abs("../events.json")
    if err != nil {
        log.Fatalf("invalid payload file path, %v", err)
    }
    f, err := ioutil.ReadFile(path)
    if err != nil {
        log.Fatalf("bad payload: %v", err)
    }

    b.Run("benchmark producer", func(b *testing.B) {
        // run the producer b.N times
        for n := 0; n < b.N; n++ {
            benchmarkConcurrentProducers(b, producerPool, concur, f)
        }
    })
}

person Deepak Sah    schedule 10.05.2020    source источник


Ответы (2)


Может быть две причины, по которым производитель не зарегистрирован в качестве слушателя в map[uint64]ConnectionListener. Либо продюсер так и не был создан, либо продюсер закрыли. В моем случае я смог публиковать сообщения, что означает, что производитель действительно был создан, но поскольку я использовал метод SendAsync для отправки сообщений, он не блокировал получение подтверждения. И я использовал defer, чтобы закрыть производителя после создания всех сообщений (я использовал wait group, чтобы убедиться, что все сообщения отправлены перед закрытием производителя), но производитель был закрыт до получения подтверждения и, следовательно, предупреждения.

person Deepak Sah    schedule 10.05.2020
comment
Можете выложить пример кода? Как вы уже сказали ранее, карта прослушивателя соединения отслеживается идентификатором производителя. Можете ли вы отследить идентификатор производителя в журнале? - person Ming L.; 11.05.2020
comment
@MingL. Я добавил код, и да, я смог увидеть идентификатор производителя в журналах. - person Deepak Sah; 12.05.2020

У меня было такое же сообщение об ошибке. В моем случае я создал ридера и продюсера, используя одного и того же клиента. Хотя программа чтения уже была закрыта, когда я создавал новые сообщения, я продолжал получать это предупреждение, и, что еще хуже, оно зависало при вызове producer.Send(). Я решил это, используя отдельных клиентов для чтения и для производства.

person Paulo Quintans    schedule 26.07.2020