From 5bb2da2732af97c3cd9b24107ee8a26b1c1a11f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20=C5=A0ediv=C3=BD?= Date: Mon, 27 Sep 2021 01:17:25 +0200 Subject: [PATCH] stream fix listener counts. --- internal/capture/stream.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/internal/capture/stream.go b/internal/capture/stream.go index 3e810a4e..96907dd9 100644 --- a/internal/capture/stream.go +++ b/internal/capture/stream.go @@ -100,13 +100,13 @@ func (manager *StreamManagerCtx) NewListener(listener *func(sample types.Sample) manager.mu.Lock() defer manager.mu.Unlock() - if manager.listenersCount == 0 { + manager.listenersCount++ + if manager.listenersCount == 1 { err := manager.createPipeline() if err != nil && !errors.Is(err, types.ErrCapturePipelineAlreadyExists) { return addListener, err } - manager.listenersCount++ manager.logger.Info().Msgf("first listener, starting") } @@ -134,11 +134,15 @@ func (manager *StreamManagerCtx) RemoveListener(listener *func(sample types.Samp manager.logger.Debug().Interface("ptr", ptr).Msgf("removing listener") + manager.mu.Lock() + manager.listenersCount-- + manager.mu.Unlock() + go func() { manager.mu.Lock() defer manager.mu.Unlock() - if manager.listenersCount == 1 { + if manager.listenersCount == 0 { manager.destroyPipeline() manager.listenersCount = 0 manager.logger.Info().Msgf("last listener, stopping")