У меня есть тема Pulsar, сообщения по которой я публикую через нескольких продюсеров. Эти производители - просто горутины, и я использую SendAsync
метод производителя для отправки сообщений, но продолжаю получать Got unexpected send receipt for message
предупреждение. Я изучил код библиотеки клиента (клиент golang для Pulsar) и обнаружил, что это предупреждение может появиться только тогда, когда производитель не зарегистрирован в качестве слушателя на этой карте map[uint64]ConnectionListener
. Но как я могу публиковать, если продюсер не зарегистрирован в качестве слушателя?
func benchmarkConcurrentProducers(b *testing.B, pool []pulsar.Producer, concur int, payload []byte) {
wg := &sync.WaitGroup{}
wg.Add(concur)
for i := 0; i < concur; i++ {
go Produce(pool[i], payload, wg)
}
wg.Wait()
}
func BenchmarkProducer(b *testing.B) {
type config struct {
Pulsarcfg struct {
Url string `required: "true"`
}
}
cfg := &config{}
err := configor.Load(cfg, "../config.yml")
require.Nil(b, err)
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: cfg.Pulsarcfg.Url,
OperationTimeout: 30 * time.Second,
ConnectionTimeout: 30 * time.Second,
})
require.Nil(b, err)
defer client.Close()
concur := 4
producerPool := make([]pulsar.Producer, concur)
for i := 0; i < concur; i++ {
p, err := New(client, string(i))
require.Nil(b, err)
defer p.Close()
producerPool[i] = p
}
path, err := filepath.Abs("../events.json")
if err != nil {
log.Fatalf("invalid payload file path, %v", err)
}
f, err := ioutil.ReadFile(path)
if err != nil {
log.Fatalf("bad payload: %v", err)
}
b.Run("benchmark producer", func(b *testing.B) {
// run the producer b.N times
for n := 0; n < b.N; n++ {
benchmarkConcurrentProducers(b, producerPool, concur, f)
}
})
}