mirror of
				https://gitee.com/gitea/gitea
				synced 2025-11-04 00:20:25 +08:00 
			
		
		
		
	refactor issue indexer, add some testing and fix a bug (#6131)
* refactor issue indexer, add some testing and fix a bug * fix error copyright year on comment header * issues indexer package import keep consistent
This commit is contained in:
		@@ -4,6 +4,15 @@
 | 
			
		||||
 | 
			
		||||
package issues
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
 | 
			
		||||
	"code.gitea.io/gitea/models"
 | 
			
		||||
	"code.gitea.io/gitea/modules/log"
 | 
			
		||||
	"code.gitea.io/gitea/modules/setting"
 | 
			
		||||
	"code.gitea.io/gitea/modules/util"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// IndexerData data stored in the issue indexer
 | 
			
		||||
type IndexerData struct {
 | 
			
		||||
	ID       int64
 | 
			
		||||
@@ -34,3 +43,142 @@ type Indexer interface {
 | 
			
		||||
	Delete(ids ...int64) error
 | 
			
		||||
	Search(kw string, repoID int64, limit, start int) (*SearchResult, error)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var (
 | 
			
		||||
	// issueIndexerUpdateQueue queue of issue ids to be updated
 | 
			
		||||
	issueIndexerUpdateQueue Queue
 | 
			
		||||
	issueIndexer            Indexer
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// InitIssueIndexer initialize issue indexer, syncReindex is true then reindex until
 | 
			
		||||
// all issue index done.
 | 
			
		||||
func InitIssueIndexer(syncReindex bool) error {
 | 
			
		||||
	var populate bool
 | 
			
		||||
	switch setting.Indexer.IssueType {
 | 
			
		||||
	case "bleve":
 | 
			
		||||
		issueIndexer = NewBleveIndexer(setting.Indexer.IssuePath)
 | 
			
		||||
		exist, err := issueIndexer.Init()
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
		populate = !exist
 | 
			
		||||
	default:
 | 
			
		||||
		return fmt.Errorf("unknow issue indexer type: %s", setting.Indexer.IssueType)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	var err error
 | 
			
		||||
	switch setting.Indexer.IssueIndexerQueueType {
 | 
			
		||||
	case setting.LevelQueueType:
 | 
			
		||||
		issueIndexerUpdateQueue, err = NewLevelQueue(
 | 
			
		||||
			issueIndexer,
 | 
			
		||||
			setting.Indexer.IssueIndexerQueueDir,
 | 
			
		||||
			setting.Indexer.IssueIndexerQueueBatchNumber)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
	case setting.ChannelQueueType:
 | 
			
		||||
		issueIndexerUpdateQueue = NewChannelQueue(issueIndexer, setting.Indexer.IssueIndexerQueueBatchNumber)
 | 
			
		||||
	default:
 | 
			
		||||
		return fmt.Errorf("Unsupported indexer queue type: %v", setting.Indexer.IssueIndexerQueueType)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	go issueIndexerUpdateQueue.Run()
 | 
			
		||||
 | 
			
		||||
	if populate {
 | 
			
		||||
		if syncReindex {
 | 
			
		||||
			populateIssueIndexer()
 | 
			
		||||
		} else {
 | 
			
		||||
			go populateIssueIndexer()
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// populateIssueIndexer populate the issue indexer with issue data
 | 
			
		||||
func populateIssueIndexer() {
 | 
			
		||||
	for page := 1; ; page++ {
 | 
			
		||||
		repos, _, err := models.SearchRepositoryByName(&models.SearchRepoOptions{
 | 
			
		||||
			Page:        page,
 | 
			
		||||
			PageSize:    models.RepositoryListDefaultPageSize,
 | 
			
		||||
			OrderBy:     models.SearchOrderByID,
 | 
			
		||||
			Private:     true,
 | 
			
		||||
			Collaborate: util.OptionalBoolFalse,
 | 
			
		||||
		})
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			log.Error(4, "SearchRepositoryByName: %v", err)
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		if len(repos) == 0 {
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		for _, repo := range repos {
 | 
			
		||||
			is, err := models.Issues(&models.IssuesOptions{
 | 
			
		||||
				RepoIDs:  []int64{repo.ID},
 | 
			
		||||
				IsClosed: util.OptionalBoolNone,
 | 
			
		||||
				IsPull:   util.OptionalBoolNone,
 | 
			
		||||
			})
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				log.Error(4, "Issues: %v", err)
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
			if err = models.IssueList(is).LoadDiscussComments(); err != nil {
 | 
			
		||||
				log.Error(4, "LoadComments: %v", err)
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
			for _, issue := range is {
 | 
			
		||||
				UpdateIssueIndexer(issue)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// UpdateIssueIndexer add/update an issue to the issue indexer
 | 
			
		||||
func UpdateIssueIndexer(issue *models.Issue) {
 | 
			
		||||
	var comments []string
 | 
			
		||||
	for _, comment := range issue.Comments {
 | 
			
		||||
		if comment.Type == models.CommentTypeComment {
 | 
			
		||||
			comments = append(comments, comment.Content)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	issueIndexerUpdateQueue.Push(&IndexerData{
 | 
			
		||||
		ID:       issue.ID,
 | 
			
		||||
		RepoID:   issue.RepoID,
 | 
			
		||||
		Title:    issue.Title,
 | 
			
		||||
		Content:  issue.Content,
 | 
			
		||||
		Comments: comments,
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// DeleteRepoIssueIndexer deletes repo's all issues indexes
 | 
			
		||||
func DeleteRepoIssueIndexer(repo *models.Repository) {
 | 
			
		||||
	var ids []int64
 | 
			
		||||
	ids, err := models.GetIssueIDsByRepoID(repo.ID)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		log.Error(4, "getIssueIDsByRepoID failed: %v", err)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if len(ids) <= 0 {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	issueIndexerUpdateQueue.Push(&IndexerData{
 | 
			
		||||
		IDs:      ids,
 | 
			
		||||
		IsDelete: true,
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SearchIssuesByKeyword search issue ids by keywords and repo id
 | 
			
		||||
func SearchIssuesByKeyword(repoID int64, keyword string) ([]int64, error) {
 | 
			
		||||
	var issueIDs []int64
 | 
			
		||||
	res, err := issueIndexer.Search(keyword, repoID, 1000, 0)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	for _, r := range res.Hits {
 | 
			
		||||
		issueIDs = append(issueIDs, r.ID)
 | 
			
		||||
	}
 | 
			
		||||
	return issueIDs, nil
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										51
									
								
								modules/indexer/issues/indexer_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										51
									
								
								modules/indexer/issues/indexer_test.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,51 @@
 | 
			
		||||
// Copyright 2019 The Gitea Authors. All rights reserved.
 | 
			
		||||
// Use of this source code is governed by a MIT-style
 | 
			
		||||
// license that can be found in the LICENSE file.
 | 
			
		||||
 | 
			
		||||
package issues
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"os"
 | 
			
		||||
	"path/filepath"
 | 
			
		||||
	"testing"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"code.gitea.io/gitea/models"
 | 
			
		||||
	"code.gitea.io/gitea/modules/setting"
 | 
			
		||||
 | 
			
		||||
	"github.com/stretchr/testify/assert"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func fatalTestError(fmtStr string, args ...interface{}) {
 | 
			
		||||
	fmt.Fprintf(os.Stderr, fmtStr, args...)
 | 
			
		||||
	os.Exit(1)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestMain(m *testing.M) {
 | 
			
		||||
	models.MainTest(m, filepath.Join("..", "..", ".."))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestSearchIssues(t *testing.T) {
 | 
			
		||||
	assert.NoError(t, models.PrepareTestDatabase())
 | 
			
		||||
 | 
			
		||||
	os.RemoveAll(setting.Indexer.IssueIndexerQueueDir)
 | 
			
		||||
	os.RemoveAll(setting.Indexer.IssuePath)
 | 
			
		||||
	if err := InitIssueIndexer(true); err != nil {
 | 
			
		||||
		fatalTestError("Error InitIssueIndexer: %v\n", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	time.Sleep(10 * time.Second)
 | 
			
		||||
 | 
			
		||||
	ids, err := SearchIssuesByKeyword(1, "issue2")
 | 
			
		||||
	assert.NoError(t, err)
 | 
			
		||||
	assert.EqualValues(t, []int64{2}, ids)
 | 
			
		||||
 | 
			
		||||
	ids, err = SearchIssuesByKeyword(1, "first")
 | 
			
		||||
	assert.NoError(t, err)
 | 
			
		||||
	assert.EqualValues(t, []int64{1}, ids)
 | 
			
		||||
 | 
			
		||||
	ids, err = SearchIssuesByKeyword(1, "for")
 | 
			
		||||
	assert.NoError(t, err)
 | 
			
		||||
	assert.EqualValues(t, []int64{1, 2, 3, 5}, ids)
 | 
			
		||||
}
 | 
			
		||||
@@ -42,18 +42,21 @@ func (l *LevelQueue) Run() error {
 | 
			
		||||
	var i int
 | 
			
		||||
	var datas = make([]*IndexerData, 0, l.batchNumber)
 | 
			
		||||
	for {
 | 
			
		||||
		bs, err := l.queue.RPop()
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			log.Error(4, "RPop: %v", err)
 | 
			
		||||
			time.Sleep(time.Millisecond * 100)
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		i++
 | 
			
		||||
		if len(datas) > l.batchNumber || (len(datas) > 0 && i > 3) {
 | 
			
		||||
			l.indexer.Index(datas)
 | 
			
		||||
			datas = make([]*IndexerData, 0, l.batchNumber)
 | 
			
		||||
			i = 0
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		bs, err := l.queue.RPop()
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			if err != levelqueue.ErrNotFound {
 | 
			
		||||
				log.Error(4, "RPop: %v", err)
 | 
			
		||||
			}
 | 
			
		||||
			time.Sleep(time.Millisecond * 100)
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if len(bs) <= 0 {
 | 
			
		||||
@@ -69,7 +72,7 @@ func (l *LevelQueue) Run() error {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		log.Trace("LedisLocalQueue: task found: %#v", data)
 | 
			
		||||
		log.Trace("LevelQueue: task found: %#v", data)
 | 
			
		||||
 | 
			
		||||
		if data.IsDelete {
 | 
			
		||||
			if data.ID > 0 {
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user