提供区域监控上报结果接口

This commit is contained in:
GoEdgeLab
2021-09-06 08:12:48 +08:00
parent 63bf680adb
commit ad20b7c243
9 changed files with 205 additions and 37 deletions

View File

@@ -1,28 +0,0 @@
package models
import (
_ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs"
)
type ConnectivityResultDAO dbs.DAO
func NewConnectivityResultDAO() *ConnectivityResultDAO {
return dbs.NewDAO(&ConnectivityResultDAO{
DAOObject: dbs.DAOObject{
DB: Tea.Env,
Table: "edgeConnectivityResults",
Model: new(ConnectivityResult),
PkName: "id",
},
}).(*ConnectivityResultDAO)
}
var SharedConnectivityResultDAO *ConnectivityResultDAO
func init() {
dbs.OnReady(func() {
SharedConnectivityResultDAO = NewConnectivityResultDAO()
})
}

View File

@@ -346,6 +346,19 @@ func (this *NodeIPAddressDAO) ListEnabledIPAddresses(tx *dbs.Tx, role string, no
return
}
// FindAllEnabledAndOnIPAddressesWithClusterId 列出所有的正在启用的IP地址
func (this *NodeIPAddressDAO) FindAllEnabledAndOnIPAddressesWithClusterId(tx *dbs.Tx, role string, clusterId int64) (result []*NodeIPAddress, err error) {
_, err = this.Query(tx).
State(NodeIPAddressStateEnabled).
Attr("role", role).
Attr("isOn", true).
Where("nodeId IN (SELECT id FROM "+SharedNodeDAO.Table+" WHERE state=1 AND clusterId=:clusterId)").
Param("clusterId", clusterId).
Slice(&result).
FindAll()
return
}
// NotifyUpdate 通知更新
func (this *NodeIPAddressDAO) NotifyUpdate(tx *dbs.Tx, addressId int64) error {
address, err := this.Query(tx).

View File

@@ -0,0 +1,98 @@
package models
import (
"github.com/TeaOSLab/EdgeCommon/pkg/configutils"
_ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/maps"
"time"
)
type ReportResultDAO dbs.DAO
func NewReportResultDAO() *ReportResultDAO {
return dbs.NewDAO(&ReportResultDAO{
DAOObject: dbs.DAOObject{
DB: Tea.Env,
Table: "edgeReportResults",
Model: new(ReportResult),
PkName: "id",
},
}).(*ReportResultDAO)
}
var SharedReportResultDAO *ReportResultDAO
func init() {
dbs.OnReady(func() {
SharedReportResultDAO = NewReportResultDAO()
})
}
// UpdateResult 创建结果
func (this *ReportResultDAO) UpdateResult(tx *dbs.Tx, taskType string, targetId int64, targetDesc string, reportNodeId int64, isOk bool, costMs float64, errString string) error {
var countUp interface{} = 0
var countDown interface{} = 0
if isOk {
countUp = dbs.SQL("countUp+1")
} else {
countDown = dbs.SQL("countDown+1")
}
return this.Query(tx).
InsertOrUpdateQuickly(maps.Map{
"type": taskType,
"targetId": targetId,
"targetDesc": targetDesc,
"updatedAt": time.Now().Unix(),
"reportNodeId": reportNodeId,
"isOk": isOk,
"costMs": costMs,
"error": errString,
"countUp": countUp,
"countDown": countDown,
}, maps.Map{
"targetDesc": targetDesc,
"updatedAt": time.Now().Unix(),
"isOk": isOk,
"costMs": costMs,
"error": errString,
"countUp": countUp,
"countDown": countDown,
})
}
// CountAllResults 计算结果数量
func (this *ReportResultDAO) CountAllResults(tx *dbs.Tx, reportNodeId int64, okState configutils.BoolState) (int64, error) {
var query = this.Query(tx).
Attr("reportNodeId", reportNodeId)
switch okState {
case configutils.BoolStateYes:
query.Attr("isOk", 1)
case configutils.BoolStateNo:
query.Attr("isOk", 0)
}
return query.
Count()
}
// ListResults 列出单页结果
func (this *ReportResultDAO) ListResults(tx *dbs.Tx, reportNodeId int64, okState configutils.BoolState, offset int64, size int64) (result []*ReportResult, err error) {
var query = this.Query(tx).
Attr("reportNodeId", reportNodeId)
switch okState {
case configutils.BoolStateYes:
query.Attr("isOk", 1)
case configutils.BoolStateNo:
query.Attr("isOk", 0)
}
_, err = query.
Attr("reportNodeId", reportNodeId).
Offset(offset).
Limit(size).
Desc("targetId").
Slice(&result).
FindAll()
return
}

View File

@@ -1,30 +1,34 @@
package models
// ConnectivityResult 连通性监控结果
type ConnectivityResult struct {
// ReportResult 连通性监控结果
type ReportResult struct {
Id uint64 `field:"id"` // ID
Type string `field:"type"` // 对象类型
TargetId uint32 `field:"targetId"` // 对象ID
TargetId uint64 `field:"targetId"` // 对象ID
TargetDesc string `field:"targetDesc"` // 对象描述
UpdatedAt uint64 `field:"updatedAt"` // 更新时间
ReportNodeId uint32 `field:"reportNodeId"` // 监控节点ID
IsOk uint8 `field:"isOk"` // 是否可连接
CostMs float64 `field:"costMs"` // 单次连接花费的时间
Port uint32 `field:"port"` // 连接的端口
Error string `field:"error"` // 产生的错误信息
CountUp uint32 `field:"countUp"` // 连续上线次数
CountDown uint32 `field:"countDown"` // 连续下线次数
}
type ConnectivityResultOperator struct {
type ReportResultOperator struct {
Id interface{} // ID
Type interface{} // 对象类型
TargetId interface{} // 对象ID
TargetDesc interface{} // 对象描述
UpdatedAt interface{} // 更新时间
ReportNodeId interface{} // 监控节点ID
IsOk interface{} // 是否可连接
CostMs interface{} // 单次连接花费的时间
Port interface{} // 连接的端口
Error interface{} // 产生的错误信息
CountUp interface{} // 连续上线次数
CountDown interface{} // 连续下线次数
}
func NewConnectivityResultOperator() *ConnectivityResultOperator {
return &ConnectivityResultOperator{}
func NewReportResultOperator() *ReportResultOperator {
return &ReportResultOperator{}
}

View File

@@ -1367,6 +1367,46 @@ func (this *ServerDAO) FindLatestServers(tx *dbs.Tx, size int64) (result []*Serv
return
}
// FindFirstHTTPOrHTTPSPortWithClusterId 获取集群中第一个HTTP或者HTTPS端口
func (this *ServerDAO) FindFirstHTTPOrHTTPSPortWithClusterId(tx *dbs.Tx, clusterId int64) (int, error) {
one, _, err := this.Query(tx).
Result("JSON_EXTRACT(http, '$.listen[*].portRange') AS httpPort, JSON_EXTRACT(https, '$.listen[*].portRange') AS httpsPort").
Attr("clusterId", clusterId).
State(ServerStateEnabled).
Attr("isOn", 1).
Where("((JSON_CONTAINS(http, :queryJSON) AND JSON_EXTRACT(http, '$.listen[*].portRange') IS NOT NULL) OR (JSON_CONTAINS(https, :queryJSON) AND JSON_EXTRACT(https, '$.listen[*].portRange') IS NOT NULL))").
Param("queryJSON", "{\"isOn\":true}").
FindOne()
if err != nil {
return 0, err
}
httpPortString := one.GetString("httpPort")
if len(httpPortString) > 0 {
var ports = []string{}
err = json.Unmarshal([]byte(httpPortString), &ports)
if err != nil {
return 0, err
}
if len(ports) > 0 {
return types.Int(ports[0]), nil
}
}
httpsPortString := one.GetString("httpsPort")
if len(httpsPortString) > 0 {
var ports = []string{}
err = json.Unmarshal([]byte(httpsPortString), &ports)
if err != nil {
return 0, err
}
if len(ports) > 0 {
return types.Int(ports[0]), nil
}
}
return 0, nil
}
// NotifyUpdate 同步集群
func (this *ServerDAO) NotifyUpdate(tx *dbs.Tx, serverId int64) error {
// 创建任务

View File

@@ -0,0 +1,41 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package reporters
import (
"context"
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/errors"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/shared"
"github.com/iwind/TeaGo/dbs"
"google.golang.org/grpc/peer"
"net"
)
// 校验客户端IP
func validateClient(tx *dbs.Tx, nodeId int64, ctx context.Context) error {
allowIPs, err := models.SharedReportNodeDAO.FindNodeAllowIPs(tx, nodeId)
if err != nil {
return err
}
if len(allowIPs) == 0 {
return nil
}
p, ok := peer.FromContext(ctx)
if ok {
host, _, _ := net.SplitHostPort(p.Addr.String())
if len(host) > 0 {
for _, ip := range allowIPs {
r, err := shared.ParseIPRange(ip)
if err == nil && r != nil {
if r.Contains(host) {
return nil
}
}
}
}
}
return errors.New("client was not allowed")
}

File diff suppressed because one or more lines are too long