[API节点]列表显示版本号、CPU、内存、状态等信息

This commit is contained in:
GoEdgeLab
2020-12-29 18:28:07 +08:00
parent fe1e99bacc
commit f1f321a5c3
14 changed files with 504 additions and 4 deletions

View File

@@ -1,7 +1,7 @@
package teaconst
const (
Version = "0.0.6.1"
Version = "0.0.6.2"
ProductName = "Edge API"
ProcessName = "edge-api"

View File

@@ -232,6 +232,15 @@ func (this *APINodeDAO) FindEnabledAPINodeIdWithAddr(protocol string, host strin
return int64(one.(*APINode).Id), nil
}
// 设置API节点状态
func (this *APINodeDAO) UpdateAPINodeStatus(apiNodeId int64, statusJSON []byte) error {
_, err := this.Query().
Pk(apiNodeId).
Set("status", statusJSON).
Update()
return err
}
// 生成唯一ID
func (this *APINodeDAO) genUniqueId() (string, error) {
for {

View File

@@ -1,5 +1,14 @@
package events
type Event = string
const (
EventStart Event = "start" // start loading
EventLoaded Event = "loaded" // first load
EventQuit Event = "quit" // quit node gracefully
EventReload Event = "reload" // reload config
)
// 节点更新事件
// TODO 改成事件
var NodeDNSChanges = make(chan int64, 128)

27
internal/events/utils.go Normal file
View File

@@ -0,0 +1,27 @@
package events
import "sync"
var eventsMap = map[string][]func(){} // event => []callbacks
var locker = sync.Mutex{}
// 增加事件回调
func On(event string, callback func()) {
locker.Lock()
defer locker.Unlock()
callbacks, _ := eventsMap[event]
callbacks = append(callbacks, callback)
eventsMap[event] = callbacks
}
// 通知事件
func Notify(event string) {
locker.Lock()
callbacks, _ := eventsMap[event]
locker.Unlock()
for _, callback := range callbacks {
callback()
}
}

View File

@@ -68,6 +68,9 @@ func (this *APINode) Start() {
// 设置rlimit
_ = utils.SetRLimit(1024 * 1024)
// 状态变更计时器
go NewNodeStatusExecutor().Listen()
// 监听RPC服务
logs.Println("[API_NODE]starting rpc ...")
@@ -136,6 +139,8 @@ func (this *APINode) Start() {
}
}
// HTTP接口
if !isListening {
logs.Println("[API_NODE]the api node does have a listening address")
return

View File

@@ -0,0 +1,181 @@
package nodes
import (
"encoding/json"
teaconst "github.com/TeaOSLab/EdgeAPI/internal/const"
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/events"
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/iwind/TeaGo/lists"
"github.com/shirou/gopsutil/cpu"
"github.com/shirou/gopsutil/disk"
"os"
"runtime"
"strings"
"time"
)
type NodeStatusExecutor struct {
isFirstTime bool
cpuUpdatedTime time.Time
cpuLogicalCount int
cpuPhysicalCount int
}
func NewNodeStatusExecutor() *NodeStatusExecutor {
return &NodeStatusExecutor{}
}
func (this *NodeStatusExecutor) Listen() {
this.isFirstTime = true
this.cpuUpdatedTime = time.Now()
this.update()
// TODO 这个时间间隔可以配置
ticker := time.NewTicker(30 * time.Second)
events.On(events.EventQuit, func() {
remotelogs.Println("NODE_STATUS", "quit executor")
ticker.Stop()
})
for range ticker.C {
this.isFirstTime = false
this.update()
}
}
func (this *NodeStatusExecutor) update() {
if sharedAPIConfig == nil {
return
}
status := &nodeconfigs.NodeStatus{}
status.BuildVersion = teaconst.Version
status.OS = runtime.GOOS
status.Arch = runtime.GOARCH
status.ConfigVersion = 0
status.IsActive = true
status.ConnectionCount = 0 // TODO 实现连接数计算
hostname, _ := os.Hostname()
status.Hostname = hostname
this.updateCPU(status)
this.updateMem(status)
this.updateLoad(status)
this.updateDisk(status)
status.UpdatedAt = time.Now().Unix()
// 发送数据
jsonData, err := json.Marshal(status)
if err != nil {
remotelogs.Error("NODE_STATUS", "serial NodeStatus fail: "+err.Error())
return
}
err = models.SharedAPINodeDAO.UpdateAPINodeStatus(sharedAPIConfig.NumberId(), jsonData)
if err != nil {
remotelogs.Error("NODE_STATUS", "rpc UpdateNodeStatus() failed: "+err.Error())
return
}
}
// 更新CPU
func (this *NodeStatusExecutor) updateCPU(status *nodeconfigs.NodeStatus) {
duration := time.Duration(0)
if this.isFirstTime {
duration = 100 * time.Millisecond
}
percents, err := cpu.Percent(duration, false)
if err != nil {
status.Error = "cpu.Percent(): " + err.Error()
return
}
if len(percents) == 0 {
return
}
status.CPUUsage = percents[0] / 100
if time.Since(this.cpuUpdatedTime) > 300*time.Second { // 每隔5分钟才会更新一次
this.cpuUpdatedTime = time.Now()
status.CPULogicalCount, err = cpu.Counts(true)
if err != nil {
status.Error = "cpu.Counts(): " + err.Error()
return
}
status.CPUPhysicalCount, err = cpu.Counts(false)
if err != nil {
status.Error = "cpu.Counts(): " + err.Error()
return
}
this.cpuLogicalCount = status.CPULogicalCount
this.cpuPhysicalCount = status.CPUPhysicalCount
} else {
status.CPULogicalCount = this.cpuLogicalCount
status.CPUPhysicalCount = this.cpuPhysicalCount
}
}
// 更新硬盘
func (this *NodeStatusExecutor) updateDisk(status *nodeconfigs.NodeStatus) {
partitions, err := disk.Partitions(false)
if err != nil {
remotelogs.Error("NODE_STATUS", err.Error())
return
}
lists.Sort(partitions, func(i int, j int) bool {
p1 := partitions[i]
p2 := partitions[j]
return p1.Mountpoint > p2.Mountpoint
})
// 当前TeaWeb所在的fs
rootFS := ""
rootTotal := uint64(0)
if lists.ContainsString([]string{"darwin", "linux", "freebsd"}, runtime.GOOS) {
for _, p := range partitions {
if p.Mountpoint == "/" {
rootFS = p.Fstype
usage, _ := disk.Usage(p.Mountpoint)
if usage != nil {
rootTotal = usage.Total
}
break
}
}
}
total := rootTotal
totalUsage := uint64(0)
maxUsage := float64(0)
for _, partition := range partitions {
if runtime.GOOS != "windows" && !strings.Contains(partition.Device, "/") && !strings.Contains(partition.Device, "\\") {
continue
}
// 跳过不同fs的
if len(rootFS) > 0 && rootFS != partition.Fstype {
continue
}
usage, err := disk.Usage(partition.Mountpoint)
if err != nil {
continue
}
if partition.Mountpoint != "/" && (usage.Total != rootTotal || total == 0) {
total += usage.Total
}
totalUsage += usage.Used
if usage.UsedPercent >= maxUsage {
maxUsage = usage.UsedPercent
status.DiskMaxUsagePartition = partition.Mountpoint
}
}
status.DiskTotal = total
status.DiskUsage = float64(totalUsage) / float64(total)
status.DiskMaxUsage = maxUsage / 100
}

View File

@@ -0,0 +1,27 @@
package nodes
import (
"github.com/shirou/gopsutil/cpu"
"testing"
"time"
)
func TestNodeStatusExecutor_CPU(t *testing.T) {
countLogicCPU, err := cpu.Counts(true)
if err != nil {
t.Fatal(err)
}
t.Log("logic count:", countLogicCPU)
countPhysicalCPU, err := cpu.Counts(false)
if err != nil {
t.Fatal(err)
}
t.Log("physical count:", countPhysicalCPU)
percents, err := cpu.Percent(100 * time.Millisecond, false)
if err != nil {
t.Fatal(err)
}
t.Log(percents)
}

View File

@@ -0,0 +1,41 @@
// +build !windows
package nodes
import (
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/shirou/gopsutil/load"
"github.com/shirou/gopsutil/mem"
)
// 更新内存
func (this *NodeStatusExecutor) updateMem(status *nodeconfigs.NodeStatus) {
stat, err := mem.VirtualMemory()
if err != nil {
return
}
// 重新计算内存
if stat.Total > 0 {
stat.Used = stat.Total - stat.Free - stat.Buffers - stat.Cached
status.MemoryUsage = float64(stat.Used) / float64(stat.Total)
}
status.MemoryTotal = stat.Total
}
// 更新负载
func (this *NodeStatusExecutor) updateLoad(status *nodeconfigs.NodeStatus) {
stat, err := load.Avg()
if err != nil {
status.Error = err.Error()
return
}
if stat == nil {
status.Error = "load is nil"
return
}
status.Load1m = stat.Load1
status.Load5m = stat.Load5
status.Load15m = stat.Load15
}

View File

@@ -0,0 +1,101 @@
// +build windows
package nodes
import (
"context"
"github.com/shirou/gopsutil/cpu"
"github.com/shirou/gopsutil/mem"
"math"
"sync"
"time"
)
type WindowsLoadValue struct {
Timestamp int64
Value int
}
var windowsLoadValues = []*WindowsLoadValue{}
var windowsLoadLocker = &sync.Mutex{}
// 更新内存
func (this *NodeStatusExecutor) updateMem(status *NodeStatus) {
stat, err := mem.VirtualMemory()
if err != nil {
status.Error = err.Error()
return
}
status.MemoryUsage = stat.UsedPercent
status.MemoryTotal = stat.Total
}
// 更新负载
func (this *NodeStatusExecutor) updateLoad(status *NodeStatus) {
timestamp := time.Now().Unix()
currentLoad := 0
info, err := cpu.ProcInfo()
if err == nil && len(info) > 0 && info[0].ProcessorQueueLength < 1000 {
currentLoad = int(info[0].ProcessorQueueLength)
}
// 删除15分钟之前的数据
windowsLoadLocker.Lock()
result := []*WindowsLoadValue{}
for _, v := range windowsLoadValues {
if timestamp-v.Timestamp > 15*60 {
continue
}
result = append(result, v)
}
result = append(result, &WindowsLoadValue{
Timestamp: timestamp,
Value: currentLoad,
})
windowsLoadValues = result
total1 := 0
count1 := 0
total5 := 0
count5 := 0
total15 := 0
count15 := 0
for _, v := range result {
if timestamp-v.Timestamp <= 60 {
total1 += v.Value
count1++
}
if timestamp-v.Timestamp <= 300 {
total5 += v.Value
count5++
}
total15 += v.Value
count15++
}
load1 := float64(0)
load5 := float64(0)
load15 := float64(0)
if count1 > 0 {
load1 = math.Round(float64(total1*100)/float64(count1)) / 100
}
if count5 > 0 {
load5 = math.Round(float64(total5*100)/float64(count5)) / 100
}
if count15 > 0 {
load15 = math.Round(float64(total15*100)/float64(count15)) / 100
}
windowsLoadLocker.Unlock()
// 在老Windows上不显示错误
if err == context.DeadlineExceeded {
err = nil
}
status.Load1m = load1
status.Load5m = load5
status.Load15m = load15
}

View File

@@ -0,0 +1,90 @@
package remotelogs
import (
teaconst "github.com/TeaOSLab/EdgeAPI/internal/const"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/iwind/TeaGo/logs"
"time"
)
var logChan = make(chan *pb.NodeLog, 1024)
func init() {
// 定期上传日志
ticker := time.NewTicker(60 * time.Second)
go func() {
for range ticker.C {
// TODO
}
}()
}
// 打印普通信息
func Println(tag string, description string) {
logs.Println("[" + tag + "]" + description)
nodeConfig, _ := nodeconfigs.SharedNodeConfig()
if nodeConfig == nil {
return
}
select {
case logChan <- &pb.NodeLog{
Role: teaconst.Role,
Tag: tag,
Description: description,
Level: "info",
NodeId: nodeConfig.Id,
CreatedAt: time.Now().Unix(),
}:
default:
}
}
// 打印警告信息
func Warn(tag string, description string) {
logs.Println("[" + tag + "]" + description)
nodeConfig, _ := nodeconfigs.SharedNodeConfig()
if nodeConfig == nil {
return
}
select {
case logChan <- &pb.NodeLog{
Role: teaconst.Role,
Tag: tag,
Description: description,
Level: "warning",
NodeId: nodeConfig.Id,
CreatedAt: time.Now().Unix(),
}:
default:
}
}
// 打印错误信息
func Error(tag string, description string) {
logs.Println("[" + tag + "]" + description)
nodeConfig, _ := nodeconfigs.SharedNodeConfig()
if nodeConfig == nil {
return
}
select {
case logChan <- &pb.NodeLog{
Role: teaconst.Role,
Tag: tag,
Description: description,
Level: "error",
NodeId: nodeConfig.Id,
CreatedAt: time.Now().Unix(),
}:
default:
}
}

View File

@@ -140,6 +140,7 @@ func (this *APINodeService) ListEnabledAPINodes(ctx context.Context, req *pb.Lis
HttpsJSON: []byte(node.Https),
AccessAddrsJSON: []byte(node.AccessAddrs),
AccessAddrs: accessAddrs,
StatusJSON: []byte(node.Status),
})
}

File diff suppressed because one or more lines are too long