写入和删除缓存文件时增加线程数限制

This commit is contained in:
刘祥超
2024-04-29 22:36:26 +08:00
parent 7febc6aaf3
commit 973324ae8f
15 changed files with 117 additions and 136 deletions

View File

@@ -525,11 +525,6 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea
return nil, fmt.Errorf("%w(001)", ErrFileIsWriting)
}
if !isFlushing && !fsutils.WriteReady() {
sharedWritingFileKeyLocker.Unlock()
return nil, ErrServerIsBusy
}
sharedWritingFileKeyMap[key] = zero.New()
sharedWritingFileKeyLocker.Unlock()
defer func() {
@@ -596,7 +591,11 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea
// 数据库中是否存在
existsCacheItem, _, _ := this.list.Exist(hash)
if existsCacheItem {
if !fsutils.ReaderLimiter.TryAck() {
return nil, ErrServerIsBusy
}
readerFp, err := os.OpenFile(tmpPath, os.O_RDONLY, 0444)
fsutils.ReaderLimiter.Release()
if err == nil {
var partialReader = NewPartialFileReader(readerFp)
err = partialReader.Init()
@@ -629,15 +628,19 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea
if isNewCreated && existsFile {
flags |= os.O_TRUNC
}
fsutils.WriteBegin()
if !fsutils.WriterLimiter.TryAck() {
return nil, ErrServerIsBusy
}
writer, err := os.OpenFile(tmpPath, flags, 0666)
fsutils.WriteEnd()
fsutils.WriterLimiter.Release()
if err != nil {
if os.IsNotExist(err) {
_ = os.MkdirAll(dir, 0777)
// open file again
fsutils.WriterLimiter.Ack()
writer, err = os.OpenFile(tmpPath, flags, 0666)
fsutils.WriterLimiter.Release()
}
if err != nil {
return nil, err
@@ -654,7 +657,7 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea
if !isOk {
_ = writer.Close()
if removeOnFailure {
_ = os.Remove(tmpPath)
_ = fsutils.Remove(tmpPath)
}
}
}()
@@ -697,9 +700,9 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea
metaBodySize = bodySize
}
fsutils.WriteBegin()
fsutils.WriterLimiter.Ack()
_, err = writer.Write(metaBytes)
fsutils.WriteEnd()
fsutils.WriterLimiter.Release()
if err != nil {
return nil, err
}
@@ -1152,9 +1155,7 @@ func (this *FileStorage) purgeLoop() {
for i := 0; i < times; i++ {
countFound, err := this.list.Purge(purgeCount, func(hash string) error {
path, _ := this.hashPath(hash)
fsutils.WriteBegin()
err := this.removeCacheFile(path)
fsutils.WriteEnd()
if err != nil && !os.IsNotExist(err) {
remotelogs.Error("CACHE", "purge '"+path+"' error: "+err.Error())
}
@@ -1211,9 +1212,7 @@ func (this *FileStorage) purgeLoop() {
var before = time.Now()
err := this.list.PurgeLFU(count, func(hash string) error {
path, _ := this.hashPath(hash)
fsutils.WriteBegin()
err := this.removeCacheFile(path)
fsutils.WriteEnd()
if err != nil && !os.IsNotExist(err) {
remotelogs.Error("CACHE", "purge '"+path+"' error: "+err.Error())
}
@@ -1481,7 +1480,7 @@ func (this *FileStorage) removeCacheFile(path string) error {
openFileCache.Close(path)
}
var err = os.Remove(path)
var err = fsutils.Remove(path)
if err == nil || os.IsNotExist(err) {
err = nil
@@ -1493,7 +1492,7 @@ func (this *FileStorage) removeCacheFile(path string) error {
_, statErr := os.Stat(partialPath)
if statErr == nil {
_ = os.Remove(partialPath)
_ = fsutils.Remove(partialPath)
SharedPartialRangesQueue.Delete(partialPath)
}
}