一行一行阅读解释这段代码:
package worker
import (
“context”
“errors”
“fmt”
“strconv”
“time”
"trade/internal/batchjobfund/constant"
"trade/internal/batchjobfund/dao"
"trade/internal/common/api/mktdata"
mktconstant "trade/internal/common/api/mktdata/constant"
"trade/internal/common/api/mktwealth"
mktwealthconstant "trade/internal/common/api/mktwealth/constant"
mktmapping "trade/internal/common/api/mktwealth/mapping"
"trade/internal/common/client"
"trade/internal/common/client/dingtalk"
"trade/internal/common/logic/msgpush"
cmapping "trade/internal/common/mapping"
"trade/internal/pkg/env"
"trade/internal/pkg/log"
custodianconstant "trade/internal/pkg/newconstant/custodian"
orderconstant "trade/internal/pkg/newconstant/order"
"trade/internal/pkg/worker"
pkgconstant "trade/pkg/constant"
model "trade/pkg/model/wealth"
"trade/pkg/utils"
trademodel "trade/internal/pkg/model/tradedb"
utildd "trade/pkg/utils/dingding"
"github.com/gogf/gf/v2/util/gconv"
"github.com/golang-module/carbon/v2"
"github.com/samber/lo"
"github.com/shopspring/decimal"
"github.com/spf13/viper"
"gl.fotechwealth.com.local/backend/trade-lib.git/mysql"
"gorm.io/gorm"
)
type ConsolidateOrderWorker struct {
commandArgs []string
config *viper.Viper
dingTalkRestyClient dingtalk.RestyClient
mktDataClient mktdata.Client
mktWealthClient mktwealth.Client
}
func NewConsolidateOrderWorker(
commandArgs []string,
config *viper.Viper,
restyClient client.CommRestyClient,
mktDataClient mktdata.Client, mktWealthClient mktwealth.Client) worker.Worker {
return &ConsolidateOrderWorker{
commandArgs: commandArgs,
config: config,
dingTalkRestyClient: restyClient.DingTalk(),
mktDataClient: mktDataClient,
mktWealthClient: mktWealthClient,
}
}
// s1 获取合单类型 cots from client-orders
// s2 cots -> cot, 取 cot 类型的所有 client-orders cos
// s3 cos -> 合单 co
// s4 save co to DB
// prefer
// s1 [client-orders] -> co. 如无, s5
// s2 计算出 co 的(合单)类型 cot
// s3 save co to mapCOS[cot]
// s4 go to s1
// s5 save mapCOS to DB
/*
*待合单订单: 订单状态:确认下单
*cot: 产品 + 交易方向(买/卖) + 交易方式(金额/数量)
*/
type nomineeOrderInfo struct {
OrderType int json:"orderType" // 订单类型 1-申购 2-赎回
TradeMode int json:"tradeMode" // 交易方式 1-金额 2-数量
ProductCode string json:"productCode" // 产品代码
ProductType int json:"productType" // 产品类型:1-公募,2-私募,3-债券,4-票据 9-现金宝
ProductName string json:"productName" // 产品名称
Currency string json:"currency" // 币种
Symbol string json:"symbol" // 产品代码
ExpectSettleDate *time.Time json:"expectSettleDate"
ISIN string json:"isin" // ISIN
Direction int json:"direction" // 方向: 1-买入,2-卖出
FeeRate string
stat
}
type stat struct {
ApplyAmount decimal.Decimal
ApplyQuantity decimal.Decimal
Fee decimal.Decimal
SourceType orderconstant.OrderSourceType
}
// // problematicOrderInfo 存储有问题的合单类型信息
// type problematicOrderInfo struct {
// ProductName string // 上手产品名称
// SubOrderIDs []int // 子订单ID列表
// }
type userOrderInfo struct {
OrderId int
BadOrderInfo bool
}
// processOrdersResult 结构体用于存储 processOrders 函数的处理结果
type processOrdersResult struct {
MapNomiOdr map[string]*nomineeOrderInfo // 存储每个合单类型的汇总信息
MapNomiOSubOrders map[string][]userOrderInfo // 存储每个合单类型对应的原始订单ID列表
}
func (t *ConsolidateOrderWorker) Run(ctx context.Context) error {
// 设置查询参数
fundType := t.commandArgs[0] // 定时任务的执行flag参数,决定查询MonetaryOrder还是NotMonetaryOrder
var offset int // 自动分配 const offset = 0
const limit = 100
// 初始化集合订单分类对象
var processOrdersResult processOrdersResult
processOrdersResult.MapNomiOdr = make(map[string]*nomineeOrderInfo)
processOrdersResult.MapNomiOSubOrders = make(map[string][]userOrderInfo)
for {
// Step1: 获取一批订单
wealthOrders, err := t.getCanConsolidateOrders(ctx, fundType, offset, limit)
if err != nil {
log.Fatalf("Error fetching orders: %v", err)
}
// 如果没有获取到订单,说明已经处理完毕
if len(wealthOrders) == 0 {
break
}
// 对订单进行基金节假日过滤
var (
nowTime = carbon.Now()
startDate = nowTime.ToDateStruct()
)
// TODO alex-todo是否会unique
symbols := lo.Map[trademodel.TWealthOrder, string](wealthOrders, func(item trademodel.TWealthOrder, index int) string {
return item.ProductSymbol
})
productHolidays, err := t.mktWealthClient.GetFundHolidays(ctx, &mktmapping.FundHoliadayParams{
Symbols: symbols,
StartDate: startDate.ToDateString(),
EndDate: startDate.AddDays(int(mktwealthconstant.FundHolidaySpan)).ToDateString(),
})
if err != nil {
log.Errorf("fail to get fund holidays,%s", err)
return err
}
wealthOrders, err = (trademodel.TWealthOrders)(wealthOrders).FiltedByFundHoliday(nowTime.ToStdTime(), productHolidays)
if err != nil {
log.Errorf("filtered by fund holiday error,%s", err)
continue
}
// Step2: 处理这批订单
// 补充遇到有问题的订单存入problem struct并且包含子订单 id 全部存入
processResult, err := t.processOrders(ctx, wealthOrders, productHolidays)
if err != nil {
log.Errorf("Error processing orders: %v", err)
return err
}
// 累积处理得到 step3 需要得对象
for k, v := range processResult.MapNomiOdr {
processOrdersResult.MapNomiOdr[k] = v
}
for k, v := range processResult.MapNomiOSubOrders {
processOrdersResult.MapNomiOSubOrders[k] = append(processOrdersResult.MapNomiOSubOrders[k], v...)
}
// 更新offset,准备获取下一批订单
// 这里要防止陷入死循环
offset += limit
}
// 转换成map[string][]int给step3使用
MapNomiOSubOrders := make(map[string][]int)
for key, userOrderInfos := range processOrdersResult.MapNomiOSubOrders {
// 创建一个新的切片来存储该合单类型的所有订单ID
orderIDs := make([]int, 0, len(userOrderInfos))
// 遍历userOrderInfos并收集每个OrderId
for _, userOrderInfo := range userOrderInfos {
orderIDs = append(orderIDs, userOrderInfo.OrderId)
}
// 将收集到的订单ID切片存储到mapOrderIds中
MapNomiOSubOrders[key] = orderIDs
}
// problematicOrderInfos := make(map[string]problematicOrderInfo)
// var isSendMsgToDingDing = processOrdersResult.IsSuccessful
// isSendMsgToDingDing := true
// Step3: 对处理的订单进行数据库操作(事务处理)
for k, v := range processOrdersResult.MapNomiOdr {
// if !isSendMsgToDingDing {
// problematicOrderInfos[k] = problematicOrderInfo{
// ProductName: v.ProductName,
// SubOrderIDs: processOrdersResult.MapNomiOSubOrders[k],
// }
// continue
// }
err := t.processAndSaveNomineeOrders(ctx, k, v,MapNomiOSubOrders, fundType)
if err != nil {
log.Errorf("Error saving consolidated orders: %v", err)
return err
}
}
// Step4: 发送告警 在总结构体里面报警
// if problematicOrderInfo != {
// for _, problematicOrder := range problematicOrderInfos {
// t.sendMsgToDingDing(ctx, "存在问题合单类型告警", fmt.Sprintf("产品名称: %s, 子订单ID列表: %v", problematicOrder.ProductName, problematicOrder.SubOrderIDs))
// }
// }
return nil
}
// getCanConsolidateOrders 根据基金类型、offset 和 limit 获取可合单的订单
func (t *ConsolidateOrderWorker) getCanConsolidateOrders(_ context.Context, fundType string, offset, limit int) ([]model.TWealthOrder, error) {
// Step1: 一次性拿出指定数量的订单在内存用来处理合单逻辑
// 根据fundType、offset和limit从数据库中获取订单
var (
tWealthOrder []model.TWealthOrder
err error
)
if fundType == constant.WorkerArgFundTypeMonetary {
tWealthOrder, err = dao.WealthOrder.GetCanConsolidateMonetaryOrders(offset, limit, orderconstant.W_PRODUCT_TYPE_ECASH, orderconstant.W_PRODUCT_TYPE_PUB_FUND) // 只查货币型基金
if err != nil {
log.Fatal(err.Error())
}
} else {
tWealthOrder, err = dao.WealthOrder.GetCanConsolidateNotMonetaryOrders(offset, limit, orderconstant.W_PRODUCT_TYPE_PUB_FUND) // 只查非货币型基金
if err != nil {
log.Fatal(err.Error())
}
}
// 拿完订单切片返回
return tWealthOrder, nil
}
// processOrders 对一批订单进行处理
func (t *ConsolidateOrderWorker) processOrders(ctx context.Context, orders []model.TWealthOrder, fundHoliadays []*mktmapping.FundHoliaday) (*processOrdersResult, error) {
// 构建合单类型映射、处理统计信息、计算预计结算日等逻辑
// Step2: 遍历客户订单, 在代码中计算合单
mapNomiOSubOrders := make(map[string][]userOrderInfo) // 存储每个合单类型对应的原始订单ID列表
mapNomiOdr := make(map[string]*nomineeOrderInfo) // 存储每个合单类型的汇总信息,键由产品代码、交易方向和交易模式组成
// 构建订单的合单类型映射
// 在外面做完筛选,在里面只做合单
for _, record := range orders {
ot := record.ProductCode + "," + strconv.Itoa(record.Direction)
if _, ok := mapNomiOdr[ot]; !ok {
no := &nomineeOrderInfo{
Currency: record.Currency,
Direction: record.Direction,
ISIN: record.ISIN,
Symbol: record.ProductSymbol,
OrderType: record.Direction,
ProductCode: record.ProductCode,
ProductName: record.ProductName,
ProductType: record.ProductType,
TradeMode: record.TradeMode,
FeeRate: record.FeeRate,
stat: struct {
ApplyAmount decimal.Decimal
ApplyQuantity decimal.Decimal
Fee decimal.Decimal
SourceType orderconstant.OrderSourceType
}{
ApplyAmount: decimal.Zero,
ApplyQuantity: decimal.Zero,
Fee: decimal.Zero,
SourceType: record.SourceType, // 第一个客户订单的订单类型
},
}
mapNomiOdr[ot] = no
mapNomiOSubOrders[ot] = []userOrderInfo{{OrderId: record.Id, BadOrderInfo: false}}
} else {
// 将当前订单ID添加到对应的合单类型中
mapNomiOSubOrders[ot] = append(mapNomiOSubOrders[ot], userOrderInfo{OrderId: record.Id, BadOrderInfo: false})
}
// 统计信息处理
no := mapNomiOdr[ot]
netAmountD, _ := decimal.NewFromString(record.ApplyNetAmount)
quantityD, _ := decimal.NewFromString(record.ApplyQuantity)
feeAmountD, _ := decimal.NewFromString(record.ApplyFeeAmount)
// 只要存在不一致就是混合下单
if record.SourceType != no.stat.SourceType {
no.stat.SourceType = orderconstant.OrderSourceTypeMix
}
if no.TradeMode == orderconstant.W_TRADE_MODE_AMOUNT {
no.stat.ApplyAmount = no.stat.ApplyAmount.Add(netAmountD)
no.stat.ApplyQuantity = no.stat.ApplyQuantity.Add(quantityD)
} else {
no.stat.ApplyQuantity = no.stat.ApplyQuantity.Add(quantityD)
}
no.stat.Fee = no.stat.Fee.Add(feeAmountD)
productBasicInfo, err := t.mktWealthClient.GetProductBasicInfo(ctx, mktmapping.ProductBasicInfoParams{
ProductCode: record.ProductCode,
Channel: record.Channel,
})
if err != nil {
// TODO alex-todo 日志打印
// TODO 需要记录失败
// l.Errorf("fail to get fund holidays,%s", err)
continue
}
settlementDay := productBasicInfo.GetSettlementDayByDirection(record.Direction)
expectSettledDate, err := t.getExpectSettledDateV3(ctx, time.Now(), no.Symbol, settlementDay, fundHoliadays)
if err != nil {
// t.sendMsgToDingDing(ctx, "预计结算日计算失败", fmt.Sprintf("订单ID: %d, 产品名称: %s", record.Id, record.ProductName))
log.Errorf("t.getExpectSettledDate failed: %v", err)
continue
} else {
no.ExpectSettleDate = &expectSettledDate
}
}
processResult := &processOrdersResult{
MapNomiOdr: mapNomiOdr,
MapNomiOSubOrders: mapNomiOSubOrders,
}
return processResult, nil
}
// processAndSaveNomineeOrders 处理订单并将其保存到数据库
func (t *ConsolidateOrderWorker) processAndSaveNomineeOrders(ctx context.Context, key string, v *nomineeOrderInfo, mapNomiOSubOrders map[string][]int, fundType string) error {
// … 实现事务逻辑,包括生成集合订单、子订单、更新原始订单状态等 …
// nomiOrder 集合订单
nomiOrder := new(model.TWealthNomineeOrder)
tnow := time.Now()
nomiOrder.IsMonetaryFund = 0 // 0 非货币基金
if fundType == constant.WorkerArgFundTypeMonetary { // 货币型基金
nomiOrder.IsMonetaryFund = 1
}
nomiOrder.ApplyAmount = v.stat.ApplyAmount.String()
nomiOrder.ApplyQuantity = v.stat.ApplyQuantity.String()
nomiOrder.Fee = v.stat.Fee.String()
gconv.Struct(v, nomiOrder)
if nomiOrder.ISIN == "" {
nomiOrder.ISIN = nomiOrder.ProductCode
}
nomiOrder.OrderStatus = orderconstant.WealthNomineeOrderStatusWaiting
nomiOrder.SourceType = v.stat.SourceType.Value()
nomiOrder.SubmitTime = utils.GetPointerTime(tnow)
nomiOrder.ExpectSettleDate = v.ExpectSettleDate
// 生成订单号
orderNumberNo, err := dao.Dao.Access.GenIdNo(nil, pkgconstant.T_ID_TYPE_WEALTH_NOMINEE_ORDER_RULE1_CHAR)
if err != nil {
log.Errorf(err.Error())
}
// no.OrderNumber 集合订单号
nomiOrder.OrderNumber = generateNomineeOrderNumber(orderNumberNo, nomiOrder.OrderType)
// https://www.tapd.cn/60236733/markdown_wikis/show/#1160236733001004373@toc2
// https://www.tapd.cn/60236733/markdown_wikis/show/#1160236733001004557@toc0
// 针对赎回订单可能会存在多上手的情况这里做了多集合订单的处理
nomiOrder, subOrderList, err := NomineeOrderSplit(ctx, nomiOrder)
if err != nil {
log.Errorf(err.Error())
return err
}
// 子集合订单生成OrderNumber
for i := range subOrderList {
var subOrderNumberNo int
subOrderNumberNo, err = dao.Dao.Access.GenIdNo(nil, pkgconstant.T_ID_TYPE_WEALTH_NOMINEE_ORDER_RULE1_CHAR)
if err != nil {
log.Errorf(err.Error())
return err
}
subOrderList[i].OrderNumber = generateNomineeOrderNumber(subOrderNumberNo, nomiOrder.OrderType)
}
nomineeOrderList := append(subOrderList, *nomiOrder)
if err = t.iFastOrderWarn(ctx, nomineeOrderList); err != nil {
log.Error(err.Error())
return err
}
// start updating db
tx := mysql.WithContext(ctx, dao.Dao.Basedb).Begin()
defer func() {
if r := recover(); r != nil {
tx.Rollback()
panic(r)
} else if err != nil {
tx.Rollback()
} else {
err = tx.Commit().Error
if err != nil {
log.Errorf("tx commit error:%v", err.Error())
}
}
}()
if len(subOrderList) > 0 {
err = tx.Create(subOrderList).Error
if err != nil {
log.Error(err.Error())
tx.Rollback()
return err
}
}
err = tx.Create(nomiOrder).Error
if err != nil {
log.Error(err.Error())
tx.Rollback()
return err
}
err = batchFrozenCustodianHolding(ctx, tx, nomineeOrderList)
if err != nil {
log.Error(err.Error())
tx.Rollback()
return err
}
subOrders := mapNomiOSubOrders[key]
skip := false
for _, id := range subOrders {
order := new(model.TWealthOrder)
order.Id = id
order.RofNomineeOrderStatus = orderconstant.W_ORDER_ROFNOMINEEORDERSTATUS_CONSOLIDATED
order.NomineeOrderId = nomiOrder.Id
order.ExpectSettleDate = nomiOrder.ExpectSettleDate // 预计结算日
err = tx.Updates(order).Error
if err != nil {
log.Error(err.Error())
tx.Rollback()
skip = true
break
}
}
if skip {
return err
}
return nil
}
包与导入
1 | package worker |
首先,导入了多个包,包括标准库包和项目内部的包。这些包提供了上下文管理、错误处理、日志记录、数据库操作、HTTP客户端和其他实用工具。
ConsolidateOrderWorker 结构体
1 | type ConsolidateOrderWorker struct { |
定义了一个 ConsolidateOrderWorker 结构体,包含了命令参数、配置对象以及市场数据和财富数据的客户端。
NewConsolidateOrderWorker 函数
1 | func NewConsolidateOrderWorker( |
创建并返回一个新的 ConsolidateOrderWorker 实例。
ConsolidateOrderWorker 的 Run 方法
1 | func (t *ConsolidateOrderWorker) Run(ctx context.Context) error { |
Run 方法是 ConsolidateOrderWorker 的核心方法,负责执行整个合单处理流程。主要步骤包括:
- 设置查询参数。
- 初始化结果存储结构。
- 循环获取和处理订单。
- 处理订单并保存到数据库。
getCanConsolidateOrders 方法
1 | func (t *ConsolidateOrderWorker) getCanConsolidateOrders(_ context.Context, fundType string, offset, limit int) ([]model.TWealthOrder, error) { |
根据基金类型、偏移量和限制获取可以合单的订单。
processOrders 方法
1 | func (t *ConsolidateOrderWorker) processOrders(ctx context.Context, orders []model.TWealthOrder, fundHoliadays []*mktmapping.FundHoliaday) (*processOrdersResult, error) { |
processOrders 方法对获取的一批订单进行处理,核心步骤如下:
- 初始化用于存储订单的合单类型映射。
- 遍历订单,按照产品代码和交易方向创建或更新合单类型。
- 统计订单的金额、数量和费用信息。
- 获取产品的结算日并计算预计结算日。
processAndSaveNomineeOrders 方法
1 | func (t *ConsolidateOrderWorker) processAndSaveNomineeOrders(ctx context.Context, key string, v *nomineeOrderInfo, mapNomiOSubOrders map[string][]int, fundType string) error { |
processAndSaveNomineeOrders 方法处理并保存合单订单,主要步骤包括:
- 初始化合单订单对象并设置相关属性。
- 生成合单订单号。
- 拆分合单订单(如果需要)。
- 开始数据库事务并保存订单数据。
- 更新原始订单状态。
getCanConsolidateOrders 方法
1 | func (t *ConsolidateOrderWorker) getCanConsolidateOrders(_ context.Context, fundType string, offset, limit int) ([]model.TWealthOrder, error) { |
getCanConsolidateOrders 方法根据基金类型、偏移量和限制从数据库中获取可以合单的订单。
nomineeOrderInfo 结构体
1 | type nomineeOrderInfo struct { |
nomineeOrderInfo 结构体用于存储合单订单的信息,包括订单类型、交易模式、产品代码、产品类型、产品名称、币种、产品代码、预计结算日、ISIN、方向和费用率等属性,以及统计信息 stat。
userOrderInfo 结构体
1 | type userOrderInfo struct { |
userOrderInfo 结构体用于存储用户订单的信息,包括订单ID和是否为问题订单的标志。
processOrdersResult 结构体
1 | type processOrdersResult struct { |
processOrdersResult 结构体用于存储 processOrders 函数的处理结果,包括每个合单类型的汇总信息和每个合单类型对应的原始订单ID列表。
我现在想用userOrderInfo 结构体来保存在processAndSaveNomineeOrders 方法和processOrders 方法里面处理订单出错的时候的订单元数据信息
type userOrderInfo struct {
OrderId int
BadOrderInfo bool
}
要在 processOrders 方法和 processAndSaveNomineeOrders 方法中使用 userOrderInfo 结构体来保存处理订单出错时的订单元数据信息,你需要在订单处理出错时设置 BadOrderInfo 为 true 并将该信息保存在相应的结构中。
修改 processOrders 方法
在 processOrders 方法中,当获取产品基本信息或计算预计结算日出错时,将 BadOrderInfo 设置为 true 并记录订单信息。
1 | func (t *ConsolidateOrderWorker) processOrders(ctx context.Context, orders []model.TWealthOrder, fundHoliadays []*mktmapping.FundHoliaday) (*processOrdersResult, error) { |
修改 processAndSaveNomineeOrders 方法
在 processAndSaveNomineeOrders 方法中,当更新订单状态出错时,将 BadOrderInfo 设置为 true 并记录订单信息。
1 | func (t *ConsolidateOrderWorker) processAndSaveNomineeOrders(ctx context.Context, key string, v *nomineeOrderInfo, mapNomiOSubOrders map[string][]int, fundType string) error { |
在 processAndSaveNomineeOrders 方法中,更新订单状态时,如果出错,将对应的 userOrderInfo 的 BadOrderInfo 设置为 true 并记录该订单的信息。这样就可以在处理订单出错时保留订单元数据信息。
通过以上修改,你就可以在 processOrders 和 processAndSaveNomineeOrders 方法中记录处理订单出错时的订单元数据信息。
这一段代码报错:userOrder.OrderId undefined (type int has no field or method OrderId)
请你综合整段代码的逻辑修复这个错误
for _, id := range subOrders {
order := new(model.TWealthOrder)
order.Id = id
order.RofNomineeOrderStatus = orderconstant.W_ORDER_ROFNOMINEEORDERSTATUS_CONSOLIDATED
order.NomineeOrderId = nomiOrder.Id
order.ExpectSettleDate = nomiOrder.ExpectSettleDate // 预计结算日
err = tx.Updates(order).Error
if err != nil {
log.Error(err.Error())
for i, userOrder := range mapNomiOSubOrders[key] {
if userOrder.OrderId == id {
mapNomiOSubOrders[key][i].BadOrderInfo = true
break
}
}
tx.Rollback()
skip = true
break
}
}
根据错误信息和代码逻辑,可以看出 subOrders 的元素是 int 类型的订单ID,而 userOrder 是 userOrderInfo 类型,包含 OrderId 和 BadOrderInfo 字段。
为了修复这个错误,需要将 subOrders 的类型从 []int 改为 []userOrderInfo。这样在处理订单时可以直接访问 OrderId 和 BadOrderInfo 字段。
修改 processAndSaveNomineeOrders 方法
将 subOrders 的类型改为 []userOrderInfo,并在更新订单状态出错时设置 BadOrderInfo 为 true。
1 | func (t *ConsolidateOrderWorker) processAndSaveNomineeOrders(ctx context.Context, key string, v *nomineeOrderInfo, mapNomiOSubOrders map[string][]userOrderInfo, fundType string) error { |
修改 processOrders 方法
在 processOrders 方法中,确保 mapNomiOSubOrders 中保存的是 userOrderInfo 类型。
1 | func (t *ConsolidateOrderWorker) processOrders(ctx context.Context, orders []model.TWealthOrder, fundHoliadays []*mktmapping.FundHoliaday) (*processOrdersResult, error) { |
通过以上修改,你可以在 processAndSaveNomineeOrders 方法和 processOrders 方法中处理订单出错时记录订单元数据信息。