阶段性提交

This commit is contained in:
GoEdgeLab
2020-09-09 18:53:53 +08:00
parent ea05b2f236
commit 5a941ec9d7
115 changed files with 7910 additions and 744 deletions

View File

@@ -3,17 +3,16 @@ package nodes
import (
"context"
"errors"
"github.com/TeaOSLab/EdgeNode/internal/configs"
"github.com/TeaOSLab/EdgeNode/internal/configs/serverconfigs"
"github.com/iwind/TeaGo/logs"
"net"
"net/http"
"sync"
)
type Listener struct {
group *configs.ServerGroup
group *serverconfigs.ServerGroup
isListening bool
listener interface{} // 监听器
listener ListenerImpl // 监听器
locker sync.RWMutex
}
@@ -22,7 +21,7 @@ func NewListener() *Listener {
return &Listener{}
}
func (this *Listener) Reload(group *configs.ServerGroup) {
func (this *Listener) Reload(group *serverconfigs.ServerGroup) {
this.locker.Lock()
defer this.locker.Unlock()
this.group = group
@@ -40,78 +39,67 @@ func (this *Listener) Listen() error {
return nil
}
protocol := this.group.Protocol()
switch protocol {
case configs.ProtocolHTTP, configs.ProtocolHTTP4, configs.ProtocolHTTP6:
return this.listenHTTP()
case configs.ProtocolHTTPS, configs.ProtocolHTTPS4, configs.ProtocolHTTPS6:
return this.ListenHTTPS()
case configs.ProtocolTCP, configs.ProtocolTCP4, configs.ProtocolTCP6:
return this.listenTCP()
case configs.ProtocolTLS, configs.ProtocolTLS4, configs.ProtocolTLS6:
return this.listenTLS()
case configs.ProtocolUnix:
return this.listenUnix()
case configs.ProtocolUDP:
return this.listenUDP()
default:
return errors.New("unknown protocol '" + protocol + "'")
}
}
func (this *Listener) Close() error {
// TODO 需要实现
return nil
}
func (this *Listener) listenHTTP() error {
listener, err := this.createListener()
netListener, err := this.createListener()
if err != nil {
return err
}
mux := http.NewServeMux()
mux.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {
_, _ = writer.Write([]byte("Hello, World"))
})
server := &http.Server{
Addr: this.group.Addr(),
Handler: mux,
switch protocol {
case serverconfigs.ProtocolHTTP, serverconfigs.ProtocolHTTP4, serverconfigs.ProtocolHTTP6:
this.listener = &HTTPListener{
Group: this.group,
Listener: netListener,
}
case serverconfigs.ProtocolHTTPS, serverconfigs.ProtocolHTTPS4, serverconfigs.ProtocolHTTPS6:
this.listener = &HTTPListener{
Group: this.group,
Listener: netListener,
}
case serverconfigs.ProtocolTCP, serverconfigs.ProtocolTCP4, serverconfigs.ProtocolTCP6:
this.listener = &TCPListener{
Group: this.group,
Listener: netListener,
}
case serverconfigs.ProtocolTLS, serverconfigs.ProtocolTLS4, serverconfigs.ProtocolTLS6:
this.listener = &TCPListener{
Group: this.group,
Listener: netListener,
}
case serverconfigs.ProtocolUnix:
this.listener = &UnixListener{
Group: this.group,
Listener: netListener,
}
case serverconfigs.ProtocolUDP:
this.listener = &UDPListener{
Group: this.group,
Listener: netListener,
}
default:
return errors.New("unknown protocol '" + protocol + "'")
}
this.listener.Init()
go func() {
err = server.Serve(listener)
err := this.listener.Serve()
if err != nil {
logs.Println("[LISTENER]" + err.Error())
}
}()
return nil
}
func (this *Listener) ListenHTTPS() error {
// TODO 需要实现
return nil
}
func (this *Listener) listenTCP() error {
// TODO 需要实现
return nil
}
func (this *Listener) listenTLS() error {
// TODO 需要实现
return nil
}
func (this *Listener) listenUnix() error {
// TODO 需要实现
return nil
}
func (this *Listener) listenUDP() error {
// TODO 需要实现
return nil
func (this *Listener) Close() error {
if this.listener == nil {
return nil
}
return this.listener.Close()
}
// 创建监听器
func (this *Listener) createListener() (net.Listener, error) {
listenConfig := net.ListenConfig{
Control: nil,
@@ -119,9 +107,9 @@ func (this *Listener) createListener() (net.Listener, error) {
}
switch this.group.Protocol() {
case configs.ProtocolHTTP4, configs.ProtocolHTTPS4, configs.ProtocolTLS4:
case serverconfigs.ProtocolHTTP4, serverconfigs.ProtocolHTTPS4, serverconfigs.ProtocolTLS4:
return listenConfig.Listen(context.Background(), "tcp4", this.group.Addr())
case configs.ProtocolHTTP6, configs.ProtocolHTTPS6, configs.ProtocolTLS6:
case serverconfigs.ProtocolHTTP6, serverconfigs.ProtocolHTTPS6, serverconfigs.ProtocolTLS6:
return listenConfig.Listen(context.Background(), "tcp6", this.group.Addr())
}

View File

@@ -0,0 +1,195 @@
package nodes
import (
"crypto/tls"
"errors"
"github.com/TeaOSLab/EdgeNode/internal/configs/serverconfigs"
"github.com/TeaOSLab/EdgeNode/internal/configs/serverconfigs/sslconfigs"
http2 "golang.org/x/net/http2"
"sync"
)
type BaseListener struct {
serversLocker sync.RWMutex
namedServersLocker sync.RWMutex
namedServers map[string]*NamedServer // 域名 => server
}
// 初始化
func (this *BaseListener) Init() {
this.namedServers = map[string]*NamedServer{}
}
// 构造TLS配置
func (this *BaseListener) buildTLSConfig(group *serverconfigs.ServerGroup) *tls.Config {
return &tls.Config{
Certificates: nil,
GetConfigForClient: func(info *tls.ClientHelloInfo) (config *tls.Config, e error) {
ssl, _, err := this.matchSSL(group, info.ServerName)
if err != nil {
return nil, err
}
cipherSuites := ssl.TLSCipherSuites()
if len(cipherSuites) == 0 {
cipherSuites = nil
}
nextProto := []string{}
if !ssl.HTTP2Disabled {
nextProto = []string{http2.NextProtoTLS}
}
return &tls.Config{
Certificates: nil,
MinVersion: ssl.TLSMinVersion(),
CipherSuites: cipherSuites,
GetCertificate: func(info *tls.ClientHelloInfo) (certificate *tls.Certificate, e error) {
_, cert, err := this.matchSSL(group, info.ServerName)
if err != nil {
return nil, err
}
if cert == nil {
return nil, errors.New("[proxy]no certs found for '" + info.ServerName + "'")
}
return cert, nil
},
ClientAuth: sslconfigs.GoSSLClientAuthType(ssl.ClientAuthType),
ClientCAs: ssl.CAPool(),
NextProtos: nextProto,
}, nil
},
GetCertificate: func(info *tls.ClientHelloInfo) (certificate *tls.Certificate, e error) {
_, cert, err := this.matchSSL(group, info.ServerName)
if err != nil {
return nil, err
}
if cert == nil {
return nil, errors.New("[proxy]no certs found for '" + info.ServerName + "'")
}
return cert, nil
},
}
}
// 根据域名匹配证书
func (this *BaseListener) matchSSL(group *serverconfigs.ServerGroup, domain string) (*sslconfigs.SSLConfig, *tls.Certificate, error) {
this.serversLocker.RLock()
defer this.serversLocker.RUnlock()
// 如果域名为空,则取第一个
// 通常域名为空是因为是直接通过IP访问的
if len(domain) == 0 {
if serverconfigs.SharedGlobalConfig().HTTPAll.MatchDomainStrictly {
return nil, nil, errors.New("no tls server name matched")
}
firstServer := group.FirstServer()
if firstServer == nil {
return nil, nil, errors.New("no server available")
}
sslConfig := firstServer.SSLConfig()
if sslConfig != nil {
return sslConfig, sslConfig.FirstCert(), nil
}
return nil, nil, errors.New("no tls server name found")
}
// 通过代理服务域名配置匹配
server, _ := this.findNamedServer(group, domain)
if server == nil || server.SSLConfig() == nil || !server.SSLConfig().IsOn {
// 搜索所有的Server通过SSL证书内容中的DNSName匹配
for _, server := range group.Servers {
if server.SSLConfig() == nil || !server.SSLConfig().IsOn {
continue
}
cert, ok := server.SSLConfig().MatchDomain(domain)
if ok {
return server.SSLConfig(), cert, nil
}
}
return nil, nil, errors.New("[proxy]no server found for '" + domain + "'")
}
// 证书是否匹配
sslConfig := server.SSLConfig()
cert, ok := sslConfig.MatchDomain(domain)
if ok {
return sslConfig, cert, nil
}
return sslConfig, sslConfig.FirstCert(), nil
}
// 根据域名来查找匹配的域名
func (this *BaseListener) findNamedServer(group *serverconfigs.ServerGroup, name string) (serverConfig *serverconfigs.ServerConfig, serverName string) {
// 读取缓存
this.namedServersLocker.RLock()
namedServer, found := this.namedServers[name]
if found {
this.namedServersLocker.RUnlock()
return namedServer.Server, namedServer.Name
}
this.namedServersLocker.RUnlock()
this.serversLocker.RLock()
defer this.serversLocker.RUnlock()
currentServers := group.Servers
countServers := len(currentServers)
if countServers == 0 {
return nil, ""
}
// 只记录N个记录防止内存耗尽
maxNamedServers := 10240
// 是否严格匹配域名
matchDomainStrictly := serverconfigs.SharedGlobalConfig().HTTPAll.MatchDomainStrictly
// 如果只有一个server则默认为这个
if countServers == 1 && !matchDomainStrictly {
return currentServers[0], name
}
// 精确查找
for _, server := range currentServers {
if server.MatchNameStrictly(name) {
this.namedServersLocker.Lock()
if len(this.namedServers) < maxNamedServers {
this.namedServers[name] = &NamedServer{
Name: name,
Server: server,
}
}
this.namedServersLocker.Unlock()
return server, name
}
}
// 模糊查找
for _, server := range currentServers {
if matched := server.MatchName(name); matched {
this.namedServersLocker.Lock()
if len(this.namedServers) < maxNamedServers {
this.namedServers[name] = &NamedServer{
Name: name,
Server: server,
}
}
this.namedServersLocker.Unlock()
return server, name
}
}
// 找不到而且域名严格匹配模式下不返回Server
if matchDomainStrictly {
return nil, name
}
// 如果没有找到,则匹配到第一个
return currentServers[0], name
}

View File

@@ -0,0 +1,68 @@
package nodes
import (
"github.com/TeaOSLab/EdgeNode/internal/configs/serverconfigs"
"github.com/iwind/TeaGo/logs"
"golang.org/x/net/http2"
"net"
"net/http"
"time"
)
type HTTPListener struct {
BaseListener
Group *serverconfigs.ServerGroup
Listener net.Listener
httpServer *http.Server
}
func (this *HTTPListener) Serve() error {
handler := http.NewServeMux()
handler.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {
this.handleHTTP(writer, request)
})
this.httpServer = &http.Server{
Addr: this.Group.Addr(),
Handler: handler,
IdleTimeout: 2 * time.Minute,
}
this.httpServer.SetKeepAlivesEnabled(true)
// HTTP协议
if this.Group.IsHTTP() {
err := this.httpServer.Serve(this.Listener)
if err != nil && err != http.ErrServerClosed {
return err
}
}
// HTTPS协议
if this.Group.IsHTTPS() {
this.httpServer.TLSConfig = this.buildTLSConfig(this.Group)
// support http/2
err := http2.ConfigureServer(this.httpServer, nil)
if err != nil {
logs.Println("[HTTP_LISTENER]configure http2 error: " + err.Error())
}
err = this.httpServer.ServeTLS(this.Listener, "", "")
if err != nil && err != http.ErrServerClosed {
return err
}
}
return nil
}
func (this *HTTPListener) Close() error {
// TODO
return nil
}
func (this *HTTPListener) handleHTTP(writer http.ResponseWriter, req *http.Request) {
writer.Write([]byte("Hello, World"))
}

View File

@@ -0,0 +1,13 @@
package nodes
// 各协议监听器的具体实现
type ListenerImpl interface {
// 初始化
Init()
// 监听
Serve() error
// 关闭
Close() error
}

View File

@@ -4,6 +4,8 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/configs"
"github.com/iwind/TeaGo/lists"
"github.com/iwind/TeaGo/logs"
"net/url"
"regexp"
"sync"
)
@@ -12,6 +14,7 @@ var sharedListenerManager = NewListenerManager()
type ListenerManager struct {
listenersMap map[string]*Listener // addr => *Listener
locker sync.Mutex
lastConfig *configs.NodeConfig
}
func NewListenerManager() *ListenerManager {
@@ -24,6 +27,18 @@ func (this *ListenerManager) Start(node *configs.NodeConfig) error {
this.locker.Lock()
defer this.locker.Unlock()
// 检查是否有变化
if this.lastConfig != nil && this.lastConfig.Version == node.Version {
return nil
}
this.lastConfig = node
// 初始化
err := node.Init()
if err != nil {
return err
}
// 所有的新地址
groupAddrs := []string{}
for _, group := range node.AvailableGroups() {
@@ -45,15 +60,16 @@ func (this *ListenerManager) Start(node *configs.NodeConfig) error {
addr := group.FullAddr()
listener, ok := this.listenersMap[addr]
if ok {
logs.Println("[LISTENER_MANAGER]reload '" + addr + "'")
logs.Println("[LISTENER_MANAGER]reload '" + this.prettyAddress(addr) + "'")
listener.Reload(group)
} else {
logs.Println("[LISTENER_MANAGER]listen '" + addr + "'")
logs.Println("[LISTENER_MANAGER]listen '" + this.prettyAddress(addr) + "'")
listener = NewListener()
listener.Reload(group)
err := listener.Listen()
if err != nil {
return err
logs.Println("[LISTENER_MANAGER]" + err.Error())
continue
}
this.listenersMap[addr] = listener
}
@@ -61,3 +77,14 @@ func (this *ListenerManager) Start(node *configs.NodeConfig) error {
return nil
}
func (this *ListenerManager) prettyAddress(addr string) string {
u, err := url.Parse(addr)
if err != nil {
return addr
}
if regexp.MustCompile(`^:\d+$`).MatchString(u.Host) {
u.Host = "*" + u.Host
}
return u.String()
}

View File

@@ -12,15 +12,29 @@ func TestListenerManager_Listen(t *testing.T) {
{
IsOn: true,
HTTP: &configs.HTTPProtocolConfig{
IsOn: true,
Listen: []string{"127.0.0.1:1234"},
BaseProtocol: configs.BaseProtocol{
IsOn: true,
Listen: []*configs.NetworkAddressConfig{
{
Protocol: configs.ProtocolHTTP,
PortRange: "1234",
},
},
},
},
},
{
IsOn: true,
HTTP: &configs.HTTPProtocolConfig{
IsOn: true,
Listen: []string{"127.0.0.1:1235"},
BaseProtocol: configs.BaseProtocol{
IsOn: true,
Listen: []*configs.NetworkAddressConfig{
{
Protocol: configs.ProtocolHTTP,
PortRange: "1235",
},
},
},
},
},
},
@@ -34,15 +48,29 @@ func TestListenerManager_Listen(t *testing.T) {
{
IsOn: true,
HTTP: &configs.HTTPProtocolConfig{
IsOn: true,
Listen: []string{"127.0.0.1:1234"},
BaseProtocol: configs.BaseProtocol{
IsOn: true,
Listen: []*configs.NetworkAddressConfig{
{
Protocol: configs.ProtocolHTTP,
PortRange: "1234",
},
},
},
},
},
{
IsOn: true,
HTTP: &configs.HTTPProtocolConfig{
IsOn: true,
Listen: []string{"127.0.0.1:1236"},
BaseProtocol: configs.BaseProtocol{
IsOn: true,
Listen: []*configs.NetworkAddressConfig{
{
Protocol: configs.ProtocolHTTP,
PortRange: "1236",
},
},
},
},
},
},

View File

@@ -0,0 +1,23 @@
package nodes
import (
"github.com/TeaOSLab/EdgeNode/internal/configs/serverconfigs"
"net"
)
type TCPListener struct {
BaseListener
Group *serverconfigs.ServerGroup
Listener net.Listener
}
func (this *TCPListener) Serve() error {
// TODO
return nil
}
func (this *TCPListener) Close() error {
// TODO
return nil
}

View File

@@ -0,0 +1,23 @@
package nodes
import (
"github.com/TeaOSLab/EdgeNode/internal/configs/serverconfigs"
"net"
)
type UDPListener struct {
BaseListener
Group *serverconfigs.ServerGroup
Listener net.Listener
}
func (this *UDPListener) Serve() error {
// TODO
return nil
}
func (this *UDPListener) Close() error {
// TODO
return nil
}

View File

@@ -0,0 +1,23 @@
package nodes
import (
"github.com/TeaOSLab/EdgeNode/internal/configs/serverconfigs"
"net"
)
type UnixListener struct {
BaseListener
Group *serverconfigs.ServerGroup
Listener net.Listener
}
func (this *UnixListener) Serve() error {
// TODO
return nil
}
func (this *UnixListener) Close() error {
// TODO
return nil
}

View File

@@ -0,0 +1,9 @@
package nodes
import "github.com/TeaOSLab/EdgeNode/internal/configs/serverconfigs"
// 域名和服务映射
type NamedServer struct {
Name string // 匹配后的域名
Server *serverconfigs.ServerConfig // 匹配后的服务配置
}

View File

@@ -1,14 +1,20 @@
package nodes
import (
"encoding/json"
"errors"
"github.com/TeaOSLab/EdgeNode/internal/configs"
"github.com/TeaOSLab/EdgeNode/internal/rpc"
"github.com/TeaOSLab/EdgeNode/internal/rpc/pb"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/iwind/TeaGo/logs"
"time"
)
var sharedNodeConfig *configs.NodeConfig = nil
var stop = make(chan bool)
var lastVersion = -1
// 节点
type Node struct {
}
@@ -17,13 +23,24 @@ func NewNode() *Node {
}
func (this *Node) Start() {
// 读取API配置
err := this.syncConfig(false)
if err != nil {
logs.Println(err.Error())
}
// 启动同步计时器
this.startSyncTimer()
// 状态变更计时器
go NewNodeStatusExecutor().Listen()
// 读取配置
nodeConfig, err := configs.SharedNodeConfig()
if err != nil {
logs.Println("[NODE]start failed: read node config failed: " + err.Error())
return
}
sharedNodeConfig = nodeConfig
// 设置rlimit
_ = utils.SetRLimit(1024 * 1024)
@@ -37,3 +54,59 @@ func (this *Node) Start() {
// hold住进程
<-stop
}
// 读取API配置
func (this *Node) syncConfig(isFirstTime bool) error {
rpcClient, err := rpc.SharedRPC()
if err != nil {
return errors.New("[NODE]create rpc client failed: " + err.Error())
}
configResp, err := rpcClient.NodeRPC().ComposeNodeConfig(rpcClient.Context(), &pb.ComposeNodeConfigRequest{})
if err != nil {
return errors.New("[NODE]read config from rpc failed: " + err.Error())
}
configBytes := configResp.ConfigJSON
nodeConfig := &configs.NodeConfig{}
err = json.Unmarshal(configBytes, nodeConfig)
if err != nil {
return errors.New("[NODE]decode config failed: " + err.Error())
}
// 写入到文件中
err = nodeConfig.Save()
if err != nil {
return err
}
// 如果版本相同,则只是保存
if lastVersion == nodeConfig.Version {
return nil
}
lastVersion = nodeConfig.Version
// 刷新配置
err = configs.ReloadNodeConfig()
if err != nil {
return err
}
if !isFirstTime {
return sharedListenerManager.Start(nodeConfig)
}
return nil
}
// 启动同步计时器
func (this *Node) startSyncTimer() {
ticker := time.NewTicker(60 * time.Second)
go func() {
for range ticker.C {
err := this.syncConfig(false)
if err != nil {
logs.Println("[NODE]sync config error: " + err.Error())
continue
}
}
}()
}

View File

@@ -0,0 +1,24 @@
package nodes
// 节点状态
type NodeStatus struct {
Version string `json:"version"`
Hostname string `json:"hostname"`
HostIP string `json:"hostIP"`
CPUUsage float64 `json:"cpuUsage"`
CPULogicalCount int `json:"cpuLogicalCount"`
CPUPhysicalCount int `json:"cpuPhysicalCount"`
MemoryUsage float64 `json:"memoryUsage"`
MemoryTotal uint64 `json:"memoryTotal"`
DiskUsage float64 `json:"diskUsage"`
DiskMaxUsage float64 `json:"diskMaxUsage"`
DiskMaxUsagePartition string `json:"diskMaxUsagePartition"`
DiskTotal uint64 `json:"diskTotal"`
UpdatedAt int64 `json:"updatedAt"`
Load1m float64 `json:"load1m"`
Load5m float64 `json:"load5m"`
Load15m float64 `json:"load15m"`
IsActive bool `json:"isActive"`
Error string `json:"error"`
}

View File

@@ -0,0 +1,172 @@
package nodes
import (
"encoding/json"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/rpc"
"github.com/TeaOSLab/EdgeNode/internal/rpc/pb"
"github.com/iwind/TeaGo/lists"
"github.com/iwind/TeaGo/logs"
"github.com/shirou/gopsutil/cpu"
"github.com/shirou/gopsutil/disk"
"os"
"runtime"
"strings"
"time"
)
type NodeStatusExecutor struct {
isFirstTime bool
cpuUpdatedTime time.Time
cpuLogicalCount int
cpuPhysicalCount int
}
func NewNodeStatusExecutor() *NodeStatusExecutor {
return &NodeStatusExecutor{}
}
func (this *NodeStatusExecutor) Listen() {
this.isFirstTime = true
this.cpuUpdatedTime = time.Now()
this.update()
ticker := time.NewTicker(60 * time.Second)
for range ticker.C {
this.isFirstTime = false
this.update()
}
}
func (this *NodeStatusExecutor) update() {
status := &NodeStatus{}
status.Version = teaconst.Version
status.IsActive = true
hostname, _ := os.Hostname()
status.Hostname = hostname
this.updateCPU(status)
this.updateMem(status)
this.updateLoad(status)
this.updateDisk(status)
status.UpdatedAt = time.Now().Unix()
// 发送数据
jsonData, err := json.Marshal(status)
if err != nil {
logs.Println("[NODE]serial NodeStatus fail: " + err.Error())
return
}
rpcClient, err := rpc.SharedRPC()
if err != nil {
logs.Println("[NODE]failed to open rpc: " + err.Error())
return
}
_, err = rpcClient.NodeRPC().UpdateNodeStatus(rpcClient.Context(), &pb.UpdateNodeStatusRequest{
StatusJSON: jsonData,
})
if err != nil {
logs.Println("[NODE]rpc UpdateNodeStatus() failed: " + err.Error())
return
}
}
// 更新CPU
func (this *NodeStatusExecutor) updateCPU(status *NodeStatus) {
duration := time.Duration(0)
if this.isFirstTime {
duration = 100 * time.Millisecond
}
percents, err := cpu.Percent(duration, false)
if err != nil {
status.Error = err.Error()
return
}
if len(percents) == 0 {
return
}
status.CPUUsage = percents[0] / 100
if time.Since(this.cpuUpdatedTime) > 300*time.Second { // 每隔5分钟才会更新一次
this.cpuUpdatedTime = time.Now()
status.CPULogicalCount, err = cpu.Counts(true)
if err != nil {
status.Error = err.Error()
return
}
status.CPUPhysicalCount, err = cpu.Counts(false)
if err != nil {
status.Error = err.Error()
return
}
this.cpuLogicalCount = status.CPULogicalCount
this.cpuPhysicalCount = status.CPUPhysicalCount
} else {
status.CPULogicalCount = this.cpuLogicalCount
status.CPUPhysicalCount = this.cpuPhysicalCount
}
}
// 更新硬盘
func (this *NodeStatusExecutor) updateDisk(status *NodeStatus) {
partitions, err := disk.Partitions(false)
if err != nil {
logs.Error(err)
return
}
lists.Sort(partitions, func(i int, j int) bool {
p1 := partitions[i]
p2 := partitions[j]
return p1.Mountpoint > p2.Mountpoint
})
// 当前TeaWeb所在的fs
rootFS := ""
rootTotal := uint64(0)
if lists.ContainsString([]string{"darwin", "linux", "freebsd"}, runtime.GOOS) {
for _, p := range partitions {
if p.Mountpoint == "/" {
rootFS = p.Fstype
usage, _ := disk.Usage(p.Mountpoint)
if usage != nil {
rootTotal = usage.Total
}
break
}
}
}
total := rootTotal
totalUsage := uint64(0)
maxUsage := float64(0)
for _, partition := range partitions {
if runtime.GOOS != "windows" && !strings.Contains(partition.Device, "/") && !strings.Contains(partition.Device, "\\") {
continue
}
// 跳过不同fs的
if len(rootFS) > 0 && rootFS != partition.Fstype {
continue
}
usage, err := disk.Usage(partition.Mountpoint)
if err != nil {
continue
}
if partition.Mountpoint != "/" && (usage.Total != rootTotal || total == 0) {
total += usage.Total
}
totalUsage += usage.Used
if usage.UsedPercent >= maxUsage {
maxUsage = usage.UsedPercent
status.DiskMaxUsagePartition = partition.Mountpoint
}
}
status.DiskTotal = total
status.DiskUsage = float64(totalUsage) / float64(total)
status.DiskMaxUsage = maxUsage / 100
}

View File

@@ -0,0 +1,40 @@
// +build !windows
package nodes
import (
"github.com/shirou/gopsutil/load"
"github.com/shirou/gopsutil/mem"
)
// 更新内存
func (this *NodeStatusExecutor) updateMem(status *NodeStatus) {
stat, err := mem.VirtualMemory()
if err != nil {
return
}
// 重新计算内存
if stat.Total > 0 {
stat.Used = stat.Total - stat.Free - stat.Buffers - stat.Cached
status.MemoryUsage = float64(stat.Used) / float64(stat.Total)
}
status.MemoryTotal = stat.Total
}
// 更新负载
func (this *NodeStatusExecutor) updateLoad(status *NodeStatus) {
stat, err := load.Avg()
if err != nil {
status.Error = err.Error()
return
}
if stat == nil {
status.Error = "load is nil"
return
}
status.Load1m = stat.Load1
status.Load5m = stat.Load5
status.Load15m = stat.Load15
}

View File

@@ -0,0 +1,101 @@
// +build windows
package agent
import (
"context"
"github.com/shirou/gopsutil/cpu"
"github.com/shirou/gopsutil/mem"
"math"
"sync"
"time"
)
type WindowsLoadValue struct {
Timestamp int64
Value int
}
var windowsLoadValues = []*WindowsLoadValue{}
var windowsLoadLocker = &sync.Mutex{}
// 更新内存
func (this *NodeStatusExecutor) updateMem(status *NodeStatus) {
stat, err := mem.VirtualMemory()
if err != nil {
status.Error = err.Error()
return
}
status.MemoryUsage = stat.UsedPercent
status.MemoryTotal = stat.Total
}
// 更新负载
func (this *NodeStatusExecutor) updateLoad(status *NodeStatus) {
timestamp := time.Now().Unix()
currentLoad := 0
info, err := cpu.ProcInfo()
if err == nil && len(info) > 0 && info[0].ProcessorQueueLength < 1000 {
currentLoad = int(info[0].ProcessorQueueLength)
}
// 删除15分钟之前的数据
windowsLoadLocker.Lock()
result := []*WindowsLoadValue{}
for _, v := range windowsLoadValues {
if timestamp-v.Timestamp > 15*60 {
continue
}
result = append(result, v)
}
result = append(result, &WindowsLoadValue{
Timestamp: timestamp,
Value: currentLoad,
})
windowsLoadValues = result
total1 := 0
count1 := 0
total5 := 0
count5 := 0
total15 := 0
count15 := 0
for _, v := range result {
if timestamp-v.Timestamp <= 60 {
total1 += v.Value
count1++
}
if timestamp-v.Timestamp <= 300 {
total5 += v.Value
count5++
}
total15 += v.Value
count15++
}
load1 := float64(0)
load5 := float64(0)
load15 := float64(0)
if count1 > 0 {
load1 = math.Round(float64(total1*100)/float64(count1)) / 100
}
if count5 > 0 {
load5 = math.Round(float64(total5*100)/float64(count5)) / 100
}
if count15 > 0 {
load15 = math.Round(float64(total15*100)/float64(count15)) / 100
}
windowsLoadLocker.Unlock()
// 在老Windows上不显示错误
if err == context.DeadlineExceeded {
err = nil
}
status.Load1m = load1
status.Load5m = load5
status.Load15m = load15
}

View File

@@ -1,50 +0,0 @@
package nodes
import (
"context"
"errors"
"github.com/TeaOSLab/EdgeNode/internal/configs"
"github.com/TeaOSLab/EdgeNode/internal/rpc/node"
"github.com/iwind/TeaGo/rands"
"google.golang.org/grpc"
)
type RPCClient struct {
nodeClients []node.ServiceClient
}
func NewRPCClient(apiConfig *configs.APIConfig) (*RPCClient, error) {
nodeClients := []node.ServiceClient{}
conns := []*grpc.ClientConn{}
for _, endpoint := range apiConfig.RPC.Endpoints {
conn, err := grpc.Dial(endpoint, grpc.WithInsecure())
if err != nil {
return nil, err
}
conns = append(conns, conn)
}
if len(conns) == 0 {
return nil, errors.New("[RPC]no available endpoints")
}
// node clients
for _, conn := range conns {
nodeClients = append(nodeClients, node.NewServiceClient(conn))
}
return &RPCClient{
nodeClients: nodeClients,
}, nil
}
func (this *RPCClient) NodeRPC() node.ServiceClient {
if len(this.nodeClients) > 0 {
return this.nodeClients[rands.Int(0, len(this.nodeClients)-1)]
}
return nil
}
func (this *RPCClient) Context() context.Context {
return context.Background()
}

View File

@@ -1,30 +0,0 @@
package nodes
import (
"github.com/TeaOSLab/EdgeNode/internal/configs"
"github.com/TeaOSLab/EdgeNode/internal/rpc/node"
"testing"
"time"
)
func TestRPCClient_NodeRPC(t *testing.T) {
before := time.Now()
defer func() {
t.Log(time.Since(before).Seconds()*1000, "ms")
}()
config, err := configs.LoadAPIConfig()
if err != nil {
t.Fatal(err)
}
rpc, err := NewRPCClient(config)
if err != nil {
t.Fatal(err)
}
resp, err := rpc.NodeRPC().Config(rpc.Context(), &node.ConfigRequest{
NodeId: "123456",
})
if err != nil {
t.Fatal(err)
}
t.Log(resp)
}