语言系列-Golang并发控制原理

goroutine 并发控制

方法 优点 缺点 使用场景
全局共享变量(cas) 简单 无法安全退出 状态机
channel+waitgroup 简单,可以安全退出 运行框架控制
context+waitgroup 强并发控制 goroutine 间通信,结合了Mutex锁(需要保证goroutine并发安全)和channel而实现的 并发通信
共享变量示例代码
  package main

  import (
      "fmt"
      "sync/atomic"
      "time"
  )

  const (
      EStatusStop = iota
      EStatusRun
  )

  var (
      Status int32 = EStatusRun
  )

  func main() {
      go func() {
          for i := 0; i < 10; i++ {
              go echo(i)
          }
      }()

      time.Sleep(5 * time.Second)
      atomic.StoreInt32(&Status, EStatusStop)

      time.Sleep(time.Second)
      fmt.Println("main exit")
  }

  func echo(id int) {
      for {
          fmt.Printf("Id=%v say hello\n", id)
          time.Sleep(time.Second)
          if EStatusStop == atomic.LoadInt32(&Status) {
              fmt.Printf("Id=%v exit\n", id)
              return
          }
      }
  }
channel +waitgroup 示例代码

package main

import (
	"fmt"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"
)

func main() {
	stop := make(chan bool)
	var wg sync.WaitGroup

	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int, c chan bool) {
			defer wg.Done()
			consumer(id, c)
		}(i, stop)
	}
	waitForSignal()
	// close channel,all receiver can get close signal
	close(stop)
	wg.Wait()
	fmt.Println("main exit")
}

// <-chan :chan 输出,对应调用者是输入
// chan<- : chan 输入,对应调用者是输出
func consumer(id int, stop <-chan bool) {
	for {
		select {
		case <-stop:
			fmt.Printf("id=%v comsumer stop\n", id)
			return
		default:
			fmt.Printf("Id=%v hello world\n", id)
			time.Sleep(time.Second)
		}
	}
}

func waitForSignal() {
	sigs := make(chan os.Signal)
	signal.Notify(sigs, os.Interrupt)
	signal.Notify(sigs, syscall.SIGTERM)
	signal.Notify(sigs, syscall.SIGTSTP)
	signal.Notify(sigs, syscall.SIGQUIT)
	<-sigs
}

context 示例代码

package main

import (
	"context"
	"crypto/md5"
	"fmt"
	"io/ioutil"
	"net/http"
	"sync"
	"time"
)

type Task struct {
	Name string
	Url  string
}
//使用非导出类型,避免ctx key 类型冲突
type key int
var taskKey key
// 使用工具方法,注入,提取数据,而不是直接使用key 操作;

func NewTaskContext(ctx context.Context, u *Task) context.Context {
	return context.WithValue(ctx, taskKey, u)
}
func GetTaskFromCtx(ctx context.Context) (t *Task, ok bool) {
	t, ok = ctx.Value(taskKey).(*Task)
	return
}

func main() {
	addrs := []string{
		"https://baidu.com",
		"https://www.cmcm.com",
	}

	ctx, _ := context.WithTimeout(context.TODO(), 3*time.Second)
	var wg sync.WaitGroup

	for index, addr := range addrs {
		wg.Add(1)

		go GetUrl(NewTaskContext(ctx, &Task{
			Name: fmt.Sprint(index),
			Url:  addr,
		}), &wg)
	}

	wg.Wait()

	fmt.Println("main exit")
}

/**
 * ctx 传递参数和超时通知
 *
 * param: context.Context ctx
 * param: *sync.WaitGroup wg
 */
func GetUrl(ctx context.Context, wg *sync.WaitGroup) {
	defer wg.Done()
	task, ok := GetTaskFromCtx(ctx)
	if !ok {
		panic(ctx)
	}
	for {
		select {
		case <-ctx.Done():
			fmt.Printf("task=%v exit\n", task.Name)
			return
		default:
			rsp, err := http.Get(task.Url)
			if err != nil {
				fmt.Printf("task=%v get url=%v err=%v\n", task.Name, task.Url, err)
				break
			}

			bs, err := ioutil.ReadAll(rsp.Body)
			if err != nil {
				fmt.Printf("task=%v read Body=%v err=%v\n", task.Name, rsp.Body, err)
				return
			}
			rsp.Body.Close()
			fmt.Printf("task=%v	url=%v		md5=%x\n", task.Name, task.Url, md5.Sum(bs))
		}
	}
}

context 原理说明

标准库定义和说明

type Context interface {
	// 如果存在,则到期时间
	Deadline() (deadline time.Time, ok bool)
	//  func Stream(ctx context.Context, out chan<- Value) error {
	//  	for {
	//  		v, err := DoSomething(ctx)
	//  		if err != nil {
	//  			return err
	//  		}
	//  		select {
	//  		case <-ctx.Done():
	//  			return ctx.Err()
	//  		case out <- v:
	//  		}
	//  	}
	//  }
	//
	// See https://blog.golang.org/pipelines for more examples of how to use
	// select 等待ctx 的超时channel
	Done() <-chan struct{}

	// If Done is not yet closed, Err returns nil.
	// If Done is closed, Err returns a non-nil error explaining why:
	// Canceled if the context was canceled
	// or DeadlineExceeded if the context's deadline passed.
	// After Err returns a non-nil error, successive calls to Err return the same error.
	Err() error

	// Use context values only for request-scoped data that transits
	// processes and API boundaries, not for passing optional parameters to
	// functions.
	// context 中的key value 是为了session 范围传递处理参数的,例如trace 信息
    // 但不适合传递可选择的参数

	// A key identifies a specific value in a Context. Functions that wish
	// to store values in Context typically allocate a key in a global
	// variable then use that key as the argument to context.WithValue and
	// Context.Value. A key can be any type that supports equality;
	// packages should define keys as an unexported type to avoid
	// collisions.
	// ctx 中的key 的类型建议是非导出类型,避免包之间冲突;
	// Packages that define a Context key should provide type-safe accessors
	// for the values stored using that key:
	// 并且使用包内提供的工具方法,实现对本包定义数据的存取,避免直接使用key 直接存取,可参考上边例子

	Value(key interface{}) interface{}
}
  • 标准库实现源码解读

// 空ctx 
type emptyCtx int

func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
	return
}

func (*emptyCtx) Done() <-chan struct{} {
	return nil
}

func (*emptyCtx) Err() error {
	return nil
}

func (*emptyCtx) Value(key interface{}) interface{} {
	return nil
}

func (e *emptyCtx) String() string {
	switch e {
	case background:
		return "context.Background"
	case todo:
		return "context.TODO"
	}
	return "unknown empty Context"
}

// cancelctx

// A cancelCtx can be canceled. When canceled, it also cancels any children
// that implement canceler.
// 可以取消的ctx,取消时,会取消全部子ctx, 原理就是close(chan struct{})
type cancelCtx struct {
	Context

	mu       sync.Mutex            // protects following fields 并发安全
	done     chan struct{}         // created lazily, closed by first cancel call 为空时初始化,被第一个取消
	children map[canceler]struct{} // set to nil by the first cancel call 取消后情况
	err      error                 // set to non-nil by the first cancel call 
}

func (c *cancelCtx) Done() <-chan struct{} { // 输出chan
	c.mu.Lock()
	if c.done == nil {
		c.done = make(chan struct{}) // chan 要初始化
	}
	d := c.done
	c.mu.Unlock()
	return d
}

func (c *cancelCtx) Err() error {
	c.mu.Lock()
	err := c.err
	c.mu.Unlock()
	return err
}

type stringer interface {
	String() string
}

func contextName(c Context) string {
	if s, ok := c.(stringer); ok {
		return s.String()
	}
	return reflectlite.TypeOf(c).String()
}

func (c *cancelCtx) String() string {
	return contextName(c.Context) + ".WithCancel"
}

// cancel closes c.done, cancels each of c's children, and, if
// removeFromParent is true, removes c from its parent's children.
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
	if err == nil {
		panic("context: internal error: missing cancel error")
	}
	c.mu.Lock()
	if c.err != nil {
		c.mu.Unlock()
		return // already canceled
	}
	c.err = err
	if c.done == nil {
		c.done = closedchan
	} else {
        // 关闭后,使用当前ctx 的routine 则退出程序。
        // 并循环通知全部的子ctx 结束进程
        // 注意此处,只保证routine 收到退出信号,所以,可以配合wg 实现系统的安全推出;
		close(c.done)
	}
	for child := range c.children {
		// NOTE: acquiring the child's lock while holding parent's lock.
		child.cancel(false, err)
	}
	c.children = nil
	c.mu.Unlock()

	if removeFromParent {
		removeChild(c.Context, c)
	}
}

// removeChild removes a context from its parent.
// 将当前ctx 从父ctx 中删除
func removeChild(parent Context, child canceler) {
	p, ok := parentCancelCtx(parent)
	if !ok {
		return
	}
	p.mu.Lock()
	if p.children != nil {
		delete(p.children, child)
	}
	p.mu.Unlock()
}


// WithDeadline returns a copy of the parent context with the deadline adjusted
// to be no later than d. If the parent's deadline is already earlier than d,
// WithDeadline(parent, d) is semantically equivalent to parent. The returned
// context's Done channel is closed when the deadline expires, when the returned
// cancel function is called, or when the parent context's Done channel is
// closed, whichever happens first.
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete.
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
	if cur, ok := parent.Deadline(); ok && cur.Before(d) {
		// The current deadline is already sooner than the new one.
        // 如果父ctx 早于当前时间,则使用父ctx 返回 ??
		return WithCancel(parent)
	}
	c := &timerCtx{
		cancelCtx: newCancelCtx(parent),
		deadline:  d,
	}
	propagateCancel(parent, c)
	dur := time.Until(d)
	if dur <= 0 {
		c.cancel(true, DeadlineExceeded) // deadline has already passed
		return c, func() { c.cancel(false, Canceled) }
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.err == nil {
        // 如果没有主动cancel ,则启动计时器,到期后,取消ctx
		c.timer = time.AfterFunc(dur, func() {
			c.cancel(true, DeadlineExceeded)
		})
	}
    
	return c, func() { c.cancel(true, Canceled) }
}


// time ctx
type timerCtx struct {
	cancelCtx
	timer *time.Timer // Under cancelCtx.mu.

	deadline time.Time
}

func (c *timerCtx) Deadline() (deadline time.Time, ok bool) {
	return c.deadline, true
}

func (c *timerCtx) String() string {
	return contextName(c.cancelCtx.Context) + ".WithDeadline(" +
		c.deadline.String() + " [" +
		time.Until(c.deadline).String() + "])"
}

func (c *timerCtx) cancel(removeFromParent bool, err error) {
	c.cancelCtx.cancel(false, err)
	if removeFromParent {
		// Remove this timerCtx from its parent cancelCtx's children.
		removeChild(c.cancelCtx.Context, c)
	}
	c.mu.Lock()
	if c.timer != nil {
		c.timer.Stop()
		c.timer = nil
	}
	c.mu.Unlock()
}

同步原语与锁

Go 语言中常见的同步原语 sync.Mutex、sync.RWMutex、sync.WaitGroup、sync.Once 和 sync.Cond 以及扩展原语 errgroup.Group、semaphore.Weighted 和 singleflight.Group 的实现原理,同时也会涉及互斥锁、信号量等并发编程中的常见概念。

  • mutex 锁:

参考链接:

  • 并发 https://blog.csdn.net/u013029603/article/details/81232395
  • grpc context 超时控制 https://blog.csdn.net/u013029603/article/details/81232395
WRITTEN BY:    陈贞

个人博客