From e51541fe015f4be87e432a4bdb411108fc0c0495 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20=C5=A0ediv=C3=BD?= Date: Sun, 1 Nov 2020 20:53:25 +0100 Subject: [PATCH] separate disconnect from destroy. --- internal/session/manager.go | 27 ++++++++++++--------------- internal/session/session.go | 12 +++++++++++- internal/types/session.go | 3 ++- internal/webrtc/webrtc.go | 4 +--- internal/websocket/handler/session.go | 9 ++++----- internal/websocket/manager.go | 24 +++++++++++------------- 6 files changed, 41 insertions(+), 38 deletions(-) diff --git a/internal/session/manager.go b/internal/session/manager.go index 53d5da5a..c8ac91da 100644 --- a/internal/session/manager.go +++ b/internal/session/manager.go @@ -57,10 +57,7 @@ func (manager *SessionManagerCtx) Destroy(id string) error { session, ok := manager.members[id] if ok { delete(manager.members, id) - err := session.destroy() - - manager.emmiter.Emit("destroy", id) - return err + return session.destroy() } return nil @@ -151,17 +148,6 @@ func (manager *SessionManagerCtx) OnHostCleared(listener func(session types.Sess }) } -func (manager *SessionManagerCtx) OnDestroy(listener func(id string)) { - manager.emmiter.On("destroy", func(payload ...interface{}) { - // Stop streaming, if everyone left - if manager.capture.Streaming() && len(manager.members) == 0 { - manager.capture.StopStream() - } - - listener(payload[0].(string)) - }) -} - func (manager *SessionManagerCtx) OnCreated(listener func(session types.Session)) { manager.emmiter.On("created", func(payload ...interface{}) { // Start streaming, when first joins @@ -178,3 +164,14 @@ func (manager *SessionManagerCtx) OnConnected(listener func(session types.Sessio listener(payload[0].(*SessionCtx)) }) } + +func (manager *SessionManagerCtx) OnDisconnected(listener func(session types.Session)) { + manager.emmiter.On("disconnected", func(payload ...interface{}) { + // Stop streaming, if everyone left + if manager.capture.Streaming() && len(manager.members) == 0 { + manager.capture.StopStream() + } + + listener(payload[0].(*SessionCtx)) + }) +} diff --git a/internal/session/session.go b/internal/session/session.go index 80be221d..efcbd2e2 100644 --- a/internal/session/session.go +++ b/internal/session/session.go @@ -74,6 +74,15 @@ func (session *SessionCtx) SetConnected() { session.manager.emmiter.Emit("connected", session) } +func (session *SessionCtx) SetDisconnected() { + session.connected = false + session.manager.emmiter.Emit("disconnected", session) + session.socket = nil + + // TODO: Refactor. + session.manager.Destroy(session.id) +} + func (session *SessionCtx) Disconnect(reason string) error { if session.socket == nil { return nil @@ -87,7 +96,8 @@ func (session *SessionCtx) Disconnect(reason string) error { return err } - return session.manager.Destroy(session.id) + session.SetDisconnected() + return nil } func (session *SessionCtx) Send(v interface{}) error { diff --git a/internal/types/session.go b/internal/types/session.go index ae9b6b8b..7c5715df 100644 --- a/internal/types/session.go +++ b/internal/types/session.go @@ -15,6 +15,7 @@ type Session interface { SetSocket(socket WebSocket) SetPeer(peer Peer) SetConnected() + SetDisconnected() Disconnect(reason string) error Send(v interface{}) error SignalAnswer(sdp string) error @@ -37,9 +38,9 @@ type SessionManager interface { OnHost(listener func(session Session)) OnHostCleared(listener func(session Session)) - OnDestroy(listener func(id string)) OnCreated(listener func(session Session)) OnConnected(listener func(session Session)) + OnDisconnected(listener func(session Session)) // auth Authenticate(r *http.Request) (Session, error) diff --git a/internal/webrtc/webrtc.go b/internal/webrtc/webrtc.go index b5c11a4a..3b20ebea 100644 --- a/internal/webrtc/webrtc.go +++ b/internal/webrtc/webrtc.go @@ -156,9 +156,7 @@ func (manager *WebRTCManagerCtx) CreatePeer(session types.Session) (string, bool case webrtc.PeerConnectionStateDisconnected: case webrtc.PeerConnectionStateFailed: manager.logger.Info().Str("id", session.ID()).Msg("peer disconnected") - if err:= session.Disconnect("peer connection state failed"); err != nil { - manager.logger.Warn().Err(err).Msg("error while disconnecting session") - } + session.SetDisconnected() case webrtc.PeerConnectionStateConnected: manager.logger.Info().Str("id", session.ID()).Msg("peer connected") session.SetConnected() diff --git a/internal/websocket/handler/session.go b/internal/websocket/handler/session.go index 923bbad6..4f788455 100644 --- a/internal/websocket/handler/session.go +++ b/internal/websocket/handler/session.go @@ -81,14 +81,13 @@ func (h *MessageHandlerCtx) SessionConnected(session types.Session) error { return nil } -func (h *MessageHandlerCtx) SessionDestroyed(id string) error { +func (h *MessageHandlerCtx) SessionDisconnected(session types.Session) error { // clear host if exists - host := h.sessions.GetHost() - if host != nil && host.ID() == id { + if session.IsHost() { h.sessions.ClearHost() if err := h.sessions.Broadcast(message.Control{ Event: event.CONTROL_RELEASE, - ID: id, + ID: session.ID(), }, nil); err != nil { h.logger.Warn().Err(err).Msgf("broadcasting event %s has failed", event.CONTROL_RELEASE) } @@ -98,7 +97,7 @@ func (h *MessageHandlerCtx) SessionDestroyed(id string) error { if err := h.sessions.Broadcast( message.MemberDisconnected{ Event: event.MEMBER_DISCONNECTED, - ID: id, + ID: session.ID(), }, nil); err != nil { h.logger.Warn().Err(err).Msgf("broadcasting event %s has failed", event.MEMBER_DISCONNECTED) return err diff --git a/internal/websocket/manager.go b/internal/websocket/manager.go index 1d5ba136..f8884ea6 100644 --- a/internal/websocket/manager.go +++ b/internal/websocket/manager.go @@ -51,7 +51,7 @@ type WebSocketManagerCtx struct { func (ws *WebSocketManagerCtx) Start() { ws.sessions.OnCreated(func(session types.Session) { if err := ws.handler.SessionCreated(session); err != nil { - ws.logger.Warn().Str("id", session.ID()).Err(err).Msg("session created with and error") + ws.logger.Warn().Str("id", session.ID()).Err(err).Msg("session created with an error") } else { ws.logger.Debug().Str("id", session.ID()).Msg("session created") } @@ -59,17 +59,17 @@ func (ws *WebSocketManagerCtx) Start() { ws.sessions.OnConnected(func(session types.Session) { if err := ws.handler.SessionConnected(session); err != nil { - ws.logger.Warn().Str("id", session.ID()).Err(err).Msg("session connected with and error") + ws.logger.Warn().Str("id", session.ID()).Err(err).Msg("session connected with an error") } else { ws.logger.Debug().Str("id", session.ID()).Msg("session connected") } }) - ws.sessions.OnDestroy(func(id string) { - if err := ws.handler.SessionDestroyed(id); err != nil { - ws.logger.Warn().Str("id", id).Err(err).Msg("session destroyed with and error") + ws.sessions.OnDisconnected(func(session types.Session) { + if err := ws.handler.SessionDisconnected(session); err != nil { + ws.logger.Warn().Str("id", session.ID()).Err(err).Msg("session disconnected with an error") } else { - ws.logger.Debug().Str("id", id).Msg("session destroyed") + ws.logger.Debug().Str("id", session.ID()).Msg("session disconnected") } }) @@ -184,11 +184,11 @@ func (ws *WebSocketManagerCtx) Upgrade(w http.ResponseWriter, r *http.Request) e Msg("session ended") }() - ws.handle(connection, session.ID()) + ws.handle(connection, session) return nil } -func (ws *WebSocketManagerCtx) handle(connection *websocket.Conn, id string) { +func (ws *WebSocketManagerCtx) handle(connection *websocket.Conn, session types.Session) { bytes := make(chan []byte) cancel := make(chan struct{}) ticker := time.NewTicker(pingPeriod) @@ -197,9 +197,7 @@ func (ws *WebSocketManagerCtx) handle(connection *websocket.Conn, id string) { defer func() { ticker.Stop() ws.logger.Debug().Str("address", connection.RemoteAddr().String()).Msg("handle socket ending") - if err := ws.handler.Disconnected(id); err != nil { - ws.logger.Warn().Err(err).Msg("socket disconnected with error") - } + session.SetDisconnected() }() for { @@ -223,12 +221,12 @@ func (ws *WebSocketManagerCtx) handle(connection *websocket.Conn, id string) { select { case raw := <-bytes: ws.logger.Debug(). - Str("session", id). + Str("session", session.ID()). Str("address", connection.RemoteAddr().String()). Str("raw", string(raw)). Msg("received message from client") - if err := ws.handler.Message(id, raw); err != nil { + if err := ws.handler.Message(session.ID(), raw); err != nil { ws.logger.Error().Err(err).Msg("message handler has failed") } case <-cancel: