diff --git a/internal/db/models/http_access_log_dao.go b/internal/db/models/http_access_log_dao.go index 861c3cb4..37d8d714 100644 --- a/internal/db/models/http_access_log_dao.go +++ b/internal/db/models/http_access_log_dao.go @@ -39,9 +39,10 @@ var SharedHTTPAccessLogDAO *HTTPAccessLogDAO var ( oldAccessLogQueue = make(chan *pb.HTTPAccessLog) accessLogQueue = make(chan *pb.HTTPAccessLog, 10_000) - accessLogQueueMaxLength = 100_000 - accessLogQueuePercent = 100 // 0-100 - accessLogCountPerSecond = 10_000 // 0 表示不限制 + accessLogQueueMaxLength = 100_000 //队列最大长度 + accessLogQueuePercent = 100 // 0-100 + accessLogCountPerSecond = 10_000 // 每秒钟写入条数,0 表示不限制 + accessLogPerTx = 100 // 单事务写入条数 accessLogConfigJSON = []byte{} accessLogQueueChanged = make(chan zero.Zero, 1) @@ -85,10 +86,17 @@ func init() { goman.New(func() { var ticker = time.NewTicker(1 * time.Second) for range ticker.C { - var tx *dbs.Tx - err := SharedHTTPAccessLogDAO.DumpAccessLogsFromQueue(tx, accessLogCountPerSecond) - if err != nil { - remotelogs.Error("HTTP_ACCESS_LOG_QUEUE", "dump access logs failed: "+err.Error()) + var countTxs = accessLogCountPerSecond / accessLogPerTx + if countTxs <= 0 { + countTxs = 1 + } + for i := 0; i < countTxs; i++ { + hasMore, err := SharedHTTPAccessLogDAO.DumpAccessLogsFromQueue(accessLogPerTx) + if err != nil { + remotelogs.Error("HTTP_ACCESS_LOG_QUEUE", "dump access logs failed: "+err.Error()) + } else if !hasMore { + break + } } } }) @@ -132,7 +140,7 @@ func (this *HTTPAccessLogDAO) CreateHTTPAccessLogs(tx *dbs.Tx, accessLogs []*pb. } // DumpAccessLogsFromQueue 从队列导入访问日志 -func (this *HTTPAccessLogDAO) DumpAccessLogsFromQueue(tx *dbs.Tx, size int) error { +func (this *HTTPAccessLogDAO) DumpAccessLogsFromQueue(size int) (hasMore bool, err error) { var dao = randomHTTPAccessLogDAO() if dao == nil { dao = &HTTPAccessLogDAOWrapper{ @@ -141,6 +149,19 @@ func (this *HTTPAccessLogDAO) DumpAccessLogsFromQueue(tx *dbs.Tx, size int) erro } } + if len(oldAccessLogQueue) == 0 && len(accessLogQueue) == 0 { + return false, nil + } + + // 开始事务 + tx, err := dao.DAO.Instance.Begin() + if err != nil { + return false, err + } + defer func() { + _ = tx.Commit() + }() + if size <= 0 { size = 1_000_000 } @@ -149,6 +170,8 @@ func (this *HTTPAccessLogDAO) DumpAccessLogsFromQueue(tx *dbs.Tx, size int) erro var oldQueue = oldAccessLogQueue var newQueue = accessLogQueue + hasMore = true + Loop: for i := 0; i < size; i++ { // old @@ -156,7 +179,7 @@ Loop: case accessLog := <-oldQueue: err := this.CreateHTTPAccessLog(tx, dao.DAO, accessLog) if err != nil { - return err + return false, err } continue Loop default: @@ -168,15 +191,16 @@ Loop: case accessLog := <-newQueue: err := this.CreateHTTPAccessLog(tx, dao.DAO, accessLog) if err != nil { - return err + return false, err } continue Loop default: + hasMore = false break Loop } } - return nil + return hasMore, nil } // CreateHTTPAccessLog 写入单条访问日志 diff --git a/internal/db/models/http_access_log_dao_test.go b/internal/db/models/http_access_log_dao_test.go index d84ffc3a..7ad5533d 100644 --- a/internal/db/models/http_access_log_dao_test.go +++ b/internal/db/models/http_access_log_dao_test.go @@ -21,13 +21,13 @@ func TestCreateHTTPAccessLog(t *testing.T) { t.Fatal(err) } - accessLog := &pb.HTTPAccessLog{ + var accessLog = &pb.HTTPAccessLog{ ServerId: 1, NodeId: 4, Status: 200, Timestamp: time.Now().Unix(), } - dao := randomHTTPAccessLogDAO() + var dao = randomHTTPAccessLogDAO() t.Log("dao:", dao) // 先初始化 @@ -37,12 +37,59 @@ func TestCreateHTTPAccessLog(t *testing.T) { defer func() { t.Log(time.Since(before).Seconds()*1000, "ms") }() + for i := 0; i < 1000; i++ { err = SharedHTTPAccessLogDAO.CreateHTTPAccessLog(tx, dao.DAO, accessLog) if err != nil { t.Fatal(err) } } + + t.Log("ok") +} + +func TestCreateHTTPAccessLog_Tx(t *testing.T) { + dbs.NotifyReady() + + var tx *dbs.Tx + + err := NewDBNodeInitializer().loop() + if err != nil { + t.Fatal(err) + } + + var accessLog = &pb.HTTPAccessLog{ + ServerId: 1, + NodeId: 4, + Status: 200, + Timestamp: time.Now().Unix(), + } + var dao = randomHTTPAccessLogDAO() + t.Log("dao:", dao) + + // 先初始化 + _ = SharedHTTPAccessLogDAO.CreateHTTPAccessLog(tx, dao.DAO, accessLog) + + var before = time.Now() + defer func() { + t.Log(time.Since(before).Seconds()*1000, "ms") + }() + + tx, err = dao.DAO.Instance.Begin() + if err != nil { + t.Fatal(err) + } + for i := 0; i < 1000; i++ { + err = SharedHTTPAccessLogDAO.CreateHTTPAccessLog(tx, dao.DAO, accessLog) + if err != nil { + t.Fatal(err) + } + } + err = tx.Commit() + if err != nil { + t.Fatal(err) + } + t.Log("ok") }