From 8fce802185150f32ff1a79abe30b82b9b64dc94c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=88=98=E7=A5=A5=E8=B6=85?= Date: Sat, 10 Oct 2020 11:49:21 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E7=8E=B0=E8=AF=B7=E6=B1=82=E6=97=A5?= =?UTF-8?q?=E5=BF=97=E5=86=99=E5=85=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go.mod | 2 +- go.sum | 10 + internal/db/models/db_node_dao.go | 11 + internal/db/models/db_node_initializer.go | 227 ++++++++++++++++++ .../db/models/db_node_initializer_test.go | 45 ++++ internal/db/models/http_access_log_dao.go | 93 +++++++ .../db/models/http_access_log_dao_test.go | 30 +++ internal/db/models/http_access_log_model.go | 26 ++ .../db/models/http_access_log_model_ext.go | 1 + internal/db/models/node_model.go | 2 + internal/db/models/server_dao_test.go | 2 +- internal/nodes/api_node.go | 1 + .../rpc/services/service_http_access_log.go | 31 +++ 13 files changed, 479 insertions(+), 2 deletions(-) create mode 100644 internal/db/models/db_node_initializer.go create mode 100644 internal/db/models/db_node_initializer_test.go create mode 100644 internal/db/models/http_access_log_dao.go create mode 100644 internal/db/models/http_access_log_dao_test.go create mode 100644 internal/db/models/http_access_log_model.go create mode 100644 internal/db/models/http_access_log_model_ext.go create mode 100644 internal/rpc/services/service_http_access_log.go diff --git a/go.mod b/go.mod index 9327115c..2ca25604 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/go-sql-driver/mysql v1.5.0 github.com/go-yaml/yaml v2.1.0+incompatible github.com/golang/protobuf v1.4.2 - github.com/iwind/TeaGo v0.0.0-20200924024009-d088df3778a6 + github.com/iwind/TeaGo v0.0.0-20201010005321-430e836dee8a github.com/pkg/sftp v1.12.0 golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a google.golang.org/grpc v1.32.0 diff --git a/go.sum b/go.sum index b6751b0b..4b73a308 100644 --- a/go.sum +++ b/go.sum @@ -54,6 +54,16 @@ github.com/iwind/TeaGo v0.0.0-20200923021120-f5d76441fe9e h1:/xn7wUvlwaoA5IkdBUc github.com/iwind/TeaGo v0.0.0-20200923021120-f5d76441fe9e/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc= github.com/iwind/TeaGo v0.0.0-20200924024009-d088df3778a6 h1:7OZC/Qy7Z/hK9vG6YQOwHNOUPunSImYYJMiIfvuDQZ0= github.com/iwind/TeaGo v0.0.0-20200924024009-d088df3778a6/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc= +github.com/iwind/TeaGo v0.0.0-20201009105423-b2d996734307 h1:Vzrn/Q1cvSPVaieSq1PKJ234GBNOQjaCnU/VdiSfNVs= +github.com/iwind/TeaGo v0.0.0-20201009105423-b2d996734307/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc= +github.com/iwind/TeaGo v0.0.0-20201009112016-344e63215665 h1:qqsdkRWRd8/1uTbbdTx3l9CFkUdUmjhn3tJc/Dc7uEE= +github.com/iwind/TeaGo v0.0.0-20201009112016-344e63215665/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc= +github.com/iwind/TeaGo v0.0.0-20201009115948-1a609db84e26 h1:IvpeMnLs3RWyPsSjFBhg1O6DF+w6G9o1/wPBqhmGE8w= +github.com/iwind/TeaGo v0.0.0-20201009115948-1a609db84e26/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc= +github.com/iwind/TeaGo v0.0.0-20201009121920-5ab994526a80 h1:GFObMYO7mqeE+2n/4AACqCLlPg+GNNZCAb+pPIF70NE= +github.com/iwind/TeaGo v0.0.0-20201009121920-5ab994526a80/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc= +github.com/iwind/TeaGo v0.0.0-20201010005321-430e836dee8a h1:sO6uDbQOEe6/tIB3o31vn6eD/JmkKGErKgcOA/Cpb+Q= +github.com/iwind/TeaGo v0.0.0-20201010005321-430e836dee8a/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc= github.com/json-iterator/go v1.1.10 h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr68= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/kr/fs v0.1.0 h1:Jskdu9ieNAYnjxsi0LbQp1ulIKZV1LAFgK1tWhpZgl8= diff --git a/internal/db/models/db_node_dao.go b/internal/db/models/db_node_dao.go index a2ed7eda..579b1d50 100644 --- a/internal/db/models/db_node_dao.go +++ b/internal/db/models/db_node_dao.go @@ -124,3 +124,14 @@ func (this *DBNodeDAO) UpdateNode(nodeId int64, isOn bool, name string, descript _, err := this.Save(op) return err } + +// 查找所有可用的数据库节点 +func (this *DBNodeDAO) FindAllEnabledAndOnDBNodes() (result []*DBNode, err error) { + _, err = this.Query(). + State(DBNodeStateEnabled). + Attr("isOn", true). + Slice(&result). + DescPk(). + FindAll() + return +} diff --git a/internal/db/models/db_node_initializer.go b/internal/db/models/db_node_initializer.go new file mode 100644 index 00000000..3e1e79cb --- /dev/null +++ b/internal/db/models/db_node_initializer.go @@ -0,0 +1,227 @@ +package models + +import ( + "fmt" + "github.com/iwind/TeaGo/dbs" + "github.com/iwind/TeaGo/lists" + "github.com/iwind/TeaGo/logs" + timeutil "github.com/iwind/TeaGo/utils/time" + "hash/crc32" + "strconv" + "strings" + "sync" + "time" +) + +var accessLogDBMapping = map[int64]*dbs.DB{} // dbNodeId => DB +var accessLogDAOMapping = map[int64]*HTTPAccessLogDAO{} // dbNodeId => DAO +var accessLogLocker = &sync.RWMutex{} +var accessLogTableMapping = map[string]bool{} // tableName_crc(dsn) => true + +func init() { + initializer := NewDBNodeInitializer() + go initializer.Start() +} + +// 获取获取DAO +func randomAccessLogDAO() (dao *HTTPAccessLogDAO) { + accessLogLocker.RLock() + if len(accessLogDAOMapping) == 0 { + dao = nil + } else { + for _, d := range accessLogDAOMapping { + dao = d + break + } + } + accessLogLocker.RUnlock() + return +} + +// 根据日期获取表名 +func findAccessLogTable(db *dbs.DB, day string, force bool) (string, error) { + config, err := db.Config() + if err != nil { + return "", err + } + + tableName := "edgeHTTPAccessLogs_" + day + cacheKey := tableName + "_" + fmt.Sprintf("%d", crc32.ChecksumIEEE([]byte(config.Dsn))) + + if !force { + accessLogLocker.RLock() + _, ok := accessLogTableMapping[cacheKey] + accessLogLocker.RUnlock() + if ok { + return tableName, nil + } + } + + tableNames, err := db.TableNames() + if err != nil { + return tableName, err + } + + if lists.ContainsString(tableNames, tableName) { + accessLogLocker.Lock() + accessLogTableMapping[cacheKey] = true + accessLogLocker.Unlock() + return tableName, nil + } + + // 创建表格 + _, err = db.Exec("CREATE TABLE `" + tableName + "` (\n `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID',\n `serverId` int(11) unsigned DEFAULT '0' COMMENT '服务ID',\n `nodeId` int(11) unsigned DEFAULT '0' COMMENT '节点ID',\n `status` int(3) unsigned DEFAULT '0' COMMENT '状态码',\n `createdAt` bigint(11) unsigned DEFAULT '0' COMMENT '创建时间',\n `content` json DEFAULT NULL COMMENT '日志内容',\n `day` varchar(8) DEFAULT NULL COMMENT '日期Ymd',\n PRIMARY KEY (`id`),\n KEY `serverId` (`serverId`),\n KEY `nodeId` (`nodeId`),\n KEY `serverId_status` (`serverId`,`status`)\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;") + if err != nil { + return tableName, err + } + + accessLogLocker.Lock() + accessLogTableMapping[cacheKey] = true + accessLogLocker.Unlock() + + return tableName, nil +} + +// 初始化数据库连接 +type DBNodeInitializer struct { +} + +func NewDBNodeInitializer() *DBNodeInitializer { + return &DBNodeInitializer{} +} + +// 启动 +func (this *DBNodeInitializer) Start() { + // 初始运行 + err := this.loop() + if err != nil { + logs.Println("[DB_NODE]" + err.Error()) + } + + // 定时运行 + ticker := time.NewTicker(60 * time.Second) + for range ticker.C { + err := this.loop() + if err != nil { + logs.Println("[DB_NODE]" + err.Error()) + } + } +} + +// 单次运行 +func (this *DBNodeInitializer) loop() error { + dbNodes, err := SharedDBNodeDAO.FindAllEnabledAndOnDBNodes() + if err != nil { + return err + } + + nodeIds := []int64{} + for _, node := range dbNodes { + nodeIds = append(nodeIds, int64(node.Id)) + } + + // 关掉老的 + accessLogLocker.Lock() + closingDbs := []*dbs.DB{} + for nodeId, db := range accessLogDBMapping { + if !this.containsInt64(nodeIds, nodeId) { + closingDbs = append(closingDbs, db) + delete(accessLogDBMapping, nodeId) + delete(accessLogDAOMapping, nodeId) + logs.Println("[DB_NODE]close db node '" + strconv.FormatInt(nodeId, 10) + "'") + } + } + accessLogLocker.Unlock() + for _, db := range closingDbs { + _ = db.Close() + } + + // 启动新的 + for _, node := range dbNodes { + nodeId := int64(node.Id) + accessLogLocker.Lock() + db, ok := accessLogDBMapping[nodeId] + accessLogLocker.Unlock() + + dsn := node.Username + ":" + node.Password + "@tcp(" + node.Host + ":" + fmt.Sprintf("%d", node.Port) + ")/" + node.Database + "?charset=utf8mb4&timeout=10s" + + if ok { + // 检查配置是否有变化 + oldConfig, err := db.Config() + if err != nil { + logs.Println("[DB_NODE]read database old config failed: " + err.Error()) + continue + } + + // 如果有变化则关闭 + if oldConfig.Dsn != dsn { + _ = db.Close() + db = nil + } + } + + if db == nil { + config := &dbs.DBConfig{ + Driver: "mysql", + Dsn: dsn, + Prefix: "edge", + } + db, err := dbs.NewInstanceFromConfig(config) + if err != nil { + logs.Println("[DB_NODE]initialize database config failed: " + err.Error()) + continue + } + + // 检查表是否存在 + tableName, err := findAccessLogTable(db, timeutil.Format("Ymd"), false) + if err != nil { + if !strings.Contains(err.Error(), "1050") { // 非表格已存在错误 + logs.Println("[DB_NODE]create first table in database node failed: " + err.Error()) + + // 创建节点日志 + createLogErr := SharedNodeLogDAO.CreateLog(NodeRoleDatabase, nodeId, "error", "ACCESS_LOG", "can not create access log table: "+err.Error(), time.Now().Unix()) + if createLogErr != nil { + logs.Println("[NODE_LOG]" + createLogErr.Error()) + } + + continue + } else { + err = nil + } + } + + daoObject := dbs.DAOObject{ + Instance: db, + DB: node.Name + "(id:" + strconv.Itoa(int(node.Id)) + ")", + Table: tableName, + PkName: "id", + Model: new(HTTPAccessLog), + } + err = daoObject.Init() + if err != nil { + logs.Println("[DB_NODE]initialize dao failed: " + err.Error()) + continue + } + + accessLogLocker.Lock() + accessLogDBMapping[nodeId] = db + dao := &HTTPAccessLogDAO{ + DAOObject: daoObject, + } + accessLogDAOMapping[nodeId] = dao + accessLogLocker.Unlock() + } + } + + return nil +} + +// 判断是否包含某数字 +func (this *DBNodeInitializer) containsInt64(values []int64, value int64) bool { + for _, v := range values { + if v == value { + return true + } + } + return false +} diff --git a/internal/db/models/db_node_initializer_test.go b/internal/db/models/db_node_initializer_test.go new file mode 100644 index 00000000..21d7cbd2 --- /dev/null +++ b/internal/db/models/db_node_initializer_test.go @@ -0,0 +1,45 @@ +package models + +import ( + "runtime" + "testing" + "time" +) + +func TestDBNodeInitializer_loop(t *testing.T) { + initializer := NewDBNodeInitializer() + err := initializer.loop() + if err != nil { + t.Fatal(err) + } + t.Log(len(accessLogDBMapping), len(accessLogDAOMapping)) +} + +func TestFindAccessLogTable(t *testing.T) { + before := time.Now() + db := SharedHTTPAccessLogDAO.Instance + tableName, err := findAccessLogTable(db, "20201010", false) + if err != nil { + t.Fatal(err) + } + t.Log(tableName) + t.Log(time.Since(before).Seconds()*1000, "ms") + + before = time.Now() + tableName, err = findAccessLogTable(db, "20201010", false) + + if err != nil { + t.Fatal(err) + } + t.Log(tableName) + t.Log(time.Since(before).Seconds()*1000, "ms") +} + +func BenchmarkFindAccessLogTable(b *testing.B) { + db := SharedHTTPAccessLogDAO.Instance + + runtime.GOMAXPROCS(1) + for i := 0; i < b.N; i++ { + _, _ = findAccessLogTable(db, "20201010", false) + } +} diff --git a/internal/db/models/http_access_log_dao.go b/internal/db/models/http_access_log_dao.go new file mode 100644 index 00000000..cac285b3 --- /dev/null +++ b/internal/db/models/http_access_log_dao.go @@ -0,0 +1,93 @@ +package models + +import ( + "encoding/json" + "github.com/TeaOSLab/EdgeAPI/internal/errors" + "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + _ "github.com/go-sql-driver/mysql" + "github.com/iwind/TeaGo/Tea" + "github.com/iwind/TeaGo/dbs" + timeutil "github.com/iwind/TeaGo/utils/time" + "strings" + "time" +) + +type HTTPAccessLogDAO dbs.DAO + +var SharedHTTPAccessLogDAO = NewHTTPAccessLogDAO() + +func NewHTTPAccessLogDAO() *HTTPAccessLogDAO { + return dbs.NewDAO(&HTTPAccessLogDAO{ + DAOObject: dbs.DAOObject{ + DB: Tea.Env, + Table: "edgeHTTPAccessLogs", + Model: new(HTTPAccessLog), + PkName: "id", + }, + }).(*HTTPAccessLogDAO) +} + +// 创建访问日志 +func CreateHTTPAccessLogs(accessLogs []*pb.HTTPAccessLog) error { + dao := randomAccessLogDAO() + if dao == nil { + dao = SharedHTTPAccessLogDAO + } + return CreateHTTPAccessLogsWithDAO(dao, accessLogs) +} + +// 使用特定的DAO创建访问日志 +func CreateHTTPAccessLogsWithDAO(dao *HTTPAccessLogDAO, accessLogs []*pb.HTTPAccessLog) error { + if dao == nil { + return errors.New("dao should not be nil") + } + if len(accessLogs) == 0 { + return nil + } + + // TODO 改成事务批量提交,以加快速度 + + for _, accessLog := range accessLogs { + day := timeutil.Format("Ymd", time.Unix(accessLog.Timestamp, 0)) + table, err := findAccessLogTable(dao.Instance, day, false) + if err != nil { + return err + } + + fields := map[string]interface{}{} + fields["serverId"] = accessLog.ServerId + fields["nodeId"] = accessLog.NodeId + fields["status"] = accessLog.Status + fields["createdAt"] = accessLog.Timestamp + fields["day"] = day + + content, err := json.Marshal(accessLog) + if err != nil { + return err + } + fields["content"] = content + + _, err = dao.Query(). + Table(table). + Sets(fields). + Insert() + if err != nil { + // 是否为 Error 1146: Table 'xxx.xxx' doesn't exist 如果是,则创建表之后重试 + if strings.Contains(err.Error(), "1146") { + table, err = findAccessLogTable(dao.Instance, day, true) + if err != nil { + return err + } + _, err = dao.Query(). + Table(table). + Sets(fields). + Insert() + if err != nil { + return err + } + } + } + } + + return nil +} diff --git a/internal/db/models/http_access_log_dao_test.go b/internal/db/models/http_access_log_dao_test.go new file mode 100644 index 00000000..07693a11 --- /dev/null +++ b/internal/db/models/http_access_log_dao_test.go @@ -0,0 +1,30 @@ +package models + +import ( + "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + _ "github.com/go-sql-driver/mysql" + _ "github.com/iwind/TeaGo/bootstrap" + "testing" + "time" +) + +func TestCreateHTTPAccessLogs(t *testing.T) { + err := NewDBNodeInitializer().loop() + if err != nil { + t.Fatal(err) + } + + accessLog := &pb.HTTPAccessLog{ + ServerId: 1, + NodeId: 4, + Status: 200, + Timestamp: time.Now().Unix(), + } + dao := randomAccessLogDAO() + t.Log("dao:", dao) + err = CreateHTTPAccessLogsWithDAO(dao, []*pb.HTTPAccessLog{accessLog}) + if err != nil { + t.Fatal(err) + } + t.Log("ok") +} diff --git a/internal/db/models/http_access_log_model.go b/internal/db/models/http_access_log_model.go new file mode 100644 index 00000000..3dcebe7a --- /dev/null +++ b/internal/db/models/http_access_log_model.go @@ -0,0 +1,26 @@ +package models + +// +type HTTPAccessLog struct { + Id uint64 `field:"id"` // ID + ServerId uint32 `field:"serverId"` // 服务ID + NodeId uint32 `field:"nodeId"` // 节点ID + Status uint32 `field:"status"` // 状态码 + CreatedAt uint64 `field:"createdAt"` // 创建时间 + Content string `field:"content"` // 日志内容 + Day string `field:"day"` // 日期Ymd +} + +type HTTPAccessLogOperator struct { + Id interface{} // ID + ServerId interface{} // 服务ID + NodeId interface{} // 节点ID + Status interface{} // 状态码 + CreatedAt interface{} // 创建时间 + Content interface{} // 日志内容 + Day interface{} // 日期Ymd +} + +func NewHTTPAccessLogOperator() *HTTPAccessLogOperator { + return &HTTPAccessLogOperator{} +} diff --git a/internal/db/models/http_access_log_model_ext.go b/internal/db/models/http_access_log_model_ext.go new file mode 100644 index 00000000..2640e7f9 --- /dev/null +++ b/internal/db/models/http_access_log_model_ext.go @@ -0,0 +1 @@ +package models diff --git a/internal/db/models/node_model.go b/internal/db/models/node_model.go index 3dde6665..338b8c85 100644 --- a/internal/db/models/node_model.go +++ b/internal/db/models/node_model.go @@ -22,6 +22,7 @@ type Node struct { InstallStatus string `field:"installStatus"` // 安装状态 State uint8 `field:"state"` // 状态 ConnectedAPINodes string `field:"connectedAPINodes"` // 当前连接的API节点 + MaxCPU uint32 `field:"maxCPU"` // 可以使用的最多CPU } type NodeOperator struct { @@ -45,6 +46,7 @@ type NodeOperator struct { InstallStatus interface{} // 安装状态 State interface{} // 状态 ConnectedAPINodes interface{} // 当前连接的API节点 + MaxCPU interface{} // 可以使用的最多CPU } func NewNodeOperator() *NodeOperator { diff --git a/internal/db/models/server_dao_test.go b/internal/db/models/server_dao_test.go index d0f1842b..c8580793 100644 --- a/internal/db/models/server_dao_test.go +++ b/internal/db/models/server_dao_test.go @@ -26,7 +26,7 @@ func TestServerDAO_UpdateServerConfig(t *testing.T) { if err != nil { t.Fatal(err) } - _, err = SharedServerDAO.UpdateServerConfig(1, configJSON) + _, err = SharedServerDAO.UpdateServerConfig(1, configJSON, false) if err != nil { t.Fatal(err) } diff --git a/internal/nodes/api_node.go b/internal/nodes/api_node.go index 295ac495..cef98927 100644 --- a/internal/nodes/api_node.go +++ b/internal/nodes/api_node.go @@ -165,6 +165,7 @@ func (this *APINode) listenRPC(listener net.Listener, tlsConfig *tls.Config) err pb.RegisterHTTPFirewallRuleSetServiceServer(rpcServer, &services.HTTPFirewallRuleSetService{}) pb.RegisterDBNodeServiceServer(rpcServer, &services.DBNodeService{}) pb.RegisterNodeLogServiceServer(rpcServer, &services.NodeLogService{}) + pb.RegisterHTTPAccessLogServiceServer(rpcServer, &services.HTTPAccessLogService{}) err := rpcServer.Serve(listener) if err != nil { return errors.New("[API]start rpc failed: " + err.Error()) diff --git a/internal/rpc/services/service_http_access_log.go b/internal/rpc/services/service_http_access_log.go new file mode 100644 index 00000000..35e6d2aa --- /dev/null +++ b/internal/rpc/services/service_http_access_log.go @@ -0,0 +1,31 @@ +package services + +import ( + "context" + "github.com/TeaOSLab/EdgeAPI/internal/db/models" + rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils" + "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" +) + +// 访问日志相关服务 +type HTTPAccessLogService struct { +} + +// 创建访问日志 +func (this *HTTPAccessLogService) CreateHTTPAccessLogs(ctx context.Context, req *pb.CreateHTTPAccessLogsRequest) (*pb.CreateHTTPAccessLogsResponse, error) { + _, _, err := rpcutils.ValidateRequest(ctx, rpcutils.UserTypeNode) + if err != nil { + return nil, err + } + + if len(req.AccessLogs) == 0 { + return &pb.CreateHTTPAccessLogsResponse{}, nil + } + + err = models.CreateHTTPAccessLogs(req.AccessLogs) + if err != nil { + return nil, err + } + + return &pb.CreateHTTPAccessLogsResponse{}, nil +}