Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 27 additions & 25 deletions network/banNet/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ func NewConnection(conn *net.TCPConn, ConnID uint32, handle banIface.IMsgHandle,
isClose: false,
ctx: ctx,
cancel: cancel,
msgChan: make(chan []byte),
msgChan: make(chan []byte, 10), // 高优通道加小缓冲,避免硬阻塞
msgBuffChan: make(chan []byte, config.G.MaxMsgChanLen),
}
c.TCPServer.GetConnMgr().Add(c)
return c
}
func (c *Connection) StartReader() {
fmt.Println("[StartReader]")
defer fmt.Println("[Conn] 完美退出")
fmt.Println("[Reader] commenced")
defer fmt.Println("[Conn] exited gracefully")
defer c.Stop()

for {
Expand Down Expand Up @@ -100,8 +100,8 @@ func (c *Connection) StartReader() {
}

func (c *Connection) StartWriter() {
fmt.Println("[StartWriter]")
defer fmt.Println("[INFO] Writer 已关闭")
fmt.Println("[Writer] commenced")
defer fmt.Println("[Writer] closed")
defer c.Stop()

for {
Expand All @@ -122,14 +122,6 @@ func (c *Connection) StartWriter() {
select {
case <-c.ExitBuffChan:
return
case data, ok := <-c.msgChan:
if !ok {
return
}
if _, err := c.Conn.Write(data); err != nil {
fmt.Println("Write err:", err)
return
}
case data, ok := <-c.msgBuffChan:
if !ok {
return
Expand All @@ -144,7 +136,7 @@ func (c *Connection) StartWriter() {
}

func (c *Connection) Start() {
fmt.Println("[Connection] Start Connection", c.ConnID)
fmt.Println("[Connection] established — ID:", c.ConnID)
go c.StartReader()
go c.StartWriter()
c.TCPServer.CallConnStartFunc(c)
Expand All @@ -157,7 +149,7 @@ func (c *Connection) Start() {
}

func (c *Connection) Stop() {
fmt.Println("[Connection] Stop Connection", c.ConnID, "[ZINX]链接正在关闭")
fmt.Println("[Connection] terminated — ID:", c.ConnID)
if c.isClose == true {
return
}
Expand All @@ -171,8 +163,6 @@ func (c *Connection) Stop() {
recover()
}()
close(c.ExitBuffChan)
close(c.msgChan)
close(c.msgBuffChan)
}
func (c *Connection) GetConnID() uint32 {
return c.ConnID
Expand All @@ -186,31 +176,43 @@ func (c *Connection) RemoteAddr() net.Addr {
}

func (c *Connection) SendMsg(msgId uint32, data []byte) error {
dp := NewDataPack()
if c.isClose {
return fmt.Errorf("connection closed")
}

dp := NewDataPack()
msg := NewMessage(msgId, data)
Gdata, err := dp.Pack(msg)
if err != nil {
return err
}
if c.msgChan != nil {
c.msgChan <- Gdata

select {
case c.msgChan <- Gdata:
return nil
case <-c.ctx.Done():
return fmt.Errorf("connection closed")
}
return nil
}

func (c *Connection) SendBuffMsg(msgId uint32, data []byte) error {
if c.isClose {
return fmt.Errorf("connection closed")
}

dp := NewDataPack()
msg := NewMessage(msgId, data)
Gdata, err := dp.Pack(msg)
if err != nil {
return err
}
if c.msgBuffChan != nil {
c.msgBuffChan <- Gdata
}

return nil
select {
case c.msgBuffChan <- Gdata:
return nil
case <-c.ctx.Done():
return fmt.Errorf("connection closed")
}
}

func (c *Connection) SetProperty(key string, value interface{}) {
Expand Down