diff --git a/internal/caches/storage_file.go b/internal/caches/storage_file.go index 3d884a1..4a896ac 100644 --- a/internal/caches/storage_file.go +++ b/internal/caches/storage_file.go @@ -20,6 +20,7 @@ import ( "strings" "sync" "sync/atomic" + "syscall" "time" ) @@ -31,7 +32,8 @@ const ( ) var ( - ErrNotFound = errors.New("cache not found") + ErrNotFound = errors.New("cache not found") + ErrFileIsWriting = errors.New("the file is writing") ) type FileStorage struct { @@ -251,19 +253,17 @@ func (this *FileStorage) Open(key string, expiredAt int64) (Writer, error) { } } - this.locker.Lock() - // 先删除 this.list.Remove(hash) path := dir + "/" + hash + ".cache" - writer, err := os.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_SYNC|os.O_WRONLY, 0777) + writer, err := os.OpenFile(path, os.O_CREATE|os.O_SYNC|os.O_WRONLY, 0666) if err != nil { - this.locker.Unlock() return nil, err } isOk := false + removeOnFailure := true defer func() { if err != nil { isOk = false @@ -272,11 +272,24 @@ func (this *FileStorage) Open(key string, expiredAt int64) (Writer, error) { // 如果出错了,就删除文件,避免写一半 if !isOk { _ = writer.Close() - _ = os.Remove(path) - this.locker.Unlock() + if removeOnFailure { + _ = os.Remove(path) + } } }() + // 尝试锁定,如果锁定失败,则直接返回 + err = syscall.Flock(int(writer.Fd()), syscall.LOCK_EX|syscall.LOCK_NB) + if err != nil { + removeOnFailure = false + return nil, ErrFileIsWriting + } + + err = writer.Truncate(0) + if err != nil { + return nil, err + } + // 写入过期时间 _, err = writer.WriteString(fmt.Sprintf("%d", expiredAt)) if err != nil { @@ -299,7 +312,7 @@ func (this *FileStorage) Open(key string, expiredAt int64) (Writer, error) { isOk = true - return NewFileWriter(writer, key, expiredAt, &this.locker), nil + return NewFileWriter(writer, key, expiredAt), nil } // 写入缓存数据 diff --git a/internal/caches/storage_file_test.go b/internal/caches/storage_file_test.go index ddb4d70..705ac89 100644 --- a/internal/caches/storage_file_test.go +++ b/internal/caches/storage_file_test.go @@ -8,6 +8,8 @@ import ( _ "github.com/iwind/TeaGo/bootstrap" "github.com/iwind/TeaGo/logs" "runtime" + "strconv" + "sync" "testing" "time" ) @@ -72,6 +74,111 @@ func TestFileStorage_Open(t *testing.T) { } } +func TestFileStorage_Concurrent_Open_DifferentFile(t *testing.T) { + storage := NewFileStorage(&serverconfigs.HTTPCachePolicy{ + Id: 1, + IsOn: true, + Options: map[string]interface{}{ + "dir": Tea.Root + "/caches", + }, + }) + err := storage.Init() + if err != nil { + t.Fatal(err) + } + now := time.Now() + defer func() { + t.Log(time.Since(now).Seconds()*1000, "ms") + }() + + wg := sync.WaitGroup{} + count := 100 + wg.Add(count) + + for i := 0; i < count; i++ { + go func(i int) { + defer wg.Done() + + writer, err := storage.Open("abc"+strconv.Itoa(i), time.Now().Unix()+3600) + if err != nil { + if err != ErrFileIsWriting { + t.Fatal(err) + } + return + } + //t.Log(writer) + + _, err = writer.Write([]byte("Hello,World")) + if err != nil { + t.Fatal(err) + } + + // 故意造成慢速写入 + time.Sleep(1 * time.Second) + + err = writer.Close() + if err != nil { + t.Fatal(err) + } + }(i) + } + + wg.Wait() +} + +func TestFileStorage_Concurrent_Open_SameFile(t *testing.T) { + storage := NewFileStorage(&serverconfigs.HTTPCachePolicy{ + Id: 1, + IsOn: true, + Options: map[string]interface{}{ + "dir": Tea.Root + "/caches", + }, + }) + err := storage.Init() + if err != nil { + t.Fatal(err) + } + now := time.Now() + defer func() { + t.Log(time.Since(now).Seconds()*1000, "ms") + }() + + wg := sync.WaitGroup{} + count := 100 + wg.Add(count) + + for i := 0; i < count; i++ { + go func(i int) { + defer wg.Done() + + writer, err := storage.Open("abc"+strconv.Itoa(0), time.Now().Unix()+3600) + if err != nil { + if err != ErrFileIsWriting { + t.Fatal(err) + } + return + } + //t.Log(writer) + + t.Log("writing") + _, err = writer.Write([]byte("Hello,World")) + if err != nil { + t.Fatal(err) + } + + // 故意造成慢速写入 + time.Sleep(time.Duration(1) * time.Second) + + err = writer.Close() + if err != nil { + t.Fatal(err) + } + }(i) + } + + wg.Wait() +} + func TestFileStorage_Write(t *testing.T) { storage := NewFileStorage(&serverconfigs.HTTPCachePolicy{ Id: 1, diff --git a/internal/caches/writer_file.go b/internal/caches/writer_file.go index 29e7076..e1f0e0f 100644 --- a/internal/caches/writer_file.go +++ b/internal/caches/writer_file.go @@ -2,7 +2,6 @@ package caches import ( "os" - "sync" ) type FileWriter struct { @@ -10,16 +9,14 @@ type FileWriter struct { key string size int64 expiredAt int64 - locker *sync.RWMutex isReleased bool } -func NewFileWriter(rawWriter *os.File, key string, expiredAt int64, locker *sync.RWMutex) *FileWriter { +func NewFileWriter(rawWriter *os.File, key string, expiredAt int64) *FileWriter { return &FileWriter{ key: key, rawWriter: rawWriter, expiredAt: expiredAt, - locker: locker, } } @@ -42,6 +39,7 @@ func (this *FileWriter) Close() error { if err != nil { _ = os.Remove(this.rawWriter.Name()) } + _ = this.rawWriter.Close() this.Release() @@ -73,5 +71,4 @@ func (this *FileWriter) Release() { return } this.isReleased = true - this.locker.Unlock() } diff --git a/internal/nodes/http_writer.go b/internal/nodes/http_writer.go index ac73c14..99773e9 100644 --- a/internal/nodes/http_writer.go +++ b/internal/nodes/http_writer.go @@ -366,7 +366,9 @@ func (this *HTTPWriter) prepareCache(size int64) { expiredAt := utils.UnixTime() + life cacheWriter, err := storage.Open(this.req.cacheKey, expiredAt) if err != nil { - remotelogs.Error("REQUEST_WRITER", "write cache failed: "+err.Error()) + if err != caches.ErrFileIsWriting { + remotelogs.Error("REQUEST_WRITER", "write cache failed: "+err.Error()) + } return } this.cacheWriter = cacheWriter