hystrix-go 源码分析

EmersonYvette 发布于27天前
0 条问题

阅读源码的过程,就像是在像武侠小说里阅读武功秘籍一样,分析高手的一招一式,提炼出精髓,来增强自己的内力。

之前的帖子说了一下 微服务的雪崩效应和常见的解决方案 ,太水,没有上代码怎么叫解决方案。 github 上有很多开源的库来解决 雪崩问题 ,比较出名的是 Netflix 的开源库 hystrix 。集 流量控制 、 熔断 、 容错 等于一身的 java 语言的库。今天分析的源码库是 hystrix-go ,他是 hystrix 的的 go 语言版,应该是说简化版本,用很少的代码量实现了主要功能。很推荐朋友们有时间读一读。

使用简单

hystrix 的使用是非常简单的,同步执行,直接调用 Do 方法。

err := hystrix.Do("my_command", func() error {
   // talk to other services
   return nil
}, func(err error) error {
   // do this when services are down
   return nil
})

异步执行 Go 方法,内部实现是启动了一个 gorouting ,如果想得到自定义方法的数据,需要你传 channel 来处理数据,或者输出。返回的 error 也是一个 channel

output := make(chan bool, 1)
errors := hystrix.Go("my_command", func() error {
    // talk to other services
    output <- true
    return nil
}, nil)

select {
case out := <-output:
    // success
case err := <-errors:
    // failure

大概的执行流程图

hystrix-go 源码分析

其实方法 Do 和 Go 方法内部都是调用了 hystrix.GoC 方法,只是 Do 方法处理了异步的过程

func DoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) error {
    done := make(chan struct{}, 1)
    r := func(ctx context.Context) error {
        err := run(ctx)
        if err != nil {
            return err
        }
        done <- struct{}{}
        return nil
    }
    f := func(ctx context.Context, e error) error {
        err := fallback(ctx, e)
        if err != nil {
            return err
        }
        done <- struct{}{}
        return nil
    }
    var errChan chan error
    if fallback == nil {
        errChan = GoC(ctx, name, r, nil)
    } else {
        errChan = GoC(ctx, name, r, f)
    }

    select {
    case <-done:
        return nil
    case err := <-errChan:
        return err
    }
}

自定义Command配置

在调用 Do Go 等方法之前我们可以先自定义一些配置

hystrix.ConfigureCommand("mycommand", hystrix.CommandConfig{
        Timeout:                int(time.Second * 3),
        MaxConcurrentRequests:  100,
        SleepWindow:            int(time.Second * 5),
        RequestVolumeThreshold: 30,
        ErrorPercentThreshold: 50,
    })

    err := hystrix.DoC(context.Background(), "mycommand", func(ctx context.Context) error {
        // ...
        return nil
    }, func(i context.Context, e error) error {
        // ...
        return e
    })

我大要说了一下 CommandConfig 第个字段的意义:

  • Timeout: 执行command的超时时间。 默认时间是1000毫秒
  • MaxConcurrentRequests:command的最大并发量 默认值是10
  • SleepWindow:当熔断器被打开后,SleepWindow的时间就是控制过多久后去尝试服务是否可用了。 默认值是5000毫秒
  • RequestVolumeThreshold: 一个统计窗口10秒内请求数量。达到这个请求数量后才去判断是否要开启熔断。 默认值是20
  • ErrorPercentThreshold:错误百分比,请求数量大于等于 RequestVolumeThreshold 并且错误率到达这个百分比后就会启动 熔断 默认值是50

当然如果不配置他们,会使用 默认值

讲完了怎么用,接下来就是分析源码了。我是从下层到上层的顺序分析代码和执行流程

统计控制器

每一个Command都会有一个默认统计控制器,当然也可以添加多个自定义的控制器。

默认的统计控制器 DefaultMetricCollector 保存着 熔断器 的所有状态, 调用次数 , 失败次数 , 被拒绝次数 等等

type DefaultMetricCollector struct {
    mutex *sync.RWMutex

    numRequests *rolling.Number
    errors      *rolling.Number

    successes               *rolling.Number
    failures                *rolling.Number
    rejects                 *rolling.Number
    shortCircuits           *rolling.Number
    timeouts                *rolling.Number
    contextCanceled         *rolling.Number
    contextDeadlineExceeded *rolling.Number

    fallbackSuccesses *rolling.Number
    fallbackFailures  *rolling.Number
    totalDuration     *rolling.Timing
    runDuration       *rolling.Timing
}

最主要的还是要看一下 rolling.Number , rolling.Number 才是状态最终保存的地方

Number 保存了10秒内的 Buckets 数据信息,每一个 Bucket 的统计时长为1秒

hystrix-go 源码分析

type Number struct {
    Buckets map[int64]*numberBucket
    Mutex   *sync.RWMutex
}

type numberBucket struct {
    Value float64
}

字典字段 Buckets map[int64]*numberBucket 中的 Key 保存的是当前时间

可能你会好奇 Number 是如何保证只保存10秒内的数据的。每一次对 熔断器 的状态进行修改时, Number 都要先得到当前的时间(秒级)的 Bucket 不存在则创建。

func (r *Number) getCurrentBucket() *numberBucket {
    now := time.Now().Unix()
    var bucket *numberBucket
    var ok bool

    if bucket, ok = r.Buckets[now]; !ok {
        bucket = &numberBucket{}
        r.Buckets[now] = bucket
    }

    return bucket
}

修改完后去掉10秒外的数据

func (r *Number) removeOldBuckets() {
    now := time.Now().Unix() - 10

    for timestamp := range r.Buckets {
        // TODO: configurable rolling window
        if timestamp <= now {
            delete(r.Buckets, timestamp)
        }
    }
}

比如 Increment 方法,先得到 Bucket 再删除旧的数据

func (r *Number) Increment(i float64) {
    if i == 0 {
        return
    }

    r.Mutex.Lock()
    defer r.Mutex.Unlock()

    b := r.getCurrentBucket()
    b.Value += i
    r.removeOldBuckets()
}

统计控制器是最基层和最重要的一个实现,上层所有的执行判断都是基于他的数据进行逻辑处理的

上报执行状态信息

断路器-->执行-->上报执行状态信息-->保存到相应的Buckets

hystrix-go 源码分析

每一次断路器逻辑的执行都会上报执行过程中的状态,

// ReportEvent records command metrics for tracking recent error rates and exposing data to the dashboard.
func (circuit *CircuitBreaker) ReportEvent(eventTypes []string, start time.Time, runDuration time.Duration) error {
    // ...
    circuit.mutex.RLock()
    o := circuit.open
    circuit.mutex.RUnlock()
    if eventTypes[0] == "success" && o {
        circuit.setClose()
    }
    var concurrencyInUse float64
    if circuit.executorPool.Max > 0 {
        concurrencyInUse = float64(circuit.executorPool.ActiveCount()) / float64(circuit.executorPool.Max)
    }
    select {
    case circuit.metrics.Updates <- &commandExecution{
        Types:            eventTypes,
        Start:            start,
        RunDuration:      runDuration,
        ConcurrencyInUse: concurrencyInUse,
    }:
    default:
        return CircuitError{Message: fmt.Sprintf("metrics channel (%v) is at capacity", circuit.Name)}
    }

    return nil
}

circuit.metrics.Updates 这个信道就是处理上报信息的,上报执行状态自信的结构是 metricExchange ,结构体很简单只有4个字段。要的就是

  • channel 字段 Updates 他是一个有 buffer 的 channel 默认的数量是 2000 个,所有的状态信息都在他里面
  • metricCollectors 字段,就是保存的具体的这个 command 执行过程中的各种信息
type metricExchange struct {
    Name    string
    Updates chan *commandExecution
    Mutex   *sync.RWMutex

    metricCollectors []metricCollector.MetricCollector
}

type commandExecution struct {
    Types            []string      `json:"types"`
    Start            time.Time     `json:"start_time"`
    RunDuration      time.Duration `json:"run_duration"`
    ConcurrencyInUse float64       `json:"concurrency_inuse"`
}

func newMetricExchange(name string) *metricExchange {
    m := &metricExchange{}
    m.Name = name

    m.Updates = make(chan *commandExecution, 2000)
    m.Mutex = &sync.RWMutex{}
    m.metricCollectors = metricCollector.Registry.InitializeMetricCollectors(name)
    m.Reset()

    go m.Monitor()

    return m
}

在执行 newMetricExchange 的时候会启动一个协程 go m.Monitor() 去监控 Updates 的数据,然后上报给 metricCollectors 保存执行的信息数据比如前面提到的 调用次数 , 失败次数 , 被拒绝次数 等等

func (m *metricExchange) Monitor() {
    for update := range m.Updates {
        // we only grab a read lock to make sure Reset() isn't changing the numbers.
        m.Mutex.RLock()

        totalDuration := time.Since(update.Start)
        wg := &sync.WaitGroup{}
        for _, collector := range m.metricCollectors {
            wg.Add(1)
            go m.IncrementMetrics(wg, collector, update, totalDuration)
        }
        wg.Wait()

        m.Mutex.RUnlock()
    }
}

更新调用的是 go m.IncrementMetrics(wg, collector, update, totalDuration) ,里面判断了他的状态

func (m *metricExchange) IncrementMetrics(wg *sync.WaitGroup, collector metricCollector.MetricCollector, update *commandExecution, totalDuration time.Duration) {
    // granular metrics
    r := metricCollector.MetricResult{
        Attempts:         1,
        TotalDuration:    totalDuration,
        RunDuration:      update.RunDuration,
        ConcurrencyInUse: update.ConcurrencyInUse,
    }
    switch update.Types[0] {
    case "success":
        r.Successes = 1
    case "failure":
        r.Failures = 1
        r.Errors = 1
    case "rejected":
        r.Rejects = 1
        r.Errors = 1
    // ...
    }
    // ...
    collector.Update(r)
    wg.Done()
}

流量控制

hystrix-go 对流量控制的代码是很简单的。用了一个简单的令牌算法,能得到令牌的就可以执行后继的工作,执行完后要返还令牌。得不到令牌就拒绝,拒绝后调用用户设置的 callback 方法,如果没有设置就不执行。

结构体 executorPool 就是 hystrix-go 流量控制 的具体实现。字段 Max 就是每秒最大的并发值。

type executorPool struct {
    Name    string
    Metrics *poolMetrics
    Max     int
    Tickets chan *struct{}
}

在创建 executorPool 的时候,会根据 Max 值来创建 令牌 。Max值如果没有设置会使用默认值 10

func newExecutorPool(name string) *executorPool {
    p := &executorPool{}
    p.Name = name
    p.Metrics = newPoolMetrics(name)
    p.Max = getSettings(name).MaxConcurrentRequests

    p.Tickets = make(chan *struct{}, p.Max)
    for i := 0; i < p.Max; i++ {
        p.Tickets <- &struct{}{}
    }

    return p
}

流量控制上报状态

注意一下字段 Metrics 他用于统计执行数量,比如: 执行的总数量 , 最大的并发数 具体的代码就不贴上来了。这个数量也可以显露出,供可视化程序直观的表现出来。

令牌使用完后是需要返还的,返回的时候才会做上面所说的统计工作。

func (p *executorPool) Return(ticket *struct{}) {
    if ticket == nil {
        return
    }

    p.Metrics.Updates <- poolMetricsUpdate{
        activeCount: p.ActiveCount(),
    }
    p.Tickets <- ticket
}

func (p *executorPool) ActiveCount() int {
    return p.Max - len(p.Tickets)
}

一次Command的执行的流程

上面把 统计控制器 、 流量控制 、 上报执行状态 讲完了,主要的实现也就讲的差不多了。最后就是串一次command的执行都经历了啥:

err := hystrix.Do("my_command", func() error {
    // talk to other services
    return nil
}, func(err error) error {
    // do this when services are down
    return nil
})

hystrix 在执行一次command的前面也有提到过会调用 GoC 方法,下面我把代码贴出来来, 篇幅问题去掉了一些代码 ,主要逻辑都在。就是在 判断断路器是否已打开 , 得到Ticket 得不到就限流, 执行我们自己的的方法 , 判断context是否Done或者执行是否超时
当然,每次执行结果都要 上报执行状态 ,最后要 返还Ticket

func GoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) chan error {
    cmd := &command{
        run:      run,
        fallback: fallback,
        start:    time.Now(),
        errChan:  make(chan error, 1),
        finished: make(chan bool, 1),
    }
    //得到断路器,不存在则创建
    circuit, _, err := GetCircuit(name)
    if err != nil {
        cmd.errChan <- err
        return cmd.errChan
    }
    //...
    // 返还ticket
    returnTicket := func() {
        // ...
        cmd.circuit.executorPool.Return(cmd.ticket)
    }
    // 上报执行状态
    reportAllEvent := func() {
        err := cmd.circuit.ReportEvent(cmd.events, cmd.start, cmd.runDuration)
        // ...
    }
    go func() {
        defer func() { cmd.finished <- true }()
        // 查看断路器是否已打开
        if !cmd.circuit.AllowRequest() {
            // ...
            returnOnce.Do(func() {
                returnTicket()
                cmd.errorWithFallback(ctx, ErrCircuitOpen)
                reportAllEvent()
            })
            return
        }
        // ...
        // 获取ticket 如果得不到就限流
        select {
        case cmd.ticket = <-circuit.executorPool.Tickets:
            ticketChecked = true
            ticketCond.Signal()
            cmd.Unlock()
        default:
            // ...
            returnOnce.Do(func() {
                returnTicket()
                cmd.errorWithFallback(ctx, ErrMaxConcurrency)
                reportAllEvent()
            })
            return
        }
        // 执行我们自已的方法,并上报执行信息
        returnOnce.Do(func() {
            defer reportAllEvent()
            cmd.runDuration = time.Since(runStart)
            returnTicket()
            if runErr != nil {
                cmd.errorWithFallback(ctx, runErr)
                return
            }
            cmd.reportEvent("success")
        })
    }()
    // 等待context是否被结束,或执行者超时,并上报
    go func() {
        timer := time.NewTimer(getSettings(name).Timeout)
        defer timer.Stop()

        select {
        case <-cmd.finished:
            // returnOnce has been executed in another goroutine
        case <-ctx.Done():
            // ...
            return
        case <-timer.C:
            // ...
        }
    }()

    return cmd.errChan
}

dashboard 可视化hystrix的上报信息

代码中 StreamHandler 就是把所有 断路器 的状态以流的方式不断的推送到 dashboard . 这部分代码我就不用说了,很简单。

需要在你的服务端加3行代码,启动我们的流服务

hystrixStreamHandler := hystrix.NewStreamHandler()
    hystrixStreamHandler.Start()
    go http.ListenAndServe(net.JoinHostPort("", "81"), hystrixStreamHandler)

dashboard 我使用的是 docker 版。

docker run -d -p 8888:9002 --name hystrix-dashboard mlabouardy/hystrix-dashboard:latest

hystrix-go 源码分析

在下面输入你服务的地址,我是

http://192.168.1.67:81/hystrix.stream

hystrix-go 源码分析

如果是集群可以使用 Turbine 进行监控,有时间大家自己来看吧

hystrix-go 源码分析

查看原文: hystrix-go 源码分析

  • beautifulbutterfly
  • browntiger
  • crazypanda
  • purplegoose
  • whitetiger
  • heavyfish
  • TommyFrederic
需要 登录 后回复方可回复, 如果你还没有账号你可以 注册 一个帐号。