刷新/预热缓存任务可以并行处理

This commit is contained in:
GoEdgeLab
2023-07-26 18:45:17 +08:00
parent a3d8aae4fe
commit 42e6f848e3
4 changed files with 130 additions and 18 deletions

View File

@@ -1,27 +1,28 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. // Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package goman package goman_test
import ( import (
"github.com/TeaOSLab/EdgeNode/internal/goman"
"testing" "testing"
"time" "time"
) )
func TestNew(t *testing.T) { func TestNew(t *testing.T) {
New(func() { goman.New(func() {
t.Log("Hello") t.Log("Hello")
t.Log(List()) t.Log(goman.List())
}) })
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
t.Log(List()) t.Log(goman.List())
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
} }
func TestNewWithArgs(t *testing.T) { func TestNewWithArgs(t *testing.T) {
NewWithArgs(func(args ...interface{}) { goman.NewWithArgs(func(args ...interface{}) {
t.Log(args[0], args[1]) t.Log(args[0], args[1])
}, 1, 2) }, 1, 2)
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)

View File

@@ -0,0 +1,52 @@
// Copyright 2023 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package goman
import (
"github.com/TeaOSLab/EdgeNode/internal/zero"
"runtime"
"sync"
)
type TaskGroup struct {
semi chan zero.Zero
wg *sync.WaitGroup
locker *sync.RWMutex
}
func NewTaskGroup() *TaskGroup {
var concurrent = runtime.NumCPU()
if concurrent <= 1 {
concurrent = 2
}
return &TaskGroup{
semi: make(chan zero.Zero, concurrent),
wg: &sync.WaitGroup{},
locker: &sync.RWMutex{},
}
}
func (this *TaskGroup) Run(f func()) {
this.wg.Add(1)
go func() {
defer this.wg.Done()
this.semi <- zero.Zero{}
f()
<-this.semi
}()
}
func (this *TaskGroup) Wait() {
this.wg.Wait()
}
func (this *TaskGroup) Lock() {
this.locker.Lock()
}
func (this *TaskGroup) Unlock() {
this.locker.Unlock()
}

View File

@@ -0,0 +1,30 @@
// Copyright 2023 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package goman_test
import (
"github.com/TeaOSLab/EdgeNode/internal/goman"
"runtime"
"testing"
)
func TestNewTaskGroup(t *testing.T) {
var group = goman.NewTaskGroup()
var m = map[int]bool{}
for i := 0; i < runtime.NumCPU()*2; i++ {
var index = i
group.Run(func() {
t.Log("task", index)
group.Lock()
_, ok := m[index]
if ok {
t.Error("duplicated:", index)
}
m[index] = true
group.Unlock()
})
}
group.Wait()
}

View File

@@ -19,6 +19,7 @@ import (
"io" "io"
"net" "net"
"net/http" "net/http"
"os"
"regexp" "regexp"
"strings" "strings"
"time" "time"
@@ -131,21 +132,29 @@ func (this *HTTPCacheTaskManager) Loop() error {
var pbResults = []*pb.UpdateHTTPCacheTaskKeysStatusRequest_KeyResult{} var pbResults = []*pb.UpdateHTTPCacheTaskKeysStatusRequest_KeyResult{}
var taskGroup = goman.NewTaskGroup()
for _, key := range keys { for _, key := range keys {
err = this.processKey(key) var taskKey = key
taskGroup.Run(func() {
processErr := this.processKey(taskKey)
var pbResult = &pb.UpdateHTTPCacheTaskKeysStatusRequest_KeyResult{
Id: taskKey.Id,
NodeClusterId: taskKey.NodeClusterId,
Error: "",
}
var pbResult = &pb.UpdateHTTPCacheTaskKeysStatusRequest_KeyResult{ if processErr != nil {
Id: key.Id, pbResult.Error = processErr.Error()
NodeClusterId: key.NodeClusterId, }
Error: "",
}
if err != nil { taskGroup.Lock()
pbResult.Error = err.Error() pbResults = append(pbResults, pbResult)
} taskGroup.Unlock()
pbResults = append(pbResults, pbResult) })
} }
taskGroup.Wait()
_, err = rpcClient.HTTPCacheTaskKeyRPC.UpdateHTTPCacheTaskKeysStatus(rpcClient.Context(), &pb.UpdateHTTPCacheTaskKeysStatusRequest{KeyResults: pbResults}) _, err = rpcClient.HTTPCacheTaskKeyRPC.UpdateHTTPCacheTaskKeysStatus(rpcClient.Context(), &pb.UpdateHTTPCacheTaskKeysStatusRequest{KeyResults: pbResults})
if err != nil { if err != nil {
return err return err
@@ -242,6 +251,7 @@ func (this *HTTPCacheTaskManager) fetchKey(key *pb.HTTPCacheTaskKey) error {
req.Header.Set("Accept-Encoding", "gzip, deflate, br") req.Header.Set("Accept-Encoding", "gzip, deflate, br")
resp, err := this.httpClient.Do(req) resp, err := this.httpClient.Do(req)
if err != nil { if err != nil {
err = this.simplifyErr(err)
return errors.New("request failed: " + fullKey + ": " + err.Error()) return errors.New("request failed: " + fullKey + ": " + err.Error())
} }
@@ -249,13 +259,32 @@ func (this *HTTPCacheTaskManager) fetchKey(key *pb.HTTPCacheTaskKey) error {
_ = resp.Body.Close() _ = resp.Body.Close()
}() }()
// 读取内容,以便于生成缓存
_, _ = io.Copy(io.Discard, resp.Body)
// 处理502 // 处理502
if resp.StatusCode == http.StatusBadGateway { if resp.StatusCode == http.StatusBadGateway {
return errors.New("read origin site timeout") return errors.New("read origin site timeout")
} }
// 读取内容,以便于生成缓存
_, err = io.Copy(io.Discard, resp.Body)
if err != nil {
if err != io.EOF {
err = this.simplifyErr(err)
return errors.New("request failed: " + fullKey + ": " + err.Error())
} else {
err = nil
}
}
return nil return nil
} }
func (this *HTTPCacheTaskManager) simplifyErr(err error) error {
if err == nil {
return nil
}
if os.IsTimeout(err) {
return errors.New("timeout to read origin site")
}
return err
}