Skip to content

Instantly share code, notes, and snippets.

@weedge
Last active December 31, 2022 03:42
Show Gist options
  • Save weedge/fcbc8e4a2889c6230d62f6f35de6862d to your computer and use it in GitHub Desktop.
Save weedge/fcbc8e4a2889c6230d62f6f35de6862d to your computer and use it in GitHub Desktop.
package main
import (
"context"
"encoding/json"
"log"
"os"
"os/signal"
"strconv"
"syscall"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
"github.com/apache/rocketmq-client-go/v2/rlog"
)
type assetIncrHandler func(ctx context.Context) (incrAssetCn int)
type IAssetCallBack interface {
getAsset(ctx context.Context) (assetDto *UserAssetDto, err error)
}
type UserAssetDto struct {
AssetCn int `json:"assetCn"`
AssetType int `json:"assetType"`
UserId int64 `json:"userId"`
}
func (m *UserAssetDto) getAsset(ctx context.Context) (assetDto *UserAssetDto, err error) {
// mock
return &UserAssetDto{AssetCn: 10000, AssetType: m.AssetType, UserId: m.UserId}, nil
}
type ILocalTx interface {
UserAssetChangeTx(ctx context.Context, key string, lockerKey, eventMsgKey string, cb IAssetCallBack, handle assetIncrHandler) error
CheckEventMsg(ctx context.Context, eventMsgKey string) (bool, error)
}
type AssetTxMsgListener struct {
localTx ILocalTx
}
type DemoLocalTx struct{}
func (m *DemoLocalTx) UserAssetChangeTx(ctx context.Context, key string, lockerKey, eventMsgKey string, cb IAssetCallBack, handle assetIncrHandler) error {
println("this is demo local tx to run UserAssetChangeTx")
return nil
}
func (m *DemoLocalTx) CheckEventMsg(ctx context.Context, eventMsgKey string) (bool, error) {
println("this is demo local tx to run CheckEventMsg")
return false, nil
}
func NewAssetTxMsgListener(ltx ILocalTx) *AssetTxMsgListener {
return &AssetTxMsgListener{
localTx: ltx,
}
}
// https://github.com/apache/rocketmq-client-go/pull/982
func (m *AssetTxMsgListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {
userId, _ := strconv.ParseInt(msg.GetProperty("userId"), 10, 64)
assetType, _ := strconv.Atoi(msg.GetProperty("assetType"))
dto := &UserAssetDto{AssetType: assetType, UserId: userId}
err := m.localTx.UserAssetChangeTx(context.Background(), msg.GetProperty("assetKey"),
msg.GetProperty("lockerKey"), msg.GetProperty("eMsgKey"), dto, func(ctx context.Context) (incrAssetCn int) {
return 500
})
if err != nil {
return primitive.RollbackMessageState
}
return primitive.CommitMessageState
}
func (m *AssetTxMsgListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState {
res, err := m.localTx.CheckEventMsg(context.Background(), msg.GetProperty("eMsgKey"))
if err != nil {
return primitive.UnknowState
}
if res {
return primitive.CommitMessageState
}
return primitive.UnknowState
}
type InteractGiftEventMsgData struct {
RoomId int64 `json:"roomId"`
InteractId int64 `json:"interactId"`
UserId int64 `json:"userId"`
RecUserId int64 `json:"recUserId"`
Record string `json:"record"`
RecordOp string `json:"recordOp"`
GiftId int64 `json:"giftId"`
}
func main() {
rlog.SetLogLevel("error")
namesrvs := []string{"127.0.0.1:9876"}
groupName := "P_GID_GIFT_ASSET_CHANGE"
traceCfg := &primitive.TraceConfig{
Access: primitive.Local,
Resolver: primitive.NewPassthroughResolver(namesrvs),
GroupName: groupName,
}
p, _ := rocketmq.NewTransactionProducer(
NewAssetTxMsgListener(&DemoLocalTx{}),
producer.WithNsResolver(primitive.NewPassthroughResolver(namesrvs)),
producer.WithGroupName(groupName),
producer.WithRetry(2),
producer.WithTrace(traceCfg),
)
err := p.Start()
if err != nil {
log.Printf("start producer error: %s\n", err.Error())
os.Exit(1)
}
userId := int64(100)
rawMsg, _ := json.Marshal(&InteractGiftEventMsgData{
RoomId: 1110,
InteractId: 110,
UserId: userId,
RecUserId: 123,
Record: "gift",
RecordOp: "sendGift",
GiftId: 120,
})
msg := primitive.NewMessage("TOPIC_ASSET_CHANGE_EVENT", rawMsg)
eventId := "testEventId"
msg.WithProperties(map[string]string{"eventId": eventId, "eventType": "interactGift",
"opUserId": strconv.FormatInt(userId, 10), "assetType": "1",
"assetKey": "", "lockerKey": "", "eMsgKey": ""})
msg.WithKeys([]string{eventId})
msg.WithTag("TAG_SEND_GIFT")
//notice: transaction msg can't batch, delay send~~
res, err := p.SendMessageInTransaction(context.Background(), msg)
if err != nil {
log.Printf("send message error: %s\n", err)
} else {
log.Printf("send message success: result=%s\n", res.String())
}
// get signal notify quit ( syscall.SIGTERM(kill -15),syscall.SIGINT(kill -2) )
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
err = p.Shutdown()
if err != nil {
log.Printf("shutdown producer error: %s", err.Error())
}
}
@weedge
Copy link
Author

weedge commented Dec 31, 2022

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment