diff --git a/internal/configs/api_config.go b/internal/configs/api_config.go index 2387f75e..d2c20c88 100644 --- a/internal/configs/api_config.go +++ b/internal/configs/api_config.go @@ -1,12 +1,14 @@ package configs import ( + "fmt" "github.com/go-yaml/yaml" "github.com/iwind/TeaGo/Tea" "io/ioutil" ) var sharedAPIConfig *APIConfig = nil +var PaddingId string // API节点配置 type APIConfig struct { @@ -43,6 +45,7 @@ func SharedAPIConfig() (*APIConfig, error) { // 设置数字ID func (this *APIConfig) SetNumberId(numberId int64) { this.numberId = numberId + PaddingId = fmt.Sprintf("%08d", numberId) } // 获取数字ID diff --git a/internal/db/models/db_node_initializer.go b/internal/db/models/db_node_initializer.go index 3e1e79cb..1fca1849 100644 --- a/internal/db/models/db_node_initializer.go +++ b/internal/db/models/db_node_initializer.go @@ -13,18 +13,23 @@ import ( "time" ) -var accessLogDBMapping = map[int64]*dbs.DB{} // dbNodeId => DB -var accessLogDAOMapping = map[int64]*HTTPAccessLogDAO{} // dbNodeId => DAO +var accessLogDBMapping = map[int64]*dbs.DB{} // dbNodeId => DB +var accessLogDAOMapping = map[int64]*HTTPAccessLogDAOWrapper{} // dbNodeId => DAO var accessLogLocker = &sync.RWMutex{} var accessLogTableMapping = map[string]bool{} // tableName_crc(dsn) => true +type HTTPAccessLogDAOWrapper struct { + DAO *HTTPAccessLogDAO + NodeId int64 +} + func init() { initializer := NewDBNodeInitializer() go initializer.Start() } // 获取获取DAO -func randomAccessLogDAO() (dao *HTTPAccessLogDAO) { +func randomAccessLogDAO() (dao *HTTPAccessLogDAOWrapper) { accessLogLocker.RLock() if len(accessLogDAOMapping) == 0 { dao = nil @@ -38,6 +43,31 @@ func randomAccessLogDAO() (dao *HTTPAccessLogDAO) { return } +// 检查表格是否存在 +func findAccessLogTableName(db *dbs.DB, day string) (string, bool, error) { + config, err := db.Config() + if err != nil { + return "", false, err + } + + tableName := "edgeHTTPAccessLogs_" + day + cacheKey := tableName + "_" + fmt.Sprintf("%d", crc32.ChecksumIEEE([]byte(config.Dsn))) + + accessLogLocker.RLock() + _, ok := accessLogTableMapping[cacheKey] + accessLogLocker.RUnlock() + if ok { + return tableName, true, nil + } + + tableNames, err := db.TableNames() + if err != nil { + return tableName, false, err + } + + return tableName, lists.ContainsString(tableNames, tableName), nil +} + // 根据日期获取表名 func findAccessLogTable(db *dbs.DB, day string, force bool) (string, error) { config, err := db.Config() @@ -70,7 +100,7 @@ func findAccessLogTable(db *dbs.DB, day string, force bool) (string, error) { } // 创建表格 - _, err = db.Exec("CREATE TABLE `" + tableName + "` (\n `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID',\n `serverId` int(11) unsigned DEFAULT '0' COMMENT '服务ID',\n `nodeId` int(11) unsigned DEFAULT '0' COMMENT '节点ID',\n `status` int(3) unsigned DEFAULT '0' COMMENT '状态码',\n `createdAt` bigint(11) unsigned DEFAULT '0' COMMENT '创建时间',\n `content` json DEFAULT NULL COMMENT '日志内容',\n `day` varchar(8) DEFAULT NULL COMMENT '日期Ymd',\n PRIMARY KEY (`id`),\n KEY `serverId` (`serverId`),\n KEY `nodeId` (`nodeId`),\n KEY `serverId_status` (`serverId`,`status`)\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;") + _, err = db.Exec("CREATE TABLE `" + tableName + "` ( `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID',\n `serverId` int(11) unsigned DEFAULT '0' COMMENT '服务ID',\n `nodeId` int(11) unsigned DEFAULT '0' COMMENT '节点ID',\n `status` int(3) unsigned DEFAULT '0' COMMENT '状态码',\n `createdAt` bigint(11) unsigned DEFAULT '0' COMMENT '创建时间',\n `content` json DEFAULT NULL COMMENT '日志内容',\n `requestId` varchar(128) DEFAULT NULL COMMENT '请求ID',\n PRIMARY KEY (`id`),\n KEY `serverId` (`serverId`),\n KEY `nodeId` (`nodeId`),\n KEY `serverId_status` (`serverId`,`status`),\n KEY `requestId` (`requestId`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;") if err != nil { return tableName, err } @@ -208,7 +238,10 @@ func (this *DBNodeInitializer) loop() error { dao := &HTTPAccessLogDAO{ DAOObject: daoObject, } - accessLogDAOMapping[nodeId] = dao + accessLogDAOMapping[nodeId] = &HTTPAccessLogDAOWrapper{ + DAO: dao, + NodeId: nodeId, + } accessLogLocker.Unlock() } } diff --git a/internal/db/models/http_access_log_dao.go b/internal/db/models/http_access_log_dao.go index cac285b3..c4bf0b29 100644 --- a/internal/db/models/http_access_log_dao.go +++ b/internal/db/models/http_access_log_dao.go @@ -2,13 +2,19 @@ package models import ( "encoding/json" + "github.com/TeaOSLab/EdgeAPI/internal/configs" "github.com/TeaOSLab/EdgeAPI/internal/errors" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" _ "github.com/go-sql-driver/mysql" "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/dbs" + "github.com/iwind/TeaGo/lists" + "github.com/iwind/TeaGo/logs" timeutil "github.com/iwind/TeaGo/utils/time" + "sort" + "strconv" "strings" + "sync" "time" ) @@ -28,23 +34,28 @@ func NewHTTPAccessLogDAO() *HTTPAccessLogDAO { } // 创建访问日志 -func CreateHTTPAccessLogs(accessLogs []*pb.HTTPAccessLog) error { +func (this *HTTPAccessLogDAO) CreateHTTPAccessLogs(accessLogs []*pb.HTTPAccessLog) error { dao := randomAccessLogDAO() if dao == nil { - dao = SharedHTTPAccessLogDAO + dao = &HTTPAccessLogDAOWrapper{ + DAO: SharedHTTPAccessLogDAO, + NodeId: 0, + } } - return CreateHTTPAccessLogsWithDAO(dao, accessLogs) + return this.CreateHTTPAccessLogsWithDAO(dao, accessLogs) } // 使用特定的DAO创建访问日志 -func CreateHTTPAccessLogsWithDAO(dao *HTTPAccessLogDAO, accessLogs []*pb.HTTPAccessLog) error { - if dao == nil { +func (this *HTTPAccessLogDAO) CreateHTTPAccessLogsWithDAO(daoWrapper *HTTPAccessLogDAOWrapper, accessLogs []*pb.HTTPAccessLog) error { + if daoWrapper == nil { return errors.New("dao should not be nil") } if len(accessLogs) == 0 { return nil } + dao := daoWrapper.DAO + // TODO 改成事务批量提交,以加快速度 for _, accessLog := range accessLogs { @@ -59,7 +70,7 @@ func CreateHTTPAccessLogsWithDAO(dao *HTTPAccessLogDAO, accessLogs []*pb.HTTPAcc fields["nodeId"] = accessLog.NodeId fields["status"] = accessLog.Status fields["createdAt"] = accessLog.Timestamp - fields["day"] = day + fields["requestId"] = accessLog.RequestId + strconv.FormatInt(time.Now().UnixNano(), 10) + configs.PaddingId content, err := json.Marshal(accessLog) if err != nil { @@ -91,3 +102,137 @@ func CreateHTTPAccessLogsWithDAO(dao *HTTPAccessLogDAO, accessLogs []*pb.HTTPAcc return nil } + +// 读取往前的 单页访问日志 +func (this *HTTPAccessLogDAO) ListAccessLogs(lastRequestId string, size int64, day string, serverId int64, reverse bool) (result []*HTTPAccessLog, nextLastRequestId string, hasMore bool, err error) { + if len(day) != 8 { + return + } + + // 限制能查询的最大条数,防止占用内存过多 + if size > 1000 { + size = 1000 + } + + result, nextLastRequestId, err = this.listAccessLogs(lastRequestId, size, day, serverId, reverse) + if err != nil || int64(len(result)) < size { + return + } + + moreResult, _, _ := this.listAccessLogs(nextLastRequestId, 1, day, serverId, reverse) + hasMore = len(moreResult) > 0 + return +} + +// 读取往前的单页访问日志 +func (this *HTTPAccessLogDAO) listAccessLogs(lastRequestId string, size int64, day string, serverId int64, reverse bool) (result []*HTTPAccessLog, nextLastRequestId string, err error) { + if size <= 0 { + return nil, lastRequestId, nil + } + + accessLogLocker.RLock() + daoList := []*HTTPAccessLogDAOWrapper{} + for _, daoWrapper := range accessLogDAOMapping { + daoList = append(daoList, daoWrapper) + } + accessLogLocker.RUnlock() + + if len(daoList) == 0 { + daoList = []*HTTPAccessLogDAOWrapper{{ + DAO: SharedHTTPAccessLogDAO, + NodeId: 0, + }} + } + + locker := sync.Mutex{} + + count := len(daoList) + wg := &sync.WaitGroup{} + wg.Add(count) + for _, daoWrapper := range daoList { + go func(daoWrapper *HTTPAccessLogDAOWrapper) { + defer wg.Done() + + dao := daoWrapper.DAO + + tableName, exists, err := findAccessLogTableName(dao.Instance, day) + if !exists { + // 表格不存在则跳过 + return + } + if err != nil { + logs.Println("[DB_NODE]" + err.Error()) + return + } + + query := dao.Query() + + // 条件 + if serverId > 0 { + query.Attr("serverId", serverId) + } + + // offset + if len(lastRequestId) > 0 { + if !reverse { + query.Where("requestId<:requestId"). + Param("requestId", lastRequestId) + } else { + query.Where("requestId>:requestId"). + Param("requestId", lastRequestId) + } + } + + if !reverse { + query.Desc("requestId") + } else { + query.Asc("requestId") + } + + // 开始查询 + ones, err := query. + Table(tableName). + Limit(size). + FindAll() + if err != nil { + logs.Println("[DB_NODE]" + err.Error()) + return + } + locker.Lock() + for _, one := range ones { + accessLog := one.(*HTTPAccessLog) + result = append(result, accessLog) + } + locker.Unlock() + }(daoWrapper) + } + wg.Wait() + + if len(result) == 0 { + return nil, lastRequestId, nil + } + + // 按照requestId排序 + sort.Slice(result, func(i, j int) bool { + if !reverse { + return result[i].RequestId > result[j].RequestId + } else { + return result[i].RequestId < result[j].RequestId + } + }) + + if int64(len(result)) > size { + result = result[:size] + } + + requestId := result[len(result)-1].RequestId + if reverse { + lists.Reverse(result) + } + + if !reverse { + return result, requestId, nil + } else { + return result, requestId, nil + } +} diff --git a/internal/db/models/http_access_log_dao_test.go b/internal/db/models/http_access_log_dao_test.go index 07693a11..472d5481 100644 --- a/internal/db/models/http_access_log_dao_test.go +++ b/internal/db/models/http_access_log_dao_test.go @@ -4,6 +4,7 @@ import ( "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" _ "github.com/go-sql-driver/mysql" _ "github.com/iwind/TeaGo/bootstrap" + timeutil "github.com/iwind/TeaGo/utils/time" "testing" "time" ) @@ -22,9 +23,114 @@ func TestCreateHTTPAccessLogs(t *testing.T) { } dao := randomAccessLogDAO() t.Log("dao:", dao) - err = CreateHTTPAccessLogsWithDAO(dao, []*pb.HTTPAccessLog{accessLog}) + err = SharedHTTPAccessLogDAO.CreateHTTPAccessLogsWithDAO(dao, []*pb.HTTPAccessLog{accessLog}) if err != nil { t.Fatal(err) } t.Log("ok") } + +func TestHTTPAccessLogDAO_ListAccessLogs(t *testing.T) { + err := NewDBNodeInitializer().loop() + if err != nil { + t.Fatal(err) + } + + accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs("", 10, timeutil.Format("Ymd"), 0, false) + if err != nil { + t.Fatal(err) + } + t.Log("requestId:", requestId, "hasMore:", hasMore) + if len(accessLogs) == 0 { + t.Log("no access logs yet") + return + } + for _, accessLog := range accessLogs { + t.Log(accessLog.Id, accessLog.CreatedAt, timeutil.FormatTime("H:i:s", int64(accessLog.CreatedAt))) + } +} + +func TestHTTPAccessLogDAO_ListAccessLogs_Page(t *testing.T) { + err := NewDBNodeInitializer().loop() + if err != nil { + t.Fatal(err) + } + + lastRequestId := "" + + times := 0 // 防止循环次数太多 + for { + before := time.Now() + accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs(lastRequestId, 2, timeutil.Format("Ymd"), 0, false) + cost := time.Since(before).Seconds() + if err != nil { + t.Fatal(err) + } + lastRequestId = requestId + if len(accessLogs) == 0 { + break + } + t.Log("===") + t.Log("requestId:", requestId[:10]+"...", "hasMore:", hasMore, "cost:", cost*1000, "ms") + for _, accessLog := range accessLogs { + t.Log(accessLog.Id, accessLog.CreatedAt, timeutil.FormatTime("H:i:s", int64(accessLog.CreatedAt))) + } + + times++ + if times > 10 { + break + } + } +} + +func TestHTTPAccessLogDAO_ListAccessLogs_Reverse(t *testing.T) { + err := NewDBNodeInitializer().loop() + if err != nil { + t.Fatal(err) + } + + before := time.Now() + accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs("16023261176446590001000000000000003500000004", 2, timeutil.Format("Ymd"), 0, true) + cost := time.Since(before).Seconds() + if err != nil { + t.Fatal(err) + } + t.Log("===") + t.Log("requestId:", requestId[:19]+"...", "hasMore:", hasMore, "cost:", cost*1000, "ms") + if len(accessLogs) > 0 { + t.Log("accessLog:", accessLogs[0].RequestId[:19]+"...", len(accessLogs[0].RequestId)) + } +} + +func TestHTTPAccessLogDAO_ListAccessLogs_Page_NotExists(t *testing.T) { + err := NewDBNodeInitializer().loop() + if err != nil { + t.Fatal(err) + } + + lastRequestId := "" + + times := 0 // 防止循环次数太多 + for { + before := time.Now() + accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs(lastRequestId, 2, timeutil.Format("Ymd", time.Now().AddDate(0, 0, 1)), 0, false) + cost := time.Since(before).Seconds() + if err != nil { + t.Fatal(err) + } + lastRequestId = requestId + if len(accessLogs) == 0 { + break + } + t.Log("===") + t.Log("requestId:", requestId[:10]+"...", "hasMore:", hasMore, "cost:", cost*1000, "ms") + for _, accessLog := range accessLogs { + t.Log(accessLog.Id, accessLog.CreatedAt, timeutil.FormatTime("H:i:s", int64(accessLog.CreatedAt))) + } + + times++ + if times > 10 { + break + } + } +} diff --git a/internal/db/models/http_access_log_model.go b/internal/db/models/http_access_log_model.go index 3dcebe7a..09d243d0 100644 --- a/internal/db/models/http_access_log_model.go +++ b/internal/db/models/http_access_log_model.go @@ -8,7 +8,7 @@ type HTTPAccessLog struct { Status uint32 `field:"status"` // 状态码 CreatedAt uint64 `field:"createdAt"` // 创建时间 Content string `field:"content"` // 日志内容 - Day string `field:"day"` // 日期Ymd + RequestId string `field:"requestId"` // 请求ID } type HTTPAccessLogOperator struct { @@ -18,7 +18,7 @@ type HTTPAccessLogOperator struct { Status interface{} // 状态码 CreatedAt interface{} // 创建时间 Content interface{} // 日志内容 - Day interface{} // 日期Ymd + RequestId interface{} // 请求ID } func NewHTTPAccessLogOperator() *HTTPAccessLogOperator { diff --git a/internal/db/models/http_access_log_model_ext.go b/internal/db/models/http_access_log_model_ext.go index 2640e7f9..8fe5c052 100644 --- a/internal/db/models/http_access_log_model_ext.go +++ b/internal/db/models/http_access_log_model_ext.go @@ -1 +1,17 @@ package models + +import ( + "encoding/json" + "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" +) + +// 转换成PB对象 +func (this *HTTPAccessLog) ToPB() (*pb.HTTPAccessLog, error) { + p := &pb.HTTPAccessLog{} + err := json.Unmarshal([]byte(this.Content), p) + if err != nil { + return nil, err + } + p.RequestId = this.RequestId + return p, nil +} diff --git a/internal/rpc/services/service_http_access_log.go b/internal/rpc/services/service_http_access_log.go index 35e6d2aa..7af53a23 100644 --- a/internal/rpc/services/service_http_access_log.go +++ b/internal/rpc/services/service_http_access_log.go @@ -13,6 +13,7 @@ type HTTPAccessLogService struct { // 创建访问日志 func (this *HTTPAccessLogService) CreateHTTPAccessLogs(ctx context.Context, req *pb.CreateHTTPAccessLogsRequest) (*pb.CreateHTTPAccessLogsResponse, error) { + // 校验请求 _, _, err := rpcutils.ValidateRequest(ctx, rpcutils.UserTypeNode) if err != nil { return nil, err @@ -22,10 +23,39 @@ func (this *HTTPAccessLogService) CreateHTTPAccessLogs(ctx context.Context, req return &pb.CreateHTTPAccessLogsResponse{}, nil } - err = models.CreateHTTPAccessLogs(req.AccessLogs) + err = models.SharedHTTPAccessLogDAO.CreateHTTPAccessLogs(req.AccessLogs) if err != nil { return nil, err } return &pb.CreateHTTPAccessLogsResponse{}, nil } + +// 列出单页访问日志 +func (this *HTTPAccessLogService) ListHTTPAccessLogs(ctx context.Context, req *pb.ListHTTPAccessLogsRequest) (*pb.ListHTTPAccessLogsResponse, error) { + // 校验请求 + _, _, err := rpcutils.ValidateRequest(ctx, rpcutils.UserTypeAdmin) + if err != nil { + return nil, err + } + + accessLogs, requestId, hasMore, err := models.SharedHTTPAccessLogDAO.ListAccessLogs(req.RequestId, req.Size, req.Day, req.ServerId, req.Reverse) + if err != nil { + return nil, err + } + + result := []*pb.HTTPAccessLog{} + for _, accessLog := range accessLogs { + a, err := accessLog.ToPB() + if err != nil { + return nil, err + } + result = append(result, a) + } + + return &pb.ListHTTPAccessLogsResponse{ + AccessLogs: result, + HasMore: hasMore, + RequestId: requestId, + }, nil +}