Files
EdgeNode/internal/nodes/node_status_executor.go

376 lines
9.9 KiB
Go
Raw Normal View History

2020-09-09 18:53:53 +08:00
package nodes
import (
"encoding/json"
2020-10-25 21:27:38 +08:00
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
2020-09-13 20:37:40 +08:00
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeNode/internal/caches"
2020-09-09 18:53:53 +08:00
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
2020-10-28 11:19:06 +08:00
"github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/firewalls"
2021-04-29 16:48:47 +08:00
"github.com/TeaOSLab/EdgeNode/internal/monitor"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
2020-09-09 18:53:53 +08:00
"github.com/TeaOSLab/EdgeNode/internal/rpc"
"github.com/TeaOSLab/EdgeNode/internal/trackers"
"github.com/TeaOSLab/EdgeNode/internal/utils"
2023-07-08 18:59:08 +08:00
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
2020-09-09 18:53:53 +08:00
"github.com/iwind/TeaGo/lists"
2021-04-29 16:48:47 +08:00
"github.com/iwind/TeaGo/maps"
2022-03-12 19:11:29 +08:00
"github.com/shirou/gopsutil/v3/cpu"
"github.com/shirou/gopsutil/v3/disk"
2023-05-17 10:48:59 +08:00
"github.com/shirou/gopsutil/v3/net"
"math"
2020-09-09 18:53:53 +08:00
"os"
"runtime"
"strings"
"time"
)
type NodeStatusExecutor struct {
2023-05-17 10:48:59 +08:00
isFirstTime bool
lastUpdatedTime time.Time
2020-09-09 18:53:53 +08:00
cpuLogicalCount int
cpuPhysicalCount int
// 流量统计
lastIOCounterStat net.IOCountersStat
lastUDPInDatagrams int64
lastUDPOutDatagrams int64
2023-05-17 10:48:59 +08:00
apiCallStat *rpc.CallStat
ticker *time.Ticker
2020-09-09 18:53:53 +08:00
}
func NewNodeStatusExecutor() *NodeStatusExecutor {
return &NodeStatusExecutor{
ticker: time.NewTicker(30 * time.Second),
apiCallStat: rpc.NewCallStat(10),
lastUDPInDatagrams: -1,
lastUDPOutDatagrams: -1,
}
2020-09-09 18:53:53 +08:00
}
func (this *NodeStatusExecutor) Listen() {
this.isFirstTime = true
2023-05-17 10:48:59 +08:00
this.lastUpdatedTime = time.Now()
2020-09-09 18:53:53 +08:00
this.update()
2022-01-12 20:31:04 +08:00
events.OnKey(events.EventQuit, this, func() {
remotelogs.Println("NODE_STATUS", "quit executor")
this.ticker.Stop()
2020-10-28 11:19:06 +08:00
})
for range this.ticker.C {
2020-09-09 18:53:53 +08:00
this.isFirstTime = false
this.update()
}
}
func (this *NodeStatusExecutor) update() {
2020-09-26 08:07:07 +08:00
if sharedNodeConfig == nil {
return
}
var tr = trackers.Begin("UPLOAD_NODE_STATUS")
defer tr.End()
var status = &nodeconfigs.NodeStatus{}
2020-09-26 08:07:07 +08:00
status.BuildVersion = teaconst.Version
status.BuildVersionCode = utils.VersionToLong(teaconst.Version)
2020-10-28 11:19:06 +08:00
status.OS = runtime.GOOS
status.Arch = runtime.GOARCH
2022-07-21 15:07:12 +08:00
exe, _ := os.Executable()
status.ExePath = exe
2020-09-26 08:07:07 +08:00
status.ConfigVersion = sharedNodeConfig.Version
2020-09-09 18:53:53 +08:00
status.IsActive = true
status.ConnectionCount = sharedListenerManager.TotalActiveConnections()
status.CacheTotalDiskSize = caches.SharedManager.TotalDiskSize()
status.CacheTotalMemorySize = caches.SharedManager.TotalMemorySize()
status.TrafficInBytes = teaconst.InTrafficBytes
status.TrafficOutBytes = teaconst.OutTrafficBytes
apiSuccessPercent, apiAvgCostSeconds := this.apiCallStat.Sum()
status.APISuccessPercent = apiSuccessPercent
status.APIAvgCostSeconds = apiAvgCostSeconds
var localFirewall = firewalls.Firewall()
if localFirewall != nil && !localFirewall.IsMock() {
status.LocalFirewallName = localFirewall.Name()
}
2020-09-09 18:53:53 +08:00
2021-04-29 16:48:47 +08:00
// 记录监控数据
monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemConnections, maps.Map{
"total": status.ConnectionCount,
})
2020-09-09 18:53:53 +08:00
hostname, _ := os.Hostname()
status.Hostname = hostname
var cpuTR = tr.Begin("cpu")
2020-09-09 18:53:53 +08:00
this.updateCPU(status)
cpuTR.End()
var memTR = tr.Begin("memory")
2020-09-09 18:53:53 +08:00
this.updateMem(status)
memTR.End()
var loadTR = tr.Begin("load")
2020-09-09 18:53:53 +08:00
this.updateLoad(status)
loadTR.End()
var diskTR = tr.Begin("disk")
2020-09-09 18:53:53 +08:00
this.updateDisk(status)
diskTR.End()
var cacheSpaceTR = tr.Begin("cache space")
2021-07-08 19:43:30 +08:00
this.updateCacheSpace(status)
cacheSpaceTR.End()
2023-05-17 10:48:59 +08:00
this.updateAllTraffic(status)
// 修改更新时间
this.lastUpdatedTime = time.Now()
2020-09-09 18:53:53 +08:00
status.UpdatedAt = time.Now().Unix()
status.Timestamp = status.UpdatedAt
2020-09-09 18:53:53 +08:00
// 发送数据
jsonData, err := json.Marshal(status)
if err != nil {
remotelogs.Error("NODE_STATUS", "serial NodeStatus fail: "+err.Error())
2020-09-09 18:53:53 +08:00
return
}
rpcClient, err := rpc.SharedRPC()
if err != nil {
remotelogs.Error("NODE_STATUS", "failed to open rpc: "+err.Error())
2020-09-09 18:53:53 +08:00
return
}
var before = time.Now()
2022-08-24 20:04:46 +08:00
_, err = rpcClient.NodeRPC.UpdateNodeStatus(rpcClient.Context(), &pb.UpdateNodeStatusRequest{
2020-09-09 18:53:53 +08:00
StatusJSON: jsonData,
})
var costSeconds = time.Since(before).Seconds()
this.apiCallStat.Add(err == nil, costSeconds)
2020-09-09 18:53:53 +08:00
if err != nil {
2021-11-10 21:51:56 +08:00
if rpc.IsConnError(err) {
remotelogs.Warn("NODE_STATUS", "rpc UpdateNodeStatus() failed: "+err.Error())
} else {
remotelogs.Error("NODE_STATUS", "rpc UpdateNodeStatus() failed: "+err.Error())
}
2020-09-09 18:53:53 +08:00
return
}
}
// 更新CPU
2020-10-25 21:27:38 +08:00
func (this *NodeStatusExecutor) updateCPU(status *nodeconfigs.NodeStatus) {
var duration = time.Duration(0)
2020-09-09 18:53:53 +08:00
if this.isFirstTime {
duration = 100 * time.Millisecond
}
percents, err := cpu.Percent(duration, false)
if err != nil {
2020-10-02 17:22:38 +08:00
status.Error = "cpu.Percent(): " + err.Error()
2020-09-09 18:53:53 +08:00
return
}
if len(percents) == 0 {
return
}
status.CPUUsage = percents[0] / 100
2021-04-29 16:48:47 +08:00
// 记录监控数据
monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemCPU, maps.Map{
"usage": status.CPUUsage,
"cores": runtime.NumCPU(),
2021-04-29 16:48:47 +08:00
})
2021-01-25 19:40:57 +08:00
if this.cpuLogicalCount == 0 && this.cpuPhysicalCount == 0 {
2020-09-09 18:53:53 +08:00
status.CPULogicalCount, err = cpu.Counts(true)
if err != nil {
2020-10-02 17:22:38 +08:00
status.Error = "cpu.Counts(): " + err.Error()
2020-09-09 18:53:53 +08:00
return
}
status.CPUPhysicalCount, err = cpu.Counts(false)
if err != nil {
2020-10-02 17:22:38 +08:00
status.Error = "cpu.Counts(): " + err.Error()
2020-09-09 18:53:53 +08:00
return
}
this.cpuLogicalCount = status.CPULogicalCount
this.cpuPhysicalCount = status.CPUPhysicalCount
} else {
status.CPULogicalCount = this.cpuLogicalCount
status.CPUPhysicalCount = this.cpuPhysicalCount
}
}
// 更新硬盘
2020-10-25 21:27:38 +08:00
func (this *NodeStatusExecutor) updateDisk(status *nodeconfigs.NodeStatus) {
2020-09-09 18:53:53 +08:00
partitions, err := disk.Partitions(false)
if err != nil {
remotelogs.Error("NODE_STATUS", err.Error())
2020-09-09 18:53:53 +08:00
return
}
lists.Sort(partitions, func(i int, j int) bool {
p1 := partitions[i]
p2 := partitions[j]
return p1.Mountpoint > p2.Mountpoint
})
// 当前TeaWeb所在的fs
var rootFS = ""
var rootTotal = uint64(0)
2020-09-09 18:53:53 +08:00
if lists.ContainsString([]string{"darwin", "linux", "freebsd"}, runtime.GOOS) {
for _, p := range partitions {
if p.Mountpoint == "/" {
rootFS = p.Fstype
usage, _ := disk.Usage(p.Mountpoint)
if usage != nil {
rootTotal = usage.Total
}
break
}
}
}
var total = rootTotal
var totalUsage = uint64(0)
var maxUsage = float64(0)
2020-09-09 18:53:53 +08:00
for _, partition := range partitions {
if runtime.GOOS != "windows" && !strings.Contains(partition.Device, "/") && !strings.Contains(partition.Device, "\\") {
continue
}
// 跳过不同fs的
if len(rootFS) > 0 && rootFS != partition.Fstype {
continue
}
usage, err := disk.Usage(partition.Mountpoint)
if err != nil {
continue
}
if partition.Mountpoint != "/" && (usage.Total != rootTotal || total == 0) {
total += usage.Total
}
totalUsage += usage.Used
if usage.UsedPercent >= maxUsage {
maxUsage = usage.UsedPercent
status.DiskMaxUsagePartition = partition.Mountpoint
}
}
status.DiskTotal = total
status.DiskUsage = float64(totalUsage) / float64(total)
status.DiskMaxUsage = maxUsage / 100
2021-04-29 16:48:47 +08:00
// 记录监控数据
monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemDisk, maps.Map{
"total": status.DiskTotal,
"usage": status.DiskUsage,
"maxUsage": status.DiskMaxUsage,
})
2020-09-09 18:53:53 +08:00
}
2021-07-08 19:43:30 +08:00
// 缓存空间
func (this *NodeStatusExecutor) updateCacheSpace(status *nodeconfigs.NodeStatus) {
var result = []maps.Map{}
var cachePaths = caches.SharedManager.FindAllCachePaths()
2021-07-08 19:43:30 +08:00
for _, path := range cachePaths {
2023-07-08 18:59:08 +08:00
stat, err := fsutils.Stat(path)
2021-07-08 19:43:30 +08:00
if err != nil {
return
}
result = append(result, maps.Map{
"path": path,
2023-07-08 18:59:08 +08:00
"total": stat.TotalSize(),
"avail": stat.AvailableSize(),
"used": stat.UsedSize(),
2021-07-08 19:43:30 +08:00
})
}
monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemCacheDir, maps.Map{
"dirs": result,
})
}
2023-05-17 10:48:59 +08:00
// 流量
func (this *NodeStatusExecutor) updateAllTraffic(status *nodeconfigs.NodeStatus) {
trafficCounters, err := net.IOCounters(true)
2023-05-17 10:48:59 +08:00
if err != nil {
remotelogs.Warn("NODE_STATUS_EXECUTOR", err.Error())
return
}
var allCounter = net.IOCountersStat{}
for _, counter := range trafficCounters {
2023-05-17 10:48:59 +08:00
// 跳过lo
if counter.Name == "lo" {
continue
}
allCounter.BytesRecv += counter.BytesRecv
allCounter.BytesSent += counter.BytesSent
}
if allCounter.BytesSent == 0 && allCounter.BytesRecv == 0 {
return
}
if this.lastIOCounterStat.BytesSent > 0 {
// 记录监控数据
if allCounter.BytesSent >= this.lastIOCounterStat.BytesSent && allCounter.BytesRecv >= this.lastIOCounterStat.BytesRecv {
var costSeconds = int(math.Ceil(time.Since(this.lastUpdatedTime).Seconds()))
if costSeconds > 0 {
var bytesSent = allCounter.BytesSent - this.lastIOCounterStat.BytesSent
var bytesRecv = allCounter.BytesRecv - this.lastIOCounterStat.BytesRecv
// UDP
var udpInDatagrams int64 = 0
var udpOutDatagrams int64 = 0
protoStats, protoErr := net.ProtoCounters([]string{"udp"})
if protoErr == nil {
for _, protoStat := range protoStats {
if protoStat.Protocol == "udp" {
udpInDatagrams = protoStat.Stats["InDatagrams"]
udpOutDatagrams = protoStat.Stats["OutDatagrams"]
if udpInDatagrams < 0 {
udpInDatagrams = 0
}
if udpOutDatagrams < 0 {
udpOutDatagrams = 0
}
}
}
}
var avgUDPInDatagrams int64 = 0
var avgUDPOutDatagrams int64 = 0
if this.lastUDPInDatagrams >= 0 && this.lastUDPOutDatagrams >= 0 {
avgUDPInDatagrams = (udpInDatagrams - this.lastUDPInDatagrams) / int64(costSeconds)
avgUDPOutDatagrams = (udpOutDatagrams - this.lastUDPOutDatagrams) / int64(costSeconds)
if avgUDPInDatagrams < 0 {
avgUDPInDatagrams = 0
}
if avgUDPOutDatagrams < 0 {
avgUDPOutDatagrams = 0
}
}
this.lastUDPInDatagrams = udpInDatagrams
this.lastUDPOutDatagrams = udpOutDatagrams
2023-05-17 10:48:59 +08:00
monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemAllTraffic, maps.Map{
"inBytes": bytesRecv,
"outBytes": bytesSent,
"avgInBytes": bytesRecv / uint64(costSeconds),
"avgOutBytes": bytesSent / uint64(costSeconds),
"avgUDPInDatagrams": avgUDPInDatagrams,
"avgUDPOutDatagrams": avgUDPOutDatagrams,
2023-05-17 10:48:59 +08:00
})
}
}
}
this.lastIOCounterStat = allCounter
}