mirror of
https://github.com/TeaOSLab/EdgeAPI.git
synced 2025-12-11 12:50:25 +08:00
服务访问日志改成通过事务写入,以提升写入速度
This commit is contained in:
@@ -39,9 +39,10 @@ var SharedHTTPAccessLogDAO *HTTPAccessLogDAO
|
||||
var (
|
||||
oldAccessLogQueue = make(chan *pb.HTTPAccessLog)
|
||||
accessLogQueue = make(chan *pb.HTTPAccessLog, 10_000)
|
||||
accessLogQueueMaxLength = 100_000
|
||||
accessLogQueueMaxLength = 100_000 //队列最大长度
|
||||
accessLogQueuePercent = 100 // 0-100
|
||||
accessLogCountPerSecond = 10_000 // 0 表示不限制
|
||||
accessLogCountPerSecond = 10_000 // 每秒钟写入条数,0 表示不限制
|
||||
accessLogPerTx = 100 // 单事务写入条数
|
||||
accessLogConfigJSON = []byte{}
|
||||
accessLogQueueChanged = make(chan zero.Zero, 1)
|
||||
|
||||
@@ -85,10 +86,17 @@ func init() {
|
||||
goman.New(func() {
|
||||
var ticker = time.NewTicker(1 * time.Second)
|
||||
for range ticker.C {
|
||||
var tx *dbs.Tx
|
||||
err := SharedHTTPAccessLogDAO.DumpAccessLogsFromQueue(tx, accessLogCountPerSecond)
|
||||
var countTxs = accessLogCountPerSecond / accessLogPerTx
|
||||
if countTxs <= 0 {
|
||||
countTxs = 1
|
||||
}
|
||||
for i := 0; i < countTxs; i++ {
|
||||
hasMore, err := SharedHTTPAccessLogDAO.DumpAccessLogsFromQueue(accessLogPerTx)
|
||||
if err != nil {
|
||||
remotelogs.Error("HTTP_ACCESS_LOG_QUEUE", "dump access logs failed: "+err.Error())
|
||||
} else if !hasMore {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
@@ -132,7 +140,7 @@ func (this *HTTPAccessLogDAO) CreateHTTPAccessLogs(tx *dbs.Tx, accessLogs []*pb.
|
||||
}
|
||||
|
||||
// DumpAccessLogsFromQueue 从队列导入访问日志
|
||||
func (this *HTTPAccessLogDAO) DumpAccessLogsFromQueue(tx *dbs.Tx, size int) error {
|
||||
func (this *HTTPAccessLogDAO) DumpAccessLogsFromQueue(size int) (hasMore bool, err error) {
|
||||
var dao = randomHTTPAccessLogDAO()
|
||||
if dao == nil {
|
||||
dao = &HTTPAccessLogDAOWrapper{
|
||||
@@ -141,6 +149,19 @@ func (this *HTTPAccessLogDAO) DumpAccessLogsFromQueue(tx *dbs.Tx, size int) erro
|
||||
}
|
||||
}
|
||||
|
||||
if len(oldAccessLogQueue) == 0 && len(accessLogQueue) == 0 {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// 开始事务
|
||||
tx, err := dao.DAO.Instance.Begin()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
defer func() {
|
||||
_ = tx.Commit()
|
||||
}()
|
||||
|
||||
if size <= 0 {
|
||||
size = 1_000_000
|
||||
}
|
||||
@@ -149,6 +170,8 @@ func (this *HTTPAccessLogDAO) DumpAccessLogsFromQueue(tx *dbs.Tx, size int) erro
|
||||
var oldQueue = oldAccessLogQueue
|
||||
var newQueue = accessLogQueue
|
||||
|
||||
hasMore = true
|
||||
|
||||
Loop:
|
||||
for i := 0; i < size; i++ {
|
||||
// old
|
||||
@@ -156,7 +179,7 @@ Loop:
|
||||
case accessLog := <-oldQueue:
|
||||
err := this.CreateHTTPAccessLog(tx, dao.DAO, accessLog)
|
||||
if err != nil {
|
||||
return err
|
||||
return false, err
|
||||
}
|
||||
continue Loop
|
||||
default:
|
||||
@@ -168,15 +191,16 @@ Loop:
|
||||
case accessLog := <-newQueue:
|
||||
err := this.CreateHTTPAccessLog(tx, dao.DAO, accessLog)
|
||||
if err != nil {
|
||||
return err
|
||||
return false, err
|
||||
}
|
||||
continue Loop
|
||||
default:
|
||||
hasMore = false
|
||||
break Loop
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
return hasMore, nil
|
||||
}
|
||||
|
||||
// CreateHTTPAccessLog 写入单条访问日志
|
||||
|
||||
@@ -21,13 +21,13 @@ func TestCreateHTTPAccessLog(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
accessLog := &pb.HTTPAccessLog{
|
||||
var accessLog = &pb.HTTPAccessLog{
|
||||
ServerId: 1,
|
||||
NodeId: 4,
|
||||
Status: 200,
|
||||
Timestamp: time.Now().Unix(),
|
||||
}
|
||||
dao := randomHTTPAccessLogDAO()
|
||||
var dao = randomHTTPAccessLogDAO()
|
||||
t.Log("dao:", dao)
|
||||
|
||||
// 先初始化
|
||||
@@ -37,12 +37,59 @@ func TestCreateHTTPAccessLog(t *testing.T) {
|
||||
defer func() {
|
||||
t.Log(time.Since(before).Seconds()*1000, "ms")
|
||||
}()
|
||||
|
||||
for i := 0; i < 1000; i++ {
|
||||
err = SharedHTTPAccessLogDAO.CreateHTTPAccessLog(tx, dao.DAO, accessLog)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
t.Log("ok")
|
||||
}
|
||||
|
||||
func TestCreateHTTPAccessLog_Tx(t *testing.T) {
|
||||
dbs.NotifyReady()
|
||||
|
||||
var tx *dbs.Tx
|
||||
|
||||
err := NewDBNodeInitializer().loop()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
var accessLog = &pb.HTTPAccessLog{
|
||||
ServerId: 1,
|
||||
NodeId: 4,
|
||||
Status: 200,
|
||||
Timestamp: time.Now().Unix(),
|
||||
}
|
||||
var dao = randomHTTPAccessLogDAO()
|
||||
t.Log("dao:", dao)
|
||||
|
||||
// 先初始化
|
||||
_ = SharedHTTPAccessLogDAO.CreateHTTPAccessLog(tx, dao.DAO, accessLog)
|
||||
|
||||
var before = time.Now()
|
||||
defer func() {
|
||||
t.Log(time.Since(before).Seconds()*1000, "ms")
|
||||
}()
|
||||
|
||||
tx, err = dao.DAO.Instance.Begin()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
for i := 0; i < 1000; i++ {
|
||||
err = SharedHTTPAccessLogDAO.CreateHTTPAccessLog(tx, dao.DAO, accessLog)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
err = tx.Commit()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
t.Log("ok")
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user