diff --git a/internal/webrtc/manager.go b/internal/webrtc/manager.go index e3e454a1..7bf18d41 100644 --- a/internal/webrtc/manager.go +++ b/internal/webrtc/manager.go @@ -400,6 +400,13 @@ func (manager *WebRTCManagerCtx) CreatePeer(session types.Session, videoID strin }) }) + videoTrack.OnRTCP(func(p rtcp.Packet) { + switch rtcpPacket := p.(type) { + case *rtcp.ReceiverEstimatedMaximumBitrate: // TODO: Deprecated. + manager.metrics.SetReceiverEstimatedMaximumBitrate(session, rtcpPacket.Bitrate) + } + }) + go func() { ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() diff --git a/internal/webrtc/metrics.go b/internal/webrtc/metrics.go index 14c99def..066f011b 100644 --- a/internal/webrtc/metrics.go +++ b/internal/webrtc/metrics.go @@ -21,6 +21,8 @@ type metrics struct { videoIds map[string]prometheus.Gauge videoIdsMu *sync.Mutex + receiverEstimatedMaximumBitrate prometheus.Gauge + iceBytesSent prometheus.Gauge iceBytesReceived prometheus.Gauge sctpBytesSent prometheus.Gauge @@ -92,6 +94,16 @@ func (m *metricsCtx) getBySession(session types.Session) metrics { videoIds: map[string]prometheus.Gauge{}, videoIdsMu: &sync.Mutex{}, + receiverEstimatedMaximumBitrate: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "receiver_estimated_maximum_bitrate", + Namespace: "neko", + Subsystem: "webrtc", + Help: "Receiver Estimated Maximum Bitrate from SCTP.", + ConstLabels: map[string]string{ + "session_id": session.ID(), + }, + }), + iceBytesSent: promauto.NewGauge(prometheus.GaugeOpts{ Name: "ice_bytes_sent", Namespace: "neko", @@ -205,6 +217,12 @@ func (m *metricsCtx) SetVideoID(session types.Session, videoId string) { } } +func (m *metricsCtx) SetReceiverEstimatedMaximumBitrate(session types.Session, bitrate float32) { + met := m.getBySession(session) + + met.receiverEstimatedMaximumBitrate.Set(float64(bitrate)) +} + func (m *metricsCtx) SetIceTransportStats(session types.Session, data webrtc.TransportStats) { met := m.getBySession(session) diff --git a/internal/webrtc/peerstreamtrack.go b/internal/webrtc/peerstreamtrack.go index b77cab2a..2d2fa59b 100644 --- a/internal/webrtc/peerstreamtrack.go +++ b/internal/webrtc/peerstreamtrack.go @@ -5,6 +5,7 @@ import ( "io" "sync" + "github.com/pion/rtcp" "github.com/pion/webrtc/v3" "github.com/pion/webrtc/v3/pkg/media" "github.com/rs/zerolog" @@ -51,6 +52,9 @@ type PeerStreamTrack struct { stream types.StreamSinkManager streamMu sync.Mutex + + onRtcp func(rtcp.Packet) + onRtcpMu sync.RWMutex } func (peer *PeerStreamTrack) SetStream(stream types.StreamSinkManager) error { @@ -90,8 +94,30 @@ func (peer *PeerStreamTrack) AddToConnection(connection *webrtc.PeerConnection) go func() { rtcpBuf := make([]byte, 1500) for { - if _, _, err := sender.Read(rtcpBuf); err != nil { - return + n, _, err := sender.Read(rtcpBuf) + if err != nil { + if err == io.EOF || err == io.ErrClosedPipe { + return + } + + peer.logger.Err(err).Msg("RTCP read error") + continue + } + + packets, err := rtcp.Unmarshal(rtcpBuf[:n]) + if err != nil { + peer.logger.Err(err).Msg("RTCP unmarshal error") + continue + } + + peer.onRtcpMu.RLock() + handler := peer.onRtcp + peer.onRtcpMu.RUnlock() + + for _, packet := range packets { + if handler != nil { + go handler(packet) + } } } }() @@ -102,3 +128,10 @@ func (peer *PeerStreamTrack) AddToConnection(connection *webrtc.PeerConnection) func (peer *PeerStreamTrack) SetPaused(paused bool) { peer.paused = paused } + +func (peer *PeerStreamTrack) OnRTCP(f func(rtcp.Packet)) { + peer.onRtcpMu.Lock() + defer peer.onRtcpMu.Unlock() + + peer.onRtcp = f +} diff --git a/pkg/types/codec/codecs.go b/pkg/types/codec/codecs.go index cc8b9bd7..d53a5509 100644 --- a/pkg/types/codec/codecs.go +++ b/pkg/types/codec/codecs.go @@ -6,6 +6,18 @@ import ( "github.com/pion/webrtc/v3" ) +var RTCPFeedback = []webrtc.RTCPFeedback{ + {Type: webrtc.TypeRTCPFBTransportCC, Parameter: ""}, + {Type: webrtc.TypeRTCPFBGoogREMB, Parameter: ""}, + + // https://www.iana.org/assignments/sdp-parameters/sdp-parameters.xhtml#sdp-parameters-19 + {Type: webrtc.TypeRTCPFBCCM, Parameter: "fir"}, + + // https://www.iana.org/assignments/sdp-parameters/sdp-parameters.xhtml#sdp-parameters-15 + {Type: webrtc.TypeRTCPFBNACK, Parameter: "pli"}, + {Type: webrtc.TypeRTCPFBNACK, Parameter: ""}, +} + func ParseRTC(codec webrtc.RTPCodecParameters) (RTPCodec, bool) { codecName := strings.Split(codec.RTPCodecCapability.MimeType, "/")[1] return ParseStr(codecName) @@ -61,7 +73,7 @@ func VP8() RTPCodec { ClockRate: 90000, Channels: 0, SDPFmtpLine: "", - RTCPFeedback: []webrtc.RTCPFeedback{}, + RTCPFeedback: RTCPFeedback, }, // https://gstreamer.freedesktop.org/documentation/vpx/vp8enc.html // gstreamer1.0-plugins-good @@ -80,7 +92,7 @@ func VP9() RTPCodec { ClockRate: 90000, Channels: 0, SDPFmtpLine: "profile-id=0", - RTCPFeedback: []webrtc.RTCPFeedback{}, + RTCPFeedback: RTCPFeedback, }, // https://gstreamer.freedesktop.org/documentation/vpx/vp9enc.html // gstreamer1.0-plugins-good @@ -99,7 +111,7 @@ func H264() RTPCodec { ClockRate: 90000, Channels: 0, SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42001f", - RTCPFeedback: []webrtc.RTCPFeedback{}, + RTCPFeedback: RTCPFeedback, }, // https://gstreamer.freedesktop.org/documentation/x264/index.html // gstreamer1.0-plugins-ugly