mirror of
				https://github.com/TeaOSLab/EdgeNode.git
				synced 2025-11-04 16:00:25 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			194 lines
		
	
	
		
			3.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			194 lines
		
	
	
		
			3.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
// Copyright 2022 GoEdge goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cloud .
 | 
						|
 | 
						|
package dbs
 | 
						|
 | 
						|
import (
 | 
						|
	"database/sql"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
 | 
						|
)
 | 
						|
 | 
						|
type batchItem struct {
 | 
						|
	query string
 | 
						|
	args  []any
 | 
						|
}
 | 
						|
 | 
						|
type Batch struct {
 | 
						|
	db *DB
 | 
						|
	n  int
 | 
						|
 | 
						|
	enableStat bool
 | 
						|
 | 
						|
	onFail func(err error)
 | 
						|
 | 
						|
	queue      chan *batchItem
 | 
						|
	closeEvent chan bool
 | 
						|
 | 
						|
	isClosed bool
 | 
						|
}
 | 
						|
 | 
						|
func NewBatch(db *DB, n int) *Batch {
 | 
						|
	var batch = &Batch{
 | 
						|
		db:         db,
 | 
						|
		n:          n,
 | 
						|
		queue:      make(chan *batchItem, 16),
 | 
						|
		closeEvent: make(chan bool, 1),
 | 
						|
	}
 | 
						|
	db.batches = append(db.batches, batch)
 | 
						|
	return batch
 | 
						|
}
 | 
						|
 | 
						|
func (this *Batch) EnableStat(b bool) {
 | 
						|
	this.enableStat = b
 | 
						|
}
 | 
						|
 | 
						|
func (this *Batch) OnFail(callback func(err error)) {
 | 
						|
	this.onFail = callback
 | 
						|
}
 | 
						|
 | 
						|
func (this *Batch) Add(query string, args ...any) {
 | 
						|
	if this.isClosed {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	this.queue <- &batchItem{
 | 
						|
		query: query,
 | 
						|
		args:  args,
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (this *Batch) Exec() {
 | 
						|
	var n = this.n
 | 
						|
	if n <= 0 {
 | 
						|
		n = 4
 | 
						|
	}
 | 
						|
 | 
						|
	var ticker = time.NewTicker(100 * time.Millisecond)
 | 
						|
	var count = 0
 | 
						|
	var lastTx *sql.Tx
 | 
						|
For:
 | 
						|
	for {
 | 
						|
		// closed
 | 
						|
		if this.isClosed {
 | 
						|
			if lastTx != nil {
 | 
						|
				_ = this.commitTx(lastTx)
 | 
						|
				lastTx = nil
 | 
						|
			}
 | 
						|
 | 
						|
			return
 | 
						|
		}
 | 
						|
 | 
						|
		select {
 | 
						|
		case item := <-this.queue:
 | 
						|
			if lastTx == nil {
 | 
						|
				lastTx = this.beginTx()
 | 
						|
				if lastTx == nil {
 | 
						|
					continue For
 | 
						|
				}
 | 
						|
			}
 | 
						|
 | 
						|
			err := this.execItem(lastTx, item)
 | 
						|
			if err != nil {
 | 
						|
				if IsClosedErr(err) {
 | 
						|
					return
 | 
						|
				}
 | 
						|
				this.processErr(item.query, err)
 | 
						|
			}
 | 
						|
 | 
						|
			count++
 | 
						|
 | 
						|
			if count == n {
 | 
						|
				count = 0
 | 
						|
				err = this.commitTx(lastTx)
 | 
						|
				lastTx = nil
 | 
						|
				if err != nil {
 | 
						|
					if IsClosedErr(err) {
 | 
						|
						return
 | 
						|
					}
 | 
						|
					this.processErr("commit", err)
 | 
						|
				}
 | 
						|
			}
 | 
						|
		case <-ticker.C:
 | 
						|
			if lastTx == nil || count == 0 {
 | 
						|
				continue For
 | 
						|
			}
 | 
						|
			count = 0
 | 
						|
			err := this.commitTx(lastTx)
 | 
						|
			lastTx = nil
 | 
						|
			if err != nil {
 | 
						|
				if IsClosedErr(err) {
 | 
						|
					return
 | 
						|
				}
 | 
						|
				this.processErr("commit", err)
 | 
						|
			}
 | 
						|
		case <-this.closeEvent:
 | 
						|
			// closed
 | 
						|
 | 
						|
			if lastTx != nil {
 | 
						|
				_ = this.commitTx(lastTx)
 | 
						|
				lastTx = nil
 | 
						|
			}
 | 
						|
 | 
						|
			return
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (this *Batch) close() {
 | 
						|
	this.isClosed = true
 | 
						|
 | 
						|
	select {
 | 
						|
	case this.closeEvent <- true:
 | 
						|
	default:
 | 
						|
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (this *Batch) beginTx() *sql.Tx {
 | 
						|
	if !this.db.BeginUpdating() {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	tx, err := this.db.Begin()
 | 
						|
	if err != nil {
 | 
						|
		this.processErr("begin transaction", err)
 | 
						|
		this.db.EndUpdating()
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	return tx
 | 
						|
}
 | 
						|
 | 
						|
func (this *Batch) commitTx(tx *sql.Tx) error {
 | 
						|
	// always commit without checking database closing status
 | 
						|
	this.db.EndUpdating()
 | 
						|
	return tx.Commit()
 | 
						|
}
 | 
						|
 | 
						|
func (this *Batch) execItem(tx *sql.Tx, item *batchItem) error {
 | 
						|
	// check database status
 | 
						|
	if this.db.BeginUpdating() {
 | 
						|
		defer this.db.EndUpdating()
 | 
						|
	} else {
 | 
						|
		return errDBIsClosed
 | 
						|
	}
 | 
						|
 | 
						|
	if this.enableStat {
 | 
						|
		defer SharedQueryStatManager.AddQuery(item.query).End()
 | 
						|
	}
 | 
						|
 | 
						|
	_, err := tx.Exec(item.query, item.args...)
 | 
						|
	return err
 | 
						|
}
 | 
						|
 | 
						|
func (this *Batch) processErr(prefix string, err error) {
 | 
						|
	if err == nil {
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	if this.onFail != nil {
 | 
						|
		this.onFail(err)
 | 
						|
	} else {
 | 
						|
		remotelogs.Error("SQLITE_BATCH", prefix+": "+err.Error())
 | 
						|
	}
 | 
						|
}
 |