package application

import (
	"context"

	"mayfly-go/internal/flow/domain/entity"
	"mayfly-go/internal/flow/domain/repository"
	"mayfly-go/pkg/base"
	"mayfly-go/pkg/errorx"
	"mayfly-go/pkg/eventbus"
	"mayfly-go/pkg/gox"
	"mayfly-go/pkg/logx"
	"mayfly-go/pkg/model"
	"mayfly-go/pkg/utils/collx"
)

// Execution is the application service for execution flows: it creates them
// for a process instance, moves them from node to node and cancels them.
type Execution interface {
	base.App[*entity.Execution]

	// Init registers the built-in node behaviors and subscribes to the
	// process-instance create and cancel events.
	Init()

	// CreateExecution creates (initializes) the execution flow of procinst and
	// executes its start node.
	CreateExecution(ctx context.Context, procinst *entity.Procinst) error

	// ContinueExecution advances the execution flow to its next node.
	ContinueExecution(ctx *ExecutionCtx) error

	// MoveTo moves the execution flow to the given node.
	MoveTo(ctx *ExecutionCtx, node *entity.FlowNode) error

	// CreateChildExecution saves a new execution whose parent is the given
	// execution and returns it.
	CreateChildExecution(context.Context, *entity.Execution) (*entity.Execution, error)

	// CancelExecution cancels all execution flows of the process instance.
	CancelExecution(ctx context.Context, procinstId uint64) error
}

// executionAppImpl is the default Execution implementation.
type executionAppImpl struct {
	base.AppImpl[*entity.Execution, repository.Execution]

	// hisProcinstOpApp records the start/end of every node an execution visits.
	hisProcinstOpApp HisProcinstOp `inject:"T"`
}

// Compile-time check that executionAppImpl satisfies Execution.
var _ Execution = (*executionAppImpl)(nil)

// Init registers the start, end, user-task and AI-task node behaviors and
// subscribes this app to the process-instance create and cancel topics.
func (e *executionAppImpl) Init() {
	nodeBehaviorRegistry.Register(&StartNodeBehavior{})
	nodeBehaviorRegistry.Register(&EndNodeBehavior{})
	nodeBehaviorRegistry.Register(&UserTaskNodeBehavior{})
	nodeBehaviorRegistry.Register(&AiTaskNodeBehavior{})

	const subId = "ExecutionApp"

	flowEventBus.Subscribe(EventTopicFlowProcinstCreate, subId, func(ctx context.Context, event *eventbus.Event[any]) error {
		// Checked assertion: an unexpected payload becomes an error instead of
		// a panic inside the event handler.
		procinst, ok := event.Val.(*entity.Procinst)
		if !ok {
			return errorx.NewBiz("invalid payload for procinst create event")
		}
		return e.CreateExecution(ctx, procinst)
	})

	flowEventBus.Subscribe(EventTopicFlowProcinstCancel, subId, func(ctx context.Context, event *eventbus.Event[any]) error {
		procinstId, ok := event.Val.(uint64)
		if !ok {
			return errorx.NewBiz("invalid payload for procinst cancel event")
		}
		return e.CancelExecution(ctx, procinstId)
	})
}

// CreateExecution builds the root execution of procinst on the flow's single
// start node, saves it as active and executes that node, all inside one
// transaction. It returns an error when procinst is nil, when the flow
// definition does not contain exactly one start node, or when saving or
// executing the node fails.
func (e *executionAppImpl) CreateExecution(ctx context.Context, procinst *entity.Procinst) error {
	if procinst == nil {
		return errorx.NewBiz("procinst is nil")
	}

	flowDef := procinst.GetFlowDef()
	startNodes := flowDef.GetNodeByType(FlowNodeTypeStart)
	if len(startNodes) != 1 {
		return errorx.NewBiz("start node not found or more than one")
	}
	startNode := startNodes[0]

	// Look for an already existing suspended execution on the start node; the
	// populated struct is passed as the lookup condition.
	execution := &entity.Execution{
		ProcinstId: procinst.Id,
		ParentId:   0,
		NodeKey:    startNode.Key,
		NodeName:   startNode.Name,
		NodeType:   startNode.Type,
		State:      entity.ExectionStateSuspended,
	}
	if err := e.GetByCond(execution); err != nil {
		// Any lookup error is treated as "no suspended execution": a brand-new
		// execution is prepared.
		// NOTE(review): this also hides genuine query failures — confirm that
		// is acceptable.
		execution.IsConcurrent = -1
	}
	// Whether found (suspended) or new, the execution is activated.
	execution.State = entity.ExectionStateActive

	return e.Tx(ctx, func(ctx context.Context) error {
		if err := e.Save(ctx, execution); err != nil {
			return err
		}
		return e.executeNode(NewExecutionCtx(ctx, procinst, execution))
	})
}

// ContinueExecution resolves the next node from the merged variables and moves
// the execution flow to it.
func (e *executionAppImpl) ContinueExecution(ctx *ExecutionCtx) error {
	// Merge process-instance variables with execution variables; per the
	// original author's note the execution variables take precedence.
	vars := collx.MapMerge(ctx.ProcinsVars, ctx.ExecutionVars)
	nextNode, err := ctx.GetNextNode(vars)
	if err != nil {
		return err
	}
	return e.MoveTo(ctx, nextNode)
}

// MoveTo records the end of the current node and, when nextNode is not nil,
// points the execution at nextNode, saves it and — if the execution is active —
// executes the new node. A nil nextNode means the flow has finished and nothing
// further is done. It returns an error when ctx carries no execution or when
// recording, saving or executing fails.
func (e *executionAppImpl) MoveTo(ctx *ExecutionCtx, nextNode *entity.FlowNode) error {
	execution := ctx.Execution
	if execution == nil {
		return errorx.NewBiz("execution is nil")
	}

	// Record that the current node has ended (the original literal was
	// misspelled "copmpleted").
	if err := e.hisProcinstOpApp.RecordEnd(ctx, "completed"); err != nil {
		return err
	}

	// No next node: the flow has ended.
	if nextNode == nil {
		return nil
	}

	// Advance the execution to the next node.
	execution.NodeKey = nextNode.Key
	execution.NodeName = nextNode.Name
	execution.NodeType = nextNode.Type

	return e.Tx(ctx, func(c context.Context) error {
		// Persist the execution's latest node information.
		if err := e.Save(c, execution); err != nil {
			return err
		}
		if execution.State != entity.ExectionStateActive {
			return nil
		}
		// Keep driving the flow by executing the new node.
		// NOTE(review): executeNode receives the outer ctx, not the
		// transactional c — confirm the node is meant to run with it.
		return e.executeNode(ctx)
	})
}

// CreateChildExecution saves a new execution that belongs to the same process
// instance as parentExection and has it as parent. On a save failure it returns
// nil together with the error.
func (e *executionAppImpl) CreateChildExecution(ctx context.Context, parentExection *entity.Execution) (*entity.Execution, error) {
	execution := &entity.Execution{
		ProcinstId: parentExection.ProcinstId,
		ParentId:   parentExection.Id,
	}
	if err := e.Save(ctx, execution); err != nil {
		return nil, err
	}
	return execution, nil
}

// CancelExecution marks every active or suspended execution of the process
// instance as terminated, updating them inside a single transaction.
func (e *executionAppImpl) CancelExecution(ctx context.Context, procinstId uint64) error {
	executions, err := e.ListByCond(model.NewCond().
		Eq("procinst_id", procinstId).
		In("state", collx.AsArray(entity.ExectionStateActive, entity.ExectionStateSuspended)))
	if err != nil {
		return err
	}

	return e.Tx(ctx, func(ctx context.Context) error {
		for _, execution := range executions {
			execution.State = entity.ExectionStateTerminated
			if err := e.UpdateById(ctx, execution); err != nil {
				return err
			}
		}
		return nil
	})
}

// executeNode looks up the behavior registered for the current flow node's
// type, records the node start and runs the behavior. Asynchronous behaviors
// are started via gox.Go and their errors are only logged; synchronous
// behaviors run inline and their error is returned.
func (e *executionAppImpl) executeNode(ctx *ExecutionCtx) error {
	flowNode := ctx.GetFlowNode()
	node, err := nodeBehaviorRegistry.GetNode(flowNode.Type)
	if err != nil {
		return err
	}

	// Record the node start.
	if err := e.hisProcinstOpApp.RecordStart(ctx); err != nil {
		return err
	}

	// Run the node logic.
	if node.IsAsync() {
		// NOTE(review): the async task shares ctx with the caller; if ctx
		// carries a transaction it may already be finished when the task runs —
		// confirm.
		gox.Go(func() {
			if err := node.Execute(ctx); err != nil {
				logx.Errorf("async execute node error: %v, procinst_id: %d, node_key: %s", err, ctx.Procinst.Id, flowNode.Key)
			}
		})
		return nil
	}
	return node.Execute(ctx)
}