go的锁机制

June 12, 2017

go是一门并发特性非常强大的语言,我们在实现并发编程时,往往会碰到多个线程同时访问同一个变量的情况,也就是所谓的竞态,这种情况可能会导致数据混乱出错,因此,这个时候就需要对变量上锁,来保证一次只有一个线程能修改该变量,下面将详细介绍go的锁机制。

sync

go语言中的锁机制是通过自带的sync包来实现的,该包包含了以下几种锁类型。

sync.Mutex

Mutex是互斥锁,其定义方式很简单,先是定义了一个Mutex类型结构体,然后该类型实现了Lock()和Unlock()两个方法。

type Mutex struct {
    state int32
    sema  uint32
}

func (m *Mutex) Lock()

func (m *Mutex) Unlock()

当一个变量被上了互斥锁后,其他访问该变量的线程会被堵塞,不可对该变量进行读写操作,直到锁被释放。下面是一个互斥锁的例子:

package main

import (
    "fmt"
    "sync"
    "time"
)

var m *sync.Mutex

func main() {
    m = new(sync.Mutex)
    go read(1)
    go read(2)
    time.Sleep(time.Second) // goroutine有足够的时间执行完
}

func read(i int) {
    fmt.Println(i, "begin lock")
    m.Lock()
    fmt.Println(i, "in lock")
    m.Unlock()
    fmt.Println(i, "unlock")
}

main函数里启了两个goroutine,调了两次read函数,无论哪个goroutine先执行,先调用read函数的会先获得互斥锁,而另一个goroutine在获取互斥锁时发现已经被占用了,其必须等待互斥锁被释放后才能获得该线程内的互斥锁,所以程序打印结果只会是以下两种:

1 begin lock
2 begin lock
1 in lock
1 unlock
2 in lock
2 unlock

// 或
2 begin lock
1 begin lock
2 in lock
2 unlock
1 in lock
1 unlock

而不会出现在1 lock start中打印出2 in lock的情况。

sync.RWMutex

在上面的例子中,如果有很多goroutine并发执行的话就会存在一个问题,因为某个线程获得互斥锁后,其他的goroutine被堵塞,导致程序的效率较低,这种情况下就需要用到读写锁RWMutex了。

RWMutex是基于互斥锁Mutex实现的,包含了读锁Rlock()和写锁Lock(),上读锁时,数据可以被多个goroutine并发访问但不可写,而上写锁时,数据不可被其他goroutine读或写。下面是其定义方式:

type RWMutex struct {
    w           Mutex  // held if there are pending writers
    writerSem   uint32 // semaphore for writers to wait for completing readers
    readerSem   uint32 // semaphore for readers to wait for completing writers
    readerCount int32  // number of pending readers
    readerWait  int32  // number of departing readers
}

func (*RWMutex) Lock    // 写锁

func (*RWMutex) Unlock

func (*RWMutex) RLock   // 读锁

func (*RWMutex) RUnlock

我们将上面互斥锁的例子改写一下:

package main

import (
	"fmt"
	"sync"
	"time"
)

var m *sync.RWMutex
var val = 0

func main() {
	m = new(sync.RWMutex)
	go read(1)
	go write(2)
	go read(3)
	time.Sleep(5 * time.Second)
}

func read(i int) {
	fmt.Println(i, "begin read")
	m.RLock()
	time.Sleep(1 * time.Second)
	fmt.Println(i, "val: ", val)
	time.Sleep(1 * time.Second)
	m.RUnlock()
	fmt.Println(i, "end read")
}

func write(i int) {
	fmt.Println(i, "begin write")
	m.Lock()
	val = 10
	fmt.Println(i, "val: ", val)
	time.Sleep(1 * time.Second)
	m.Unlock()
	fmt.Println(i, "end write")
}

可能的打印结果:

2 begin write
3 begin read
1 begin read
2 val:  10
2 end write
1 val:  10
3 val:  10
1 end read
3 end read

分析以上结果可以看到,2在写的时候,无法打印出1或3的val,只有2 end write了,才能开始打印出他们的val,说明写锁期间其他goroutine不能访问该变量。继续分析发现在1还没有end read时,已经打印出了3的val,说明读锁期间是允许多个goroutine访问同一变量的。

RWMutex只有当获得锁的大部分goroutine都是读操作,而锁在竞争条件下,也就是说,goroutine们必须等待才能获取到锁的时候,使用RWMutex才是最能带来好处的。RWMutex需要更复杂的内部记录,所以它会比一般的无竞争锁的mutex慢一些。

sync.Once

某些情况下,多个goroutine并发执行时,我们希望goroutine中的某个函数只执行一次,这时候用Once就非常方便了。其定义方式如下:

type Once struct {
    m    Mutex
    done uint32
}

func (o *Once) Do(f func())

该类型也是基于Mutex实现的,因为只会调用一次,其作用类似于init初始化函数,也往往用于初始化操作,请看下面的例子:

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var once sync.Once
    for i := 0; i < 10; i++ {
        go func() {
            once.Do(read)
        }()
    }
    time.Sleep(time.Second)
}

func read() {
    fmt.Println(1)
}

打印结果:

1

最终只会打印出一次1。

sync.WaitGroup

WaitGroup用于等待一组goroutine执行完成,主线程调用Add方法来设置要等待的goroutine数量,每个goroutine运行后会调用Done方法,同时Wait方法会一直堵塞直到所有goroutine执行完成。

type WaitGroup struct {
    // contains filtered or unexported fields
}

func (wg *WaitGroup) Add(delta int)

func (wg *WaitGroup) Done()

func (wg *WaitGroup) Wait()

我们结合下面的例子来看看它是如何实现的:

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var wg sync.WaitGroup
    var str = []string{
        "Hello, World",
        "Hello, Go",
        "Bye, PHP",
    }
    for _, s := range str {
        // Increment the WaitGroup counter.
        wg.Add(1)
        // Launch a goroutine to read the str.
        go func(s string) {
            // Decrement the counter when the goroutine completes.
            defer wg.Done()
            // Println the s.
            read(s)
        }(s)
    }
    // Wait for all goroutine to complete.
    wg.Wait()
}

func read(s string) {
    time.Sleep(time.Second * 1)
    fmt.Println(s)
}

WaitGroup中存在一个计数器,其原理其实是通过这个计数器来实现的。Add接受一个int类型的参数,当传入正整数n时,计数器的值会增加n,当传入负整数n时,计数器的值会减少n;而当计数器的值等于0时,也就意味着所有goroutine执行完了,堵塞的Wait会被释放,WaitGroup的使命也就完成了。注意:Wait释放之前,计数器的值不能为负,否则程序会panic掉。

上述例子中,main函数执行时,Wait会一直堵塞,for循环开始都会调用一次Add(1),使计数器加一,每个goroutine执行完成后会调用Done,使计数器减一,这个Done其实是调用了Add(-1),大家可以查看下源码。这样,整个for循环跑完后计数器的值肯定是0,也就是说所有goroutine执行完了,然后堵塞的Wait会被释放,后面的程序会继续执行。

根据以上结论,我们也可以将wg.Add()写在for循环外面:

func main() {
    var wg sync.WaitGroup
    var str = []string{
        "Hello, World",
        "Hello, Go",
        "Bye, PHP",
    }
    // Increment the WaitGroup counter.
    wg.Add(len(str))
    for _, s := range str {
        // Launch a goroutine to read the str.
        go func(s string) {
            // Decrement the counter when the goroutine completes.
            defer wg.Done()
            // Println the s.
            read(s)
        }(s)
    }
    wg.Wait()
}

打印结果:

Hello, World
Hello, Go
Bye, PHP

sync.Cond

Cond的作用和WaitGroup是一样的,都是让goroutine堵塞,不同的是WaitGroup是被动堵塞,所有goroutine跑完后,wait会自动释放,而Cond是主动堵塞,我们必须给cond发送一个信号,来通知wait释放。

type Cond struct {
    noCopy noCopy

    // L is held while observing or changing the condition
    L Locker

    notify  notifyList
    checker copyChecker
}

func NewCond(l Locker) *Cond

func (c *Cond) Signal()

func (c *Cond) Broadcast()

func (c *Cond) Wait()

通过Cond的定义方式可以看到,通过调用NewCond函数来获得一个Cond对象,每个Cond都关联一个Locker L(通常是一个*Mutex或*RWMutex),在更改条件和调用Wait方法时必须持有该Locker。

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    locker := new(sync.Mutex)
    cond := sync.NewCond(locker)
    done := false

    cond.L.Lock()

    go func() {
        time.Sleep(time.Second * 1)
        done = true
        cond.Signal()    // 发送信号,通知Wait()释放
    }()

    if !done {
        cond.Wait()      // 堵塞主goroutine
    }

    fmt.Println("now done is", done)    //一秒钟后会打印出 now done is true
}

这里的cond.Signal()就是用来发送一个信号给Wait来通知其释放的,sync.Cond还有一个BroadCast方法,用来通知释放所有堵塞的gouroutine。

package main

import (
    "fmt"
    "sync"
    "time"
)

var locker = new(sync.Mutex)
var cond = sync.NewCond(locker)

func read(x int) {
    cond.L.Lock()    // 获取锁
    cond.Wait()      // 等待通知,暂时阻塞
    fmt.Println(x)
    time.Sleep(time.Second * 1)
    cond.L.Unlock()  // 释放锁,不释放的话将只会有一次输出
}

func main() {
    for i := 0; i < 40; i++ {
        go read(i)
    }
    fmt.Println("start all")
    time.Sleep(time.Second * 1)
    cond.Broadcast() // 下发广播给所有等待的goroutine
    time.Sleep(time.Second * 60)
}

(完)