优化缓存管理

This commit is contained in:
GoEdgeLab
2021-01-13 12:02:50 +08:00
parent 0e51e1c7cb
commit ba6fa92dc9
22 changed files with 1517 additions and 548 deletions

View File

@@ -159,26 +159,38 @@ func (this *APIStream) handleWriteCache(message *pb.NodeStreamMessage) error {
}
expiredAt := time.Now().Unix() + msg.LifeSeconds
writer, err := storage.Open(msg.Key, expiredAt)
writer, err := storage.OpenWriter(msg.Key, expiredAt, 200)
if err != nil {
this.replyFail(message.RequestId, "prepare writing failed: "+err.Error())
return err
}
_, err = writer.Write(msg.Value)
// 写入一个空的Header
_, err = writer.WriteHeader([]byte(":"))
if err != nil {
_ = writer.Discard()
this.replyFail(message.RequestId, "write failed: "+err.Error())
return err
}
err = writer.Close()
if err == nil {
storage.AddToList(&caches.Item{
Key: msg.Key,
ExpiredAt: expiredAt,
})
// 写入数据
_, err = writer.Write(msg.Value)
if err != nil {
this.replyFail(message.RequestId, "write failed: "+err.Error())
return err
}
err = writer.Close()
if err != nil {
this.replyFail(message.RequestId, "write failed: "+err.Error())
return err
}
storage.AddToList(&caches.Item{
Key: msg.Key,
ExpiredAt: expiredAt,
HeaderSize: writer.HeaderSize(),
BodySize: writer.BodySize(),
})
this.replyOk(message.RequestId, "write ok")
return nil
@@ -203,21 +215,20 @@ func (this *APIStream) handleReadCache(message *pb.NodeStreamMessage) error {
}()
}
buf := make([]byte, 1024)
size := 0
err = storage.Read(msg.Key, buf, func(data []byte, valueSize int64, expiredAt int64, isEOF bool) {
size += len(data)
})
reader, err := storage.OpenReader(msg.Key)
if err != nil {
if err == caches.ErrNotFound {
this.replyFail(message.RequestId, "key not found")
return nil
}
this.replyFail(message.RequestId, "read key failed: "+err.Error())
return err
return nil
}
defer func() {
_ = reader.Close()
}()
this.replyOk(message.RequestId, "value "+strconv.Itoa(size)+" bytes")
this.replyOk(message.RequestId, "value "+strconv.FormatInt(reader.BodySize(), 10)+" bytes")
return nil
}
@@ -373,6 +384,13 @@ func (this *APIStream) handlePreheatCache(message *pb.NodeStreamMessage) error {
return
}
if resp.StatusCode != 200 {
locker.Lock()
errorMessages = append(errorMessages, "request failed: "+key+": status code '"+strconv.Itoa(resp.StatusCode)+"'")
locker.Unlock()
return
}
defer func() {
_ = resp.Body.Close()
}()
@@ -388,7 +406,7 @@ func (this *APIStream) handlePreheatCache(message *pb.NodeStreamMessage) error {
}
expiredAt := time.Now().Unix() + 8600
writer, err := storage.Open(key, expiredAt) // TODO 可以设置缓存过期时间
writer, err := storage.OpenWriter(key, expiredAt, 200) // TODO 可以设置缓存过期时间
if err != nil {
locker.Lock()
errorMessages = append(errorMessages, "open cache writer failed: "+key+": "+err.Error())
@@ -398,6 +416,21 @@ func (this *APIStream) handlePreheatCache(message *pb.NodeStreamMessage) error {
buf := make([]byte, 16*1024)
isClosed := false
// 写入Header
for k, v := range resp.Header {
for _, v1 := range v {
_, err = writer.WriteHeader([]byte(k + ":" + v1 + "\n"))
if err != nil {
locker.Lock()
errorMessages = append(errorMessages, "write failed: "+key+": "+err.Error())
locker.Unlock()
return
}
}
}
// 写入Body
for {
n, err := resp.Body.Read(buf)
if n > 0 {
@@ -411,6 +444,7 @@ func (this *APIStream) handlePreheatCache(message *pb.NodeStreamMessage) error {
}
if err != nil {
if err == io.EOF {
err = writer.Close()
if err == nil {
storage.AddToList(&caches.Item{
@@ -484,7 +518,7 @@ func (this *APIStream) handleCheckSystemdService(message *pb.NodeStreamMessage)
cmd.Add(systemctl, "is-enabled", shortName)
output, err := cmd.Run()
if err != nil {
this.replyFail(message.RequestId, "'systemctl' command error: " + err.Error())
this.replyFail(message.RequestId, "'systemctl' command error: "+err.Error())
return nil
}
if output == "enabled" {