本文介绍 golang.org/x/sync/singleflight
包的使用和原理。
建议结合源码 阅读本文
缓存击穿 在做高并发的服务时,不可避免的会遇到缓冲击穿的问题。缓冲击穿一般是说,当高并发流量缓存过期的情况下,出现大量请求从数据库读取相同数据的情况。这种情况下数据库的压力将瞬间增大。为了避免这种情况,一般有几种解决方案: 1. 缓存永不过期,缓存做主动更新。 2. 在使用缓存时,先检查缓存的过期时间,如果将要过期时,将过期时间延长到指定时间(避免其他服务也主动更新),再主动做缓存更新(更新后,设置新的超时时间)。 3. 加互斥锁,在db查询结束后,统一返回数据。(本文主要使用介绍用singleflight 来实现该方法) 这种方法的弊端是,只是单进程限制同时只能有一个请求。
如何使用 singleflight 解决缓存击穿 在singleflight 包中,提供了一个同时只运行一次方法(fn
)的接口。这个接口和我们需要解决的缓存击穿问题异曲同工,下面简单介绍包中的几个方法:
1 2 3 4 5 6 7 8 9 10 11 12 type Result struct { Val interface {} Err error Shared bool } func (g *Group) Do (key string , fn func () (interface {}, error) ) (v interface {}, err error, shared bool ) {}func (g *Group) DoChan (key string , fn func () (interface {}, error) ) <-chan Result {}func (g *Group) Forget (key string ) {}
一个简单的例子 包中提供了同步访问和异步访问两种调用。我们需要用一个简单的例子来做说明:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 func main () { var count int32 g := &singleflight.Group{} res := []<-chan singleflight.Result{} for i := 0 ; i < 10 ; i++ { key := "hello" res = append (res, g.DoChan("getdata" , func () (interface {}, error) { iter := atomic.AddInt32(&count, 1 ) time.Sleep(time.Duration(time.Microsecond)) return key + strconv.Itoa(int (iter)), nil })) } for i := 0 ; i < 10 ; i++ { dat := <-res[i] fmt.Println(dat.Val) } }
上面例子中,我们 mock 了一个db读取的匿名方法,数据查询使用了1ms 的时间。返回值为 key + count 的值。 如果不用singleflight,我们取到的值,一定是key + 1…10,但是使用了之后,取到的结果都是 hello1
在查询db时。仅查询了一次,将相同的查询归并为一条。结果可以证明singleflight 符合我们的预期,确实可以防止缓存击穿问题的发生。
singleflight 的实现 通过对singleflight
包的使用,推测signleflight
的实现只需要在执行fn
时,判断当前是否有正在进行的fn
,如果存在则等待查询结果;如果没有,则记录并执行fn
。抽象的来说,就是希望同一个 key
指定的fn
,同时仅执行一次,减少fn
的调用次数 。
纸上得来终觉浅,绝知此事要躬行。下面看看这个包是怎么实现的:
错误类型的定义 首先是错误类型的定义, 除了正常的调用失败,一个方法的调用还可能包括 panic
错误 和 runtime.Goexit
调用,为了标记此类错误,因此定义了如下错误类型。
1 2 3 4 5 var errGoexit = errors.New("runtime.Goexit was called" )type panicError struct { value interface {} stack []byte }
执行中程序的调用 对于每一次执行fn,会构造一个call 结构体,用于将结果返回给等待的协程。doCall 则为 fn 的调用执行方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 type call struct { wg sync.WaitGroup val interface {} err error forgotten bool dups int chans []chan <- Result } func (g *Group) doCall (c *call, key string , fn func () (interface {}, error) ) { normalReturn := false recovered := false defer func () { if !normalReturn && !recovered { c.err = errGoexit } c.wg.Done() g.mu.Lock() defer g.mu.Unlock() if !c.forgotten { delete (g.m, key) } if e, ok := c.err.(*panicError); ok { if len (c.chans) > 0 { go panic (e) select {} } else { panic (e) } } else if c.err == errGoexit { } else { for _, ch := range c.chans { ch <- Result{c.val, c.err, c.dups > 0 } } } }() func () { defer func () { if !normalReturn { if r := recover (); r != nil { c.err = newPanicError(r) } } }() c.val, c.err = fn() normalReturn = true }() if !normalReturn { recovered = true } }
接口的实现 同步方式的调用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 func (g *Group) Do (key string , fn func () (interface {}, error) ) (v interface {}, err error, shared bool ) { g.mu.Lock() if g.m == nil { g.m = make (map [string ]*call) } if c, ok := g.m[key]; ok { c.dups++ g.mu.Unlock() c.wg.Wait() if e, ok := c.err.(*panicError); ok { panic (e) } else if c.err == errGoexit { runtime.Goexit() } return c.val, c.err, true } c := new (call) c.wg.Add(1 ) g.m[key] = c g.mu.Unlock() g.doCall(c, key, fn) return c.val, c.err, c.dups > 0 }
异步方式的调用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 func (g *Group) DoChan (key string , fn func () (interface {}, error) ) <-chan Result { ch := make (chan Result, 1 ) g.mu.Lock() if g.m == nil { g.m = make (map [string ]*call) } if c, ok := g.m[key]; ok { c.dups++ c.chans = append (c.chans, ch) g.mu.Unlock() return ch } c := &call{chans: []chan <- Result{ch}} c.wg.Add(1 ) g.m[key] = c g.mu.Unlock() go g.doCall(c, key, fn) return ch }
设置下次调用不适用正在执行的结果
1 2 3 4 5 6 7 8 func (g *Group) Forget (key string ) { g.mu.Lock() if c, ok := g.m[key]; ok { c.forgotten = true } delete (g.m, key) g.mu.Unlock() }
总结 从上述代码可以看出,做一个防止穿透的小功能,简单而不简约。需要考量的地方还是挺多(如何截获panic,如何判断goexit 等)。如下是内容总结:
runtime.Goexit
的特性, 以及如何捕获。(https://golang.org/cl/134395 )
goexit 用于退出某个协程,但是之前注册的defer 方法仍然将会被执行。
返回的error,不仅可能是正常逻辑错误,或者goexit 错误, 还有可能直接panic。
DoChan 调用方式,如果fn出现panic,该panic将无法被捕获,程序将退出。(Do 方式可以捕获panic), 而 Do 调用则可以捕获。例子如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 go func () { defer wg.Done() key := "hello" defer func () { if r := recover (); r != nil { fmt.Println("[[[" , r, "]]]" ) } }() _, _, _ = g.Do("getdata" , func () (interface {}, error) { iter := atomic.AddInt32(&count, 1 ) time.Sleep(time.Duration(time.Microsecond)) panic ("panic" ) return key + strconv.Itoa(int (iter)), nil }) }()