mirror of
https://github.com/TeaOSLab/EdgeAPI.git
synced 2025-11-07 18:50:26 +08:00
优化数据库节点管理
This commit is contained in:
@@ -176,3 +176,15 @@ func (this *DBNodeDAO) DecodePassword(password string) string {
|
|||||||
}
|
}
|
||||||
return string(encrypt.MagicKeyDecode(data))
|
return string(encrypt.MagicKeyDecode(data))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CheckNodeIsOn 检查节点是否已经启用
|
||||||
|
func (this *DBNodeDAO) CheckNodeIsOn(tx *dbs.Tx, nodeId int64) (bool, error) {
|
||||||
|
isOn, err := this.Query(tx).
|
||||||
|
Pk(nodeId).
|
||||||
|
Result("isOn").
|
||||||
|
FindIntCol(0)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
return isOn == 1, nil
|
||||||
|
}
|
||||||
|
|||||||
@@ -3,10 +3,10 @@ package models
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/TeaOSLab/EdgeAPI/internal/errors"
|
"github.com/TeaOSLab/EdgeAPI/internal/errors"
|
||||||
|
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
|
||||||
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
|
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
|
||||||
"github.com/iwind/TeaGo/dbs"
|
"github.com/iwind/TeaGo/dbs"
|
||||||
"github.com/iwind/TeaGo/lists"
|
"github.com/iwind/TeaGo/lists"
|
||||||
"github.com/iwind/TeaGo/logs"
|
|
||||||
timeutil "github.com/iwind/TeaGo/utils/time"
|
timeutil "github.com/iwind/TeaGo/utils/time"
|
||||||
"hash/crc32"
|
"hash/crc32"
|
||||||
"regexp"
|
"regexp"
|
||||||
@@ -266,7 +266,7 @@ func (this *DBNodeInitializer) Start() {
|
|||||||
// 初始运行
|
// 初始运行
|
||||||
err := this.loop()
|
err := this.loop()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logs.Println("[DB_NODE]" + err.Error())
|
remotelogs.Error("DB_NODE", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
// 定时运行
|
// 定时运行
|
||||||
@@ -274,7 +274,7 @@ func (this *DBNodeInitializer) Start() {
|
|||||||
for range ticker.C {
|
for range ticker.C {
|
||||||
err := this.loop()
|
err := this.loop()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logs.Println("[DB_NODE]" + err.Error())
|
remotelogs.Error("DB_NODE", err.Error())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -300,7 +300,7 @@ func (this *DBNodeInitializer) loop() error {
|
|||||||
delete(accessLogDBMapping, nodeId)
|
delete(accessLogDBMapping, nodeId)
|
||||||
delete(httpAccessLogDAOMapping, nodeId)
|
delete(httpAccessLogDAOMapping, nodeId)
|
||||||
delete(nsAccessLogDAOMapping, nodeId)
|
delete(nsAccessLogDAOMapping, nodeId)
|
||||||
logs.Println("[DB_NODE]close db node '" + strconv.FormatInt(nodeId, 10) + "'")
|
remotelogs.Error("DB_NODE", "close db node '"+strconv.FormatInt(nodeId, 10)+"'")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
accessLogLocker.Unlock()
|
accessLogLocker.Unlock()
|
||||||
@@ -321,7 +321,7 @@ func (this *DBNodeInitializer) loop() error {
|
|||||||
// 检查配置是否有变化
|
// 检查配置是否有变化
|
||||||
oldConfig, err := db.Config()
|
oldConfig, err := db.Config()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logs.Println("[DB_NODE]read database old config failed: " + err.Error())
|
remotelogs.Error("DB_NODE", "read database old config failed: "+err.Error())
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -340,7 +340,7 @@ func (this *DBNodeInitializer) loop() error {
|
|||||||
}
|
}
|
||||||
db, err := dbs.NewInstanceFromConfig(config)
|
db, err := dbs.NewInstanceFromConfig(config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logs.Println("[DB_NODE]initialize database config failed: " + err.Error())
|
remotelogs.Error("DB_NODE", "initialize database config failed: "+err.Error())
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -350,12 +350,12 @@ func (this *DBNodeInitializer) loop() error {
|
|||||||
tableDef, err := findHTTPAccessLogTable(db, timeutil.Format("Ymd"), true)
|
tableDef, err := findHTTPAccessLogTable(db, timeutil.Format("Ymd"), true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !strings.Contains(err.Error(), "1050") { // 非表格已存在错误
|
if !strings.Contains(err.Error(), "1050") { // 非表格已存在错误
|
||||||
logs.Println("[DB_NODE]create first table in database node failed: " + err.Error())
|
remotelogs.Error("DB_NODE", "create first table in database node failed: "+err.Error())
|
||||||
|
|
||||||
// 创建节点日志
|
// 创建节点日志
|
||||||
createLogErr := SharedNodeLogDAO.CreateLog(nil, nodeconfigs.NodeRoleDatabase, nodeId, 0, 0, "error", "ACCESS_LOG", "can not create access log table: "+err.Error(), time.Now().Unix())
|
createLogErr := SharedNodeLogDAO.CreateLog(nil, nodeconfigs.NodeRoleDatabase, nodeId, 0, 0, "error", "ACCESS_LOG", "can not create access log table: "+err.Error(), time.Now().Unix())
|
||||||
if createLogErr != nil {
|
if createLogErr != nil {
|
||||||
logs.Println("[NODE_LOG]" + createLogErr.Error())
|
remotelogs.Error("NODE_LOG", createLogErr.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
continue
|
continue
|
||||||
@@ -373,7 +373,7 @@ func (this *DBNodeInitializer) loop() error {
|
|||||||
}
|
}
|
||||||
err = daoObject.Init()
|
err = daoObject.Init()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logs.Println("[DB_NODE]initialize dao failed: " + err.Error())
|
remotelogs.Error("DB_NODE", "initialize dao failed: "+err.Error())
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -394,12 +394,12 @@ func (this *DBNodeInitializer) loop() error {
|
|||||||
tableName, err := findNSAccessLogTable(db, timeutil.Format("Ymd"), false)
|
tableName, err := findNSAccessLogTable(db, timeutil.Format("Ymd"), false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !strings.Contains(err.Error(), "1050") { // 非表格已存在错误
|
if !strings.Contains(err.Error(), "1050") { // 非表格已存在错误
|
||||||
logs.Println("[DB_NODE]create first table in database node failed: " + err.Error())
|
remotelogs.Error("DB_NODE", "create first table in database node failed: "+err.Error())
|
||||||
|
|
||||||
// 创建节点日志
|
// 创建节点日志
|
||||||
createLogErr := SharedNodeLogDAO.CreateLog(nil, nodeconfigs.NodeRoleDatabase, nodeId, 0, 0, "error", "ACCESS_LOG", "can not create access log table: "+err.Error(), time.Now().Unix())
|
createLogErr := SharedNodeLogDAO.CreateLog(nil, nodeconfigs.NodeRoleDatabase, nodeId, 0, 0, "error", "ACCESS_LOG", "can not create access log table: "+err.Error(), time.Now().Unix())
|
||||||
if createLogErr != nil {
|
if createLogErr != nil {
|
||||||
logs.Println("[NODE_LOG]" + createLogErr.Error())
|
remotelogs.Error("NODE_LOG", createLogErr.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
continue
|
continue
|
||||||
@@ -417,7 +417,7 @@ func (this *DBNodeInitializer) loop() error {
|
|||||||
}
|
}
|
||||||
err = daoObject.Init()
|
err = daoObject.Init()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logs.Println("[DB_NODE]initialize dao failed: " + err.Error())
|
remotelogs.Error("DB_NODE", "initialize dao failed: "+err.Error())
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -15,6 +15,8 @@ type HTTPAccessLog struct {
|
|||||||
FirewallRuleId uint32 `field:"firewallRuleId"` // WAF规则ID
|
FirewallRuleId uint32 `field:"firewallRuleId"` // WAF规则ID
|
||||||
RemoteAddr string `field:"remoteAddr"` // IP地址
|
RemoteAddr string `field:"remoteAddr"` // IP地址
|
||||||
Domain string `field:"domain"` // 域名
|
Domain string `field:"domain"` // 域名
|
||||||
|
RequestBody string `field:"requestBody"` // 请求内容
|
||||||
|
ResponseBody string `field:"responseBody"` // 响应内容
|
||||||
}
|
}
|
||||||
|
|
||||||
type HTTPAccessLogOperator struct {
|
type HTTPAccessLogOperator struct {
|
||||||
@@ -31,6 +33,8 @@ type HTTPAccessLogOperator struct {
|
|||||||
FirewallRuleId interface{} // WAF规则ID
|
FirewallRuleId interface{} // WAF规则ID
|
||||||
RemoteAddr interface{} // IP地址
|
RemoteAddr interface{} // IP地址
|
||||||
Domain interface{} // 域名
|
Domain interface{} // 域名
|
||||||
|
RequestBody interface{} // 请求内容
|
||||||
|
ResponseBody interface{} // 响应内容
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewHTTPAccessLogOperator() *HTTPAccessLogOperator {
|
func NewHTTPAccessLogOperator() *HTTPAccessLogOperator {
|
||||||
|
|||||||
@@ -109,6 +109,10 @@ func (this *DBNodeService) ListEnabledDBNodes(ctx context.Context, req *pb.ListE
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
status.Error = err.Error()
|
status.Error = err.Error()
|
||||||
} else {
|
} else {
|
||||||
|
// 版本
|
||||||
|
version, _ := db.FindCol(0, "SELECT VERSION()")
|
||||||
|
status.Version = types.String(version)
|
||||||
|
|
||||||
one, err := db.FindOne("SELECT SUM(DATA_LENGTH+INDEX_LENGTH) AS size FROM information_schema.`TABLES` WHERE TABLE_SCHEMA=?", db.Name())
|
one, err := db.FindOne("SELECT SUM(DATA_LENGTH+INDEX_LENGTH) AS size FROM information_schema.`TABLES` WHERE TABLE_SCHEMA=?", db.Name())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
status.Error = err.Error()
|
status.Error = err.Error()
|
||||||
@@ -294,3 +298,49 @@ func (this *DBNodeService) TruncateDBNodeTable(ctx context.Context, req *pb.Trun
|
|||||||
}
|
}
|
||||||
return this.Success()
|
return this.Success()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CheckDBNodeStatus 检查数据库节点状态
|
||||||
|
func (this *DBNodeService) CheckDBNodeStatus(ctx context.Context, req *pb.CheckDBNodeStatusRequest) (*pb.CheckDBNodeStatusResponse, error) {
|
||||||
|
_, err := this.ValidateAdmin(ctx, 0)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var tx = this.NullTx()
|
||||||
|
node, err := models.SharedDBNodeDAO.FindEnabledDBNode(tx, req.DbNodeId)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if node == nil {
|
||||||
|
return &pb.CheckDBNodeStatusResponse{DbNodeStatus: nil}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
status := &pb.DBNodeStatus{}
|
||||||
|
|
||||||
|
// 是否能够连接
|
||||||
|
if node.IsOn == 1 {
|
||||||
|
db, err := dbs.NewInstanceFromConfig(node.DBConfig())
|
||||||
|
if err != nil {
|
||||||
|
status.Error = err.Error()
|
||||||
|
} else {
|
||||||
|
// 版本
|
||||||
|
version, _ := db.FindCol(0, "SELECT VERSION()")
|
||||||
|
status.Version = types.String(version)
|
||||||
|
|
||||||
|
one, err := db.FindOne("SELECT SUM(DATA_LENGTH+INDEX_LENGTH) AS size FROM information_schema.`TABLES` WHERE TABLE_SCHEMA=?", db.Name())
|
||||||
|
if err != nil {
|
||||||
|
status.Error = err.Error()
|
||||||
|
_ = db.Close()
|
||||||
|
} else if one == nil {
|
||||||
|
status.Error = "unable to read size from database server"
|
||||||
|
_ = db.Close()
|
||||||
|
} else {
|
||||||
|
status.IsOk = true
|
||||||
|
status.Size = one.GetInt64("size")
|
||||||
|
_ = db.Close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &pb.CheckDBNodeStatusResponse{DbNodeStatus: status}, nil
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user