diff --git a/internal/db/models/http_access_log_dao.go b/internal/db/models/http_access_log_dao.go index 07289018..f7d7b218 100644 --- a/internal/db/models/http_access_log_dao.go +++ b/internal/db/models/http_access_log_dao.go @@ -1,16 +1,23 @@ package models import ( + "bytes" "encoding/json" "github.com/TeaOSLab/EdgeAPI/internal/errors" + "github.com/TeaOSLab/EdgeAPI/internal/goman" + "github.com/TeaOSLab/EdgeAPI/internal/remotelogs" "github.com/TeaOSLab/EdgeAPI/internal/utils" + "github.com/TeaOSLab/EdgeAPI/internal/zero" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/shared" + "github.com/TeaOSLab/EdgeCommon/pkg/systemconfigs" _ "github.com/go-sql-driver/mysql" "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/lists" "github.com/iwind/TeaGo/logs" + "github.com/iwind/TeaGo/rands" "github.com/iwind/TeaGo/types" timeutil "github.com/iwind/TeaGo/utils/time" "net" @@ -26,10 +33,53 @@ type HTTPAccessLogDAO dbs.DAO var SharedHTTPAccessLogDAO *HTTPAccessLogDAO +// 队列 +var oldAccessLogQueue = make(chan *pb.HTTPAccessLog) +var accessLogQueue = make(chan *pb.HTTPAccessLog, 10_000) +var accessLogQueueMaxLength = 100_000 +var accessLogQueuePercent = 100 // 0-100 +var accessLogCountPerSecond = 10_000 // 0 表示不限制 +var accessLogConfigJSON = []byte{} +var accessLogQueueChanged = make(chan zero.Zero, 1) + func init() { dbs.OnReady(func() { SharedHTTPAccessLogDAO = NewHTTPAccessLogDAO() }) + + // 队列相关 + dbs.OnReadyDone(func() { + // 检查队列变化 + goman.New(func() { + var ticker = time.NewTicker(60 * time.Second) + + // 先执行一次初始化 + SharedHTTPAccessLogDAO.SetupQueue() + + // 循环执行 + for { + select { + case <-ticker.C: + SharedHTTPAccessLogDAO.SetupQueue() + case <-accessLogQueueChanged: + SharedHTTPAccessLogDAO.SetupQueue() + } + } + }) + + // 导出队列内容 + goman.New(func() { + var ticker = time.NewTicker(1 * time.Second) + for range ticker.C { + var tx *dbs.Tx + err := SharedHTTPAccessLogDAO.DumpAccessLogsFromQueue(tx, accessLogCountPerSecond) + if err != nil { + remotelogs.Error("HTTP_ACCESS_LOG_QUEUE", "dump access logs failed: "+err.Error()) + } + } + }) + }) + } func NewHTTPAccessLogDAO() *HTTPAccessLogDAO { @@ -45,6 +95,31 @@ func NewHTTPAccessLogDAO() *HTTPAccessLogDAO { // CreateHTTPAccessLogs 创建访问日志 func (this *HTTPAccessLogDAO) CreateHTTPAccessLogs(tx *dbs.Tx, accessLogs []*pb.HTTPAccessLog) error { + // 写入队列 + var queue = accessLogQueue // 这样写非常重要,防止在写入过程中队列有切换 + for _, accessLog := range accessLogs { + if accessLog.FirewallPolicyId == 0 { // 如果是WAF记录,则采取采样率 + // 采样率 + if accessLogQueuePercent <= 0 { + return nil + } + if accessLogQueuePercent < 100 && rands.Int(1, 100) > accessLogQueuePercent { + return nil + } + } + + select { + case queue <- accessLog: + default: + // 超出的丢弃 + } + } + + return nil +} + +// DumpAccessLogsFromQueue 从队列导入访问日志 +func (this *HTTPAccessLogDAO) DumpAccessLogsFromQueue(tx *dbs.Tx, size int) error { dao := randomHTTPAccessLogDAO() if dao == nil { dao = &HTTPAccessLogDAOWrapper{ @@ -52,79 +127,102 @@ func (this *HTTPAccessLogDAO) CreateHTTPAccessLogs(tx *dbs.Tx, accessLogs []*pb. NodeId: 0, } } - return this.CreateHTTPAccessLogsWithDAO(tx, dao, accessLogs) + + if size <= 0 { + size = 1_000_000 + } + + // 复制变量,防止中途改变 + var oldQueue = oldAccessLogQueue + var newQueue = accessLogQueue + +Loop: + for i := 0; i < size; i++ { + // old + select { + case accessLog := <-oldQueue: + err := this.CreateHTTPAccessLog(tx, dao.DAO, accessLog) + if err != nil { + return err + } + continue Loop + default: + + } + + // new + select { + case accessLog := <-newQueue: + err := this.CreateHTTPAccessLog(tx, dao.DAO, accessLog) + if err != nil { + return err + } + continue Loop + default: + break Loop + } + } + + return nil } -// CreateHTTPAccessLogsWithDAO 使用特定的DAO创建访问日志 -func (this *HTTPAccessLogDAO) CreateHTTPAccessLogsWithDAO(tx *dbs.Tx, daoWrapper *HTTPAccessLogDAOWrapper, accessLogs []*pb.HTTPAccessLog) error { - if daoWrapper == nil { - return errors.New("dao should not be nil") - } - if len(accessLogs) == 0 { - return nil +// CreateHTTPAccessLog 写入单条访问日志 +func (this *HTTPAccessLogDAO) CreateHTTPAccessLog(tx *dbs.Tx, dao *HTTPAccessLogDAO, accessLog *pb.HTTPAccessLog) error { + day := timeutil.Format("Ymd", time.Unix(accessLog.Timestamp, 0)) + tableDef, err := findHTTPAccessLogTable(dao.Instance, day, false) + if err != nil { + return err } - dao := daoWrapper.DAO + fields := map[string]interface{}{} + fields["serverId"] = accessLog.ServerId + fields["nodeId"] = accessLog.NodeId + fields["status"] = accessLog.Status + fields["createdAt"] = accessLog.Timestamp + fields["requestId"] = accessLog.RequestId + fields["firewallPolicyId"] = accessLog.FirewallPolicyId + fields["firewallRuleGroupId"] = accessLog.FirewallRuleGroupId + fields["firewallRuleSetId"] = accessLog.FirewallRuleSetId + fields["firewallRuleId"] = accessLog.FirewallRuleId - // TODO 改成事务批量提交,以加快速度 + if len(accessLog.RequestBody) > 0 { + fields["requestBody"] = accessLog.RequestBody + accessLog.RequestBody = nil + } - for _, accessLog := range accessLogs { - day := timeutil.Format("Ymd", time.Unix(accessLog.Timestamp, 0)) - tableDef, err := findHTTPAccessLogTable(dao.Instance, day, false) - if err != nil { - return err - } + if tableDef.HasRemoteAddr { + fields["remoteAddr"] = accessLog.RemoteAddr + } + if tableDef.HasDomain { + fields["domain"] = accessLog.Host + } - fields := map[string]interface{}{} - fields["serverId"] = accessLog.ServerId - fields["nodeId"] = accessLog.NodeId - fields["status"] = accessLog.Status - fields["createdAt"] = accessLog.Timestamp - fields["requestId"] = accessLog.RequestId - fields["firewallPolicyId"] = accessLog.FirewallPolicyId - fields["firewallRuleGroupId"] = accessLog.FirewallRuleGroupId - fields["firewallRuleSetId"] = accessLog.FirewallRuleSetId - fields["firewallRuleId"] = accessLog.FirewallRuleId + content, err := json.Marshal(accessLog) + if err != nil { + return err + } + fields["content"] = content - if len(accessLog.RequestBody) > 0 { - fields["requestBody"] = accessLog.RequestBody - accessLog.RequestBody = nil - } - - if tableDef.HasRemoteAddr { - fields["remoteAddr"] = accessLog.RemoteAddr - } - if tableDef.HasDomain { - fields["domain"] = accessLog.Host - } - - content, err := json.Marshal(accessLog) - if err != nil { - return err - } - fields["content"] = content - - _, err = dao.Query(tx). - Table(tableDef.Name). - Sets(fields). - Insert() - if err != nil { - // 是否为 Error 1146: Table 'xxx.xxx' doesn't exist 如果是,则创建表之后重试 - if strings.Contains(err.Error(), "1146") { - tableDef, err = findHTTPAccessLogTable(dao.Instance, day, true) - if err != nil { - return err - } - _, err = dao.Query(tx). - Table(tableDef.Name). - Sets(fields). - Insert() - if err != nil { - return err - } - } else { - logs.Println("HTTP_ACCESS_LOG", err.Error()) + _, err = dao.Query(tx). + Table(tableDef.Name). + Sets(fields). + Insert() + if err != nil { + // 是否为 Error 1146: Table 'xxx.xxx' doesn't exist 如果是,则创建表之后重试 + if strings.Contains(err.Error(), "1146") { + tableDef, err = findHTTPAccessLogTable(dao.Instance, day, true) + if err != nil { + return err } + _, err = dao.Query(tx). + Table(tableDef.Name). + Sets(fields). + Insert() + if err != nil { + return err + } + } else { + remotelogs.Error("HTTP_ACCESS_LOG", err.Error()) } } @@ -468,3 +566,42 @@ func (this *HTTPAccessLogDAO) FindAccessLogWithRequestId(tx *dbs.Tx, requestId s wg.Wait() return result, nil } + +// SetupQueue 建立队列 +func (this *HTTPAccessLogDAO) SetupQueue() { + configJSON, err := SharedSysSettingDAO.ReadSetting(nil, systemconfigs.SettingCodeAccessLogQueue) + if err != nil { + remotelogs.Error("HTTP_ACCESS_LOG_QUEUE", "read settings failed: "+err.Error()) + return + } + + if len(configJSON) == 0 { + return + } + + if bytes.Compare(accessLogConfigJSON, configJSON) == 0 { + return + } + accessLogConfigJSON = configJSON + + var config = &serverconfigs.AccessLogQueueConfig{} + err = json.Unmarshal(configJSON, config) + if err != nil { + remotelogs.Error("HTTP_ACCESS_LOG_QUEUE", "decode settings failed: "+err.Error()) + return + } + + accessLogQueuePercent = config.Percent + accessLogCountPerSecond = config.CountPerSecond + if config.MaxLength <= 0 { + config.MaxLength = 100_000 + } + + if accessLogQueueMaxLength != config.MaxLength { + accessLogQueueMaxLength = config.MaxLength + oldAccessLogQueue = accessLogQueue + accessLogQueue = make(chan *pb.HTTPAccessLog, config.MaxLength) + } + + remotelogs.Println("HTTP_ACCESS_LOG_QUEUE", "change queue max length: "+types.String(config.MaxLength)+", percent: "+types.String(config.Percent)+", countPerSecond: "+types.String(config.CountPerSecond)) +} diff --git a/internal/db/models/sys_setting_dao.go b/internal/db/models/sys_setting_dao.go index d73f1a5e..d1c5b9da 100644 --- a/internal/db/models/sys_setting_dao.go +++ b/internal/db/models/sys_setting_dao.go @@ -3,6 +3,8 @@ package models import ( "encoding/json" "fmt" + "github.com/TeaOSLab/EdgeAPI/internal/remotelogs" + "github.com/TeaOSLab/EdgeAPI/internal/zero" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/systemconfigs" _ "github.com/go-sql-driver/mysql" @@ -40,6 +42,16 @@ func (this *SysSettingDAO) UpdateSetting(tx *dbs.Tx, codeFormat string, valueJSO countRetries := 3 var lastErr error + + defer func() { + if lastErr == nil { + err := this.NotifyUpdate(tx, codeFormat) + if err != nil { + remotelogs.Error("SysSettingDAO", "notify update failed: "+err.Error()) + } + } + }() + for i := 0; i < countRetries; i++ { settingId, err := this.Query(tx). Attr("code", codeFormat). @@ -61,6 +73,8 @@ func (this *SysSettingDAO) UpdateSetting(tx *dbs.Tx, codeFormat string, valueJSO // 因为错误的原因可能是因为code冲突,所以这里我们继续执行 continue } + + lastErr = nil return nil } @@ -72,6 +86,8 @@ func (this *SysSettingDAO) UpdateSetting(tx *dbs.Tx, codeFormat string, valueJSO if err != nil { return err } + lastErr = nil + break } return lastErr @@ -121,3 +137,12 @@ func (this *SysSettingDAO) ReadGlobalConfig(tx *dbs.Tx) (*serverconfigs.GlobalCo } return config, nil } + +// NotifyUpdate 通知更改 +func (this *SysSettingDAO) NotifyUpdate(tx *dbs.Tx, code string) error { + switch code { + case systemconfigs.SettingCodeAccessLogQueue: + accessLogQueueChanged <- zero.New() + } + return nil +} diff --git a/internal/events/utils.go b/internal/events/utils.go index 8c45e95c..14b02f3a 100644 --- a/internal/events/utils.go +++ b/internal/events/utils.go @@ -5,7 +5,7 @@ import "sync" var eventsMap = map[string][]func(){} // event => []callbacks var locker = sync.Mutex{} -// 增加事件回调 +// On 增加事件回调 func On(event string, callback func()) { locker.Lock() defer locker.Unlock() @@ -15,7 +15,7 @@ func On(event string, callback func()) { eventsMap[event] = callbacks } -// 通知事件 +// Notify 通知事件 func Notify(event string) { locker.Lock() callbacks, _ := eventsMap[event] diff --git a/internal/zero/zero.go b/internal/zero/zero.go new file mode 100644 index 00000000..d5ecb9a7 --- /dev/null +++ b/internal/zero/zero.go @@ -0,0 +1,9 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package zero + +type Zero = struct{} + +func New() Zero { + return Zero{} +} diff --git a/internal/zero/zero_test.go b/internal/zero/zero_test.go new file mode 100644 index 00000000..08cfc262 --- /dev/null +++ b/internal/zero/zero_test.go @@ -0,0 +1,41 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package zero + +import ( + "runtime" + "testing" +) + +func TestZero_Chan(t *testing.T) { + var stat1 = &runtime.MemStats{} + runtime.ReadMemStats(stat1) + + var m = make(chan Zero, 2_000_000) + for i := 0; i < 1_000_000; i++ { + m <- New() + } + + var stat2 = &runtime.MemStats{} + runtime.ReadMemStats(stat2) + t.Log(stat2.HeapInuse, stat1.HeapInuse, stat2.HeapInuse-stat1.HeapInuse, "B") + t.Log(len(m)) +} + +func TestZero_Map(t *testing.T) { + var stat1 = &runtime.MemStats{} + runtime.ReadMemStats(stat1) + + var m = map[int]Zero{} + for i := 0; i < 1_000_000; i++ { + m[i] = New() + } + + var stat2 = &runtime.MemStats{} + runtime.ReadMemStats(stat2) + t.Log((stat2.HeapInuse-stat1.HeapInuse)/1024/1024, "MB") + t.Log(len(m)) + + _, ok := m[1024] + t.Log(ok) +}