实现实时展示访问日志

This commit is contained in:
GoEdgeLab
2020-10-10 19:21:32 +08:00
parent a95d4cbbfa
commit 911e57b1f2
7 changed files with 348 additions and 15 deletions

View File

@@ -1,12 +1,14 @@
package configs package configs
import ( import (
"fmt"
"github.com/go-yaml/yaml" "github.com/go-yaml/yaml"
"github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/Tea"
"io/ioutil" "io/ioutil"
) )
var sharedAPIConfig *APIConfig = nil var sharedAPIConfig *APIConfig = nil
var PaddingId string
// API节点配置 // API节点配置
type APIConfig struct { type APIConfig struct {
@@ -43,6 +45,7 @@ func SharedAPIConfig() (*APIConfig, error) {
// 设置数字ID // 设置数字ID
func (this *APIConfig) SetNumberId(numberId int64) { func (this *APIConfig) SetNumberId(numberId int64) {
this.numberId = numberId this.numberId = numberId
PaddingId = fmt.Sprintf("%08d", numberId)
} }
// 获取数字ID // 获取数字ID

View File

@@ -14,17 +14,22 @@ import (
) )
var accessLogDBMapping = map[int64]*dbs.DB{} // dbNodeId => DB var accessLogDBMapping = map[int64]*dbs.DB{} // dbNodeId => DB
var accessLogDAOMapping = map[int64]*HTTPAccessLogDAO{} // dbNodeId => DAO var accessLogDAOMapping = map[int64]*HTTPAccessLogDAOWrapper{} // dbNodeId => DAO
var accessLogLocker = &sync.RWMutex{} var accessLogLocker = &sync.RWMutex{}
var accessLogTableMapping = map[string]bool{} // tableName_crc(dsn) => true var accessLogTableMapping = map[string]bool{} // tableName_crc(dsn) => true
type HTTPAccessLogDAOWrapper struct {
DAO *HTTPAccessLogDAO
NodeId int64
}
func init() { func init() {
initializer := NewDBNodeInitializer() initializer := NewDBNodeInitializer()
go initializer.Start() go initializer.Start()
} }
// 获取获取DAO // 获取获取DAO
func randomAccessLogDAO() (dao *HTTPAccessLogDAO) { func randomAccessLogDAO() (dao *HTTPAccessLogDAOWrapper) {
accessLogLocker.RLock() accessLogLocker.RLock()
if len(accessLogDAOMapping) == 0 { if len(accessLogDAOMapping) == 0 {
dao = nil dao = nil
@@ -38,6 +43,31 @@ func randomAccessLogDAO() (dao *HTTPAccessLogDAO) {
return return
} }
// 检查表格是否存在
func findAccessLogTableName(db *dbs.DB, day string) (string, bool, error) {
config, err := db.Config()
if err != nil {
return "", false, err
}
tableName := "edgeHTTPAccessLogs_" + day
cacheKey := tableName + "_" + fmt.Sprintf("%d", crc32.ChecksumIEEE([]byte(config.Dsn)))
accessLogLocker.RLock()
_, ok := accessLogTableMapping[cacheKey]
accessLogLocker.RUnlock()
if ok {
return tableName, true, nil
}
tableNames, err := db.TableNames()
if err != nil {
return tableName, false, err
}
return tableName, lists.ContainsString(tableNames, tableName), nil
}
// 根据日期获取表名 // 根据日期获取表名
func findAccessLogTable(db *dbs.DB, day string, force bool) (string, error) { func findAccessLogTable(db *dbs.DB, day string, force bool) (string, error) {
config, err := db.Config() config, err := db.Config()
@@ -70,7 +100,7 @@ func findAccessLogTable(db *dbs.DB, day string, force bool) (string, error) {
} }
// 创建表格 // 创建表格
_, err = db.Exec("CREATE TABLE `" + tableName + "` (\n `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID',\n `serverId` int(11) unsigned DEFAULT '0' COMMENT '服务ID',\n `nodeId` int(11) unsigned DEFAULT '0' COMMENT '节点ID',\n `status` int(3) unsigned DEFAULT '0' COMMENT '状态码',\n `createdAt` bigint(11) unsigned DEFAULT '0' COMMENT '创建时间',\n `content` json DEFAULT NULL COMMENT '日志内容',\n `day` varchar(8) DEFAULT NULL COMMENT '日期Ymd',\n PRIMARY KEY (`id`),\n KEY `serverId` (`serverId`),\n KEY `nodeId` (`nodeId`),\n KEY `serverId_status` (`serverId`,`status`)\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;") _, err = db.Exec("CREATE TABLE `" + tableName + "` ( `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID',\n `serverId` int(11) unsigned DEFAULT '0' COMMENT '服务ID',\n `nodeId` int(11) unsigned DEFAULT '0' COMMENT '节点ID',\n `status` int(3) unsigned DEFAULT '0' COMMENT '状态码',\n `createdAt` bigint(11) unsigned DEFAULT '0' COMMENT '创建时间',\n `content` json DEFAULT NULL COMMENT '日志内容',\n `requestId` varchar(128) DEFAULT NULL COMMENT '请求ID',\n PRIMARY KEY (`id`),\n KEY `serverId` (`serverId`),\n KEY `nodeId` (`nodeId`),\n KEY `serverId_status` (`serverId`,`status`),\n KEY `requestId` (`requestId`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;")
if err != nil { if err != nil {
return tableName, err return tableName, err
} }
@@ -208,7 +238,10 @@ func (this *DBNodeInitializer) loop() error {
dao := &HTTPAccessLogDAO{ dao := &HTTPAccessLogDAO{
DAOObject: daoObject, DAOObject: daoObject,
} }
accessLogDAOMapping[nodeId] = dao accessLogDAOMapping[nodeId] = &HTTPAccessLogDAOWrapper{
DAO: dao,
NodeId: nodeId,
}
accessLogLocker.Unlock() accessLogLocker.Unlock()
} }
} }

View File

@@ -2,13 +2,19 @@ package models
import ( import (
"encoding/json" "encoding/json"
"github.com/TeaOSLab/EdgeAPI/internal/configs"
"github.com/TeaOSLab/EdgeAPI/internal/errors" "github.com/TeaOSLab/EdgeAPI/internal/errors"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
_ "github.com/go-sql-driver/mysql" _ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/lists"
"github.com/iwind/TeaGo/logs"
timeutil "github.com/iwind/TeaGo/utils/time" timeutil "github.com/iwind/TeaGo/utils/time"
"sort"
"strconv"
"strings" "strings"
"sync"
"time" "time"
) )
@@ -28,23 +34,28 @@ func NewHTTPAccessLogDAO() *HTTPAccessLogDAO {
} }
// 创建访问日志 // 创建访问日志
func CreateHTTPAccessLogs(accessLogs []*pb.HTTPAccessLog) error { func (this *HTTPAccessLogDAO) CreateHTTPAccessLogs(accessLogs []*pb.HTTPAccessLog) error {
dao := randomAccessLogDAO() dao := randomAccessLogDAO()
if dao == nil { if dao == nil {
dao = SharedHTTPAccessLogDAO dao = &HTTPAccessLogDAOWrapper{
DAO: SharedHTTPAccessLogDAO,
NodeId: 0,
} }
return CreateHTTPAccessLogsWithDAO(dao, accessLogs) }
return this.CreateHTTPAccessLogsWithDAO(dao, accessLogs)
} }
// 使用特定的DAO创建访问日志 // 使用特定的DAO创建访问日志
func CreateHTTPAccessLogsWithDAO(dao *HTTPAccessLogDAO, accessLogs []*pb.HTTPAccessLog) error { func (this *HTTPAccessLogDAO) CreateHTTPAccessLogsWithDAO(daoWrapper *HTTPAccessLogDAOWrapper, accessLogs []*pb.HTTPAccessLog) error {
if dao == nil { if daoWrapper == nil {
return errors.New("dao should not be nil") return errors.New("dao should not be nil")
} }
if len(accessLogs) == 0 { if len(accessLogs) == 0 {
return nil return nil
} }
dao := daoWrapper.DAO
// TODO 改成事务批量提交,以加快速度 // TODO 改成事务批量提交,以加快速度
for _, accessLog := range accessLogs { for _, accessLog := range accessLogs {
@@ -59,7 +70,7 @@ func CreateHTTPAccessLogsWithDAO(dao *HTTPAccessLogDAO, accessLogs []*pb.HTTPAcc
fields["nodeId"] = accessLog.NodeId fields["nodeId"] = accessLog.NodeId
fields["status"] = accessLog.Status fields["status"] = accessLog.Status
fields["createdAt"] = accessLog.Timestamp fields["createdAt"] = accessLog.Timestamp
fields["day"] = day fields["requestId"] = accessLog.RequestId + strconv.FormatInt(time.Now().UnixNano(), 10) + configs.PaddingId
content, err := json.Marshal(accessLog) content, err := json.Marshal(accessLog)
if err != nil { if err != nil {
@@ -91,3 +102,137 @@ func CreateHTTPAccessLogsWithDAO(dao *HTTPAccessLogDAO, accessLogs []*pb.HTTPAcc
return nil return nil
} }
// 读取往前的 单页访问日志
func (this *HTTPAccessLogDAO) ListAccessLogs(lastRequestId string, size int64, day string, serverId int64, reverse bool) (result []*HTTPAccessLog, nextLastRequestId string, hasMore bool, err error) {
if len(day) != 8 {
return
}
// 限制能查询的最大条数,防止占用内存过多
if size > 1000 {
size = 1000
}
result, nextLastRequestId, err = this.listAccessLogs(lastRequestId, size, day, serverId, reverse)
if err != nil || int64(len(result)) < size {
return
}
moreResult, _, _ := this.listAccessLogs(nextLastRequestId, 1, day, serverId, reverse)
hasMore = len(moreResult) > 0
return
}
// 读取往前的单页访问日志
func (this *HTTPAccessLogDAO) listAccessLogs(lastRequestId string, size int64, day string, serverId int64, reverse bool) (result []*HTTPAccessLog, nextLastRequestId string, err error) {
if size <= 0 {
return nil, lastRequestId, nil
}
accessLogLocker.RLock()
daoList := []*HTTPAccessLogDAOWrapper{}
for _, daoWrapper := range accessLogDAOMapping {
daoList = append(daoList, daoWrapper)
}
accessLogLocker.RUnlock()
if len(daoList) == 0 {
daoList = []*HTTPAccessLogDAOWrapper{{
DAO: SharedHTTPAccessLogDAO,
NodeId: 0,
}}
}
locker := sync.Mutex{}
count := len(daoList)
wg := &sync.WaitGroup{}
wg.Add(count)
for _, daoWrapper := range daoList {
go func(daoWrapper *HTTPAccessLogDAOWrapper) {
defer wg.Done()
dao := daoWrapper.DAO
tableName, exists, err := findAccessLogTableName(dao.Instance, day)
if !exists {
// 表格不存在则跳过
return
}
if err != nil {
logs.Println("[DB_NODE]" + err.Error())
return
}
query := dao.Query()
// 条件
if serverId > 0 {
query.Attr("serverId", serverId)
}
// offset
if len(lastRequestId) > 0 {
if !reverse {
query.Where("requestId<:requestId").
Param("requestId", lastRequestId)
} else {
query.Where("requestId>:requestId").
Param("requestId", lastRequestId)
}
}
if !reverse {
query.Desc("requestId")
} else {
query.Asc("requestId")
}
// 开始查询
ones, err := query.
Table(tableName).
Limit(size).
FindAll()
if err != nil {
logs.Println("[DB_NODE]" + err.Error())
return
}
locker.Lock()
for _, one := range ones {
accessLog := one.(*HTTPAccessLog)
result = append(result, accessLog)
}
locker.Unlock()
}(daoWrapper)
}
wg.Wait()
if len(result) == 0 {
return nil, lastRequestId, nil
}
// 按照requestId排序
sort.Slice(result, func(i, j int) bool {
if !reverse {
return result[i].RequestId > result[j].RequestId
} else {
return result[i].RequestId < result[j].RequestId
}
})
if int64(len(result)) > size {
result = result[:size]
}
requestId := result[len(result)-1].RequestId
if reverse {
lists.Reverse(result)
}
if !reverse {
return result, requestId, nil
} else {
return result, requestId, nil
}
}

View File

@@ -4,6 +4,7 @@ import (
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
_ "github.com/go-sql-driver/mysql" _ "github.com/go-sql-driver/mysql"
_ "github.com/iwind/TeaGo/bootstrap" _ "github.com/iwind/TeaGo/bootstrap"
timeutil "github.com/iwind/TeaGo/utils/time"
"testing" "testing"
"time" "time"
) )
@@ -22,9 +23,114 @@ func TestCreateHTTPAccessLogs(t *testing.T) {
} }
dao := randomAccessLogDAO() dao := randomAccessLogDAO()
t.Log("dao:", dao) t.Log("dao:", dao)
err = CreateHTTPAccessLogsWithDAO(dao, []*pb.HTTPAccessLog{accessLog}) err = SharedHTTPAccessLogDAO.CreateHTTPAccessLogsWithDAO(dao, []*pb.HTTPAccessLog{accessLog})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
t.Log("ok") t.Log("ok")
} }
func TestHTTPAccessLogDAO_ListAccessLogs(t *testing.T) {
err := NewDBNodeInitializer().loop()
if err != nil {
t.Fatal(err)
}
accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs("", 10, timeutil.Format("Ymd"), 0, false)
if err != nil {
t.Fatal(err)
}
t.Log("requestId:", requestId, "hasMore:", hasMore)
if len(accessLogs) == 0 {
t.Log("no access logs yet")
return
}
for _, accessLog := range accessLogs {
t.Log(accessLog.Id, accessLog.CreatedAt, timeutil.FormatTime("H:i:s", int64(accessLog.CreatedAt)))
}
}
func TestHTTPAccessLogDAO_ListAccessLogs_Page(t *testing.T) {
err := NewDBNodeInitializer().loop()
if err != nil {
t.Fatal(err)
}
lastRequestId := ""
times := 0 // 防止循环次数太多
for {
before := time.Now()
accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs(lastRequestId, 2, timeutil.Format("Ymd"), 0, false)
cost := time.Since(before).Seconds()
if err != nil {
t.Fatal(err)
}
lastRequestId = requestId
if len(accessLogs) == 0 {
break
}
t.Log("===")
t.Log("requestId:", requestId[:10]+"...", "hasMore:", hasMore, "cost:", cost*1000, "ms")
for _, accessLog := range accessLogs {
t.Log(accessLog.Id, accessLog.CreatedAt, timeutil.FormatTime("H:i:s", int64(accessLog.CreatedAt)))
}
times++
if times > 10 {
break
}
}
}
func TestHTTPAccessLogDAO_ListAccessLogs_Reverse(t *testing.T) {
err := NewDBNodeInitializer().loop()
if err != nil {
t.Fatal(err)
}
before := time.Now()
accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs("16023261176446590001000000000000003500000004", 2, timeutil.Format("Ymd"), 0, true)
cost := time.Since(before).Seconds()
if err != nil {
t.Fatal(err)
}
t.Log("===")
t.Log("requestId:", requestId[:19]+"...", "hasMore:", hasMore, "cost:", cost*1000, "ms")
if len(accessLogs) > 0 {
t.Log("accessLog:", accessLogs[0].RequestId[:19]+"...", len(accessLogs[0].RequestId))
}
}
func TestHTTPAccessLogDAO_ListAccessLogs_Page_NotExists(t *testing.T) {
err := NewDBNodeInitializer().loop()
if err != nil {
t.Fatal(err)
}
lastRequestId := ""
times := 0 // 防止循环次数太多
for {
before := time.Now()
accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs(lastRequestId, 2, timeutil.Format("Ymd", time.Now().AddDate(0, 0, 1)), 0, false)
cost := time.Since(before).Seconds()
if err != nil {
t.Fatal(err)
}
lastRequestId = requestId
if len(accessLogs) == 0 {
break
}
t.Log("===")
t.Log("requestId:", requestId[:10]+"...", "hasMore:", hasMore, "cost:", cost*1000, "ms")
for _, accessLog := range accessLogs {
t.Log(accessLog.Id, accessLog.CreatedAt, timeutil.FormatTime("H:i:s", int64(accessLog.CreatedAt)))
}
times++
if times > 10 {
break
}
}
}

View File

@@ -8,7 +8,7 @@ type HTTPAccessLog struct {
Status uint32 `field:"status"` // 状态码 Status uint32 `field:"status"` // 状态码
CreatedAt uint64 `field:"createdAt"` // 创建时间 CreatedAt uint64 `field:"createdAt"` // 创建时间
Content string `field:"content"` // 日志内容 Content string `field:"content"` // 日志内容
Day string `field:"day"` // 日期Ymd RequestId string `field:"requestId"` // 请求ID
} }
type HTTPAccessLogOperator struct { type HTTPAccessLogOperator struct {
@@ -18,7 +18,7 @@ type HTTPAccessLogOperator struct {
Status interface{} // 状态码 Status interface{} // 状态码
CreatedAt interface{} // 创建时间 CreatedAt interface{} // 创建时间
Content interface{} // 日志内容 Content interface{} // 日志内容
Day interface{} // 日期Ymd RequestId interface{} // 请求ID
} }
func NewHTTPAccessLogOperator() *HTTPAccessLogOperator { func NewHTTPAccessLogOperator() *HTTPAccessLogOperator {

View File

@@ -1 +1,17 @@
package models package models
import (
"encoding/json"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
)
// 转换成PB对象
func (this *HTTPAccessLog) ToPB() (*pb.HTTPAccessLog, error) {
p := &pb.HTTPAccessLog{}
err := json.Unmarshal([]byte(this.Content), p)
if err != nil {
return nil, err
}
p.RequestId = this.RequestId
return p, nil
}

View File

@@ -13,6 +13,7 @@ type HTTPAccessLogService struct {
// 创建访问日志 // 创建访问日志
func (this *HTTPAccessLogService) CreateHTTPAccessLogs(ctx context.Context, req *pb.CreateHTTPAccessLogsRequest) (*pb.CreateHTTPAccessLogsResponse, error) { func (this *HTTPAccessLogService) CreateHTTPAccessLogs(ctx context.Context, req *pb.CreateHTTPAccessLogsRequest) (*pb.CreateHTTPAccessLogsResponse, error) {
// 校验请求
_, _, err := rpcutils.ValidateRequest(ctx, rpcutils.UserTypeNode) _, _, err := rpcutils.ValidateRequest(ctx, rpcutils.UserTypeNode)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -22,10 +23,39 @@ func (this *HTTPAccessLogService) CreateHTTPAccessLogs(ctx context.Context, req
return &pb.CreateHTTPAccessLogsResponse{}, nil return &pb.CreateHTTPAccessLogsResponse{}, nil
} }
err = models.CreateHTTPAccessLogs(req.AccessLogs) err = models.SharedHTTPAccessLogDAO.CreateHTTPAccessLogs(req.AccessLogs)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &pb.CreateHTTPAccessLogsResponse{}, nil return &pb.CreateHTTPAccessLogsResponse{}, nil
} }
// 列出单页访问日志
func (this *HTTPAccessLogService) ListHTTPAccessLogs(ctx context.Context, req *pb.ListHTTPAccessLogsRequest) (*pb.ListHTTPAccessLogsResponse, error) {
// 校验请求
_, _, err := rpcutils.ValidateRequest(ctx, rpcutils.UserTypeAdmin)
if err != nil {
return nil, err
}
accessLogs, requestId, hasMore, err := models.SharedHTTPAccessLogDAO.ListAccessLogs(req.RequestId, req.Size, req.Day, req.ServerId, req.Reverse)
if err != nil {
return nil, err
}
result := []*pb.HTTPAccessLog{}
for _, accessLog := range accessLogs {
a, err := accessLog.ToPB()
if err != nil {
return nil, err
}
result = append(result, a)
}
return &pb.ListHTTPAccessLogsResponse{
AccessLogs: result,
HasMore: hasMore,
RequestId: requestId,
}, nil
}