一行一行阅读解释这段代码:
package worker

import (
“context”
“errors”
“fmt”
“strconv”
“time”

"trade/internal/batchjobfund/constant"
"trade/internal/batchjobfund/dao"
"trade/internal/common/api/mktdata"
mktconstant "trade/internal/common/api/mktdata/constant"
"trade/internal/common/api/mktwealth"
mktwealthconstant "trade/internal/common/api/mktwealth/constant"
mktmapping "trade/internal/common/api/mktwealth/mapping"
"trade/internal/common/client"
"trade/internal/common/client/dingtalk"
"trade/internal/common/logic/msgpush"
cmapping "trade/internal/common/mapping"
"trade/internal/pkg/env"
"trade/internal/pkg/log"
custodianconstant "trade/internal/pkg/newconstant/custodian"
orderconstant "trade/internal/pkg/newconstant/order"
"trade/internal/pkg/worker"
pkgconstant "trade/pkg/constant"
model "trade/pkg/model/wealth"
"trade/pkg/utils"

trademodel "trade/internal/pkg/model/tradedb"
utildd "trade/pkg/utils/dingding"

"github.com/gogf/gf/v2/util/gconv"
"github.com/golang-module/carbon/v2"
"github.com/samber/lo"
"github.com/shopspring/decimal"
"github.com/spf13/viper"
"gl.fotechwealth.com.local/backend/trade-lib.git/mysql"
"gorm.io/gorm"

)

type ConsolidateOrderWorker struct {
commandArgs []string
config *viper.Viper
dingTalkRestyClient dingtalk.RestyClient
mktDataClient mktdata.Client
mktWealthClient mktwealth.Client
}

func NewConsolidateOrderWorker(
commandArgs []string,
config *viper.Viper,
restyClient client.CommRestyClient,
mktDataClient mktdata.Client, mktWealthClient mktwealth.Client) worker.Worker {
return &ConsolidateOrderWorker{
commandArgs: commandArgs,
config: config,
dingTalkRestyClient: restyClient.DingTalk(),
mktDataClient: mktDataClient,
mktWealthClient: mktWealthClient,
}
}

// s1 获取合单类型 cots from client-orders
// s2 cots -> cot, 取 cot 类型的所有 client-orders cos
// s3 cos -> 合单 co
// s4 save co to DB

// prefer
// s1 [client-orders] -> co. 如无, s5
// s2 计算出 co 的(合单)类型 cot
// s3 save co to mapCOS[cot]
// s4 go to s1
// s5 save mapCOS to DB

/*
*待合单订单: 订单状态:确认下单
*cot: 产品 + 交易方向(买/卖) + 交易方式(金额/数量)
*/

type nomineeOrderInfo struct {
OrderType int json:"orderType" // 订单类型 1-申购 2-赎回
TradeMode int json:"tradeMode" // 交易方式 1-金额 2-数量
ProductCode string json:"productCode" // 产品代码
ProductType int json:"productType" // 产品类型:1-公募,2-私募,3-债券,4-票据 9-现金宝
ProductName string json:"productName" // 产品名称
Currency string json:"currency" // 币种
Symbol string json:"symbol" // 产品代码
ExpectSettleDate *time.Time json:"expectSettleDate"
ISIN string json:"isin" // ISIN
Direction int json:"direction" // 方向: 1-买入,2-卖出
FeeRate string
stat
}

type stat struct {
ApplyAmount decimal.Decimal
ApplyQuantity decimal.Decimal
Fee decimal.Decimal
SourceType orderconstant.OrderSourceType
}

// // problematicOrderInfo 存储有问题的合单类型信息
// type problematicOrderInfo struct {
// ProductName string // 上手产品名称
// SubOrderIDs []int // 子订单ID列表
// }

type userOrderInfo struct {
OrderId int
BadOrderInfo bool
}

// processOrdersResult 结构体用于存储 processOrders 函数的处理结果
type processOrdersResult struct {
MapNomiOdr map[string]*nomineeOrderInfo // 存储每个合单类型的汇总信息
MapNomiOSubOrders map[string][]userOrderInfo // 存储每个合单类型对应的原始订单ID列表
}

func (t *ConsolidateOrderWorker) Run(ctx context.Context) error {
// 设置查询参数
fundType := t.commandArgs[0] // 定时任务的执行flag参数,决定查询MonetaryOrder还是NotMonetaryOrder
var offset int // 自动分配 const offset = 0
const limit = 100

// 初始化集合订单分类对象
var processOrdersResult processOrdersResult
processOrdersResult.MapNomiOdr = make(map[string]*nomineeOrderInfo)
processOrdersResult.MapNomiOSubOrders = make(map[string][]userOrderInfo)

for {
    // Step1: 获取一批订单
    wealthOrders, err := t.getCanConsolidateOrders(ctx, fundType, offset, limit)
    if err != nil {
        log.Fatalf("Error fetching orders: %v", err)
    }

    // 如果没有获取到订单,说明已经处理完毕
    if len(wealthOrders) == 0 {
        break
    }

    // 对订单进行基金节假日过滤
    var (
        nowTime   = carbon.Now()
        startDate = nowTime.ToDateStruct()
    )

    // TODO alex-todo是否会unique
    symbols := lo.Map[trademodel.TWealthOrder, string](wealthOrders, func(item trademodel.TWealthOrder, index int) string {
        return item.ProductSymbol
    })

    productHolidays, err := t.mktWealthClient.GetFundHolidays(ctx, &mktmapping.FundHoliadayParams{
        Symbols:   symbols,
        StartDate: startDate.ToDateString(),
        EndDate:   startDate.AddDays(int(mktwealthconstant.FundHolidaySpan)).ToDateString(),
    })
    if err != nil {
        log.Errorf("fail to get fund holidays,%s", err)
        return err
    }

    wealthOrders, err = (trademodel.TWealthOrders)(wealthOrders).FiltedByFundHoliday(nowTime.ToStdTime(), productHolidays)
    if err != nil {
        log.Errorf("filtered by fund holiday error,%s", err)
        continue
    }

    // Step2: 处理这批订单
    // 补充遇到有问题的订单存入problem struct并且包含子订单 id 全部存入
    processResult, err := t.processOrders(ctx, wealthOrders, productHolidays)
    if err != nil {
        log.Errorf("Error processing orders: %v", err)
        return err
    }

    // 累积处理得到 step3 需要得对象
    for k, v := range processResult.MapNomiOdr {
        processOrdersResult.MapNomiOdr[k] = v
    }
    for k, v := range processResult.MapNomiOSubOrders {
        processOrdersResult.MapNomiOSubOrders[k] = append(processOrdersResult.MapNomiOSubOrders[k], v...)
    }

    // 更新offset,准备获取下一批订单
    // 这里要防止陷入死循环
    offset += limit
}

// 转换成map[string][]int给step3使用
MapNomiOSubOrders := make(map[string][]int)
for key, userOrderInfos := range processOrdersResult.MapNomiOSubOrders {
    // 创建一个新的切片来存储该合单类型的所有订单ID
    orderIDs := make([]int, 0, len(userOrderInfos))

    // 遍历userOrderInfos并收集每个OrderId
    for _, userOrderInfo := range userOrderInfos {
        orderIDs = append(orderIDs, userOrderInfo.OrderId)
    }

    // 将收集到的订单ID切片存储到mapOrderIds中
    MapNomiOSubOrders[key] = orderIDs
}

// problematicOrderInfos := make(map[string]problematicOrderInfo)

// var isSendMsgToDingDing = processOrdersResult.IsSuccessful
// isSendMsgToDingDing := true
// Step3: 对处理的订单进行数据库操作(事务处理)
for k, v := range processOrdersResult.MapNomiOdr {
    // if !isSendMsgToDingDing {
    // 	problematicOrderInfos[k] = problematicOrderInfo{
    // 		ProductName: v.ProductName,
    // 		SubOrderIDs: processOrdersResult.MapNomiOSubOrders[k],
    // 	}
    // 	continue
    // }
    err := t.processAndSaveNomineeOrders(ctx, k, v,MapNomiOSubOrders, fundType)
    if err != nil {
        log.Errorf("Error saving consolidated orders: %v", err)
        return err
    }
}

// Step4: 发送告警 在总结构体里面报警
// if problematicOrderInfo !=  {
// for _, problematicOrder := range problematicOrderInfos {
// 	t.sendMsgToDingDing(ctx, "存在问题合单类型告警", fmt.Sprintf("产品名称: %s, 子订单ID列表: %v", problematicOrder.ProductName, problematicOrder.SubOrderIDs))
// }
// }

return nil

}

// getCanConsolidateOrders 根据基金类型、offset 和 limit 获取可合单的订单
func (t *ConsolidateOrderWorker) getCanConsolidateOrders(_ context.Context, fundType string, offset, limit int) ([]model.TWealthOrder, error) {
// Step1: 一次性拿出指定数量的订单在内存用来处理合单逻辑
// 根据fundType、offset和limit从数据库中获取订单
var (
tWealthOrder []model.TWealthOrder
err error
)

if fundType == constant.WorkerArgFundTypeMonetary {
    tWealthOrder, err = dao.WealthOrder.GetCanConsolidateMonetaryOrders(offset, limit, orderconstant.W_PRODUCT_TYPE_ECASH, orderconstant.W_PRODUCT_TYPE_PUB_FUND) // 只查货币型基金
    if err != nil {
        log.Fatal(err.Error())
    }
} else {
    tWealthOrder, err = dao.WealthOrder.GetCanConsolidateNotMonetaryOrders(offset, limit, orderconstant.W_PRODUCT_TYPE_PUB_FUND) // 只查非货币型基金
    if err != nil {
        log.Fatal(err.Error())
    }
}

// 拿完订单切片返回
return tWealthOrder, nil

}

// processOrders 对一批订单进行处理
func (t *ConsolidateOrderWorker) processOrders(ctx context.Context, orders []model.TWealthOrder, fundHoliadays []*mktmapping.FundHoliaday) (*processOrdersResult, error) {
// 构建合单类型映射、处理统计信息、计算预计结算日等逻辑
// Step2: 遍历客户订单, 在代码中计算合单
mapNomiOSubOrders := make(map[string][]userOrderInfo) // 存储每个合单类型对应的原始订单ID列表
mapNomiOdr := make(map[string]*nomineeOrderInfo) // 存储每个合单类型的汇总信息,键由产品代码、交易方向和交易模式组成

// 构建订单的合单类型映射
// 在外面做完筛选,在里面只做合单
for _, record := range orders {
    ot := record.ProductCode + "," + strconv.Itoa(record.Direction)
    if _, ok := mapNomiOdr[ot]; !ok {
        no := &nomineeOrderInfo{
            Currency:    record.Currency,
            Direction:   record.Direction,
            ISIN:        record.ISIN,
            Symbol:      record.ProductSymbol,
            OrderType:   record.Direction,
            ProductCode: record.ProductCode,
            ProductName: record.ProductName,
            ProductType: record.ProductType,
            TradeMode:   record.TradeMode,
            FeeRate:     record.FeeRate,
            stat: struct {
                ApplyAmount   decimal.Decimal
                ApplyQuantity decimal.Decimal
                Fee           decimal.Decimal
                SourceType    orderconstant.OrderSourceType
            }{
                ApplyAmount:   decimal.Zero,
                ApplyQuantity: decimal.Zero,
                Fee:           decimal.Zero,
                SourceType:    record.SourceType, // 第一个客户订单的订单类型
            },
        }
        mapNomiOdr[ot] = no
        mapNomiOSubOrders[ot] = []userOrderInfo{{OrderId: record.Id, BadOrderInfo: false}}
    } else {
        // 将当前订单ID添加到对应的合单类型中
        mapNomiOSubOrders[ot] = append(mapNomiOSubOrders[ot], userOrderInfo{OrderId: record.Id, BadOrderInfo: false})
    }

    // 统计信息处理
    no := mapNomiOdr[ot]
    netAmountD, _ := decimal.NewFromString(record.ApplyNetAmount)
    quantityD, _ := decimal.NewFromString(record.ApplyQuantity)
    feeAmountD, _ := decimal.NewFromString(record.ApplyFeeAmount)

    // 只要存在不一致就是混合下单
    if record.SourceType != no.stat.SourceType {
        no.stat.SourceType = orderconstant.OrderSourceTypeMix
    }

    if no.TradeMode == orderconstant.W_TRADE_MODE_AMOUNT {
        no.stat.ApplyAmount = no.stat.ApplyAmount.Add(netAmountD)
        no.stat.ApplyQuantity = no.stat.ApplyQuantity.Add(quantityD)
    } else {
        no.stat.ApplyQuantity = no.stat.ApplyQuantity.Add(quantityD)
    }
    no.stat.Fee = no.stat.Fee.Add(feeAmountD)

    productBasicInfo, err := t.mktWealthClient.GetProductBasicInfo(ctx, mktmapping.ProductBasicInfoParams{
        ProductCode: record.ProductCode,
        Channel:     record.Channel,
    })
    if err != nil {
        // TODO alex-todo 日志打印
        // TODO 需要记录失败
        // l.Errorf("fail to get fund holidays,%s", err)
        continue
    }

    settlementDay := productBasicInfo.GetSettlementDayByDirection(record.Direction)
    expectSettledDate, err := t.getExpectSettledDateV3(ctx, time.Now(), no.Symbol, settlementDay, fundHoliadays)
    if err != nil {
        // t.sendMsgToDingDing(ctx, "预计结算日计算失败", fmt.Sprintf("订单ID: %d, 产品名称: %s", record.Id, record.ProductName))
        log.Errorf("t.getExpectSettledDate failed: %v", err)
        continue
    } else {
        no.ExpectSettleDate = &expectSettledDate
    }
}

processResult := &processOrdersResult{
    MapNomiOdr:        mapNomiOdr,
    MapNomiOSubOrders: mapNomiOSubOrders,
}

return processResult, nil

}

// processAndSaveNomineeOrders 处理订单并将其保存到数据库
func (t *ConsolidateOrderWorker) processAndSaveNomineeOrders(ctx context.Context, key string, v *nomineeOrderInfo, mapNomiOSubOrders map[string][]int, fundType string) error {
// … 实现事务逻辑,包括生成集合订单、子订单、更新原始订单状态等 …

// nomiOrder 集合订单
nomiOrder := new(model.TWealthNomineeOrder)
tnow := time.Now()
nomiOrder.IsMonetaryFund = 0                        // 0 非货币基金
if fundType == constant.WorkerArgFundTypeMonetary { // 货币型基金
    nomiOrder.IsMonetaryFund = 1
}
nomiOrder.ApplyAmount = v.stat.ApplyAmount.String()
nomiOrder.ApplyQuantity = v.stat.ApplyQuantity.String()
nomiOrder.Fee = v.stat.Fee.String()
gconv.Struct(v, nomiOrder)
if nomiOrder.ISIN == "" {
    nomiOrder.ISIN = nomiOrder.ProductCode
}
nomiOrder.OrderStatus = orderconstant.WealthNomineeOrderStatusWaiting
nomiOrder.SourceType = v.stat.SourceType.Value()
nomiOrder.SubmitTime = utils.GetPointerTime(tnow)
nomiOrder.ExpectSettleDate = v.ExpectSettleDate

// 生成订单号
orderNumberNo, err := dao.Dao.Access.GenIdNo(nil, pkgconstant.T_ID_TYPE_WEALTH_NOMINEE_ORDER_RULE1_CHAR)
if err != nil {
    log.Errorf(err.Error())
}
// no.OrderNumber 集合订单号
nomiOrder.OrderNumber = generateNomineeOrderNumber(orderNumberNo, nomiOrder.OrderType)

// https://www.tapd.cn/60236733/markdown_wikis/show/#1160236733001004373@toc2
// https://www.tapd.cn/60236733/markdown_wikis/show/#1160236733001004557@toc0
// 针对赎回订单可能会存在多上手的情况这里做了多集合订单的处理
nomiOrder, subOrderList, err := NomineeOrderSplit(ctx, nomiOrder)
if err != nil {
    log.Errorf(err.Error())
    return err
}

// 子集合订单生成OrderNumber
for i := range subOrderList {
    var subOrderNumberNo int
    subOrderNumberNo, err = dao.Dao.Access.GenIdNo(nil, pkgconstant.T_ID_TYPE_WEALTH_NOMINEE_ORDER_RULE1_CHAR)
    if err != nil {
        log.Errorf(err.Error())
        return err
    }
    subOrderList[i].OrderNumber = generateNomineeOrderNumber(subOrderNumberNo, nomiOrder.OrderType)
}
nomineeOrderList := append(subOrderList, *nomiOrder)
if err = t.iFastOrderWarn(ctx, nomineeOrderList); err != nil {
    log.Error(err.Error())
    return err
}

// start updating db
tx := mysql.WithContext(ctx, dao.Dao.Basedb).Begin()
defer func() {
    if r := recover(); r != nil {
        tx.Rollback()
        panic(r)
    } else if err != nil {
        tx.Rollback()
    } else {
        err = tx.Commit().Error
        if err != nil {
            log.Errorf("tx commit error:%v", err.Error())
        }
    }
}()

if len(subOrderList) > 0 {
    err = tx.Create(subOrderList).Error
    if err != nil {
        log.Error(err.Error())
        tx.Rollback()
        return err
    }
}

err = tx.Create(nomiOrder).Error
if err != nil {
    log.Error(err.Error())
    tx.Rollback()
    return err
}

err = batchFrozenCustodianHolding(ctx, tx, nomineeOrderList)
if err != nil {
    log.Error(err.Error())
    tx.Rollback()
    return err
}

subOrders := mapNomiOSubOrders[key]
skip := false
for _, id := range subOrders {
    order := new(model.TWealthOrder)
    order.Id = id
    order.RofNomineeOrderStatus = orderconstant.W_ORDER_ROFNOMINEEORDERSTATUS_CONSOLIDATED
    order.NomineeOrderId = nomiOrder.Id
    order.ExpectSettleDate = nomiOrder.ExpectSettleDate // 预计结算日
    err = tx.Updates(order).Error
    if err != nil {
        log.Error(err.Error())
        tx.Rollback()
        skip = true
        break
    }
}
if skip {
    return err
}

return nil

}

包与导入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package worker

import (
"context"
"errors"
"fmt"
"strconv"
"time"

"trade/internal/batchjobfund/constant"
"trade/internal/batchjobfund/dao"
"trade/internal/common/api/mktdata"
mktconstant "trade/internal/common/api/mktdata/constant"
"trade/internal/common/api/mktwealth"
mktwealthconstant "trade/internal/common/api/mktwealth/constant"
mktmapping "trade/internal/common/api/mktwealth/mapping"
"trade/internal/common/client"
"trade/internal/common/client/dingtalk"
"trade/internal/common/logic/msgpush"
cmapping "trade/internal/common/mapping"
"trade/internal/pkg/env"
"trade/internal/pkg/log"
custodianconstant "trade/internal/pkg/newconstant/custodian"
orderconstant "trade/internal/pkg/newconstant/order"
"trade/internal/pkg/worker"
pkgconstant "trade/pkg/constant"
model "trade/pkg/model/wealth"
"trade/pkg/utils"

trademodel "trade/internal/pkg/model/tradedb"
utildd "trade/pkg/utils/dingding"

"github.com/gogf/gf/v2/util/gconv"
"github.com/golang-module/carbon/v2"
"github.com/samber/lo"
"github.com/shopspring/decimal"
"github.com/spf13/viper"
"gl.fotechwealth.com.local/backend/trade-lib.git/mysql"
"gorm.io/gorm"
)

首先,导入了多个包,包括标准库包和项目内部的包。这些包提供了上下文管理、错误处理、日志记录、数据库操作、HTTP客户端和其他实用工具。

ConsolidateOrderWorker 结构体

1
2
3
4
5
6
7
type ConsolidateOrderWorker struct {
commandArgs []string
config *viper.Viper
dingTalkRestyClient dingtalk.RestyClient
mktDataClient mktdata.Client
mktWealthClient mktwealth.Client
}

定义了一个 ConsolidateOrderWorker 结构体,包含了命令参数、配置对象以及市场数据和财富数据的客户端。

NewConsolidateOrderWorker 函数

1
2
3
4
5
6
7
8
9
10
11
12
13
func NewConsolidateOrderWorker(
commandArgs []string,
config *viper.Viper,
restyClient client.CommRestyClient,
mktDataClient mktdata.Client, mktWealthClient mktwealth.Client) worker.Worker {
return &ConsolidateOrderWorker{
commandArgs: commandArgs,
config: config,
dingTalkRestyClient: restyClient.DingTalk(),
mktDataClient: mktDataClient,
mktWealthClient: mktWealthClient,
}
}

创建并返回一个新的 ConsolidateOrderWorker 实例。

ConsolidateOrderWorker 的 Run 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
func (t *ConsolidateOrderWorker) Run(ctx context.Context) error {
// 设置查询参数
fundType := t.commandArgs[0]
var offset int
const limit = 100

var processOrdersResult processOrdersResult
processOrdersResult.MapNomiOdr = make(map[string]*nomineeOrderInfo)
processOrdersResult.MapNomiOSubOrders = make(map[string][]userOrderInfo)

for {
wealthOrders, err := t.getCanConsolidateOrders(ctx, fundType, offset, limit)
if err != nil {
log.Fatalf("Error fetching orders: %v", err)
}

if len(wealthOrders) == 0 {
break
}

nowTime := carbon.Now()
startDate := nowTime.ToDateStruct()
symbols := lo.Map[trademodel.TWealthOrder, string](wealthOrders, func(item trademodel.TWealthOrder, index int) string {
return item.ProductSymbol
})

productHolidays, err := t.mktWealthClient.GetFundHolidays(ctx, &mktmapping.FundHoliadayParams{
Symbols: symbols,
StartDate: startDate.ToDateString(),
EndDate: startDate.AddDays(int(mktwealthconstant.FundHolidaySpan)).ToDateString(),
})
if err != nil {
log.Errorf("fail to get fund holidays,%s", err)
return err
}

wealthOrders, err = (trademodel.TWealthOrders)(wealthOrders).FiltedByFundHoliday(nowTime.ToStdTime(), productHolidays)
if err != nil {
log.Errorf("filtered by fund holiday error,%s", err)
continue
}

processResult, err := t.processOrders(ctx, wealthOrders, productHolidays)
if err != nil {
log.Errorf("Error processing orders: %v", err)
return err
}

for k, v := range processResult.MapNomiOdr {
processOrdersResult.MapNomiOdr[k] = v
}
for k, v := range processResult.MapNomiOSubOrders {
processOrdersResult.MapNomiOSubOrders[k] = append(processOrdersResult.MapNomiOSubOrders[k], v...)
}

offset += limit
}

MapNomiOSubOrders := make(map[string][]int)
for key, userOrderInfos := range processOrdersResult.MapNomiOSubOrders {
orderIDs := make([]int, 0, len(userOrderInfos))
for _, userOrderInfo := range userOrderInfos {
orderIDs = append(orderIDs, userOrderInfo.OrderId)
}
MapNomiOSubOrders[key] = orderIDs
}

for k, v := range processOrdersResult.MapNomiOdr {
err := t.processAndSaveNomineeOrders(ctx, k, v, MapNomiOSubOrders, fundType)
if err != nil {
log.Errorf("Error saving consolidated orders: %v", err)
return err
}
}

return nil
}

Run 方法是 ConsolidateOrderWorker 的核心方法,负责执行整个合单处理流程。主要步骤包括:

  1. 设置查询参数。
  2. 初始化结果存储结构。
  3. 循环获取和处理订单。
  4. 处理订单并保存到数据库。

getCanConsolidateOrders 方法

1
2
3
4
5
6
7
8
9
10
11
12
func (t *ConsolidateOrderWorker) getCanConsolidateOrders(_ context.Context, fundType string, offset, limit int) ([]model.TWealthOrder, error) {
var tWealthOrder []model.TWealthOrder
var err error

if fundType == constant.WorkerArgFundTypeMonetary {
tWealthOrder, err = dao.WealthOrder.GetCanConsolidateMonetaryOrders(offset, limit, orderconstant.W_PRODUCT_TYPE_ECASH, orderconstant.W_PRODUCT_TYPE_PUB_FUND)
} else {
tWealthOrder, err = dao.WealthOrder.GetCanConsolidateNotMonetaryOrders(offset, limit, orderconstant.W_PRODUCT_TYPE_PUB_FUND)
}

return tWealthOrder, err
}

根据基金类型、偏移量和限制获取可以合单的订单。

processOrders 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
func (t *ConsolidateOrderWorker) processOrders(ctx context.Context, orders []model.TWealthOrder, fundHoliadays []*mktmapping.FundHoliaday) (*processOrdersResult, error) {
mapNomiOSubOrders := make(map[string][]userOrderInfo)
mapNomiOdr := make(map[string]*nomineeOrderInfo)

for _, record := range orders {
ot := record.ProductCode + "," + strconv.Itoa(record.Direction)
if _, ok := mapNomiOdr[ot]; !ok {
no := &nomineeOrderInfo{
Currency: record.Currency,
Direction: record.Direction,
ISIN: record.ISIN,
Symbol: record.ProductSymbol,
OrderType: record.Direction,
ProductCode: record.ProductCode,
ProductName: record.ProductName,
ProductType: record.ProductType,
TradeMode: record.TradeMode,
FeeRate: record.FeeRate,
stat: struct {
ApplyAmount decimal.Decimal
ApplyQuantity decimal.Decimal
Fee decimal.Decimal
SourceType orderconstant.OrderSourceType
}{
ApplyAmount: decimal.Zero,
ApplyQuantity: decimal.Zero,
Fee: decimal.Zero,
SourceType: record.SourceType,
},
}
mapNomiOdr[ot] = no
mapNomiOSubOrders[ot] = []userOrderInfo{{OrderId: record.Id, BadOrderInfo: false}}
} else {
mapNomiOSubOrders[ot] = append(mapNomiOSubOrders[ot], userOrderInfo{OrderId: record.Id, BadOrderInfo: false})
}

no := mapNomiOdr[ot]
netAmountD, _ := decimal.NewFromString(record.ApplyNetAmount)
quantityD, _ := decimal.NewFromString(record.ApplyQuantity)
feeAmountD, _ := decimal.NewFromString(record.ApplyFeeAmount)

if record.SourceType != no.stat.SourceType {
no.stat.SourceType = orderconstant.OrderSourceTypeMix
}

if no.TradeMode == orderconstant.W_TRADE_MODE_AMOUNT {
no.stat.ApplyAmount = no.stat.Apply

```go
Amount.Add(netAmountD)
no.stat.ApplyQuantity = no.stat.ApplyQuantity.Add(quantityD)
} else {
no.stat.ApplyQuantity = no.stat.ApplyQuantity.Add(quantityD)
}
no.stat.Fee = no.stat.Fee.Add(feeAmountD)

productBasicInfo, err := t.mktWealthClient.GetProductBasicInfo(ctx, mktmapping.ProductBasicInfoParams{
ProductCode: record.ProductCode,
Channel: record.Channel,
})
if err != nil {
log.Errorf("fail to get product basic info,%s", err)
continue
}

settlementDay := productBasicInfo.GetSettlementDayByDirection(record.Direction)
expectSettledDate, err := t.getExpectSettledDateV3(ctx, time.Now(), no.Symbol, settlementDay, fundHoliadays)
if err != nil {
log.Errorf("t.getExpectSettledDate failed: %v", err)
continue
} else {
no.ExpectSettleDate = &expectSettledDate
}
}

processResult := &processOrdersResult{
MapNomiOdr: mapNomiOdr,
MapNomiOSubOrders: mapNomiOSubOrders,
}

return processResult, nil
}

processOrders 方法对获取的一批订单进行处理,核心步骤如下:

  1. 初始化用于存储订单的合单类型映射。
  2. 遍历订单,按照产品代码和交易方向创建或更新合单类型。
  3. 统计订单的金额、数量和费用信息。
  4. 获取产品的结算日并计算预计结算日。

processAndSaveNomineeOrders 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
func (t *ConsolidateOrderWorker) processAndSaveNomineeOrders(ctx context.Context, key string, v *nomineeOrderInfo, mapNomiOSubOrders map[string][]int, fundType string) error {
nomiOrder := new(model.TWealthNomineeOrder)
tnow := time.Now()
nomiOrder.IsMonetaryFund = 0
if fundType == constant.WorkerArgFundTypeMonetary {
nomiOrder.IsMonetaryFund = 1
}
nomiOrder.ApplyAmount = v.stat.ApplyAmount.String()
nomiOrder.ApplyQuantity = v.stat.ApplyQuantity.String()
nomiOrder.Fee = v.stat.Fee.String()
gconv.Struct(v, nomiOrder)
if nomiOrder.ISIN == "" {
nomiOrder.ISIN = nomiOrder.ProductCode
}
nomiOrder.OrderStatus = orderconstant.WealthNomineeOrderStatusWaiting
nomiOrder.SourceType = v.stat.SourceType.Value()
nomiOrder.SubmitTime = utils.GetPointerTime(tnow)
nomiOrder.ExpectSettleDate = v.ExpectSettleDate

orderNumberNo, err := dao.Dao.Access.GenIdNo(nil, pkgconstant.T_ID_TYPE_WEALTH_NOMINEE_ORDER_RULE1_CHAR)
if err != nil {
log.Errorf(err.Error())
}
nomiOrder.OrderNumber = generateNomineeOrderNumber(orderNumberNo, nomiOrder.OrderType)

nomiOrder, subOrderList, err := NomineeOrderSplit(ctx, nomiOrder)
if err != nil {
log.Errorf(err.Error())
return err
}

for i := range subOrderList {
var subOrderNumberNo int
subOrderNumberNo, err = dao.Dao.Access.GenIdNo(nil, pkgconstant.T_ID_TYPE_WEALTH_NOMINEE_ORDER_RULE1_CHAR)
if err != nil {
log.Errorf(err.Error())
return err
}
subOrderList[i].OrderNumber = generateNomineeOrderNumber(subOrderNumberNo, nomiOrder.OrderType)
}
nomineeOrderList := append(subOrderList, *nomiOrder)
if err = t.iFastOrderWarn(ctx, nomineeOrderList); err != nil {
log.Error(err.Error())
return err
}

tx := mysql.WithContext(ctx, dao.Dao.Basedb).Begin()
defer func() {
if r := recover(); r != nil {
tx.Rollback()
panic(r)
} else if err != nil {
tx.Rollback()
} else {
err = tx.Commit().Error
if err != nil {
log.Errorf("tx commit error:%v", err.Error())
}
}
}()

if len(subOrderList) > 0 {
err = tx.Create(subOrderList).Error
if err != nil {
log.Error(err.Error())
tx.Rollback()
return err
}
}

err = tx.Create(nomiOrder).Error
if err != nil {
log.Error(err.Error())
tx.Rollback()
return err
}

err = batchFrozenCustodianHolding(ctx, tx, nomineeOrderList)
if err != nil {
log.Error(err.Error())
tx.Rollback()
return err
}

subOrders := mapNomiOSubOrders[key]
skip := false
for _, id := range subOrders {
order := new(model.TWealthOrder)
order.Id = id
order.RofNomineeOrderStatus = orderconstant.W_ORDER_ROFNOMINEEORDERSTATUS_CONSOLIDATED
order.NomineeOrderId = nomiOrder.Id
order.ExpectSettleDate = nomiOrder.ExpectSettleDate
err = tx.Updates(order).Error
if err != nil {
log.Error(err.Error())
tx.Rollback()
skip = true
break
}
}
if skip {
return err
}

return nil
}

processAndSaveNomineeOrders 方法处理并保存合单订单,主要步骤包括:

  1. 初始化合单订单对象并设置相关属性。
  2. 生成合单订单号。
  3. 拆分合单订单(如果需要)。
  4. 开始数据库事务并保存订单数据。
  5. 更新原始订单状态。

getCanConsolidateOrders 方法

1
2
3
4
5
6
7
8
9
10
11
12
func (t *ConsolidateOrderWorker) getCanConsolidateOrders(_ context.Context, fundType string, offset, limit int) ([]model.TWealthOrder, error) {
var tWealthOrder []model.TWealthOrder
var err error

if fundType == constant.WorkerArgFundTypeMonetary {
tWealthOrder, err = dao.WealthOrder.GetCanConsolidateMonetaryOrders(offset, limit, orderconstant.W_PRODUCT_TYPE_ECASH, orderconstant.W_PRODUCT_TYPE_PUB_FUND)
} else {
tWealthOrder, err = dao.WealthOrder.GetCanConsolidateNotMonetaryOrders(offset, limit, orderconstant.W_PRODUCT_TYPE_PUB_FUND)
}

return tWealthOrder, err
}

getCanConsolidateOrders 方法根据基金类型、偏移量和限制从数据库中获取可以合单的订单。

nomineeOrderInfo 结构体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
type nomineeOrderInfo struct {
OrderType int `json:"orderType"`
TradeMode int `json:"tradeMode"`
ProductCode string `json:"productCode"`
ProductType int `json:"productType"`
ProductName string `json:"productName"`
Currency string `json:"currency"`
Symbol string `json:"symbol"`
ExpectSettleDate *time.Time `json:"expectSettleDate"`
ISIN string `json:"isin"`
Direction int `json:"direction"`
FeeRate string
stat
}

type stat struct {
ApplyAmount decimal.Decimal
ApplyQuantity decimal.Decimal
Fee decimal.Decimal
SourceType orderconstant.OrderSourceType
}

nomineeOrderInfo 结构体用于存储合单订单的信息,包括订单类型、交易模式、产品代码、产品类型、产品名称、币种、产品代码、预计结算日、ISIN、方向和费用率等属性,以及统计信息 stat

userOrderInfo 结构体

1
2
3
4
type userOrderInfo struct {
OrderId int
BadOrderInfo bool
}

userOrderInfo 结构体用于存储用户订单的信息,包括订单ID和是否为问题订单的标志。

processOrdersResult 结构体

1
2
3
4
type processOrdersResult struct {
MapNomiOdr map[string]*nomineeOrderInfo
MapNomiOSubOrders map[string][]userOrderInfo
}

processOrdersResult 结构体用于存储 processOrders 函数的处理结果,包括每个合单类型的汇总信息和每个合单类型对应的原始订单ID列表。

我现在想用userOrderInfo 结构体来保存在processAndSaveNomineeOrders 方法和processOrders 方法里面处理订单出错的时候的订单元数据信息

type userOrderInfo struct {
OrderId int
BadOrderInfo bool
}

要在 processOrders 方法和 processAndSaveNomineeOrders 方法中使用 userOrderInfo 结构体来保存处理订单出错时的订单元数据信息,你需要在订单处理出错时设置 BadOrderInfotrue 并将该信息保存在相应的结构中。

修改 processOrders 方法

processOrders 方法中,当获取产品基本信息或计算预计结算日出错时,将 BadOrderInfo 设置为 true 并记录订单信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
func (t *ConsolidateOrderWorker) processOrders(ctx context.Context, orders []model.TWealthOrder, fundHoliadays []*mktmapping.FundHoliaday) (*processOrdersResult, error) {
mapNomiOSubOrders := make(map[string][]userOrderInfo)
mapNomiOdr := make(map[string]*nomineeOrderInfo)

for _, record := range orders {
ot := record.ProductCode + "," + strconv.Itoa(record.Direction)
if _, ok := mapNomiOdr[ot]; !ok {
no := &nomineeOrderInfo{
Currency: record.Currency,
Direction: record.Direction,
ISIN: record.ISIN,
Symbol: record.ProductSymbol,
OrderType: record.Direction,
ProductCode: record.ProductCode,
ProductName: record.ProductName,
ProductType: record.ProductType,
TradeMode: record.TradeMode,
FeeRate: record.FeeRate,
stat: struct {
ApplyAmount decimal.Decimal
ApplyQuantity decimal.Decimal
Fee decimal.Decimal
SourceType orderconstant.OrderSourceType
}{
ApplyAmount: decimal.Zero,
ApplyQuantity: decimal.Zero,
Fee: decimal.Zero,
SourceType: record.SourceType,
},
}
mapNomiOdr[ot] = no
mapNomiOSubOrders[ot] = []userOrderInfo{{OrderId: record.Id, BadOrderInfo: false}}
} else {
mapNomiOSubOrders[ot] = append(mapNomiOSubOrders[ot], userOrderInfo{OrderId: record.Id, BadOrderInfo: false})
}

no := mapNomiOdr[ot]
netAmountD, _ := decimal.NewFromString(record.ApplyNetAmount)
quantityD, _ := decimal.NewFromString(record.ApplyQuantity)
feeAmountD, _ := decimal.NewFromString(record.ApplyFeeAmount)

if record.SourceType != no.stat.SourceType {
no.stat.SourceType = orderconstant.OrderSourceTypeMix
}

if no.TradeMode == orderconstant.W_TRADE_MODE_AMOUNT {
no.stat.ApplyAmount = no.stat.ApplyAmount.Add(netAmountD)
no.stat.ApplyQuantity = no.stat.ApplyQuantity.Add(quantityD)
} else {
no.stat.ApplyQuantity = no.stat.ApplyQuantity.Add(quantityD)
}
no.stat.Fee = no.stat.Fee.Add(feeAmountD)

productBasicInfo, err := t.mktWealthClient.GetProductBasicInfo(ctx, mktmapping.ProductBasicInfoParams{
ProductCode: record.ProductCode,
Channel: record.Channel,
})
if err != nil {
log.Errorf("fail to get product basic info,%s", err)
for i, orderInfo := range mapNomiOSubOrders[ot] {
if orderInfo.OrderId == record.Id {
mapNomiOSubOrders[ot][i].BadOrderInfo = true
break
}
}
continue
}

settlementDay := productBasicInfo.GetSettlementDayByDirection(record.Direction)
expectSettledDate, err := t.getExpectSettledDateV3(ctx, time.Now(), no.Symbol, settlementDay, fundHoliadays)
if err != nil {
log.Errorf("t.getExpectSettledDate failed: %v", err)
for i, orderInfo := range mapNomiOSubOrders[ot] {
if orderInfo.OrderId == record.Id {
mapNomiOSubOrders[ot][i].BadOrderInfo = true
break
}
}
continue
} else {
no.ExpectSettleDate = &expectSettledDate
}
}

processResult := &processOrdersResult{
MapNomiOdr: mapNomiOdr,
MapNomiOSubOrders: mapNomiOSubOrders,
}

return processResult, nil
}

修改 processAndSaveNomineeOrders 方法

processAndSaveNomineeOrders 方法中,当更新订单状态出错时,将 BadOrderInfo 设置为 true 并记录订单信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
func (t *ConsolidateOrderWorker) processAndSaveNomineeOrders(ctx context.Context, key string, v *nomineeOrderInfo, mapNomiOSubOrders map[string][]int, fundType string) error {
nomiOrder := new(model.TWealthNomineeOrder)
tnow := time.Now()
nomiOrder.IsMonetaryFund = 0
if fundType == constant.WorkerArgFundTypeMonetary {
nomiOrder.IsMonetaryFund = 1
}
nomiOrder.ApplyAmount = v.stat.ApplyAmount.String()
nomiOrder.ApplyQuantity = v.stat.ApplyQuantity.String()
nomiOrder.Fee = v.stat.Fee.String()
gconv.Struct(v, nomiOrder)
if nomiOrder.ISIN == "" {
nomiOrder.ISIN = nomiOrder.ProductCode
}
nomiOrder.OrderStatus = orderconstant.WealthNomineeOrderStatusWaiting
nomiOrder.SourceType = v.stat.SourceType.Value()
nomiOrder.SubmitTime = utils.GetPointerTime(tnow)
nomiOrder.ExpectSettleDate = v.ExpectSettleDate

orderNumberNo, err := dao.Dao.Access.GenIdNo(nil, pkgconstant.T_ID_TYPE_WEALTH_NOMINEE_ORDER_RULE1_CHAR)
if err != nil {
log.Errorf(err.Error())
}
nomiOrder.OrderNumber = generateNomineeOrderNumber(orderNumberNo, nomiOrder.OrderType)

nomiOrder, subOrderList, err := NomineeOrderSplit(ctx, nomiOrder)
if err != nil {
log.Errorf(err.Error())
return err
}

for i := range subOrderList {
var subOrderNumberNo int
subOrderNumberNo, err = dao.Dao.Access.GenIdNo(nil, pkgconstant.T_ID_TYPE_WEALTH_NOMINEE_ORDER_RULE1_CHAR)
if err != nil {
log.Errorf(err.Error())
return err
}
subOrderList[i].OrderNumber = generateNomineeOrderNumber(subOrderNumberNo, nomiOrder.OrderType)
}
nomineeOrderList := append(subOrderList, *nomiOrder)
if err = t.iFastOrderWarn(ctx, nomineeOrderList); err != nil {
log.Error(err.Error())
return err
}

tx := mysql.WithContext(ctx, dao.Dao.Basedb).Begin()
defer func() {
if r := recover(); r != nil {
tx.Rollback()
panic(r)
} else if err != nil {
tx.Rollback()
} else {
err = tx.Commit().Error
if err != nil {
log.Errorf("tx commit error:%v", err.Error())
}
}
}()

if len(subOrderList) > 0 {
err = tx.Create(subOrderList).Error
if err != nil {
log.Error(err.Error())
tx.Rollback()
return err
}
}

err = tx.Create(nomiOrder).Error
if err != nil {
log.Error(err.Error())
tx.Rollback()
return err
}

err = batchFrozenCustodianHolding(ctx, tx, nomineeOrderList)
if err != nil {
log.Error(err.Error())
tx.Rollback()
return err
}

subOrders := mapNomiOSubOrders[key]
skip := false
for _, id := range subOrders {
order := new(model.TWealthOrder)
order.Id = id
order.RofNomineeOrderStatus = orderconstant.W_ORDER_ROFNOMINEEORDERSTATUS_CONSOLIDATED
order.NomineeOrderId = nomiOrder.Id
order.ExpectSettleDate = nomiOrder.ExpectSettleDate
err = tx.Updates(order).Error
if err != nil {
log.Error(err.Error())
for i, userOrder := range mapNomiOSubOrders[key] {
if userOrder.OrderId == id {
mapNomiOSubOrders[key][i].BadOrderInfo = true
break
}
}
tx.Rollback()
skip = true
break
}
}
if skip {
return err
}

return nil
}

processAndSaveNomineeOrders 方法中,更新订单状态时,如果出错,将对应的 userOrderInfoBadOrderInfo 设置为 true 并记录该订单的信息。这样就可以在处理订单出错时保留订单元数据信息。

通过以上修改,你就可以在 processOrdersprocessAndSaveNomineeOrders 方法中记录处理订单出错时的订单元数据信息。

这一段代码报错:userOrder.OrderId undefined (type int has no field or method OrderId)

请你综合整段代码的逻辑修复这个错误

for _, id := range subOrders {
    order := new(model.TWealthOrder)
    order.Id = id
    order.RofNomineeOrderStatus = orderconstant.W_ORDER_ROFNOMINEEORDERSTATUS_CONSOLIDATED
    order.NomineeOrderId = nomiOrder.Id
    order.ExpectSettleDate = nomiOrder.ExpectSettleDate // 预计结算日
    err = tx.Updates(order).Error
    if err != nil {
        log.Error(err.Error())
        for i, userOrder := range mapNomiOSubOrders[key] {
            if userOrder.OrderId == id {
                mapNomiOSubOrders[key][i].BadOrderInfo = true
                break
            }
        }
        tx.Rollback()
        skip = true
        break
    }
}

根据错误信息和代码逻辑,可以看出 subOrders 的元素是 int 类型的订单ID,而 userOrderuserOrderInfo 类型,包含 OrderIdBadOrderInfo 字段。

为了修复这个错误,需要将 subOrders 的类型从 []int 改为 []userOrderInfo。这样在处理订单时可以直接访问 OrderIdBadOrderInfo 字段。

修改 processAndSaveNomineeOrders 方法

subOrders 的类型改为 []userOrderInfo,并在更新订单状态出错时设置 BadOrderInfotrue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
func (t *ConsolidateOrderWorker) processAndSaveNomineeOrders(ctx context.Context, key string, v *nomineeOrderInfo, mapNomiOSubOrders map[string][]userOrderInfo, fundType string) error {
nomiOrder := new(model.TWealthNomineeOrder)
tnow := time.Now()
nomiOrder.IsMonetaryFund = 0
if fundType == constant.WorkerArgFundTypeMonetary {
nomiOrder.IsMonetaryFund = 1
}
nomiOrder.ApplyAmount = v.stat.ApplyAmount.String()
nomiOrder.ApplyQuantity = v.stat.ApplyQuantity.String()
nomiOrder.Fee = v.stat.Fee.String()
gconv.Struct(v, nomiOrder)
if nomiOrder.ISIN == "" {
nomiOrder.ISIN = nomiOrder.ProductCode
}
nomiOrder.OrderStatus = orderconstant.WealthNomineeOrderStatusWaiting
nomiOrder.SourceType = v.stat.SourceType.Value()
nomiOrder.SubmitTime = utils.GetPointerTime(tnow)
nomiOrder.ExpectSettleDate = v.ExpectSettleDate

orderNumberNo, err := dao.Dao.Access.GenIdNo(nil, pkgconstant.T_ID_TYPE_WEALTH_NOMINEE_ORDER_RULE1_CHAR)
if err != nil {
log.Errorf(err.Error())
}
nomiOrder.OrderNumber = generateNomineeOrderNumber(orderNumberNo, nomiOrder.OrderType)

nomiOrder, subOrderList, err := NomineeOrderSplit(ctx, nomiOrder)
if err != nil {
log.Errorf(err.Error())
return err
}

for i := range subOrderList {
var subOrderNumberNo int
subOrderNumberNo, err = dao.Dao.Access.GenIdNo(nil, pkgconstant.T_ID_TYPE_WEALTH_NOMINEE_ORDER_RULE1_CHAR)
if err != nil {
log.Errorf(err.Error())
return err
}
subOrderList[i].OrderNumber = generateNomineeOrderNumber(subOrderNumberNo, nomiOrder.OrderType)
}
nomineeOrderList := append(subOrderList, *nomiOrder)
if err = t.iFastOrderWarn(ctx, nomineeOrderList); err != nil {
log.Error(err.Error())
return err
}

tx := mysql.WithContext(ctx, dao.Dao.Basedb).Begin()
defer func() {
if r := recover(); r != nil {
tx.Rollback()
panic(r)
} else if err != nil {
tx.Rollback()
} else {
err = tx.Commit().Error
if err != nil {
log.Errorf("tx commit error:%v", err.Error())
}
}
}()

if len(subOrderList) > 0 {
err = tx.Create(subOrderList).Error
if err != nil {
log.Error(err.Error())
tx.Rollback()
return err
}
}

err = tx.Create(nomiOrder).Error
if err != nil {
log.Error(err.Error())
tx.Rollback()
return err
}

err = batchFrozenCustodianHolding(ctx, tx, nomineeOrderList)
if err != nil {
log.Error(err.Error())
tx.Rollback()
return err
}

subOrders := mapNomiOSubOrders[key]
skip := false
for _, userOrderInfo := range subOrders {
order := new(model.TWealthOrder)
order.Id = userOrderInfo.OrderId
order.RofNomineeOrderStatus = orderconstant.W_ORDER_ROFNOMINEEORDERSTATUS_CONSOLIDATED
order.NomineeOrderId = nomiOrder.Id
order.ExpectSettleDate = nomiOrder.ExpectSettleDate
err = tx.Updates(order).Error
if err != nil {
log.Error(err.Error())
userOrderInfo.BadOrderInfo = true
tx.Rollback()
skip = true
break
}
}
if skip {
return err
}

return nil
}

修改 processOrders 方法

processOrders 方法中,确保 mapNomiOSubOrders 中保存的是 userOrderInfo 类型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
func (t *ConsolidateOrderWorker) processOrders(ctx context.Context, orders []model.TWealthOrder, fundHoliadays []*mktmapping.FundHoliaday) (*processOrdersResult, error) {
mapNomiOSubOrders := make(map[string][]userOrderInfo)
mapNomiOdr := make(map[string]*nomineeOrderInfo)

for _, record := range orders {
ot := record.ProductCode + "," + strconv.Itoa(record.Direction)
if _, ok := mapNomiOdr[ot]; !ok {
no := &nomineeOrderInfo{
Currency: record.Currency,
Direction: record.Direction,
ISIN: record.ISIN,
Symbol: record.ProductSymbol,
OrderType: record.Direction,
ProductCode: record.ProductCode,
ProductName: record.ProductName,
ProductType: record.ProductType,
TradeMode: record.TradeMode,
FeeRate: record.FeeRate,
stat: struct {
ApplyAmount decimal.Decimal
ApplyQuantity decimal.Decimal
Fee decimal.Decimal
SourceType orderconstant.OrderSourceType
}{
ApplyAmount: decimal.Zero,
ApplyQuantity: decimal.Zero,
Fee: decimal.Zero,
SourceType: record.SourceType,
},
}
mapNomiOdr[ot] = no
mapNomiOSubOrders[ot] = []userOrderInfo{{OrderId: record.Id, BadOrderInfo: false}}
} else {
mapNomiOSubOrders[ot] = append(mapNomiOSubOrders[ot], userOrderInfo{OrderId: record.Id, BadOrderInfo: false})
}

no := mapNomiOdr[ot]
netAmountD, _ := decimal.NewFromString(record.ApplyNetAmount)
quantityD, _ := decimal.NewFromString(record.ApplyQuantity)
feeAmountD, _ := decimal.NewFromString(record.ApplyFeeAmount)

if record.SourceType != no.stat.SourceType {
no.stat.SourceType = orderconstant.OrderSourceTypeMix
}

if no.TradeMode == orderconstant.W_TRADE_MODE_AMOUNT {
no.stat.ApplyAmount = no.stat.ApplyAmount.Add(netAmountD)
no.stat.ApplyQuantity = no.stat.ApplyQuantity.Add(quantityD)
} else {
no.stat.ApplyQuantity = no.stat.ApplyQuantity.Add(quantityD)
}
no.stat.Fee = no.stat.Fee.Add(feeAmountD)

productBasicInfo, err := t.mktWealthClient.GetProductBasicInfo(ctx, mktmapping.ProductBasicInfoParams{
ProductCode: record.ProductCode,
Channel: record.Channel,
})
if err != nil {
log.Errorf("fail to get product basic info,%s", err)
for i, orderInfo := range mapNomiOSubOrders[ot] {
if orderInfo.OrderId == record.Id {
mapNomiOSubOrders[ot][i].BadOrderInfo = true
break
}
}
continue
}

settlementDay := productBasicInfo.GetSettlementDayByDirection(record.Direction)
expectSettledDate, err := t.getExpectSettledDateV3(ctx, time.Now(), no.Symbol, settlementDay, fundHoliadays)
if err != nil {
log.Errorf("t.getExpectSettledDate failed: %v", err)
for i, orderInfo := range mapNomiOSubOrders[ot] {
if orderInfo.OrderId == record.Id {
mapNomiOSubOrders[ot][i].BadOrderInfo = true
break
}
}
continue
} else {
no.ExpectSettleDate = &expectSettledDate
}
}

processResult := &processOrdersResult{
MapNomiOdr: mapNomiOdr,
MapNomiOSubOrders: mapNomiOSubOrders,
}

return processResult, nil
}

通过以上修改,你可以在 processAndSaveNomineeOrders 方法和 processOrders 方法中处理订单出错时记录订单元数据信息。