我们通过一个系列文章跟大家详细展示一个 go-zero 微服务示例,整个系列分十篇文章,目录结构如下:
期望通过本系列带你在本机利用 Docker 环境利用 go-zero 快速开发一个商城系统,让你快速上手微服务。
完整示例代码:https://github.com/nivin-studio/go-zero-mall
首先,我们来看一下整体的服务拆分图:
DTM
介绍DTM 是一款 golang
开发的分布式事务管理器,解决了跨数据库、跨服务、跨语言栈更新数据的一致性问题。
绝大多数的订单系统的事务都会跨服务,因此都有更新数据一致性的需求,都可以通过 DTM 大幅简化架构,形成一个优雅的解决方案。
而且 DTM 已经深度合作,原生的支持go-zero中的分布式事务,下面就来详细的讲解如何用 DTM 来帮助我们的订单系统解决一致性问题
go-zero
使用 DTM
首先我们回顾下 第五章 订单服务 中 order rpc
服务中 Create
接口处理逻辑。方法里判断了用户和产品的合法性,以及产品库存是否充足,最后通过 OrderModel
创建了一个新的订单,以及调用 product rpc
服务 Update
的接口更新了产品的库存。
func (l *CreateLogic) Create(in *order.CreateRequest) (*order.CreateResponse, error) { // 查询用户是否存在 _, err := l.svcCtx.UserRpc.UserInfo(l.ctx, &user.UserInfoRequest{ Id: in.Uid, }) if err != nil { return nil, err } // 查询产品是否存在 productRes, err := l.svcCtx.ProductRpc.Detail(l.ctx, &product.DetailRequest{ Id: in.Pid, }) if err != nil { return nil, err } // 判断产品库存是否充足 if productRes.Stock <= 0 { return nil, status.Error(500, "产品库存不足") } newOrder := model.Order{ Uid: in.Uid, Pid: in.Pid, Amount: in.Amount, Status: 0, } res, err := l.svcCtx.OrderModel.Insert(&newOrder) if err != nil { return nil, status.Error(500, err.Error()) } newOrder.Id, err = res.LastInsertId() if err != nil { return nil, status.Error(500, err.Error()) } _, err = l.svcCtx.ProductRpc.Update(l.ctx, &product.UpdateRequest{ Id: productRes.Id, Name: productRes.Name, Desc: productRes.Desc, Stock: productRes.Stock - 1, Amount: productRes.Amount, Status: productRes.Status, }) if err != nil { return nil, err } return &order.CreateResponse{ Id: newOrder.Id, }, nil }
之前我们说过,这里处理逻辑存在数据一致性问题,有可能订单创建成功了,但是在更新产品库存的时候可能会发生失败,这时候就会存在订单创建成功,产品库存没有减少的情况。
因为这里的产品库存更新是跨服务操作的,也没有办法使用本地事务来处理,所以我们需要使用分布式事务来处理它。这里我们需要借助 DTM
的 SAGA
协议来实现订单创建和产品库存更新的跨服务分布式事务操作。
大家可以先移步到 DTM
的文档先了接下 SAGA事务模式。
DTM
服务配置参见 第一章 环境搭建,修改 dtm->config.yml
配置文件。我们只要修改 MicroService
中的 Target
,EndPoint
配置即可,将 dtm
注册到 etcd
中。
# ...... # 微服务 MicroService: Driver: 'dtm-driver-gozero' # 要处理注册/发现的驱动程序的名称 Target: 'etcd://etcd:2379/dtmservice' # 注册 dtm 服务的 etcd 地址 EndPoint: 'dtm:36790' # ......
dtm_barrier
数据表微服务是一个分布式系统,因此可能发生各种异常,例如网络抖动导致重复请求,这类的异常会让业务处理异常复杂。而 DTM
中,首创了 子事务屏障 技术,使用该技术,能够非常便捷的解决异常问题,极大的降低了分布式事务的使用门槛。
使用 DTM
提供的子事务屏障技术则需要在业务数据库中创建子事务屏障相关的表,建表语句如下:
create database if not exists dtm_barrier /*!40100 DEFAULT CHARACTER SET utf8mb4 */ ; drop table if exists dtm_barrier.barrier; create table if not exists dtm_barrier.barrier( id bigint(22) PRIMARY KEY AUTO_INCREMENT, trans_type varchar(45) default '', gid varchar(128) default '', branch_id varchar(128) default '', op varchar(45) default '', barrier_id varchar(45) default '', reason varchar(45) default '' comment 'the branch type who insert this record', create_time datetime DEFAULT now(), update_time datetime DEFAULT now(), key(create_time), key(update_time), UNIQUE key(gid, branch_id, op, barrier_id) );
注意:库名和表名请勿修改,如果您自定义了表名,请在使用前调用
dtmcli.SetBarrierTableName
。
OrderModel
和 ProductModel
在每一个子事务中,很多操作逻辑,需要使用到本地事务,所以我们添加一些 model
方法兼容 DTM
的子事务屏障
$ vim mall/service/order/model/ordermodel.go
package model ...... type ( OrderModel interface { TxInsert(tx *sql.Tx, data *Order) (sql.Result, error) TxUpdate(tx *sql.Tx, data *Order) error } ) ...... func (m *defaultOrderModel) TxInsert(tx *sql.Tx, data *Order) (sql.Result, error) { query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?)", m.table, orderRowsExpectAutoSet) ret, err := tx.Exec(query, data.Uid, data.Pid, data.Amount, data.Status) return ret, err } func (m *defaultOrderModel) TxUpdate(tx *sql.Tx, data *Order) error { productIdKey := fmt.Sprintf("%s%v", cacheOrderIdPrefix, data.Id) _, err := m.Exec(func(conn sqlx.SqlConn) (result sql.Result, err error) { query := fmt.Sprintf("update %s set %s where `id` = ?", m.table, orderRowsWithPlaceHolder) return tx.Exec(query, data.Uid, data.Pid, data.Amount, data.Status, data.Id) }, productIdKey) return err } func (m *defaultOrderModel) FindOneByUid(uid int64) (*Order, error) { var resp Order query := fmt.Sprintf("select %s from %s where `uid` = ? order by create_time desc limit 1", orderRows, m.table) err := m.QueryRowNoCache(&resp, query, uid) switch err { case nil: return &resp, nil case sqlc.ErrNotFound: return nil, ErrNotFound default: return nil, err } }
$ vim mall/service/product/model/productmodel.go
package model ...... type ( ProductModel interface { TxAdjustStock(tx *sql.Tx, id int64, delta int) (sql.Result, error) } ) ...... func (m *defaultProductModel) TxAdjustStock(tx *sql.Tx, id int64, delta int) (sql.Result, error) { productIdKey := fmt.Sprintf("%s%v", cacheProductIdPrefix, id) return m.Exec(func(conn sqlx.SqlConn) (result sql.Result, err error) { query := fmt.Sprintf("update %s set stock=stock+? where stock >= -? and id=?", m.table) return tx.Exec(query, delta, delta, id) }, productIdKey) }
product rpc
服务添加 DecrStock
, DecrStockRevert
接口方法
我们需要为 product rpc
服务添加 DecrStock
、DecrStockRevert
两个接口方法,分别用于产品库存更新 和 产品库存更新的补偿。
$ vim mall/service/product/rpc/product.proto
syntax = "proto3"; package productclient; option go_package = "product"; ...... // 减产品库存 message DecrStockRequest { int64 id = 1; int64 num = 2; } message DecrStockResponse { } // 减产品库存 service Product { ...... rpc DecrStock(DecrStockRequest) returns(DecrStockResponse); rpc DecrStockRevert(DecrStockRequest) returns(DecrStockResponse); }
提示:修改后使用 goctl 工具重新生成下代码。
实现 DecrStock
接口方法
在这里只有库存不足时,我们不需要再重试,直接回滚。
$ vim mall/service/product/rpc/internal/logic/decrstocklogic.go
package logic import ( "context" "database/sql" "mall/service/product/rpc/internal/svc" "mall/service/product/rpc/product" "github.com/dtm-labs/dtmcli" "github.com/dtm-labs/dtmgrpc" "github.com/tal-tech/go-zero/core/logx" "github.com/tal-tech/go-zero/core/stores/sqlx" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) type DecrStockLogic struct { ctx context.Context svcCtx *svc.ServiceContext logx.Logger } func NewDecrStockLogic(ctx context.Context, svcCtx *svc.ServiceContext) *DecrStockLogic { return &DecrStockLogic{ ctx: ctx, svcCtx: svcCtx, Logger: logx.WithContext(ctx), } } func (l *DecrStockLogic) DecrStock(in *product.DecrStockRequest) (*product.DecrStockResponse, error) { // 获取 RawDB db, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB() if err != nil { return nil, status.Error(500, err.Error()) } // 获取子事务屏障对象 barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx) if err != nil { return nil, status.Error(500, err.Error()) } // 开启子事务屏障 err = barrier.CallWithDB(db, func(tx *sql.Tx) error { // 更新产品库存 result, err := l.svcCtx.ProductModel.TxAdjustStock(tx, in.Id, -1) if err != nil { return err } affected, err := result.RowsAffected() // 库存不足,返回子事务失败 if err == nil && affected == 0 { return dtmcli.ErrFailure } return err }) // 这种情况是库存不足,不再重试,走回滚 if err == dtmcli.ErrFailure { return nil, status.Error(codes.Aborted, dtmcli.ResultFailure) } if err != nil { return nil, err } return &product.DecrStockResponse{}, nil }
实现 DecrStockRevert
接口方法
在 DecrStock
接口方法中,产品库存是减去指定的数量,在这里我们把它给加回来。这样产品库存就回到在 DecrStock
接口方法减去之前的数量。
$ vim mall/service/product/rpc/internal/logic/decrstockrevertlogic.go
package logic import ( "context" "database/sql" "mall/service/product/rpc/internal/svc" "mall/service/product/rpc/product" "github.com/dtm-labs/dtmcli" "github.com/dtm-labs/dtmgrpc" "github.com/tal-tech/go-zero/core/logx" "github.com/tal-tech/go-zero/core/stores/sqlx" "google.golang.org/grpc/status" ) type DecrStockRevertLogic struct { ctx context.Context svcCtx *svc.ServiceContext logx.Logger } func NewDecrStockRevertLogic(ctx context.Context, svcCtx *svc.ServiceContext) *DecrStockRevertLogic { return &DecrStockRevertLogic{ ctx: ctx, svcCtx: svcCtx, Logger: logx.WithContext(ctx), } } func (l *DecrStockRevertLogic) DecrStockRevert(in *product.DecrStockRequest) (*product.DecrStockResponse, error) { // 获取 RawDB db, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB() if err != nil { return nil, status.Error(500, err.Error()) } // 获取子事务屏障对象 barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx) if err != nil { return nil, status.Error(500, err.Error()) } // 开启子事务屏障 err = barrier.CallWithDB(db, func(tx *sql.Tx) error { // 更新产品库存 _, err := l.svcCtx.ProductModel.TxAdjustStock(tx, in.Id, 1) return err }) if err != nil { return nil, err } return &product.DecrStockResponse{}, nil }
order rpc
服务添加 CreateRevert
接口方法
order rpc
服务中已经有 Create
接口方法、我们需要创建它的补偿接口方法 DecrStockRevert
。
$ vim mall/service/order/rpc/order.proto
syntax = "proto3"; package orderclient; option go_package = "order"; ...... service Order { rpc Create(CreateRequest) returns(CreateResponse); rpc CreateRevert(CreateRequest) returns(CreateResponse); ...... }
提示:修改后使用 goctl 工具重新生成下代码。
修改 Create
接口方法
原来 Create
接口方法中产品库存判断和更新操作,我们已经在 product rpc
DecrStock
接口方法中实现了,所以我们这里只要创建订单一个操作即可。
$ vim mall/service/order/rpc/internal/logic/createlogic.go
package logic import ( "context" "database/sql" "fmt" "mall/service/order/model" "mall/service/order/rpc/internal/svc" "mall/service/order/rpc/order" "mall/service/user/rpc/user" "github.com/dtm-labs/dtmgrpc" "github.com/tal-tech/go-zero/core/logx" "github.com/tal-tech/go-zero/core/stores/sqlx" "google.golang.org/grpc/status" ) type CreateLogic struct { ctx context.Context svcCtx *svc.ServiceContext logx.Logger } func NewCreateLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CreateLogic { return &CreateLogic{ ctx: ctx, svcCtx: svcCtx, Logger: logx.WithContext(ctx), } } func (l *CreateLogic) Create(in *order.CreateRequest) (*order.CreateResponse, error) { // 获取 RawDB db, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB() if err != nil { return nil, status.Error(500, err.Error()) } // 获取子事务屏障对象 barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx) if err != nil { return nil, status.Error(500, err.Error()) } // 开启子事务屏障 if err := barrier.CallWithDB(db, func(tx *sql.Tx) error { // 查询用户是否存在 _, err := l.svcCtx.UserRpc.UserInfo(l.ctx, &user.UserInfoRequest{ Id: in.Uid, }) if err != nil { return fmt.Errorf("用户不存在") } newOrder := model.Order{ Uid: in.Uid, Pid: in.Pid, Amount: in.Amount, Status: 0, } // 创建订单 _, err = l.svcCtx.OrderModel.TxInsert(tx, &newOrder) if err != nil { return fmt.Errorf("订单创建失败") } return nil }); err != nil { return nil, status.Error(500, err.Error()) } return &order.CreateResponse{}, nil }
实现 CreateRevert
接口方法
在这个接口中我们查询用户刚刚创建的订单,把订单的状态改为 9(无效状态)
。
$ vim mall/service/order/rpc/internal/logic/createrevertlogic.go
package logic import ( "context" "database/sql" "fmt" "mall/service/order/rpc/internal/svc" "mall/service/order/rpc/order" "mall/service/user/rpc/user" "github.com/dtm-labs/dtmgrpc" "github.com/tal-tech/go-zero/core/logx" "github.com/tal-tech/go-zero/core/stores/sqlx" "google.golang.org/grpc/status" ) type CreateRevertLogic struct { ctx context.Context svcCtx *svc.ServiceContext logx.Logger } func NewCreateRevertLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CreateRevertLogic { return &CreateRevertLogic{ ctx: ctx, svcCtx: svcCtx, Logger: logx.WithContext(ctx), } } func (l *CreateRevertLogic) CreateRevert(in *order.CreateRequest) (*order.CreateResponse, error) { // 获取 RawDB db, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB() if err != nil { return nil, status.Error(500, err.Error()) } // 获取子事务屏障对象 barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx) if err != nil { return nil, status.Error(500, err.Error()) } // 开启子事务屏障 if err := barrier.CallWithDB(db, func(tx *sql.Tx) error { // 查询用户是否存在 _, err := l.svcCtx.UserRpc.UserInfo(l.ctx, &user.UserInfoRequest{ Id: in.Uid, }) if err != nil { return fmt.Errorf("用户不存在") } // 查询用户最新创建的订单 resOrder, err := l.svcCtx.OrderModel.FindOneByUid(in.Uid) if err != nil { return fmt.Errorf("订单不存在") } // 修改订单状态9,标识订单已失效,并更新订单 resOrder.Status = 9 err = l.svcCtx.OrderModel.TxUpdate(tx, resOrder) if err != nil { return fmt.Errorf("订单更新失败") } return nil }); err != nil { return nil, status.Error(500, err.Error()) } return &order.CreateResponse{}, nil }
order api
服务我们把 order rpc
服务 Create
、CreateRevert
接口方法,product rpc
服务 DecrStock
、DecrStockRevert
接口方法,提到 order api
服务中做成一个以 SAGA事务模式
的分布式事务操作。
pproduct rpc
依赖配置$ vim mall/service/order/api/etc/order.yaml
Name: Order Host: 0.0.0.0 Port: 8002 ...... OrderRpc: Etcd: Hosts: - etcd:2379 Key: order.rpc ProductRpc: Etcd: Hosts: - etcd:2379 Key: product.rpc
pproduct rpc
服务配置的实例化$ vim mall/service/order/api/internal/config/config.go
package config import ( "github.com/tal-tech/go-zero/rest" "github.com/tal-tech/go-zero/zrpc" ) type Config struct { rest.RestConf Auth struct { AccessSecret string AccessExpire int64 } OrderRpc zrpc.RpcClientConf ProductRpc zrpc.RpcClientConf }
pproduct rpc
的依赖$ vim mall/service/order/api/internal/svc/servicecontext.go
package svc import ( "mall/service/order/api/internal/config" "mall/service/order/rpc/orderclient" "mall/service/product/rpc/productclient" "github.com/tal-tech/go-zero/zrpc" ) type ServiceContext struct { Config config.Config OrderRpc orderclient.Order ProductRpc productclient.Product } func NewServiceContext(c config.Config) *ServiceContext { return &ServiceContext{ Config: c, OrderRpc: orderclient.NewOrder(zrpc.MustNewClient(c.OrderRpc)), ProductRpc: productclient.NewProduct(zrpc.MustNewClient(c.ProductRpc)), } }
gozero
的 dtm
驱动$ vim mall/service/order/api/order.go
package main import ( ...... _ "github.com/dtm-labs/driver-gozero" // 添加导入 `gozero` 的 `dtm` 驱动 ) var configFile = flag.String("f", "etc/order.yaml", "the config file") func main() { ...... }
order api
Create
接口方法$ vim mall/service/order/api/internal/logic/createlogic.go
package logic import ( "context" "mall/service/order/api/internal/svc" "mall/service/order/api/internal/types" "mall/service/order/rpc/order" "mall/service/product/rpc/product" "github.com/dtm-labs/dtmgrpc" "github.com/tal-tech/go-zero/core/logx" "google.golang.org/grpc/status" ) type CreateLogic struct { logx.Logger ctx context.Context svcCtx *svc.ServiceContext } func NewCreateLogic(ctx context.Context, svcCtx *svc.ServiceContext) CreateLogic { return CreateLogic{ Logger: logx.WithContext(ctx), ctx: ctx, svcCtx: svcCtx, } } func (l *CreateLogic) Create(req types.CreateRequest) (resp *types.CreateResponse, err error) { // 获取 OrderRpc BuildTarget orderRpcBusiServer, err := l.svcCtx.Config.OrderRpc.BuildTarget() if err != nil { return nil, status.Error(100, "订单创建异常") } // 获取 ProductRpc BuildTarget productRpcBusiServer, err := l.svcCtx.Config.ProductRpc.BuildTarget() if err != nil { return nil, status.Error(100, "订单创建异常") } // dtm 服务的 etcd 注册地址 var dtmServer = "etcd://etcd:2379/dtmservice" // 创建一个gid gid := dtmgrpc.MustGenGid(dtmServer) // 创建一个saga协议的事务 saga := dtmgrpc.NewSagaGrpc(dtmServer, gid). Add(orderRpcBusiServer+"/orderclient.Order/Create", orderRpcBusiServer+"/orderclient.Order/CreateRevert", &order.CreateRequest{ Uid: req.Uid, Pid: req.Pid, Amount: req.Amount, Status: 0, }). Add(productRpcBusiServer+"/productclient.Product/DecrStock", productRpcBusiServer+"/productclient.Product/DecrStockRevert", &product.DecrStockRequest{ Id: req.Pid, Num: 1, }) // 事务提交 err = saga.Submit() if err != nil { return nil, status.Error(500, err.Error()) } return &types.CreateResponse{}, nil }
提示:
SagaGrpc.Add
方法第一个参数action
是微服务grpc
访问的方法路径,这个方法路径需要分别去以下文件中寻找。 <br>mall/service/order/rpc/order/order.pb.go
<br>mall/service/product/rpc/product/product.pb.go
<br> 按关键字Invoke
搜索即可找到。
go-zero
+ DTM
postman
调用 /api/product/create
接口,创建一个产品,库存 stock
为 1
。postman
调用 /api/order/create
接口,创建一个订单,产品ID pid
为 1
。1
已经变成了 0
。barrier
里的数据,我们可以看出两个服务的操作均已经完成。1
的库存已经是 0
了, 使用 postman
调用 /api/order/create
接口,再创建一个订单。2
产品ID为 1
的数据,它的订单数据状态为 9
。barrier
里的数据,我们可以看出(gid = fqYS8CbYbK8GkL8SCuTRUF)
第一个服务(branch_id = 01)
子事务屏障操作是正常,第二个服务(branch_id = 02)
子事务屏障操作失败,要求补偿。于是两个服务都发生了补偿的操作记录。这个分布式事务的操作流程
DTM
服务会调 order rpc
Create
接口进行创建订单处理。DTM
服务再调 product rpc
DecrStock
接口,这个接口的里通过 pid
更新产品库存,因产品库存不足,抛出事务失败。DTM
服务发起补偿机制,调 order rpc
CreateRevert
接口进行订单的补偿处理。DTM
服务发起补偿机制,调 product rpc
DecrStockRevert
接口进行产品库存更新的补偿处理。但是因为在 product rpc
DecrStock
接口的子事务屏障内,业务处理并未成功。所以在 DecrStockRevert
接口里不会执行子事务屏障内的业务逻辑。1
库存修改为100,然后在 product rpc
DecrStock
接口方法中子事务屏障外,人为的制造异常失败。postman
调用 /api/order/create
接口,再创建一个订单,产品ID pid
为 1
。3
的订单,它的订单数据状态为 9
。产品数据表ID为 1
的产品,它的库存还是 100
且数据更新时间也发生了变化。barrier
里的数据,我们可以看出(gid = ZbjYHv2jNra7RMwyWjB5Lc)
第一个服务(branch_id = 01)
子事务屏障操作是正常,第二个服务(branch_id = 02)
子事务屏障操作也是正常。因为在 product rpc
DecrStock
接口方法中子事务屏障外,我们人为的制造异常失败,所以两个服务发生了补偿的操作记录。大家可以对比下 测试分布式事务失败流程1 与 测试分布式事务失败流程2 不同之处,是不是能发现和体会到 DTM
的这个子事务屏障技术的强大之处。
子事务屏障会自动识别正向操作是否已执行,失败流程1未执行业务操作,所以补偿时,也不会执行补偿的业务操作;失败流程2执行了业务操作,所以补偿时,也会执行补偿的业务操作。
https://github.com/zeromicro/go-zero
https://gitee.com/kevwan/go-zero
欢迎使用 go-zero
并 star 支持我们!
关注『微服务实践』公众号并点击 交流群 获取社区群二维码。
|