diff --git a/internal/goman/lib_test.go b/internal/goman/lib_test.go index 11358d1..2b7616f 100644 --- a/internal/goman/lib_test.go +++ b/internal/goman/lib_test.go @@ -1,27 +1,28 @@ // Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. -package goman +package goman_test import ( + "github.com/TeaOSLab/EdgeNode/internal/goman" "testing" "time" ) func TestNew(t *testing.T) { - New(func() { + goman.New(func() { t.Log("Hello") - t.Log(List()) + t.Log(goman.List()) }) time.Sleep(1 * time.Second) - t.Log(List()) + t.Log(goman.List()) time.Sleep(1 * time.Second) } func TestNewWithArgs(t *testing.T) { - NewWithArgs(func(args ...interface{}) { + goman.NewWithArgs(func(args ...interface{}) { t.Log(args[0], args[1]) }, 1, 2) time.Sleep(1 * time.Second) diff --git a/internal/goman/task_group.go b/internal/goman/task_group.go new file mode 100644 index 0000000..7c9493b --- /dev/null +++ b/internal/goman/task_group.go @@ -0,0 +1,52 @@ +// Copyright 2023 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package goman + +import ( + "github.com/TeaOSLab/EdgeNode/internal/zero" + "runtime" + "sync" +) + +type TaskGroup struct { + semi chan zero.Zero + wg *sync.WaitGroup + locker *sync.RWMutex +} + +func NewTaskGroup() *TaskGroup { + var concurrent = runtime.NumCPU() + if concurrent <= 1 { + concurrent = 2 + } + return &TaskGroup{ + semi: make(chan zero.Zero, concurrent), + wg: &sync.WaitGroup{}, + locker: &sync.RWMutex{}, + } +} + +func (this *TaskGroup) Run(f func()) { + this.wg.Add(1) + go func() { + defer this.wg.Done() + + this.semi <- zero.Zero{} + + f() + + <-this.semi + }() +} + +func (this *TaskGroup) Wait() { + this.wg.Wait() +} + +func (this *TaskGroup) Lock() { + this.locker.Lock() +} + +func (this *TaskGroup) Unlock() { + this.locker.Unlock() +} diff --git a/internal/goman/task_group_test.go b/internal/goman/task_group_test.go new file mode 100644 index 0000000..99a435f --- /dev/null +++ b/internal/goman/task_group_test.go @@ -0,0 +1,30 @@ +// Copyright 2023 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package goman_test + +import ( + "github.com/TeaOSLab/EdgeNode/internal/goman" + "runtime" + "testing" +) + +func TestNewTaskGroup(t *testing.T) { + var group = goman.NewTaskGroup() + var m = map[int]bool{} + + for i := 0; i < runtime.NumCPU()*2; i++ { + var index = i + group.Run(func() { + t.Log("task", index) + + group.Lock() + _, ok := m[index] + if ok { + t.Error("duplicated:", index) + } + m[index] = true + group.Unlock() + }) + } + group.Wait() +} diff --git a/internal/nodes/http_cache_task_manager.go b/internal/nodes/http_cache_task_manager.go index bdaf5de..62e0d13 100644 --- a/internal/nodes/http_cache_task_manager.go +++ b/internal/nodes/http_cache_task_manager.go @@ -19,6 +19,7 @@ import ( "io" "net" "net/http" + "os" "regexp" "strings" "time" @@ -131,21 +132,29 @@ func (this *HTTPCacheTaskManager) Loop() error { var pbResults = []*pb.UpdateHTTPCacheTaskKeysStatusRequest_KeyResult{} + var taskGroup = goman.NewTaskGroup() for _, key := range keys { - err = this.processKey(key) + var taskKey = key + taskGroup.Run(func() { + processErr := this.processKey(taskKey) + var pbResult = &pb.UpdateHTTPCacheTaskKeysStatusRequest_KeyResult{ + Id: taskKey.Id, + NodeClusterId: taskKey.NodeClusterId, + Error: "", + } - var pbResult = &pb.UpdateHTTPCacheTaskKeysStatusRequest_KeyResult{ - Id: key.Id, - NodeClusterId: key.NodeClusterId, - Error: "", - } + if processErr != nil { + pbResult.Error = processErr.Error() + } - if err != nil { - pbResult.Error = err.Error() - } - pbResults = append(pbResults, pbResult) + taskGroup.Lock() + pbResults = append(pbResults, pbResult) + taskGroup.Unlock() + }) } + taskGroup.Wait() + _, err = rpcClient.HTTPCacheTaskKeyRPC.UpdateHTTPCacheTaskKeysStatus(rpcClient.Context(), &pb.UpdateHTTPCacheTaskKeysStatusRequest{KeyResults: pbResults}) if err != nil { return err @@ -242,6 +251,7 @@ func (this *HTTPCacheTaskManager) fetchKey(key *pb.HTTPCacheTaskKey) error { req.Header.Set("Accept-Encoding", "gzip, deflate, br") resp, err := this.httpClient.Do(req) if err != nil { + err = this.simplifyErr(err) return errors.New("request failed: " + fullKey + ": " + err.Error()) } @@ -249,13 +259,32 @@ func (this *HTTPCacheTaskManager) fetchKey(key *pb.HTTPCacheTaskKey) error { _ = resp.Body.Close() }() - // 读取内容,以便于生成缓存 - _, _ = io.Copy(io.Discard, resp.Body) - // 处理502 if resp.StatusCode == http.StatusBadGateway { return errors.New("read origin site timeout") } + // 读取内容,以便于生成缓存 + _, err = io.Copy(io.Discard, resp.Body) + if err != nil { + if err != io.EOF { + err = this.simplifyErr(err) + return errors.New("request failed: " + fullKey + ": " + err.Error()) + } else { + err = nil + } + } + return nil } + +func (this *HTTPCacheTaskManager) simplifyErr(err error) error { + if err == nil { + return nil + } + if os.IsTimeout(err) { + return errors.New("timeout to read origin site") + } + + return err +}