diff --git a/internal/caches/list_file_db.go b/internal/caches/list_file_db.go index e5dc3a7..7c80e7b 100644 --- a/internal/caches/list_file_db.go +++ b/internal/caches/list_file_db.go @@ -58,6 +58,7 @@ func (this *FileListDB) Open(dbPath string) error { writeDB.SetMaxOpenConns(1) // TODO 耗时过长,暂时不整理数据库 + // TODO 需要根据行数来判断是否VACUUM /**_, err = db.Exec("VACUUM") if err != nil { return err @@ -109,7 +110,7 @@ func (this *FileListDB) Init() error { this.total = total // 常用语句 - this.existsByHashStmt, err = this.readDB.Prepare(`SELECT "expiredAt" FROM "` + this.itemsTableName + `" WHERE "hash"=? AND expiredAt>? LIMIT 1`) + this.existsByHashStmt, err = this.readDB.Prepare(`SELECT "expiredAt" FROM "` + this.itemsTableName + `" INDEXED BY "hash" WHERE "hash"=? AND expiredAt>? LIMIT 1`) if err != nil { return err } @@ -354,7 +355,7 @@ func (this *FileListDB) Close() error { func (this *FileListDB) initTables(times int) error { { // expiredAt - 过期时间,用来判断有无过期 - // staleAt - 陈旧最大时间,用来清理缓存 + // staleAt - 过时缓存最大时间,用来清理缓存 _, err := this.writeDB.Exec(`CREATE TABLE IF NOT EXISTS "` + this.itemsTableName + `" ( "id" integer NOT NULL PRIMARY KEY AUTOINCREMENT, "hash" varchar(32), @@ -370,15 +371,9 @@ func (this *FileListDB) initTables(times int) error { "serverId" integer ); -CREATE INDEX IF NOT EXISTS "createdAt" -ON "` + this.itemsTableName + `" ( - "createdAt" ASC -); - -CREATE INDEX IF NOT EXISTS "expiredAt" -ON "` + this.itemsTableName + `" ( - "expiredAt" ASC -); +DROP INDEX IF EXISTS "createdAt"; +DROP INDEX IF EXISTS "expiredAt"; +DROP INDEX IF EXISTS "serverId"; CREATE INDEX IF NOT EXISTS "staleAt" ON "` + this.itemsTableName + `" ( @@ -389,11 +384,6 @@ CREATE UNIQUE INDEX IF NOT EXISTS "hash" ON "` + this.itemsTableName + `" ( "hash" ASC ); - -CREATE INDEX IF NOT EXISTS "serverId" -ON "` + this.itemsTableName + `" ( - "serverId" ASC -); `) if err != nil { diff --git a/internal/caches/max_open_files.go b/internal/caches/max_open_files.go new file mode 100644 index 0000000..95b114a --- /dev/null +++ b/internal/caches/max_open_files.go @@ -0,0 +1,83 @@ +// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package caches + +import ( + teaconst "github.com/TeaOSLab/EdgeNode/internal/const" + "github.com/TeaOSLab/EdgeNode/internal/goman" + "sync/atomic" + "time" +) + +const ( + minOpenFilesValue int32 = 2 + maxOpenFilesValue int32 = 65535 + + modSlow int32 = 1 + modFast int32 = 2 +) + +type MaxOpenFiles struct { + step int32 + maxOpenFiles int32 + ptr *int32 + ticker *time.Ticker + mod int32 +} + +func NewMaxOpenFiles(step int32) *MaxOpenFiles { + if step <= 0 { + step = 2 + } + var f = &MaxOpenFiles{ + step: step, + maxOpenFiles: 2, + } + if teaconst.DiskIsFast { + f.maxOpenFiles = 32 + } + f.ptr = &f.maxOpenFiles + f.ticker = time.NewTicker(1 * time.Second) + goman.New(func() { + for range f.ticker.C { + var mod = atomic.LoadInt32(&f.mod) + switch mod { + case modSlow: + // we decrease more quickly, with more steps + if atomic.AddInt32(f.ptr, -step*2) <= 0 { + atomic.StoreInt32(f.ptr, minOpenFilesValue) + } + case modFast: + if atomic.AddInt32(f.ptr, step) >= maxOpenFilesValue { + atomic.StoreInt32(f.ptr, maxOpenFilesValue) + } + } + + // reset mod + atomic.StoreInt32(&f.mod, 0) + } + }) + return f +} + +func (this *MaxOpenFiles) Fast() { + if atomic.LoadInt32(&this.mod) == 0 { + this.mod = modFast + } +} + +func (this *MaxOpenFiles) Slow() { + atomic.StoreInt32(&this.mod, modSlow) +} + +func (this *MaxOpenFiles) Max() int32 { + if atomic.LoadInt32(&this.mod) == modSlow { + return 0 + } + + var v = atomic.LoadInt32(this.ptr) + if v <= minOpenFilesValue { + return minOpenFilesValue + } + return v +} diff --git a/internal/caches/max_open_files_test.go b/internal/caches/max_open_files_test.go new file mode 100644 index 0000000..814d21d --- /dev/null +++ b/internal/caches/max_open_files_test.go @@ -0,0 +1,28 @@ +// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package caches_test + +import ( + "github.com/TeaOSLab/EdgeNode/internal/caches" + "testing" + "time" +) + +func TestNewMaxOpenFiles(t *testing.T) { + var maxOpenFiles = caches.NewMaxOpenFiles(2) + maxOpenFiles.Fast() + t.Log(maxOpenFiles.Max()) + + maxOpenFiles.Fast() + time.Sleep(1 * time.Second) + t.Log(maxOpenFiles.Max()) + + maxOpenFiles.Slow() + t.Log(maxOpenFiles.Max()) + + maxOpenFiles.Slow() + t.Log(maxOpenFiles.Max()) + + maxOpenFiles.Slow() + t.Log(maxOpenFiles.Max()) +} diff --git a/internal/caches/storage_file.go b/internal/caches/storage_file.go index 22a4d8b..9b4005c 100644 --- a/internal/caches/storage_file.go +++ b/internal/caches/storage_file.go @@ -27,7 +27,6 @@ import ( "os" "path/filepath" "regexp" - "runtime" "sort" "strconv" "strings" @@ -58,21 +57,15 @@ const ( HotItemSize = 1024 // 热点数据数量 HotItemLifeSeconds int64 = 3600 // 热点数据生命周期 FileToMemoryMaxSize = 32 * sizes.M // 可以从文件写入到内存的最大文件尺寸 + FileTmpSuffix = ".tmp" ) var sharedWritingFileKeyMap = map[string]zero.Zero{} // key => bool var sharedWritingFileKeyLocker = sync.Mutex{} -var maxOpenFiles = 3 +var maxOpenFiles = NewMaxOpenFiles(2) -func init() { - if teaconst.DiskIsFast { - maxOpenFiles = runtime.NumCPU() - } - if maxOpenFiles < 3 { - maxOpenFiles = 3 - } -} +const maxOpenFilesSlowCost = 1 * time.Microsecond // FileStorage 文件缓存 // 文件结构: @@ -263,7 +256,7 @@ func (this *FileStorage) Init() error { var count = stat.Count var size = stat.Size - cost := time.Since(before).Seconds() * 1000 + var cost = time.Since(before).Seconds() * 1000 sizeMB := strconv.FormatInt(size, 10) + " Bytes" if size > 1*sizes.G { sizeMB = fmt.Sprintf("%.3f G", float64(size)/float64(sizes.G)) @@ -435,7 +428,7 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, siz return nil, ErrFileIsWriting } - if !isFlushing && len(sharedWritingFileKeyMap) >= maxOpenFiles { + if len(sharedWritingFileKeyMap) >= int(maxOpenFiles.Max()) { sharedWritingFileKeyLocker.Unlock() return nil, ErrTooManyOpenFiles } @@ -464,6 +457,8 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, siz } var hash = stringutil.Md5(key) + + // TODO 可以只stat一次 var dir = this.options.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/" + hash[:2] + "/" + hash[2:4] _, err = os.Stat(dir) if err != nil { @@ -491,7 +486,15 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, siz // 防止并发连续写入 return nil, ErrFileIsWriting } - var tmpPath = cachePath + ".tmp" + var tmpPath = cachePath + var existsFile = false + if stat != nil { + existsFile = true + + // 如果已经存在,则增加一个.tmp后缀,防止读写冲突 + tmpPath += FileTmpSuffix + } + if isPartial { tmpPath = cachePathName + ".cache" } @@ -523,13 +526,19 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, siz } var flags = os.O_CREATE | os.O_WRONLY - if isNewCreated { + if isNewCreated && existsFile { flags |= os.O_TRUNC } + var before = time.Now() writer, err := os.OpenFile(tmpPath, flags, 0666) if err != nil { return nil, err } + if time.Since(before) >= maxOpenFilesSlowCost { + maxOpenFiles.Slow() + } else { + maxOpenFiles.Fast() + } var removeOnFailure = true defer func() { diff --git a/internal/caches/writer_file.go b/internal/caches/writer_file.go index 8c49496..76b156c 100644 --- a/internal/caches/writer_file.go +++ b/internal/caches/writer_file.go @@ -127,8 +127,8 @@ func (this *FileWriter) Close() error { err = this.rawWriter.Close() if err != nil { _ = os.Remove(path) - } else { - err = os.Rename(path, strings.Replace(path, ".tmp", "", 1)) + } else if strings.HasSuffix(path, FileTmpSuffix) { + err = os.Rename(path, strings.Replace(path, FileTmpSuffix, "", 1)) if err != nil { _ = os.Remove(path) } diff --git a/internal/nodes/http_writer.go b/internal/nodes/http_writer.go index 88baa93..a7d3bb1 100644 --- a/internal/nodes/http_writer.go +++ b/internal/nodes/http_writer.go @@ -314,7 +314,7 @@ func (this *HTTPWriter) PrepareCache(resp *http.Response, size int64) { remotelogs.Error("HTTP_WRITER", "write cache failed: "+err.Error()) } - this.Header().Set("X-Cache", "BYPASS, open failed") + this.Header().Set("X-Cache", "BYPASS, too many requests") return } this.cacheWriter = cacheWriter diff --git a/internal/utils/dbs/db.go b/internal/utils/dbs/db.go index fed50cc..8b8e891 100644 --- a/internal/utils/dbs/db.go +++ b/internal/utils/dbs/db.go @@ -31,6 +31,10 @@ func (this *DB) EnableStat(b bool) { this.enableStat = b } +func (this *DB) Begin() (*sql.Tx, error) { + return this.rawDB.Begin() +} + func (this *DB) Prepare(query string) (*Stmt, error) { stmt, err := this.rawDB.Prepare(query) if err != nil {