WAF规则匹配后的IP也会上报/实现IP全局名单/将名单存储到本地数据库,提升读写速度

This commit is contained in:
GoEdgeLab
2021-11-17 16:16:09 +08:00
parent 79c2cb7b73
commit 56ecda5653
19 changed files with 522 additions and 80 deletions

View File

@@ -8,9 +8,6 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/TeaOSLab/EdgeNode/internal/waf"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/types"
"io/ioutil"
"os"
"sync"
"time"
)
@@ -24,13 +21,9 @@ func init() {
})
}
var versionCacheFile = "ip_list_version.cache"
// IPListManager IP名单管理
type IPListManager struct {
// 缓存文件
// 每行一个数据id|from|to|expiredAt
cacheFile string
db *IPListDB
version int64
pageSize int64
@@ -41,17 +34,13 @@ type IPListManager struct {
func NewIPListManager() *IPListManager {
return &IPListManager{
cacheFile: Tea.Root + "/configs/ip_list.cache",
pageSize: 500,
listMap: map[int64]*IPList{},
pageSize: 500,
listMap: map[int64]*IPList{},
}
}
func (this *IPListManager) Start() {
// TODO 从缓存当中读取数据
// 从缓存中读取位置
this.version = this.readLocalVersion()
this.init()
// 第一次读取
err := this.loop()
@@ -60,6 +49,9 @@ func (this *IPListManager) Start() {
}
ticker := time.NewTicker(60 * time.Second)
if Tea.IsTesting() {
ticker = time.NewTicker(10 * time.Second)
}
events.On(events.EventQuit, func() {
ticker.Stop()
})
@@ -88,6 +80,31 @@ func (this *IPListManager) Start() {
}
}
func (this *IPListManager) init() {
// 从数据库中当中读取数据
db, err := NewIPListDB()
if err != nil {
remotelogs.Error("IP_LIST_MANAGER", "create ip list local database failed: "+err.Error())
} else {
this.db = db
var offset int64 = 0
var size int64 = 1000
for {
items, err := db.ReadItems(offset, size)
if err != nil {
remotelogs.Error("IP_LIST_MANAGER", "read ip list from local database failed: "+err.Error())
} else {
if len(items) == 0 {
break
}
this.processItems(items, false)
}
offset += int64(len(items))
}
}
}
func (this *IPListManager) loop() error {
for {
hasNext, err := this.fetch()
@@ -119,11 +136,53 @@ func (this *IPListManager) fetch() (hasNext bool, err error) {
if len(items) == 0 {
return false, nil
}
// 保存到本地数据库
if this.db != nil {
for _, item := range items {
err = this.db.AddItem(item)
if err != nil {
remotelogs.Error("IP_LIST_MANAGER", "insert item to local database failed: "+err.Error())
}
}
}
this.processItems(items, true)
return true, nil
}
func (this *IPListManager) FindList(listId int64) *IPList {
this.locker.Lock()
list, _ := this.listMap[listId]
this.locker.Unlock()
return list
}
func (this *IPListManager) processItems(items []*pb.IPItem, shouldExecute bool) {
this.locker.Lock()
var changedLists = map[*IPList]bool{}
for _, item := range items {
list, ok := this.listMap[item.ListId]
if !ok {
var list *IPList
// TODO 实现节点专有List
if item.ServerId > 0 { // 服务专有List
switch item.ListType {
case "black":
list = SharedServerListManager.FindBlackList(item.ServerId, true)
case "white":
list = SharedServerListManager.FindWhiteList(item.ServerId, true)
}
} else if item.IsGlobal { // 全局List
switch item.ListType {
case "black":
list = GlobalBlackIPList
case "white":
list = GlobalWhiteIPList
}
} else { // 其他List
list = this.listMap[item.ListId]
}
if list == nil {
list = NewIPList()
this.listMap[item.ListId] = list
}
@@ -133,18 +192,13 @@ func (this *IPListManager) fetch() (hasNext bool, err error) {
if item.IsDeleted {
list.Delete(item.Id)
// 从临时名单中删除
if len(item.IpFrom) > 0 && len(item.IpTo) == 0 {
switch item.ListType {
case "black":
waf.SharedIPBlackList.RemoveIP(item.IpFrom)
case "white":
waf.SharedIPWhiteList.RemoveIP(item.IpFrom)
}
}
// 从WAF名单中删除
waf.SharedIPBlackList.RemoveIP(item.IpFrom, item.ServerId)
// 操作事件
SharedActionManager.DeleteItem(item.ListType, item)
if shouldExecute {
SharedActionManager.DeleteItem(item.ListType, item)
}
continue
}
@@ -159,8 +213,10 @@ func (this *IPListManager) fetch() (hasNext bool, err error) {
})
// 事件操作
SharedActionManager.DeleteItem(item.ListType, item)
SharedActionManager.AddItem(item.ListType, item)
if shouldExecute {
SharedActionManager.DeleteItem(item.ListType, item)
SharedActionManager.AddItem(item.ListType, item)
}
}
for changedList := range changedLists {
@@ -169,38 +225,4 @@ func (this *IPListManager) fetch() (hasNext bool, err error) {
this.locker.Unlock()
this.version = items[len(items)-1].Version
// 写入版本号到缓存当中
this.updateLocalVersion(this.version)
return true, nil
}
func (this *IPListManager) FindList(listId int64) *IPList {
this.locker.Lock()
list, _ := this.listMap[listId]
this.locker.Unlock()
return list
}
func (this *IPListManager) readLocalVersion() int64 {
data, err := ioutil.ReadFile(Tea.ConfigFile(versionCacheFile))
if err != nil || len(data) == 0 {
return 0
}
return types.Int64(string(data))
}
func (this *IPListManager) updateLocalVersion(version int64) {
fp, err := os.OpenFile(Tea.ConfigFile(versionCacheFile), os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0666)
if err != nil {
remotelogs.Warn("IP_LIST", "write local version cache failed: "+err.Error())
return
}
_, err = fp.WriteString(types.String(version))
if err != nil {
remotelogs.Warn("IP_LIST", "write local version cache failed: "+err.Error())
return
}
_ = fp.Close()
}