Go 协程池 GoroutinePool
为什么需要协程池
Go 协程池 GoroutinePool 池化技术通过复用来提升性能
定义Job 和 Worker 作为协程池控制的最基本单元
type Job struct {//协程池的最小工作单元 具体业务处理结构体
Connection net.Conn //客户端的连接
}
var JobQueue chan Job //队列 用来接收 发送请求
type Worker struct {//执行job 为job的管理者
WorkerPool chan chan Job
JobChannel chan Job
quit chan bool
}
func NewWorker(workerPool chan chan Job) Worker {//初始化Worker
return Worker {
WorkerPool:workerPool,
JobChannel:make(chan Job),
quit:make(chan bool),
}
}
func (w Worker) Start() {//运行Worker
go func() {
for {
w.WorkerPool <- w.JobChannel //将可用的worker放进队列中
select {
case job := <- w.JobChannel:
HandleConnection(job.Connection) //接收到具体请求时进行处理
case <-w.quit: //接收停止请求
return
}
}
} ()
}
func (w Worker) Stop() {//发送停止请求
go func() {
w.quit <- true
}()
}
type Dispatcher struct {//分配worker的结构体 dispatcher
WorkerPool chan chan Job //worker的池子 控制worker的数量
WorkerList []Worker //worker的切片
}
func NewDispatcher(maxWorkers int) *Dispatcher {//根据传入的值 创建对应数量的channel
pool := make(chan chan Job, maxWorkers)
return &Dispatcher{
WorkerPool:pool,
}
}
func (d *Dispatcher) Run() {//根据最大值 创建对应数量的worker
for i := 0; i < MaxWorkers; i++ {
worker := NewWorker(d.WorkerPool)
worker.Start()
d.WorkerList = append(d.WorkerList, worker)
}
go d.dispatch()//监听工作队列
}
func (d *Dispatcher) dispatch() {
for {
select {
case job := <-JobQueue:
go func (job Job) {
jobChannel := <-d.WorkerPool
jobChannel <- job
}(job)
}
}
}
func (d *Dispatcher) Stop() {//停止所有的worker
for _, worker := range d.WorkerList {
worker.Stop()
}
}
func main() {
l, e := net.Listen("tcp",":3207")
if e != nil {
fmt.Println(e)
return
}
dispatcher := routinePool.NewDispatcher(routinePool.MaxWorkers)//创建dispatcher
dispatcher.Run()
routinePool.JobQueue = make(chan routinePool.Job, routinePool.MaxQueue)//初始化工作队列
defer l.Close()
defer dispatcher.Stop()
for {
conn, err := l.Accept()//接受客户端的连接
if err != nil {
return
}
job := routinePool.Job{
Connection:conn,
}
routinePool.JobQueue <- job //客户端连接放入工作队列
}
}
客户端请求的处理//解包
func Unpack(buffer []byte, readerChannel chan []byte) []byte {
length := len(buffer)
var i int
for i = 0; i < length; i++ {
if length < i + DataLen {
break
}
messageLen := BytesToInt(buffer[i:i+DataLen])//根据长度来获取数据
if length < i + DataLen + messageLen {
break
}
data := buffer[i+DataLen:i+DataLen+messageLen]
readerChannel <- data
i += DataLen + messageLen - 1
}
if i == length {
return make([]byte, 0)
}
return buffer[i:]
}
func BytesToInt(b []byte) int {//字节转换成整形
bytesBuffer := bytes.NewBuffer(b)
var x int32
binary.Read(bytesBuffer, binary.BigEndian, &x)
return int(x)
}
//处理客户端请求
func HandleConnection(conn net.Conn) {
defer func() {
fmt.Println(conn.RemoteAddr())
conn.Close()
}()
tempBuffer := make([]byte, 0)
readerChannel := make(chan []byte, 16)
//fmt.Println(conn.RemoteAddr())
go reader(readerChannel)
buffer := make([]byte, 1024)
for {
n, err := conn.Read(buffer)
if err != nil {
return
}
tempBuffer = Unpack(append(tempBuffer, buffer[:n]...), readerChannel)
}
}
func reader(readerChannel chan []byte) {
for {
select {
case data := <- readerChannel:
//fmt.Println(string(data))
data = data
}
}
}
const MaxWorkers = 100000
const MaxQueue = 3000
const DataLen = 4
系统用池化技术来减少消耗提升性能
对象池通过复用对象来减少创建对象 垃圾回收的开销
连接池(数据库连接池、Redis连接池、HTTP连接池)通过复用TCP连接来减少创建和释放连接的时间
线程池通过复用线程提升性能
通过限制waitgroup的大小来限制并发数
//github.com/remeh/sizedwaitgroup
尊贵的董事大人
英文标题不为空时 视为本栏投稿
需要关键字 描述 英文标题