mirror of
				https://gitee.com/gitea/gitea
				synced 2025-11-04 08:30:25 +08:00 
			
		
		
		
	Improve queue & process & stacktrace (#24636)
Although some features are mixed together in this PR, this PR is not
that large, and these features are all related.
Actually, more than 70 of the lines are for a toy "test queue", so
this PR is quite simple.
Major features:
1. Allow site admin to clear a queue (remove all items in a queue)
* Because there is no transaction, the "unique queue" could be corrupted
in rare cases, which is unfixable.
* eg: the item is in the "set" but not in the "list", so the item would
never be able to be pushed into the queue.
* Now site admin could simply clear the queue, then everything becomes
correct, the lost items could be re-pushed into queue by future
operations.
2. Split the "admin/monitor" page into separate pages
3. Allow downloading a diagnosis report
* Historically, many users have reported that Gitea's queue gets
stuck, or that Gitea's CPU usage is at 100%
    * With diagnosis report, maintainers could know what happens clearly
The diagnosis report sample:
[gitea-diagnosis-20230510-192913.zip](https://github.com/go-gitea/gitea/files/11441346/gitea-diagnosis-20230510-192913.zip)
, use "go tool pprof profile.dat" to view the report.
Screenshots:



---------
Co-authored-by: Jason Song <i@wolfogre.com>
Co-authored-by: Giteabot <teabot@gitea.io>
			
			
This commit is contained in:
		@@ -87,7 +87,9 @@ func (q *baseChannel) PopItem(ctx context.Context) ([]byte, error) {
 | 
			
		||||
func (q *baseChannel) HasItem(ctx context.Context, data []byte) (bool, error) {
 | 
			
		||||
	q.mu.Lock()
 | 
			
		||||
	defer q.mu.Unlock()
 | 
			
		||||
 | 
			
		||||
	if !q.isUnique {
 | 
			
		||||
		return false, nil
 | 
			
		||||
	}
 | 
			
		||||
	return q.set.Contains(string(data)), nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -107,7 +109,9 @@ func (q *baseChannel) Close() error {
 | 
			
		||||
	defer q.mu.Unlock()
 | 
			
		||||
 | 
			
		||||
	close(q.c)
 | 
			
		||||
	q.set = container.Set[string]{}
 | 
			
		||||
	if q.isUnique {
 | 
			
		||||
		q.set = container.Set[string]{}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
@@ -119,5 +123,9 @@ func (q *baseChannel) RemoveAll(ctx context.Context) error {
 | 
			
		||||
	for q.c != nil && len(q.c) > 0 {
 | 
			
		||||
		<-q.c
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if q.isUnique {
 | 
			
		||||
		q.set = container.Set[string]{}
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -77,6 +77,14 @@ func (q *baseLevelQueueUnique) RemoveAll(ctx context.Context) error {
 | 
			
		||||
	}
 | 
			
		||||
	lq := (*levelUniqueQueue)(unsafe.Pointer(q.internal))
 | 
			
		||||
 | 
			
		||||
	for lq.q.Len() > 0 {
 | 
			
		||||
		if _, err := lq.q.LPop(); err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// the "set" must be cleared after the "list" because there is no transaction.
 | 
			
		||||
	// it's better to have duplicate items than losing items.
 | 
			
		||||
	members, err := lq.set.Members()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err // seriously corrupted
 | 
			
		||||
@@ -84,10 +92,5 @@ func (q *baseLevelQueueUnique) RemoveAll(ctx context.Context) error {
 | 
			
		||||
	for _, v := range members {
 | 
			
		||||
		_, _ = lq.set.Remove(v)
 | 
			
		||||
	}
 | 
			
		||||
	for lq.q.Len() > 0 {
 | 
			
		||||
		if _, err = lq.q.LPop(); err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -123,7 +123,10 @@ func (q *baseRedis) Close() error {
 | 
			
		||||
func (q *baseRedis) RemoveAll(ctx context.Context) error {
 | 
			
		||||
	q.mu.Lock()
 | 
			
		||||
	defer q.mu.Unlock()
 | 
			
		||||
 | 
			
		||||
	c1 := q.client.Del(ctx, q.cfg.QueueFullName)
 | 
			
		||||
	// the "set" must be cleared after the "list" because there is no transaction.
 | 
			
		||||
	// it's better to have duplicate items than losing items.
 | 
			
		||||
	c2 := q.client.Del(ctx, q.cfg.SetFullName)
 | 
			
		||||
	if c1.Err() != nil {
 | 
			
		||||
		return c1.Err()
 | 
			
		||||
 
 | 
			
		||||
@@ -33,6 +33,9 @@ type ManagedWorkerPoolQueue interface {
 | 
			
		||||
	// FlushWithContext tries to make the handler process all items in the queue synchronously.
 | 
			
		||||
	// It is for testing purpose only. It's not designed to be used in a cluster.
 | 
			
		||||
	FlushWithContext(ctx context.Context, timeout time.Duration) error
 | 
			
		||||
 | 
			
		||||
	// RemoveAllItems removes all items in the base queue (on-the-fly items are not affected)
 | 
			
		||||
	RemoveAllItems(ctx context.Context) error
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var manager *Manager
 | 
			
		||||
 
 | 
			
		||||
@@ -130,6 +130,11 @@ func (q *WorkerPoolQueue[T]) FlushWithContext(ctx context.Context, timeout time.
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// RemoveAllItems removes all items in the baes queue
 | 
			
		||||
func (q *WorkerPoolQueue[T]) RemoveAllItems(ctx context.Context) error {
 | 
			
		||||
	return q.baseQueue.RemoveAll(ctx)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (q *WorkerPoolQueue[T]) marshal(data T) []byte {
 | 
			
		||||
	bs, err := json.Marshal(data)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user