增加对访问日志自动分表配置

This commit is contained in:
刘祥超
2022-03-09 10:01:24 +08:00
parent 85a46a9827
commit 42a6494bde
4 changed files with 61 additions and 5 deletions

View File

@@ -44,7 +44,7 @@ var (
accessLogQueueChanged = make(chan zero.Zero, 1) accessLogQueueChanged = make(chan zero.Zero, 1)
accessLogEnableAutoPartial = true // 是否启用自动分表 accessLogEnableAutoPartial = true // 是否启用自动分表
accessLogPartialRows int64 = 500_000 // 自动分表的单表最大值 accessLogRowsPerTable int64 = 500_000 // 自动分表的单表最大值
) )
type accessLogTableQuery struct { type accessLogTableQuery struct {
@@ -224,7 +224,7 @@ func (this *HTTPAccessLogDAO) CreateHTTPAccessLog(tx *dbs.Tx, dao *HTTPAccessLog
return err return err
} }
if lastId%accessLogPartialRows == 0 { if accessLogEnableAutoPartial && accessLogRowsPerTable > 0 && lastId%accessLogRowsPerTable == 0 {
SharedHTTPAccessLogManager.ResetTable(dao.Instance, day) SharedHTTPAccessLogManager.ResetTable(dao.Instance, day)
} }
@@ -652,11 +652,18 @@ func (this *HTTPAccessLogDAO) SetupQueue() {
config.MaxLength = 100_000 config.MaxLength = 100_000
} }
accessLogEnableAutoPartial = config.EnableAutoPartial
if config.RowsPerTable > 0 {
accessLogRowsPerTable = config.RowsPerTable
}
if accessLogQueueMaxLength != config.MaxLength { if accessLogQueueMaxLength != config.MaxLength {
accessLogQueueMaxLength = config.MaxLength accessLogQueueMaxLength = config.MaxLength
oldAccessLogQueue = accessLogQueue oldAccessLogQueue = accessLogQueue
accessLogQueue = make(chan *pb.HTTPAccessLog, config.MaxLength) accessLogQueue = make(chan *pb.HTTPAccessLog, config.MaxLength)
} }
remotelogs.Println("HTTP_ACCESS_LOG_QUEUE", "change queue max length: "+types.String(config.MaxLength)+", percent: "+types.String(config.Percent)+", countPerSecond: "+types.String(config.CountPerSecond)) if Tea.IsTesting() {
remotelogs.Println("HTTP_ACCESS_LOG_QUEUE", "change queue "+string(configJSON))
}
} }

View File

@@ -214,7 +214,7 @@ func (this *HTTPAccessLogManager) findTableWithoutCache(db *dbs.DB, day string,
} }
var lastTableName = tableNames[len(tableNames)-1] var lastTableName = tableNames[len(tableNames)-1]
if !force || !accessLogEnableAutoPartial { if !force || !accessLogEnableAutoPartial || accessLogRowsPerTable <= 0 {
hasRemoteAddrField, hasDomainField, err := this.checkTableFields(db, lastTableName) hasRemoteAddrField, hasDomainField, err := this.checkTableFields(db, lastTableName)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -235,7 +235,7 @@ func (this *HTTPAccessLogManager) findTableWithoutCache(db *dbs.DB, day string,
if lastId != nil { if lastId != nil {
var lastInt64Id = types.Int64(lastId) var lastInt64Id = types.Int64(lastId)
if lastInt64Id >= accessLogPartialRows { if accessLogRowsPerTable > 0 && lastInt64Id >= accessLogRowsPerTable {
// create next partial table // create next partial table
var nextTableName = "" var nextTableName = ""
if accessLogTableMainReg.MatchString(lastTableName) { if accessLogTableMainReg.MatchString(lastTableName) {

View File

@@ -69,6 +69,9 @@ var upgradeFuncs = []*upgradeVersion{
{ {
"0.4.1", upgradeV0_4_1, "0.4.1", upgradeV0_4_1,
}, },
{
"0.4.5", upgradeV0_4_5,
},
} }
// UpgradeSQLData 升级SQL数据 // UpgradeSQLData 升级SQL数据
@@ -590,3 +593,32 @@ func upgradeV0_4_1(db *dbs.DB) error {
return nil return nil
} }
// v0.4.5
func upgradeV0_4_5(db *dbs.DB) error {
// 升级访问日志自动分表
{
var dao = models.NewSysSettingDAO()
valueJSON, err := dao.ReadSetting(nil, systemconfigs.SettingCodeAccessLogQueue)
if err != nil {
return err
}
if len(valueJSON) > 0 {
var config = &serverconfigs.AccessLogQueueConfig{}
err = json.Unmarshal(valueJSON, config)
if err == nil {
config.EnableAutoPartial = true
config.RowsPerTable = 500_000
configJSON, err := json.Marshal(config)
if err == nil {
err = dao.UpdateSetting(nil, systemconfigs.SettingCodeAccessLogQueue, configJSON)
if err != nil {
return err
}
}
}
}
}
return nil
}

View File

@@ -116,3 +116,20 @@ func TestUpgradeSQLData_v0_4_1(t *testing.T) {
} }
t.Log("ok") t.Log("ok")
} }
func TestUpgradeSQLData_v0_4_5(t *testing.T) {
db, err := dbs.NewInstanceFromConfig(&dbs.DBConfig{
Driver: "mysql",
Dsn: "root:123456@tcp(127.0.0.1:3306)/db_edge?charset=utf8mb4&timeout=30s",
Prefix: "edge",
})
if err != nil {
t.Fatal(err)
}
err = upgradeV0_4_5(db)
if err != nil {
t.Fatal(err)
}
t.Log("ok")
}