From f71103c259ed10337dd1b1def8554090c1296724 Mon Sep 17 00:00:00 2001 From: GoEdgeLab Date: Tue, 19 Jan 2021 16:13:04 +0800 Subject: [PATCH] =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=BA=93=E5=8F=AF=E4=BB=A5?= =?UTF-8?q?=E6=89=8B=E5=B7=A5=E6=B8=85=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/db/models/db_node_initializer.go | 2 +- internal/nodes/api_node.go | 1 + internal/rpc/services/service_db.go | 104 ++++++++++++++++++++ internal/rpc/services/service_db_test.go | 19 ++++ internal/tasks/server_access_log_cleaner.go | 2 +- 5 files changed, 126 insertions(+), 2 deletions(-) create mode 100644 internal/rpc/services/service_db.go create mode 100644 internal/rpc/services/service_db_test.go diff --git a/internal/db/models/db_node_initializer.go b/internal/db/models/db_node_initializer.go index 197bbda3..d37110b1 100644 --- a/internal/db/models/db_node_initializer.go +++ b/internal/db/models/db_node_initializer.go @@ -109,7 +109,7 @@ func findAccessLogTable(db *dbs.DB, day string, force bool) (string, error) { } // 创建表格 - _, err = db.Exec("CREATE TABLE `" + tableName + "` (\n `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID',\n `serverId` int(11) unsigned DEFAULT '0' COMMENT '服务ID',\n `nodeId` int(11) unsigned DEFAULT '0' COMMENT '节点ID',\n `status` int(3) unsigned DEFAULT '0' COMMENT '状态码',\n `createdAt` bigint(11) unsigned DEFAULT '0' COMMENT '创建时间',\n `content` json DEFAULT NULL COMMENT '日志内容',\n `requestId` varchar(128) DEFAULT NULL COMMENT '请求ID',\n `firewallPolicyId` int(11) unsigned DEFAULT '0' COMMENT 'WAF策略ID',\n `firewallRuleGroupId` int(11) unsigned DEFAULT '0' COMMENT 'WAF分组ID',\n `firewallRuleSetId` int(11) unsigned DEFAULT '0' COMMENT 'WAF集ID',\n `firewallRuleId` int(11) unsigned DEFAULT '0' COMMENT 'WAF规则ID',\n PRIMARY KEY (`id`),\n KEY `serverId` (`serverId`),\n KEY `nodeId` (`nodeId`),\n KEY `serverId_status` (`serverId`,`status`),\n KEY `requestId` (`requestId`),\n KEY `firewallPolicyId` (`firewallPolicyId`),\n KEY `firewallRuleGroupId` (`firewallRuleGroupId`),\n KEY `firewallRuleSetId` (`firewallRuleSetId`),\n KEY `firewallRuleId` (`firewallRuleId`)\n) ENGINE=InnoDB AUTO_INCREMENT=11 DEFAULT CHARSET=utf8mb4;") + _, err = db.Exec("CREATE TABLE `" + tableName + "` (\n `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID',\n `serverId` int(11) unsigned DEFAULT '0' COMMENT '服务ID',\n `nodeId` int(11) unsigned DEFAULT '0' COMMENT '节点ID',\n `status` int(3) unsigned DEFAULT '0' COMMENT '状态码',\n `createdAt` bigint(11) unsigned DEFAULT '0' COMMENT '创建时间',\n `content` json DEFAULT NULL COMMENT '日志内容',\n `requestId` varchar(128) DEFAULT NULL COMMENT '请求ID',\n `firewallPolicyId` int(11) unsigned DEFAULT '0' COMMENT 'WAF策略ID',\n `firewallRuleGroupId` int(11) unsigned DEFAULT '0' COMMENT 'WAF分组ID',\n `firewallRuleSetId` int(11) unsigned DEFAULT '0' COMMENT 'WAF集ID',\n `firewallRuleId` int(11) unsigned DEFAULT '0' COMMENT 'WAF规则ID',\n PRIMARY KEY (`id`),\n KEY `serverId` (`serverId`),\n KEY `nodeId` (`nodeId`),\n KEY `serverId_status` (`serverId`,`status`),\n KEY `requestId` (`requestId`),\n KEY `firewallPolicyId` (`firewallPolicyId`),\n KEY `firewallRuleGroupId` (`firewallRuleGroupId`),\n KEY `firewallRuleSetId` (`firewallRuleSetId`),\n KEY `firewallRuleId` (`firewallRuleId`)\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='访问日志';") if err != nil { return tableName, err } diff --git a/internal/nodes/api_node.go b/internal/nodes/api_node.go index c7feb5e7..2ba099d2 100644 --- a/internal/nodes/api_node.go +++ b/internal/nodes/api_node.go @@ -225,6 +225,7 @@ func (this *APINode) listenRPC(listener net.Listener, tlsConfig *tls.Config) err pb.RegisterUserAccessKeyServiceServer(rpcServer, &services.UserAccessKeyService{}) pb.RegisterSysLockerServiceServer(rpcServer, &services.SysLockerService{}) pb.RegisterNodeTaskServiceServer(rpcServer, &services.NodeTaskService{}) + pb.RegisterDBServiceServer(rpcServer, &services.DBService{}) err := rpcServer.Serve(listener) if err != nil { return errors.New("[API_NODE]start rpc failed: " + err.Error()) diff --git a/internal/rpc/services/service_db.go b/internal/rpc/services/service_db.go new file mode 100644 index 00000000..1e913406 --- /dev/null +++ b/internal/rpc/services/service_db.go @@ -0,0 +1,104 @@ +package services + +import ( + "context" + "github.com/TeaOSLab/EdgeAPI/internal/errors" + "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "github.com/iwind/TeaGo/dbs" + "github.com/iwind/TeaGo/lists" + "strings" +) + +// 数据库相关服务 +type DBService struct { + BaseService +} + +// 获取所有表信息 +func (this *DBService) FindAllDBTables(ctx context.Context, req *pb.FindAllDBTablesRequest) (*pb.FindAllDBTablesResponse, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + db, err := dbs.Default() + if err != nil { + return nil, err + } + ones, _, err := db.FindOnes("SELECT * FROM information_schema.`TABLES` WHERE TABLE_SCHEMA=?", db.Name()) + if err != nil { + return nil, err + } + pbTables := []*pb.DBTable{} + for _, one := range ones { + lowerTableName := strings.ToLower(one.GetString("TABLE_NAME")) + canDelete := false + canClean := false + if strings.HasPrefix(lowerTableName, "edgehttpaccesslogs_") { + canDelete = true + canClean = true + } else if lists.ContainsString([]string{"edgemessages", "edgelogs", "edgenodelogs"}, lowerTableName) { + canClean = true + } + + pbTables = append(pbTables, &pb.DBTable{ + Name: one.GetString("TABLE_NAME"), + Schema: one.GetString("TABLE_SCHEMA"), + Type: one.GetString("TABLE_TYPE"), + Engine: one.GetString("ENGINE"), + Rows: one.GetInt64("TABLE_ROWS"), + DataLength: one.GetInt64("DATA_LENGTH"), + IndexLength: one.GetInt64("INDEX_LENGTH"), + Comment: one.GetString("TABLE_COMMENT"), + Collation: one.GetString("TABLE_COLLATION"), + IsBaseTable: one.GetString("TABLE_TYPE") == "BASE TABLE", + CanClean: canClean, + CanDelete: canDelete, + }) + } + + return &pb.FindAllDBTablesResponse{DbTables: pbTables}, nil +} + +// 删除表 +func (this *DBService) DeleteDBTable(ctx context.Context, req *pb.DeleteDBTableRequest) (*pb.RPCSuccess, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + db, err := dbs.Default() + if err != nil { + return nil, err + } + + // 检查是否能够删除 + if !strings.HasPrefix(strings.ToLower(req.DbTable), "edgehttpaccesslogs_") { + return nil, errors.New("forbidden to delete the table") + } + + _, err = db.Exec("DROP TABLE `" + req.DbTable + "`") + if err != nil { + return nil, err + } + return this.Success() +} + +// 清空表 +func (this *DBService) TruncateDBTable(ctx context.Context, req *pb.TruncateDBTableRequest) (*pb.RPCSuccess, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + db, err := dbs.Default() + if err != nil { + return nil, err + } + + _, err = db.Exec("TRUNCATE TABLE `" + req.DbTable + "`") + if err != nil { + return nil, err + } + return this.Success() +} diff --git a/internal/rpc/services/service_db_test.go b/internal/rpc/services/service_db_test.go new file mode 100644 index 00000000..33c3b081 --- /dev/null +++ b/internal/rpc/services/service_db_test.go @@ -0,0 +1,19 @@ +package services + +import ( + "github.com/iwind/TeaGo/dbs" + "github.com/iwind/TeaGo/logs" + "testing" +) + +func TestDBService_FindAllDBTables(t *testing.T) { + db, err := dbs.Default() + if err != nil { + t.Fatal(err) + } + ones, _, err := db.FindOnes("SELECT * FROM information_schema.`TABLES` WHERE TABLE_SCHEMA=?", db.Name()) + if err != nil { + t.Fatal(err) + } + logs.PrintAsJSON(ones, t) +} diff --git a/internal/tasks/server_access_log_cleaner.go b/internal/tasks/server_access_log_cleaner.go index 55bb7a89..1669c81c 100644 --- a/internal/tasks/server_access_log_cleaner.go +++ b/internal/tasks/server_access_log_cleaner.go @@ -105,7 +105,7 @@ func (this *ServerAccessLogCleaner) cleanDB(db *dbs.DB, endDay string) error { if len(tableName) == 0 { continue } - ok, err := regexp.MatchString(`^edgeHTTPAccessLogs_(\d{8})$`, tableName) + ok, err := regexp.MatchString(`^(?i)edgeHTTPAccessLogs_(\d{8})$`, tableName) if err != nil { return err }