Golang Sync包
Golang sync包
$GOROOT/src/sync
提供 基础的异步操作方法 包括互斥锁Mutex 执行Once和并发等待组WaitGroup
Mutex 互斥锁
RWMutex 读写锁
WaitGroup 并发等待组
Once 执行一次
Cond 信号量
Pool 临时对象池
Map 自带锁的map
sync.Mutex 互斥锁用在并发编程
互斥锁 要保证的是同一个时间段 不能有多个并发协程 同时访问 某一资源(临界区)
sync.Mutex有2个函数Lock和UnLock分别表示获得锁和释放锁
func (m *Mutex) Lock()
func (m *Mutex) UnLock()
sync.Mutex 初始值为UnLock状态 且sync.Mutex 常做为其它结构体的匿名变量
网上支付 同一个银行账户在某一个时间既有支出也有收入 银行得保证 余额准确
银行的支出和收入来说明Mutex
type Bank struct {
sync.Mutex
balance map[string]float64
}
func (b *Bank) In(account string, value float64) {// In 收入
b.Lock() //加锁 保证同一时间只有一个协程能访问这段代码
defer b.Unlock()
v, ok := b.balance[account]
if !ok {
b.balance[account] = 0.0
}
b.balance[account] += v
}
func (b *Bank) Out(account string, value float64) error {// Out 支出
b.Lock() // 加锁 保证同一时间只有一个协程能访问这段代码
defer b.Unlock()
v, ok := b.balance[account]
if !ok || v < value {
return errors.New("account not enough balance")
}
b.balance[account] -= value
return nil
}
sync.RWMutex 读写锁
是 sync.Mutex 变种 RWMutex来自于计算机操作系统的读者写者问题sync.RWMutex 目的是为了能够支持多个并发协程 同时读取某一个资源 但只有一个并发协程能更新资源
也就是说读和写是互斥的 写和写也是互斥的 读和读不互斥
有一个协程在读的时候 所有写的协程必须等到所有读的协程结束才可以获得锁进行写操作
有一个协程在读的时候 读协程不受影响都可读操作
有一个协程在写的时候 所有读 写的协程必须等到写协程结束才可以获得锁进行读 写操作
RWMutex有5个函数 分别为读和写提供锁操作
写操作
func (rw *RWMutex) Lock()
func (rw *RWMutex) Unlock()
读操作
func (rw *RWMutex) RLock()
func (rw *RWMutex) RUnlock()
RLocker()能获取读锁 然后传递给其他协程使用
func (rw *RWMutex) RLocker() Locker
用Mutex互斥锁就无办法支持多人同时查询
所以 用sync.RWMutex来改写这个代码
type Bank struct {
sync.RWMutex
balance map[string]float64
}
func (b *Bank) In(account string, value float64) {
b.Lock()
defer b.Unlock()
v, ok := b.balance[account]
if !ok {
b.balance[account] = 0.0
}
b.balance[account] += v
}
func (b *Bank) Out(account string, value float64) error {
b.Lock()
defer b.Unlock()
v, ok := b.balance[account]
if !ok || v < value {
return errors.New("account not enough balance")
}
b.balance[account] -= value
return nil
}
func (b *Bank) Query(account string) float64 {
b.RLock()
defer b.RUnlock()
v, ok := b.balance[account]
if !ok {
return 0.0
}
return v
}
sync.WaitGroup等待组
Golang 等待一组工作完成后 再进行下一组工作sync.WaitGroup有3个函数
func (wg *WaitGroup) Add(delta int) Add添加n个并发协程
func (wg *WaitGroup) Done() Done完成一个并发协程
func (wg *WaitGroup) Wait() Wait等待其它并发协程结束
sync.WaitGroup 在Golang编程里面最常用于协程池
func main() {
wg := &sync.WaitGroup{}
for i := 0; i < 1000; i++ { //同时启动1000个并发协程
wg.Add(1)
go func() {
defer func() {
wg.Done()
}()
time.Sleep(1 * time.Second)
fmt.Println("hello world ~")
}()
}
wg.Wait() // 等待所有协程结束
fmt.Println("WaitGroup all process done ~")
}
sync.WaitGroup 无法指定最大并发协程数 有些场景下有问题
如操作数据库场景
不希望某一些时刻出现大量连接数据库 导致数据库不可访问
为了控制最大的并发数
推荐 github.com/remeh/sizedwaitgroup 用法和sync.WaitGroup类似
最多只有10个并发协程
若已经达到10个并发协程 只有某一个协程执行了Done才能启动一个新的协程
import "github.com/remeh/sizedwaitgroup"
func main() {
wg := sizedwaitgroup.New(10) //最大10个并发
for i = 0; i < 1000; i++ {
wg.Add()
go func() {
defer func() {
wg.Done()
}()
time.Sleep(1 * time.Second)
fmt.Println("hello world ~")
}()
}
wg.Wait() // 等待所有协程结束
fmt.Println("WaitGroup all process done ~")
}
sync.Once只执行一次
的对象实现 用来控制函数只被调用一次sync.Once 使用场景 如单例模式 系统初始化
例如并发情况下多次调用channel的close会导致panic
可用sync.Once来保证close只会被执行一次
sync.Once的结构 只有一个函数 用变量done来记录函数的执行状态
用sync.Mutex和sync.atomic来保证线程安全的读取done
type Once struct {
m Mutex #互斥锁
done uint32 #执行状态
}
func (o *Once) Do(f func())
1000个并发协程情况下 只有一个协程会执行到fmt.Printf
多次执行的情况下输出的内容还不一样
因为这取决于哪个协程先调用到该匿名函数
func main() {
once := &sync.Once{}
for i := 0; i < 1000; i++ {
go func(idx int) {
once.Do(func() {
time.Sleep(1 * time.Second)
fmt.Printf("hello world index %d", idx)
})
}(i)
}
time.Sleep(5 * time.Second)
}
sync.Cond 同步条件变量
需要与互斥锁组合使用 本质上是一些正在等待某个条件的协程的同步机制// NewCond returns a new Cond with Locker l.
func NewCond(l Locker) *Cond {
return &Cond{L:l}
}
// A Locker represents an object that can be locked and unlocked.
type Locker interface {
Lock()
Unlock()
}
sync.Cond有3个函数Wait、Signal、Broadcast
func (c *Cond) Wait() // Wait 等待通知
func (c *Cond) Signal() // Signal 单播通知
func (c *Cond) Broadcast() // Broadcast 广播通知
sync.Cond用于并发协程条件变量
var sharedRsc = make(map[string]interface{})
func main() {
var wg sync.WaitGroup
wg.Add(2)
m := sync.Mutex{}
c := sync.NewCond(&m)
go func() { // this go routine wait for changes to the sharedRsc
c.L.Lock()
for len(sharedRsc) == 0 {
c.Wait()
}
fmt.Println(sharedRsc["rsc1"])
c.L.Unlock()
wg.Done()
}()
go func() { // this go routine wait for changes to the sharedRsc
c.L.Lock()
for len(sharedRsc) == 0 {
c.Wait()
}
fmt.Println(sharedRsc["rsc2"])
c.L.Unlock()
wg.Done()
}()
// this one writes changes to sharedRsc
c.L.Lock()
sharedRsc["rsc1"] = "foo"
sharedRsc["rsc2"] = "bar"
c.Broadcast()
c.L.Unlock()
wg.Wait()
}
sync.Pool 临时对象池
Golang和Java具有GC机制 开发者不必考虑内存回收问题不像C++需要自己回收对象
Gc是把双刃剑 编程方便 同时增加了运行时开销
使用不当会严重影响程序性能 因此性能要求高的场景不可产生这类问题
sync.Pool 用来解决这类问题 Pool 作为临时对象池 不自己单独创建对象
而从临时对象池中获取出对象
sync.Pool有2个函数Get和Put
Get负责从临时对象池中取出对象
Put用于结束时 把对象放回临时对象池
func (p *Pool) Get() interface{}
func (p *Pool) Put(x interface{})
官方例子
var bufPool = sync.Pool{
New func() interface{} {
return new(bytes.Buffer)
},
}
func timeNow() time.Time {
return time.Unix(1136214245, 0)
}
func Log(w io.Writer, key, val string) { // 获取临时对象 无 > 自动创建
b := bufPool.Get().(*bytes.Buffer)
b.Reset()
b.WriteString(timeNow().UTC().Format(time.RFC3339))
b.WriteByte(' ')
b.WriteString(key)
b.WriteByte('=')
b.WriteString(val)
w.Write(b.Bytes()) // 将临时对象放回到 Pool 中
bufPool.Put(b)
}
func main() {
Log(os.Stdout, "path", "/search?q=flowers")
}
创建Pool对象并不能指定大小
所以sync.Pool的缓存对象数量是无限制的(只受限于内存)
那sync.Pool 如何控制缓存临时对象数呢?
sync.Pool在init的时候注册了一个poolCleanup函数
它会清除所有的pool里面的所有缓存的对象
该函数注册进去之后会在每次Gc之前都会调用
因此sync.Pool缓存的期限只是两次Gc之间这段时间
正因Gc的时候会清掉缓存对象 所以不用担心pool会无限增大的问题
正因为如此sync.Pool适合用于缓存临时对象
不适合用来做持久保存的对象池(连接池等)
sync.Map
Go在1.9版本之前自带的map对象 不具有并发安全 得自己封装支持并发安全的Map如下给map加个读写锁sync.RWMutex
type MapWithLock struct {
sync.RWMutex
M map[string]Kline
}
Go1.9版本新增了 sync.Map 原生支持并发安全map
sync.Map 封装了更为复杂的数据结构实现了比之前加读写锁锁map更优秀的性能
sync.Map 共5个函数
func (m *Map) Load(key interface{}) (value interface{}, ok bool) // 查询一个key
func (m *Map) Store(key, value interface{}) // 设置key value
func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) // 若key存在则返回key对应的value 否则设置key value
func (m *Map) Delete(key interface{})// 删除一个key
func (m *Map) Range(f func(key, value interface{}) bool) // 遍历map 仍然是无序的
type Cond
func NewCond(l Locker) *Cond
func (c *Cond) Broadcast()
func (c *Cond) Signal()
func (c *Cond) Wait()
type Locker
type Mutex
func (m *Mutex) Lock()
func (m *Mutex) Unlock()
type Once
func (o *Once) Do(f func())
type Pool
func (p *Pool) Get() interface{}
func (p *Pool) Put(x interface{})
type RWMutex
func (rw *RWMutex) Lock()
func (rw *RWMutex) RLock()
func (rw *RWMutex) RLocker() Locker
func (rw *RWMutex) RUnlock()
func (rw *RWMutex) Unlock()
type WaitGroup
func (wg *WaitGroup) Add(delta int)
func (wg *WaitGroup) Done()
func (wg *WaitGroup) Wait()
sync.WaitGroup 死锁
golang官网上的正常例子package main
import (
"sync"
)
type httpPkg struct{}
func (httpPkg) Get(url string) {}
var http httpPkg
func main() {
var wg sync.WaitGroup
var urls = []string{
"http://www.golang.org/",
"http://www.google.com/",
"http://www.somestupidname.com/",
}
for _, url := range urls {
wg.Add(1) // Increment the WaitGroup counter
go func(url string) { // Launch a goroutine to fetch the URL
defer wg.Done() // Decrement the counter when the goroutine completes
http.Get(url)// Fetch the URL
}(url)
}
wg.Wait() // Wait for all HTTP fetches to complete.
}
阻塞等待所有任务完成之后再继续执行。
创建一个任务的时候wg.Add(1)
任务完成的时候使用wg.Done()来将任务减一
使用wg.Wait()来阻塞等待所有任务完成
死锁例子
func main() {
var wg sync.WaitGroup
ch := make(chan int, 1000)
for i := 0; i < 1000; i++ {
wg.Add(1)
go doSomething(i, wg, ch)
}
wg.Wait()
fmt.Println("all done")
for i := 0; i < 1000; i++ {
dd := <-ch
fmt.Println("from ch:"+strconv.Itoa(dd))
}
}
func doSomething(index int, wg sync.WaitGroup, ch chan int) {
defer wg.Done()
fmt.Println("start done:" + strconv.Itoa(index))
//time.Sleep(20 * time.Millisecond)
ch <- index
}
报错fatal error: all goroutines are asleep - deadlock!
死锁了 所有的协程都运行完了 这边还在等待
原因是golang 如果方法传递的不是地址 那么就做个拷贝
所以这里调用的wg 就不是同一个对象 而是对象的拷贝
传递的地方传递地址就可以了
go doSomething(i, &wg, ch)
func doSomething(index int, wg *sync.WaitGroup, ch chan int)
尊贵的董事大人
英文标题不为空时 视为本栏投稿
需要关键字 描述 英文标题