goroutine 并发控制
方法 | 优点 | 缺点 | 使用场景 |
---|---|---|---|
全局共享变量(cas) | 简单 | 无法安全退出 | 状态机 |
channel+waitgroup | 简单,可以安全退出 | 运行框架控制 | |
context+waitgroup | 强并发控制 goroutine 间通信,结合了Mutex锁(需要保证goroutine并发安全)和channel而实现的 | 并发通信 |
共享变量示例代码
package main
import (
"fmt"
"sync/atomic"
"time"
)
const (
EStatusStop = iota
EStatusRun
)
var (
Status int32 = EStatusRun
)
func main() {
go func() {
for i := 0; i < 10; i++ {
go echo(i)
}
}()
time.Sleep(5 * time.Second)
atomic.StoreInt32(&Status, EStatusStop)
time.Sleep(time.Second)
fmt.Println("main exit")
}
func echo(id int) {
for {
fmt.Printf("Id=%v say hello\n", id)
time.Sleep(time.Second)
if EStatusStop == atomic.LoadInt32(&Status) {
fmt.Printf("Id=%v exit\n", id)
return
}
}
}
channel +waitgroup 示例代码
package main
import (
"fmt"
"os"
"os/signal"
"sync"
"syscall"
"time"
)
func main() {
stop := make(chan bool)
var wg sync.WaitGroup
for i := 0; i < 3; i++ {
wg.Add(1)
go func(id int, c chan bool) {
defer wg.Done()
consumer(id, c)
}(i, stop)
}
waitForSignal()
// close channel,all receiver can get close signal
close(stop)
wg.Wait()
fmt.Println("main exit")
}
// <-chan :chan 输出,对应调用者是输入
// chan<- : chan 输入,对应调用者是输出
func consumer(id int, stop <-chan bool) {
for {
select {
case <-stop:
fmt.Printf("id=%v comsumer stop\n", id)
return
default:
fmt.Printf("Id=%v hello world\n", id)
time.Sleep(time.Second)
}
}
}
func waitForSignal() {
sigs := make(chan os.Signal)
signal.Notify(sigs, os.Interrupt)
signal.Notify(sigs, syscall.SIGTERM)
signal.Notify(sigs, syscall.SIGTSTP)
signal.Notify(sigs, syscall.SIGQUIT)
<-sigs
}
context 示例代码
package main
import (
"context"
"crypto/md5"
"fmt"
"io/ioutil"
"net/http"
"sync"
"time"
)
type Task struct {
Name string
Url string
}
//使用非导出类型,避免ctx key 类型冲突
type key int
var taskKey key
// 使用工具方法,注入,提取数据,而不是直接使用key 操作;
func NewTaskContext(ctx context.Context, u *Task) context.Context {
return context.WithValue(ctx, taskKey, u)
}
func GetTaskFromCtx(ctx context.Context) (t *Task, ok bool) {
t, ok = ctx.Value(taskKey).(*Task)
return
}
func main() {
addrs := []string{
"https://baidu.com",
"https://www.cmcm.com",
}
ctx, _ := context.WithTimeout(context.TODO(), 3*time.Second)
var wg sync.WaitGroup
for index, addr := range addrs {
wg.Add(1)
go GetUrl(NewTaskContext(ctx, &Task{
Name: fmt.Sprint(index),
Url: addr,
}), &wg)
}
wg.Wait()
fmt.Println("main exit")
}
/**
* ctx 传递参数和超时通知
*
* param: context.Context ctx
* param: *sync.WaitGroup wg
*/
func GetUrl(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
task, ok := GetTaskFromCtx(ctx)
if !ok {
panic(ctx)
}
for {
select {
case <-ctx.Done():
fmt.Printf("task=%v exit\n", task.Name)
return
default:
rsp, err := http.Get(task.Url)
if err != nil {
fmt.Printf("task=%v get url=%v err=%v\n", task.Name, task.Url, err)
break
}
bs, err := ioutil.ReadAll(rsp.Body)
if err != nil {
fmt.Printf("task=%v read Body=%v err=%v\n", task.Name, rsp.Body, err)
return
}
rsp.Body.Close()
fmt.Printf("task=%v url=%v md5=%x\n", task.Name, task.Url, md5.Sum(bs))
}
}
}
context 原理说明
标准库定义和说明
type Context interface {
// 如果存在,则到期时间
Deadline() (deadline time.Time, ok bool)
// func Stream(ctx context.Context, out chan<- Value) error {
// for {
// v, err := DoSomething(ctx)
// if err != nil {
// return err
// }
// select {
// case <-ctx.Done():
// return ctx.Err()
// case out <- v:
// }
// }
// }
//
// See https://blog.golang.org/pipelines for more examples of how to use
// select 等待ctx 的超时channel
Done() <-chan struct{}
// If Done is not yet closed, Err returns nil.
// If Done is closed, Err returns a non-nil error explaining why:
// Canceled if the context was canceled
// or DeadlineExceeded if the context's deadline passed.
// After Err returns a non-nil error, successive calls to Err return the same error.
Err() error
// Use context values only for request-scoped data that transits
// processes and API boundaries, not for passing optional parameters to
// functions.
// context 中的key value 是为了session 范围传递处理参数的,例如trace 信息
// 但不适合传递可选择的参数
// A key identifies a specific value in a Context. Functions that wish
// to store values in Context typically allocate a key in a global
// variable then use that key as the argument to context.WithValue and
// Context.Value. A key can be any type that supports equality;
// packages should define keys as an unexported type to avoid
// collisions.
// ctx 中的key 的类型建议是非导出类型,避免包之间冲突;
// Packages that define a Context key should provide type-safe accessors
// for the values stored using that key:
// 并且使用包内提供的工具方法,实现对本包定义数据的存取,避免直接使用key 直接存取,可参考上边例子
Value(key interface{}) interface{}
}
- 标准库实现源码解读
// 空ctx
type emptyCtx int
func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
return
}
func (*emptyCtx) Done() <-chan struct{} {
return nil
}
func (*emptyCtx) Err() error {
return nil
}
func (*emptyCtx) Value(key interface{}) interface{} {
return nil
}
func (e *emptyCtx) String() string {
switch e {
case background:
return "context.Background"
case todo:
return "context.TODO"
}
return "unknown empty Context"
}
// cancelctx
// A cancelCtx can be canceled. When canceled, it also cancels any children
// that implement canceler.
// 可以取消的ctx,取消时,会取消全部子ctx, 原理就是close(chan struct{})
type cancelCtx struct {
Context
mu sync.Mutex // protects following fields 并发安全
done chan struct{} // created lazily, closed by first cancel call 为空时初始化,被第一个取消
children map[canceler]struct{} // set to nil by the first cancel call 取消后情况
err error // set to non-nil by the first cancel call
}
func (c *cancelCtx) Done() <-chan struct{} { // 输出chan
c.mu.Lock()
if c.done == nil {
c.done = make(chan struct{}) // chan 要初始化
}
d := c.done
c.mu.Unlock()
return d
}
func (c *cancelCtx) Err() error {
c.mu.Lock()
err := c.err
c.mu.Unlock()
return err
}
type stringer interface {
String() string
}
func contextName(c Context) string {
if s, ok := c.(stringer); ok {
return s.String()
}
return reflectlite.TypeOf(c).String()
}
func (c *cancelCtx) String() string {
return contextName(c.Context) + ".WithCancel"
}
// cancel closes c.done, cancels each of c's children, and, if
// removeFromParent is true, removes c from its parent's children.
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
if err == nil {
panic("context: internal error: missing cancel error")
}
c.mu.Lock()
if c.err != nil {
c.mu.Unlock()
return // already canceled
}
c.err = err
if c.done == nil {
c.done = closedchan
} else {
// 关闭后,使用当前ctx 的routine 则退出程序。
// 并循环通知全部的子ctx 结束进程
// 注意此处,只保证routine 收到退出信号,所以,可以配合wg 实现系统的安全推出;
close(c.done)
}
for child := range c.children {
// NOTE: acquiring the child's lock while holding parent's lock.
child.cancel(false, err)
}
c.children = nil
c.mu.Unlock()
if removeFromParent {
removeChild(c.Context, c)
}
}
// removeChild removes a context from its parent.
// 将当前ctx 从父ctx 中删除
func removeChild(parent Context, child canceler) {
p, ok := parentCancelCtx(parent)
if !ok {
return
}
p.mu.Lock()
if p.children != nil {
delete(p.children, child)
}
p.mu.Unlock()
}
// WithDeadline returns a copy of the parent context with the deadline adjusted
// to be no later than d. If the parent's deadline is already earlier than d,
// WithDeadline(parent, d) is semantically equivalent to parent. The returned
// context's Done channel is closed when the deadline expires, when the returned
// cancel function is called, or when the parent context's Done channel is
// closed, whichever happens first.
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete.
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
if cur, ok := parent.Deadline(); ok && cur.Before(d) {
// The current deadline is already sooner than the new one.
// 如果父ctx 早于当前时间,则使用父ctx 返回 ??
return WithCancel(parent)
}
c := &timerCtx{
cancelCtx: newCancelCtx(parent),
deadline: d,
}
propagateCancel(parent, c)
dur := time.Until(d)
if dur <= 0 {
c.cancel(true, DeadlineExceeded) // deadline has already passed
return c, func() { c.cancel(false, Canceled) }
}
c.mu.Lock()
defer c.mu.Unlock()
if c.err == nil {
// 如果没有主动cancel ,则启动计时器,到期后,取消ctx
c.timer = time.AfterFunc(dur, func() {
c.cancel(true, DeadlineExceeded)
})
}
return c, func() { c.cancel(true, Canceled) }
}
// time ctx
type timerCtx struct {
cancelCtx
timer *time.Timer // Under cancelCtx.mu.
deadline time.Time
}
func (c *timerCtx) Deadline() (deadline time.Time, ok bool) {
return c.deadline, true
}
func (c *timerCtx) String() string {
return contextName(c.cancelCtx.Context) + ".WithDeadline(" +
c.deadline.String() + " [" +
time.Until(c.deadline).String() + "])"
}
func (c *timerCtx) cancel(removeFromParent bool, err error) {
c.cancelCtx.cancel(false, err)
if removeFromParent {
// Remove this timerCtx from its parent cancelCtx's children.
removeChild(c.cancelCtx.Context, c)
}
c.mu.Lock()
if c.timer != nil {
c.timer.Stop()
c.timer = nil
}
c.mu.Unlock()
}
同步原语与锁
Go 语言中常见的同步原语 sync.Mutex、sync.RWMutex、sync.WaitGroup、sync.Once 和 sync.Cond 以及扩展原语 errgroup.Group、semaphore.Weighted 和 singleflight.Group 的实现原理,同时也会涉及互斥锁、信号量等并发编程中的常见概念。
- mutex 锁:
参考链接:
- 并发 https://blog.csdn.net/u013029603/article/details/81232395
- grpc context 超时控制 https://blog.csdn.net/u013029603/article/details/81232395