mirror of
https://github.com/TeaOSLab/EdgeNode.git
synced 2025-11-06 18:10:26 +08:00
优化缓存数据库相关代码
This commit is contained in:
@@ -11,7 +11,6 @@ import (
|
|||||||
"github.com/TeaOSLab/EdgeNode/internal/utils/fnv"
|
"github.com/TeaOSLab/EdgeNode/internal/utils/fnv"
|
||||||
"github.com/iwind/TeaGo/types"
|
"github.com/iwind/TeaGo/types"
|
||||||
"os"
|
"os"
|
||||||
"sync/atomic"
|
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -21,7 +20,6 @@ const CountFileDB = 20
|
|||||||
type FileList struct {
|
type FileList struct {
|
||||||
dir string
|
dir string
|
||||||
dbList [CountFileDB]*FileListDB
|
dbList [CountFileDB]*FileListDB
|
||||||
total int64
|
|
||||||
|
|
||||||
onAdd func(item *Item)
|
onAdd func(item *Item)
|
||||||
onRemove func(item *Item)
|
onRemove func(item *Item)
|
||||||
@@ -76,12 +74,6 @@ func (this *FileList) Init() error {
|
|||||||
this.dbList[i] = db
|
this.dbList[i] = db
|
||||||
}
|
}
|
||||||
|
|
||||||
// 读取总数量
|
|
||||||
this.total = 0
|
|
||||||
for _, db := range this.dbList {
|
|
||||||
this.total += db.total
|
|
||||||
}
|
|
||||||
|
|
||||||
// 升级老版本数据库
|
// 升级老版本数据库
|
||||||
goman.New(func() {
|
goman.New(func() {
|
||||||
this.upgradeOldDB()
|
this.upgradeOldDB()
|
||||||
@@ -102,13 +94,11 @@ func (this *FileList) Add(hash string, item *Item) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
err := db.AddAsync(hash, item)
|
err := db.AddSync(hash, item)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
atomic.AddInt64(&this.total, 1)
|
|
||||||
|
|
||||||
// 这里不增加点击量,以减少对数据库的操作次数
|
// 这里不增加点击量,以减少对数据库的操作次数
|
||||||
|
|
||||||
this.memoryCache.Write(hash, 1, item.ExpiredAt)
|
this.memoryCache.Write(hash, 1, item.ExpiredAt)
|
||||||
@@ -296,8 +286,6 @@ func (this *FileList) CleanAll() error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
atomic.StoreInt64(&this.total, 0)
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -332,7 +320,15 @@ func (this *FileList) Stat(check func(hash string) bool) (*Stat, error) {
|
|||||||
// Count 总数量
|
// Count 总数量
|
||||||
// 常用的方法,所以避免直接查询数据库
|
// 常用的方法,所以避免直接查询数据库
|
||||||
func (this *FileList) Count() (int64, error) {
|
func (this *FileList) Count() (int64, error) {
|
||||||
return atomic.LoadInt64(&this.total), nil
|
var total int64
|
||||||
|
for _, db := range this.dbList {
|
||||||
|
count, err := db.Total()
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
total += count
|
||||||
|
}
|
||||||
|
return total, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// IncreaseHit 增加点击量
|
// IncreaseHit 增加点击量
|
||||||
@@ -392,37 +388,19 @@ func (this *FileList) remove(hash string) (notFound bool, err error) {
|
|||||||
// 从缓存中删除
|
// 从缓存中删除
|
||||||
this.memoryCache.Delete(hash)
|
this.memoryCache.Delete(hash)
|
||||||
|
|
||||||
var row = db.selectByHashStmt.QueryRow(hash)
|
err = db.DeleteSync(hash)
|
||||||
if row.Err() != nil {
|
|
||||||
if row.Err() == sql.ErrNoRows {
|
|
||||||
return true, nil
|
|
||||||
}
|
|
||||||
return false, row.Err()
|
|
||||||
}
|
|
||||||
|
|
||||||
var item = &Item{Type: ItemTypeFile}
|
|
||||||
err = row.Scan(&item.Key, &item.HeaderSize, &item.BodySize, &item.MetaSize, &item.ExpiredAt)
|
|
||||||
if err != nil {
|
|
||||||
if err == sql.ErrNoRows {
|
|
||||||
return true, nil
|
|
||||||
}
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = db.DeleteAsync(hash)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, db.WrapError(err)
|
return false, db.WrapError(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
atomic.AddInt64(&this.total, -1)
|
|
||||||
|
|
||||||
err = db.DeleteHitAsync(hash)
|
err = db.DeleteHitAsync(hash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, db.WrapError(err)
|
return false, db.WrapError(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if this.onRemove != nil {
|
if this.onRemove != nil {
|
||||||
this.onRemove(item)
|
// when remove file item, no any extra information needed
|
||||||
|
this.onRemove(nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
return false, nil
|
return false, nil
|
||||||
|
|||||||
@@ -35,8 +35,6 @@ type FileListDB struct {
|
|||||||
itemsTableName string
|
itemsTableName string
|
||||||
hitsTableName string
|
hitsTableName string
|
||||||
|
|
||||||
total int64
|
|
||||||
|
|
||||||
isClosed bool
|
isClosed bool
|
||||||
isReady bool
|
isReady bool
|
||||||
|
|
||||||
@@ -151,18 +149,6 @@ func (this *FileListDB) Init() error {
|
|||||||
return errors.New("init tables failed: " + err.Error())
|
return errors.New("init tables failed: " + err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
// 读取总数量
|
|
||||||
row := this.readDB.QueryRow(`SELECT COUNT(*) FROM "` + this.itemsTableName + `"`)
|
|
||||||
if row.Err() != nil {
|
|
||||||
return row.Err()
|
|
||||||
}
|
|
||||||
var total int64
|
|
||||||
err = row.Scan(&total)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
this.total = total
|
|
||||||
|
|
||||||
// 常用语句
|
// 常用语句
|
||||||
this.existsByHashStmt, err = this.readDB.Prepare(`SELECT "expiredAt" FROM "` + this.itemsTableName + `" INDEXED BY "hash" WHERE "hash"=? AND expiredAt>? LIMIT 1`)
|
this.existsByHashStmt, err = this.readDB.Prepare(`SELECT "expiredAt" FROM "` + this.itemsTableName + `" INDEXED BY "hash" WHERE "hash"=? AND expiredAt>? LIMIT 1`)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -250,20 +236,15 @@ func (this *FileListDB) IsReady() bool {
|
|||||||
return this.isReady
|
return this.isReady
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *FileListDB) Total() int64 {
|
func (this *FileListDB) Total() (int64, error) {
|
||||||
return this.total
|
// 读取总数量
|
||||||
}
|
var row = this.readDB.QueryRow(`SELECT COUNT(*) FROM "` + this.itemsTableName + `"`)
|
||||||
|
if row.Err() != nil {
|
||||||
func (this *FileListDB) AddAsync(hash string, item *Item) error {
|
return 0, row.Err()
|
||||||
this.hashMap.Add(hash)
|
|
||||||
|
|
||||||
if item.StaleAt == 0 {
|
|
||||||
item.StaleAt = item.ExpiredAt
|
|
||||||
}
|
}
|
||||||
|
var total int64
|
||||||
this.writeBatch.Add(this.insertSQL, hash, item.Key, item.HeaderSize, item.BodySize, item.MetaSize, item.ExpiredAt, item.StaleAt, item.Host, item.ServerId, fasttime.Now().Unix(), timeutil.Format("YW"))
|
err := row.Scan(&total)
|
||||||
return nil
|
return total, err
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *FileListDB) AddSync(hash string, item *Item) error {
|
func (this *FileListDB) AddSync(hash string, item *Item) error {
|
||||||
@@ -281,13 +262,6 @@ func (this *FileListDB) AddSync(hash string, item *Item) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *FileListDB) DeleteAsync(hash string) error {
|
|
||||||
this.hashMap.Delete(hash)
|
|
||||||
|
|
||||||
this.writeBatch.Add(this.deleteByHashSQL, hash)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (this *FileListDB) DeleteSync(hash string) error {
|
func (this *FileListDB) DeleteSync(hash string) error {
|
||||||
this.hashMap.Delete(hash)
|
this.hashMap.Delete(hash)
|
||||||
|
|
||||||
|
|||||||
@@ -31,7 +31,7 @@ func NewBatch(db *DB, n int) *Batch {
|
|||||||
var batch = &Batch{
|
var batch = &Batch{
|
||||||
db: db,
|
db: db,
|
||||||
n: n,
|
n: n,
|
||||||
queue: make(chan *batchItem),
|
queue: make(chan *batchItem, 16),
|
||||||
closeEvent: make(chan bool, 1),
|
closeEvent: make(chan bool, 1),
|
||||||
}
|
}
|
||||||
db.batches = append(db.batches, batch)
|
db.batches = append(db.batches, batch)
|
||||||
|
|||||||
Reference in New Issue
Block a user