Golang Sync包


Golang sync包
$GOROOT/src/sync
提供 基础的异步操作方法 包括互斥锁Mutex 执行Once和并发等待组WaitGroup
Mutex 互斥锁
RWMutex 读写锁
WaitGroup 并发等待组
Once 执行一次
Cond 信号量
Pool 临时对象池
Map 自带锁的map
sync.Mutex 互斥锁用在并发编程
互斥锁 要保证的是同一个时间段 不能有多个并发协程 同时访问 某一资源(临界区)
sync.Mutex有2个函数Lock和UnLock分别表示获得锁和释放锁
func (m *Mutex) Lock()
func (m *Mutex) UnLock()
sync.Mutex 初始值为UnLock状态 且sync.Mutex 常做为其它结构体的匿名变量
网上支付 同一个银行账户在某一个时间既有支出也有收入 银行得保证 余额准确
银行的支出和收入来说明Mutex
type Bank struct {
    sync.Mutex
    balance map[string]float64
}
func (b *Bank) In(account string, value float64) {// In 收入
    b.Lock()  //加锁 保证同一时间只有一个协程能访问这段代码
    defer b.Unlock()
    v, ok := b.balance[account]
    if !ok {
        b.balance[account] = 0.0
    }
    b.balance[account] += v
}
func (b *Bank) Out(account string, value float64) error {// Out 支出       
    b.Lock() // 加锁 保证同一时间只有一个协程能访问这段代码
    defer b.Unlock()
    v, ok := b.balance[account]
    if !ok || v < value {
        return errors.New("account not enough balance")
    }
    b.balance[account] -= value
    return nil
}

sync.RWMutex 读写锁

是 sync.Mutex 变种 RWMutex来自于计算机操作系统的读者写者问题
sync.RWMutex 目的是为了能够支持多个并发协程 同时读取某一个资源 但只有一个并发协程能更新资源
也就是说读和写是互斥的 写和写也是互斥的 读和读不互斥
有一个协程在读的时候 所有写的协程必须等到所有读的协程结束才可以获得锁进行写操作
有一个协程在读的时候 读协程不受影响都可读操作
有一个协程在写的时候 所有读 写的协程必须等到写协程结束才可以获得锁进行读 写操作
RWMutex有5个函数 分别为读和写提供锁操作
写操作
func (rw *RWMutex) Lock()
func (rw *RWMutex) Unlock()
读操作
func (rw *RWMutex) RLock()
func (rw *RWMutex) RUnlock()
RLocker()能获取读锁 然后传递给其他协程使用
func (rw *RWMutex) RLocker() Locker
用Mutex互斥锁就无办法支持多人同时查询
所以 用sync.RWMutex来改写这个代码
 type Bank struct {
    sync.RWMutex
    balance map[string]float64
}
func (b *Bank) In(account string, value float64) {
    b.Lock()
    defer b.Unlock()
     v, ok := b.balance[account]
    if !ok {
        b.balance[account] = 0.0
    }
     b.balance[account] += v
}
 func (b *Bank) Out(account string, value float64) error {
    b.Lock()
    defer b.Unlock()
     v, ok := b.balance[account]
    if !ok || v < value {
        return errors.New("account not enough balance")
    }
     b.balance[account] -= value
    return nil
}
func (b *Bank) Query(account string) float64 {
    b.RLock()
    defer b.RUnlock()
     v, ok := b.balance[account]
    if !ok {
        return 0.0
    }
     return v
}

sync.WaitGroup等待组

Golang 等待一组工作完成后 再进行下一组工作
sync.WaitGroup有3个函数
func (wg *WaitGroup) Add(delta int)  Add添加n个并发协程
func (wg *WaitGroup) Done()  Done完成一个并发协程
func (wg *WaitGroup) Wait()  Wait等待其它并发协程结束
sync.WaitGroup 在Golang编程里面最常用于协程池
func main() {
     wg := &sync.WaitGroup{}
     for i := 0; i < 1000; i++ { //同时启动1000个并发协程
       wg.Add(1)
       go func() {
                     defer func() {
                         wg.Done()
                     }()
                     time.Sleep(1 * time.Second)
                     fmt.Println("hello world ~")
          }()
     }    
     wg.Wait() // 等待所有协程结束
     fmt.Println("WaitGroup all process done ~")
}
sync.WaitGroup 无法指定最大并发协程数 有些场景下有问题
如操作数据库场景
不希望某一些时刻出现大量连接数据库 导致数据库不可访问
为了控制最大的并发数
推荐 github.com/remeh/sizedwaitgroup 用法和sync.WaitGroup类似
最多只有10个并发协程
若已经达到10个并发协程 只有某一个协程执行了Done才能启动一个新的协程
import  "github.com/remeh/sizedwaitgroup"
func main() {    
     wg := sizedwaitgroup.New(10) //最大10个并发
     for i = 0; i < 1000; i++ {
         wg.Add()
         go func() {
             defer func() {
                  wg.Done()
              }()
               time.Sleep(1 * time.Second)
               fmt.Println("hello world ~")
           }()
     }     
     wg.Wait()  // 等待所有协程结束
     fmt.Println("WaitGroup all process done ~")
}

sync.Once只执行一次

的对象实现  用来控制函数只被调用一次
sync.Once 使用场景 如单例模式 系统初始化
例如并发情况下多次调用channel的close会导致panic
可用sync.Once来保证close只会被执行一次
sync.Once的结构 只有一个函数 用变量done来记录函数的执行状态
用sync.Mutex和sync.atomic来保证线程安全的读取done
type Once struct {
    m    Mutex     #互斥锁
    done uint32    #执行状态
}
func (o *Once) Do(f func())
1000个并发协程情况下 只有一个协程会执行到fmt.Printf
多次执行的情况下输出的内容还不一样
因为这取决于哪个协程先调用到该匿名函数
func main() {
    once := &sync.Once{}
    for i := 0; i < 1000; i++ {
        go func(idx int) {
            once.Do(func() {
                time.Sleep(1 * time.Second)
                fmt.Printf("hello world index %d", idx)
            })
        }(i)
    }
    time.Sleep(5 * time.Second)
}

sync.Cond 同步条件变量

需要与互斥锁组合使用 本质上是一些正在等待某个条件的协程的同步机制
// NewCond returns a new Cond with Locker l.
func NewCond(l Locker) *Cond {
    return &Cond{L:l}
}
// A Locker represents an object that can be locked and unlocked.
type Locker interface {
    Lock()
    Unlock()
}
sync.Cond有3个函数Wait、Signal、Broadcast
func (c *Cond) Wait() // Wait 等待通知
func (c *Cond) Signal() // Signal 单播通知
func (c *Cond) Broadcast() // Broadcast 广播通知
sync.Cond用于并发协程条件变量
var sharedRsc = make(map[string]interface{})
func main() {
    var wg sync.WaitGroup
    wg.Add(2)
    m := sync.Mutex{}
    c := sync.NewCond(&m)    
    go func() { // this go routine wait for changes to the sharedRsc
        c.L.Lock()
        for len(sharedRsc) == 0 {
            c.Wait()
        }
        fmt.Println(sharedRsc["rsc1"])
        c.L.Unlock()
        wg.Done()
    }()
     go func() { // this go routine wait for changes to the sharedRsc       
        c.L.Lock()
        for len(sharedRsc) == 0 {
            c.Wait()
        }
        fmt.Println(sharedRsc["rsc2"])
        c.L.Unlock()
        wg.Done()
    }()
     // this one writes changes to sharedRsc
    c.L.Lock()
    sharedRsc["rsc1"] = "foo"
    sharedRsc["rsc2"] = "bar"
    c.Broadcast()
    c.L.Unlock()
    wg.Wait()
}

sync.Pool 临时对象池

Golang和Java具有GC机制 开发者不必考虑内存回收问题
不像C++需要自己回收对象
Gc是把双刃剑 编程方便 同时增加了运行时开销
使用不当会严重影响程序性能 因此性能要求高的场景不可产生这类问题
sync.Pool 用来解决这类问题 Pool 作为临时对象池 不自己单独创建对象
而从临时对象池中获取出对象
sync.Pool有2个函数Get和Put
Get负责从临时对象池中取出对象
Put用于结束时 把对象放回临时对象池
func (p *Pool) Get() interface{}
func (p *Pool) Put(x interface{})
官方例子
var bufPool = sync.Pool{
    New func() interface{} {
        return new(bytes.Buffer)
    },
}
func timeNow() time.Time {
    return time.Unix(1136214245, 0)
}
func Log(w io.Writer, key, val string) {  // 获取临时对象 无 > 自动创建
    b := bufPool.Get().(*bytes.Buffer)
    b.Reset()
    b.WriteString(timeNow().UTC().Format(time.RFC3339))
    b.WriteByte(' ')
    b.WriteString(key)
    b.WriteByte('=')
    b.WriteString(val)
    w.Write(b.Bytes())  // 将临时对象放回到 Pool 中
    bufPool.Put(b)
}
func main() {
    Log(os.Stdout, "path", "/search?q=flowers")
}
创建Pool对象并不能指定大小
所以sync.Pool的缓存对象数量是无限制的(只受限于内存)
那sync.Pool 如何控制缓存临时对象数呢?
sync.Pool在init的时候注册了一个poolCleanup函数
它会清除所有的pool里面的所有缓存的对象
该函数注册进去之后会在每次Gc之前都会调用
因此sync.Pool缓存的期限只是两次Gc之间这段时间
正因Gc的时候会清掉缓存对象 所以不用担心pool会无限增大的问题
正因为如此sync.Pool适合用于缓存临时对象
不适合用来做持久保存的对象池(连接池等)

sync.Map

Go在1.9版本之前自带的map对象 不具有并发安全 得自己封装支持并发安全的Map
如下给map加个读写锁sync.RWMutex
type MapWithLock struct {
    sync.RWMutex
    M map[string]Kline
}
Go1.9版本新增了 sync.Map 原生支持并发安全map
sync.Map 封装了更为复杂的数据结构实现了比之前加读写锁锁map更优秀的性能
sync.Map 共5个函数
func (m *Map) Load(key interface{}) (value interface{}, ok bool) // 查询一个key
func (m *Map) Store(key, value interface{}) // 设置key value
func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) // 若key存在则返回key对应的value 否则设置key value
func (m *Map) Delete(key interface{})// 删除一个key
func (m *Map) Range(f func(key, value interface{}) bool) // 遍历map 仍然是无序的
type Cond
    func NewCond(l Locker) *Cond
    func (c *Cond) Broadcast()
    func (c *Cond) Signal()
    func (c *Cond) Wait()
type Locker
type Mutex
    func (m *Mutex) Lock()
    func (m *Mutex) Unlock()
type Once
    func (o *Once) Do(f func())
type Pool
    func (p *Pool) Get() interface{}
    func (p *Pool) Put(x interface{})
type RWMutex
    func (rw *RWMutex) Lock()
    func (rw *RWMutex) RLock()
    func (rw *RWMutex) RLocker() Locker
    func (rw *RWMutex) RUnlock()
    func (rw *RWMutex) Unlock()
type WaitGroup
    func (wg *WaitGroup) Add(delta int)
    func (wg *WaitGroup) Done()
    func (wg *WaitGroup) Wait()

sync.WaitGroup 死锁

golang官网上的正常例子
package main
import (
  "sync"
)
type httpPkg struct{}
func (httpPkg) Get(url string) {}
var http httpPkg
func main() {
    var wg sync.WaitGroup
    var urls = []string{
        "http://www.golang.org/",
        "http://www.google.com/",
        "http://www.somestupidname.com/",
    }
    for _, url := range urls {
        wg.Add(1)  // Increment the WaitGroup counter        
        go func(url string) { // Launch a goroutine to fetch the URL
            defer wg.Done() // Decrement the counter when the goroutine completes         
            http.Get(url)// Fetch the URL
        }(url)
    }
    wg.Wait() // Wait for all HTTP fetches to complete.
}
阻塞等待所有任务完成之后再继续执行。
创建一个任务的时候wg.Add(1)
任务完成的时候使用wg.Done()来将任务减一
使用wg.Wait()来阻塞等待所有任务完成
死锁例子
func main() {
    var wg sync.WaitGroup
    ch := make(chan int, 1000)
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go doSomething(i, wg, ch)
    }
    wg.Wait()
    fmt.Println("all done")
    for i := 0; i < 1000; i++ {
        dd := <-ch
        fmt.Println("from ch:"+strconv.Itoa(dd))
    }
}
func doSomething(index int, wg  sync.WaitGroup, ch chan int) {
    defer wg.Done()
    fmt.Println("start done:" + strconv.Itoa(index))
    //time.Sleep(20 * time.Millisecond)
    ch <- index
}
报错fatal error: all goroutines are asleep - deadlock!
死锁了 所有的协程都运行完了 这边还在等待
原因是golang 如果方法传递的不是地址 那么就做个拷贝
所以这里调用的wg 就不是同一个对象 而是对象的拷贝
传递的地方传递地址就可以了
go doSomething(i, &wg, ch)
func doSomething(index int, wg *sync.WaitGroup, ch chan int)