优化数据库节点管理

This commit is contained in:
GoEdgeLab
2021-08-30 10:56:31 +08:00
parent 1978e779bd
commit 6bec6b8a2c
4 changed files with 78 additions and 12 deletions

View File

@@ -176,3 +176,15 @@ func (this *DBNodeDAO) DecodePassword(password string) string {
} }
return string(encrypt.MagicKeyDecode(data)) return string(encrypt.MagicKeyDecode(data))
} }
// CheckNodeIsOn 检查节点是否已经启用
func (this *DBNodeDAO) CheckNodeIsOn(tx *dbs.Tx, nodeId int64) (bool, error) {
isOn, err := this.Query(tx).
Pk(nodeId).
Result("isOn").
FindIntCol(0)
if err != nil {
return false, err
}
return isOn == 1, nil
}

View File

@@ -3,10 +3,10 @@ package models
import ( import (
"fmt" "fmt"
"github.com/TeaOSLab/EdgeAPI/internal/errors" "github.com/TeaOSLab/EdgeAPI/internal/errors"
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/lists" "github.com/iwind/TeaGo/lists"
"github.com/iwind/TeaGo/logs"
timeutil "github.com/iwind/TeaGo/utils/time" timeutil "github.com/iwind/TeaGo/utils/time"
"hash/crc32" "hash/crc32"
"regexp" "regexp"
@@ -266,7 +266,7 @@ func (this *DBNodeInitializer) Start() {
// 初始运行 // 初始运行
err := this.loop() err := this.loop()
if err != nil { if err != nil {
logs.Println("[DB_NODE]" + err.Error()) remotelogs.Error("DB_NODE", err.Error())
} }
// 定时运行 // 定时运行
@@ -274,7 +274,7 @@ func (this *DBNodeInitializer) Start() {
for range ticker.C { for range ticker.C {
err := this.loop() err := this.loop()
if err != nil { if err != nil {
logs.Println("[DB_NODE]" + err.Error()) remotelogs.Error("DB_NODE", err.Error())
} }
} }
} }
@@ -300,7 +300,7 @@ func (this *DBNodeInitializer) loop() error {
delete(accessLogDBMapping, nodeId) delete(accessLogDBMapping, nodeId)
delete(httpAccessLogDAOMapping, nodeId) delete(httpAccessLogDAOMapping, nodeId)
delete(nsAccessLogDAOMapping, nodeId) delete(nsAccessLogDAOMapping, nodeId)
logs.Println("[DB_NODE]close db node '" + strconv.FormatInt(nodeId, 10) + "'") remotelogs.Error("DB_NODE", "close db node '"+strconv.FormatInt(nodeId, 10)+"'")
} }
} }
accessLogLocker.Unlock() accessLogLocker.Unlock()
@@ -321,7 +321,7 @@ func (this *DBNodeInitializer) loop() error {
// 检查配置是否有变化 // 检查配置是否有变化
oldConfig, err := db.Config() oldConfig, err := db.Config()
if err != nil { if err != nil {
logs.Println("[DB_NODE]read database old config failed: " + err.Error()) remotelogs.Error("DB_NODE", "read database old config failed: "+err.Error())
continue continue
} }
@@ -340,7 +340,7 @@ func (this *DBNodeInitializer) loop() error {
} }
db, err := dbs.NewInstanceFromConfig(config) db, err := dbs.NewInstanceFromConfig(config)
if err != nil { if err != nil {
logs.Println("[DB_NODE]initialize database config failed: " + err.Error()) remotelogs.Error("DB_NODE", "initialize database config failed: "+err.Error())
continue continue
} }
@@ -350,12 +350,12 @@ func (this *DBNodeInitializer) loop() error {
tableDef, err := findHTTPAccessLogTable(db, timeutil.Format("Ymd"), true) tableDef, err := findHTTPAccessLogTable(db, timeutil.Format("Ymd"), true)
if err != nil { if err != nil {
if !strings.Contains(err.Error(), "1050") { // 非表格已存在错误 if !strings.Contains(err.Error(), "1050") { // 非表格已存在错误
logs.Println("[DB_NODE]create first table in database node failed: " + err.Error()) remotelogs.Error("DB_NODE", "create first table in database node failed: "+err.Error())
// 创建节点日志 // 创建节点日志
createLogErr := SharedNodeLogDAO.CreateLog(nil, nodeconfigs.NodeRoleDatabase, nodeId, 0, 0, "error", "ACCESS_LOG", "can not create access log table: "+err.Error(), time.Now().Unix()) createLogErr := SharedNodeLogDAO.CreateLog(nil, nodeconfigs.NodeRoleDatabase, nodeId, 0, 0, "error", "ACCESS_LOG", "can not create access log table: "+err.Error(), time.Now().Unix())
if createLogErr != nil { if createLogErr != nil {
logs.Println("[NODE_LOG]" + createLogErr.Error()) remotelogs.Error("NODE_LOG", createLogErr.Error())
} }
continue continue
@@ -373,7 +373,7 @@ func (this *DBNodeInitializer) loop() error {
} }
err = daoObject.Init() err = daoObject.Init()
if err != nil { if err != nil {
logs.Println("[DB_NODE]initialize dao failed: " + err.Error()) remotelogs.Error("DB_NODE", "initialize dao failed: "+err.Error())
continue continue
} }
@@ -394,12 +394,12 @@ func (this *DBNodeInitializer) loop() error {
tableName, err := findNSAccessLogTable(db, timeutil.Format("Ymd"), false) tableName, err := findNSAccessLogTable(db, timeutil.Format("Ymd"), false)
if err != nil { if err != nil {
if !strings.Contains(err.Error(), "1050") { // 非表格已存在错误 if !strings.Contains(err.Error(), "1050") { // 非表格已存在错误
logs.Println("[DB_NODE]create first table in database node failed: " + err.Error()) remotelogs.Error("DB_NODE", "create first table in database node failed: "+err.Error())
// 创建节点日志 // 创建节点日志
createLogErr := SharedNodeLogDAO.CreateLog(nil, nodeconfigs.NodeRoleDatabase, nodeId, 0, 0, "error", "ACCESS_LOG", "can not create access log table: "+err.Error(), time.Now().Unix()) createLogErr := SharedNodeLogDAO.CreateLog(nil, nodeconfigs.NodeRoleDatabase, nodeId, 0, 0, "error", "ACCESS_LOG", "can not create access log table: "+err.Error(), time.Now().Unix())
if createLogErr != nil { if createLogErr != nil {
logs.Println("[NODE_LOG]" + createLogErr.Error()) remotelogs.Error("NODE_LOG", createLogErr.Error())
} }
continue continue
@@ -417,7 +417,7 @@ func (this *DBNodeInitializer) loop() error {
} }
err = daoObject.Init() err = daoObject.Init()
if err != nil { if err != nil {
logs.Println("[DB_NODE]initialize dao failed: " + err.Error()) remotelogs.Error("DB_NODE", "initialize dao failed: "+err.Error())
continue continue
} }

View File

@@ -15,6 +15,8 @@ type HTTPAccessLog struct {
FirewallRuleId uint32 `field:"firewallRuleId"` // WAF规则ID FirewallRuleId uint32 `field:"firewallRuleId"` // WAF规则ID
RemoteAddr string `field:"remoteAddr"` // IP地址 RemoteAddr string `field:"remoteAddr"` // IP地址
Domain string `field:"domain"` // 域名 Domain string `field:"domain"` // 域名
RequestBody string `field:"requestBody"` // 请求内容
ResponseBody string `field:"responseBody"` // 响应内容
} }
type HTTPAccessLogOperator struct { type HTTPAccessLogOperator struct {
@@ -31,6 +33,8 @@ type HTTPAccessLogOperator struct {
FirewallRuleId interface{} // WAF规则ID FirewallRuleId interface{} // WAF规则ID
RemoteAddr interface{} // IP地址 RemoteAddr interface{} // IP地址
Domain interface{} // 域名 Domain interface{} // 域名
RequestBody interface{} // 请求内容
ResponseBody interface{} // 响应内容
} }
func NewHTTPAccessLogOperator() *HTTPAccessLogOperator { func NewHTTPAccessLogOperator() *HTTPAccessLogOperator {

View File

@@ -109,6 +109,10 @@ func (this *DBNodeService) ListEnabledDBNodes(ctx context.Context, req *pb.ListE
if err != nil { if err != nil {
status.Error = err.Error() status.Error = err.Error()
} else { } else {
// 版本
version, _ := db.FindCol(0, "SELECT VERSION()")
status.Version = types.String(version)
one, err := db.FindOne("SELECT SUM(DATA_LENGTH+INDEX_LENGTH) AS size FROM information_schema.`TABLES` WHERE TABLE_SCHEMA=?", db.Name()) one, err := db.FindOne("SELECT SUM(DATA_LENGTH+INDEX_LENGTH) AS size FROM information_schema.`TABLES` WHERE TABLE_SCHEMA=?", db.Name())
if err != nil { if err != nil {
status.Error = err.Error() status.Error = err.Error()
@@ -294,3 +298,49 @@ func (this *DBNodeService) TruncateDBNodeTable(ctx context.Context, req *pb.Trun
} }
return this.Success() return this.Success()
} }
// CheckDBNodeStatus 检查数据库节点状态
func (this *DBNodeService) CheckDBNodeStatus(ctx context.Context, req *pb.CheckDBNodeStatusRequest) (*pb.CheckDBNodeStatusResponse, error) {
_, err := this.ValidateAdmin(ctx, 0)
if err != nil {
return nil, err
}
var tx = this.NullTx()
node, err := models.SharedDBNodeDAO.FindEnabledDBNode(tx, req.DbNodeId)
if err != nil {
return nil, err
}
if node == nil {
return &pb.CheckDBNodeStatusResponse{DbNodeStatus: nil}, nil
}
status := &pb.DBNodeStatus{}
// 是否能够连接
if node.IsOn == 1 {
db, err := dbs.NewInstanceFromConfig(node.DBConfig())
if err != nil {
status.Error = err.Error()
} else {
// 版本
version, _ := db.FindCol(0, "SELECT VERSION()")
status.Version = types.String(version)
one, err := db.FindOne("SELECT SUM(DATA_LENGTH+INDEX_LENGTH) AS size FROM information_schema.`TABLES` WHERE TABLE_SCHEMA=?", db.Name())
if err != nil {
status.Error = err.Error()
_ = db.Close()
} else if one == nil {
status.Error = "unable to read size from database server"
_ = db.Close()
} else {
status.IsOk = true
status.Size = one.GetInt64("size")
_ = db.Close()
}
}
}
return &pb.CheckDBNodeStatusResponse{DbNodeStatus: status}, nil
}