feat: dbms新增支持工单流程审批

This commit is contained in:
meilin.huang
2024-02-29 22:12:50 +08:00
parent bf75483a3c
commit f93231da61
115 changed files with 3280 additions and 553 deletions

View File

@@ -7,11 +7,14 @@ import (
"mayfly-go/internal/db/dbm/dbi"
"mayfly-go/internal/db/domain/entity"
"mayfly-go/internal/db/domain/repository"
flowapp "mayfly-go/internal/flow/application"
flowentity "mayfly-go/internal/flow/domain/entity"
"mayfly-go/pkg/contextx"
"mayfly-go/pkg/errorx"
"mayfly-go/pkg/logx"
"mayfly-go/pkg/model"
"mayfly-go/pkg/utils/jsonx"
"mayfly-go/pkg/utils/stringx"
"strconv"
"strings"
@@ -47,6 +50,8 @@ func (d *DbSqlExecRes) Merge(execRes *DbSqlExecRes) {
}
type DbSqlExec interface {
flowapp.FlowBizHandler
// 执行sql
Exec(ctx context.Context, execSqlReq *DbSqlExecReq) (*DbSqlExecRes, error)
@@ -58,7 +63,10 @@ type DbSqlExec interface {
}
type dbSqlExecAppImpl struct {
dbApp Db `inject:"DbApp"`
dbSqlExecRepo repository.DbSqlExec `inject:"DbSqlExecRepo"`
flowProcinstApp flowapp.Procinst `inject:"ProcinstApp"`
}
func createSqlExecRecord(ctx context.Context, execSqlReq *DbSqlExecReq) *entity.DbSqlExec {
@@ -67,6 +75,7 @@ func createSqlExecRecord(ctx context.Context, execSqlReq *DbSqlExecReq) *entity.
dbSqlExecRecord.Db = execSqlReq.Db
dbSqlExecRecord.Sql = execSqlReq.Sql
dbSqlExecRecord.Remark = execSqlReq.Remark
dbSqlExecRecord.Status = entity.DbSqlExecStatusSuccess
dbSqlExecRecord.FillBaseInfo(model.IdGenTypeNone, contextx.GetLoginAccount(ctx))
return dbSqlExecRecord
}
@@ -105,9 +114,9 @@ func (d *dbSqlExecAppImpl) Exec(ctx context.Context, execSqlReq *DbSqlExecReq) (
}
var execErr error
if isSelect || strings.HasPrefix(lowerSql, "show") {
execRes, execErr = doRead(ctx, execSqlReq)
execRes, execErr = d.doRead(ctx, execSqlReq)
} else {
execRes, execErr = doExec(ctx, execSqlReq.Sql, execSqlReq.DbConn)
execRes, execErr = d.doExec(ctx, execSqlReq, dbSqlExecRecord)
}
if execErr != nil {
return nil, execErr
@@ -119,29 +128,76 @@ func (d *dbSqlExecAppImpl) Exec(ctx context.Context, execSqlReq *DbSqlExecReq) (
switch stmt := stmt.(type) {
case *sqlparser.Select:
isSelect = true
execRes, err = doSelect(ctx, stmt, execSqlReq)
execRes, err = d.doSelect(ctx, stmt, execSqlReq)
case *sqlparser.Show:
isSelect = true
execRes, err = doRead(ctx, execSqlReq)
execRes, err = d.doRead(ctx, execSqlReq)
case *sqlparser.OtherRead:
isSelect = true
execRes, err = doRead(ctx, execSqlReq)
execRes, err = d.doRead(ctx, execSqlReq)
case *sqlparser.Update:
execRes, err = doUpdate(ctx, stmt, execSqlReq, dbSqlExecRecord)
execRes, err = d.doUpdate(ctx, stmt, execSqlReq, dbSqlExecRecord)
case *sqlparser.Delete:
execRes, err = doDelete(ctx, stmt, execSqlReq, dbSqlExecRecord)
execRes, err = d.doDelete(ctx, stmt, execSqlReq, dbSqlExecRecord)
case *sqlparser.Insert:
execRes, err = doInsert(ctx, stmt, execSqlReq, dbSqlExecRecord)
execRes, err = d.doInsert(ctx, stmt, execSqlReq, dbSqlExecRecord)
default:
execRes, err = doExec(ctx, execSqlReq.Sql, execSqlReq.DbConn)
execRes, err = d.doExec(ctx, execSqlReq, dbSqlExecRecord)
}
d.saveSqlExecLog(isSelect, dbSqlExecRecord)
if err != nil {
return nil, err
}
d.saveSqlExecLog(isSelect, dbSqlExecRecord)
return execRes, nil
}
func (d *dbSqlExecAppImpl) FlowBizHandle(ctx context.Context, procinstStatus flowentity.ProcinstStatus, bizKey string) error {
logx.Debugf("DbSqlExec FlowBizHandle -> bizKey: %s, procinstStatus: %s", bizKey, flowentity.ProcinstStatusEnum.GetDesc(procinstStatus))
// 流程挂起不处理
if procinstStatus == flowentity.ProcinstSuspended {
return nil
}
dbSqlExec := &entity.DbSqlExec{FlowBizKey: bizKey}
if err := d.dbSqlExecRepo.GetBy(dbSqlExec); err != nil {
logx.Errorf("flow-[%s]关联的sql执行信息不存在", bizKey)
return nil
}
if procinstStatus != flowentity.ProcinstCompleted {
dbSqlExec.Status = entity.DbSqlExecStatusNo
dbSqlExec.Res = fmt.Sprintf("流程%s", flowentity.ProcinstStatusEnum.GetDesc(procinstStatus))
return d.dbSqlExecRepo.UpdateById(ctx, dbSqlExec)
}
dbSqlExec.Status = entity.DbSqlExecStatusFail
dbConn, err := d.dbApp.GetDbConn(dbSqlExec.DbId, dbSqlExec.Db)
if err != nil {
dbSqlExec.Res = err.Error()
d.dbSqlExecRepo.UpdateById(ctx, dbSqlExec)
return err
}
rowsAffected, err := dbConn.ExecContext(ctx, dbSqlExec.Sql)
if err != nil {
dbSqlExec.Res = err.Error()
d.dbSqlExecRepo.UpdateById(ctx, dbSqlExec)
return err
}
dbSqlExec.Status = entity.DbSqlExecStatusSuccess
dbSqlExec.Res = fmt.Sprintf("执行成功,影响条数: %d", rowsAffected)
return d.dbSqlExecRepo.UpdateById(ctx, dbSqlExec)
}
func (d *dbSqlExecAppImpl) DeleteBy(ctx context.Context, condition *entity.DbSqlExec) {
d.dbSqlExecRepo.DeleteByCond(ctx, condition)
}
func (d *dbSqlExecAppImpl) GetPageList(condition *entity.DbSqlExecQuery, pageParam *model.PageParam, toEntity any, orderBy ...string) (*model.PageResult[any], error) {
return d.dbSqlExecRepo.GetPageList(condition, pageParam, toEntity, orderBy...)
}
// 保存sql执行记录如果是查询类则根据系统配置判断是否保存
func (d *dbSqlExecAppImpl) saveSqlExecLog(isQuery bool, dbSqlExecRecord *entity.DbSqlExec) {
if !isQuery {
@@ -156,15 +212,7 @@ func (d *dbSqlExecAppImpl) saveSqlExecLog(isQuery bool, dbSqlExecRecord *entity.
}
}
func (d *dbSqlExecAppImpl) DeleteBy(ctx context.Context, condition *entity.DbSqlExec) {
d.dbSqlExecRepo.DeleteByCond(ctx, condition)
}
func (d *dbSqlExecAppImpl) GetPageList(condition *entity.DbSqlExecQuery, pageParam *model.PageParam, toEntity any, orderBy ...string) (*model.PageResult[any], error) {
return d.dbSqlExecRepo.GetPageList(condition, pageParam, toEntity, orderBy...)
}
func doSelect(ctx context.Context, selectStmt *sqlparser.Select, execSqlReq *DbSqlExecReq) (*DbSqlExecRes, error) {
func (d *dbSqlExecAppImpl) doSelect(ctx context.Context, selectStmt *sqlparser.Select, execSqlReq *DbSqlExecReq) (*DbSqlExecRes, error) {
selectExprsStr := sqlparser.String(selectStmt.SelectExprs)
if selectExprsStr == "*" || strings.Contains(selectExprsStr, ".*") ||
len(strings.Split(selectExprsStr, ",")) > 1 {
@@ -189,10 +237,10 @@ func doSelect(ctx context.Context, selectStmt *sqlparser.Select, execSqlReq *DbS
}
}
return doRead(ctx, execSqlReq)
return d.doRead(ctx, execSqlReq)
}
func doRead(ctx context.Context, execSqlReq *DbSqlExecReq) (*DbSqlExecRes, error) {
func (d *dbSqlExecAppImpl) doRead(ctx context.Context, execSqlReq *DbSqlExecReq) (*DbSqlExecRes, error) {
dbConn := execSqlReq.DbConn
sql := execSqlReq.Sql
cols, res, err := dbConn.QueryContext(ctx, sql)
@@ -205,7 +253,7 @@ func doRead(ctx context.Context, execSqlReq *DbSqlExecReq) (*DbSqlExecRes, error
}, nil
}
func doUpdate(ctx context.Context, update *sqlparser.Update, execSqlReq *DbSqlExecReq, dbSqlExec *entity.DbSqlExec) (*DbSqlExecRes, error) {
func (d *dbSqlExecAppImpl) doUpdate(ctx context.Context, update *sqlparser.Update, execSqlReq *DbSqlExecReq, dbSqlExec *entity.DbSqlExec) (*DbSqlExecRes, error) {
dbConn := execSqlReq.DbConn
tableStr := sqlparser.String(update.TableExprs)
@@ -256,10 +304,10 @@ func doUpdate(ctx context.Context, update *sqlparser.Update, execSqlReq *DbSqlEx
dbSqlExec.Table = tableName
dbSqlExec.Type = entity.DbSqlExecTypeUpdate
return doExec(ctx, execSqlReq.Sql, dbConn)
return d.doExec(ctx, execSqlReq, dbSqlExec)
}
func doDelete(ctx context.Context, delete *sqlparser.Delete, execSqlReq *DbSqlExecReq, dbSqlExec *entity.DbSqlExec) (*DbSqlExecRes, error) {
func (d *dbSqlExecAppImpl) doDelete(ctx context.Context, delete *sqlparser.Delete, execSqlReq *DbSqlExecReq, dbSqlExec *entity.DbSqlExec) (*DbSqlExecRes, error) {
dbConn := execSqlReq.DbConn
tableStr := sqlparser.String(delete.TableExprs)
@@ -278,24 +326,47 @@ func doDelete(ctx context.Context, delete *sqlparser.Delete, execSqlReq *DbSqlEx
dbSqlExec.Table = table
dbSqlExec.Type = entity.DbSqlExecTypeDelete
return doExec(ctx, execSqlReq.Sql, dbConn)
return d.doExec(ctx, execSqlReq, dbSqlExec)
}
func doInsert(ctx context.Context, insert *sqlparser.Insert, execSqlReq *DbSqlExecReq, dbSqlExec *entity.DbSqlExec) (*DbSqlExecRes, error) {
func (d *dbSqlExecAppImpl) doInsert(ctx context.Context, insert *sqlparser.Insert, execSqlReq *DbSqlExecReq, dbSqlExec *entity.DbSqlExec) (*DbSqlExecRes, error) {
tableStr := sqlparser.String(insert.Table)
// 可能使用别名,故空格切割
table := strings.Split(tableStr, " ")[0]
dbSqlExec.Table = table
dbSqlExec.Type = entity.DbSqlExecTypeInsert
return doExec(ctx, execSqlReq.Sql, execSqlReq.DbConn)
return d.doExec(ctx, execSqlReq, dbSqlExec)
}
func doExec(ctx context.Context, sql string, dbConn *dbi.DbConn) (*DbSqlExecRes, error) {
func (d *dbSqlExecAppImpl) doExec(ctx context.Context, execSqlReq *DbSqlExecReq, dbSqlExecRecord *entity.DbSqlExec) (*DbSqlExecRes, error) {
dbConn := execSqlReq.DbConn
flowProcdefKey := dbConn.Info.FlowProcdefKey
if flowProcdefKey != "" {
bizKey := stringx.Rand(24)
// 如果该库关联了审批流程,则启动流程实例即可
_, err := d.flowProcinstApp.StartProc(ctx, flowProcdefKey, &flowapp.StarProcParam{
BizType: DbSqlExecFlowBizType,
BizKey: bizKey,
Remark: dbSqlExecRecord.Remark,
})
if err != nil {
return nil, err
}
dbSqlExecRecord.FlowBizKey = bizKey
dbSqlExecRecord.Status = entity.DbSqlExecStatusWait
return nil, nil
}
sql := execSqlReq.Sql
rowsAffected, err := dbConn.ExecContext(ctx, sql)
execRes := "success"
if err != nil {
execRes = err.Error()
dbSqlExecRecord.Status = entity.DbSqlExecStatusFail
dbSqlExecRecord.Res = err.Error()
} else {
dbSqlExecRecord.Res = fmt.Sprintf("执行成功,影响条数: %d", rowsAffected)
}
res := make([]map[string]any, 0)
resData := make(map[string]any)