Go 协程池 GoroutinePool


为什么需要协程池
Go 协程池 GoroutinePool  池化技术通过复用来提升性能
定义Job 和 Worker 作为协程池控制的最基本单元
type Job struct {//协程池的最小工作单元 具体业务处理结构体
    Connection net.Conn  //客户端的连接
}
var JobQueue chan Job //队列 用来接收 发送请求
type Worker struct {//执行job 为job的管理者
    WorkerPool chan chan Job
    JobChannel chan Job
    quit chan bool
}
func NewWorker(workerPool chan chan Job) Worker {//初始化Worker
    return Worker {
        WorkerPool:workerPool,
        JobChannel:make(chan Job),
        quit:make(chan bool),
    }
}
func (w Worker) Start() {//运行Worker
    go func() {
        for {           
                w.WorkerPool  <- w.JobChannel //将可用的worker放进队列中
                select {
                case job := <- w.JobChannel:               
                        HandleConnection(job.Connection) //接收到具体请求时进行处理
                case <-w.quit:  //接收停止请求
                        return
                }
        }
    } ()
}
func (w Worker) Stop() {//发送停止请求
    go func() {
        w.quit <- true
    }()
}

type Dispatcher struct {//分配worker的结构体 dispatcher
    WorkerPool chan chan Job    //worker的池子 控制worker的数量
    WorkerList []Worker         //worker的切片
}
func NewDispatcher(maxWorkers int) *Dispatcher {//根据传入的值 创建对应数量的channel
    pool := make(chan chan Job, maxWorkers)
    return &Dispatcher{
        WorkerPool:pool,
    }
}
func (d *Dispatcher) Run() {//根据最大值 创建对应数量的worker
    for i := 0; i < MaxWorkers; i++ {
        worker := NewWorker(d.WorkerPool)
        worker.Start()
        d.WorkerList = append(d.WorkerList, worker)
    }    
    go d.dispatch()//监听工作队列
}
func (d *Dispatcher) dispatch() {
    for {
            select {
            case job := <-JobQueue:
                    go func (job Job) {
                            jobChannel := <-d.WorkerPool
                            jobChannel <- job
                    }(job)
            }
    }
}
func (d *Dispatcher) Stop() {//停止所有的worker
    for _, worker := range d.WorkerList {
        worker.Stop()
    }
}
func main() {
    l, e := net.Listen("tcp",":3207")
    if e != nil {
        fmt.Println(e)
        return
    }    
    dispatcher := routinePool.NewDispatcher(routinePool.MaxWorkers)//创建dispatcher
    dispatcher.Run()    
    routinePool.JobQueue = make(chan routinePool.Job, routinePool.MaxQueue)//初始化工作队列
    defer l.Close()
    defer dispatcher.Stop()
    for {        
        conn, err := l.Accept()//接受客户端的连接
        if err != nil {
            return
        }
        job := routinePool.Job{
            Connection:conn,
        }       
        routinePool.JobQueue <- job //客户端连接放入工作队列
    }
}
客户端请求的处理//解包
func Unpack(buffer []byte, readerChannel chan []byte) []byte {
    length := len(buffer)
    var i int
    for i = 0; i < length; i++ {
        if length < i + DataLen {
            break
        }        
        messageLen := BytesToInt(buffer[i:i+DataLen])//根据长度来获取数据
        if length < i + DataLen + messageLen {
            break
        }
        data := buffer[i+DataLen:i+DataLen+messageLen]
        readerChannel <- data
        i += DataLen + messageLen - 1
    }
    if i == length {
        return make([]byte, 0)
    }
    return buffer[i:]
}
func BytesToInt(b []byte) int {//字节转换成整形
    bytesBuffer := bytes.NewBuffer(b)
    var x int32
    binary.Read(bytesBuffer, binary.BigEndian, &x)
    return int(x)
}

//处理客户端请求
func HandleConnection(conn net.Conn) {
    defer func() {
            fmt.Println(conn.RemoteAddr())
            conn.Close()
    }()
    tempBuffer := make([]byte, 0)
    readerChannel := make(chan []byte, 16)
    //fmt.Println(conn.RemoteAddr())
    go reader(readerChannel)
    buffer := make([]byte, 1024)
    for {
            n, err := conn.Read(buffer)
            if err != nil {
                    return
            }
            tempBuffer = Unpack(append(tempBuffer, buffer[:n]...), readerChannel)
    }
}
func reader(readerChannel chan []byte) {
    for {
        select {
        case data := <- readerChannel:
            //fmt.Println(string(data))
            data = data
        }
    }
}
const MaxWorkers = 100000
const MaxQueue = 3000
const DataLen = 4
系统用池化技术来减少消耗提升性能
对象池通过复用对象来减少创建对象 垃圾回收的开销
连接池(数据库连接池、Redis连接池、HTTP连接池)通过复用TCP连接来减少创建和释放连接的时间
线程池通过复用线程提升性能
通过限制waitgroup的大小来限制并发数
//github.com/remeh/sizedwaitgroup