一般电商应用的订单队列架构思想

S3Ekin 发布于3月前
0 条问题

作者:林冠宏 / 指尖下的幽灵

博客: http://www.cnblogs.com/linguanh/

GitHub : https://github.com/af913337456/

目录

  • 前序
  • 一般的订单流程
  • 思考瓶颈点
  • 订单队列
    • 第一种订单队列
    • 第二种订单队列
    • 总结
  • 实现队列的选择
  • 解答
  • 实现队列的选择
  • 第二种队列的 Go 版本例子代码

前序

目前的开发工作主要是将 传统电商应用 和 区块链技术 相结合,区块链平台依然是 以太坊 ,此外地,这几天由我编写,经清华大学出版社出版的书籍,历经八月,终于出版上架了,名称是: 《区块链以太坊DApp开发实战》 ,现已可以网购。

本文所要分享的思路就是电商应用中常用的 订单队列 。

一般的订单流程

电商应用中,简单直观的用户从下单到付款,最终完成整个流程的步骤可以用下图表示:

一般电商应用的订单队列架构思想

其中, 订单信息持久化 ,就是存储数据到数据库中。而最终客户端完成支付后的 更新订单状态 的操作是由第三方支付平台进行回调设置好的回调链接 NotifyUrl ,来进行的。

补全订单状态的更新流程,如下图表示:

一般电商应用的订单队列架构思想

思考瓶颈点

服务端的直接 瓶颈点 ,首先要考虑 TPS 。去除细分点,我们主要看 订单信息持久化 瓶颈点。

在高并发业务场景中,例如 秒杀 、 优惠价抢购 等。短时间内的下单请求数会很多,如果 订单信息持久化 部分,不做优化,而是直接对数据库层进行 频繁的 读写操作,数据库会承受不了,容易成为第一个垮掉的服务,比如下图的所示的常规写单流程:

一般电商应用的订单队列架构思想

可以看到, 每 持久化一个订单信息,一般要经历网络连接操作(链接数据库),以及多个 I/O 操作。

得益于 连接池 技术,我们可以在链接数据库的时候,不用每次都重新发起一次完整的HTTP请求,而可以直接从池中获取已打开了的连接句柄,而直接使用,这点和线程池的原理差不多。

此外,我们还可以在上面的流程中加入更多的优化,例如对于一些需要读取的信息,可以事先存置到内存缓存层,并加于更新维护,这样在使用的时候,可以快速读取。

即使我们都具备了上述的一些优化手段,但是对于 写操作 的 I/O 阻塞耗时,在 高并发请求 的时候,依然容易导致数据库承受不住,容易出现 链接多开异常 , 操作超时 等问题。

在该层进行优化的操作,除了上面谈到的之外,还有下面一些手段:

  • 数据库集群,采用读写分离,减少写时压力
  • 分库,不同业务的表放到不同的数据库,会引入分布式事务问题
  • 采用队列模型削峰

每种方式有各自的特点,因为本文谈的是 订单队列 的架构思想,所以下面我们来看下如何在订单系统中引入订单队列。

订单队列

网上有不少文章谈到订单队列的做法,大部分都漏了说明请求与响应的一致性问题。

第一种订单队列 流程图:

一般电商应用的订单队列架构思想

上图是大多文章提到的队列模型,有两个没有解析的问题:

notifyUrl

首先,要肯定的是,上面的订单流程图是没有问题的。它有下面的优缺点,所提到的两个问题也是有解决方案的。

优点:

搭配中间件

缺点:

  • 多订单入队时,② 步骤的处理速度跟不上。从而导致第二点问题。
  • 实现较复杂

上面谈及的问题点,我后面都会给出解决方案。下面我们来看下另外一种订单队列流程图。

第二种订单队列 流程图:

一般电商应用的订单队列架构思想

第二种订单队列的设计模型,注意它的 同步等待 持久化处理的结果,解决了持久化与响应的一致性问题,但是有个严重的耗时等待问题,它的优缺点如下:

优点:

  1. 持久化与响应的强一致性。
  2. 持久化处理,采用排队的先来先处理,不会像上面谈到的高并发请求一起冲击数据库层面的情况。
  3. 实现简单

缺点:

  1. 多订单入队时,持久化单元处理速度跟不上,造成客户端同步等待响应。

这类订单队列,我下面会放出 Golang 实现的版本代码。

总结

对比上面两种常见的订单模型,如果从 用户体验的角度 去优先考虑,第一种不需要用户等待 持久化处理 结果的是明显优于第二种的。如果技术团队完善,且技术过硬,也应该考虑第一种的实现方式。

如果仅仅想要达到 宁愿用户等待到超时 也不愿意存储层服务被冲垮,那么有限考虑第二种。

实现队列的选择

在这里,我们进一步细分一下,实现队列模块的功能有哪些选择。

相信很多后端开发经验比较老道的同志已经想到了,使用现有的中间件,比如知名的 Redis 、 RocketMQ ,以及 Kafka 等,它们都是一种选择。

此外地,我们还可以直接编写代码,在当前的服务系统中实现一个消息队列来达到目的,下面我用图来分类下队列类型。

一般电商应用的订单队列架构思想

不同的队列实现方式,能直接导致不同的功能,也有不同的优缺点:

一级缓存优点:

  1. 一级缓存,最快。无需链接,直接从内存层获取;
  2. 如果不考虑持久化和集群,那么它实现简单。

一级缓存缺点:

  1. 如果考虑持久化和集群,那么它实现比较复杂。
  2. 不考虑持久化情况下,如果服务器断电或其它原因导致服务中断,那么排队中的订单信息将丢失

中间件的优点:

增量

中间件的缺点:

  1. 分布式部署时,需要建立链接通讯,导致读写操作需要走网络通讯。

解答

回到第一种订单模型中:

一般电商应用的订单队列架构思想

问题1:

如果订单存在第三方支付情况,① 和 ② 的一致性如何保证?

首先我们看下,不一致性的时候,会产生什么结果:

响应页面

上述的情况,明显地,只有 3 是需要恢复订单信息的,应对的方案有:

  • 当服务端支付回调接口被第三方支付平台访问时,无法找到对应的订单信息。那么先将这类支付了却没订单信息的数据存储起来先,比如存储到 表A 。同时启动一个 定时任务B 专门遍历表A,然后去订单列表寻找是否已经有了对应的订单信息,有则更新,没则继续,或跟随制定的检测策略走。
  • 当 ② 是由于服务端的 非崩溃性原因 而导致失败时:
    • 失败的时候同时将原始订单数据重新插入到 队列头部 ,等待下一次的重新持久化处理。
  • 当 ② 因服务端的 崩溃性 原因而导致失败时:
    • 定时任务B 在进行了多次检测无果后,那么根据第三方支付平台在回调时候传递过来的 订单附属信息 对订单进行恢复。
  • 整个过程订单恢复的过程,用户查看订单信息为空白。
  • 定时任务B 所在服务 最好 和回调链接 notifyUrl 所在的接口服务一致,这样能保证当 B 挂掉的时候,回调服务也跟随挂掉,然后第三方支付平台在调用回调失败的情况下,他们会有 重试逻辑 ,依赖这个,在回调服务重启时,可以完成订单信息恢复。

一般电商应用的订单队列架构思想

问题2:

如果订单存在第三方支付情况,① 完成了支付,且三方支付平台回调了 notifyUrl,而此时 ② 还在排队等待处理,这种情况又如何处理?

应对的方案参考 问题1 的 定时任务B 检测修改机制。

第二种队列的 Go 版本例子代码

定义一些常量

const (
    QueueOrderKey   = "order_queue"     
    QueueBufferSize = 1024              // 请求队列大小
    QueueHandleTime = time.Second * 7   // 单个 mission 超时时间
)

定义出入队接口,方便多种实现

// 定义出入队接口,方便多种实现
type IQueue interface {
    Push(key string,data []byte) error
    Pop(key string) ([]byte,error)
}

定义请求与响应实体

// 定义请求与响应实体
type QueueTimeoutResp struct {
    Timeout  bool  // 超时标志位
    Response chan interface{}
}
type QueueRequest struct {
    ReqId       string  `json:"req_id"`  // 单次请求 id
    Order       *model.OrderCombine `json:"order"` // 订单信息 bean
    AccessTime  int64   `json:"access_time"` // 请求时间
    ResponseChan *QueueTimeoutResp `json:"-"`
}

定义队列实体

// 定义队列实体
type Queue struct {
    mapLock sync.Mutex
    RequestChan  chan *QueueRequest // 缓存管道,装载请求
    RequestMap   map[string]*QueueTimeoutResp 
    Queue IQueue
}

实例化队列,接收接口参数

// 实例化队列,接收接口参数
func NewQueue(queue IQueue) *Queue {
    return &Queue{
        mapLock:     sync.Mutex{},
        RequestChan: make(chan *QueueRequest, QueueBufferSize),
        RequestMap:  make(map[string]*QueueTimeoutResp, QueueBufferSize),
        Queue:       queue,
    }
}

接收请求

// 接收请求
func (q *Queue) AcceptRequest(req *QueueRequest) interface{} {
    if req.ResponseChan == nil {
        req.ResponseChan = &QueueTimeoutResp{
            Timeout:  false,
            Response: make(chan interface{},1),
        }
    }
    userKey := key(req)  // 唯一 key 生成函数
    req.ReqId = userKey
    q.mapLock.Lock()
    q.RequestMap[userKey] = req.ResponseChan // 内存层存储对应的 req 的 resp 管道指针
    q.mapLock.Unlock()
    q.RequestChan <- req  // 接收请求
    log("userKey : ", userKey)
    ticker := time.NewTicker(QueueHandleTime) // 以超时时间 QueueHandleTime 启动一个定时器
    defer func() {
        ticker.Stop() // 释放定时器
        q.mapLock.Lock()
        delete(q.RequestMap,userKey)  // 当处理完一个 req,从 map 中移出
        q.mapLock.Unlock()
    }()

    select {
    case <-ticker.C:  // 超时
        req.ResponseChan.Timeout = true 
        Queue_TimeoutCounter++  // 辅助计数,int 类型
        log("timeout: ",userKey)
        return lghError.HandleTimeOut  // 返回超时错误的信息
    case result := <-req.ResponseChan.Response:  // req 被完整处理
        return result
    }
}

从请求管道中取出 req 放入到队列容器中,该函数在 gorutine 中运行

// 从请求管道中取出 req 放入到队列容器中,该函数在 gorutine 中运行
func (q *Queue) addToQueue() {
    for {
        req := <-q.RequestChan // 取出一个 req
        data, err := json.Marshal(req)
        if err != nil {
            log("redis queue parse req failed : ", err.Error())
            continue
        }
        if err = q.Queue.Push(QueueOrderKey, data);err != nil {  // push 入队,这里有时间消耗
            log("lpush req failed. Error : ", err.Error())
            continue
        }
        log("lpush req success. req time: ", req.AccessTime)
    }
}

取出 req 处理,该函数在 gorutine 中运行

// 取出 req 处理,该函数在 gorutine 中运行
func (q *Queue) readFromQueue() {
    for {
        data, err := q.Queue.Pop(QueueOrderKey) // pop 出队,这里也有时间消耗
        if err != nil {
            log("lpop failed. Error : ", err.Error())
            continue
        }
        if data == nil || len(data) == 0 {
            time.Sleep(time.Millisecond * 100) // 空数据的 req,停顿下再取
            continue
        }
        req := &QueueRequest{}
        if err = json.Unmarshal(data, req);err != nil {
            log("Lpop: json.Unmarshal failed. Error : ", err.Error())
            continue
        }
        userKey := req.ReqId
        q.mapLock.Lock()
        resultChan, ok := q.RequestMap[userKey] // 取出对应的 resp 管道指针
        q.mapLock.Unlock()
        if !ok {
            // 中间件重启时,比如 redis 重启而读取旧 key,会进入这里
            Queue_KeyNotFound ++ // 计数 int 类型
            log("key not found, rollback: ", userKey)
            continue
        }
        simulationTimeOutReq4(req) // 模拟出来任务的函数,入参为 req 
        if resultChan.Timeout {
            // 处理期间,已经超时,这里做可以拓展回滚操作
            Queue_MissionTimeout ++
            log("handle mission timeout: ", userKey)
            continue
        }
        log("request result send to chan succeee, userKey : ", userKey)
        ret := util.GetCommonSuccess(req.AccessTime)
        resultChan.Response <- &ret // 输入处理成功
    }
}

启动

func (q *Queue) Start()  {
    go q.addToQueue()
    go q.readFromQueue()
}

运行例子

func test(){
    ...
    runtime.GOMAXPROCS(4)
    redisQueue := NewQueue(NewFastCacheQueue())
    redisQueue.Start()
    reqNumber := testReqNumber
    wg := sync.WaitGroup{}
    wg.Add(reqNumber)
    for i :=0;i<reqNumber;i++ {
        go func(index int) {
            combine := model.OrderCombine{}
            ret := AcceptRequest(&QueueRequest{
                UserId:       int64(index),
                Order:        &combine,
                AccessTime:   time.Now().Unix(),
                ResponseChan: nil,
            })
            fmt.Println("ret: ------------- ",ret.String())
            wg.Done()
        }(i)
    }
    wg.Wait()
    time.Sleep(3*time.Second)
    fmt.Println("TimeoutCounter: ",Queue_TimeoutCounter,"KeyNotFound: ",Queue_KeyNotFound,"MissionTimeout: ",Queue_MissionTimeout)
}

最后上传一张书籍图片

一般电商应用的订单队列架构思想

查看原文: 一般电商应用的订单队列架构思想

  • tinytiger
  • redwolf
  • purplecat
  • beautifuldog
  • goldenswan
  • blackpeacock
  • purpleostrich
  • tinygoose
需要 登录 后回复方可回复, 如果你还没有账号你可以 注册 一个帐号。