diff --git a/internal/iplibrary/action_manager.go b/internal/iplibrary/action_manager.go index 51aa2e5..6c6b20d 100644 --- a/internal/iplibrary/action_manager.go +++ b/internal/iplibrary/action_manager.go @@ -14,7 +14,7 @@ import ( var SharedActionManager = NewActionManager() -// 动作管理器定义 +// ActionManager 动作管理器定义 type ActionManager struct { locker sync.Mutex @@ -23,7 +23,7 @@ type ActionManager struct { instanceMap map[int64]ActionInterface // id => instance } -// 获取动作管理对象 +// NewActionManager 获取动作管理对象 func NewActionManager() *ActionManager { return &ActionManager{ configMap: map[int64]*firewallconfigs.FirewallActionConfig{}, @@ -31,7 +31,7 @@ func NewActionManager() *ActionManager { } } -// 更新配置 +// UpdateActions 更新配置 func (this *ActionManager) UpdateActions(actions []*firewallconfigs.FirewallActionConfig) { this.locker.Lock() defer this.locker.Unlock() @@ -108,14 +108,14 @@ func (this *ActionManager) UpdateActions(actions []*firewallconfigs.FirewallActi } } -// 查找事件对应的动作 +// FindEventActions 查找事件对应的动作 func (this *ActionManager) FindEventActions(eventLevel string) []ActionInterface { this.locker.Lock() defer this.locker.Unlock() return this.eventMap[eventLevel] } -// 执行添加IP动作 +// AddItem 执行添加IP动作 func (this *ActionManager) AddItem(listType IPListType, item *pb.IPItem) { instances, ok := this.eventMap[item.EventLevel] if ok { @@ -128,7 +128,7 @@ func (this *ActionManager) AddItem(listType IPListType, item *pb.IPItem) { } } -// 执行删除IP动作 +// DeleteItem 执行删除IP动作 func (this *ActionManager) DeleteItem(listType IPListType, item *pb.IPItem) { instances, ok := this.eventMap[item.EventLevel] if ok { diff --git a/internal/iplibrary/manager.go b/internal/iplibrary/manager.go index 69c3c2e..34fe741 100644 --- a/internal/iplibrary/manager.go +++ b/internal/iplibrary/manager.go @@ -21,7 +21,7 @@ func init() { // 初始化 library, err := SharedManager.Load() if err != nil { - remotelogs.Error("IP_LIBRARY", err.Error()) + remotelogs.ErrorObject("IP_LIBRARY", err) return } SharedLibrary = library diff --git a/internal/iplibrary/manager_country.go b/internal/iplibrary/manager_country.go index 1b4eada..46dbe14 100644 --- a/internal/iplibrary/manager_country.go +++ b/internal/iplibrary/manager_country.go @@ -46,13 +46,13 @@ func (this *CountryManager) Start() { // 从缓存中读取 err := this.load() if err != nil { - remotelogs.Error("COUNTRY_MANAGER", err.Error()) + remotelogs.ErrorObject("COUNTRY_MANAGER", err) } // 第一次更新 err = this.loop() if err != nil { - remotelogs.Error("COUNTRY_MANAGER", err.Error()) + remotelogs.ErrorObject("COUNTRY_MANAGER", err) } // 定时更新 @@ -63,7 +63,7 @@ func (this *CountryManager) Start() { for range ticker.C { err := this.loop() if err != nil { - remotelogs.Error("COUNTRY_MANAGER", err.Error()) + remotelogs.ErrorObject("COUNTRY_MANAGER", err) } } } diff --git a/internal/iplibrary/manager_ip_list.go b/internal/iplibrary/manager_ip_list.go index c41cfd6..beaab20 100644 --- a/internal/iplibrary/manager_ip_list.go +++ b/internal/iplibrary/manager_ip_list.go @@ -48,7 +48,7 @@ func (this *IPListManager) Start() { // 第一次读取 err := this.loop() if err != nil { - remotelogs.Error("IP_LIST_MANAGER", err.Error()) + remotelogs.ErrorObject("IP_LIST_MANAGER", err) } ticker := time.NewTicker(60 * time.Second) @@ -65,7 +65,7 @@ func (this *IPListManager) Start() { if err != nil { countErrors++ - remotelogs.Error("IP_LIST_MANAGER", err.Error()) + remotelogs.ErrorObject("IP_LIST_MANAGER", err) // 连续错误小于3次的我们立即重试 if countErrors <= 3 { diff --git a/internal/iplibrary/manager_province.go b/internal/iplibrary/manager_province.go index df0410a..a0de6b2 100644 --- a/internal/iplibrary/manager_province.go +++ b/internal/iplibrary/manager_province.go @@ -50,13 +50,13 @@ func (this *ProvinceManager) Start() { // 从缓存中读取 err := this.load() if err != nil { - remotelogs.Error("PROVINCE_MANAGER", err.Error()) + remotelogs.ErrorObject("PROVINCE_MANAGER", err) } // 第一次更新 err = this.loop() if err != nil { - remotelogs.Error("PROVINCE_MANAGER", err.Error()) + remotelogs.ErrorObject("PROVINCE_MANAGER", err) } // 定时更新 @@ -67,7 +67,7 @@ func (this *ProvinceManager) Start() { for range ticker.C { err := this.loop() if err != nil { - remotelogs.Error("PROVINCE_MANAGER", err.Error()) + remotelogs.ErrorObject("PROVINCE_MANAGER", err) } } } diff --git a/internal/iplibrary/updater.go b/internal/iplibrary/updater.go index d2af86f..1c471af 100644 --- a/internal/iplibrary/updater.go +++ b/internal/iplibrary/updater.go @@ -38,7 +38,7 @@ func (this *Updater) Start() { for range ticker.C { err := this.loop() if err != nil { - remotelogs.Error("IP_LIBRARY", err.Error()) + remotelogs.ErrorObject("IP_LIBRARY", err) } } }() diff --git a/internal/monitor/value_queue.go b/internal/monitor/value_queue.go index 3f1b4c1..eac7472 100644 --- a/internal/monitor/value_queue.go +++ b/internal/monitor/value_queue.go @@ -36,7 +36,7 @@ func (this *ValueQueue) Start() { // 这里单次循环就行,因为Loop里已经使用了Range通道 err := this.Loop() if err != nil { - remotelogs.Error("MONITOR_QUEUE", err.Error()) + remotelogs.ErrorObject("MONITOR_QUEUE", err) } } @@ -72,7 +72,11 @@ func (this *ValueQueue) Loop() error { CreatedAt: value.CreatedAt, }) if err != nil { - remotelogs.Error("MONITOR", err.Error()) + if rpc.IsConnError(err) { + remotelogs.Warn("MONITOR", err.Error()) + } else { + remotelogs.Error("MONITOR", err.Error()) + } continue } } diff --git a/internal/monitor/value_queue_test.go b/internal/monitor/value_queue_test.go new file mode 100644 index 0000000..c35c7d0 --- /dev/null +++ b/internal/monitor/value_queue_test.go @@ -0,0 +1,28 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package monitor + +import ( + "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "github.com/TeaOSLab/EdgeNode/internal/rpc" + _ "github.com/iwind/TeaGo/bootstrap" + "github.com/iwind/TeaGo/logs" + "google.golang.org/grpc/status" + "testing" +) + +func TestValueQueue_RPC(t *testing.T) { + rpcClient, err := rpc.SharedRPC() + if err != nil { + t.Fatal(err) + } + _, err = rpcClient.NodeValueRPC().CreateNodeValue(rpcClient.Context(), &pb.CreateNodeValueRequest{}) + if err != nil { + statusErr, ok:= status.FromError(err) + if ok { + logs.Println(statusErr.Code()) + } + t.Fatal(err) + } + t.Log("ok") +} diff --git a/internal/nodes/listener_tcp.go b/internal/nodes/listener_tcp.go index 77a0214..f90d50a 100644 --- a/internal/nodes/listener_tcp.go +++ b/internal/nodes/listener_tcp.go @@ -65,14 +65,14 @@ func (this *TCPListener) handleConn(conn net.Conn) error { var serverName = tlsConn.ConnectionState().ServerName if len(serverName) > 0 { // 统计 - stats.SharedTrafficStatManager.Add(firstServer.Id, serverName, 0, 0, 1, 0, 0, 0, firstServer.ShouldCheckTrafficLimit()) + stats.SharedTrafficStatManager.Add(firstServer.Id, serverName, 0, 0, 1, 0, 0, 0, firstServer.ShouldCheckTrafficLimit(), firstServer.PlanId()) recordStat = true } } // 统计 if !recordStat { - stats.SharedTrafficStatManager.Add(firstServer.Id, "", 0, 0, 1, 0, 0, 0, firstServer.ShouldCheckTrafficLimit()) + stats.SharedTrafficStatManager.Add(firstServer.Id, "", 0, 0, 1, 0, 0, 0, firstServer.ShouldCheckTrafficLimit(), firstServer.PlanId()) } originConn, err := this.connectOrigin(firstServer.ReverseProxy, conn.RemoteAddr().String()) @@ -126,7 +126,7 @@ func (this *TCPListener) handleConn(conn net.Conn) error { // 记录流量 if firstServer != nil { - stats.SharedTrafficStatManager.Add(firstServer.Id, "", int64(n), 0, 0, 0, 0, 0, firstServer.ShouldCheckTrafficLimit()) + stats.SharedTrafficStatManager.Add(firstServer.Id, "", int64(n), 0, 0, 0, 0, 0, firstServer.ShouldCheckTrafficLimit(), firstServer.PlanId()) } } if err != nil { diff --git a/internal/nodes/listener_udp.go b/internal/nodes/listener_udp.go index cf1c19e..6cd5b00 100644 --- a/internal/nodes/listener_udp.go +++ b/internal/nodes/listener_udp.go @@ -185,7 +185,7 @@ func NewUDPConn(server *serverconfigs.ServerConfig, addr net.Addr, proxyConn *ne // 统计 if server != nil { - stats.SharedTrafficStatManager.Add(server.Id, "", 0, 0, 1, 0, 0, 0, server.ShouldCheckTrafficLimit()) + stats.SharedTrafficStatManager.Add(server.Id, "", 0, 0, 1, 0, 0, 0, server.ShouldCheckTrafficLimit(), server.PlanId()) } go func() { @@ -206,7 +206,7 @@ func NewUDPConn(server *serverconfigs.ServerConfig, addr net.Addr, proxyConn *ne // 记录流量 if server != nil { - stats.SharedTrafficStatManager.Add(server.Id, "", int64(n), 0, 0, 0, 0, 0, server.ShouldCheckTrafficLimit()) + stats.SharedTrafficStatManager.Add(server.Id, "", int64(n), 0, 0, 0, 0, 0, server.ShouldCheckTrafficLimit(), server.PlanId()) } } if err != nil { diff --git a/internal/nodes/node_status_executor.go b/internal/nodes/node_status_executor.go index 731a8c4..7d0deff 100644 --- a/internal/nodes/node_status_executor.go +++ b/internal/nodes/node_status_executor.go @@ -101,7 +101,11 @@ func (this *NodeStatusExecutor) update() { StatusJSON: jsonData, }) if err != nil { - remotelogs.Error("NODE_STATUS", "rpc UpdateNodeStatus() failed: "+err.Error()) + if rpc.IsConnError(err) { + remotelogs.Warn("NODE_STATUS", "rpc UpdateNodeStatus() failed: "+err.Error()) + } else { + remotelogs.Error("NODE_STATUS", "rpc UpdateNodeStatus() failed: "+err.Error()) + } return } } diff --git a/internal/remotelogs/utils.go b/internal/remotelogs/utils.go index 8ac79b2..0832d16 100644 --- a/internal/remotelogs/utils.go +++ b/internal/remotelogs/utils.go @@ -93,6 +93,18 @@ func Error(tag string, description string) { } } +// ErrorObject 打印错误对象 +func ErrorObject(tag string, err error) { + if err == nil { + return + } + if rpc.IsConnError(err) { + Warn(tag, err.Error()) + } else { + Error(tag, err.Error()) + } +} + // ServerError 打印服务相关错误信息 func ServerError(serverId int64, tag string, description string) { logs.Println("[" + tag + "]" + description) diff --git a/internal/rpc/rpc_utils.go b/internal/rpc/rpc_utils.go index 71eb607..6f06f14 100644 --- a/internal/rpc/rpc_utils.go +++ b/internal/rpc/rpc_utils.go @@ -2,12 +2,15 @@ package rpc import ( "github.com/TeaOSLab/EdgeNode/internal/configs" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "sync" ) var sharedRPC *RPCClient = nil var locker = &sync.Mutex{} +// SharedRPC RPC对象 func SharedRPC() (*RPCClient, error) { locker.Lock() defer locker.Unlock() @@ -28,3 +31,18 @@ func SharedRPC() (*RPCClient, error) { sharedRPC = client return sharedRPC, nil } + +// IsConnError 是否为连接错误 +func IsConnError(err error) bool { + if err == nil { + return false + } + + // 检查是否为连接错误 + statusErr, ok := status.FromError(err) + if ok { + return statusErr.Code() == codes.Unavailable + } + + return false +} diff --git a/internal/stats/http_request_stat_manager.go b/internal/stats/http_request_stat_manager.go index 2e808f3..843cb39 100644 --- a/internal/stats/http_request_stat_manager.go +++ b/internal/stats/http_request_stat_manager.go @@ -81,13 +81,21 @@ func (this *HTTPRequestStatManager) Start() { for range loopTicker.C { err := this.Loop() if err != nil { - remotelogs.Error("HTTP_REQUEST_STAT_MANAGER", err.Error()) + if rpc.IsConnError(err) { + remotelogs.Warn("HTTP_REQUEST_STAT_MANAGER", err.Error()) + } else { + remotelogs.Error("HTTP_REQUEST_STAT_MANAGER", err.Error()) + } } select { case <-uploadTicker.C: err := this.Upload() if err != nil { - remotelogs.Error("HTTP_REQUEST_STAT_MANAGER", "upload failed: "+err.Error()) + if !rpc.IsConnError(err) { + remotelogs.Error("HTTP_REQUEST_STAT_MANAGER", "upload failed: "+err.Error()) + } else { + remotelogs.Warn("HTTP_REQUEST_STAT_MANAGER", "upload failed: "+err.Error()) + } } default: @@ -166,10 +174,10 @@ Loop: if iplibrary.SharedLibrary != nil { result, err := iplibrary.SharedLibrary.Lookup(ip) if err == nil && result != nil { - this.cityMap[serverId+"@"+result.Country+"@"+result.Province+"@"+result.City] ++ + this.cityMap[serverId+"@"+result.Country+"@"+result.Province+"@"+result.City]++ if len(result.ISP) > 0 { - this.providerMap[serverId+"@"+result.ISP] ++ + this.providerMap[serverId+"@"+result.ISP]++ } } } @@ -197,7 +205,7 @@ Loop: if dotIndex > -1 { browserVersion = browserVersion[:dotIndex] } - this.browserMap[serverId+"@"+browser+"@"+browserVersion] ++ + this.browserMap[serverId+"@"+browser+"@"+browserVersion]++ } case firewallRuleGroupString := <-this.firewallRuleGroupChan: this.dailyFirewallRuleGroupMap[firewallRuleGroupString]++ diff --git a/internal/stats/traffic_stat_manager.go b/internal/stats/traffic_stat_manager.go index 53e389f..de22123 100644 --- a/internal/stats/traffic_stat_manager.go +++ b/internal/stats/traffic_stat_manager.go @@ -26,6 +26,7 @@ type TrafficItem struct { CountCachedRequests int64 CountAttackRequests int64 AttackBytes int64 + PlanId int64 CheckingTrafficLimit bool } @@ -81,13 +82,17 @@ func (this *TrafficStatManager) Start(configFunc func() *nodeconfigs.NodeConfig) for range ticker.C { err := this.Upload() if err != nil { - remotelogs.Error("TRAFFIC_STAT_MANAGER", "upload stats failed: "+err.Error()) + if !rpc.IsConnError(err) { + remotelogs.Error("TRAFFIC_STAT_MANAGER", "upload stats failed: "+err.Error()) + } else { + remotelogs.Warn("TRAFFIC_STAT_MANAGER", "upload stats failed: "+err.Error()) + } } } } // Add 添加流量 -func (this *TrafficStatManager) Add(serverId int64, domain string, bytes int64, cachedBytes int64, countRequests int64, countCachedRequests int64, countAttacks int64, attackBytes int64, checkingTrafficLimit bool) { +func (this *TrafficStatManager) Add(serverId int64, domain string, bytes int64, cachedBytes int64, countRequests int64, countCachedRequests int64, countAttacks int64, attackBytes int64, checkingTrafficLimit bool, planId int64) { if bytes == 0 && countRequests == 0 { return } @@ -112,6 +117,7 @@ func (this *TrafficStatManager) Add(serverId int64, domain string, bytes int64, item.CountAttackRequests += countAttacks item.AttackBytes += attackBytes item.CheckingTrafficLimit = checkingTrafficLimit + item.PlanId = planId // 单个域名流量 var domainKey = strconv.FormatInt(timestamp, 10) + "@" + strconv.FormatInt(serverId, 10) + "@" + domain @@ -171,6 +177,7 @@ func (this *TrafficStatManager) Upload() error { CountAttackRequests: item.CountAttackRequests, AttackBytes: item.AttackBytes, CheckTrafficLimiting: item.CheckingTrafficLimit, + PlanId: item.PlanId, CreatedAt: timestamp, }) }