并发

寒江蓑笠翁大约 95 分钟

并发

Go语言对于并发的支持是纯天然的,这是这门语言的核心所在,其上手难度相对较小,开发人员不太需要关注底层实现就能做出一个相当不错的并发应用,提高了开发人员的下限。


协程

协程(coroutine)是一种轻量级的线程,或者说是用户态的线程,不受操作系统直接调度,由Go语言自身的调度器进行运行时调度,因此上下文切换开销非常小,这也是为什么Go的并发性能很不错的原因之一。协程这一概念并非Go首次提出,Go也不是第一个支持协程的语言,但Go是第一个能够将协程和并发支持的相当简洁和优雅的语言。


在Go中,创建一个协程十分的简单,仅需要一个go关键字,就能够快速开启一个协程,go关键字后面必须是一个函数调用。例子如下

提示

具有返回值的内置函数不允许跟随在go关键字后面,例如下面的错误示范

go make([]int,10) //  go discards result of make([]int, 10) (value of type []int)
func main() {
	go fmt.Println("hello world!")
	go hello()
	go func() {
		fmt.Println("hello world!")
	}()
}

func hello() {
	fmt.Println("hello world!")
}

以上三种开启协程的方式都是可以的,但是其实这个例子执行过后在大部分情况下什么都不会输出,协程是并发执行的,系统创建协程需要时间,而在此之前,主协程早已运行结束,一旦主线程退出,其他子协程也就自然退出了。并且协程的执行顺序也是不确定的,无法预判的,例如下面的例子

func main() {
	fmt.Println("start")
	for i := 0; i < 10; i++ {
		go fmt.Println(i)
	}
	fmt.Println("end")
}

这是一个在循环体中开启协程的例子,永远也无法精准的预判到它到底会输出什么。可能子协程还没开始运行,主协程就已经结束了,情况如下

start
end

又或者只有一部分子协程在主协程退出前成功运行,情况如下

start
0  
1  
5  
3  
4  
6  
7  
end

最简单的做法就是让主协程等一会儿,需要使用到time包下的Sleep函数,可以使当前协程暂停一段时间,例子如下

func main() {
	fmt.Println("start")
	for i := 0; i < 10; i++ {
		go fmt.Println(i)
	}
    // 暂停1ms
	time.Sleep(time.Millisecond)
	fmt.Println("end")
}

再次执行输出如下,可以看到所有的数字都完整输出了,没有遗漏

start
0
1
5
2
3
4
6
8
9
7
end

但是顺序还是乱的,因此让每次循环都稍微的等一下。例子如下

func main() {
   fmt.Println("start")
   for i := 0; i < 10; i++ {
      go fmt.Println(i)
      time.Sleep(time.Millisecond)
   }
   time.Sleep(time.Millisecond)
   fmt.Println("end")
}

现在的输出已经是正常的顺序了

start
0
1
2
3
4
5
6
7
8
9
end

上面的例子中结果输出很完美,那么并发的问题解决了吗,不,一点也没有。对于并发的程序而言,不可控的因素非常多,执行的时机,先后顺序,执行过程的耗时等等,倘若循环中子协程的工作不只是一个简单的输出数字,而是一个非常巨大复杂的任务,耗时的不确定的,那么依旧会重现之前的问题。例如下方代码

func main() {
   fmt.Println("start")
   for i := 0; i < 10; i++ {
      go hello(i)
      time.Sleep(time.Millisecond)
   }
   time.Sleep(time.Millisecond)
   fmt.Println("end")
}

func hello(i int) {
   // 模拟随机耗时
   time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
   fmt.Println(i)
}

这段代码的输出依旧是不确定的,下面是可能的情况之一

start
0
3
4
end

因此time.Sleep并不是一种良好的解决办法,幸运的是Go提供了非常多的并发控制手段,常用的并发控制方法有三种:

  • channel:管道
  • WaitGroup:信号量
  • Context:上下文

三种方法有着不同的适用情况,WaitGroup可以动态的控制一组指定数量的协程,Context更适合子孙协程嵌套层级更深的情况,管道更适合协程间通信。对于较为传统的锁控制,Go也对此提供了支持:

  • Mutex:互斥锁
  • RWMutex :读写互斥锁

管道

channel,译为管道,Go对于管道的作用如下解释:

Do not communicate by sharing memory; instead, share memory by communicating.

即通过消息来进行内存共享,channel就是为此而生,它是一种在协程间通信的解决方案,同时也可以用于并发控制,先来认识下channel的基本语法。Go中通过关键字chan来代表管道类型,同时也必须声明管道的存储类型,来指定其存储的数据是什么类型,下面的例子是一个普通管道的模样。

var ch chan int

这是一个管道的声明语句,此时管道还未初始化,其值为nil,不可以直接使用。


创建

在创建管道时,有且只有一种方法,那就是使用内置函数make,对于管道而言,make函数接收两个参数,第一个是管道的类型,第二个是可选参数为管道的缓冲大小。例子如下

intCh := make(chan int)
// 缓冲区大小为1的管道
strCh := make(chan string, 1)

在使用完一个管道后一定要记得关闭该管道,使用内置函数close来关闭一个管道,该函数签名如下。

func close(c chan<- Type)

一个关闭管道的例子如下

func main() {
	intCh := make(chan int)
	// do something
	close(intCh)
}

有些时候使用defer来关闭管道可能会更好。


读写

对于一个管道而言,Go使用了两种很形象的操作符来表示读写操作:

ch <-:表示对一个管道写入数据

<- ch:表示对一个管道读取数据

<-很生动的表示了数据的流动方向,来看一个对int类型的管道读写的例子

func main() {
    // 如果没有缓冲区则会导致死锁
	intCh := make(chan int, 1)
	defer close(intCh)
    // 写入数据
	intCh <- 114514
    // 读取数据
	fmt.Println(<-intCh)
}

上面的例子中创建了一个缓冲区大小为1的int型管道,对其写入数据114514,然后再读取数据并输出,最后关闭该管道。对于读取操作而言,还有第二个返回值,一个布尔类型的值,用于表示数据是否读取成功

ints, ok := <-intCh

管道中的数据流动方式与队列一样,即先进先出(FIFO),协程对于管道的操作是同步的,在某一个时刻,只有一个协程能够对其写入数据,同时也只有一个协程能够读取管道中的数据。


无缓冲

对于无缓冲管道而言,因为缓冲区容量为0,所以不会临时存放任何数据。正因为无缓冲管道无法存放数据,在向管道写入数据时必须立刻有其他协程来读取数据,否则就会阻塞等待,读取数据时也是同理,这也解释了为什么下面看起来很正常的代码会发生死锁。

func main() {
	// 创建无缓冲管道
	ch := make(chan int)
	defer close(ch)
	// 写入数据
	ch <- 123
	// 读取数据
	n := <-ch
	fmt.Println(n)
}

无缓冲管道不应该同步的使用,正确来说应该开启一个新的协程来发送数据,如下例

func main() {
	// 创建无缓冲管道
	ch := make(chan int)
	defer close(ch)
	go func() {
		// 写入数据
		ch <- 123
	}()
	// 读取数据
	n := <-ch
	fmt.Println(n)
}

有缓冲

当管道有了缓冲区,就像是一个阻塞队列一样,读取空的管道和写入已满的管道都会造成阻塞。无缓冲管道在发送数据时,必须立刻有人接收,否则就会一直阻塞。对于有缓冲管道则不必如此,对于有缓冲管道写入数据时,会先将数据放入缓冲区里,只有当缓冲区容量满了才会阻塞的等待协程来读取管道中的数据。同样的,读取有缓冲管道时,会先从缓冲区中读取数据,直到缓冲区没数据了,才会阻塞的等待协程来向管道中写入数据。因此,无缓冲管道中会造成死锁例子在这里可以顺利运行。

func main() {
   // 创建有缓冲管道
   ch := make(chan int, 1)
   defer close(ch)
   // 写入数据
   ch <- 123
   // 读取数据
   n := <-ch
   fmt.Println(n)
}

尽管可以顺利运行,但这种同步读写的方式是非常危险的,一旦管道缓冲区空了或者满了,将会永远阻塞下去,因为没有其他协程来向管道中写入或读取数据。来看看下面的一个例子

func main() {
	// 创建有缓冲管道
	ch := make(chan int, 5)
	// 创建两个无缓冲管道
	chW := make(chan struct{})
	chR := make(chan struct{})
	defer func() {
		close(ch)
		close(chW)
		close(chR)
	}()
	// 负责写
	go func() {
		for i := 0; i < 10; i++ {
			ch <- i
			fmt.Println("写入", i)
		}
		chW <- struct{}{}
	}()
	// 负责读
	go func() {
		for i := 0; i < 10; i++ {
            // 每次读取数据都需要花费1毫秒
			time.Sleep(time.Millisecond)
			fmt.Println("读取", <-ch)
		}
		chR <- struct{}{}
	}()
	fmt.Println("写入完毕", <-chW)
	fmt.Println("读取完毕", <-chR)
}

这里总共创建了3个管道,一个有缓冲管道用于协程间通信,两个无缓冲管道用于同步父子协程的执行顺序。负责读的协程每次读取之前都会等待1毫秒,负责写的协程一口气做多也只能写入5个数据,因为管道缓冲区最大只有5,在没有协程来读取之前,只能阻塞等待。所以该示例输出如下

写入 0
写入 1
写入 2
写入 3
写入 4 // 一下写了5个,缓冲区满了,等其他协程来读
读取 0
写入 5 // 读一个,写一个
读取 1
写入 6
读取 2
写入 7
读取 3
写入 8
写入 9
读取 4
写入完毕 {} // 所有的数据都发送完毕,写协程执行完毕
读取 5
读取 6
读取 7
读取 8
读取 9
读取完毕 {} // 所有的数据都读完了,读协程执行完毕

可以看到负责写的协程刚开始就一口气发送了5个数据,缓冲区满了以后就开始阻塞等待读协程来读取,后面就是每当读协程1毫秒读取一个数据,缓冲区有空位了,写协程就写入一个数据,直到所有数据发送完毕,写协程执行结束,随后当读协程将缓冲区所有数据读取完毕后,读协程也执行结束,最后主协程退出。

提示

通过内置函数len可以访问管道缓冲区中数据的个数,通过cap可以访问管道缓冲区的大小。

func main() {
   ch := make(chan int, 5)
   ch <- 1
   ch <- 2
   ch <- 3
   fmt.Println(len(ch), cap(ch))
}

输出

3 5

利用管道的阻塞条件,可以很轻易的写出一个主协程等待子协程执行完毕的例子

func main() {
   // 创建一个无缓冲管道
   ch := make(chan struct{})
   defer close(ch)
   go func() {
      fmt.Println(2)
      // 写入
      ch <- struct{}{}
   }()
   // 阻塞等待读取
   <-ch
   fmt.Println(1)
}

输出

2
1

通过有缓冲管道还可以实现一个简单的互斥锁,看下面的例子

var count = 0

// 缓冲区大小为1的管道
var lock = make(chan struct{}, 1)

func Add() {
    // 加锁
	lock <- struct{}{}
	fmt.Println("当前计数为", count, "执行加法")
	count += 1
    // 解锁
	<-lock
}

func Sub() {
    // 加锁
	lock <- struct{}{}
	fmt.Println("当前计数为", count, "执行减法")
	count -= 1
    // 解锁
	<-lock
}

由于管道的缓冲区大小为1,最多只有一个数据存放在缓冲区中。AddSub函数在每次操作前都会尝试向管道中发送数据,由于缓冲区大小为1,倘若有其他协程已经写入了数据,缓冲区已经满了,当前协程就必须阻塞等待,直到缓冲区空出位置来,如此一来,在某一个时刻,最多只能有一个协程对变量count进行修改,这样就实现了一个简单的互斥锁。


注意点

下面是一些总结,以下几种情况使用不当会导致管道阻塞:

读写无缓冲管道

当对一个无缓冲管道直接进行同步读写操作都会导致该协程阻塞

func main() {
   // 创建了一个无缓冲管道
   intCh := make(chan int)
   defer close(intCh)
   // 发送数据
   intCh <- 1
   // 读取数据
   ints, ok := <-intCh
   fmt.Println(ints, ok)
}

读取空缓冲区的管道

当读取一个缓冲区为空的管道时,会导致该协程阻塞

func main() {
   // 创建的有缓冲管道
   intCh := make(chan int, 1)
   defer close(intCh)
   // 缓冲区为空,阻塞等待其他协程写入数据
   ints, ok := <-intCh
   fmt.Println(ints, ok)
}

写入满缓冲区的管道

当管道的缓冲区已满,对其写入数据会导致该协程阻塞

func main() {
	// 创建的有缓冲管道
	intCh := make(chan int, 1)
	defer close(intCh)
	
	intCh <- 1
    // 满了,阻塞等待其他协程来读取数据
	intCh <- 1
}

管道为nil

当管道为nil时,无论怎样读写都会导致当前协程阻塞

func main() {
	var intCh chan int
    // 写
	intCh <- 1
}
func main() {
	var intCh chan int
    // 读
	fmt.Println(<-intCh)
}

关于管道阻塞的条件需要好好掌握和熟悉,大多数情况下这些问题隐藏的十分隐蔽,并不会像例子中那样直观。


以下几种情况还会导致panic

关闭一个nil管道

当管道为nil时,使用close函数对其进行关闭操作会导致panic`

func main() {
	var intCh chan int
	close(intCh)
}

写入已关闭的管道

对一个已关闭的管道写入数据会导致panic

func main() {
	intCh := make(chan int, 1)
	close(intCh)
	intCh <- 1
}

关闭已关闭的管道

在一些情况中,管道可能经过层层传递,调用者或许也不知道到底该由谁来关闭管道,如此一来,可能会发生关闭一个已经关闭了的管道,就会发生panic

func main() {
	ch := make(chan int, 1)
	defer close(ch)
	go write(ch)
	fmt.Println(<-ch)
}

func write(ch chan<- int) {
	// 只能对管道发送数据
	ch <- 1
	close(ch)
}

单向管道

双向管道指的是既可以写,也可以读,即可以在管道两边进行操作。单向管道指的是只读或只写的管道,即只能在管道的一边进行操作。手动创建的一个只读或只写的管道没有什么太大的意义,因为不能对管道读写就失去了其存在的作用。单向管道通常是用来限制通道的行为,一般会在函数的形参和返回值中出现,例如用于关闭通道的内置函数close的函数签名就用到了单向通道。

func close(c chan<- Type)

又或者说常用到的time包下的After函数

func After(d Duration) <-chan Time 

close函数的形参是一个只写通道,After函数的返回值是一个只读通道,所以单向通道的语法如下:

  • 箭头符号<-在前,就是只读通道,如<-chan int
  • 箭头符号<-在后,就是只写通道,如chan<- string

当尝试对只读的管道写入数据时,将会无法通过编译

func main() {
	timeCh := time.After(time.Second)
	timeCh <- time.Now()
}

报错如下,意思非常明确

invalid operation: cannot send to receive-only channel timeCh (variable of type <-chan time.Time)

对只写的管道读取数据也是同理。


双向管道可以转换为单向管道,反过来则不可以。通常情况下,将双向管道传给某个协程或函数并且不希望它读取/发送数据,就可以用到单向管道来限制另一方的行为。

func main() {
   ch := make(chan int, 1)
   go write(ch)
   fmt.Println(<-ch)
}

func write(ch chan<- int) {
   // 只能对管道发送数据
   ch <- 1
}

只读管道也是一样的道理

提示

chan是引用类型,即便Go的函数参数是值传递,但其引用依旧是同一个,这一点会在后续的管道原理中说明。

for range

通过for range语句,可以遍历读取缓冲管道中的数据,如下例

func main() {
	ch := make(chan int, 10)
	go func() {
		for i := 0; i < 10; i++ {
			ch <- i
		}
	}()
	for n := range ch {
		fmt.Println(n)
	}
}

通常来说,for range遍历其他可迭代数据结构时,会有两个返回值,第一个是索引,第二个元素值,但是对于管道而言,有且仅有一个返回值,for range会不断读取管道中的元素,当管道缓冲区为空或无缓冲时,就会阻塞等待,直到有其他协程向管道中写入数据才会继续读取数据。所以输出如下:

0
1                                                           
2                                                           
3                                                           
4                                                           
5                                                           
6                                                           
7                                                           
8                                                           
9                                                           
fatal error: all goroutines are asleep - deadlock!

可以看到上面的代码发生了死锁,因为子协程已经执行完毕了,而主协程还在阻塞等待其他协程来向管道中写入数据,所以应该管道在写入完毕后将其关闭。修改为如下代码

func main() {
   ch := make(chan int, 10)
   go func() {
      for i := 0; i < 10; i++ {
         ch <- i
      }
      // 关闭管道
      close(ch)
   }()
   for n := range ch {
      fmt.Println(n)
   }
}

写完后关闭管道,上述代码便不再会发生死锁。前面提到过读取管道是有两个返回值的,for range遍历管道时,当无法成功读取数据时,便会退出循环。第二个返回值指的是能否成功读取数据,而不是管道是否已经关闭,即便管道已经关闭,对于有缓冲管道而言,依旧可以读取数据,并且第二个返回值仍然为true。看下面的一个例子

func main() {
	ch := make(chan int, 10)
	for i := 0; i < 5; i++ {
		ch <- i
	}
    // 关闭管道
	close(ch)
    // 再读取数据
	for i := 0; i < 6; i++ {
		n, ok := <-ch
		fmt.Println(n, ok)
	}
}

输出结果

0 true
1 true 
2 true 
3 true 
4 true 
0 false

由于管道已经关闭了,即便缓冲区为空,再读取数据也不会导致当前协程阻塞,可以看到在第六次遍历的时候读取的是零值,并且okfalse

提示

关于管道关闭的时机,应该尽量在向管道发送数据的那一方关闭管道,而不要在接收方关闭管道,因为大多数情况下接收方只知道接收数据,并不知道该在什么时候关闭管道。


select

select在Linux系统中,是一种IO多路复用的解决方案,类似的,在Go中,select是一种管道多路复用的控制结构。什么是多路复用,简单的用一句话概括:在某一时刻,同时监测多个元素是否可用,被监测的可以是网络请求,文件IO等。在Go中的select监测的元素就是管道,且只能是管道。select的语法与switch语句类似,下面看看一个select语句长什么样

func main() {
	// 创建三个管道
	chA := make(chan int)
	chB := make(chan int)
	chC := make(chan int)
	defer func() {
		close(chA)
		close(chB)
		close(chC)
	}()
	select {
	case n, ok := <-chA:
		fmt.Println(n, ok)
	case n, ok := <-chB:
		fmt.Println(n, ok)
	case n, ok := <-chC:
		fmt.Println(n, ok)
	default:
		fmt.Println("所有管道都不可用")
	}
}

switch类似,select由多个case和一个default组成,default分支可以省略。每一个case只能操作一个管道,且只能进行一种操作,要么读要么写,当有多个case可用时,select会伪随机的选择一个case来执行。如果所有case都不可用,就会执行default分支,倘若没有default分支,将会阻塞等待,直到至少有一个case可用。由于上例中没有对管道写入数据,自然所有的case都不可用,所以最终输出为default分支的执行结果。稍微修改下后如下:

func main() {
   chA := make(chan int)
   chB := make(chan int)
   chC := make(chan int)
   defer func() {
      close(chA)
      close(chB)
      close(chC)
   }()
   // 开启一个新的协程
   go func() {
      // 向A管道写入数据
      chA <- 1
   }()
   select {
   case n, ok := <-chA:
      fmt.Println(n, ok)
   case n, ok := <-chB:
      fmt.Println(n, ok)
   case n, ok := <-chC:
      fmt.Println(n, ok)
   }
}

上例开启了一个新的协程来向管道A写入数据,select由于没有默认分支,所以会一直阻塞等待直到有case可用。当管道A可用时,执行完对应分支后主协程就直接退出了。要想一直监测管道,可以配合for循环使用,如下。

func main() {
	chA := make(chan int)
	chB := make(chan int)
	chC := make(chan int)
	defer func() {
		close(chA)
		close(chB)
		close(chC)
	}()
	go Send(chA)
	go Send(chB)
	go Send(chC)
	// for循环
	for {
		select {
		case n, ok := <-chA:
			fmt.Println("A", n, ok)
		case n, ok := <-chB:
			fmt.Println("B", n, ok)
		case n, ok := <-chC:
			fmt.Println("C", n, ok)
		}
	}
}

func Send(ch chan<- int) {
	for i := 0; i < 3; i++ {
		time.Sleep(time.Millisecond)
		ch <- i
	}
}

这样确实三个管道都能用上了,但是死循环+select会导致主协程永久阻塞,所以可以将其单独放到新协程中,并且加上一些其他的逻辑。

func main() {
	chA := make(chan int)
	chB := make(chan int)
	chC := make(chan int)
	defer func() {
		close(chA)
		close(chB)
		close(chC)
	}()

	l := make(chan struct{})

	go Send(chA)
	go Send(chB)
	go Send(chC)

	go func() {
	Loop:
		for {
			select {
			case n, ok := <-chA:
				fmt.Println("A", n, ok)
			case n, ok := <-chB:
				fmt.Println("B", n, ok)
			case n, ok := <-chC:
				fmt.Println("C", n, ok)
			case <-time.After(time.Second): // 设置1秒的超时时间
				break Loop // 退出循环
			}
		}
		l <- struct{}{} // 告诉主协程可以退出了
	}()

	<-l
}

func Send(ch chan<- int) {
	for i := 0; i < 3; i++ {
		time.Sleep(time.Millisecond)
		ch <- i
	}
}

上例中通过for循环配合select来一直监测三个管道是否可以用,并且第四个case是一个超时管道,超时过后便会退出循环,结束子协程。最终输出如下

C 0 true
A 0 true
B 0 true
A 1 true
B 1 true
C 1 true
B 2 true
C 2 true
A 2 true

超时

上一个例子用到了time.After函数,其返回值是一个只读的管道,该函数配合select使用可以非常简单的实现超时机制,例子如下

func main() {
	chA := make(chan int)
	defer close(chA)
	go func() {
		time.Sleep(time.Second * 2)
		chA <- 1
	}()
	select {
	case n := <-chA:
		fmt.Println(n)
	case <-time.After(time.Second):
		fmt.Println("超时")
	}
}

永久阻塞

select语句中什么都没有时,就会永久阻塞,例如

func main() {
	fmt.Println("start")
	select {}
	fmt.Println("end")
}

end永远也不会输出,主协程会一直阻塞,这种情况一般是有特殊用途。

提示

selectcase中对值为nil的管道进行操作的话,并不会导致阻塞,该case则会被忽略,永远也不会被执行。例如下方代码无论执行多少次都只会输出timeout。

func main() {
   var nilCh chan int
   select {
   case <-nilCh:
      fmt.Println("read")
   case nilCh <- 1:
      fmt.Println("write")
   case <-time.After(time.Second):
      fmt.Println("timeout")
   }
}

WaitGroup

sync.WaitGroupsync包下提供的一个结构体,WaitGroup即等待执行,使用它可以很轻易的实现等待一组协程的效果。该结构体只对外暴露三个方法。

Add方法用于指明要等待的协程的数量

func (wg *WaitGroup) Add(delta int)

Done方法表示当前协程已经执行完毕

func (wg *WaitGroup) Done()

Wait方法等待子协程结束,否则就阻塞

func (wg *WaitGroup) Wait()

WaitGroup使用起来十分简单,属于开箱即用。其内部的实现是计数器+信号量,程序开始时调用Add初始化计数,每当一个协程执行完毕时调用Done,计数就-1,直到减为0,而在此期间,主协程调用Wait 会一直阻塞直到全部计数减为0,然后才会被唤醒。看一个简单的使用例子

func main() {
	var wait sync.WaitGroup
	// 指定子协程的数量
	wait.Add(1)
	go func() {
		fmt.Println(1)
		// 执行完毕
		wait.Done()
	}()
	// 等待子协程
	wait.Wait()
	fmt.Println(2)
}

这段代码永远都是先输出1再输出2,主协程会等待子协程执行完毕后再退出。

1
2

针对协程介绍中最开始的例子,可以做出如下修改

func main() {
   var mainWait sync.WaitGroup
   var wait sync.WaitGroup
   // 计数10
   mainWait.Add(10)
   fmt.Println("start")
   for i := 0; i < 10; i++ {
      // 循环内计数1
      wait.Add(1)
      go func() {
         fmt.Println(i)
         // 两个计数-1
         wait.Done()
         mainWait.Done()
      }()
      // 等待当前循环的协程执行完毕
      wait.Wait()
   }
   // 等待所有的协程执行完毕
   mainWait.Wait()
   fmt.Println("end")
}

这里使用了sync.WaitGroup替代了原先的time.Sleep,协程并发执行的的顺序更加可控,不管执行多少次,输出都如下

start
0  
1  
2  
3  
4  
5  
6  
7  
8  
9  
end

WaitGroup通常适用于可动态调整协程数量的时候,例如事先知晓协程的数量,又或者在运行过程中需要动态调整。WaitGroup的值不应该被复制,复制后的值也不应该继续使用,尤其是将其作为函数参数传递时,因该传递指针而不是值。倘若使用复制的值,计数完全无法作用到真正的WaitGroup上,这可能会导致主协程一直阻塞等待,程序将无法正常运行。例如下方的代码

func main() {
	var mainWait sync.WaitGroup
	mainWait.Add(1)
	hello(mainWait)
	mainWait.Wait()
	fmt.Println("end")
}
func hello(wait sync.WaitGroup) {
	fmt.Println("hello")
	wait.Done()
}

错误提示所有的协程都已经退出,但主协程依旧在等待,这就形成了死锁,因为hello函数内部对一个形参WaitGroup调用Done并不会作用到原来的mainWait上,所以应该使用指针来进行传递。

hello
fatal error: all goroutines are asleep - deadlock!  

提示

当计数变为负数,或者计数数量大于子协程数量时,将会引发panic

Context

Context译为上下文,是Go提供的一种并发控制的解决方案,相比于管道和WaitGroup,它可以更好的控制子孙协程以及层级更深的协程。Context本身是一个接口,只要实现了该接口都可以称之为上下文例如著名Web框架Gin中的gin.Contextcontext标准库也提供了几个实现,分别是:

  • emptyCtx
  • cancelCtx
  • timerCtx
  • valueCtx

Context

先来看看Context接口的定义,再去了解它的具体实现。

type Context interface {
   
   Deadline() (deadline time.Time, ok bool)

   Done() <-chan struct{}

   Err() error

   Value(key any) any
}

Deadline

该方法具有两个返回值,deadline是截止时间,即上下文应该取消的时间。第二个值是是否设置deadline,如果没有设置则一直为false

Deadline() (deadline time.Time, ok bool)

Done

其返回值是一个空结构体类型的只读管道,该管道仅仅起到通知作用,不传递任何数据。当上下文所做的工作应该取消时,该通道就会被关闭,对于一些不支持取消的上下文,可能会返回nil

Done() <-chan struct{}

Err

该方法返回一个error,用于表示上下关闭的原因。当Done管道没有关闭时,返回nil,如果关闭过后,会返回一个err来解释为什么会关闭。

Err() error

Value

该方法返回对应的键值,如果key不存在,或者不支持该方法,就会返回nil

Value(key any) any

emptyCtx

顾名思义,emptyCtx就是空的上下文,context包下所有的实现都是不对外暴露的,但是提供了对应的函数来创建上下文。emptyCtx就可以通过context.Backgroundcontext.TODO来进行创建。两个函数如下

var (
	background = new(emptyCtx)
	todo       = new(emptyCtx)
)

func Background() Context {
	return background
}

func TODO() Context {
	return todo
}

可以看到仅仅只是返回了emptyCtx指针。emptyCtx的底层类型实际上是一个int,之所以不使用空结构体,是因为emptyCtx的实例必须要有不同的内存地址,它没法被取消,没有deadline,也不能取值,实现的方法都是返回零值。

type emptyCtx int

func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
   return
}

func (*emptyCtx) Done() <-chan struct{} {
   return nil
}

func (*emptyCtx) Err() error {
   return nil
}

func (*emptyCtx) Value(key any) any {
   return nil
}

emptyCtx通常是用来当作最顶层的上下文,在创建其他三种上下文时作为父上下文传入。context包中的各个实现关系如下图所示

valueCtx

valueCtx实现比较简单,其内部只包含一对键值对,和一个内嵌的Context类型的字段。

type valueCtx struct {
   Context
   key, val any
}

其本身只实现了Value方法,逻辑也很简单,当前上下文找不到就去父上下文找。

func (c *valueCtx) Value(key any) any {
   if c.key == key {
      return c.val
   }
   return value(c.Context, key)
}

下面看一个valueCtx的简单使用案例

var waitGroup sync.WaitGroup

func main() {
	waitGroup.Add(1)
    // 传入上下文
	go Do(context.WithValue(context.Background(), 1, 2))
	waitGroup.Wait()
}

func Do(ctx context.Context) {
    // 新建定时器
	ticker := time.NewTimer(time.Second)
	defer waitGroup.Done()
	for {
		select {
		case <-ctx.Done(): // 永远也不会执行
		case <-ticker.C:
			fmt.Println("timeout")
			return
		default:
			fmt.Println(ctx.Value(1))
		}
		time.Sleep(time.Millisecond * 100)
	}
}

valueCtx多用于在多级协程中传递一些数据,无法被取消,因此ctx.Done永远会返回nilselect会忽略掉nil管道。最后输出如下

2
2
2
2
2
2
2
2
2
2
timeout

cancelCtx

cancelCtx以及timerCtx都实现了canceler接口,接口类型如下

type canceler interface {
    // removeFromParent 表示是否从父上下文中删除自身
    // err 表示取消的原因
	cancel(removeFromParent bool, err error)
    // Done 返回一个管道,用于通知取消的原因
	Done() <-chan struct{}
}

cancel方法不对外暴露,在创建上下文时通过闭包将其包装为返回值以供外界调用,就如context.WithCancel源代码中所示

func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
   if parent == nil {
      panic("cannot create context from nil parent")
   }
   c := newCancelCtx(parent)
   // 尝试将自身添加进父级的children中
   propagateCancel(parent, &c)
   // 返回context和一个函数
   return &c, func() { c.cancel(true, Canceled) }
}

cancelCtx译为可取消的上下文,创建时,如果父级实现了canceler,就会将自身添加进父级的children中,否则就一直向上查找。如果所有的父级都没有实现canceler,就会启动一个协程等待父级取消,然后当父级结束时取消当前上下文。当调用cancelFunc时,Done通道将会关闭,该上下文的任何子级也会随之取消,最后会将自身从父级中删除。下面是一个简单的示例:

var waitGroup sync.WaitGroup

func main() {
	bkg := context.Background()
    // 返回了一个cancelCtx和cancel函数
	cancelCtx, cancel := context.WithCancel(bkg)
	waitGroup.Add(1)
	go func(ctx context.Context) {
		defer waitGroup.Done()
		for {
			select {
			case <-ctx.Done():
				fmt.Println(ctx.Err())
				return
			default:
				fmt.Println("等待取消中...")
			}
			time.Sleep(time.Millisecond * 200)
		}

	}(cancelCtx)
	time.Sleep(time.Second)
	cancel()
	waitGroup.Wait()
}

输出如下

等待取消中...
等待取消中...
等待取消中...
等待取消中...
等待取消中...
context canceled

再来一个层级嵌套深一点的示例

var waitGroup sync.WaitGroup

func main() {
   waitGroup.Add(3)
   ctx, cancelFunc := context.WithCancel(context.Background())
   go HttpHandler(ctx)
   time.Sleep(time.Second)
   cancelFunc()
   waitGroup.Wait()
}

func HttpHandler(ctx context.Context) {
   cancelCtxAuth, cancelAuth := context.WithCancel(ctx)
   cancelCtxMail, cancelMail := context.WithCancel(ctx)

   defer cancelAuth()
   defer cancelMail()
   defer waitGroup.Done()

   go AuthService(cancelCtxAuth)
   go MailService(cancelCtxMail)

   for {
      select {
      case <-ctx.Done():
         fmt.Println(ctx.Err())
         return
      default:
         fmt.Println("正在处理http请求...")
      }
      time.Sleep(time.Millisecond * 200)
   }

}

func AuthService(ctx context.Context) {
   defer waitGroup.Done()
   for {
      select {
      case <-ctx.Done():
         fmt.Println("auth 父级取消", ctx.Err())
         return
      default:
         fmt.Println("auth...")
      }
      time.Sleep(time.Millisecond * 200)
   }
}

func MailService(ctx context.Context) {
   defer waitGroup.Done()
   for {
      select {
      case <-ctx.Done():
         fmt.Println("mail 父级取消", ctx.Err())
         return
      default:
         fmt.Println("mail...")
      }
      time.Sleep(time.Millisecond * 200)
   }
}

例子中创建了3个cancelCtx,尽管父级cancelCtx在取消的同时会取消它的子上下文,但是保险起见,如果创建了一个cancelCtx,在相应的流程结束后就应该调用cancel函数。输出如下

正在处理http请求...
auth...
mail...
mail...
auth...
正在处理http请求...
auth...
mail...
正在处理http请求...
正在处理http请求...
auth...
mail...
auth...
正在处理http请求...
mail...
context canceled
auth 父级取消 context canceled
mail 父级取消 context canceled

timerCtx

timerCtxcancelCtx 的基础之上增加了超时机制,context包下提供了两种创建的函数,分别是WithDeadlineWithTimeout,两者功能类似,前者是指定一个具体的超时时间,比如指定一个具体时间2023/3/20 16:32:00,后者是指定一个超时的时间间隔,比如5分钟后。两个函数的签名如下

func WithDeadline(parent Context, d time.Time) (Context, CancelFunc)

func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)

timerCtx会在时间到期后自动取消当前上下文,取消的流程除了要额外的关闭timer之外,基本与cancelCtx一致。下面是一个简单的timerCtx的使用示例

var wait sync.WaitGroup

func main() {
	deadline, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second))
	defer cancel()
	wait.Add(1)
	go func(ctx context.Context) {
		defer wait.Done()
		for {
			select {
			case <-ctx.Done():
				fmt.Println("上下文取消", ctx.Err())
				return
			default:
				fmt.Println("等待取消中...")
			}
			time.Sleep(time.Millisecond * 200)
		}
	}(deadline)
	wait.Wait()
}

尽管上下文到期会自动取消,但是为了保险起见,在相关流程结束后,最好手动取消上下文。输出如下

等待取消中...
等待取消中...
等待取消中...
等待取消中...
等待取消中...
上下文取消 context deadline exceeded

WithTimeout其实与WithDealine非常相似,它的实现也只是稍微封装了一下并调用WithDeadline,和上面例子中的WithDeadline用法一样,如下

func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
   return WithDeadline(parent, time.Now().Add(timeout))
}

提示

就跟内存分配后不回收会造成内存泄漏一样,上下文也是一种资源,如果创建了但从来不取消,一样会造成上下文泄露,所以最好避免此种情况的发生。


先来看看的一个例子

var wait sync.WaitGroup
var count = 0

func main() {
   wait.Add(10)
   for i := 0; i < 10; i++ {
      go func(data *int) {
         // 模拟访问耗时
         time.Sleep(time.Millisecond * time.Duration(rand.Intn(5000)))
         // 访问数据
         temp := *data
         // 模拟计算耗时
         time.Sleep(time.Millisecond * time.Duration(rand.Intn(5000)))
         ans := 1
         // 修改数据
         *data = temp + ans
         fmt.Println(*data)
         wait.Done()
      }(&count)
   }
   wait.Wait()
   fmt.Println("最终结果", count)
}

对于上面的例子,开启了十个协程来对count进行+1操作,并且使用了time.Sleep来模拟不同的耗时,根据直觉来讲,10个协程执行10个+1操作,最终结果一定是10,正确结果也确实是10,但事实并非如此,上面的例子执行结果如下:

1
2
3
3
2
2
3
3
3
4
最终结果 4

可以看到最终结果为4,而这只是众多可能结果中的一种。由于每个协程访问和计算所需的时间不同,A协程访问数据耗费500毫秒,此时访问到的count值为1,随后又花费了400毫秒计算,但在这400毫秒内,B协程已经完成了访问和计算并成功更新了count的值,A协程在计算完毕后,A协程最初访问到的值已经过时了,但A协程并不知道这件事,依旧在原先访问到的值基础上加一,并赋值给count,这样一来,B协程的执行结果被覆盖了。多个协程读取和访问一个共享数据时,尤其会发生这样的问题,为此就需要用到锁。

Go中sync包下的MutexRWMutex提供了互斥锁与读写锁两种实现,且提供了非常简单易用的API,加锁只需要Lock(),解锁也只需要Unlock()。需要注意的是,Go所提供的锁都是非递归锁,也就是不可重入锁,所以重复加锁或重复解锁都会导致fatal。锁的意义在于保护不变量,加锁是希望数据不会被其他协程修改,如下

func DoSomething() {
	Lock()
    // 在这个过程中,数据不会被其他协程修改
	Unlock()
}

倘若是递归锁的话,就可能会发生如下情况

func DoSomething() {
	Lock()
    DoOther()
	Unlock()
}

func DoOther() {
	Lock()
	// do other
	Unlock()
}

DoSomthing函数显然不知道DoOther函数可能会对数据做点什么,从而修改了数据,比如再开几个子协程破坏了不变量。这在Go中是行不通的,一旦加锁以后就必须保证不变量的不变性,此时重复加锁解锁都会导致死锁。所以在编写代码时应该避免上述情况,必要时在加锁的同时立即使用defer语句解锁。


互斥锁

sync.Mutex是Go提供的互斥锁实现,其实现了sync.Locker接口

type Locker interface {
   // 加锁
   Lock()
   // 解锁
   Unlock()
}

使用互斥锁可以非常完美的解决上述问题,例子如下

var wait sync.WaitGroup
var count = 0

var lock sync.Mutex

func main() {
	wait.Add(10)
	for i := 0; i < 10; i++ {
		go func(data *int) {
			// 加锁
			lock.Lock()
			// 模拟访问耗时
			time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
			// 访问数据
			temp := *data
			// 模拟计算耗时
			time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
			ans := 1
			// 修改数据
			*data = temp + ans
			// 解锁
			lock.Unlock()
			fmt.Println(*data)
			wait.Done()
		}(&count)
	}
	wait.Wait()
	fmt.Println("最终结果", count)
}

每一个协程在访问数据前,都先上锁,更新完成后再解锁,其他协程想要访问就必须要先获得锁,否则就阻塞等待。如此一来,就不存在上述问题了,所以输出如下

1
2
3
4
5
6
7
8
9
10
最终结果 10

读写锁

互斥锁适合读操作与写操作频率都差不多的情况,对于一些读多写少的数据,如果使用互斥锁,会造成大量的不必要的协程竞争锁,这会消耗很多的系统资源,这时候就需要用到读写锁,即读写互斥锁,对于一个协程而言:

  • 如果获得了读锁,其他协程进行写操作时会阻塞,其他协程进行读操作时不会阻塞
  • 如果获得了写锁,其他协程进行写操作时会阻塞,其他协程进行读操作时会阻塞

Go中读写互斥锁的实现是sync.RWMutex,它也同样实现了Locker接口,但它提供了更多可用的方法,如下:

// 加读锁
func (rw *RWMutex) RLock() 

// 尝试加读锁
func (rw *RWMutex) TryRLock() bool 

// 解读锁
func (rw *RWMutex) RUnlock() 

// 加写锁
func (rw *RWMutex) Lock()

// 尝试加写锁
func (rw *RWMutex) TryLock() bool 

// 解写锁
func (rw *RWMutex) Unlock()

其中TryRlockTryLock两个尝试加锁的操作是非阻塞式的,成功加锁会返回true,无法获得锁时并不会阻塞而是返回false。读写互斥锁内部实现依旧是互斥锁,并不是说分读锁和写锁就有两个锁,从始至终都只有一个锁。下面来看一个读写互斥锁的使用案例

var wait sync.WaitGroup
var count = 0

var rw sync.RWMutex

func main() {
	wait.Add(12)
	// 读多写少
	go func() {
		for i := 0; i < 3; i++ {
			go Write(&count)
		}
		wait.Done()
	}()
	go func() {
		for i := 0; i < 7; i++ {
			go Read(&count)
		}
		wait.Done()
	}()
	// 等待子协程结束
	wait.Wait()
	fmt.Println("最终结果", count)
}

func Read(i *int) {
	time.Sleep(time.Millisecond * time.Duration(rand.Intn(500)))
	rw.RLock()
	fmt.Println("拿到读锁")
	time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
	fmt.Println("释放读锁", *i)
	rw.RUnlock()
	wait.Done()
}

func Write(i *int) {
	time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
	rw.Lock()
	fmt.Println("拿到写锁")
	temp := *i
	time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
	*i = temp + 1
	fmt.Println("释放写锁", *i)
	rw.Unlock()
	wait.Done()
}

该例开启了3个写协程,7个读协程,在读数据的时候都会先获得读锁,读协程可以正常获得读锁,但是会阻塞写协程,获得写锁的时候,则会同时阻塞读协程和写协程,直到释放写锁,如此一来实现了读协程与写协程互斥,保证了数据的正确性。例子输出如下:

拿到读锁
拿到读锁
拿到读锁
拿到读锁
释放读锁 0
释放读锁 0
释放读锁 0
释放读锁 0
拿到写锁
释放写锁 1
拿到读锁  
拿到读锁
拿到读锁
释放读锁 1
释放读锁 1
释放读锁 1
拿到写锁
释放写锁 2
拿到写锁
释放写锁 3
最终结果 3

提示

对于锁而言,不应该将其作为值传递和存储,应该使用指针。


条件变量

条件变量,与互斥锁一同出现和使用,所以有些人可能会误称为条件锁,但它并不是锁,是一种通讯机制。Go中的sync.Cond对此提供了实现,而创建条件变量的函数签名如下:

func NewCond(l Locker) *Cond 

可以看到创建一个条件变量前提就是需要创建一个锁,sync.Cond提供了如下的方法以供使用

// 阻塞等待条件生效,直到被唤醒
func (c *Cond) Wait()

// 唤醒一个因条件阻塞的协程
func (c *Cond) Signal()

// 唤醒所有因条件阻塞的协程
func (c *Cond) Broadcast()

条件变量使用起来非常简单,将上面的读写互斥锁的例子稍微修改下即可

var wait sync.WaitGroup
var count = 0

var rw sync.RWMutex

// 条件变量
var cond = sync.NewCond(rw.RLocker())

func main() {
	wait.Add(12)
	// 读多写少
	go func() {
		for i := 0; i < 3; i++ {
			go Write(&count)
		}
		wait.Done()
	}()
	go func() {
		for i := 0; i < 7; i++ {
			go Read(&count)
		}
		wait.Done()
	}()
	// 等待子协程结束
	wait.Wait()
	fmt.Println("最终结果", count)
}

func Read(i *int) {
	time.Sleep(time.Millisecond * time.Duration(rand.Intn(500)))
	rw.RLock()
	fmt.Println("拿到读锁")
	// 条件不满足就一直阻塞
	for *i < 3 {
		cond.Wait()
	}
	time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
	fmt.Println("释放读锁", *i)
	rw.RUnlock()
	wait.Done()
}

func Write(i *int) {
	time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
	rw.Lock()
	fmt.Println("拿到写锁")
	temp := *i
	time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
	*i = temp + 1
	fmt.Println("释放写锁", *i)
	rw.Unlock()
	// 唤醒所有因条件变量阻塞的协程
	cond.Broadcast()
	wait.Done()
}

在创建条件变量时,因为在这里条件变量作用的是读协程,所以将读锁作为互斥锁传入,如果直接传入读写互斥锁会导致写协程重复解锁的问题。这里传入的是sync.rlocker,通过RWMutex.RLocker方法获得。

func (rw *RWMutex) RLocker() Locker {
   return (*rlocker)(rw)
}

type rlocker RWMutex

func (r *rlocker) Lock()   { (*RWMutex)(r).RLock() }
func (r *rlocker) Unlock() { (*RWMutex)(r).RUnlock() }

可以看到rlocker也只是把读写互斥锁的读锁操作封装了一下,实际上是同一个引用,依旧是同一个锁。读协程读取数据时,如果小于3就会一直阻塞等待,直到数据大于3,而写协程在更新数据后都会尝试唤醒所有因条件变量而阻塞的协程,所以最后的输出如下

拿到读锁
拿到读锁
拿到读锁
拿到读锁
拿到写锁
释放写锁 1
拿到读锁
拿到写锁
释放写锁 2
拿到读锁
拿到读锁
拿到写锁
释放写锁 3 // 第三个写协程执行完毕
释放读锁 3
释放读锁 3
释放读锁 3
释放读锁 3
释放读锁 3
释放读锁 3
释放读锁 3
最终结果 3

从结果中可以看到,当第三个写协程更新完数据后,七个因条件变量而阻塞的读协程都恢复了运行。

提示

对于条件变量,应该使用for而不是if,应该使用循环来判断条件是否满足,因为协程被唤醒时并不能保证当前条件就已经满足了。

for !condition {
	cond.Wait()
}

sync

Go中很大一部分的并发相关的工具都是sync标准库提供的,上述已经介绍过了sync.WaitGroupsync.Locker等,除此之外,sync包下还有一些其他的工具可以使用。


Once

当在使用一些数据结构时,如果这些数据结构太过庞大,可以考虑采用懒加载的方式,即真正要用到它的时候才会初始化该数据结构。如下面的例子

type MySlice []int

func (m *MySlice) Get(i int) (int, bool) {
   if *m == nil {
      return 0, false
   } else {
      return (*m)[i], true
   }
}

func (m *MySlice) Add(i int) {
   // 当真正用到切片的时候,才会考虑去初始化
   if *m == nil {
      *m = make([]int, 0, 10)
   }
   *m = append(*m, i)
}

那么问题就来了,如果只有一个协程使用肯定是没有任何问题的,但是如果有多个协程访问的话就可能会出现问题了。比如协程A和B同时调用了Add方法,A执行的稍微快一些,已经初始化完毕了,并且将数据成功添加,随后协程B又初始化了一遍,这样一来将协程A添加的数据直接覆盖掉了,这就是问题所在。

而这就是sync.Once要解决的问题,顾名思义,Once译为一次,sync.Once保证了在并发条件下指定操作只会执行一次。它的使用非常简单,只对外暴露了一个Do方法,签名如下:

func (o *Once) Do(f func()) 

在使用时,只需要将初始化操作传入Do方法即可,如下

var wait sync.WaitGroup

func main() {
	var slice MySlice
	wait.Add(4)
	for i := 0; i < 4; i++ {
		go func() {
			slice.Add(1)
			wait.Done()
		}()
	}
	wait.Wait()
	fmt.Println(slice.Len())
}

type MySlice struct {
	s []int
	o sync.Once
}

func (m *MySlice) Get(i int) (int, bool) {
	if m.s == nil {
		return 0, false
	} else {
		return m.s[i], true
	}
}

func (m *MySlice) Add(i int) {
	// 当真正用到切片的时候,才会考虑去初始化
	m.o.Do(func() {
		fmt.Println("初始化")
		if m.s == nil {
			m.s = make([]int, 0, 10)
		}
	})
	m.s = append(m.s, i)
}

func (m *MySlice) Len() int {
	return len(m.s)
}

输出如下

初始化
4

从输出结果中可以看到,所有的数据等正常添加进切片,初始化操作只执行了一次。其实sync.Once的实现相当简单,去除注释真正的代码逻辑只有16行,其原理就是锁+原子操作。源代码如下:

type Once struct {
    // 用于判断操作是否已经执行
	done uint32
	m    Mutex
}

func (o *Once) Do(f func()) {
	// 原子加载数据
	if atomic.LoadUint32(&o.done) == 0 {
		o.doSlow(f)
	}
}

func (o *Once) doSlow(f func()) {
    // 加锁
	o.m.Lock()
    // 解锁
	defer o.m.Unlock()
    // 判断是否执行
	if o.done == 0 {
        // 执行完毕后修改done
		defer atomic.StoreUint32(&o.done, 1)
		f()
	}
}

Pool

sync.Pool的设计目的是用于存储临时对象以便后续的复用,是一个临时的并发安全对象池,将暂时用不到的对象放入池中,在后续使用中就不需要再额外的创建对象可以直接复用,减少内存的分配与释放频率,最重要的一点就是降低GC压力。sync.Pool总共只有两个方法,如下:

// 申请一个对象
func (p *Pool) Get() any

// 放入一个对象
func (p *Pool) Put(x any)

并且sync.Pool有一个对外暴露的New字段,用于对象池在申请不到对象时初始化一个对象

New func() any

下面以一个例子演示

var wait sync.WaitGroup

// 临时对象池
var pool sync.Pool

// 用于计数过程中总共创建了多少个对象
var numOfObject atomic.Int64

// BigMemData 假设这是一个占用内存很大的结构体
type BigMemData struct {
   M string
}

func main() {
   pool.New = func() any {
      numOfObject.Add(1)
      return BigMemData{"大内存"}
   }
   wait.Add(1000)
   // 这里开启1000个协程
   for i := 0; i < 1000; i++ {
      go func() {
         // 申请对象
         val := pool.Get()
         // 使用对象
         _ = val.(BigMemData)
         // 用完之后再释放对象
         pool.Put(val)
         wait.Done()
      }()
   }
   wait.Wait()
   fmt.Println(numOfObject.Load())
}

例子中开启了1000个协程不断的在池中申请和释放对象,如果不采用对象池,那么1000个协程都需要各自实例化对象,并且这1000个实例化后的对象在使用完毕后都需要由GC来释放内存,如果有几十万个协程或者说创建该对象的成本十分的高昂,这种情况下就会占用很大的内存并且给GC带来非常大的压力,采用对象池后,可以复用对象减少实例化的频率,比如上述的例子输出可能如下:

5

即便开启了1000个协程,整个过程中也只创建了5个对象,如果不采用对象池的话1000个协程将会创建1000个对象,这种优化带来的提升是显而易见的,尤其是在并发量特别大和实例化对象成本特别高的时候更能体现出优势。

在使用sync.Pool时需要注意几个点:

  • 临时对象:sync.Pool只适合存放临时对象,池中的对象可能会在没有任何通知的情况下被GC移除,所以并不建议将网络链接,数据库连接这类存入sync.Pool中。
  • 不可预知:sync.Pool在申请对象时,无法预知这个对象是新创建的还是复用的,也无法知晓池中有几个对象
  • 并发安全:官方保证sync.Pool一定是并发安全,但并不保证用于创建对象的New函数就一定是并发安全的,New函数是由使用者传入的,所以New函数的并发安全性要由使用者自己来维护,这也是为什么上例中对象计数要用到原子值的原因。

提示

最后需要注意的是,当使用完对象后,一定要释放回池中,如果用了不释放那么对象池的使用将毫无意义。

标准库fmt包下就有一个对象池的使用案例,在fmt.Fprintf函数中

func Fprintf(w io.Writer, format string, a ...any) (n int, err error) {
   // 申请一个打印缓冲区
   p := newPrinter()
   p.doPrintf(format, a)
   n, err = w.Write(p.buf)
   // 使用完毕后释放
   p.free()
   return
}

其中newPointer函数和free方法的实现如下

func newPrinter() *pp {
   // 向对象池申请的一个对象
   p := ppFree.Get().(*pp)
   p.panicking = false
   p.erroring = false
   p.wrapErrs = false
   p.fmt.init(&p.buf)
   return p
}

func (p *pp) free() {
    // 为了让对象池中的缓冲区大小大致相同以便更好的弹性控制缓冲区大小
    // 过大的缓冲区就不用放回对象池
	if cap(p.buf) > 64<<10 {
		return
	}
	// 字段重置后释放对象到池中
	p.buf = p.buf[:0]
	p.arg = nil
	p.value = reflect.Value{}
	p.wrappedErr = nil
	ppFree.Put(p)
}

Map

sync.Map是官方提供的一种并发安全Map的实现,开箱即用,使用起来十分的简单,下面是该结构体对外暴露的方法:

// 根据一个key读取值,返回值会返回对应的值和该值是否存在
func (m *Map) Load(key any) (value any, ok bool)

// 存储一个键值对
func (m *Map) Store(key, value any)

// 删除一个键值对
func (m *Map) Delete(key any)

// 如果该key已存在,就返回原有的值,否则将新的值存入并返回,当成功读取到值时,loaded为true,否则为false
func (m *Map) LoadOrStore(key, value any) (actual any, loaded bool)

// 删除一个键值对,并返回其原有的值,loaded的值取决于key是否存在
func (m *Map) LoadAndDelete(key any) (value any, loaded bool)

// 遍历Map,当f()返回false时,就会停止遍历
func (m *Map) Range(f func(key, value any) bool) 

下面用一个简单的示例来演示下sync.Map的基本使用

func main() {
	var syncMap sync.Map
	// 存入数据
	syncMap.Store("a", 1)
	syncMap.Store("a", "a")
	// 读取数据
	fmt.Println(syncMap.Load("a"))
	// 读取并删除
	fmt.Println(syncMap.LoadAndDelete("a"))
	// 读取或存入
	fmt.Println(syncMap.LoadOrStore("a", "hello world"))
	syncMap.Store("b", "goodbye world")
	// 遍历map
	syncMap.Range(func(key, value any) bool {
		fmt.Println(key, value)
		return true
	})
}

输出

a true
a true           
hello world false
a hello world    
b goodbye world  

接下来看一个并发使用map的例子:

func main() {
	myMap := make(map[int]int, 10)
	var wait sync.WaitGroup
	wait.Add(10)
	for i := 0; i < 10; i++ {
		go func(n int) {
			for i := 0; i < 100; i++ {
				myMap[n] = n
			}
			wait.Done()
		}(i)
	}
	wait.Wait()
}

上例中使用的普通map,开了10个协程不断的存入数据,显然这很可能会触发fatal,结果大概率会如下

fatal error: concurrent map writes

使用sync.Map就可以避免这个问题

func main() {
	var syncMap sync.Map
	var wait sync.WaitGroup
	wait.Add(10)
	for i := 0; i < 10; i++ {
		go func(n int) {
			for i := 0; i < 100; i++ {
				syncMap.Store(n, n)
			}
			wait.Done()
		}(i)
	}
	wait.Wait()
	syncMap.Range(func(key, value any) bool {
		fmt.Println(key, value)
		return true
	})
}

输出如下

8 8
3 3
1 1
9 9
6 6
5 5
7 7
0 0
2 2
4 4

为了并发安全肯定需要做出一定的牺牲,sync.Map的性能要比map低10-100倍左右。


原子

在计算机学科中,原子或原语操作,通常用于表述一些不可再细化分割的操作,由于这些操作无法再细化为更小的步骤,在执行完毕前,不会被其他的任何协程打断,所以执行结果要么成功要么失败,没有第三种情况可言,如果出现了其他情况,那么它就是不是原子操作。例如下方的代码:

func main() {
	a := 0
	if a == 0 {
		a = 1
	}
	fmt.Println(a)
}

上方的代码是一个简单的判断分支,尽管代码很少,但也不是原子操作,真正的原子操作是由硬件指令层面支持的。


类型

好在大多情况下并不需要自行编写汇编,Go标准库sync/atomic包下已经提供了原子操作相关的API,其提供了以下几种类型以供进行原子操作。

atomic.Bool{}
atomic.Pointer[]{}
atomic.Int32{}
atomic.Int64{}
atomic.Uint32{}
atomic.Uint64{}
atomic.Uintptr{}
atomic.Value{}

其中Pointer原子类型支持泛型,Value类型支持存储任何类型,除此之外,还提供了许多函数来方便操作。因为原子操作的粒度过细,在大多数情况下,更适合处理这些基础的数据类型。

提示

atmoic包下原子操作只有函数签名,没有具体实现,具体的实现是由plan9汇编编写。


使用

每一个原子类型都会提供以下三个方法:

  • Load():原子的获取值
  • Swap(newVal type) (old type):原子的交换值,并且返回旧值
  • Store(val type):原子的存储值

不同的类型可能还会有其他的额外方法,比如整型类型都会提供Add方法来实现原子加减操作。下面以一个int64类型演示为例:

func main() {
	var aint64 atomic.Uint64
	// 存储值
	aint64.Store(64)
	// 交换值
	aint64.Swap(128)
	// 增加
	aint64.Add(112)
    // 加载值
	fmt.Println(aint64.Load())
}

或者也可以直接使用函数

func main() {
   var aint64 int64
   // 存储值
   atomic.StoreInt64(&aint64, 64)
   // 交换值
   atomic.SwapInt64(&aint64, 128)
   // 增加
   atomic.AddInt64(&aint64, 112)
   // 加载
   fmt.Println(atomic.LoadInt64(&aint64))
}

其他的类型的使用也是十分类似的,最终输出为:

240

CAS

atmoic包还提供了CompareAndSwap操作,也就是CAS,它是乐观锁的一种典型实现。乐观锁本身并不是锁,是一种并发条件下无锁化并发控制方式。之所以被称作乐观锁,是因为它总是乐观的假设共享数据不会被修改,仅当发现数据未被修改时才会去执行对应操作,而前面了解到的互斥量就是悲观锁,互斥量总是悲观的认为共享数据肯定会被修改,所以在操作时会加锁,操作完毕后就会解锁。由于无锁化实现的并发安全效率相对于锁要高一些,许多并发安全的数据结构都采用了cAS来进行实现,不过真正的效率要结合具体使用场景来看。看下面的一个例子:

var lock sync.Mutex

var count int

func Add(num int) {
   lock.Lock()
   count += num
   lock.Unlock()
}

这是一个使用互斥锁的例子,每次增加数字前都会先上锁,执行完毕后就会解锁,过程中会导致其他协程阻塞,接下来使用CAS改造一下:

var count int64

func Add(num int64) {
	for {
		expect := atomic.LoadInt64(&count)
		if atomic.CompareAndSwapInt64(&count, expect, expect+num) {
			break
		}
	}
}

对于CAS而言,有三个参数,内存值,期望值,新值。执行时,CAS会将期望值与当前内存值进行比较,如果内存值与期望值相同,就会执行后续的操作,否则的话什么也不做。对于Go中atomic包下的原子操作,CAS相关的函数则需要传入地址,期望值,新值,且会返回是否成功替换的布尔值。例如int64类型的CAS 操作函数签名如下:

func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)

CAS的例子中,首先会通过LoadInt64来获取期望值,随后使用CompareAndSwapInt64来进行比较交换,如果不成功的话就不断循环,直到成功。这样无锁化的操作虽然不会导致协程阻塞,但是不断的循环对于CPU而言依旧是一个不小的开销,所以在一些实现中失败达到了一定次数可能会放弃操作。但是对于上面的操作而言,仅仅只是简单的数字相加,涉及到的操作并不复杂,所以完全可以考虑无锁化实现。

提示

大多数情况下,仅仅只是比较值是无法做到并发安全的,比如因CAS引起ABA问题,就需要使用额外加入version来解决问题。

Value

atomic.Value结构体,可以存储任意类型的值,结构体如下

type Value struct {
   // any类型
   v any
}

尽管可以存储任意类型,但是它不能存储nil,并且前后存储的值类型应当一致,下面两个例子都无法通过编译

func main() {
   var val atomic.Value
   val.Store(nil)
   fmt.Println(val.Load())
}
// panic: sync/atomic: store of nil value into Value   
func main() {
   var val atomic.Value
   val.Store("hello world")
   val.Store(114154)
   fmt.Println(val.Load())
}
// panic: sync/atomic: store of inconsistently typed value into Value 

除此之外,它的使用与其他的原子类型并无太大的差别,并且需要注意的是,所有的原子类型都不应该复制值,而是应该使用它们的指针。