开篇
这周在看内部一个熔断限流包时,发现它是基于一个开源项目 hystrix-go
实现了,因此有了这篇文章。
Hystrix
是由 Netflex
开发的一款开源组件,提供了基础的熔断功能。 Hystrix
将降级的策略封装在 Command
中,提供了 run
和 fallback
两个方法,前者表示正常的逻辑,比如微服务之间的调用……,如果发生了故障,再执行 fallback
方法返回结果,我们可以把它理解成保底操作。如果正常逻辑在短时间内频繁发生故障,那么可能会触发短路,也就是之后的请求不再执行 run
,而是直接执行 fallback
。更多关于 Hystrix
的信息可以查看 https://github.com/Netflix/Hystrix
,而hystrix-go
则是用 go
实现的 hystrix
版,更确切的说,是简化版。只是上一次更新还是 2018年 的一次 pr
,也就毕业了?
为什么需要这些工具?
比如一个微服务化的产品线上,每一个服务都专注于自己的业务,并对外提供相应的服务接口,或者依赖于外部服务的某个逻辑接口,就像下面这样。
虽然导致雪崩的发生不仅仅这一种,但是我们需要采取一定的措施,来保证不让这个噩梦发生。而 hystrix-go
就很好的提供了 熔断和降级的措施。它的主要思想在于,设置一些阀值,比如最大并发数(当并发数大于设置的并发数,拦截),错误率百分比(请求数量大于等于设置 的阀值,并且错误率达到设置的百分比时,触发熔断)以及熔断尝试恢复时间等 。
hystrix-go
的使用非常简单,你可以调用它的 Go
或者 Do
方法,只是 Go
方法是异步的方式。而 Do
方法是同步方式。我们从一个简单的例子开启。
_ = hystrix.Do("wuqq", func() error { // talk to other services _, err := http.Get("https://www.baidu.com/") if err != nil { fmt.Println("get error:%v",err) return err } return nil }, func(err error) error { fmt.Printf("handle error:%v\n", err) return nil })
Do
函数需要三个参数,第一个参数 commmand
名称,你可以把每个名称当成一个独立当服务,第二个参数是处理正常的逻辑,比如 http
调用服务,返回参数是 err
。如果处理|调用失败,那么就执行第三个参数逻辑, 我们称为保底操作。由于服务错误率过高导致熔断器开启,那么之后的请求也直接回调此函数。
既然熔断器是按照配置的规则而进行是否开启的操作,那么我们当然可以设置我们想要的值。
hystrix.ConfigureCommand("wuqq", hystrix.CommandConfig{ Timeout: int(3 * time.Second), MaxConcurrentRequests: 10, SleepWindow: 5000, RequestVolumeThreshold: 10, ErrorPercentThreshold: 30, }) _ = hystrix.Do("wuqq", func() error { // talk to other services _, err := http.Get("https://www.baidu.com/") if err != nil { fmt.Println("get error:%v",err) return err } return nil }, func(err error) error { fmt.Printf("handle error:%v\n", err) return nil })
稍微解释一下上面配置的值含义:
command
的超时时间。command
的最大并发量 。SleepWindow
的时间就是控制过多久后去尝试服务是否可用了。RequestVolumeThreshold
并且错误率到达这个百分比后就会启动熔断
当然你不设置的话,那么自动走的默认值。
我们再来看一个简单的例子:
package mainimport ( "fmt" "github.com/afex/hystrix-go/hystrix" "net/http" "time")type Handle struct{}func (h *Handle) ServeHTTP(r http.ResponseWriter, request *http.Request) { h.Common(r, request)}func (h *Handle) Common(r http.ResponseWriter, request *http.Request) { hystrix.ConfigureCommand("mycommand", hystrix.CommandConfig{ Timeout: int(3 * time.Second), MaxConcurrentRequests: 10, SleepWindow: 5000, RequestVolumeThreshold: 20, ErrorPercentThreshold: 30, }) msg := "success" _ = hystrix.Do("mycommand", func() error { _, err := http.Get("https://www.baidu.com") if err != nil { fmt.Printf("请求失败:%v", err) return err } return nil }, func(err error) error { fmt.Printf("handle error:%v\n", err) msg = "error" return nil }) r.Write([]byte(msg))}func main() { http.ListenAndServe(":8090", &Handle{})}
我们开启了一个 http
服务,监听端口号 8090
,所有请求的处理逻辑都在 Common
方法中,在这个方法中,我们主要是发起一次 http
请求,请求成功响应success
,如果失败,响应失败原因。
我们再写另一个简单程序,并发 11
次的请求 8090
端口。
package mainimport ( "fmt" "io/ioutil" "net/http" "sync" "time")var client *http.Clientfunc init() { tr := &http.Transport{ MaxIdleConns: 100, IdleConnTimeout: 1 * time.Second, } client = &http.Client{Transport: tr}}type info struct { Data interface{} `json:"data"`}func main() { var wg sync.WaitGroup for i := 0; i < 11; i++ { wg.Add(1) go func(int2 int) { defer wg.Done() req, err := http.NewRequest("GET", "http://localhost:8090", nil) if err != nil { fmt.Printf("初始化http客户端处错误:%v", err) return } resp, err := client.Do(req) if err != nil { fmt.Printf("初始化http客户端处错误:%v", err) return } defer resp.Body.Close() nByte, err := ioutil.ReadAll(resp.Body) if err != nil { fmt.Printf("读取http数据失败:%v", err) return } fmt.Printf("接收到到值:%v\n", string(nByte)) }(i) } wg.Wait() fmt.Printf("请求完毕\n")}
由于我们配置 MaxConcurrentRequests
为10,那么意味着还有个 g 请求会失败:
具体逻辑在判断当前请求是否可以调用依赖
if !cmd.circuit.AllowRequest() { ...... return }
func (circuit *CircuitBreaker) AllowRequest() bool { return !circuit.IsOpen() || circuit.allowSingleTest()}func (circuit *CircuitBreaker) allowSingleTest() bool { circuit.mutex.RLock() defer circuit.mutex.RUnlock() now := time.Now().UnixNano() openedOrLastTestedTime := atomic.LoadInt64(&circuit.openedOrLastTestedTime) if circuit.open && now > openedOrLastTestedTime+getSettings(circuit.Name).SleepWindow.Nanoseconds() { / swapped := atomic.CompareAndSwapInt64(&circuit.openedOrLastTestedTime, openedOrLastTestedTime, now) //这一句才是关键 if swapped { log.Printf("hystrix-go: allowing single test to possibly close circuit %v", circuit.Name) } return swapped } return false}
这段代码首先判断了熔断器是否开启,并且当前时间大于 上一次开启熔断器的时间+ SleepWindow
的时间,如果条件都符合的话,更新此熔断器最新的 openedOrLastTestedTime
,是通过 CompareAndSwapInt64
原子操作完成的,意外着必然只会有一个成功。
此时熔断器还是半开的状态,接着如果能拿到令牌,执行run
函数(也就是Do传入的第二个简单封装后的函数),发起 http
请求,如果成功,上报成功状态,关闭熔断器。如果失败,那么熔断器依旧开启。
以上就是大体的流程讲解,下一篇文章将解读核心源码以及进一步当思考。
更多相关技术文章,请访问go语言教程栏目!
以上就是详解 hystrix-go 使用与原理的详细内容,更多请关注其它相关文章!