diff --git a/internal/utils/taskutils/concurrent.go b/internal/utils/taskutils/concurrent.go new file mode 100644 index 00000000..c4e8317c --- /dev/null +++ b/internal/utils/taskutils/concurrent.go @@ -0,0 +1,56 @@ +// Copyright 2023 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package taskutils + +import ( + "errors" + "reflect" + "sync" +) + +func RunConcurrent(tasks any, concurrent int, f func(task any)) error { + if tasks == nil { + return nil + } + var tasksValue = reflect.ValueOf(tasks) + if tasksValue.Type().Kind() != reflect.Slice { + return errors.New("ony works for slice") + } + + var countTasks = tasksValue.Len() + if countTasks == 0 { + return nil + } + + if concurrent <= 0 { + concurrent = 8 + } + if concurrent > countTasks { + concurrent = countTasks + } + + var taskChan = make(chan any, countTasks) + for i := 0; i < countTasks; i++ { + taskChan <- tasksValue.Index(i).Interface() + } + + var wg = &sync.WaitGroup{} + wg.Add(concurrent) + for i := 0; i < concurrent; i++ { + go func() { + defer wg.Done() + + for { + select { + case task := <-taskChan: + f(task) + default: + return + } + } + }() + } + wg.Wait() + + return nil +} diff --git a/internal/utils/taskutils/concurrent_test.go b/internal/utils/taskutils/concurrent_test.go new file mode 100644 index 00000000..4726eb96 --- /dev/null +++ b/internal/utils/taskutils/concurrent_test.go @@ -0,0 +1,17 @@ +// Copyright 2023 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package taskutils_test + +import ( + "github.com/TeaOSLab/EdgeAdmin/internal/utils/taskutils" + "testing" +) + +func TestRunConcurrent(t *testing.T) { + err := taskutils.RunConcurrent([]string{"a", "b", "c", "d", "e"}, 3, func(task any) { + t.Log("run", task) + }) + if err != nil { + t.Fatal(err) + } +}