Go语言之Go语言并发
Go 语言并发
Golang从语言层面就对并发提供了支持,而goruntine是Go语言并发设计的核心。
Go语言的并发机制运用起来非常舒适,在启动并发的方式上直接添加了语言级的关键字就可以实现,和其他编程语言相比更加轻量。
进程&线程
A、进程是程序在操作系统中的一次执行过程,系统进行资源分配和调度的一个独立单位。
B、线程是进程的一个执行实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。
C、一个进程可以创建和撤销多个线程;同一个进程中的多个线程之间可以并发执行。
并发&并行
A、多线程程序在一个核的cpu上运行,就是并发。
B、多线程程序在多个核的cpu上运行,就是并行。
并发不是并行:
并发主要由切换时间片来实现"同时"运行,并行则是直接利用多核实现多线程的运行,Go程序可以设置使用核数,以发挥多核计算机的能力。
协程&线程
协程:独立的栈空间,共享堆空间,调度由用户自己控制,本质上有点类似于用户级线程,这些用户级线程的调度也是自己实现的。
线程:一个线程上可以跑多个协程,协程是轻量级的线程。
Goroutine 介绍
goroutine 只是由官方实现的超级"线程池"。每个实力4~5KB的栈内存占用和由于实现机制而大幅减少的创建和销毁开销是Go语言高并发的根本原因。
goroutine 奉行通过通信来共享内存,而不是共享内存来通信。只需在函数调用语句前添加 go 关键字,就可创建并发执行单元。开发人员无需了解任何执行细节,调度器会自动将其安排到合适的系统线程上执行。goroutine 是一种非常轻量级的实现,可在单个进程里执行成千上万的并发任务。
事实上,入口函数 main 就以 goroutine 运行。另有与之配套的 channel 类型,用以实现 "以通讯来共享内存" 的 CSP 模式。
goroutine 是通过 Go 的 runtime管理的一个线程管理器
package main import ( "fmt" "time" ) func main() { go func() { fmt.Println("hello word") }() time.Sleep(1 * time.Second) }
进入 main 函数开启一个 goroutine 运行匿名函数函数体内容:fmt.Println("Hello, World!") 。主线程执行 time.Sleep(1 * time.Second) 等待 1 秒。goroutine 执行完毕回到主线程,主线程的sleep 完成结束程序。 注意:若去掉 time.Sleep(1 * time.Second) 这段代码,进入 main 函数开启一个 goroutine,没等 goroutine 运行匿名函数函数体内容,主线程已经完成结束程序。
Go语言Chan应用
Channel 是 CSP 模式的具体实现,用于多个 goroutine 通讯。其内部实现了同步,确保并发安全。
Channel 是先进先出,线程安全的,多个goroutine同时访问,不需要加锁。
chan 阻塞
我们定义的管道 intChan 容量是5,开启 goroutine 写入10条数据,在写满5条数据时会阻塞,而 read() 每秒会从 intChan 中读取一条,然后write() 再会写入一条数据。
package main import ( "fmt" "time" ) func write(ch chan int) { for i := 0; i < 10; i++ { ch <- i fmt.Println("write data:", i) } } func read(ch chan int) { for { i := <-ch fmt.Println("read data:", i) time.Sleep(time.Second) } } func main() { intChan := make(chan int, 5) go write(intChan) go read(intChan) time.Sleep(10 * time.Second) }
同步模式
默认为同步模式,需要发送和接收配对。否则会被阻塞,直到另一方准备好后被唤醒。
package main import "fmt" func main() { data := make(chan string) // 数据交换队列 exit := make(chan bool) // 退出通知 go func() { for d := range data { // 从队列迭代接收数据,直到 close 。 fmt.Println(d) } fmt.Println("received over") exit <- true // 发出退出通知。 }() data <- "oldboy" // 发送数据。 data <- "Linux" data <- "GOlang" data <- "python" close(data) // 关闭队列。 fmt.Println("send over") <-exit // 等待退出通知。 }
异步模式
异步方式通过判断缓冲区来决定是否阻塞。如果缓冲区已满,发送被阻塞;缓冲区为空,接收被阻塞。
通常情况下,异步 channel 可减少排队阻塞,具备更高的效率。但应该考虑使用指针规避大对象拷贝,将多个元素打包,减小缓冲区大小。
package main import "fmt" func main() { data := make(chan string, 3) // 缓冲区可以存储 3 个元素 exit := make(chan bool) data <- "old boy" // 在缓冲区未满前,不会阻塞。 data <- "python" data <- "linux" go func() { for d := range data { // 在缓冲区未空前,不会阻塞。 fmt.Println(d) } // 表示读取出data通道中数据 exit <- true }() data <- "java" // 如果缓冲区已满,阻塞。 data <- "C" close(data) <-exit }
chan 选择
如果需要同时处理多个 channel,可使用 select 语句。它随机选择一个可用 channel 做收发操作,或执行 default case。
用 select 实现超时控制:
package main import ( "fmt" "time" ) func main() { exit := make(chan bool) intChan := make(chan int, 2) strChan := make(chan string, 2) go func() { select { case vi := <-intChan: fmt.Println(vi) case vs := <-strChan: fmt.Println(vs) case <-time.After(time.Second * 3): fmt.Println("timeout.") } exit <- true }() // intChan <- 100 // 注释掉,引发 timeout。 // strChan <- "oldboy" <-exit }
在循环中使用 select default case 需要小心,避免形成洪水。
简单工厂模式
用简单工厂模式打包并发任务和 channel。
package main import ( "fmt" "math/rand" "time" ) func NewTest() chan int { c := make(chan int) rand.Seed(time.Now().UnixNano()) go func() { time.Sleep(time.Second) c <- rand.Int() }() return c } func main() { t := NewTest() fmt.Println(<-t) // 等待 goroutine 结束返回。 }
Go 语言WaitGroup
WaitGroup能够一直等到所有的goroutine执行完成,并且阻塞主线程的执行,直到所有的goroutine执行完成。
WaitGroup总共有三个方法:Add(delta int),Done(),Wait()。简单的说一下这三个方法的作用。
Add:添加或者减少等待goroutine的数量;
Done:相当于Add(-1);
Wait:执行阻塞,直到所有的WaitGroup数量变成 0;
WaitGroup用于线程同步,WaitGroup等待一组线程集合完成,才会继续向下执行。 主线程(goroutine)调用Add来设置等待的线程(goroutine)数量。 然后每个线程(goroutine)运行,并在完成后调用Done。 同时,Wait用来阻塞,直到所有线程(goroutine)完成才会向下执行。
WaitGroup实例如下:
package main import ( "fmt" "sync" "time" ) func main() { var wg sync.WaitGroup for i := 0; i < 5; i++ { wg.Add(1) go func(n int) { // defer wg.Done() defer wg.Add(-1) EchoNum(n) }(i) } wg.Wait() } func EchoNum(i int) { time.Sleep(time.Second) fmt.Println(i) }
程序中将每次循环的数量 sleep 1 秒钟后输出。如果程序不使用WaitGroup,将不会输出结果。因为goroutine还没执行完,主线程已经执行完毕。
注掉的 defer wg.Done() 和 defer wg.Add(-1) 作用一样。
WaitGroup应用
一、用 channel 实现信号量 (semaphore)。
package main import ( "sync" ) func main() { wg := sync.WaitGroup{} wg.Add(3) //增加三个线程 sem := make(chan int, 1) for i := 0; i < 3; i++ { go func(id int) { defer wg.Done() //减少一个线程 sem <- 1 // 向 sem 发送数据,阻塞或者成功。 for x := 0; x < 3; x++ { println(id, x) } <-sem // 接收数据,使得其他阻塞 goroutine 可以发送数据 }(i) } wg.Wait() }
D:\goprogram\go\src\day10 λ go run test.go
二、用 closed channel 发出退出通知。
package main import ( "sync" "time" ) func main() { wg := sync.WaitGroup{} exit := make(chan bool) for i := 0; i < 2; i++ { wg.Add(1) go func(n int) { defer wg.Done() task := func() { println(n, time.Now().String()) time.Sleep(1 * time.Second) } for { select { case <-exit: // closed channel 不会阻塞,因此可用作退出通知。 return default: // 执行正常任务。 task() } } }(i) } time.Sleep(time.Second * 3) // 让测试 goroutine 运行一会。 close(exit) // 发出退出通知。 wg.Wait() }
WaitGroup陷阱
一、add 数量小于done数量导致 WaitGroup数量为负数
package main import ( "fmt" "sync" "time" ) func main() { var wg sync.WaitGroup wg.Add(1) oldboy := func() { time.Sleep(time.Second) fmt.Println("The old boy welcomes you.") wg.Done() } go oldboy() go oldboy() go oldboy() time.Sleep(time.Second * 3) wg.Wait() }
运行错误:
panic: sync: negative WaitGroup counter
二、add 数量大于 done 数量造成 deadlock
package main import ( "fmt" "sync" "time" ) func main() { var wg sync.WaitGroup wg.Add(4) oldboy := func() { time.Sleep(time.Second) fmt.Println("The old boy welcomes you.") wg.Done() } go oldboy() go oldboy() go oldboy() time.Sleep(time.Second * 3) wg.Wait() }
运行错误:
fatal error: all goroutines are asleep - deadlock!
三、跳过 add 和 Done 操作,直接执行 Wait
package main import ( "fmt" "sync" ) func main() { wg := sync.WaitGroup{} for i := 0; i < 5; i++ { go func(wg sync.WaitGroup, i int) { wg.Add(1) fmt.Printf("i=>%d\n", i) wg.Done() }(wg, i) } wg.Wait() fmt.Println("exit") }
WaitGroup 同步的是 goroutine, 而上面的代码却在 goroutine 中进行 Add(1) 操作。因此,可能在这些 goroutine 还没来得及 Add(1) 就已经执行 Wait 操作了。
四、WaitGroup 拷贝传值问题
package main import ( "fmt" "sync" ) func main() { wg := sync.WaitGroup{} for i := 0; i < 5; i++ { wg.Add(1) go func(wg sync.WaitGroup, i int) { fmt.Printf("i=>%d\n", i) wg.Done() }(wg, i) } wg.Wait() }
运行错误:
fatal error: all goroutines are asleep - deadlock!
wg 给拷贝传递到了 goroutine 中,导致只有 Add 操作,其实 Done操作是在 wg 的副本执行的,因此 Wait 就死锁了。
正确代码实例如下:
package main import ( "fmt" "sync" ) func main() { wg := new(sync.WaitGroup) // wg := &sync.WaitGroup{} for i := 0; i < 5; i++ { wg.Add(1) go func(wg *sync.WaitGroup, i int) { fmt.Printf("i=>%d\n", i) wg.Done() }(wg, i) } wg.Wait() }
Go 语言runtime
runtime包提供Go语言运行时的系统交互的操作,例如控制goruntine的功能。
调度器不能保证多个 goroutine 执行次序,且进程退出时不会等待它们结束。
默认情况下,进程启动后仅允许一个系统线程服务于 goroutine。可使用环境变量或标准库函数 runtime.GOMAXPROCS 修改,让调度器用多个线程实现多核并行,而不仅仅是并发。
runtime包常用方法
const GOOS string = theGoos
GOOS是可执行程序的目标操作系统(将要在该操作系统的机器上执行):darwin、freebsd、linux等。
func Gosched()
Gosched使当前go程放弃处理器,以让其它go程运行。它不会挂起当前go程,因此当前go程未来会恢复执行。
func NumCPU() int
NumCPU返回本地机器的逻辑CPU个数。
func GOROOT() string
GOROOT返回Go的根目录。如果存在GOROOT环境变量,返回该变量的值;否则,返回创建Go时的根目录。
func GOMAXPROCS(n int) int
GOMAXPROCS设置可同时执行的最大CPU数,并返回先前的设置。 若 n < 1,它就不会更改当前设置。本地机器的逻辑CPU数可通过 NumCPU 查询。本函数在调度程序优化后会去掉。
func Goexit()
Goexit终止调用它的go程。其它go程不会受影响。Goexit会在终止该go程前执行所有defer的函数。
在程序的main go程调用本函数,会终结该go程,而不会让main返回。因为main函数没有返回,程序会继续执行其它的go程。如果所有其它go程都退出了,程序就会崩溃。
func NumGoroutine() int
NumGoroutine返回当前存在的Go程数。
runtime包应用
一、查看机器的逻辑CPU个数、Go的根目录、操作系统
package main import "runtime" func main() { println("cpu:", runtime.NumCPU()) println("go:", runtime.GOROOT()) println("操作系统:", runtime.GOOS) }
D:\goprogram\go\src\day10 λ go run test.go cpu: 4 go: D:\go\go 操作系统: windows
二、GOMAXPROCS 设置golang运行的cpu核数
Golang 默认所有任务都运行在一个 cpu 核里,如果要在 goroutine 中使用多核,可以使用 runtime.GOMAXPROCS 函数修改,当参数小于 1 时使用默认值。
package main import ( "fmt" "runtime" ) var ( signal = false ) func oldboy() { signal = true } func init() { runtime.GOMAXPROCS(1) } func main() { go oldboy() for { if signal { break } } fmt.Println("end") }
上述代码单核执行如果for前面或者中间不延迟,主线程不会让出CPU,导致异步的线程无法执行,从而无法设置signal的值,从而出现死循环。
运行的cpu核数设置成2核
package main import ( "fmt" "runtime" ) var ( signal = false ) func oldboy() { signal = true } func init() { runtime.GOMAXPROCS(2) } func main() { go oldboy() for { if signal { break } } fmt.Println("end") }
运行结果:
end
三、Gosched 让当前的 goroutine 让出 CPU
这个函数的作用是让当前 goroutine 让出 CPU,当一个 goroutine 发生阻塞,Go 会自动地把与该 goroutine 处于同一系统线程的其他 goroutine 转移到另一个系统线程上去,以使这些 goroutine 不阻塞。当前的 goroutine 不会挂起,当前的 goroutine 程未来会恢复执行。
runtime.Gosched()用于让出CPU时间片。这就像跑接力赛,A跑了一会碰到代码runtime.Gosched()就把接力棒交给B了,A歇着了,B继续跑。
package main import ( "runtime" "sync" ) func main() { wg := new(sync.WaitGroup) wg.Add(1) go func() { for i := 0; i < 6; i++ { println(i) runtime.Gosched() } defer wg.Done() }() for i := 0; i < 6; i++ { wg.Add(1) go func() { defer wg.Done() println("Hello.Golang!") }() } wg.Wait() }
D:\goprogram\go\src\day10 λ go run test.go 0 1 2 3 4 5 Hello.Golang! Hello.Golang! Hello.Golang! Hello.Golang! Hello.Golang! Hello.Golang!
四、Goexit 终止当前 goroutine 执行
调用 runtime.Goexit 将立即终止当前 goroutine 执行,调度器确保所有已注册 defer 延迟调用被执行。
package main import ( "fmt" "runtime" "sync" ) func main() { wg := new(sync.WaitGroup) wg.Add(1) go func() { defer wg.Done() defer fmt.Println("A.defer") func() { defer fmt.Println("B.defer") runtime.Goexit() // 终止当前 goroutine fmt.Println("B") // 不会执行 }() fmt.Println("A") // 不会执行 }() wg.Wait() }
B.defer A.defer