mirror of
				https://gitee.com/gitea/gitea
				synced 2025-11-04 00:20:25 +08:00 
			
		
		
		
	Restore Graceful Restarting & Socket Activation (#7274)
* Prevent deadlock in indexer initialisation during graceful restart * Move from gracehttp to our own service to add graceful ssh * Add timeout for start of indexers and make hammer time configurable * Fix issue with re-initialization in indexer during tests * move the code to detect use of closed to graceful * Handle logs gracefully - add a pid suffix just before restart * Move to using a cond and a holder for indexers * use time.Since * Add some comments and attribution * update modules.txt * Use zero to disable timeout * Move RestartProcess to its own file * Add cleanup routine
This commit is contained in:
		@@ -5,9 +5,11 @@
 | 
			
		||||
package issues
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"code.gitea.io/gitea/models"
 | 
			
		||||
	"code.gitea.io/gitea/modules/graceful"
 | 
			
		||||
	"code.gitea.io/gitea/modules/log"
 | 
			
		||||
	"code.gitea.io/gitea/modules/setting"
 | 
			
		||||
	"code.gitea.io/gitea/modules/util"
 | 
			
		||||
@@ -45,78 +47,143 @@ type Indexer interface {
 | 
			
		||||
	Search(kw string, repoID int64, limit, start int) (*SearchResult, error)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type indexerHolder struct {
 | 
			
		||||
	indexer Indexer
 | 
			
		||||
	mutex   sync.RWMutex
 | 
			
		||||
	cond    *sync.Cond
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func newIndexerHolder() *indexerHolder {
 | 
			
		||||
	h := &indexerHolder{}
 | 
			
		||||
	h.cond = sync.NewCond(h.mutex.RLocker())
 | 
			
		||||
	return h
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (h *indexerHolder) set(indexer Indexer) {
 | 
			
		||||
	h.mutex.Lock()
 | 
			
		||||
	defer h.mutex.Unlock()
 | 
			
		||||
	h.indexer = indexer
 | 
			
		||||
	h.cond.Broadcast()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (h *indexerHolder) get() Indexer {
 | 
			
		||||
	h.mutex.RLock()
 | 
			
		||||
	defer h.mutex.RUnlock()
 | 
			
		||||
	if h.indexer == nil {
 | 
			
		||||
		h.cond.Wait()
 | 
			
		||||
	}
 | 
			
		||||
	return h.indexer
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var (
 | 
			
		||||
	issueIndexerChannel = make(chan *IndexerData, setting.Indexer.UpdateQueueLength)
 | 
			
		||||
	// issueIndexerQueue queue of issue ids to be updated
 | 
			
		||||
	issueIndexerQueue Queue
 | 
			
		||||
	issueIndexer      Indexer
 | 
			
		||||
	holder            = newIndexerHolder()
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// InitIssueIndexer initialize issue indexer, syncReindex is true then reindex until
 | 
			
		||||
// all issue index done.
 | 
			
		||||
func InitIssueIndexer(syncReindex bool) error {
 | 
			
		||||
	var populate bool
 | 
			
		||||
	var dummyQueue bool
 | 
			
		||||
	switch setting.Indexer.IssueType {
 | 
			
		||||
	case "bleve":
 | 
			
		||||
		issueIndexer = NewBleveIndexer(setting.Indexer.IssuePath)
 | 
			
		||||
		exist, err := issueIndexer.Init()
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
		populate = !exist
 | 
			
		||||
	case "db":
 | 
			
		||||
		issueIndexer = &DBIndexer{}
 | 
			
		||||
		dummyQueue = true
 | 
			
		||||
	default:
 | 
			
		||||
		return fmt.Errorf("unknow issue indexer type: %s", setting.Indexer.IssueType)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if dummyQueue {
 | 
			
		||||
		issueIndexerQueue = &DummyQueue{}
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	var err error
 | 
			
		||||
	switch setting.Indexer.IssueQueueType {
 | 
			
		||||
	case setting.LevelQueueType:
 | 
			
		||||
		issueIndexerQueue, err = NewLevelQueue(
 | 
			
		||||
			issueIndexer,
 | 
			
		||||
			setting.Indexer.IssueQueueDir,
 | 
			
		||||
			setting.Indexer.IssueQueueBatchNumber)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
	case setting.ChannelQueueType:
 | 
			
		||||
		issueIndexerQueue = NewChannelQueue(issueIndexer, setting.Indexer.IssueQueueBatchNumber)
 | 
			
		||||
	case setting.RedisQueueType:
 | 
			
		||||
		addrs, pass, idx, err := parseConnStr(setting.Indexer.IssueQueueConnStr)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
		issueIndexerQueue, err = NewRedisQueue(addrs, pass, idx, issueIndexer, setting.Indexer.IssueQueueBatchNumber)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
	default:
 | 
			
		||||
		return fmt.Errorf("Unsupported indexer queue type: %v", setting.Indexer.IssueQueueType)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
func InitIssueIndexer(syncReindex bool) {
 | 
			
		||||
	waitChannel := make(chan time.Duration)
 | 
			
		||||
	go func() {
 | 
			
		||||
		err = issueIndexerQueue.Run()
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			log.Error("issueIndexerQueue.Run: %v", err)
 | 
			
		||||
		start := time.Now()
 | 
			
		||||
		log.Info("Initializing Issue Indexer")
 | 
			
		||||
		var populate bool
 | 
			
		||||
		var dummyQueue bool
 | 
			
		||||
		switch setting.Indexer.IssueType {
 | 
			
		||||
		case "bleve":
 | 
			
		||||
			issueIndexer := NewBleveIndexer(setting.Indexer.IssuePath)
 | 
			
		||||
			exist, err := issueIndexer.Init()
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				log.Fatal("Unable to initialize Bleve Issue Indexer: %v", err)
 | 
			
		||||
			}
 | 
			
		||||
			populate = !exist
 | 
			
		||||
			holder.set(issueIndexer)
 | 
			
		||||
		case "db":
 | 
			
		||||
			issueIndexer := &DBIndexer{}
 | 
			
		||||
			holder.set(issueIndexer)
 | 
			
		||||
			dummyQueue = true
 | 
			
		||||
		default:
 | 
			
		||||
			log.Fatal("Unknown issue indexer type: %s", setting.Indexer.IssueType)
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	if populate {
 | 
			
		||||
		if syncReindex {
 | 
			
		||||
			populateIssueIndexer()
 | 
			
		||||
		if dummyQueue {
 | 
			
		||||
			issueIndexerQueue = &DummyQueue{}
 | 
			
		||||
		} else {
 | 
			
		||||
			go populateIssueIndexer()
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
			var err error
 | 
			
		||||
			switch setting.Indexer.IssueQueueType {
 | 
			
		||||
			case setting.LevelQueueType:
 | 
			
		||||
				issueIndexerQueue, err = NewLevelQueue(
 | 
			
		||||
					holder.get(),
 | 
			
		||||
					setting.Indexer.IssueQueueDir,
 | 
			
		||||
					setting.Indexer.IssueQueueBatchNumber)
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					log.Fatal(
 | 
			
		||||
						"Unable create level queue for issue queue dir: %s batch number: %d : %v",
 | 
			
		||||
						setting.Indexer.IssueQueueDir,
 | 
			
		||||
						setting.Indexer.IssueQueueBatchNumber,
 | 
			
		||||
						err)
 | 
			
		||||
				}
 | 
			
		||||
			case setting.ChannelQueueType:
 | 
			
		||||
				issueIndexerQueue = NewChannelQueue(holder.get(), setting.Indexer.IssueQueueBatchNumber)
 | 
			
		||||
			case setting.RedisQueueType:
 | 
			
		||||
				addrs, pass, idx, err := parseConnStr(setting.Indexer.IssueQueueConnStr)
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					log.Fatal("Unable to parse connection string for RedisQueueType: %s : %v",
 | 
			
		||||
						setting.Indexer.IssueQueueConnStr,
 | 
			
		||||
						err)
 | 
			
		||||
				}
 | 
			
		||||
				issueIndexerQueue, err = NewRedisQueue(addrs, pass, idx, holder.get(), setting.Indexer.IssueQueueBatchNumber)
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					log.Fatal("Unable to create RedisQueue: %s : %v",
 | 
			
		||||
						setting.Indexer.IssueQueueConnStr,
 | 
			
		||||
						err)
 | 
			
		||||
				}
 | 
			
		||||
			default:
 | 
			
		||||
				log.Fatal("Unsupported indexer queue type: %v",
 | 
			
		||||
					setting.Indexer.IssueQueueType)
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
			go func() {
 | 
			
		||||
				err = issueIndexerQueue.Run()
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					log.Error("issueIndexerQueue.Run: %v", err)
 | 
			
		||||
				}
 | 
			
		||||
			}()
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		go func() {
 | 
			
		||||
			for data := range issueIndexerChannel {
 | 
			
		||||
				_ = issueIndexerQueue.Push(data)
 | 
			
		||||
			}
 | 
			
		||||
		}()
 | 
			
		||||
 | 
			
		||||
		if populate {
 | 
			
		||||
			if syncReindex {
 | 
			
		||||
				populateIssueIndexer()
 | 
			
		||||
			} else {
 | 
			
		||||
				go populateIssueIndexer()
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		waitChannel <- time.Since(start)
 | 
			
		||||
	}()
 | 
			
		||||
	if syncReindex {
 | 
			
		||||
		<-waitChannel
 | 
			
		||||
	} else if setting.Indexer.StartupTimeout > 0 {
 | 
			
		||||
		go func() {
 | 
			
		||||
			timeout := setting.Indexer.StartupTimeout
 | 
			
		||||
			if graceful.IsChild && setting.GracefulHammerTime > 0 {
 | 
			
		||||
				timeout += setting.GracefulHammerTime
 | 
			
		||||
			}
 | 
			
		||||
			select {
 | 
			
		||||
			case duration := <-waitChannel:
 | 
			
		||||
				log.Info("Issue Indexer Initialization took %v", duration)
 | 
			
		||||
			case <-time.After(timeout):
 | 
			
		||||
				log.Fatal("Issue Indexer Initialization timed-out after: %v", timeout)
 | 
			
		||||
			}
 | 
			
		||||
		}()
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// populateIssueIndexer populate the issue indexer with issue data
 | 
			
		||||
@@ -166,13 +233,13 @@ func UpdateIssueIndexer(issue *models.Issue) {
 | 
			
		||||
			comments = append(comments, comment.Content)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	_ = issueIndexerQueue.Push(&IndexerData{
 | 
			
		||||
	issueIndexerChannel <- &IndexerData{
 | 
			
		||||
		ID:       issue.ID,
 | 
			
		||||
		RepoID:   issue.RepoID,
 | 
			
		||||
		Title:    issue.Title,
 | 
			
		||||
		Content:  issue.Content,
 | 
			
		||||
		Comments: comments,
 | 
			
		||||
	})
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// DeleteRepoIssueIndexer deletes repo's all issues indexes
 | 
			
		||||
@@ -188,16 +255,16 @@ func DeleteRepoIssueIndexer(repo *models.Repository) {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	_ = issueIndexerQueue.Push(&IndexerData{
 | 
			
		||||
	issueIndexerChannel <- &IndexerData{
 | 
			
		||||
		IDs:      ids,
 | 
			
		||||
		IsDelete: true,
 | 
			
		||||
	})
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SearchIssuesByKeyword search issue ids by keywords and repo id
 | 
			
		||||
func SearchIssuesByKeyword(repoID int64, keyword string) ([]int64, error) {
 | 
			
		||||
	var issueIDs []int64
 | 
			
		||||
	res, err := issueIndexer.Search(keyword, repoID, 1000, 0)
 | 
			
		||||
	res, err := holder.get().Search(keyword, repoID, 1000, 0)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 
 | 
			
		||||
@@ -5,7 +5,6 @@
 | 
			
		||||
package issues
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"os"
 | 
			
		||||
	"path/filepath"
 | 
			
		||||
	"testing"
 | 
			
		||||
@@ -17,11 +16,6 @@ import (
 | 
			
		||||
	"github.com/stretchr/testify/assert"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func fatalTestError(fmtStr string, args ...interface{}) {
 | 
			
		||||
	fmt.Fprintf(os.Stderr, fmtStr, args...)
 | 
			
		||||
	os.Exit(1)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestMain(m *testing.M) {
 | 
			
		||||
	models.MainTest(m, filepath.Join("..", "..", ".."))
 | 
			
		||||
}
 | 
			
		||||
@@ -32,9 +26,7 @@ func TestBleveSearchIssues(t *testing.T) {
 | 
			
		||||
	os.RemoveAll(setting.Indexer.IssueQueueDir)
 | 
			
		||||
	os.RemoveAll(setting.Indexer.IssuePath)
 | 
			
		||||
	setting.Indexer.IssueType = "bleve"
 | 
			
		||||
	if err := InitIssueIndexer(true); err != nil {
 | 
			
		||||
		fatalTestError("Error InitIssueIndexer: %v\n", err)
 | 
			
		||||
	}
 | 
			
		||||
	InitIssueIndexer(true)
 | 
			
		||||
 | 
			
		||||
	time.Sleep(5 * time.Second)
 | 
			
		||||
 | 
			
		||||
@@ -59,9 +51,7 @@ func TestDBSearchIssues(t *testing.T) {
 | 
			
		||||
	assert.NoError(t, models.PrepareTestDatabase())
 | 
			
		||||
 | 
			
		||||
	setting.Indexer.IssueType = "db"
 | 
			
		||||
	if err := InitIssueIndexer(true); err != nil {
 | 
			
		||||
		fatalTestError("Error InitIssueIndexer: %v\n", err)
 | 
			
		||||
	}
 | 
			
		||||
	InitIssueIndexer(true)
 | 
			
		||||
 | 
			
		||||
	ids, err := SearchIssuesByKeyword(1, "issue2")
 | 
			
		||||
	assert.NoError(t, err)
 | 
			
		||||
 
 | 
			
		||||
@@ -6,6 +6,7 @@ package indexer
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"strings"
 | 
			
		||||
	"sync"
 | 
			
		||||
 | 
			
		||||
	"code.gitea.io/gitea/modules/log"
 | 
			
		||||
	"code.gitea.io/gitea/modules/setting"
 | 
			
		||||
@@ -25,8 +26,36 @@ const (
 | 
			
		||||
	repoIndexerLatestVersion = 4
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type bleveIndexerHolder struct {
 | 
			
		||||
	index bleve.Index
 | 
			
		||||
	mutex sync.RWMutex
 | 
			
		||||
	cond  *sync.Cond
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func newBleveIndexerHolder() *bleveIndexerHolder {
 | 
			
		||||
	b := &bleveIndexerHolder{}
 | 
			
		||||
	b.cond = sync.NewCond(b.mutex.RLocker())
 | 
			
		||||
	return b
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (r *bleveIndexerHolder) set(index bleve.Index) {
 | 
			
		||||
	r.mutex.Lock()
 | 
			
		||||
	defer r.mutex.Unlock()
 | 
			
		||||
	r.index = index
 | 
			
		||||
	r.cond.Broadcast()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (r *bleveIndexerHolder) get() bleve.Index {
 | 
			
		||||
	r.mutex.RLock()
 | 
			
		||||
	defer r.mutex.RUnlock()
 | 
			
		||||
	if r.index == nil {
 | 
			
		||||
		r.cond.Wait()
 | 
			
		||||
	}
 | 
			
		||||
	return r.index
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// repoIndexer (thread-safe) index for repository contents
 | 
			
		||||
var repoIndexer bleve.Index
 | 
			
		||||
var indexerHolder = newBleveIndexerHolder()
 | 
			
		||||
 | 
			
		||||
// RepoIndexerOp type of operation to perform on repo indexer
 | 
			
		||||
type RepoIndexerOp int
 | 
			
		||||
@@ -73,12 +102,12 @@ func (update RepoIndexerUpdate) AddToFlushingBatch(batch rupture.FlushingBatch)
 | 
			
		||||
 | 
			
		||||
// InitRepoIndexer initialize repo indexer
 | 
			
		||||
func InitRepoIndexer(populateIndexer func() error) {
 | 
			
		||||
	var err error
 | 
			
		||||
	repoIndexer, err = openIndexer(setting.Indexer.RepoPath, repoIndexerLatestVersion)
 | 
			
		||||
	indexer, err := openIndexer(setting.Indexer.RepoPath, repoIndexerLatestVersion)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		log.Fatal("InitRepoIndexer: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	if repoIndexer != nil {
 | 
			
		||||
	if indexer != nil {
 | 
			
		||||
		indexerHolder.set(indexer)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@@ -92,7 +121,6 @@ func InitRepoIndexer(populateIndexer func() error) {
 | 
			
		||||
 | 
			
		||||
// createRepoIndexer create a repo indexer if one does not already exist
 | 
			
		||||
func createRepoIndexer(path string, latestVersion int) error {
 | 
			
		||||
	var err error
 | 
			
		||||
	docMapping := bleve.NewDocumentMapping()
 | 
			
		||||
	numericFieldMapping := bleve.NewNumericFieldMapping()
 | 
			
		||||
	numericFieldMapping.IncludeInAll = false
 | 
			
		||||
@@ -103,9 +131,9 @@ func createRepoIndexer(path string, latestVersion int) error {
 | 
			
		||||
	docMapping.AddFieldMappingsAt("Content", textFieldMapping)
 | 
			
		||||
 | 
			
		||||
	mapping := bleve.NewIndexMapping()
 | 
			
		||||
	if err = addUnicodeNormalizeTokenFilter(mapping); err != nil {
 | 
			
		||||
	if err := addUnicodeNormalizeTokenFilter(mapping); err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	} else if err = mapping.AddCustomAnalyzer(repoIndexerAnalyzer, map[string]interface{}{
 | 
			
		||||
	} else if err := mapping.AddCustomAnalyzer(repoIndexerAnalyzer, map[string]interface{}{
 | 
			
		||||
		"type":          custom.Name,
 | 
			
		||||
		"char_filters":  []string{},
 | 
			
		||||
		"tokenizer":     unicode.Name,
 | 
			
		||||
@@ -117,10 +145,12 @@ func createRepoIndexer(path string, latestVersion int) error {
 | 
			
		||||
	mapping.AddDocumentMapping(repoIndexerDocType, docMapping)
 | 
			
		||||
	mapping.AddDocumentMapping("_all", bleve.NewDocumentDisabledMapping())
 | 
			
		||||
 | 
			
		||||
	repoIndexer, err = bleve.New(path, mapping)
 | 
			
		||||
	indexer, err := bleve.New(path, mapping)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	indexerHolder.set(indexer)
 | 
			
		||||
 | 
			
		||||
	return rupture.WriteIndexMetadata(path, &rupture.IndexMetadata{
 | 
			
		||||
		Version: latestVersion,
 | 
			
		||||
	})
 | 
			
		||||
@@ -140,14 +170,14 @@ func filenameOfIndexerID(indexerID string) string {
 | 
			
		||||
 | 
			
		||||
// RepoIndexerBatch batch to add updates to
 | 
			
		||||
func RepoIndexerBatch() rupture.FlushingBatch {
 | 
			
		||||
	return rupture.NewFlushingBatch(repoIndexer, maxBatchSize)
 | 
			
		||||
	return rupture.NewFlushingBatch(indexerHolder.get(), maxBatchSize)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// DeleteRepoFromIndexer delete all of a repo's files from indexer
 | 
			
		||||
func DeleteRepoFromIndexer(repoID int64) error {
 | 
			
		||||
	query := numericEqualityQuery(repoID, "RepoID")
 | 
			
		||||
	searchRequest := bleve.NewSearchRequestOptions(query, 2147483647, 0, false)
 | 
			
		||||
	result, err := repoIndexer.Search(searchRequest)
 | 
			
		||||
	result, err := indexerHolder.get().Search(searchRequest)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
@@ -196,7 +226,7 @@ func SearchRepoByKeyword(repoIDs []int64, keyword string, page, pageSize int) (i
 | 
			
		||||
	searchRequest.Fields = []string{"Content", "RepoID"}
 | 
			
		||||
	searchRequest.IncludeLocations = true
 | 
			
		||||
 | 
			
		||||
	result, err := repoIndexer.Search(searchRequest)
 | 
			
		||||
	result, err := indexerHolder.get().Search(searchRequest)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return 0, nil, err
 | 
			
		||||
	}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user