diff --git a/build/build.sh b/build/build.sh index f19dbbd..3daa224 100755 --- a/build/build.sh +++ b/build/build.sh @@ -33,6 +33,7 @@ function build() { mkdir $DIST/bin mkdir $DIST/configs mkdir $DIST/logs + mkdir $DIST/data fi cp $ROOT/configs/api.template.yaml $DIST/configs diff --git a/build/data/.gitignore b/build/data/.gitignore new file mode 100644 index 0000000..ed0f64e --- /dev/null +++ b/build/data/.gitignore @@ -0,0 +1 @@ +index.* \ No newline at end of file diff --git a/go.mod b/go.mod index bd675f3..f67c743 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/iwind/TeaGo v0.0.0-20201020081413-7cf62d6f420f github.com/iwind/gofcgi v0.0.0-20210506081859-17498ab3e9d7 github.com/lionsoul2014/ip2region v2.2.0-release+incompatible + github.com/mattn/go-sqlite3 v1.14.7 github.com/mssola/user_agent v0.5.2 github.com/shirou/gopsutil v2.20.9+incompatible golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7 diff --git a/go.sum b/go.sum index a230ea6..7657d02 100644 --- a/go.sum +++ b/go.sum @@ -67,6 +67,8 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/lionsoul2014/ip2region v2.2.0-release+incompatible h1:1qp9iks+69h7IGLazAplzS9Ca14HAxuD5c0rbFdPGy4= github.com/lionsoul2014/ip2region v2.2.0-release+incompatible/go.mod h1:+ZBN7PBoh5gG6/y0ZQ85vJDBe21WnfbRrQQwTfliJJI= +github.com/mattn/go-sqlite3 v1.14.7 h1:fxWBnXkxfM6sRiuH3bqJ4CfzZojMOLVc0UTsTglEghA= +github.com/mattn/go-sqlite3 v1.14.7/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= diff --git a/internal/caches/item.go b/internal/caches/item.go index 3d3cd31..1f066f8 100644 --- a/internal/caches/item.go +++ b/internal/caches/item.go @@ -10,12 +10,12 @@ const ( ) type Item struct { - Type ItemType - Key string - ExpiredAt int64 - HeaderSize int64 - BodySize int64 - MetaSize int64 + Type ItemType `json:"type"` + Key string `json:"key"` + ExpiredAt int64 `json:"expiredAt"` + HeaderSize int64 `json:"headerSize"` + BodySize int64 `json:"bodySize"` + MetaSize int64 `json:"metaSize"` } func (this *Item) IsExpired() bool { diff --git a/internal/caches/list.go b/internal/caches/list.go index 13462de..8f56f38 100644 --- a/internal/caches/list.go +++ b/internal/caches/list.go @@ -1,145 +1,33 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + package caches -import ( - "strings" - "sync" -) +type ListInterface interface { + Init() error -// 缓存列表管理 -type List struct { - m map[string]*Item // hash => item - locker sync.RWMutex - onAdd func(item *Item) - onRemove func(item *Item) -} - -func NewList() *List { - return &List{ - m: map[string]*Item{}, - } -} - -func (this *List) Reset() { - this.locker.Lock() - this.m = map[string]*Item{} - this.locker.Unlock() -} - -func (this *List) Add(hash string, item *Item) { - this.locker.Lock() - if this.onAdd != nil { - this.onAdd(item) - } - this.m[hash] = item - this.locker.Unlock() -} - -func (this *List) Exist(hash string) bool { - this.locker.RLock() - defer this.locker.RUnlock() - - item, ok := this.m[hash] - if !ok { - return false - } - - return !item.IsExpired() -} - -// 根据前缀进行查找 -func (this *List) FindKeysWithPrefix(prefix string) (keys []string) { - this.locker.RLock() - defer this.locker.RUnlock() - - // TODO 需要优化性能,支持千万级数据低于1s的处理速度 - for _, item := range this.m { - if strings.HasPrefix(item.Key, prefix) { - keys = append(keys, item.Key) - } - } - return -} - -func (this *List) Remove(hash string) { - this.locker.Lock() - - item, ok := this.m[hash] - if ok { - if this.onRemove != nil { - this.onRemove(item) - } - delete(this.m, hash) - } - - this.locker.Unlock() -} - -// 清理过期的缓存 -// count 每次遍历的最大数量,控制此数字可以保证每次清理的时候不用花太多时间 -// callback 每次发现过期key的调用 -func (this *List) Purge(count int, callback func(hash string)) { - this.locker.Lock() - deletedHashList := []string{} - for hash, item := range this.m { - if count <= 0 { - break - } - - if item.IsExpired() { - if this.onRemove != nil { - this.onRemove(item) - } - delete(this.m, hash) - deletedHashList = append(deletedHashList, hash) - } - - count-- - } - this.locker.Unlock() - - // 执行外部操作 - for _, hash := range deletedHashList { - if callback != nil { - callback(hash) - } - } -} - -func (this *List) Stat(check func(hash string) bool) *Stat { - this.locker.RLock() - defer this.locker.RUnlock() - - result := &Stat{ - Count: 0, - Size: 0, - } - for hash, item := range this.m { - if !item.IsExpired() { - // 检查文件是否存在、内容是否正确等 - if check != nil && check(hash) { - result.Count++ - result.ValueSize += item.Size() - result.Size += item.TotalSize() - } - } - } - return result -} - -// 总数量 -func (this *List) Count() int64 { - this.locker.RLock() - count := int64(len(this.m)) - this.locker.RUnlock() - return count -} - -// 添加事件 -func (this *List) OnAdd(f func(item *Item)) { - this.onAdd = f -} - -// 删除事件 -func (this *List) OnRemove(f func(item *Item)) { - this.onRemove = f + Reset() error + + Add(hash string, item *Item) error + + Exist(hash string) (bool, error) + + // FindKeysWithPrefix 根据前缀进行查找 + FindKeysWithPrefix(prefix string) (keys []string, err error) + + Remove(hash string) error + + Purge(count int, callback func(hash string) error) error + + CleanAll() error + + Stat(check func(hash string) bool) (*Stat, error) + + // Count 总数量 + Count() (int64, error) + + // OnAdd 添加事件 + OnAdd(f func(item *Item)) + + // OnRemove 删除事件 + OnRemove(f func(item *Item)) } diff --git a/internal/caches/list_file.go b/internal/caches/list_file.go new file mode 100644 index 0000000..f6e0fe3 --- /dev/null +++ b/internal/caches/list_file.go @@ -0,0 +1,260 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package caches + +import ( + "database/sql" + _ "github.com/mattn/go-sqlite3" + "sync/atomic" + "time" +) + +// FileList 文件缓存列表管理 +type FileList struct { + dir string + db *sql.DB + total int64 + + onAdd func(item *Item) + onRemove func(item *Item) +} + +func NewFileList(dir string) ListInterface { + return &FileList{dir: dir} +} + +func (this *FileList) Init() error { + db, err := sql.Open("sqlite3", "file:"+this.dir+"/index.db?cache=shared&mode=rwc") + if err != nil { + return err + } + db.SetMaxOpenConns(1) + + _, err = db.Exec("VACUUM") + if err != nil { + return err + } + + // 创建 + // TODO accessesAt 用来存储访问时间,将来可以根据此访问时间删除不常访问的内容 + // 且访问时间只需要每隔一个小时存储一个整数值即可,因为不需要那么精确 + _, err = db.Exec(`CREATE TABLE IF NOT EXISTS "cacheItems" ( + "hash" varchar(32), + "key" varchar(1024), + "headerSize" integer DEFAULT 0, + "bodySize" integer DEFAULT 0, + "metaSize" integer DEFAULT 0, + "expiredAt" integer DEFAULT 0, + "accessedAt" integer DEFAULT 0 +); + +CREATE UNIQUE INDEX IF NOT EXISTS "hash" +ON "cacheItems" ( + "hash" +); +CREATE INDEX IF NOT EXISTS "expiredAt" +ON "cacheItems" ( + "expiredAt" +); +CREATE INDEX IF NOT EXISTS "accessedAt" +ON "cacheItems" ( + "accessedAt" +); +`) + if err != nil { + return err + } + + this.db = db + + // 读取总数量 + row := this.db.QueryRow("SELECT COUNT(*) FROM cacheItems") + if row.Err() != nil { + return row.Err() + } + var total int64 + err = row.Scan(&total) + if err != nil { + return err + } + this.total = total + + return nil +} + +func (this *FileList) Reset() error { + // 不错任何事情 + return nil +} + +func (this *FileList) Add(hash string, item *Item) error { + _, err := this.db.Exec(`INSERT INTO cacheItems ("hash", "key", "headerSize", "bodySize", "metaSize", "expiredAt") VALUES (?, ?, ?, ?, ?, ?)`, hash, item.Key, item.HeaderSize, item.BodySize, item.MetaSize, item.ExpiredAt) + if err != nil { + return err + } + + atomic.AddInt64(&this.total, 1) + + if this.onAdd != nil { + this.onAdd(item) + } + return nil +} + +func (this *FileList) Exist(hash string) (bool, error) { + row := this.db.QueryRow(`SELECT "bodySize" FROM cacheItems WHERE "hash"=? LIMIT 1`, hash) + if row == nil { + return false, nil + } + if row.Err() != nil { + return false, row.Err() + } + var bodySize int + err := row.Scan(&bodySize) + if err != nil { + if err == sql.ErrNoRows { + return false, nil + } + return false, err + } + return true, nil +} + +// FindKeysWithPrefix 根据前缀进行查找 +func (this *FileList) FindKeysWithPrefix(prefix string) (keys []string, err error) { + if len(prefix) == 0 { + return + } + + // TODO 需要优化上千万结果的情况 + + rows, err := this.db.Query(`SELECT "key" FROM cacheItems WHERE INSTR("key", ?)==1 LIMIT 100000`, prefix) + if err != nil { + return nil, err + } + defer func() { + _ = rows.Close() + }() + + for rows.Next() { + var key string + err = rows.Scan(&key) + if err != nil { + return nil, err + } + keys = append(keys, key) + } + + return +} + +func (this *FileList) Remove(hash string) error { + row := this.db.QueryRow(`SELECT "key", "headerSize", "bodySize", "metaSize", "expiredAt" FROM cacheItems WHERE "hash"=? LIMIT 1`, hash) + if row.Err() != nil { + return row.Err() + } + + var item = &Item{Type: ItemTypeFile} + err := row.Scan(&item.Key, &item.HeaderSize, &item.BodySize, &item.MetaSize, &item.ExpiredAt) + if err != nil { + if err == sql.ErrNoRows { + return nil + } + return err + } + + _, err = this.db.Exec(`DELETE FROM cacheItems WHERE "hash"=?`, hash) + if err != nil { + return err + } + + atomic.AddInt64(&this.total, -1) + + if this.onRemove != nil { + this.onRemove(item) + } + + return nil +} + +// Purge 清理过期的缓存 +// count 每次遍历的最大数量,控制此数字可以保证每次清理的时候不用花太多时间 +// callback 每次发现过期key的调用 +func (this *FileList) Purge(count int, callback func(hash string) error) error { + if count <= 0 { + count = 1000 + } + + rows, err := this.db.Query(`SELECT "hash" FROM cacheItems WHERE expiredAt<=? LIMIT ?`, time.Now().Unix(), count) + if err != nil { + return err + } + defer func() { + _ = rows.Close() + }() + + hashStrings := []string{} + for rows.Next() { + var hash string + err = rows.Scan(&hash) + if err != nil { + return err + } + hashStrings = append(hashStrings, hash) + } + + // 不在 rows.Next() 循环中操作是为了避免死锁 + for _, hash := range hashStrings { + err = this.Remove(hash) + if err != nil { + return err + } + + err = callback(hash) + if err != nil { + return err + } + } + + return nil +} + +func (this *FileList) CleanAll() error { + _, err := this.db.Exec("DELETE FROM cacheItems") + if err != nil { + return err + } + atomic.StoreInt64(&this.total, 0) + return nil +} + +func (this *FileList) Stat(check func(hash string) bool) (*Stat, error) { + // 这里不设置过期时间、不使用 check 函数,目的是让查询更快速一些 + row := this.db.QueryRow("SELECT COUNT(*), SUM(headerSize+bodySize+metaSize), SUM(headerSize+bodySize) FROM cacheItems") + if row.Err() != nil { + return nil, row.Err() + } + stat := &Stat{} + err := row.Scan(&stat.Count, &stat.Size, &stat.ValueSize) + if err != nil { + return nil, err + } + + return stat, nil +} + +// Count 总数量 +// 常用的方法,所以避免直接查询数据库 +func (this *FileList) Count() (int64, error) { + return atomic.LoadInt64(&this.total), nil +} + +// OnAdd 添加事件 +func (this *FileList) OnAdd(f func(item *Item)) { + this.onAdd = f +} + +// OnRemove 删除事件 +func (this *FileList) OnRemove(f func(item *Item)) { + this.onRemove = f +} diff --git a/internal/caches/list_file_test.go b/internal/caches/list_file_test.go new file mode 100644 index 0000000..8923d7a --- /dev/null +++ b/internal/caches/list_file_test.go @@ -0,0 +1,172 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package caches + +import ( + "github.com/iwind/TeaGo/Tea" + stringutil "github.com/iwind/TeaGo/utils/string" + "strconv" + "testing" + "time" +) + +func TestFileList_Init(t *testing.T) { + list := NewFileList(Tea.Root + "/data") + err := list.Init() + if err != nil { + t.Fatal(err) + } + t.Log("ok") +} + +func TestFileList_Add(t *testing.T) { + list := NewFileList(Tea.Root + "/data") + err := list.Init() + if err != nil { + t.Fatal(err) + } + err = list.Add(stringutil.Md5("123456"), &Item{ + Key: "123456", + ExpiredAt: time.Now().Unix(), + HeaderSize: 1, + MetaSize: 2, + BodySize: 3, + }) + if err != nil { + t.Fatal(err) + } + t.Log("ok") +} + +func TestFileList_Add_Many(t *testing.T) { + list := NewFileList(Tea.Root + "/data") + err := list.Init() + if err != nil { + t.Fatal(err) + } + for i := 0; i < 100_0000; i++ { + u := "http://edge.teaos.cn/123456" + strconv.Itoa(i) + err = list.Add(stringutil.Md5(u), &Item{ + Key: u, + ExpiredAt: time.Now().Unix(), + HeaderSize: 1, + MetaSize: 2, + BodySize: 3, + }) + if err != nil { + t.Fatal(err) + } + } + t.Log("ok") +} + +func TestFileList_Exist(t *testing.T) { + list := NewFileList(Tea.Root + "/data") + err := list.Init() + if err != nil { + t.Fatal(err) + } + { + exists, err := list.Exist(stringutil.Md5("123456")) + if err != nil { + t.Fatal(err) + } + t.Log("exists:", exists) + } + { + exists, err := list.Exist(stringutil.Md5("654321")) + if err != nil { + t.Fatal(err) + } + t.Log("exists:", exists) + } +} + +func TestFileList_FindKeysWithPrefix(t *testing.T) { + list := NewFileList(Tea.Root + "/data") + err := list.Init() + if err != nil { + t.Fatal(err) + } + before := time.Now() + keys, err := list.FindKeysWithPrefix("1234") + if err != nil { + t.Fatal(err) + } + t.Log("keys:", keys) + t.Log(time.Since(before).Seconds()*1000, "ms") +} + +func TestFileList_Remove(t *testing.T) { + list := NewFileList(Tea.Root + "/data") + err := list.Init() + if err != nil { + t.Fatal(err) + } + list.OnRemove(func(item *Item) { + t.Logf("remove %#v", item) + }) + err = list.Remove(stringutil.Md5("123456")) + if err != nil { + t.Fatal(err) + } + t.Log("ok") +} + +func TestFileList_Purge(t *testing.T) { + list := NewFileList(Tea.Root + "/data") + err := list.Init() + if err != nil { + t.Fatal(err) + } + err = list.Purge(2, func(hash string) error { + t.Log(hash) + return nil + }) + if err != nil { + t.Fatal(err) + } + t.Log("ok") +} + +func TestFileList_Stat(t *testing.T) { + list := NewFileList(Tea.Root + "/data") + err := list.Init() + if err != nil { + t.Fatal(err) + } + stat, err := list.Stat(nil) + if err != nil { + t.Fatal(err) + } + t.Log("count:", stat.Count, "size:", stat.Size, "valueSize:", stat.ValueSize) +} + +func TestFileList_Count(t *testing.T) { + list := NewFileList(Tea.Root + "/data") + err := list.Init() + if err != nil { + t.Fatal(err) + } + before := time.Now() + count, err := list.Count() + if err != nil { + t.Fatal(err) + } + t.Log("count:", count) + t.Log(time.Since(before).Seconds()*1000, "ms") +} + +func TestFileList_CleanAll(t *testing.T) { + list := NewFileList(Tea.Root + "/data") + err := list.Init() + if err != nil { + t.Fatal(err) + } + err = list.CleanAll() + if err != nil { + t.Fatal(err) + } + t.Log("ok") + t.Log(list.Count()) +} diff --git a/internal/caches/list_memory.go b/internal/caches/list_memory.go new file mode 100644 index 0000000..5ab2132 --- /dev/null +++ b/internal/caches/list_memory.go @@ -0,0 +1,161 @@ +package caches + +import ( + "strings" + "sync" +) + +// MemoryList 内存缓存列表管理 +type MemoryList struct { + m map[string]*Item // hash => item + locker sync.RWMutex + onAdd func(item *Item) + onRemove func(item *Item) +} + +func NewMemoryList() ListInterface { + return &MemoryList{ + m: map[string]*Item{}, + } +} + +func (this *MemoryList) Init() error { + // 内存列表不需要初始化 + return nil +} + +func (this *MemoryList) Reset() error { + this.locker.Lock() + this.m = map[string]*Item{} + this.locker.Unlock() + return nil +} + +func (this *MemoryList) Add(hash string, item *Item) error { + this.locker.Lock() + if this.onAdd != nil { + this.onAdd(item) + } + this.m[hash] = item + this.locker.Unlock() + return nil +} + +func (this *MemoryList) Exist(hash string) (bool, error) { + this.locker.RLock() + defer this.locker.RUnlock() + + item, ok := this.m[hash] + if !ok { + return false, nil + } + + return !item.IsExpired(), nil +} + +// FindKeysWithPrefix 根据前缀进行查找 +func (this *MemoryList) FindKeysWithPrefix(prefix string) (keys []string, err error) { + this.locker.RLock() + defer this.locker.RUnlock() + + // TODO 需要优化性能,支持千万级数据低于1s的处理速度 + for _, item := range this.m { + if strings.HasPrefix(item.Key, prefix) { + keys = append(keys, item.Key) + } + } + return +} + +func (this *MemoryList) Remove(hash string) error { + this.locker.Lock() + + item, ok := this.m[hash] + if ok { + if this.onRemove != nil { + this.onRemove(item) + } + delete(this.m, hash) + } + + this.locker.Unlock() + return nil +} + +// Purge 清理过期的缓存 +// count 每次遍历的最大数量,控制此数字可以保证每次清理的时候不用花太多时间 +// callback 每次发现过期key的调用 +func (this *MemoryList) Purge(count int, callback func(hash string) error) error { + this.locker.Lock() + deletedHashList := []string{} + for hash, item := range this.m { + if count <= 0 { + break + } + + if item.IsExpired() { + if this.onRemove != nil { + this.onRemove(item) + } + delete(this.m, hash) + deletedHashList = append(deletedHashList, hash) + } + + count-- + } + this.locker.Unlock() + + // 执行外部操作 + for _, hash := range deletedHashList { + if callback != nil { + err := callback(hash) + if err != nil { + return err + } + } + } + return nil +} + +func (this *MemoryList) CleanAll() error { + return this.Reset() +} + +func (this *MemoryList) Stat(check func(hash string) bool) (*Stat, error) { + this.locker.RLock() + defer this.locker.RUnlock() + + result := &Stat{ + Count: 0, + Size: 0, + } + for hash, item := range this.m { + if !item.IsExpired() { + // 检查文件是否存在、内容是否正确等 + if check != nil && check(hash) { + result.Count++ + result.ValueSize += item.Size() + result.Size += item.TotalSize() + } + } + } + return result, nil +} + +// Count 总数量 +func (this *MemoryList) Count() (int64, error) { + this.locker.RLock() + count := int64(len(this.m)) + this.locker.RUnlock() + return count, nil +} + +// OnAdd 添加事件 +func (this *MemoryList) OnAdd(f func(item *Item)) { + this.onAdd = f +} + +// OnRemove 删除事件 +func (this *MemoryList) OnRemove(f func(item *Item)) { + this.onRemove = f +} diff --git a/internal/caches/list_test.go b/internal/caches/list_memory_test.go similarity index 71% rename from internal/caches/list_test.go rename to internal/caches/list_memory_test.go index 1213c25..d0135fb 100644 --- a/internal/caches/list_test.go +++ b/internal/caches/list_memory_test.go @@ -10,13 +10,13 @@ import ( ) func TestList_Add(t *testing.T) { - list := NewList() - list.Add("a", &Item{ + list := &MemoryList{} + _ = list.Add("a", &Item{ Key: "a1", ExpiredAt: time.Now().Unix() + 3600, HeaderSize: 1024, }) - list.Add("b", &Item{ + _ = list.Add("b", &Item{ Key: "b1", ExpiredAt: time.Now().Unix() + 3600, HeaderSize: 1024, @@ -25,72 +25,73 @@ func TestList_Add(t *testing.T) { } func TestList_Remove(t *testing.T) { - list := NewList() - list.Add("a", &Item{ + list := &MemoryList{} + _ = list.Add("a", &Item{ Key: "a1", ExpiredAt: time.Now().Unix() + 3600, HeaderSize: 1024, }) - list.Add("b", &Item{ + _ = list.Add("b", &Item{ Key: "b1", ExpiredAt: time.Now().Unix() + 3600, HeaderSize: 1024, }) - list.Remove("b") + _ = list.Remove("b") t.Log(list.m) } func TestList_Purge(t *testing.T) { - list := NewList() - list.Add("a", &Item{ + list := &MemoryList{} + _ = list.Add("a", &Item{ Key: "a1", ExpiredAt: time.Now().Unix() + 3600, HeaderSize: 1024, }) - list.Add("b", &Item{ + _ = list.Add("b", &Item{ Key: "b1", ExpiredAt: time.Now().Unix() + 3600, HeaderSize: 1024, }) - list.Add("c", &Item{ + _ = list.Add("c", &Item{ Key: "c1", ExpiredAt: time.Now().Unix() - 3600, HeaderSize: 1024, }) - list.Add("d", &Item{ + _ = list.Add("d", &Item{ Key: "d1", ExpiredAt: time.Now().Unix() - 2, HeaderSize: 1024, }) - list.Purge(100, func(hash string) { + _ = list.Purge(100, func(hash string) error { t.Log("delete:", hash) + return nil }) t.Log(list.m) } func TestList_Stat(t *testing.T) { - list := NewList() - list.Add("a", &Item{ + list := &MemoryList{} + _ = list.Add("a", &Item{ Key: "a1", ExpiredAt: time.Now().Unix() + 3600, HeaderSize: 1024, }) - list.Add("b", &Item{ + _ = list.Add("b", &Item{ Key: "b1", ExpiredAt: time.Now().Unix() + 3600, HeaderSize: 1024, }) - list.Add("c", &Item{ + _ = list.Add("c", &Item{ Key: "c1", ExpiredAt: time.Now().Unix(), HeaderSize: 1024, }) - list.Add("d", &Item{ + _ = list.Add("d", &Item{ Key: "d1", ExpiredAt: time.Now().Unix() - 2, HeaderSize: 1024, }) - result := list.Stat(func(hash string) bool { + result, _ := list.Stat(func(hash string) bool { // 随机测试 rand.Seed(time.Now().UnixNano()) return rand.Int()%2 == 0 @@ -99,11 +100,11 @@ func TestList_Stat(t *testing.T) { } func TestList_FindKeysWithPrefix(t *testing.T) { - list := NewList() + list := &MemoryList{} before := time.Now() for i := 0; i < 1_000_000; i++ { key := "http://www.teaos.cn/hello" + strconv.Itoa(i/100000) + "/" + strconv.Itoa(i) + ".html" - list.Add(fmt.Sprintf("%d", xxhash.Sum64String(key)), &Item{ + _ = list.Add(fmt.Sprintf("%d", xxhash.Sum64String(key)), &Item{ Key: key, ExpiredAt: 0, BodySize: 0, @@ -113,7 +114,10 @@ func TestList_FindKeysWithPrefix(t *testing.T) { t.Log(time.Since(before).Seconds()*1000, "ms") before = time.Now() - keys := list.FindKeysWithPrefix("http://www.teaos.cn/hello/5000") + keys, err := list.FindKeysWithPrefix("http://www.teaos.cn/hello/5000") + if err != nil { + t.Fatal(err) + } t.Log(len(keys)) t.Log(time.Since(before).Seconds()*1000, "ms") } diff --git a/internal/caches/reader.go b/internal/caches/reader.go index 47fa058..aa9c7f0 100644 --- a/internal/caches/reader.go +++ b/internal/caches/reader.go @@ -3,27 +3,27 @@ package caches type ReaderFunc func(n int) (goNext bool, err error) type Reader interface { - // 初始化 + // Init 初始化 Init() error - // 状态码 + // Status 状态码 Status() int - // 读取Header + // ReadHeader 读取Header ReadHeader(buf []byte, callback ReaderFunc) error - // 读取Body + // ReadBody 读取Body ReadBody(buf []byte, callback ReaderFunc) error - // 读取某个范围内的Body + // ReadBodyRange 读取某个范围内的Body ReadBodyRange(buf []byte, start int64, end int64, callback ReaderFunc) error - // Header Size + // HeaderSize Header Size HeaderSize() int64 - // Body Size + // BodySize Body Size BodySize() int64 - // 关闭 + // Close 关闭 Close() error } diff --git a/internal/caches/storage_file.go b/internal/caches/storage_file.go index 8b3a224..9591139 100644 --- a/internal/caches/storage_file.go +++ b/internal/caches/storage_file.go @@ -49,7 +49,7 @@ type FileStorage struct { memoryStorage *MemoryStorage // 一级缓存 totalSize int64 - list *List + list ListInterface locker sync.RWMutex ticker *utils.Ticker } @@ -57,7 +57,6 @@ type FileStorage struct { func NewFileStorage(policy *serverconfigs.HTTPCachePolicy) *FileStorage { return &FileStorage{ policy: policy, - list: NewList(), } } @@ -68,41 +67,10 @@ func (this *FileStorage) Policy() *serverconfigs.HTTPCachePolicy { // Init 初始化 func (this *FileStorage) Init() error { - this.list.OnAdd(func(item *Item) { - atomic.AddInt64(&this.totalSize, item.TotalSize()) - }) - this.list.OnRemove(func(item *Item) { - atomic.AddInt64(&this.totalSize, -item.TotalSize()) - }) - this.locker.Lock() defer this.locker.Unlock() before := time.Now() - cacheDir := "" - defer func() { - // 统计 - count := 0 - size := int64(0) - if this.list != nil { - stat := this.list.Stat(func(hash string) bool { - return true - }) - count = stat.Count - size = stat.Size - } - - cost := time.Since(before).Seconds() * 1000 - sizeMB := strconv.FormatInt(size, 10) + " Bytes" - if size > 1024*1024*1024 { - sizeMB = fmt.Sprintf("%.3f G", float64(size)/1024/1024/1024) - } else if size > 1024*1024 { - sizeMB = fmt.Sprintf("%.3f M", float64(size)/1024/1024) - } else if size > 1024 { - sizeMB = fmt.Sprintf("%.3f K", float64(size)/1024) - } - remotelogs.Println("CACHE", "init policy "+strconv.FormatInt(this.policy.Id, 10)+" from '"+cacheDir+"', cost: "+fmt.Sprintf("%.2f", cost)+" ms, count: "+strconv.Itoa(count)+", size: "+sizeMB) - }() // 配置 cacheConfig := &serverconfigs.HTTPFileCacheStorage{} @@ -115,7 +83,7 @@ func (this *FileStorage) Init() error { return err } this.cacheConfig = cacheConfig - cacheDir = cacheConfig.Dir + cacheDir := cacheConfig.Dir if !filepath.IsAbs(this.cacheConfig.Dir) { this.cacheConfig.Dir = Tea.Root + Tea.DS + this.cacheConfig.Dir @@ -127,6 +95,26 @@ func (this *FileStorage) Init() error { return errors.New("[CACHE]cache storage dir can not be empty") } + list := NewFileList(dir + "/p" + strconv.FormatInt(this.policy.Id, 10)) + err = list.Init() + if err != nil { + return err + } + this.list = list + stat, err := list.Stat(func(hash string) bool { + return true + }) + if err != nil { + return err + } + this.totalSize = stat.Size + this.list.OnAdd(func(item *Item) { + atomic.AddInt64(&this.totalSize, item.TotalSize()) + }) + this.list.OnRemove(func(item *Item) { + atomic.AddInt64(&this.totalSize, -item.TotalSize()) + }) + // 检查目录是否存在 _, err = os.Stat(dir) if err != nil { @@ -140,6 +128,34 @@ func (this *FileStorage) Init() error { } } + defer func() { + // 统计 + count := 0 + size := int64(0) + if this.list != nil { + stat, err := this.list.Stat(func(hash string) bool { + return true + }) + if err != nil { + remotelogs.Error("CACHE", "stat cache "+strconv.FormatInt(this.policy.Id, 10)+" failed: "+err.Error()) + } else { + count = stat.Count + size = stat.Size + } + } + + cost := time.Since(before).Seconds() * 1000 + sizeMB := strconv.FormatInt(size, 10) + " Bytes" + if size > 1024*1024*1024 { + sizeMB = fmt.Sprintf("%.3f G", float64(size)/1024/1024/1024) + } else if size > 1024*1024 { + sizeMB = fmt.Sprintf("%.3f M", float64(size)/1024/1024) + } else if size > 1024 { + sizeMB = fmt.Sprintf("%.3f K", float64(size)/1024) + } + remotelogs.Println("CACHE", "init policy "+strconv.FormatInt(this.policy.Id, 10)+" from '"+cacheDir+"', cost: "+fmt.Sprintf("%.2f", cost)+" ms, count: "+strconv.Itoa(count)+", size: "+sizeMB) + }() + // 初始化list err = this.initList() if err != nil { @@ -188,10 +204,7 @@ func (this *FileStorage) OpenReader(key string) (Reader, error) { } } - hash, path := this.keyPath(key) - if !this.list.Exist(hash) { - return nil, ErrNotFound - } + _, path := this.keyPath(key) // TODO 尝试使用mmap加快读取速度 fp, err := os.OpenFile(path, os.O_RDONLY, 0444) @@ -224,17 +237,21 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int) (Wr } // 检查是否超出最大值 - if this.policy.MaxKeys > 0 && this.list.Count() > this.policy.MaxKeys { + count, err := this.list.Count() + if err != nil { + return nil, err + } + if this.policy.MaxKeys > 0 && count > this.policy.MaxKeys { return nil, errors.New("write file cache failed: too many keys in cache storage") } capacityBytes := this.diskCapacityBytes() if capacityBytes > 0 && capacityBytes <= this.totalSize { - return nil, errors.New("write file cache failed: over disk size, real size: " + strconv.FormatInt(this.totalSize, 10) + " bytes") + return nil, errors.New("write file cache failed: over disk size, current total size: " + strconv.FormatInt(this.totalSize, 10) + " bytes, capacity: " + strconv.FormatInt(capacityBytes, 10)) } hash := stringutil.Md5(key) dir := this.cacheConfig.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/" + hash[:2] + "/" + hash[2:4] - _, err := os.Stat(dir) + _, err = os.Stat(dir) if err != nil { if !os.IsNotExist(err) { return nil, err @@ -246,7 +263,10 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int) (Wr } // 先删除 - this.list.Remove(hash) + err = this.list.Remove(hash) + if err != nil { + return nil, err + } path := dir + "/" + hash + ".cache.tmp" writer, err := os.OpenFile(path, os.O_CREATE|os.O_SYNC|os.O_WRONLY, 0666) @@ -351,7 +371,10 @@ func (this *FileStorage) AddToList(item *Item) { item.MetaSize = SizeMeta hash := stringutil.Md5(item.Key) - this.list.Add(hash, item) + err := this.list.Add(hash, item) + if err != nil && !strings.Contains(err.Error(), "UNIQUE constraint failed") { + remotelogs.Error("CACHE", "add to list failed: "+err.Error()) + } } // Delete 删除某个键值对应的缓存 @@ -365,8 +388,11 @@ func (this *FileStorage) Delete(key string) error { } hash, path := this.keyPath(key) - this.list.Remove(hash) - err := os.Remove(path) + err := this.list.Remove(hash) + if err != nil { + return err + } + err = os.Remove(path) if err == nil || os.IsNotExist(err) { return nil } @@ -380,7 +406,7 @@ func (this *FileStorage) Stat() (*Stat, error) { return this.list.Stat(func(hash string) bool { return true - }), nil + }) } // CleanAll 清除所有的缓存 @@ -393,7 +419,10 @@ func (this *FileStorage) CleanAll() error { _ = this.memoryStorage.CleanAll() } - this.list.Reset() + err := this.list.CleanAll() + if err != nil { + return err + } // 删除缓存和目录 // 不能直接删除子目录,比较危险 @@ -455,7 +484,11 @@ func (this *FileStorage) Purge(keys []string, urlType string) error { if urlType == "dir" { resultKeys := []string{} for _, key := range keys { - resultKeys = append(resultKeys, this.list.FindKeysWithPrefix(key)...) + subKeys, err := this.list.FindKeysWithPrefix(key) + if err != nil { + return err + } + resultKeys = append(resultKeys, subKeys...) } keys = resultKeys } @@ -463,7 +496,11 @@ func (this *FileStorage) Purge(keys []string, urlType string) error { // 文件 for _, key := range keys { hash, path := this.keyPath(key) - if !this.list.Exist(hash) { + exists, err := this.list.Exist(hash) + if err != nil { + return err + } + if !exists { err := os.Remove(path) if err != nil && !os.IsNotExist(err) { return err @@ -471,11 +508,14 @@ func (this *FileStorage) Purge(keys []string, urlType string) error { continue } - err := os.Remove(path) + err = os.Remove(path) if err != nil && !os.IsNotExist(err) { return err } - this.list.Remove(hash) + err = this.list.Remove(hash) + if err != nil { + return err + } } return nil } @@ -534,7 +574,10 @@ func (this *FileStorage) hashPath(hash string) (path string) { // 初始化List func (this *FileStorage) initList() error { - this.list.Reset() + err := this.list.Reset() + if err != nil { + return err + } dir := this.dir() @@ -547,33 +590,6 @@ func (this *FileStorage) initList() error { _ = os.Remove(path) } - // 加载缓存 - files, err = filepath.Glob(dir + "/*/*/*.cache") - if err != nil { - return err - } - for _, path := range files { - basename := filepath.Base(path) - index := strings.LastIndex(basename, ".") - if index < 0 { - continue - } - hash := basename[:index] - - // 解析文件信息 - item, err := this.decodeFile(path) - if err != nil { - if err != ErrNotFound { - remotelogs.Error("CACHE", "decode path '"+path+"': "+err.Error()) - } - continue - } - if item == nil { - continue - } - this.list.Add(hash, item) - } - // 启动定时清理任务 this.ticker = utils.NewTicker(30 * time.Second) events.On(events.EventQuit, func() { @@ -686,12 +702,13 @@ func (this *FileStorage) decodeFile(path string) (*Item, error) { // 清理任务 func (this *FileStorage) purgeLoop() { - this.list.Purge(1000, func(hash string) { + _ = this.list.Purge(1000, func(hash string) error { path := this.hashPath(hash) err := os.Remove(path) if err != nil && !os.IsNotExist(err) { remotelogs.Error("CACHE", "purge '"+path+"' error: "+err.Error()) } + return nil }) } diff --git a/internal/caches/storage_file_test.go b/internal/caches/storage_file_test.go index a5ba661..f0999fb 100644 --- a/internal/caches/storage_file_test.go +++ b/internal/caches/storage_file_test.go @@ -40,7 +40,7 @@ func TestFileStorage_Init(t *testing.T) { time.Sleep(2 * time.Second) storage.purgeLoop() - t.Log(len(storage.list.m), "entries left") + t.Log(storage.list.(*FileList).total, "entries left") } func TestFileStorage_OpenWriter(t *testing.T) { @@ -441,14 +441,16 @@ func TestFileStorage_CleanAll(t *testing.T) { t.Log(time.Since(before).Seconds()*1000, "ms") }() - t.Log("before:", storage.list.m) + c, _ := storage.list.Count() + t.Log("before:", c) err = storage.CleanAll() if err != nil { t.Fatal(err) } - t.Log("after:", storage.list.m) + c, _ = storage.list.Count() + t.Log("after:", c) t.Log("ok") } diff --git a/internal/caches/storage_memory.go b/internal/caches/storage_memory.go index ef398be..113659b 100644 --- a/internal/caches/storage_memory.go +++ b/internal/caches/storage_memory.go @@ -22,7 +22,7 @@ type MemoryItem struct { type MemoryStorage struct { policy *serverconfigs.HTTPCachePolicy - list *List + list ListInterface locker *sync.RWMutex valuesMap map[uint64]*MemoryItem ticker *utils.Ticker @@ -33,7 +33,7 @@ type MemoryStorage struct { func NewMemoryStorage(policy *serverconfigs.HTTPCachePolicy) *MemoryStorage { return &MemoryStorage{ policy: policy, - list: NewList(), + list: NewMemoryList(), locker: &sync.RWMutex{}, valuesMap: map[uint64]*MemoryItem{}, } @@ -92,7 +92,11 @@ func (this *MemoryStorage) OpenReader(key string) (Reader, error) { // OpenWriter 打开缓存写入器等待写入 func (this *MemoryStorage) OpenWriter(key string, expiredAt int64, status int) (Writer, error) { // 检查是否超出最大值 - if this.policy.MaxKeys > 0 && this.list.Count() > this.policy.MaxKeys { + totalKeys, err := this.list.Count() + if err != nil { + return nil, err + } + if this.policy.MaxKeys > 0 && totalKeys > this.policy.MaxKeys { return nil, errors.New("write memory cache failed: too many keys in cache storage") } capacityBytes := this.memoryCapacityBytes() @@ -101,7 +105,7 @@ func (this *MemoryStorage) OpenWriter(key string, expiredAt int64, status int) ( } // 先删除 - err := this.Delete(key) + err = this.Delete(key) if err != nil { return nil, err } @@ -126,7 +130,7 @@ func (this *MemoryStorage) Stat() (*Stat, error) { return this.list.Stat(func(hash string) bool { return true - }), nil + }) } // CleanAll 清除所有缓存 @@ -145,7 +149,11 @@ func (this *MemoryStorage) Purge(keys []string, urlType string) error { if urlType == "dir" { resultKeys := []string{} for _, key := range keys { - resultKeys = append(resultKeys, this.list.FindKeysWithPrefix(key)...) + subKeys, err := this.list.FindKeysWithPrefix(key) + if err != nil { + return err + } + resultKeys = append(resultKeys, subKeys...) } keys = resultKeys } @@ -200,13 +208,14 @@ func (this *MemoryStorage) hash(key string) uint64 { // 清理任务 func (this *MemoryStorage) purgeLoop() { - this.list.Purge(2048, func(hash string) { + _ = this.list.Purge(2048, func(hash string) error { uintHash, err := strconv.ParseUint(hash, 10, 64) if err == nil { this.locker.Lock() delete(this.valuesMap, uintHash) this.locker.Unlock() } + return nil }) } diff --git a/internal/caches/storage_memory_test.go b/internal/caches/storage_memory_test.go index 7a93023..f9d06b3 100644 --- a/internal/caches/storage_memory_test.go +++ b/internal/caches/storage_memory_test.go @@ -174,7 +174,8 @@ func TestMemoryStorage_CleanAll(t *testing.T) { if err != nil { t.Fatal(err) } - t.Log(storage.list.Count(), len(storage.valuesMap)) + total, _ := storage.list.Count() + t.Log(total, len(storage.valuesMap)) } func TestMemoryStorage_Purge(t *testing.T) { @@ -208,7 +209,8 @@ func TestMemoryStorage_Purge(t *testing.T) { if err != nil { t.Fatal(err) } - t.Log(storage.list.Count(), len(storage.valuesMap)) + total, _ := storage.list.Count() + t.Log(total, len(storage.valuesMap)) } func TestMemoryStorage_Expire(t *testing.T) {