diff --git a/internal/db/db_test.go b/internal/db/db_test.go index 25d92fb7..f13cac9e 100644 --- a/internal/db/db_test.go +++ b/internal/db/db_test.go @@ -1,13 +1,44 @@ package db import ( + "database/sql" + "database/sql/driver" + _ "github.com/go-sql-driver/mysql" "github.com/iwind/TeaGo/Tea" _ "github.com/iwind/TeaGo/bootstrap" "github.com/iwind/TeaGo/dbs" "testing" + "time" ) -func TestDB(t *testing.T) { +func TestDB_Env(t *testing.T) { Tea.Env = "prod" t.Log(dbs.Default()) } + +func TestDB_Instance(t *testing.T) { + for i := 0; i < 10; i++ { + db, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3306)/db_edge?charset=utf8mb4&timeout=30s") + if err != nil { + t.Fatal(i, "open:", err) + } + db.SetConnMaxIdleTime(time.Minute * 3) + db.SetConnMaxLifetime(time.Minute * 3) + db.SetMaxIdleConns(0) + db.SetMaxOpenConns(1) + go func(db *sql.DB, i int) { + for j := 0; j < 100; j++ { + err := db.Ping() + if err != nil { + if err == driver.ErrBadConn { + return + } + t.Fatal(i, "exec:", err) + } + time.Sleep(1 * time.Second) + } + t.Log(i, "ok", db) + }(db, i) + } + time.Sleep(100 * time.Second) +} diff --git a/internal/db/models/db_node_initializer.go b/internal/db/models/db_node_initializer.go index d37110b1..774f5489 100644 --- a/internal/db/models/db_node_initializer.go +++ b/internal/db/models/db_node_initializer.go @@ -27,7 +27,7 @@ type HTTPAccessLogDAOWrapper struct { func init() { initializer := NewDBNodeInitializer() - dbs.OnReady(func() { + dbs.OnReadyDone(func() { go initializer.Start() }) } diff --git a/internal/rpc/services/service_db_node.go b/internal/rpc/services/service_db_node.go index 644293d6..90205798 100644 --- a/internal/rpc/services/service_db_node.go +++ b/internal/rpc/services/service_db_node.go @@ -2,12 +2,18 @@ package services import ( "context" + "errors" "github.com/TeaOSLab/EdgeAPI/internal/db/models" rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "github.com/iwind/TeaGo/dbs" + "github.com/iwind/TeaGo/lists" "github.com/iwind/TeaGo/types" + "strings" ) +var db *dbs.DB + // 数据库节点相关服务 type DBNodeService struct { BaseService @@ -27,7 +33,7 @@ func (this *DBNodeService) CreateDBNode(ctx context.Context, req *pb.CreateDBNod if err != nil { return nil, err } - return &pb.CreateDBNodeResponse{NodeId: nodeId}, nil + return &pb.CreateDBNodeResponse{DbNodeId: nodeId}, nil } // 修改数据库节点 @@ -40,7 +46,7 @@ func (this *DBNodeService) UpdateDBNode(ctx context.Context, req *pb.UpdateDBNod tx := this.NullTx() - err = models.SharedDBNodeDAO.UpdateNode(tx, req.NodeId, req.IsOn, req.Name, req.Description, req.Host, req.Port, req.Database, req.Username, req.Password, req.Charset) + err = models.SharedDBNodeDAO.UpdateNode(tx, req.DbNodeId, req.IsOn, req.Name, req.Description, req.Host, req.Port, req.Database, req.Username, req.Password, req.Charset) if err != nil { return nil, err } @@ -57,7 +63,7 @@ func (this *DBNodeService) DeleteDBNode(ctx context.Context, req *pb.DeleteDBNod tx := this.NullTx() - err = models.SharedDBNodeDAO.DisableDBNode(tx, req.NodeId) + err = models.SharedDBNodeDAO.DisableDBNode(tx, req.DbNodeId) if err != nil { return nil, err } @@ -98,6 +104,29 @@ func (this *DBNodeService) ListEnabledDBNodes(ctx context.Context, req *pb.ListE result := []*pb.DBNode{} for _, node := range nodes { + status := &pb.DBNodeStatus{} + + // 是否能够连接 + if node.IsOn == 1 { + db, err = dbs.NewInstanceFromConfig(node.DBConfig()) + if err != nil { + status.Error = err.Error() + } else { + one, err := db.FindOne("SELECT SUM(DATA_LENGTH+INDEX_LENGTH) AS size FROM information_schema.`TABLES` WHERE TABLE_SCHEMA=?", db.Name()) + if err != nil { + status.Error = err.Error() + _ = db.Close() + } else if one == nil { + status.Error = "unable to read size from database server" + _ = db.Close() + } else { + status.IsOk = true + status.Size = one.GetInt64("size") + _ = db.Close() + } + } + } + result = append(result, &pb.DBNode{ Id: int64(node.Id), Name: node.Name, @@ -109,9 +138,10 @@ func (this *DBNodeService) ListEnabledDBNodes(ctx context.Context, req *pb.ListE Username: node.Username, Password: node.Password, Charset: node.Charset, + Status: status, }) } - return &pb.ListEnabledDBNodesResponse{Nodes: result}, nil + return &pb.ListEnabledDBNodesResponse{DbNodes: result}, nil } // 根据ID查找可用的数据库节点 @@ -124,14 +154,14 @@ func (this *DBNodeService) FindEnabledDBNode(ctx context.Context, req *pb.FindEn tx := this.NullTx() - node, err := models.SharedDBNodeDAO.FindEnabledDBNode(tx, req.NodeId) + node, err := models.SharedDBNodeDAO.FindEnabledDBNode(tx, req.DbNodeId) if err != nil { return nil, err } if node == nil { - return &pb.FindEnabledDBNodeResponse{Node: nil}, nil + return &pb.FindEnabledDBNodeResponse{DbNode: nil}, nil } - return &pb.FindEnabledDBNodeResponse{Node: &pb.DBNode{ + return &pb.FindEnabledDBNodeResponse{DbNode: &pb.DBNode{ Id: int64(node.Id), Name: node.Name, Description: node.Description, @@ -144,3 +174,126 @@ func (this *DBNodeService) FindEnabledDBNode(ctx context.Context, req *pb.FindEn Charset: node.Charset, }}, nil } + +// 获取所有表信息 +func (this *DBNodeService) FindAllDBNodeTables(ctx context.Context, req *pb.FindAllDBNodeTablesRequest) (*pb.FindAllDBNodeTablesResponse, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + var tx = this.NullTx() + node, err := models.SharedDBNodeDAO.FindEnabledDBNode(tx, req.DbNodeId) + if err != nil { + return nil, err + } + if node == nil { + return nil, dbs.ErrNotFound + } + db, err := dbs.NewInstanceFromConfig(node.DBConfig()) + if err != nil { + return nil, err + } + defer func() { + _ = db.Close() + }() + + ones, _, err := db.FindOnes("SELECT * FROM information_schema.`TABLES` WHERE TABLE_SCHEMA=?", db.Name()) + if err != nil { + return nil, err + } + pbTables := []*pb.DBTable{} + for _, one := range ones { + lowerTableName := strings.ToLower(one.GetString("TABLE_NAME")) + canDelete := false + canClean := false + if strings.HasPrefix(lowerTableName, "edgehttpaccesslogs_") { + canDelete = true + canClean = true + } else if lists.ContainsString([]string{"edgemessages", "edgelogs", "edgenodelogs"}, lowerTableName) { + canClean = true + } + + pbTables = append(pbTables, &pb.DBTable{ + Name: one.GetString("TABLE_NAME"), + Schema: one.GetString("TABLE_SCHEMA"), + Type: one.GetString("TABLE_TYPE"), + Engine: one.GetString("ENGINE"), + Rows: one.GetInt64("TABLE_ROWS"), + DataLength: one.GetInt64("DATA_LENGTH"), + IndexLength: one.GetInt64("INDEX_LENGTH"), + Comment: one.GetString("TABLE_COMMENT"), + Collation: one.GetString("TABLE_COLLATION"), + IsBaseTable: one.GetString("TABLE_TYPE") == "BASE TABLE", + CanClean: canClean, + CanDelete: canDelete, + }) + } + + return &pb.FindAllDBNodeTablesResponse{DbNodeTables: pbTables}, nil +} + +// 删除表 +func (this *DBNodeService) DeleteDBNodeTable(ctx context.Context, req *pb.DeleteDBNodeTableRequest) (*pb.RPCSuccess, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + var tx = this.NullTx() + node, err := models.SharedDBNodeDAO.FindEnabledDBNode(tx, req.DbNodeId) + if err != nil { + return nil, err + } + if node == nil { + return nil, dbs.ErrNotFound + } + db, err := dbs.NewInstanceFromConfig(node.DBConfig()) + if err != nil { + return nil, err + } + defer func() { + _ = db.Close() + }() + + // 检查是否能够删除 + if !strings.HasPrefix(strings.ToLower(req.DbNodeTable), "edgehttpaccesslogs_") { + return nil, errors.New("forbidden to delete the table") + } + + _, err = db.Exec("DROP TABLE `" + req.DbNodeTable + "`") + if err != nil { + return nil, err + } + return this.Success() +} + +// 清空表 +func (this *DBNodeService) TruncateDBNodeTable(ctx context.Context, req *pb.TruncateDBNodeTableRequest) (*pb.RPCSuccess, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + var tx = this.NullTx() + node, err := models.SharedDBNodeDAO.FindEnabledDBNode(tx, req.DbNodeId) + if err != nil { + return nil, err + } + if node == nil { + return nil, dbs.ErrNotFound + } + db, err := dbs.NewInstanceFromConfig(node.DBConfig()) + if err != nil { + return nil, err + } + defer func() { + _ = db.Close() + }() + + _, err = db.Exec("TRUNCATE TABLE `" + req.DbNodeTable + "`") + if err != nil { + return nil, err + } + return this.Success() +}