neko/server/internal/webrtc/manager.go
Hani bc19ab866f
feat(client): add microphone passthrough button to controls toolbar (#620)
* feat(client): add microphone passthrough button to controls toolbar

Add mic toggle button to the bottom controls bar that enables users to
share their local microphone with the remote neko session via WebRTC.

The server already supports microphone capture (capture.microphone.enabled)
but the legacy client had no UI to trigger getUserMedia and send an audio
track to the peer connection.

Changes:
- base.ts: Add enableMicrophone/disableMicrophone methods that call
  getUserMedia and addTrack/removeTrack on the RTCPeerConnection.
  Mic is cleaned up automatically on disconnect.
- controls.vue: Add mic button (fa-microphone/fa-microphone-slash) between
  play/pause and volume controls with tooltip and error handling.
- en-us.ts: Add i18n strings for mic tooltips and error dialog.

* if the error is not io.EOF, log it. Otherwise, it's a normal closure of the track.

* tie microphone to active host and auto-disable on control loss

---------

Co-authored-by: h1n054ur <admin@haniumer.com>
Co-authored-by: Miroslav Šedivý <sedivy.miro@gmail.com>
2026-02-17 17:02:51 +01:00

599 lines
15 KiB
Go

package webrtc
import (
"errors"
"fmt"
"io"
"net"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/pion/ice/v2"
"github.com/pion/interceptor"
"github.com/pion/interceptor/pkg/cc"
"github.com/pion/interceptor/pkg/gcc"
"github.com/pion/rtcp"
"github.com/pion/webrtc/v3"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/spf13/viper"
"github.com/m1k1o/neko/server/internal/config"
"github.com/m1k1o/neko/server/internal/webrtc/cursor"
"github.com/m1k1o/neko/server/internal/webrtc/pionlog"
"github.com/m1k1o/neko/server/pkg/types"
"github.com/m1k1o/neko/server/pkg/types/codec"
"github.com/m1k1o/neko/server/pkg/types/event"
"github.com/m1k1o/neko/server/pkg/types/message"
"github.com/m1k1o/neko/server/pkg/utils"
)
// Buffer sizes handed to the ICE TCP mux.
const (
	// number of incoming TCP packets that can be queued in the receiving channel
	tcpReadChanBufferSize = 50

	// size, in bytes, of the buffer holding outgoing TCP packets. Library default is 4MB
	tcpWriteBufferSizeInBytes = 4 << 20
)

// ICE timeouts and RTCP intervals.
const (
	// how long an Agent may go without network activity before it is considered
	// disconnected. Library default is 5 seconds
	disconnectedTimeout time.Duration = 4 * time.Second

	// how long an already disconnected Agent may go without network activity
	// before it is considered failed. Library default is 25 seconds
	failedTimeout time.Duration = 6 * time.Second

	// how often the ICE Agent sends extra traffic when there is no activity; no
	// extra traffic is sent while media is flowing. Library default is 2 seconds
	keepAliveInterval time.Duration = 2 * time.Second

	// a PLI is sent on this interval so that the publisher pushes a keyframe
	// every rtcpPLIInterval
	rtcpPLIInterval time.Duration = 3 * time.Second
)
// New builds a WebRTC manager around the given desktop and capture managers.
//
// The peer connection configuration always uses unified-plan SDP semantics.
// Backend ICE servers from the config are copied into it unless ICE lite is
// enabled, in which case no ICE servers are configured.
func New(desktop types.DesktopManager, capture types.CaptureManager, config *config.WebRTC) *WebRTCManagerCtx {
	logger := log.With().Str("module", "webrtc").Logger()

	rtcConfig := webrtc.Configuration{SDPSemantics: webrtc.SDPSemanticsUnifiedPlan}

	if !config.ICELite {
		backend := config.ICEServersBackend
		servers := make([]webrtc.ICEServer, 0, len(backend))

		for _, entry := range backend {
			// servers without a credential get the boolean false instead of
			// an empty string
			var credential any = false
			if entry.Credential != "" {
				credential = entry.Credential
			}

			servers = append(servers, webrtc.ICEServer{
				URLs:       entry.URLs,
				Username:   entry.Username,
				Credential: credential,
			})
		}

		rtcConfig.ICEServers = servers
	}

	return &WebRTCManagerCtx{
		logger:              logger,
		config:              config,
		metrics:             newMetricsManager(),
		webrtcConfiguration: rtcConfig,
		desktop:             desktop,
		capture:             capture,
		curImage:            cursor.NewImage(logger, desktop),
		curPosition:         cursor.NewPosition(logger),
	}
}
// WebRTCManagerCtx holds the state shared by all WebRTC peers created by this
// manager.
type WebRTCManagerCtx struct {
	logger  zerolog.Logger
	config  *config.WebRTC
	metrics *metricsManager

	// peerId is incremented atomically for every peer that gets created
	peerId int32

	desktop types.DesktopManager
	capture types.CaptureManager

	// cursor image and position handlers; peers are added to them as listeners
	curImage    cursor.Image
	curPosition cursor.Position

	// base configuration used for every new peer connection
	webrtcConfiguration webrtc.Configuration

	// ICE muxes; each is only set by Start when its port is configured
	tcpMux ice.TCPMux
	udpMux ice.UDPMux

	// stop function of the most recently received remote video track
	camStop *func()
	// stop function of the most recently received remote audio track
	micStop *func()
}
// Start starts the cursor image handler and sets up the ICE TCP and UDP mux
// listeners for every mux port that is configured with a value above zero.
// A mux that cannot be set up is reported at fatal log level. The effective
// configuration is logged at the end.
func (manager *WebRTCManagerCtx) Start() {
	manager.curImage.Start()

	loggerFactory := pionlog.New(manager.logger)
	cfg := manager.config

	// TCP mux listener
	if cfg.TCPMux > 0 {
		listenAddr := &net.TCPAddr{
			IP:   net.IP{0, 0, 0, 0},
			Port: cfg.TCPMux,
		}

		listener, err := net.ListenTCP("tcp", listenAddr)
		if err != nil {
			manager.logger.Fatal().Err(err).Msg("unable to setup ice TCP mux")
		}

		manager.tcpMux = ice.NewTCPMuxDefault(ice.TCPMuxParams{
			Listener:        listener,
			Logger:          loggerFactory.NewLogger("ice-tcp"),
			ReadBufferSize:  tcpReadChanBufferSize,
			WriteBufferSize: tcpWriteBufferSizeInBytes,
		})
	}

	// UDP mux listener
	if cfg.UDPMux > 0 {
		mux, err := ice.NewMultiUDPMuxFromPort(
			cfg.UDPMux,
			ice.UDPMuxFromPortWithLogger(loggerFactory.NewLogger("ice-udp")),
		)
		manager.udpMux = mux
		if err != nil {
			manager.logger.Fatal().Err(err).Msg("unable to setup ice UDP mux")
		}
	}

	nat1to1 := strings.Join(cfg.NAT1To1IPs, ",")
	ephemeralRange := fmt.Sprintf("%d-%d", cfg.EphemeralMin, cfg.EphemeralMax)

	manager.logger.Info().
		Bool("icelite", cfg.ICELite).
		Bool("icetrickle", cfg.ICETrickle).
		Interface("iceservers-frontend", cfg.ICEServersFrontend).
		Interface("iceservers-backend", cfg.ICEServersBackend).
		Str("nat1to1", nat1to1).
		Str("epr", ephemeralRange).
		Int("tcpmux", cfg.TCPMux).
		Int("udpmux", cfg.UDPMux).
		Msg("webrtc starting")
}
// Shutdown logs the shutdown and calls Shutdown on the cursor image and cursor
// position handlers. It always returns nil.
//
// NOTE(review): the TCP/UDP muxes that Start may create are not closed here;
// confirm whether that is intentional.
func (manager *WebRTCManagerCtx) Shutdown() error {
manager.logger.Info().Msg("shutdown")
manager.curImage.Shutdown()
manager.curPosition.Shutdown()
return nil
}
// ICEServers returns the frontend ICE server list from the configuration.
func (manager *WebRTCManagerCtx) ICEServers() []types.ICEServer {
	frontend := manager.config.ICEServersFrontend
	return frontend
}
// newPeerConnection creates a peer connection that supports the given codecs.
//
// It registers the codecs in a fresh media engine, configures the setting
// engine (ICE timeouts, NAT 1:1 IPs, ICE lite, DTLS role, UDP/TCP candidates)
// and, when the estimator is enabled in the config, installs a congestion
// controller whose bandwidth estimator is returned alongside the connection.
// The returned estimator is nil when the estimator is disabled.
//
// On error both the connection and the estimator are nil.
func (manager *WebRTCManagerCtx) newPeerConnection(logger zerolog.Logger, codecs []codec.RTPCodec) (*webrtc.PeerConnection, cc.BandwidthEstimator, error) {
	// create media engine
	engine := &webrtc.MediaEngine{}
	for _, rtpCodec := range codecs {
		if err := rtpCodec.Register(engine); err != nil {
			return nil, nil, err
		}
	}

	// create setting engine
	settings := webrtc.SettingEngine{
		LoggerFactory: pionlog.New(logger),
	}

	settings.DisableMediaEngineCopy(true)
	settings.SetICETimeouts(disconnectedTimeout, failedTimeout, keepAliveInterval)
	settings.SetNAT1To1IPs(manager.config.NAT1To1IPs, webrtc.ICECandidateTypeHost)
	settings.SetLite(manager.config.ICELite)
	// make sure server answer sdp setup as passive, to not force DTLS renegotiation
	// otherwise iOS renegotiation fails with: Failed to set SSL role for the transport.
	settings.SetAnsweringDTLSRole(webrtc.DTLSRoleServer)

	var networkType []webrtc.NetworkType

	// udp candidates
	if manager.udpMux != nil {
		settings.SetICEUDPMux(manager.udpMux)
		networkType = append(networkType,
			webrtc.NetworkTypeUDP4,
			webrtc.NetworkTypeUDP6,
		)
	} else if manager.config.EphemeralMax != 0 {
		// a rejected range is not fatal, but it must not go unnoticed
		if err := settings.SetEphemeralUDPPortRange(manager.config.EphemeralMin, manager.config.EphemeralMax); err != nil {
			logger.Warn().Err(err).Msg("unable to set ephemeral UDP port range")
		}
		networkType = append(networkType,
			webrtc.NetworkTypeUDP4,
			webrtc.NetworkTypeUDP6,
		)
	}

	// tcp candidates
	if manager.tcpMux != nil {
		settings.SetICETCPMux(manager.tcpMux)
		networkType = append(networkType,
			webrtc.NetworkTypeTCP4,
			webrtc.NetworkTypeTCP6,
		)
	}

	// enable support for TCP and UDP ICE candidates
	settings.SetNetworkTypes(networkType)

	// create interceptor registry
	registry := &interceptor.Registry{}

	// create bandwidth estimator; the buffered channel carries it out of the
	// congestion controller callback
	estimatorChan := make(chan cc.BandwidthEstimator, 1)
	if manager.config.Estimator.Enabled {
		congestionController, err := cc.NewInterceptor(func() (cc.BandwidthEstimator, error) {
			return gcc.NewSendSideBWE(
				gcc.SendSideBWEInitialBitrate(manager.config.Estimator.InitialBitrate),
				gcc.SendSideBWEPacer(gcc.NewNoOpPacer()),
			)
		})
		if err != nil {
			return nil, nil, err
		}

		congestionController.OnNewPeerConnection(func(id string, estimator cc.BandwidthEstimator) {
			estimatorChan <- estimator
		})

		registry.Add(congestionController)
		if err = webrtc.ConfigureTWCCHeaderExtensionSender(engine, registry); err != nil {
			return nil, nil, err
		}
	} else {
		// no estimator, send nil
		estimatorChan <- nil
	}

	if err := webrtc.RegisterDefaultInterceptors(engine, registry); err != nil {
		return nil, nil, err
	}

	// create new API
	api := webrtc.NewAPI(
		webrtc.WithMediaEngine(engine),
		webrtc.WithSettingEngine(settings),
		webrtc.WithInterceptorRegistry(registry),
	)

	// create new peer connection
	configuration := manager.webrtcConfiguration
	connection, err := api.NewPeerConnection(configuration)
	if err != nil {
		// do not wait for the estimator here: when the connection could not be
		// created the callback may never have fired, and receiving from the
		// channel would then block forever
		return nil, nil, err
	}

	return connection, <-estimatorChan, nil
}
// CreatePeer creates a new WebRTC peer for the session and returns the initial
// SDP offer together with the peer.
//
// It creates the peer connection with the audio and video codecs of the capture
// manager, adds an audio track (paused by default), a video track and a "data"
// data channel, registers all connection callbacks, attaches the peer to the
// session and starts the metrics collectors and the estimator reader.
//
// Remote tracks sent by the client are pushed to the microphone (audio) or the
// webcam (video) source of the capture manager, provided the session profile
// allows media sharing.
//
// If setup fails before the peer has been created, the peer connection is
// closed before the error is returned.
func (manager *WebRTCManagerCtx) CreatePeer(session types.Session) (*webrtc.SessionDescription, types.WebRTCPeer, error) {
	id := atomic.AddInt32(&manager.peerId, 1)

	// get metrics for session
	metrics := manager.metrics.getBySession(session)
	metrics.NewConnection()

	// add session id to logger context
	logger := manager.logger.With().Str("session_id", session.ID()).Int32("peer_id", id).Logger()
	logger.Info().Msg("creating webrtc peer")

	// all audios must have the same codec
	audio := manager.capture.Audio()
	audioCodec := audio.Codec()

	// all videos must have the same codec
	video := manager.capture.Video()
	videoCodec := video.Codec()

	connection, estimator, err := manager.newPeerConnection(
		logger, []codec.RTPCodec{audioCodec, videoCodec})
	if err != nil {
		return nil, nil, err
	}

	// closeOnError releases the connection when setup fails before the peer
	// exists; without it the connection would be leaked on those paths
	closeOnError := func() {
		if closeErr := connection.Close(); closeErr != nil {
			logger.Warn().Err(closeErr).Msg("failed to close peer connection after setup error")
		}
	}

	// asynchronously send local ICE Candidates
	if manager.config.ICETrickle {
		connection.OnICECandidate(func(candidate *webrtc.ICECandidate) {
			if candidate == nil {
				logger.Debug().Msg("all local ice candidates sent")
				return
			}

			session.Send(
				event.SIGNAL_CANDIDATE,
				message.SignalCandidate{
					ICECandidateInit: candidate.ToJSON(),
				})
		})
	}

	// audio track
	audioTrack, err := NewTrack(logger, audioCodec, connection)
	if err != nil {
		closeOnError()
		return nil, nil, err
	}

	// we disable audio by default manually
	audioTrack.SetPaused(true)

	// set stream for audio track
	_, err = audioTrack.SetStream(audio)
	if err != nil {
		closeOnError()
		return nil, nil, err
	}

	// video track
	videoRtcp := make(chan []rtcp.Packet, 1)
	videoTrack, err := NewTrack(logger, videoCodec, connection, WithRtcpChan(videoRtcp))
	if err != nil {
		closeOnError()
		return nil, nil, err
	}

	//
	// stream for video track will be set later
	//

	// data channel
	dataChannel, err := connection.CreateDataChannel("data", nil)
	if err != nil {
		closeOnError()
		return nil, nil, err
	}

	peer := &WebRTCPeerCtx{
		logger:     logger,
		session:    session,
		metrics:    metrics,
		connection: connection,
		// bandwidth estimator
		estimator: estimator,
		estimateTrend: utils.NewTrendDetector(
			utils.TrendDetectorParams{
				// Probing
				//RequiredSamples: 3,
				//DownwardTrendThreshold: 0.0,
				//CollapseValues: false,
				// Non-Probing
				RequiredSamples:        8,
				DownwardTrendThreshold: -0.5,
				CollapseValues:         true,
			}),
		// stream selectors
		video: video,
		audio: audio,
		// tracks & channels
		audioTrack:  audioTrack,
		videoTrack:  videoTrack,
		dataChannel: dataChannel,
		rtcpChannel: videoRtcp,
		// config
		iceTrickle:      manager.config.ICETrickle,
		estimatorConfig: manager.config.Estimator,
		audioDisabled:   true, // we disable audio by default manually
	}

	connection.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
		logger := logger.With().
			Str("kind", track.Kind().String()).
			Str("mime", track.Codec().RTPCodecCapability.MimeType).
			Logger()

		logger.Info().Msg("received new remote track")

		if !session.Profile().CanShareMedia {
			err := receiver.Stop()
			logger.Warn().Err(err).Msg("media sharing is disabled for this session")
			return
		}

		// parse codec from remote track
		trackCodec, ok := codec.ParseRTC(track.Codec())
		if !ok {
			err := receiver.Stop()
			logger.Warn().Err(err).Msg("remote track with unknown codec")
			return
		}

		var srcManager types.StreamSrcManager

		// stopFn may be called both by this handler (deferred) and by the
		// handler of the next remote track of the same kind, so the teardown
		// is guarded by a sync.Once: it runs exactly once and a second caller
		// waits until it has finished.
		var stopOnce sync.Once
		stopFn := func() {
			stopOnce.Do(func() {
				err := receiver.Stop()
				srcManager.Stop()
				logger.Err(err).Msg("remote track stopped")
			})
		}

		// NOTE(review): manager.micStop and manager.camStop are read and
		// written here without synchronization; confirm whether handlers of
		// different peers can run concurrently.
		switch track.Kind() {
		case webrtc.RTPCodecTypeAudio:
			// audio -> microphone
			srcManager = manager.capture.Microphone()
			defer stopFn()

			// stop the previous microphone track, if any
			if manager.micStop != nil {
				(*manager.micStop)()
			}
			manager.micStop = &stopFn
		case webrtc.RTPCodecTypeVideo:
			// video -> webcam
			srcManager = manager.capture.Webcam()
			defer stopFn()

			// stop the previous webcam track, if any
			if manager.camStop != nil {
				(*manager.camStop)()
			}
			manager.camStop = &stopFn
		default:
			err := receiver.Stop()
			logger.Warn().Err(err).Msg("remote track with unsupported codec type")
			return
		}

		err := srcManager.Start(trackCodec)
		if err != nil {
			logger.Err(err).Msg("failed to start pipeline")
			return
		}

		ticker := time.NewTicker(rtcpPLIInterval)
		defer ticker.Stop()

		// Stopping a ticker does not close its channel, so the PLI goroutine
		// needs its own signal to exit once this handler returns.
		done := make(chan struct{})
		defer close(done)

		go func() {
			for {
				select {
				case <-done:
					return
				case <-ticker.C:
					err := connection.WriteRTCP([]rtcp.Packet{
						&rtcp.PictureLossIndication{
							MediaSSRC: uint32(track.SSRC()),
						},
					})
					if err != nil {
						logger.Err(err).Msg("remote track rtcp send err")
					}
				}
			}
		}()

		buf := make([]byte, 1400)
		for {
			i, _, err := track.Read(buf)
			if err != nil {
				// if the error is not io.EOF, log it. Otherwise, it's a normal closure of the track.
				if !errors.Is(err, io.EOF) {
					logger.Warn().Err(err).Msg("failed read from remote track")
				}
				break
			}

			srcManager.Push(buf[:i])
		}

		logger.Info().Msg("remote track data finished")
	})

	connection.OnDataChannel(func(dc *webrtc.DataChannel) {
		logger.Info().Interface("data_channel", dc).Msg("got remote data channel")

		//
		// old implementation created a new data channel on client side
		// new implementation creates a new data channel on server side
		//
		if viper.GetBool("legacy") {
			// handle messages of the legacy data channel
			dc.OnMessage(func(message webrtc.DataChannelMessage) {
				if err := manager.handleLegacy(logger, message.Data, session); err != nil {
					logger.Err(err).Msg("data handle failed")
				}
			})

			// use the client-created data channel as the data channel of the peer
			peer.dataChannel = dc
		}
	})

	var once sync.Once
	connection.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
		switch state {
		case webrtc.PeerConnectionStateConnected:
			session.SetWebRTCConnected(peer, true)
		case webrtc.PeerConnectionStateDisconnected,
			webrtc.PeerConnectionStateFailed:
			peer.Destroy()
		case webrtc.PeerConnectionStateClosed:
			// ensure we only run this once
			once.Do(func() {
				session.SetWebRTCConnected(peer, false)

				//
				// TODO: Shutdown peer?
				//

				audioTrack.Shutdown()
				videoTrack.Shutdown()
				close(videoRtcp)
			})
		}

		metrics.SetState(state)
	})

	dataChannel.OnOpen(func() {
		manager.curImage.AddListener(peer)
		manager.curPosition.AddListener(peer)

		// send initial cursor image
		cur, img, err := manager.curImage.GetCurrent()
		if err == nil {
			err := peer.SendCursorImage(cur, img)
			if err != nil {
				logger.Err(err).Msg("failed to set cursor image")
			}
		} else {
			logger.Err(err).Msg("failed to get cursor image")
		}

		// send initial cursor position
		x, y := manager.desktop.GetCursorPosition()
		err = peer.SendCursorPosition(x, y)
		if err != nil {
			logger.Err(err).Msg("failed to set cursor position")
		}
	})

	dataChannel.OnClose(func() {
		manager.curImage.RemoveListener(peer)
		manager.curPosition.RemoveListener(peer)
	})

	dataChannel.OnMessage(func(message webrtc.DataChannelMessage) {
		if err := manager.handle(logger, message.Data, dataChannel, session); err != nil {
			logger.Err(err).Msg("data handle failed")
		}
	})

	session.SetWebRTCPeer(peer)

	offer, err := peer.CreateOffer(false)
	if err != nil {
		return nil, nil, err
	}

	// on negotiation needed handler must be registered after creating initial
	// offer, otherwise it can fire and intercept successful negotiation
	connection.OnNegotiationNeeded(func() {
		logger.Warn().Msg("negotiation is needed")

		if connection.SignalingState() != webrtc.SignalingStateStable {
			logger.Warn().Msg("connection isn't stable yet; postponing...")
			return
		}

		offer, err := peer.CreateOffer(false)
		if err != nil {
			logger.Err(err).Msg("sdp offer failed")
			return
		}

		session.Send(
			event.SIGNAL_OFFER,
			message.SignalDescription{
				SDP: offer.SDP,
			})
	})

	// start metrics collectors
	go metrics.rtcpReceiver(videoRtcp)
	go metrics.connectionStats(connection)

	// start estimator reader
	go peer.estimatorReader()

	return offer, peer, nil
}
// SetCursorPosition passes the cursor coordinates x and y on to the cursor
// position handler of the manager.
func (manager *WebRTCManagerCtx) SetCursorPosition(x, y int) {
manager.curPosition.Set(x, y)
}