diff --git a/build/.gitignore b/build/.gitignore index 36f971e..be6ff92 100644 --- a/build/.gitignore +++ b/build/.gitignore @@ -1 +1,2 @@ bin/* +caches \ No newline at end of file diff --git a/internal/caches/item.go b/internal/caches/item.go new file mode 100644 index 0000000..321531b --- /dev/null +++ b/internal/caches/item.go @@ -0,0 +1,14 @@ +package caches + +import "time" + +type Item struct { + Key string + ExpiredAt int64 + ValueSize int64 + Size int64 +} + +func (this *Item) IsExpired() bool { + return this.ExpiredAt < time.Now().Unix() +} diff --git a/internal/caches/list.go b/internal/caches/list.go new file mode 100644 index 0000000..a16d0eb --- /dev/null +++ b/internal/caches/list.go @@ -0,0 +1,94 @@ +package caches + +import "sync" + +// 缓存列表管理 +type List struct { + m map[string]*Item // hash => item + locker sync.RWMutex +} + +func NewList() *List { + return &List{ + m: map[string]*Item{}, + } +} + +func (this *List) Reset() { + this.locker.Lock() + this.m = map[string]*Item{} + this.locker.Unlock() +} + +func (this *List) Add(hash string, item *Item) { + this.locker.Lock() + this.m[hash] = item + this.locker.Unlock() +} + +func (this *List) Exist(hash string) bool { + this.locker.RLock() + defer this.locker.RUnlock() + + item, ok := this.m[hash] + if !ok { + return false + } + + return !item.IsExpired() +} + +func (this *List) Remove(hash string) { + this.locker.Lock() + delete(this.m, hash) + this.locker.Unlock() +} + +// 清理过期的缓存 +// count 每次遍历的最大数量,控制此数字可以保证每次清理的时候不用花太多时间 +// callback 每次发现过期key的调用 +func (this *List) Purge(count int, callback func(hash string)) { + this.locker.Lock() + deletedHashList := []string{} + for hash, item := range this.m { + if count <= 0 { + break + } + + if item.IsExpired() { + delete(this.m, hash) + deletedHashList = append(deletedHashList, hash) + } + + count-- + } + this.locker.Unlock() + + // 执行外部操作 + for _, hash := range deletedHashList { + if callback != nil { + callback(hash) + } + } +} + +func (this *List) Stat(check func(hash string) bool) *Stat { + this.locker.RLock() + defer this.locker.RUnlock() + + result := &Stat{ + Count: 0, + Size: 0, + } + for hash, item := range this.m { + if !item.IsExpired() { + // 检查文件是否存在、内容是否正确等 + if check != nil && check(hash) { + result.Count++ + result.ValueSize += item.ValueSize + result.Size += item.Size + } + } + } + return result +} diff --git a/internal/caches/list_test.go b/internal/caches/list_test.go new file mode 100644 index 0000000..b624e90 --- /dev/null +++ b/internal/caches/list_test.go @@ -0,0 +1,96 @@ +package caches + +import ( + "math/rand" + "testing" + "time" +) + +func TestList_Add(t *testing.T) { + list := NewList() + list.Add("a", &Item{ + Key: "a1", + ExpiredAt: time.Now().Unix() + 3600, + Size: 1024, + }) + list.Add("b", &Item{ + Key: "b1", + ExpiredAt: time.Now().Unix() + 3600, + Size: 1024, + }) + t.Log(list.m) +} + +func TestList_Remove(t *testing.T) { + list := NewList() + list.Add("a", &Item{ + Key: "a1", + ExpiredAt: time.Now().Unix() + 3600, + Size: 1024, + }) + list.Add("b", &Item{ + Key: "b1", + ExpiredAt: time.Now().Unix() + 3600, + Size: 1024, + }) + list.Remove("b") + t.Log(list.m) +} + +func TestList_Purge(t *testing.T) { + list := NewList() + list.Add("a", &Item{ + Key: "a1", + ExpiredAt: time.Now().Unix() + 3600, + Size: 1024, + }) + list.Add("b", &Item{ + Key: "b1", + ExpiredAt: time.Now().Unix() + 3600, + Size: 1024, + }) + list.Add("c", &Item{ + Key: "c1", + ExpiredAt: time.Now().Unix() - 3600, + Size: 1024, + }) + list.Add("d", &Item{ + Key: "d1", + ExpiredAt: time.Now().Unix() - 2, + Size: 1024, + }) + list.Purge(100, func(hash string) { + t.Log("delete:", hash) + }) + t.Log(list.m) +} + +func TestList_Stat(t *testing.T) { + list := NewList() + list.Add("a", &Item{ + Key: "a1", + ExpiredAt: time.Now().Unix() + 3600, + Size: 1024, + }) + list.Add("b", &Item{ + Key: "b1", + ExpiredAt: time.Now().Unix() + 3600, + Size: 1024, + }) + list.Add("c", &Item{ + Key: "c1", + ExpiredAt: time.Now().Unix(), + Size: 1024, + }) + list.Add("d", &Item{ + Key: "d1", + ExpiredAt: time.Now().Unix() - 2, + Size: 1024, + }) + result := list.Stat(func(hash string) bool { + // 随机测试 + rand.Seed(time.Now().UnixNano()) + return rand.Int()%2 == 0 + }) + t.Log(result) +} diff --git a/internal/caches/manager.go b/internal/caches/manager.go new file mode 100644 index 0000000..20baa48 --- /dev/null +++ b/internal/caches/manager.go @@ -0,0 +1,142 @@ +package caches + +import ( + "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + "github.com/iwind/TeaGo/logs" + "strconv" + "sync" +) + +var SharedManager = NewManager() + +type Manager struct { + policyMap map[int64]*serverconfigs.HTTPCachePolicy // policyId => []*Policy + storageMap map[int64]StorageInterface // policyId => *Storage + locker sync.RWMutex +} + +func NewManager() *Manager { + return &Manager{ + policyMap: map[int64]*serverconfigs.HTTPCachePolicy{}, + storageMap: map[int64]StorageInterface{}, + } +} + +// 重新设置策略 +func (this *Manager) UpdatePolicies(newPolicies []*serverconfigs.HTTPCachePolicy) { + this.locker.Lock() + defer this.locker.Unlock() + + newPolicyIds := []int64{} + for _, policy := range newPolicies { + newPolicyIds = append(newPolicyIds, policy.Id) + } + + // 停止旧有的 + for _, oldPolicy := range this.policyMap { + if !this.containsInt64(newPolicyIds, oldPolicy.Id) { + logs.Println("[CACHE]remove policy", strconv.FormatInt(oldPolicy.Id, 10)) + delete(this.policyMap, oldPolicy.Id) + storage, ok := this.storageMap[oldPolicy.Id] + if ok { + storage.Stop() + delete(this.storageMap, oldPolicy.Id) + } + } + } + + // 启动新的 + for _, newPolicy := range newPolicies { + _, ok := this.policyMap[newPolicy.Id] + if !ok { + logs.Println("[CACHE]add policy", strconv.FormatInt(newPolicy.Id, 10)) + } + + // 初始化 + err := newPolicy.Init() + if err != nil { + logs.Println("[CACHE]UpdatePolicies: init policy error: " + err.Error()) + continue + } + this.policyMap[newPolicy.Id] = newPolicy + } + + // 启动存储管理 + for _, policy := range this.policyMap { + storage, ok := this.storageMap[policy.Id] + if !ok { + storage := this.NewStorageWithPolicy(policy) + if storage == nil { + logs.Println("[CACHE]can not find storage type '" + policy.Type + "'") + continue + } + err := storage.Init() + if err != nil { + logs.Println("[CACHE]UpdatePolicies: init storage failed: " + err.Error()) + continue + } + this.storageMap[policy.Id] = storage + } else { + // 检查policy是否有变化 + if !storage.Policy().IsSame(policy) { + logs.Println("[CACHE]policy " + strconv.FormatInt(policy.Id, 10) + " changed") + + // 停止老的 + storage.Stop() + delete(this.storageMap, policy.Id) + + // 启动新的 + storage := this.NewStorageWithPolicy(policy) + if storage == nil { + logs.Println("[CACHE]can not find storage type '" + policy.Type + "'") + continue + } + err := storage.Init() + if err != nil { + logs.Println("[CACHE]UpdatePolicies: init storage failed: " + err.Error()) + continue + } + this.storageMap[policy.Id] = storage + } + } + } +} + +// 获取Policy信息 +func (this *Manager) FindPolicy(policyId int64) *serverconfigs.HTTPCachePolicy { + this.locker.RLock() + defer this.locker.RUnlock() + + p, _ := this.policyMap[policyId] + return p +} + +// 根据策略ID查找存储 +func (this *Manager) FindStorageWithPolicy(policyId int64) StorageInterface { + this.locker.RLock() + defer this.locker.RUnlock() + + storage, _ := this.storageMap[policyId] + return storage +} + +// 根据策略获取存储对象 +func (this *Manager) NewStorageWithPolicy(policy *serverconfigs.HTTPCachePolicy) StorageInterface { + switch policy.Type { + case serverconfigs.CachePolicyTypeFile: + return NewFileStorage(policy) + case serverconfigs.CachePolicyTypeMemory: + return nil // TODO 暂时返回nil + } + return nil +} + +// 可判断一组数字中是否包含某数 +func (this *Manager) containsInt64(values []int64, value int64) bool { + for _, v := range values { + if v == value { + return true + } + } + return false +} diff --git a/internal/caches/manager_test.go b/internal/caches/manager_test.go new file mode 100644 index 0000000..b7c74cc --- /dev/null +++ b/internal/caches/manager_test.go @@ -0,0 +1,81 @@ +package caches + +import ( + "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + "github.com/iwind/TeaGo/Tea" + "testing" +) + +func TestManager_UpdatePolicies(t *testing.T) { + { + policies := []*serverconfigs.HTTPCachePolicy{} + SharedManager.UpdatePolicies(policies) + printManager(t) + } + + { + policies := []*serverconfigs.HTTPCachePolicy{ + { + Id: 1, + Type: serverconfigs.CachePolicyTypeFile, + Options: map[string]interface{}{ + "dir": Tea.Root + "/caches", + }, + }, + { + Id: 2, + Type: serverconfigs.CachePolicyTypeFile, + Options: map[string]interface{}{ + "dir": Tea.Root + "/caches", + }, + }, + { + Id: 3, + Type: serverconfigs.CachePolicyTypeFile, + Options: map[string]interface{}{ + "dir": Tea.Root + "/caches", + }, + }, + } + SharedManager.UpdatePolicies(policies) + printManager(t) + } + + { + policies := []*serverconfigs.HTTPCachePolicy{ + { + Id: 1, + Type: serverconfigs.CachePolicyTypeFile, + Options: map[string]interface{}{ + "dir": Tea.Root + "/caches", + }, + }, + { + Id: 2, + Type: serverconfigs.CachePolicyTypeFile, + MaxKeys: 1, + Options: map[string]interface{}{ + "dir": Tea.Root + "/caches", + }, + }, + { + Id: 4, + Type: serverconfigs.CachePolicyTypeFile, + Options: map[string]interface{}{ + "dir": Tea.Root + "/caches", + }, + }, + } + SharedManager.UpdatePolicies(policies) + printManager(t) + } +} + +func printManager(t *testing.T) { + t.Log("===manager==") + t.Log("storage:") + for _, storage := range SharedManager.storageMap { + t.Log(" storage:", storage.Policy().Id) + } + t.Log("===============") +} diff --git a/internal/caches/stat.go b/internal/caches/stat.go new file mode 100644 index 0000000..b9e0411 --- /dev/null +++ b/internal/caches/stat.go @@ -0,0 +1,7 @@ +package caches + +type Stat struct { + Count int // 数量 + ValueSize int64 // 值占用的空间 + Size int64 // 占用的空间尺寸 +} diff --git a/internal/caches/storage_file.go b/internal/caches/storage_file.go new file mode 100644 index 0000000..f407153 --- /dev/null +++ b/internal/caches/storage_file.go @@ -0,0 +1,618 @@ +package caches + +import ( + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + "github.com/TeaOSLab/EdgeNode/internal/utils" + "github.com/iwind/TeaGo/Tea" + "github.com/iwind/TeaGo/logs" + "github.com/iwind/TeaGo/types" + stringutil "github.com/iwind/TeaGo/utils/string" + "io" + "os" + "path/filepath" + "regexp" + "strconv" + "strings" + "sync" + "time" +) + +const ( + SizeExpiredAt = 10 + SizeKeyLength = 4 + SizeNL = 1 + SizeEnd = 4 +) + +var ( + ErrNotFound = errors.New("cache not found") +) + +type FileStorage struct { + policy *serverconfigs.HTTPCachePolicy + cacheConfig *serverconfigs.HTTPFileCacheConfig + + list *List + locker sync.RWMutex + ticker *utils.Ticker +} + +func NewFileStorage(policy *serverconfigs.HTTPCachePolicy) *FileStorage { + return &FileStorage{ + policy: policy, + list: NewList(), + } +} + +// 获取当前的Policy +func (this *FileStorage) Policy() *serverconfigs.HTTPCachePolicy { + return this.policy +} + +// 初始化 +func (this *FileStorage) Init() error { + this.locker.Lock() + defer this.locker.Unlock() + + before := time.Now() + defer func() { + // 统计 + count := 0 + size := int64(0) + if this.list != nil { + stat := this.list.Stat(func(hash string) bool { + return true + }) + count = stat.Count + size = stat.Size + } + + cost := time.Since(before).Seconds() * 1000 + logs.Println("[CACHE]init policy "+strconv.FormatInt(this.policy.Id, 10)+", cost: "+fmt.Sprintf("%.2f", cost)+" ms, count: "+strconv.Itoa(count)+", size: ", fmt.Sprintf("%.3f", float64(size)/1024/1024)+" M") + }() + + // 配置 + cacheConfig := &serverconfigs.HTTPFileCacheConfig{} + optionsJSON, err := json.Marshal(this.policy.Options) + if err != nil { + return err + } + err = json.Unmarshal(optionsJSON, cacheConfig) + if err != nil { + return err + } + this.cacheConfig = cacheConfig + + if !filepath.IsAbs(this.cacheConfig.Dir) { + this.cacheConfig.Dir = Tea.Root + Tea.DS + this.cacheConfig.Dir + } + + dir := this.cacheConfig.Dir + + if len(dir) == 0 { + return errors.New("[CACHE]cache storage dir can not be empty") + } + + // 检查目录是否存在 + _, err = os.Stat(dir) + if err != nil { + if !os.IsNotExist(err) { + return err + } else { + err = os.MkdirAll(dir, 0777) + if err != nil { + return errors.New("[CACHE]can not create dir:" + err.Error()) + } + } + } + + // 初始化list + err = this.initList() + if err != nil { + return err + } + + return nil +} + +func (this *FileStorage) Read(key string, readerBuf []byte, callback func(data []byte, expiredAt int64)) error { + hash, path := this.keyPath(key) + if !this.list.Exist(hash) { + return ErrNotFound + } + + this.locker.RLock() + defer this.locker.RUnlock() + + // TODO 尝试使用mmap加快读取速度 + fp, err := os.OpenFile(path, os.O_RDONLY, 0444) + if err != nil { + if !os.IsNotExist(err) { + return err + } + return ErrNotFound + } + defer func() { + _ = fp.Close() + }() + + // 是否过期 + buf := make([]byte, SizeExpiredAt) + n, err := fp.Read(buf) + if err != nil { + return err + } + if n != len(buf) { + return ErrNotFound + } + + expiredAt := types.Int64(string(buf)) + if expiredAt < time.Now().Unix() { + // 已过期 + _ = fp.Close() + _ = os.Remove(path) + + return ErrNotFound + } + + buf = make([]byte, SizeKeyLength) + n, err = fp.Read(buf) + if err != nil { + return err + } + if n != len(buf) { + return ErrNotFound + } + keyLength := int(binary.BigEndian.Uint32(buf)) + + offset, err := fp.Seek(-SizeEnd, io.SeekEnd) + if err != nil { + return err + } + buf = make([]byte, SizeEnd) + n, err = fp.Read(buf) + if n != len(buf) { + return ErrNotFound + } + if string(buf) != "\n$$$" { + _ = fp.Close() + _ = os.Remove(path) + return ErrNotFound + } + startOffset := SizeExpiredAt + SizeKeyLength + keyLength + SizeNL + size := int(offset) + SizeEnd - startOffset + + _, err = fp.Seek(int64(startOffset), io.SeekStart) + if err != nil { + return err + } + + for { + n, err := fp.Read(readerBuf) + if n > 0 { + size -= n + if size < SizeEnd { // 已经到了末尾区域 + if n <= SizeEnd-size { // 已经到了末尾 + break + } else { + callback(readerBuf[:n-(SizeEnd-size)], expiredAt) + } + } else { + callback(readerBuf[:n], expiredAt) + } + } + if err != nil { + if err != io.EOF { + return err + } + + break + } + } + + return nil +} + +// 打开缓存文件等待写入 +func (this *FileStorage) Open(key string, expiredAt int64) (*Writer, error) { + hash := stringutil.Md5(key) + dir := this.cacheConfig.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/" + hash[:2] + "/" + hash[2:4] + _, err := os.Stat(dir) + if err != nil { + if !os.IsNotExist(err) { + return nil, err + } + err = os.MkdirAll(dir, 0777) + if err != nil { + return nil, err + } + } + + this.locker.Lock() + + path := dir + "/" + hash + ".cache" + writer, err := os.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_SYNC|os.O_WRONLY, 0777) + if err != nil { + this.locker.Unlock() + return nil, err + } + + isOk := false + defer func() { + if err != nil { + isOk = false + } + + // 如果出错了,就删除文件,避免写一半 + if !isOk { + _ = writer.Close() + _ = os.Remove(path) + this.locker.Unlock() + } + }() + + // 写入过期时间 + _, err = writer.WriteString(fmt.Sprintf("%d", expiredAt)) + if err != nil { + return nil, err + } + + // 写入key length + b := make([]byte, SizeKeyLength) + binary.BigEndian.PutUint32(b, uint32(len(key))) + _, err = writer.Write(b) + if err != nil { + return nil, err + } + + // 写入key + _, err = writer.WriteString(key + "\n") + if err != nil { + return nil, err + } + + isOk = true + + return NewWriter(writer, key, expiredAt, &this.locker), nil +} + +// 写入缓存数据 +// 目录结构:$root/p$policyId/$hash[:2]/$hash[2:4]/$hash.cache +// 数据结构: [expiredAt] [key length] [key] \n value \n $$$ +func (this *FileStorage) Write(key string, expiredAt int64, valueReader io.Reader) error { + this.locker.Lock() + defer this.locker.Unlock() + + hash := stringutil.Md5(key) + dir := this.cacheConfig.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/" + hash[:2] + "/" + hash[2:4] + _, err := os.Stat(dir) + if err != nil { + if !os.IsNotExist(err) { + return err + } + err = os.MkdirAll(dir, 0777) + if err != nil { + return err + } + } + path := dir + "/" + hash + ".cache" + writer, err := os.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_SYNC|os.O_WRONLY, 0777) + if err != nil { + return err + } + + isOk := false + defer func() { + err = writer.Close() + if err != nil { + isOk = false + } + + // 如果出错了,就删除文件,避免写一半 + if !isOk { + _ = os.Remove(path) + } + }() + + // 写入过期时间 + _, err = writer.WriteString(fmt.Sprintf("%d", expiredAt)) + if err != nil { + return err + } + + // 写入key length + b := make([]byte, SizeKeyLength) + binary.BigEndian.PutUint32(b, uint32(len(key))) + _, err = writer.Write(b) + if err != nil { + return err + } + + // 写入key + _, err = writer.WriteString(key + "\n") + if err != nil { + return err + } + + // 写入数据 + valueSize, err := io.Copy(writer, valueReader) + if err != nil { + return err + } + + // 写入结束符 + _, err = writer.WriteString("\n$$$") + + isOk = true + + // 写入List + this.list.Add(hash, &Item{ + Key: key, + ExpiredAt: expiredAt, + ValueSize: valueSize, + Size: valueSize + SizeExpiredAt + SizeKeyLength + int64(len(key)) + SizeNL + SizeEnd, + }) + + return nil +} + +// 添加到List +func (this *FileStorage) AddToList(item *Item) { + item.Size = item.ValueSize + SizeExpiredAt + SizeKeyLength + int64(len(item.Key)) + SizeNL + SizeEnd + hash := stringutil.Md5(item.Key) + this.list.Add(hash, item) +} + +// 删除某个键值对应的缓存 +func (this *FileStorage) Delete(key string) error { + this.locker.Lock() + defer this.locker.Unlock() + + hash, path := this.keyPath(key) + this.list.Remove(hash) + err := os.Remove(path) + if err == nil || os.IsNotExist(err) { + return nil + } + return err +} + +// 统计 +func (this *FileStorage) Stat() (*Stat, error) { + this.locker.RLock() + defer this.locker.RUnlock() + + return this.list.Stat(func(hash string) bool { + return true + }), nil +} + +// 清除所有的缓存 +func (this *FileStorage) CleanAll() error { + this.locker.Lock() + defer this.locker.Unlock() + + this.list.Reset() + + // 删除缓存和目录 + // 不能直接删除子目录,比较危险 + dir := this.dir() + fp, err := os.Open(dir) + if err != nil { + return err + } + defer func() { + _ = fp.Close() + }() + + stat, err := fp.Stat() + if err != nil { + return err + } + + if !stat.IsDir() { + return nil + } + + subDirs, err := fp.Readdir(-1) + if err != nil { + return err + } + for _, info := range subDirs { + subDir := info.Name() + + // 检查目录名 + ok, err := regexp.MatchString(`^[0-9a-f]{2}$`, subDir) + if err != nil { + return err + } + if !ok { + continue + } + + // 删除目录 + err = os.RemoveAll(dir + "/" + subDir) + if err != nil { + return err + } + } + + return nil +} + +// 清理过期的缓存 +func (this *FileStorage) Purge(keys []string) error { + this.locker.Lock() + defer this.locker.Unlock() + + for _, key := range keys { + hash, path := this.keyPath(key) + if !this.list.Exist(hash) { + err := os.Remove(path) + if err != nil && !os.IsNotExist(err) { + return err + } + continue + } + + err := os.Remove(path) + if err != nil && !os.IsNotExist(err) { + return err + } + this.list.Remove(hash) + } + return nil +} + +// 停止 +func (this *FileStorage) Stop() { + this.locker.Lock() + defer this.locker.Unlock() + + this.list.Reset() + if this.ticker != nil { + this.ticker.Stop() + } +} + +// 绝对路径 +func (this *FileStorage) dir() string { + return this.cacheConfig.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/" +} + +// 获取Key对应的文件路径 +func (this *FileStorage) keyPath(key string) (hash string, path string) { + hash = stringutil.Md5(key) + dir := this.cacheConfig.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/" + hash[:2] + "/" + hash[2:4] + path = dir + "/" + hash + ".cache" + return +} + +// 获取Hash对应的文件路径 +func (this *FileStorage) hashPath(hash string) (path string) { + if len(hash) != 32 { + return "" + } + dir := this.cacheConfig.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/" + hash[:2] + "/" + hash[2:4] + path = dir + "/" + hash + ".cache" + return +} + +// 初始化List +func (this *FileStorage) initList() error { + this.list.Reset() + + dir := this.dir() + files, err := filepath.Glob(dir + "/*/*/*.cache") + if err != nil { + return err + } + for _, path := range files { + basename := filepath.Base(path) + index := strings.LastIndex(basename, ".") + if index < 0 { + continue + } + hash := basename[:index] + + // 解析文件信息 + item, err := this.decodeFile(path) + if err != nil { + if err != ErrNotFound { + logs.Println("[CACHE]decode path '" + path + "': " + err.Error()) + } + continue + } + if item == nil { + continue + } + this.list.Add(hash, item) + } + + // 启动定时清理任务 + this.ticker = utils.NewTicker(30 * time.Second) + go func() { + for this.ticker.Next() { + this.purgeLoop() + } + }() + + return nil +} + +// 解析文件信息 +func (this *FileStorage) decodeFile(path string) (*Item, error) { + fp, err := os.OpenFile(path, os.O_RDONLY, 0444) + if err != nil { + return nil, err + } + defer func() { + _ = fp.Close() + }() + + buf := make([]byte, SizeExpiredAt) + n, err := fp.Read(buf) + if err != nil { + return nil, err + } + if n != len(buf) { + // 数据格式错误 + _ = fp.Close() + _ = os.Remove(path) + + return nil, ErrNotFound + } + expiredAt := types.Int64(string(buf)) + if expiredAt < time.Now().Unix() { + // 已过期 + _ = fp.Close() + _ = os.Remove(path) + return nil, ErrNotFound + } + + buf = make([]byte, SizeKeyLength) + n, err = fp.Read(buf) + if err != nil { + return nil, err + } + keyLength := binary.BigEndian.Uint32(buf) + + buf = make([]byte, keyLength) + n, err = fp.Read(buf) + if err != nil { + return nil, err + } + if n != int(keyLength) { + // 数据格式错误 + _ = fp.Close() + _ = os.Remove(path) + return nil, ErrNotFound + } + + stat, err := fp.Stat() + if err != nil { + return nil, err + } + + item := &Item{} + item.ExpiredAt = expiredAt + item.Key = string(buf) + item.Size = stat.Size() + item.ValueSize = item.Size - SizeExpiredAt - SizeKeyLength - int64(keyLength) - SizeNL - SizeEnd + return item, nil +} + +// 清理任务 +func (this *FileStorage) purgeLoop() { + this.list.Purge(1000, func(hash string) { + path := this.hashPath(hash) + err := os.Remove(path) + if err != nil && !os.IsNotExist(err) { + logs.Println("[CACHE]purge '" + path + "' error: " + err.Error()) + } + }) +} diff --git a/internal/caches/storage_file_test.go b/internal/caches/storage_file_test.go new file mode 100644 index 0000000..47de105 --- /dev/null +++ b/internal/caches/storage_file_test.go @@ -0,0 +1,283 @@ +package caches + +import ( + "bytes" + "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + "github.com/TeaOSLab/EdgeNode/internal/utils" + "github.com/iwind/TeaGo/Tea" + _ "github.com/iwind/TeaGo/bootstrap" + "github.com/iwind/TeaGo/logs" + "runtime" + "testing" + "time" +) + +func TestFileStorage_Init(t *testing.T) { + storage := NewFileStorage(&serverconfigs.HTTPCachePolicy{ + Id: 1, + IsOn: true, + Options: map[string]interface{}{ + "dir": Tea.Root + "/caches", + }, + }) + + err := storage.Init() + if err != nil { + t.Fatal(err) + } + //t.Log(storage.list.m) + + /**err = storage.Write("c", bytes.NewReader([]byte("i am c")), 4, "second") + if err != nil { + t.Fatal(err) + }**/ + //logs.PrintAsJSON(storage.list.m, t) + + time.Sleep(2 * time.Second) + storage.purgeLoop() + t.Log(len(storage.list.m), "entries left") +} + +func TestFileStorage_Open(t *testing.T) { + storage := NewFileStorage(&serverconfigs.HTTPCachePolicy{ + Id: 1, + IsOn: true, + Options: map[string]interface{}{ + "dir": Tea.Root + "/caches", + }, + }) + err := storage.Init() + if err != nil { + t.Fatal(err) + } + now := time.Now() + defer func() { + t.Log(time.Since(now).Seconds()*1000, "ms") + }() + + writer, err := storage.Open("abc", time.Now().Unix()+3600) + if err != nil { + t.Fatal(err) + } + t.Log(writer) + + err = writer.Write([]byte("Hello,World")) + if err != nil { + t.Fatal(err) + } + + err = writer.Close() + if err != nil { + t.Fatal(err) + } +} + +func TestFileStorage_Write(t *testing.T) { + storage := NewFileStorage(&serverconfigs.HTTPCachePolicy{ + Id: 1, + IsOn: true, + Options: map[string]interface{}{ + "dir": Tea.Root + "/caches", + }, + }) + err := storage.Init() + if err != nil { + t.Fatal(err) + } + reader := bytes.NewBuffer([]byte(`my_value +my_value2 +my_value3 +my_value4 +my_value5 +my_value6 +my_value7 +my_value8 +my_value9 +my_value10`)) + err = storage.Write("my-key", time.Now().Unix()+3600, reader) + if err != nil { + t.Fatal(err) + } + t.Log("ok") +} + +func TestFileStorage_Read(t *testing.T) { + storage := NewFileStorage(&serverconfigs.HTTPCachePolicy{ + Id: 1, + IsOn: true, + Options: map[string]interface{}{ + "dir": Tea.Root + "/caches", + }, + }) + err := storage.Init() + if err != nil { + t.Fatal(err) + } + now := time.Now() + t.Log(storage.Read("my-key", make([]byte, 64), func(data []byte, expiredAt int64) { + t.Log("[expiredAt]", "["+string(data)+"]") + })) + t.Log(time.Since(now).Seconds()*1000, "ms") +} + +func TestFileStorage_Read_NotFound(t *testing.T) { + storage := NewFileStorage(&serverconfigs.HTTPCachePolicy{ + Id: 1, + IsOn: true, + Options: map[string]interface{}{ + "dir": Tea.Root + "/caches", + }, + }) + err := storage.Init() + if err != nil { + t.Fatal(err) + } + now := time.Now() + t.Log(storage.Read("my-key-10000", make([]byte, 64), func(data []byte, expiredAt int64) { + t.Log("[" + string(data) + "]") + })) + t.Log(time.Since(now).Seconds()*1000, "ms") +} + +func TestFileStorage_Delete(t *testing.T) { + storage := NewFileStorage(&serverconfigs.HTTPCachePolicy{ + Id: 1, + IsOn: true, + Options: map[string]interface{}{ + "dir": Tea.Root + "/caches", + }, + }) + err := storage.Init() + if err != nil { + t.Fatal(err) + } + err = storage.Delete("my-key") + if err != nil { + t.Fatal(err) + } + t.Log("ok") +} + +func TestFileStorage_Stat(t *testing.T) { + storage := NewFileStorage(&serverconfigs.HTTPCachePolicy{ + Id: 1, + IsOn: true, + Options: map[string]interface{}{ + "dir": Tea.Root + "/caches", + }, + }) + err := storage.Init() + if err != nil { + t.Fatal(err) + } + + before := time.Now() + defer func() { + t.Log(time.Since(before).Seconds()*1000, "ms") + }() + + stat, err := storage.Stat() + if err != nil { + t.Fatal(err) + } + logs.PrintAsJSON(stat, t) +} + +func TestFileStorage_CleanAll(t *testing.T) { + storage := NewFileStorage(&serverconfigs.HTTPCachePolicy{ + Id: 1, + IsOn: true, + Options: map[string]interface{}{ + "dir": Tea.Root + "/caches", + }, + }) + err := storage.Init() + if err != nil { + t.Fatal(err) + } + + before := time.Now() + defer func() { + t.Log(time.Since(before).Seconds()*1000, "ms") + }() + + t.Log("before:", storage.list.m) + + err = storage.CleanAll() + if err != nil { + t.Fatal(err) + } + + t.Log("after:", storage.list.m) + t.Log("ok") +} + +func TestFileStorage_Purge(t *testing.T) { + storage := NewFileStorage(&serverconfigs.HTTPCachePolicy{ + Id: 1, + IsOn: true, + Options: map[string]interface{}{ + "dir": Tea.Root + "/caches", + }, + }) + err := storage.Init() + if err != nil { + t.Fatal(err) + } + + _ = storage.Write("a", time.Now().Unix()+3600, bytes.NewReader([]byte("a1"))) + _ = storage.Write("b", time.Now().Unix()+3600, bytes.NewReader([]byte("b1"))) + _ = storage.Write("c", time.Now().Unix()+3600, bytes.NewReader([]byte("c1"))) + _ = storage.Write("d", time.Now().Unix()+3600, bytes.NewReader([]byte("d1"))) + + before := time.Now() + defer func() { + t.Log(time.Since(before).Seconds()*1000, "ms") + }() + + err = storage.Purge([]string{"a", "b1", "c"}) + if err != nil { + t.Fatal(err) + } + + t.Log(storage.list.m) + t.Log("ok") +} + +func TestFileStorage_Stop(t *testing.T) { + storage := NewFileStorage(&serverconfigs.HTTPCachePolicy{ + Id: 1, + IsOn: true, + Options: map[string]interface{}{ + "dir": Tea.Root + "/caches", + }, + }) + err := storage.Init() + if err != nil { + t.Fatal(err) + } + storage.Stop() +} + +func BenchmarkFileStorage_Read(b *testing.B) { + runtime.GOMAXPROCS(1) + + _ = utils.SetRLimit(1024 * 1024) + + storage := NewFileStorage(&serverconfigs.HTTPCachePolicy{ + Id: 1, + IsOn: true, + Options: map[string]interface{}{ + "dir": Tea.Root + "/caches", + }, + }) + err := storage.Init() + if err != nil { + b.Fatal(err) + } + buf := make([]byte, 1024) + for i := 0; i < b.N; i++ { + _ = storage.Read("my-key", buf, func(data []byte, expiredAt int64) { + }) + } +} diff --git a/internal/caches/storage_interface.go b/internal/caches/storage_interface.go new file mode 100644 index 0000000..02b48b7 --- /dev/null +++ b/internal/caches/storage_interface.go @@ -0,0 +1,33 @@ +package caches + +import "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + +// 缓存存储接口 +type StorageInterface interface { + // 初始化 + Init() error + + // 读取缓存 + Read(key string, readerBuf []byte, callback func(data []byte, expiredAt int64)) error + + // 打开缓存写入器等待写入 + Open(key string, expiredAt int64) (*Writer, error) + + // 删除某个键值对应的缓存 + Delete(key string) error + + // 统计缓存 + Stat() (*Stat, error) + + // 清除所有缓存 + CleanAll() error + + // 批量删除缓存 + Purge(keys []string) error + + // 停止缓存策略 + Stop() + + // 获取当前存储的Policy + Policy() *serverconfigs.HTTPCachePolicy +} diff --git a/internal/caches/writer.go b/internal/caches/writer.go new file mode 100644 index 0000000..bbc7ae5 --- /dev/null +++ b/internal/caches/writer.go @@ -0,0 +1,71 @@ +package caches + +import ( + "os" + "sync" +) + +type Writer struct { + rawWriter *os.File + key string + size int64 + expiredAt int64 + locker *sync.RWMutex + isReleased bool +} + +func NewWriter(rawWriter *os.File, key string, expiredAt int64, locker *sync.RWMutex) *Writer { + return &Writer{ + key: key, + rawWriter: rawWriter, + expiredAt: expiredAt, + locker: locker, + } +} + +// 写入数据 +func (this *Writer) Write(data []byte) error { + n, err := this.rawWriter.Write(data) + this.size += int64(n) + if err != nil { + _ = this.rawWriter.Close() + _ = os.Remove(this.rawWriter.Name()) + this.Release() + } + + return err +} + +// 关闭 +func (this *Writer) Close() error { + // 写入结束符 + _, err := this.rawWriter.WriteString("\n$$$") + if err != nil { + _ = os.Remove(this.rawWriter.Name()) + } + + this.Release() + + return err +} + +func (this *Writer) Size() int64 { + return this.size +} + +func (this *Writer) ExpiredAt() int64 { + return this.expiredAt +} + +func (this *Writer) Key() string { + return this.key +} + +// 释放锁,一定要调用 +func (this *Writer) Release() { + if this.isReleased { + return + } + this.isReleased = true + this.locker.Unlock() +} diff --git a/internal/errors/error.go b/internal/errors/error.go new file mode 100644 index 0000000..74faa7d --- /dev/null +++ b/internal/errors/error.go @@ -0,0 +1,60 @@ +package errors + +import ( + "errors" + "path/filepath" + "runtime" + "strconv" +) + +type errorObj struct { + err error + file string + line int + funcName string +} + +func (this *errorObj) Error() string { + s := this.err.Error() + "\n " + this.file + if len(this.funcName) > 0 { + s += ":" + this.funcName + "()" + } + s += ":" + strconv.Itoa(this.line) + return s +} + +// 新错误 +func New(errText string) error { + ptr, file, line, ok := runtime.Caller(1) + funcName := "" + if ok { + frame, _ := runtime.CallersFrames([]uintptr{ptr}).Next() + funcName = filepath.Base(frame.Function) + } + return &errorObj{ + err: errors.New(errText), + file: file, + line: line, + funcName: funcName, + } +} + +// 包装已有错误 +func Wrap(err error) error { + if err == nil { + return nil + } + + ptr, file, line, ok := runtime.Caller(1) + funcName := "" + if ok { + frame, _ := runtime.CallersFrames([]uintptr{ptr}).Next() + funcName = filepath.Base(frame.Function) + } + return &errorObj{ + err: err, + file: file, + line: line, + funcName: funcName, + } +} diff --git a/internal/errors/error_test.go b/internal/errors/error_test.go new file mode 100644 index 0000000..b288d27 --- /dev/null +++ b/internal/errors/error_test.go @@ -0,0 +1,22 @@ +package errors + +import ( + "errors" + "testing" +) + +func TestNew(t *testing.T) { + t.Log(New("hello")) + t.Log(Wrap(errors.New("hello"))) + t.Log(testError1()) + t.Log(Wrap(testError1())) + t.Log(Wrap(testError2())) +} + +func testError1() error { + return New("test error1") +} + +func testError2() error { + return Wrap(testError1()) +} diff --git a/internal/nodes/api_stream.go b/internal/nodes/api_stream.go new file mode 100644 index 0000000..5f5c88f --- /dev/null +++ b/internal/nodes/api_stream.go @@ -0,0 +1,215 @@ +package nodes + +import ( + "encoding/json" + "github.com/TeaOSLab/EdgeCommon/pkg/messageconfigs" + "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + "github.com/TeaOSLab/EdgeNode/internal/caches" + "github.com/TeaOSLab/EdgeNode/internal/errors" + "github.com/TeaOSLab/EdgeNode/internal/rpc" + "github.com/iwind/TeaGo/logs" + "strconv" + "time" +) + +type APIStream struct { + stream pb.NodeService_NodeStreamClient +} + +func NewAPIStream() *APIStream { + return &APIStream{} +} + +func (this *APIStream) Start() { + for { + err := this.loop() + if err != nil { + logs.Println("[API STREAM]" + err.Error()) + time.Sleep(10 * time.Second) + continue + } + time.Sleep(1 * time.Second) + } +} + +func (this *APIStream) loop() error { + rpcClient, err := rpc.SharedRPC() + if err != nil { + return errors.Wrap(err) + } + nodeStream, err := rpcClient.NodeRPC().NodeStream(rpcClient.Context()) + if err != nil { + return errors.Wrap(err) + } + this.stream = nodeStream + for { + message, err := nodeStream.Recv() + if err != nil { + return errors.Wrap(err) + } + + // 处理消息 + switch message.Code { + case messageconfigs.MessageCodeConnectedAPINode: // 连接API节点成功 + err = this.handleConnectedAPINode(message) + case messageconfigs.MessageCodeWriteCache: // 写入缓存 + err = this.handleWriteCache(message) + case messageconfigs.MessageCodeReadCache: // 读取缓存 + err = this.handleReadCache(message) + default: + err = this.handleUnknownMessage(message) + } + if err != nil { + logs.Println("[API STREAM]handle message failed: " + err.Error()) + } + } +} + +// 连接API节点成功 +func (this *APIStream) handleConnectedAPINode(message *pb.NodeStreamMessage) error { + // 更改连接的APINode信息 + if len(message.DataJSON) == 0 { + return nil + } + msg := &messageconfigs.ConnectedAPINodeMessage{} + err := json.Unmarshal(message.DataJSON, msg) + if err != nil { + return errors.Wrap(err) + } + + rpcClient, err := rpc.SharedRPC() + if err != nil { + return errors.Wrap(err) + } + + _, err = rpcClient.NodeRPC().UpdateNodeConnectedAPINodes(rpcClient.Context(), &pb.UpdateNodeConnectedAPINodesRequest{ApiNodeIds: []int64{msg.APINodeId}}) + if err != nil { + return errors.Wrap(err) + } + logs.Println("[API STREAM]connected to api node '" + strconv.FormatInt(msg.APINodeId, 10) + "'") + return nil +} + +// 写入缓存 +func (this *APIStream) handleWriteCache(message *pb.NodeStreamMessage) error { + msg := &messageconfigs.WriteCacheMessage{} + err := json.Unmarshal(message.DataJSON, msg) + if err != nil { + this.replyFail(message.RequestId, "decode message data failed: "+err.Error()) + return err + } + + cachePolicy := &serverconfigs.HTTPCachePolicy{} + err = json.Unmarshal(msg.CachePolicyJSON, cachePolicy) + if err != nil { + this.replyFail(message.RequestId, "decode cache policy config failed: "+err.Error()) + return err + } + + storage := caches.SharedManager.FindStorageWithPolicy(cachePolicy.Id) + if storage == nil { + storage = caches.SharedManager.NewStorageWithPolicy(cachePolicy) + if storage == nil { + this.replyFail(message.RequestId, "invalid storage type '"+cachePolicy.Type+"'") + return nil + } + defer func() { + storage.Stop() + }() + err = storage.Init() + if err != nil { + this.replyFail(message.RequestId, "storage init failed: "+err.Error()) + return err + } + } + + writer, err := storage.Open(msg.Key, time.Now().Unix()+msg.LifeSeconds) + if err != nil { + this.replyFail(message.RequestId, "prepare writing failed: "+err.Error()) + return err + } + + defer func() { + // 不用担心重复 + _ = writer.Close() + }() + + err = writer.Write(msg.Value) + if err != nil { + this.replyFail(message.RequestId, "write failed: "+err.Error()) + return err + } + _ = writer.Close() + + this.replyOk(message.RequestId, "write ok") + + return nil +} + +// 读取缓存 +func (this *APIStream) handleReadCache(message *pb.NodeStreamMessage) error { + msg := &messageconfigs.ReadCacheMessage{} + err := json.Unmarshal(message.DataJSON, msg) + if err != nil { + this.replyFail(message.RequestId, "decode message data failed: "+err.Error()) + return err + } + cachePolicy := &serverconfigs.HTTPCachePolicy{} + err = json.Unmarshal(msg.CachePolicyJSON, cachePolicy) + if err != nil { + this.replyFail(message.RequestId, "decode cache policy config failed: "+err.Error()) + return err + } + + storage := caches.SharedManager.FindStorageWithPolicy(cachePolicy.Id) + if storage == nil { + storage = caches.SharedManager.NewStorageWithPolicy(cachePolicy) + if storage == nil { + this.replyFail(message.RequestId, "invalid storage type '"+cachePolicy.Type+"'") + return nil + } + defer func() { + storage.Stop() + }() + err = storage.Init() + if err != nil { + this.replyFail(message.RequestId, "storage init failed: "+err.Error()) + return err + } + } + + buf := make([]byte, 1024) + size := 0 + err = storage.Read(msg.Key, buf, func(data []byte, expiredAt int64) { + size += len(data) + }) + if err != nil { + if err == caches.ErrNotFound { + this.replyFail(message.RequestId, "key not found") + return nil + } + this.replyFail(message.RequestId, "read key failed: "+err.Error()) + return err + } + + this.replyOk(message.RequestId, "value "+strconv.Itoa(size)+" bytes") + + return nil +} + +// 处理未知消息 +func (this *APIStream) handleUnknownMessage(message *pb.NodeStreamMessage) error { + this.replyFail(message.RequestId, "unknown message code '"+message.Code+"'") + return nil +} + +// 回复失败 +func (this *APIStream) replyFail(requestId int64, message string) { + _ = this.stream.Send(&pb.NodeStreamMessage{RequestId: requestId, IsOk: false, Message: message}) +} + +// 回复成功 +func (this *APIStream) replyOk(requestId int64, message string) { + _ = this.stream.Send(&pb.NodeStreamMessage{RequestId: requestId, IsOk: true, Message: message}) +} diff --git a/internal/nodes/api_stream_test.go b/internal/nodes/api_stream_test.go new file mode 100644 index 0000000..00a85d1 --- /dev/null +++ b/internal/nodes/api_stream_test.go @@ -0,0 +1,8 @@ +package nodes + +import "testing" + +func TestAPIStream_Start(t *testing.T) { + apiStream := NewAPIStream() + apiStream.Start() +} diff --git a/internal/nodes/http_request.go b/internal/nodes/http_request.go index fba1843..5f12bdf 100644 --- a/internal/nodes/http_request.go +++ b/internal/nodes/http_request.go @@ -56,6 +56,8 @@ type HTTPRequest struct { rewriteRule *serverconfigs.HTTPRewriteRule // 匹配到的重写规则 rewriteReplace string // 重写规则的目标 rewriteIsExternalURL bool // 重写目标是否为外部URL + cachePolicy *serverconfigs.HTTPCachePolicy // 缓存策略 + cacheCond *serverconfigs.HTTPCacheCond // 缓存条件 } // 初始化 diff --git a/internal/nodes/node.go b/internal/nodes/node.go index 81b734a..1fdb678 100644 --- a/internal/nodes/node.go +++ b/internal/nodes/node.go @@ -11,7 +11,6 @@ import ( "time" ) -var stop = make(chan bool) var lastVersion = int64(-1) var sharedNodeConfig *nodeconfigs.NodeConfig @@ -52,6 +51,9 @@ func (this *Node) Start() { // 设置rlimit _ = utils.SetRLimit(1024 * 1024) + // 连接API + go NewAPIStream().Start() + // 启动端口 err = sharedListenerManager.Start(nodeConfig) if err != nil { @@ -59,7 +61,7 @@ func (this *Node) Start() { } // hold住进程 - <-stop + select {} } // 读取API配置