upd(websocket): clean up main loop, expect encoded payloads
This commit is contained in:
parent
0b8b11555a
commit
463afa77d0
@ -20,9 +20,8 @@ type (
|
||||
|
||||
subs *Subscriptions
|
||||
|
||||
send chan interface{}
|
||||
stop chan interface{}
|
||||
detach chan string
|
||||
send chan []byte
|
||||
stop chan []byte
|
||||
|
||||
remoteAddr string
|
||||
|
||||
@ -36,9 +35,8 @@ func (Session) New(ctx context.Context, conn *websocket.Conn) *Session {
|
||||
ctx: ctx,
|
||||
config: config,
|
||||
subs: Subscriptions{}.New(),
|
||||
send: make(chan interface{}, 512),
|
||||
stop: make(chan interface{}, 1),
|
||||
detach: make(chan string, 64),
|
||||
send: make(chan []byte, 512),
|
||||
stop: make(chan []byte, 1),
|
||||
}
|
||||
}
|
||||
|
||||
@ -76,7 +74,10 @@ func (sess *Session) readLoop() error {
|
||||
|
||||
if err := sess.dispatch(raw); err != nil {
|
||||
// @todo: log error?
|
||||
sess.send <- outgoing.NewError(err)
|
||||
sess.send <- func() []byte {
|
||||
b, _ := outgoing.NewError(err).EncodeMessage()
|
||||
return b
|
||||
}()
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -89,18 +90,17 @@ func (sess *Session) writeLoop() error {
|
||||
sess.Close() // break readLoop
|
||||
}()
|
||||
|
||||
write := func(mt int, msg interface{}) error {
|
||||
write := func(msg []byte) error {
|
||||
sess.conn.SetWriteDeadline(time.Now().Add(sess.config.writeTimeout))
|
||||
|
||||
switch msg := msg.(type) {
|
||||
case outgoing.MessageEncoder:
|
||||
return sess.conn.WriteJSON(msg)
|
||||
case []byte:
|
||||
return sess.conn.WriteMessage(mt, msg)
|
||||
default:
|
||||
return sess.conn.WriteMessage(mt, nil)
|
||||
if msg != nil {
|
||||
return sess.conn.WriteMessage(websocket.TextMessage, msg)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
ping := func() error {
|
||||
sess.conn.SetWriteDeadline(time.Now().Add(sess.config.writeTimeout))
|
||||
return sess.conn.WriteMessage(websocket.PingMessage, nil)
|
||||
}
|
||||
|
||||
for {
|
||||
@ -111,21 +111,17 @@ func (sess *Session) writeLoop() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := write(websocket.TextMessage, msg); err != nil {
|
||||
if err := write(msg); err != nil {
|
||||
return errors.Wrap(err, "writeLoop send")
|
||||
}
|
||||
|
||||
case msg := <-sess.stop:
|
||||
// Shutdown requested, don't care if the message is delivered
|
||||
if msg != nil {
|
||||
write(websocket.TextMessage, msg)
|
||||
}
|
||||
write(msg)
|
||||
return nil
|
||||
|
||||
case topic := <-sess.detach:
|
||||
sess.subs.Delete(topic)
|
||||
|
||||
case <-ticker.C:
|
||||
if err := write(websocket.PingMessage, nil); err != nil {
|
||||
if err := ping(); err != nil {
|
||||
return errors.Wrap(err, "writeLoop ping")
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user