温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Go语言中Goroutine退出机制如何使用

发布时间:2022-07-27 11:53:11 来源:亿速云 阅读:145 作者:iii 栏目:开发技术

本篇内容主要讲解“Go语言中Goroutine退出机制如何使用”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Go语言中Goroutine退出机制如何使用”吧!

goroutine的调度是由 Golang 运行时进行管理的。同一个程序中的所有 goroutine 共享同一个地址空间。goroutine设计的退出机制是由goroutine自己退出,不能在外部强制结束一个正在执行的goroutine(只有一种情况正在运行的goroutine会因为其他goroutine的结束被终止,就是main函数退出或程序停止执行)。下面我先介绍下几种退出方式:

退出方式

进程/main函数退出

kill进程/进程crash

当进程被强制退出,所有它占有的资源都会还给操作系统,而goroutine作为进程内的线程,资源被收回了,那么还未结束的goroutine也会直接退出

main函数结束

同理,当主函数结束,goroutine的资源也会被收回,直接退出。具体可参考下下面的demo,其中go routine里需要print出来的语句是永远也不会出现的。

package main import (    "fmt"    "time" ) func routineTest() {    time.Sleep(time.Second)    fmt.Println("I'm alive") } func main(){    fmt.Println("start test")    go routineTest()    fmt.Println("end test") }

通过channel退出

Go实现了两种并发形式。第一种是大家普遍认知的:多线程共享内存。其实就是Java或者C++等语言中的多线程开发。另外一种是Go语言特有的,也是Go语言推荐的:CSP(communicating sequential processes)并发模型。CSP并发模型是在1970年左右提出的概念,属于比较新的概念,不同于传统的多线程通过共享内存来通信,CSP讲究的是“以通信的方式来共享内存”。

其核心思想为:

DO NOT COMMUNICATE BY SHARING MEMORY; INSTEAD, SHARE MEMORY BY COMMUNICATING.

“不要以共享内存的方式来通信,相反,要通过通信来共享内存。”

普通的线程并发模型,就是像Java、C++、或者Python,他们线程间通信都是通过共享内存的方式来进行的。非常典型的方式就是,在访问共享数据(例如数组、Map、或者某个结构体或对象)的时候,通过锁来访问,因此,在很多时候,衍生出一种方便操作的数据结构,叫做“线程安全的数据结构”。例如Java提供的包”java.util.concurrent”中的数据结构。Go中也实现了传统的线程并发模型。

Go的CSP并发模型,就是通过goroutine和channel来实现的。

因为不是本文重点,在此对channel不做过多介绍,只需要了解channel是goroutine之间的通信机制。 通俗的讲,就是各个goroutine之间通信的”管道“,有点类似于Linux中的管道。channel是go最推荐的goroutine间的通信方式,同时通过channel来通知goroutine退出也是最主要的goroutine退出方式。goroutine虽然不能强制结束另外一个goroutine,但是它可以通过channel通知另外一个goroutine你的表演该结束了。

package main import (    "fmt"    "time" ) func cancelByChannel(quit <-chan time.Time) {    for {       select {       case <-quit:          fmt.Println("cancel goroutine by channel!")          return       default:          fmt.Println("I'm alive")          time.Sleep(1 * time.Second)       }    } } func main() {    quit := time.After(time.Second * 10)    go cancelByChannel(quit)    time.Sleep(15*time.Second)    fmt.Println("I'm done") }

在上面的例子中,我们用时间定义了一个channel,当10秒后,会给到goroutine一个退出信号,然后go routine就会退出。这样我们就实现了在其他线程中通知另一个线程退出的功能。

通过context退出

通过channel通知goroutine退出还有一个更好的方法就是使用context。没错,就是我们在日常开发中接口通用的第一个参数context。它本质还是接收一个channel数据,只是是通过ctx.Done()获取。将上面的示例稍作修改即可。

package main import (    "context"    "fmt"    "time" ) func cancelByContext(ctx context.Context) {    for {       select {       case <- ctx.Done():          fmt.Println("cancel goroutine by context!")          return       default:          fmt.Println("I'm alive")          time.Sleep(1 * time.Second)       }    } } func main() {    ctx, cancel := context.WithCancel(context.Background())    go cancelByContext(ctx)    time.Sleep(10*time.Second)    cancel()    time.Sleep(5*time.Second) }

上面的case中,通过context自带的WithCancel方法将cancel函数传递出来,然后手动调用cancel()函数给goroutine传递了ctx.Done()信号。context也提供了context.WithTimeout()和context.WithDeadline()方法来更方便的传递特定情况下的Done信号。

package main import (    "context"    "fmt"    "time" ) func cancelByContext(ctx context.Context) {    for {       select {       case <- ctx.Done():          fmt.Println("cancel goroutine by context!")          return       default:          fmt.Println("I'm alive")          time.Sleep(1 * time.Second)       }    } } func main() {    ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)    go cancelByContext(ctx)    time.Sleep(15*time.Second) }

上述case中使用了context.WithTimeout()来设置10秒后自动退出,使用context.WithDeadline()的功能基本一样。区别是context.WithDeadline()可以指定一个固定的时间点,当然也可以使用time.Now().Add(time.Second*10)的方式来实现同context.WithTimeout()相同的功能。具体示例如下:

package main import (    "context"    "fmt"    "time" ) func cancelByContext(ctx context.Context) {    for {       select {       case <- ctx.Done():          fmt.Println("cancel goroutine by context!")          return       default:          fmt.Println("I'm alive")          time.Sleep(1 * time.Second)       }    } } func main() {    ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Second*10))    go cancelByContext(ctx)    time.Sleep(15*time.Second) }

注:这里需要注意的一点是上方两个case中为了方便读者理解,我将context传回的cancel()函数抛弃掉了,实际使用中通常会加上defer cancel()来保证goroutine被杀死。

附:Context 使用原则和技巧

  • 不要把Context放在结构体中,要以参数的方式传递,parent Context一般为Background

  • 应该要把Context作为第一个参数传递给入口请求和出口请求链路上的每一个函数,放在第一位,变量名建议都统一,如ctx。

  • 给一个函数方法传递Context的时候,不要传递nil,否则在tarce追踪的时候,就会断了连接

  • Context的Value相关方法应该传递必须的数据,不要什么数据都使用这个传递

  • Context是线程安全的,可以放心的在多个goroutine中传递

  • 可以把一个 Context 对象传递给任意个数的 gorotuine,对它执行 取消 操作时,所有 goroutine 都会接收到取消信号。

通过Panic退出

这是一种不推荐使用的方法!!!在此给出只是提出这种操作的可能性。实际场景中尤其是生产环境请慎用!!

package main import (    "context"    "fmt"    "time" ) func cancelByPanic(ctx context.Context) {    defer func() {       if err := recover(); err != nil {          fmt.Println("cancel goroutine by panic!")       }    }()    for i:=0 ; i< 5 ;i++{       fmt.Println("hello cancelByPanic")       time.Sleep(1 * time.Second)    }    panic("panic") } func main() {    ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)    defer cancel()    go cancelByPanic(ctx)    time.Sleep(5*time.Second) }

这里我们通过在defer函数中使用recover来捕获panic error并从panic中拿回控制权,确保程序不会再panic展开到goroutine调用栈顶部后崩溃。

等待自己退出

这是goroutine最常见的退出方式。我们通常都会等待goroutine执行完指定的任务之后自己退出。所以此处就不给示例了。

阻止goroutine退出的方法

了解到goroutine的退出方式后,我们已经可以解决一类问题。那就是当你需要手动控制某个goroutine结束的时候应该怎么办。但是在实际生产中关于goroutine还有一类问题需要解决,那就是当你的主进程结束时,应该如何等待goroutine全部执行完毕后再使主进程退出。

阻止程序退出的方法一种有两种:

通过sync.WaitGroup

package main import (    "fmt" ) func main() {    arr := [3]string{"a", "b", "c"}    for _, v := range arr {       go func(s string) {          fmt.Println(s)       }(v)    }    fmt.Println("End") }

以上方的case为例,可见我们在什么都不加的时候,不会等待go func执行完主程序就会退出。因此下面给出使用WaitGroup的方法。

package main import (     "fmt"     "sync" ) func main() {     var wg sync.WaitGroup // 定义 WaitGroup     arr := [3]string{"a", "b", "c"}     for _, v := range arr {         wg.Add(1) // 增加一个 wait 任务         go func(s string) {             defer wg.Done() // 函数结束时,通知此 wait 任务已经完成             fmt.Println(s)         }(v)     }     // 等待所有任务完成     wg.Wait() }

WaitGroup可以理解为一个goroutine管理者。他需要知道有多少个goroutine在给他干活,并且在干完的时候需要通知他干完了,否则他就会一直等,直到所有的小弟的活都干完为止。我们加上WaitGroup之后,程序会进行等待,直到它收到足够数量的Done()信号为止。

WaitGroup可被调用的方法只有三个:Add() 、Done()、Wait()。通过这三个方法即可实现上述的功能,下面我们把源码贴出。

func (wg *WaitGroup) Add(delta int) {         statep := wg.state()         state := atomic.AddUint64(statep, uint64(delta)<<32)         v := int32(state >> 32) // 计数器         w := uint32(state)      // 等待者个数。这里用uint32,会直接截断了高位32位,留下低32位         if v < 0 {                 // Done的执行次数超出Add的数量                 panic("sync: negative WaitGroup counter")         }         if w != 0 && delta > 0 && v == int32(delta) {                 // 最开始时,Wait不能在Add之前被执行                 panic("sync: WaitGroup misuse: Add called concurrently with Wait")         }         if v > 0 || w == 0 {                 // 计数器不为零,还有没Done的。return     // 没有等待者。return                 return         }         // 所有goroutine都完成任务了,但有goroutine执行了Wait后被阻塞,需要唤醒它         if *statep != state {                 // 已经到了唤醒阶段了,就不能同时并发Add了                 panic("sync: WaitGroup misuse: Add called concurrently with Wait")         }   // 清零之后,就可以继续Add和Done了         *statep = 0         for ; w != 0; w-- {     // 唤醒                 runtime_Semrelease(&wg.sema, false)         } } func (wg *WaitGroup) Done() {         wg.Add(-1) } func (wg *WaitGroup) Wait() {         statep := wg.state()         for {                 state := atomic.LoadUint64(statep)                 v := int32(state >> 32) // 计数器                 w := uint32(state)      // 等待者个数                 if v == 0 {                         // 如果声明变量后,直接执行Wait也不会有问题                         // 下面CAS操作失败,重试,但刚好发现计数器变成零了,安全退出                         return                 }                 if atomic.CompareAndSwapUint64(statep, state, state+1) {                         if race.Enabled && w == 0 {                                 race.Write(unsafe.Pointer(&wg.sema))                         }                         // 挂起当前的g                         runtime_Semacquire(&wg.sema)                         // 被唤醒后,计数器不应该大于0                         // 大于0意味着Add的数量被Done完后,又开始了新一波Add                         if *statep != 0 {                                 panic("sync: WaitGroup is reused before previous Wait has returned")                         }                         return                 }         } }

通过看源码,我们可以知道,有些使用细节是需要注意的:

1.wg.Done()函数实际上实现的是wg.Add(-1),因此直接使用wg.Add(-1)是会造成同样的结果的。在实际使用中要注意避免误操作,使得监听的goroutine数量出现误差。

2.wg.Add()函数可以一次性加n。但是实际使用时通常都设为1。但是wg本身的counter不能设为负数。假设你在没有Add到10以前,一次性wg.Add(-10),会出现panic !

package main import (    "fmt"    "sync" ) func main() {    var wg sync.WaitGroup // 定义 WaitGroup    arr := [3]string{"a", "b", "c"}    for _, v := range arr {       wg.Add(1) // 增加一个 wait 任务       go func(s string) {          defer wg.Done() // 函数结束时,通知此 wait 任务已经完成          fmt.Println(s)       }(v)    }    wg.Add(-10)    // 等待所有任务完成    wg.Wait() } panic: sync: negative WaitGroup counter

3.如果你的程序写的有问题,出现了始终等待的waitgroup会造成死锁。

package main import (    "fmt"    "sync" ) func main() {    var wg sync.WaitGroup // 定义 WaitGroup    arr := [3]string{"a", "b", "c"}    for _, v := range arr {       wg.Add(1) // 增加一个 wait 任务       go func(s string) {          defer wg.Done() // 函数结束时,通知此 wait 任务已经完成          fmt.Println(s)       }(v)    }    wg.Add(1)    // 等待所有任务完成    wg.Wait() } fatal error: all goroutines are asleep - deadlock!

通过channel

第二种方法即是通过channel。具体写法如下:

package main import "fmt" func main() {     arr := [3]string{"a", "b", "c"}     ch := make(chan struct{}, len(arr))     for _, v := range arr {         go func(s string) {             fmt.Println(s)             ch <- struct{}{}         }(v)     }     for i := 0; i < len(arr); i ++ {         <-ch     } }

需要注意的是,channel同样会导致死锁。如下方示例:

package main import "fmt" func main() {    arr := [3]string{"a", "b", "c"}    ch := make(chan struct{}, len(arr))    for _, v := range arr {       go func(s string) {          fmt.Println(s)          ch <- struct{}{}       }(v)    }    for i := 0; i < len(arr); i++ {       <-ch    }    <-ch } fatal error: all goroutines are asleep - deadlock!

封装

利用go routine的这一特性,我们可以将waitGroup等方式封装起来,保证go routine在主进程结束时会继续执行完。封装demo:

package main import (    "fmt"    "sync" ) type WaitGroupWrapper struct {    sync.WaitGroup } func (wg *WaitGroupWrapper) Wrap(f func(args ...interface{}), args ...interface{}) {    wg.Add(1)    go func() {       f(args...)       wg.Done()    }() } func printArray(args ...interface{}){    fmt.Println(args) } func main() {    var w WaitGroupWrapper // 定义 WaitGroup    arr := [3]string{"a", "b", "c"}    for _, v := range arr {       w.Wrap(printArray,v)    }    w.Wait() }

还可以加上更高端一点的功能,增加时间、事件双控制的wrapper。

package main import (    "fmt"    "sync"    "time" ) type WaitGroupWrapper struct {    sync.WaitGroup } func (wg *WaitGroupWrapper) Wrap(f func(args ...interface{}), args ...interface{}) {    wg.Add(1)    go func() {       f(args...)       wg.Done()    }() } func (w *WaitGroupWrapper) WaitWithTimeout(d time.Duration) bool {    ch := make(chan struct{})    t := time.NewTimer(d)    defer t.Stop()    go func() {       w.Wait()       ch <- struct{}{}    }()    select {    case <-ch:       fmt.Println("job is done!")       return true    case <-t.C:       fmt.Println("time is out!")       return false    } } func printArray(args ...interface{}){    time.Sleep(3*time.Second) //3秒后会触发time is out分支    //如果改为time.Sleep(time.Second)即会触发job is done分支    fmt.Println(args) } func main() {    var w WaitGroupWrapper // 定义 WaitGroup    arr := [3]string{"a", "b", "c"}    for _, v := range arr {       w.Wrap(printArray,v)    }    w.WaitWithTimeout(2*time.Second) }

到此,相信大家对“Go语言中Goroutine退出机制如何使用”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI