Hand-rolling golang: Go and microservices - ChatServer part 4: memory leak

by ioly
Background
I have recently been reading <<Go微服务实战>> (Go Microservices in Action, 刘金亮, 2021.1).
This series of notes works through the book with golang exercises.
Case requirements (chat server)
- Users can connect to the server.
- Users can set their own username.
- Users can send messages to the server, and the server broadcasts each message to the other users.

Goal (Day 4)
Diagnose and fix the memory leak.

Diagnosis

Starting from the day 3 code, we capture a heap profile and inspect it with go tool pprof.
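The article does not show how the profile file is produced. As a minimal sketch, one way to capture it from the test would be something like the helper below (dumpHeapProfile is a hypothetical name, not part of the original project):

import (
    "os"
    "runtime"
    "runtime/pprof"
)

// dumpHeapProfile forces a GC so the inuse_space numbers are current,
// then writes the heap profile to the given path.
func dumpHeapProfile(path string) error {
    f, err := os.Create(path)
    if err != nil {
        return err
    }
    defer f.Close()
    runtime.GC()
    return pprof.WriteHeapProfile(f)
}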
$ go tool pprof ~/chat_server_mem.profile
File: chat_server.test
Type: inuse_space
Time: Mar 10, 2021 at 7:35am (CST)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 9495.99kB, 100% of 9495.99kB total
Showing top 10 nodes out of 12
      flat  flat%   sum%        cum   cum%
 7287.48kB 76.74% 76.74%  7287.48kB 76.74%  time.startTimer
 1184.27kB 12.47% 89.21%  1184.27kB 12.47%  runtime/pprof.StartCPUProfile
  512.19kB  5.39% 94.61%   512.19kB  5.39%  runtime.malg
  512.05kB  5.39%   100%  7799.53kB 82.13%  learning/gooop/chat_server.(*tChatClient).beginWrite
         0     0%   100%  1184.27kB 12.47%  command-line-arguments.Test_ChatServer
         0     0%   100%   512.19kB  5.39%  runtime.mstart
         0     0%   100%   512.19kB  5.39%  runtime.newproc.func1
         0     0%   100%   512.19kB  5.39%  runtime.newproc1
         0     0%   100%   512.19kB  5.39%  runtime.systemstack
         0     0%   100%  1184.27kB 12.47%  testing.tRunner
(pprof) 

From the cum column, (*tChatClient).beginWrite accounts for 7799.53kB (82.13%) cumulatively, and almost all of that sits under time.startTimer. We can therefore suspect that:

- (*tChatClient).beginWrite is the root of the memory leak;
- the main cause is that time.startTimer is being called far too many times.

Reviewing the code
Reviewing the code of tChatClient.beginWrite, there are two likely sources of continual allocation:

- Logging.Logf keeps appending log entries. Fix: rework Logging to cap the number of retained entries (use a bounded queue).
- time.After is called on every pass of the for loop, creating a large number of timers. Fix: drop time.After and detect read timeouts with a dedicated goroutine and timer instead.

Here is the day 3 version of beginWrite:

func (me *tChatClient) beginWrite() {
    Logging.Logf("tChatClient.beginWrite, %v, serverFlag=%v", me.name, me.serverFlag)
    writer := io.Writer(me.conn)
    for {
        select {
        case <- me.closeChan:
            Logging.Logf("tChatClient.beginWrite, <- closeChan, %v, serverFlag=%v", me.name, me.serverFlag)
            _ = me.conn.Close()
            me.closeFlag = 2
            me.postConnClosed()
            return
        case msg := <- me.sendChan:
            atomic.AddInt32(&me.pendingSend, -1)
            _,e := writer.Write([]byte(msg.Encode()))
            if e != nil {
                Logging.Logf("tChatClient.beginWrite, write error, %v, serverFlag=%v", me.name, me.serverFlag)
                me.closeConn()
            } else {
                me.sendLogs = append(me.sendLogs, msg)
            }
            break
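        // LEAK: each pass of this loop allocates a brand-new timer below,
        // and every such timer stays live until it fires five seconds later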
        case <- time.After(time.Duration(5) * time.Second):
            me.postRecvTimeout()
            break
        }
    }
}
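As an aside: each time.After call allocates a timer that, in Go versions of that era (before Go 1.23), is not reclaimed until it fires. If a timeout case were still wanted inside this select, the standard way to avoid the per-iteration allocation is to reuse a single time.Timer instead. A minimal sketch of that pattern (not the route taken below, where the timeout check moves to its own goroutine):

idleTimer := time.NewTimer(5 * time.Second)
defer idleTimer.Stop()
for {
    // re-arm the same timer instead of allocating a new one per iteration
    if !idleTimer.Stop() {
        select {
        case <-idleTimer.C: // drain the channel if the timer already fired
        default:
        }
    }
    idleTimer.Reset(5 * time.Second)
    select {
    case <-me.closeChan:
        return
    case msg := <-me.sendChan:
        _ = msg // handle the message as before
    case <-idleTimer.C:
        me.postRecvTimeout()
    }
}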
Reworking Logging

The main change is turning the log array into a bounded queue, so that collecting diagnostic logs cannot grow memory without limit.
package chat_server
import (
    "fmt"
    "sync"
)
type ILoggingService interface {
    Logf(f string, args ...interface{})
    AllLogs() []string
}
type tLoggingService struct {
    mutex *sync.Mutex
    logs []string
    capacity int
    rindex int
    windex int
}
var gMaxLogs = 10_000
var gEmptyString = ""
func newLoggingService() ILoggingService {
    return &tLoggingService{
        mutex: new(sync.Mutex),
        logs: make([]string, gMaxLogs*2), // double capacity, so compaction (below) stays amortized O(1)
        //logs: make([]string, 0),
        capacity: gMaxLogs,
        rindex: 0,
        windex: 0,
    }
}
func (me *tLoggingService) size() int {
    return me.windex - me.rindex
}
func (me *tLoggingService) Logf(f string, args ...interface{}) {
    log := fmt.Sprintf(f, args...)
    me.mutex.Lock()
    //me.logs = append(me.logs, log)
    me.ensureSpace()
    me.logs[me.windex] = log
    me.windex++
    me.mutex.Unlock()
    fmt.Println(log)
}
func (me *tLoggingService) ensureSpace() {
    for me.size() >= me.capacity {
        // dequeue head items
        me.logs[me.rindex] = gEmptyString
        me.rindex++
    }
    if me.rindex >= me.capacity {
        // move data to offset 0
        for i, n := 0, me.size(); i < n; i++ {
            me.logs[i], me.logs[i + me.rindex] = me.logs[i + me.rindex], gEmptyString
        }
        // reset read and write index
        me.windex, me.rindex = me.windex - me.rindex, 0
    }
}
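To make the two phases concrete, suppose gMaxLogs were 3 (so the backing array has length 6). The first three writes land at indexes 0-2. The fourth write first dequeues index 0 (rindex becomes 1) and lands at index 3; the fifth dequeues index 1 and lands at index 4. On the sixth write rindex reaches 3 (= capacity), so the two surviving entries at indexes 3-4 are copied down to 0-1, the indexes reset to rindex=0 and windex=2, and the write lands at index 2. Writes therefore never run past the doubled backing array.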
func (me *tLoggingService) AllLogs() []string {
    me.mutex.Lock()
    defer me.mutex.Unlock()
    // return a copy of the live window only; the raw backing array
    // also contains dequeued/empty slots
    return append([]string(nil), me.logs[me.rindex:me.windex]...)
}
var Logging = newLoggingService()
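A quick sanity check of the bound (a hypothetical test, not part of the original article; it needs the "testing" import, and will be noisy since Logf also prints every line):

func Test_LoggingBounded(t *testing.T) {
    // log well past the cap, then verify the retained window never exceeds it
    for i := 0; i < gMaxLogs*3; i++ {
        Logging.Logf("entry %v", i)
    }
    if n := len(Logging.AllLogs()); n > gMaxLogs {
        t.Fatalf("expected at most %v retained logs, got %v", gMaxLogs, n)
    }
}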
Reworking tChatClient

Remove the time.After call from the write loop, and use a dedicated goroutine plus a read counter to detect read timeouts:

func (me *tChatClient) open(){
    if !atomic.CompareAndSwapInt32(&me.openFlag, 0, 1) {
        return
    }
    go me.beginWrite()
    go me.beginRead()
    
    // read-timeout watchdog
    go me.beginWatchRecvTimeout()    
}
func (me *tChatClient) beginWatchRecvTimeout() {
    // use an explicit ticker (rather than time.Tick) so it can be stopped
    // when the client closes; time.Tick's ticker would itself leak once
    // this goroutine exits
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()
    for range ticker.C {
        if me.isClosed() {
            break
        }
        // note: timeoutCounter is reset by beginRead on every successful
        // read; strictly speaking this cross-goroutine access should use
        // the atomic package
        me.timeoutCounter++
        if me.timeoutCounter >= 3 {
            me.postRecvTimeout()
        }
    }
}
func (me *tChatClient) beginWrite() {
    Logging.Logf("tChatClient.beginWrite, %v, serverFlag=%v", me.name, me.serverFlag)
    writer := io.Writer(me.conn)
    for {
        select {
        case <- me.closeChan:
            Logging.Logf("tChatClient.beginWrite, <- closeChan, %v, serverFlag=%v", me.name, me.serverFlag)
            _ = me.conn.Close()
            me.closeFlag = 2
            me.postConnClosed()
            return
        case msg := <- me.sendChan:
            atomic.AddInt32(&me.pendingSend, -1)
            _,e := writer.Write([]byte(msg.Encode()))
            if e != nil {
                Logging.Logf("tChatClient.beginWrite, write error, %v, serverFlag=%v", me.name, me.serverFlag)
                me.closeConn()
            } else {
                me.sendLogs = append(me.sendLogs, msg)
            }
            break
        
        //case <- time.After(time.Duration(5) * time.Second):
        //    me.postRecvTimeout()
        //    break
        }
    }
}
func (me *tChatClient) beginRead() {
    reader := bufio.NewReader(me.conn)
    for {
        line, err := reader.ReadString('\n')
        if err != nil {
            Logging.Logf("tChatClient.beginRead, read error, %v, serverFlag=%v", me.name, me.serverFlag)
            me.closeConn()
            break
        }
        // reset the read-timeout counter
        me.timeoutCounter = 0
        ok, msg := MsgDecoder.Decode(line)
        if ok {
            fn := me.recvHandler
            if fn != nil {
                fn(me, msg)
            }
            me.recvLogs = append(me.recvLogs, msg)
        }
    }
}
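Worth noting as an alternative: if me.conn is a net.Conn (an assumption; the article never shows the field's type), the standard library can enforce read timeouts directly through deadlines, with no watchdog goroutine or counter. A minimal sketch of that variant of the read loop (beginReadWithDeadline is a hypothetical name):

func (me *tChatClient) beginReadWithDeadline() {
    reader := bufio.NewReader(me.conn)
    for {
        // re-arm the deadline before each read; a read that blocks past
        // it fails with a timeout error and falls into the error path
        _ = me.conn.SetReadDeadline(time.Now().Add(15 * time.Second))
        line, err := reader.ReadString('\n')
        if err != nil {
            me.closeConn()
            break
        }
        _ = line // decode and dispatch as in beginRead above
    }
}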
Retest

Rerun the test and check pprof again. Memory now looks much cleaner: no leak points attributable to the business code remain, so the fix is effective.

$ go tool pprof ~/chat_server_mem.profile
File: chat_server.test
Type: inuse_space
Time: Mar 10, 2021 at 7:55am (CST)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 2.66MB, 100% of 2.66MB total
      flat  flat%   sum%        cum   cum%
    1.50MB 56.47% 56.47%     1.50MB 56.47%  runtime.malg
    1.16MB 43.53%   100%     1.16MB 43.53%  runtime/pprof.StartCPUProfile
         0     0%   100%     1.16MB 43.53%  command-line-arguments.Test_ChatServer
         0     0%   100%     1.50MB 56.47%  runtime.mstart
         0     0%   100%     1.50MB 56.47%  runtime.newproc.func1
         0     0%   100%     1.50MB 56.47%  runtime.newproc1
         0     0%   100%     1.50MB 56.47%  runtime.systemstack
         0     0%   100%     1.16MB 43.53%  testing.tRunner
(pprof) 
(end)
Tags: golang, memory leak, pprof
This work is original and is published under the CC BY-NC-ND 4.0 (Attribution-NonCommercial-NoDerivatives 4.0 International) license.