mirror of
				https://gitee.com/dromara/mayfly-go
				synced 2025-11-04 08:20:25 +08:00 
			
		
		
		
	fix: some issue
This commit is contained in:
		@@ -159,7 +159,7 @@ func (app *dataSyncAppImpl) RunCronJob(ctx context.Context, id uint64) error {
 | 
			
		||||
						break
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
				return errorx.NewBiz("get column data type... ignore~")
 | 
			
		||||
				return dbi.NewStopWalkQueryError("get column data type... ignore~")
 | 
			
		||||
			})
 | 
			
		||||
 | 
			
		||||
			updSql = fmt.Sprintf("and %s > %s", task.UpdField, updFieldDataType.DataType.SQLValue(task.UpdFieldVal))
 | 
			
		||||
 
 | 
			
		||||
@@ -333,7 +333,6 @@ func (d *dbSqlExecAppImpl) saveSqlExecLog(dbSqlExecRecord *entity.DbSqlExec, res
 | 
			
		||||
 | 
			
		||||
func (d *dbSqlExecAppImpl) doSelect(ctx context.Context, sqlExecParam *sqlExecParam) (*dto.DbSqlExecRes, error) {
 | 
			
		||||
	maxCount := config.GetDbms().MaxResultSet
 | 
			
		||||
	selectStmt := sqlExecParam.Stmt
 | 
			
		||||
	selectSql := sqlExecParam.Sql
 | 
			
		||||
	sqlExecParam.SqlExecRecord.Type = entity.DbSqlExecTypeQuery
 | 
			
		||||
 | 
			
		||||
@@ -343,49 +342,7 @@ func (d *dbSqlExecAppImpl) doSelect(ctx context.Context, sqlExecParam *sqlExecPa
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if selectStmt != nil {
 | 
			
		||||
		needCheckLimit := false
 | 
			
		||||
		var limit *sqlstmt.Limit
 | 
			
		||||
		switch stmt := selectStmt.(type) {
 | 
			
		||||
		case *sqlstmt.SimpleSelectStmt:
 | 
			
		||||
			qs := stmt.QuerySpecification
 | 
			
		||||
			limit = qs.Limit
 | 
			
		||||
			if qs.SelectElements != nil && (qs.SelectElements.Star != "" || len(qs.SelectElements.Elements) > 1) {
 | 
			
		||||
				needCheckLimit = true
 | 
			
		||||
			}
 | 
			
		||||
		case *sqlstmt.UnionSelectStmt:
 | 
			
		||||
			limit = stmt.Limit
 | 
			
		||||
			selectSql = selectStmt.GetText()
 | 
			
		||||
			needCheckLimit = true
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// 如果配置为0,则不校验分页参数
 | 
			
		||||
		if needCheckLimit && maxCount != 0 {
 | 
			
		||||
			if limit == nil {
 | 
			
		||||
				return nil, errorx.NewBizI(ctx, imsg.ErrNoLimitStmt)
 | 
			
		||||
			}
 | 
			
		||||
			if limit.RowCount > maxCount {
 | 
			
		||||
				return nil, errorx.NewBizI(ctx, imsg.ErrLimitInvalid, "count", maxCount)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	} else {
 | 
			
		||||
		if maxCount != 0 {
 | 
			
		||||
			if !strings.Contains(selectSql, "limit") &&
 | 
			
		||||
				// 兼容oracle rownum分页
 | 
			
		||||
				!strings.Contains(selectSql, "rownum") &&
 | 
			
		||||
				// 兼容mssql offset分页
 | 
			
		||||
				!strings.Contains(selectSql, "offset") &&
 | 
			
		||||
				// 兼容mssql top 分页  with result as ({query sql}) select top 100 * from result
 | 
			
		||||
				!strings.Contains(selectSql, " top ") {
 | 
			
		||||
				// 判断是不是count语句
 | 
			
		||||
				if !strings.Contains(selectSql, "count(") {
 | 
			
		||||
					return nil, errorx.NewBizI(ctx, imsg.ErrNoLimitStmt)
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return d.doQuery(ctx, sqlExecParam.DbConn, selectSql)
 | 
			
		||||
	return d.doQuery(ctx, sqlExecParam.DbConn, selectSql, maxCount)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (d *dbSqlExecAppImpl) doOtherRead(ctx context.Context, sqlExecParam *sqlExecParam) (*dto.DbSqlExecRes, error) {
 | 
			
		||||
@@ -398,7 +355,7 @@ func (d *dbSqlExecAppImpl) doOtherRead(ctx context.Context, sqlExecParam *sqlExe
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return d.doQuery(ctx, sqlExecParam.DbConn, selectSql)
 | 
			
		||||
	return d.doQuery(ctx, sqlExecParam.DbConn, selectSql, 0)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (d *dbSqlExecAppImpl) doExecDDL(ctx context.Context, sqlExecParam *sqlExecParam) (*dto.DbSqlExecRes, error) {
 | 
			
		||||
@@ -588,11 +545,23 @@ func (d *dbSqlExecAppImpl) doInsert(ctx context.Context, sqlExecParam *sqlExecPa
 | 
			
		||||
	return d.doExec(ctx, sqlExecParam.DbConn, sqlExecParam.Sql)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (d *dbSqlExecAppImpl) doQuery(ctx context.Context, dbConn *dbi.DbConn, sql string) (*dto.DbSqlExecRes, error) {
 | 
			
		||||
	cols, res, err := dbConn.QueryContext(ctx, sql)
 | 
			
		||||
func (d *dbSqlExecAppImpl) doQuery(ctx context.Context, dbConn *dbi.DbConn, sql string, maxRows int) (*dto.DbSqlExecRes, error) {
 | 
			
		||||
	res := make([]map[string]any, 0, 16)
 | 
			
		||||
	nowRows := 0
 | 
			
		||||
	cols, err := dbConn.WalkQueryRows(ctx, sql, func(row map[string]any, columns []*dbi.QueryColumn) error {
 | 
			
		||||
		nowRows++
 | 
			
		||||
		// 超过指定的最大查询记录数,则停止查询
 | 
			
		||||
		if maxRows != 0 && nowRows > maxRows {
 | 
			
		||||
			return dbi.NewStopWalkQueryError(fmt.Sprintf("exceed the maximum number of query records %d", maxRows))
 | 
			
		||||
		}
 | 
			
		||||
		res = append(res, row)
 | 
			
		||||
		return nil
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return &dto.DbSqlExecRes{
 | 
			
		||||
		Sql:     sql,
 | 
			
		||||
		Columns: cols,
 | 
			
		||||
 
 | 
			
		||||
@@ -70,11 +70,7 @@ func (d *DbConn) QueryContext(ctx context.Context, querySql string, args ...any)
 | 
			
		||||
		return nil
 | 
			
		||||
	}, args...)
 | 
			
		||||
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, nil, wrapSqlError(err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return cols, result, nil
 | 
			
		||||
	return cols, result, err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// 将查询结果映射至struct,可具体参考sqlx库
 | 
			
		||||
@@ -95,7 +91,15 @@ func (d *DbConn) Query2Struct(execSql string, dest any) error {
 | 
			
		||||
 | 
			
		||||
// WalkQueryRows 游标方式遍历查询结果集, walkFn返回error不为nil, 则跳出遍历并取消查询
 | 
			
		||||
func (d *DbConn) WalkQueryRows(ctx context.Context, querySql string, walkFn WalkQueryRowsFunc, args ...any) ([]*QueryColumn, error) {
 | 
			
		||||
	return d.walkQueryRows(ctx, querySql, walkFn, args...)
 | 
			
		||||
	if qcs, err := d.walkQueryRows(ctx, querySql, walkFn, args...); err != nil {
 | 
			
		||||
		// 如果是手动停止 则默认返回当前已遍历查询的数据即可
 | 
			
		||||
		if _, ok := err.(*StopWalkQueryError); ok {
 | 
			
		||||
			return qcs, nil
 | 
			
		||||
		}
 | 
			
		||||
		return qcs, wrapSqlError(err)
 | 
			
		||||
	} else {
 | 
			
		||||
		return qcs, nil
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// WalkTableRows 游标方式遍历指定表的结果集, walkFn返回error不为nil, 则跳出遍历并取消查询
 | 
			
		||||
@@ -242,3 +246,18 @@ func wrapSqlError(err error) error {
 | 
			
		||||
	}
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// StopWalkQueryError 自定义的停止遍历查询错误类型
 | 
			
		||||
type StopWalkQueryError struct {
 | 
			
		||||
	Reason string
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Error 实现 error 接口
 | 
			
		||||
func (e *StopWalkQueryError) Error() string {
 | 
			
		||||
	return fmt.Sprintf("stop walk query: %s", e.Reason)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewStopWalkQueryError 创建一个带有reason的StopWalkQueryError
 | 
			
		||||
func NewStopWalkQueryError(reason string) *StopWalkQueryError {
 | 
			
		||||
	return &StopWalkQueryError{Reason: reason}
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -13,26 +13,31 @@ order by
 | 
			
		||||
    n.nspname
 | 
			
		||||
---------------------------------------
 | 
			
		||||
--PGSQL_TABLE_INFO 表详细信息
 | 
			
		||||
SELECT
 | 
			
		||||
    c.relname AS "tableName",
 | 
			
		||||
    obj_description(c.oid) AS "tableComment",
 | 
			
		||||
    pg_total_relation_size(c.oid) AS "dataLength",
 | 
			
		||||
    pg_indexes_size(c.oid) AS "indexLength",
 | 
			
		||||
    psut.n_live_tup AS "tableRows"
 | 
			
		||||
SELECT DISTINCT
 | 
			
		||||
  c.relname AS "tableName",
 | 
			
		||||
  COALESCE(b.description, '') AS "tableComment",
 | 
			
		||||
  pg_total_relation_size(c.oid) AS "dataLength",
 | 
			
		||||
  pg_indexes_size(c.oid) AS "indexLength",
 | 
			
		||||
  psut.n_live_tup AS "tableRows"
 | 
			
		||||
FROM
 | 
			
		||||
    pg_class c
 | 
			
		||||
JOIN
 | 
			
		||||
    pg_namespace n ON c.relnamespace = n.oid
 | 
			
		||||
JOIN
 | 
			
		||||
    pg_stat_user_tables psut ON psut.relid = c.oid
 | 
			
		||||
  pg_class c
 | 
			
		||||
  LEFT JOIN pg_description b ON c.oid = b.objoid AND b.objsubid = 0
 | 
			
		||||
  JOIN pg_stat_user_tables psut ON psut.relid = c.oid
 | 
			
		||||
WHERE
 | 
			
		||||
    has_table_privilege(c.oid, 'SELECT')
 | 
			
		||||
    and n.nspname = current_schema()
 | 
			
		||||
    {{if .tableNames}}
 | 
			
		||||
        and c.relname in ({{.tableNames}})
 | 
			
		||||
    {{end}}
 | 
			
		||||
  c.relkind = 'r'
 | 
			
		||||
  AND c.relnamespace = (
 | 
			
		||||
    SELECT
 | 
			
		||||
      oid
 | 
			
		||||
    FROM
 | 
			
		||||
      pg_namespace
 | 
			
		||||
    WHERE
 | 
			
		||||
      nspname = current_schema()
 | 
			
		||||
        {{if .tableNames}}
 | 
			
		||||
            and c.relname in ({{.tableNames}})
 | 
			
		||||
        {{end}}
 | 
			
		||||
  )
 | 
			
		||||
ORDER BY
 | 
			
		||||
    c.relname;
 | 
			
		||||
  c.relname;
 | 
			
		||||
---------------------------------------
 | 
			
		||||
--PGSQL_INDEX_INFO 表索引信息
 | 
			
		||||
SELECT a.indexname                                                         AS "indexName",
 | 
			
		||||
 
 | 
			
		||||
@@ -7,7 +7,7 @@ import (
 | 
			
		||||
type Db struct {
 | 
			
		||||
	model.Model
 | 
			
		||||
 | 
			
		||||
	Code            string            `json:"code" gorm:"size:32;not null;index:idx_code"`
 | 
			
		||||
	Code            string            `json:"code" gorm:"size:32;not null;index:idx_db_code"`
 | 
			
		||||
	Name            string            `json:"name" gorm:"size:255;not null;"`
 | 
			
		||||
	GetDatabaseMode DbGetDatabaseMode `json:"getDatabaseMode" gorm:"comment:库名获取方式(-1.实时获取、1.指定库名)"` // 获取数据库方式
 | 
			
		||||
	Database        string            `json:"database" gorm:"size:2000;"`
 | 
			
		||||
 
 | 
			
		||||
@@ -23,8 +23,6 @@ var En = map[i18n.MsgId]string{
 | 
			
		||||
 | 
			
		||||
	ErrExistRunFailSql:      "There is an execution error in sql",
 | 
			
		||||
	ErrNeedSubmitWorkTicket: "This operation needs to submit a work ticket for approval",
 | 
			
		||||
	ErrNoLimitStmt:          "Please complete the paging information before executing",
 | 
			
		||||
	ErrLimitInvalid:         "The number of query result sets should be less than the {{.count}} number configured by the system",
 | 
			
		||||
 | 
			
		||||
	// db transfer
 | 
			
		||||
	LogDtsSave:         "dts - Save data transfer task",
 | 
			
		||||
 
 | 
			
		||||
@@ -33,8 +33,6 @@ const (
 | 
			
		||||
 | 
			
		||||
	ErrExistRunFailSql
 | 
			
		||||
	ErrNeedSubmitWorkTicket
 | 
			
		||||
	ErrNoLimitStmt
 | 
			
		||||
	ErrLimitInvalid
 | 
			
		||||
 | 
			
		||||
	// db transfer
 | 
			
		||||
	LogDtsSave
 | 
			
		||||
 
 | 
			
		||||
@@ -23,8 +23,6 @@ var Zh_CN = map[i18n.MsgId]string{
 | 
			
		||||
 | 
			
		||||
	ErrExistRunFailSql:      "存在执行错误的sql",
 | 
			
		||||
	ErrNeedSubmitWorkTicket: "该操作需要提交工单审批执行",
 | 
			
		||||
	ErrNoLimitStmt:          "请完善分页信息后执行",
 | 
			
		||||
	ErrLimitInvalid:         "查询结果集数需小于系统配置的{{.count}}条",
 | 
			
		||||
 | 
			
		||||
	// db transfer
 | 
			
		||||
	LogDtsSave:         "dts-保存数据迁移任务",
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user