mirror of
				https://github.com/TeaOSLab/EdgeNode.git
				synced 2025-11-04 16:00:25 +08:00 
			
		
		
		
	实现节点运行日志上传
This commit is contained in:
		@@ -2,7 +2,7 @@ package caches
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
 | 
						"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
 | 
				
			||||||
	"github.com/iwind/TeaGo/logs"
 | 
						"github.com/TeaOSLab/EdgeNode/internal/logs"
 | 
				
			||||||
	"strconv"
 | 
						"strconv"
 | 
				
			||||||
	"sync"
 | 
						"sync"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
@@ -35,7 +35,7 @@ func (this *Manager) UpdatePolicies(newPolicies []*serverconfigs.HTTPCachePolicy
 | 
				
			|||||||
	// 停止旧有的
 | 
						// 停止旧有的
 | 
				
			||||||
	for _, oldPolicy := range this.policyMap {
 | 
						for _, oldPolicy := range this.policyMap {
 | 
				
			||||||
		if !this.containsInt64(newPolicyIds, oldPolicy.Id) {
 | 
							if !this.containsInt64(newPolicyIds, oldPolicy.Id) {
 | 
				
			||||||
			logs.Println("[CACHE]remove policy", strconv.FormatInt(oldPolicy.Id, 10))
 | 
								logs.Error("CACHE", "remove policy "+strconv.FormatInt(oldPolicy.Id, 10))
 | 
				
			||||||
			delete(this.policyMap, oldPolicy.Id)
 | 
								delete(this.policyMap, oldPolicy.Id)
 | 
				
			||||||
			storage, ok := this.storageMap[oldPolicy.Id]
 | 
								storage, ok := this.storageMap[oldPolicy.Id]
 | 
				
			||||||
			if ok {
 | 
								if ok {
 | 
				
			||||||
@@ -49,13 +49,13 @@ func (this *Manager) UpdatePolicies(newPolicies []*serverconfigs.HTTPCachePolicy
 | 
				
			|||||||
	for _, newPolicy := range newPolicies {
 | 
						for _, newPolicy := range newPolicies {
 | 
				
			||||||
		_, ok := this.policyMap[newPolicy.Id]
 | 
							_, ok := this.policyMap[newPolicy.Id]
 | 
				
			||||||
		if !ok {
 | 
							if !ok {
 | 
				
			||||||
			logs.Println("[CACHE]add policy", strconv.FormatInt(newPolicy.Id, 10))
 | 
								logs.Println("CACHE", "add policy "+strconv.FormatInt(newPolicy.Id, 10))
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		// 初始化
 | 
							// 初始化
 | 
				
			||||||
		err := newPolicy.Init()
 | 
							err := newPolicy.Init()
 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			logs.Println("[CACHE]UpdatePolicies: init policy error: " + err.Error())
 | 
								logs.Error("CACHE", "UpdatePolicies: init policy error: "+err.Error())
 | 
				
			||||||
			continue
 | 
								continue
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		this.policyMap[newPolicy.Id] = newPolicy
 | 
							this.policyMap[newPolicy.Id] = newPolicy
 | 
				
			||||||
@@ -67,19 +67,19 @@ func (this *Manager) UpdatePolicies(newPolicies []*serverconfigs.HTTPCachePolicy
 | 
				
			|||||||
		if !ok {
 | 
							if !ok {
 | 
				
			||||||
			storage := this.NewStorageWithPolicy(policy)
 | 
								storage := this.NewStorageWithPolicy(policy)
 | 
				
			||||||
			if storage == nil {
 | 
								if storage == nil {
 | 
				
			||||||
				logs.Println("[CACHE]can not find storage type '" + policy.Type + "'")
 | 
									logs.Error("CACHE", "can not find storage type '"+policy.Type+"'")
 | 
				
			||||||
				continue
 | 
									continue
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
			err := storage.Init()
 | 
								err := storage.Init()
 | 
				
			||||||
			if err != nil {
 | 
								if err != nil {
 | 
				
			||||||
				logs.Println("[CACHE]UpdatePolicies: init storage failed: " + err.Error())
 | 
									logs.Error("CACHE", "UpdatePolicies: init storage failed: "+err.Error())
 | 
				
			||||||
				continue
 | 
									continue
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
			this.storageMap[policy.Id] = storage
 | 
								this.storageMap[policy.Id] = storage
 | 
				
			||||||
		} else {
 | 
							} else {
 | 
				
			||||||
			// 检查policy是否有变化
 | 
								// 检查policy是否有变化
 | 
				
			||||||
			if !storage.Policy().IsSame(policy) {
 | 
								if !storage.Policy().IsSame(policy) {
 | 
				
			||||||
				logs.Println("[CACHE]policy " + strconv.FormatInt(policy.Id, 10) + " changed")
 | 
									logs.Println("CACHE", "policy "+strconv.FormatInt(policy.Id, 10)+" changed")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
				// 停止老的
 | 
									// 停止老的
 | 
				
			||||||
				storage.Stop()
 | 
									storage.Stop()
 | 
				
			||||||
@@ -88,12 +88,12 @@ func (this *Manager) UpdatePolicies(newPolicies []*serverconfigs.HTTPCachePolicy
 | 
				
			|||||||
				// 启动新的
 | 
									// 启动新的
 | 
				
			||||||
				storage := this.NewStorageWithPolicy(policy)
 | 
									storage := this.NewStorageWithPolicy(policy)
 | 
				
			||||||
				if storage == nil {
 | 
									if storage == nil {
 | 
				
			||||||
					logs.Println("[CACHE]can not find storage type '" + policy.Type + "'")
 | 
										logs.Error("CACHE", "can not find storage type '"+policy.Type+"'")
 | 
				
			||||||
					continue
 | 
										continue
 | 
				
			||||||
				}
 | 
									}
 | 
				
			||||||
				err := storage.Init()
 | 
									err := storage.Init()
 | 
				
			||||||
				if err != nil {
 | 
									if err != nil {
 | 
				
			||||||
					logs.Println("[CACHE]UpdatePolicies: init storage failed: " + err.Error())
 | 
										logs.Error("CACHE", "UpdatePolicies: init storage failed: "+err.Error())
 | 
				
			||||||
					continue
 | 
										continue
 | 
				
			||||||
				}
 | 
									}
 | 
				
			||||||
				this.storageMap[policy.Id] = storage
 | 
									this.storageMap[policy.Id] = storage
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -6,9 +6,9 @@ import (
 | 
				
			|||||||
	"errors"
 | 
						"errors"
 | 
				
			||||||
	"fmt"
 | 
						"fmt"
 | 
				
			||||||
	"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
 | 
						"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
 | 
				
			||||||
 | 
						"github.com/TeaOSLab/EdgeNode/internal/logs"
 | 
				
			||||||
	"github.com/TeaOSLab/EdgeNode/internal/utils"
 | 
						"github.com/TeaOSLab/EdgeNode/internal/utils"
 | 
				
			||||||
	"github.com/iwind/TeaGo/Tea"
 | 
						"github.com/iwind/TeaGo/Tea"
 | 
				
			||||||
	"github.com/iwind/TeaGo/logs"
 | 
					 | 
				
			||||||
	"github.com/iwind/TeaGo/types"
 | 
						"github.com/iwind/TeaGo/types"
 | 
				
			||||||
	stringutil "github.com/iwind/TeaGo/utils/string"
 | 
						stringutil "github.com/iwind/TeaGo/utils/string"
 | 
				
			||||||
	"io"
 | 
						"io"
 | 
				
			||||||
@@ -81,7 +81,7 @@ func (this *FileStorage) Init() error {
 | 
				
			|||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		cost := time.Since(before).Seconds() * 1000
 | 
							cost := time.Since(before).Seconds() * 1000
 | 
				
			||||||
		logs.Println("[CACHE]init policy "+strconv.FormatInt(this.policy.Id, 10)+", cost: "+fmt.Sprintf("%.2f", cost)+" ms, count: "+strconv.Itoa(count)+", size: ", fmt.Sprintf("%.3f", float64(size)/1024/1024)+" M")
 | 
							logs.Println("CACHE", "init policy "+strconv.FormatInt(this.policy.Id, 10)+", cost: "+fmt.Sprintf("%.2f", cost)+" ms, count: "+strconv.Itoa(count)+", size: "+fmt.Sprintf("%.3f", float64(size)/1024/1024)+" M")
 | 
				
			||||||
	}()
 | 
						}()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// 配置
 | 
						// 配置
 | 
				
			||||||
@@ -545,7 +545,7 @@ func (this *FileStorage) initList() error {
 | 
				
			|||||||
		item, err := this.decodeFile(path)
 | 
							item, err := this.decodeFile(path)
 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			if err != ErrNotFound {
 | 
								if err != ErrNotFound {
 | 
				
			||||||
				logs.Println("[CACHE]decode path '" + path + "': " + err.Error())
 | 
									logs.Error("CACHE", "decode path '"+path+"': "+err.Error())
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
			continue
 | 
								continue
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
@@ -634,7 +634,7 @@ func (this *FileStorage) purgeLoop() {
 | 
				
			|||||||
		path := this.hashPath(hash)
 | 
							path := this.hashPath(hash)
 | 
				
			||||||
		err := os.Remove(path)
 | 
							err := os.Remove(path)
 | 
				
			||||||
		if err != nil && !os.IsNotExist(err) {
 | 
							if err != nil && !os.IsNotExist(err) {
 | 
				
			||||||
			logs.Println("[CACHE]purge '" + path + "' error: " + err.Error())
 | 
								logs.Error("CACHE", "purge '"+path+"' error: "+err.Error())
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	})
 | 
						})
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										117
									
								
								internal/logs/utils.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										117
									
								
								internal/logs/utils.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,117 @@
 | 
				
			|||||||
 | 
					package logs
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
 | 
				
			||||||
 | 
						"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
 | 
				
			||||||
 | 
						teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
 | 
				
			||||||
 | 
						"github.com/TeaOSLab/EdgeNode/internal/rpc"
 | 
				
			||||||
 | 
						"github.com/iwind/TeaGo/logs"
 | 
				
			||||||
 | 
						"time"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					var logChan = make(chan *pb.NodeLog, 1024)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func init() {
 | 
				
			||||||
 | 
						// 定期上传日志
 | 
				
			||||||
 | 
						ticker := time.NewTicker(60 * time.Second)
 | 
				
			||||||
 | 
						go func() {
 | 
				
			||||||
 | 
							for range ticker.C {
 | 
				
			||||||
 | 
								err := uploadLogs()
 | 
				
			||||||
 | 
								if err != nil {
 | 
				
			||||||
 | 
									logs.Println("[LOG]" + err.Error())
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}()
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// 打印普通信息
 | 
				
			||||||
 | 
					func Println(tag string, description string) {
 | 
				
			||||||
 | 
						logs.Println("[" + tag + "]" + description)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						nodeConfig, _ := nodeconfigs.SharedNodeConfig()
 | 
				
			||||||
 | 
						if nodeConfig == nil {
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						select {
 | 
				
			||||||
 | 
						case logChan <- &pb.NodeLog{
 | 
				
			||||||
 | 
							Role:        teaconst.Role,
 | 
				
			||||||
 | 
							Tag:         tag,
 | 
				
			||||||
 | 
							Description: description,
 | 
				
			||||||
 | 
							Level:       "info",
 | 
				
			||||||
 | 
							NodeId:      nodeConfig.Id,
 | 
				
			||||||
 | 
							CreatedAt:   time.Now().Unix(),
 | 
				
			||||||
 | 
						}:
 | 
				
			||||||
 | 
						default:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// 打印警告信息
 | 
				
			||||||
 | 
					func Warn(tag string, description string) {
 | 
				
			||||||
 | 
						logs.Println("[" + tag + "]" + description)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						nodeConfig, _ := nodeconfigs.SharedNodeConfig()
 | 
				
			||||||
 | 
						if nodeConfig == nil {
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						select {
 | 
				
			||||||
 | 
						case logChan <- &pb.NodeLog{
 | 
				
			||||||
 | 
							Role:        teaconst.Role,
 | 
				
			||||||
 | 
							Tag:         tag,
 | 
				
			||||||
 | 
							Description: description,
 | 
				
			||||||
 | 
							Level:       "warning",
 | 
				
			||||||
 | 
							NodeId:      nodeConfig.Id,
 | 
				
			||||||
 | 
							CreatedAt:   time.Now().Unix(),
 | 
				
			||||||
 | 
						}:
 | 
				
			||||||
 | 
						default:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// 打印错误信息
 | 
				
			||||||
 | 
					func Error(tag string, description string) {
 | 
				
			||||||
 | 
						logs.Println("[" + tag + "]" + description)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						nodeConfig, _ := nodeconfigs.SharedNodeConfig()
 | 
				
			||||||
 | 
						if nodeConfig == nil {
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						select {
 | 
				
			||||||
 | 
						case logChan <- &pb.NodeLog{
 | 
				
			||||||
 | 
							Role:        teaconst.Role,
 | 
				
			||||||
 | 
							Tag:         tag,
 | 
				
			||||||
 | 
							Description: description,
 | 
				
			||||||
 | 
							Level:       "error",
 | 
				
			||||||
 | 
							NodeId:      nodeConfig.Id,
 | 
				
			||||||
 | 
							CreatedAt:   time.Now().Unix(),
 | 
				
			||||||
 | 
						}:
 | 
				
			||||||
 | 
						default:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// 上传日志
 | 
				
			||||||
 | 
					func uploadLogs() error {
 | 
				
			||||||
 | 
						logList := []*pb.NodeLog{}
 | 
				
			||||||
 | 
					Loop:
 | 
				
			||||||
 | 
						for {
 | 
				
			||||||
 | 
							select {
 | 
				
			||||||
 | 
							case log := <-logChan:
 | 
				
			||||||
 | 
								logList = append(logList, log)
 | 
				
			||||||
 | 
							default:
 | 
				
			||||||
 | 
								break Loop
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if len(logList) == 0 {
 | 
				
			||||||
 | 
							return nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						rpcClient, err := rpc.SharedRPC()
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						_, err = rpcClient.NodeLogRPC().CreateNodeLogs(rpcClient.Context(), &pb.CreateNodeLogsRequest{NodeLogs: logList})
 | 
				
			||||||
 | 
						return err
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@@ -8,8 +8,8 @@ import (
 | 
				
			|||||||
	"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
 | 
						"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
 | 
				
			||||||
	"github.com/TeaOSLab/EdgeNode/internal/caches"
 | 
						"github.com/TeaOSLab/EdgeNode/internal/caches"
 | 
				
			||||||
	"github.com/TeaOSLab/EdgeNode/internal/errors"
 | 
						"github.com/TeaOSLab/EdgeNode/internal/errors"
 | 
				
			||||||
 | 
						"github.com/TeaOSLab/EdgeNode/internal/logs"
 | 
				
			||||||
	"github.com/TeaOSLab/EdgeNode/internal/rpc"
 | 
						"github.com/TeaOSLab/EdgeNode/internal/rpc"
 | 
				
			||||||
	"github.com/iwind/TeaGo/logs"
 | 
					 | 
				
			||||||
	"io"
 | 
						"io"
 | 
				
			||||||
	"net/http"
 | 
						"net/http"
 | 
				
			||||||
	"strconv"
 | 
						"strconv"
 | 
				
			||||||
@@ -30,7 +30,7 @@ func (this *APIStream) Start() {
 | 
				
			|||||||
	for {
 | 
						for {
 | 
				
			||||||
		err := this.loop()
 | 
							err := this.loop()
 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			logs.Println("[API STREAM]" + err.Error())
 | 
								logs.Error("API_STREAM", err.Error())
 | 
				
			||||||
			time.Sleep(10 * time.Second)
 | 
								time.Sleep(10 * time.Second)
 | 
				
			||||||
			continue
 | 
								continue
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
@@ -74,7 +74,7 @@ func (this *APIStream) loop() error {
 | 
				
			|||||||
			err = this.handleUnknownMessage(message)
 | 
								err = this.handleUnknownMessage(message)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			logs.Println("[API STREAM]handle message failed: " + err.Error())
 | 
								logs.Error("API_STREAM", "handle message failed: "+err.Error())
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
@@ -100,7 +100,7 @@ func (this *APIStream) handleConnectedAPINode(message *pb.NodeStreamMessage) err
 | 
				
			|||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return errors.Wrap(err)
 | 
							return errors.Wrap(err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	logs.Println("[API STREAM]connected to api node '" + strconv.FormatInt(msg.APINodeId, 10) + "'")
 | 
						logs.Println("API_STREAM", "connected to api node '"+strconv.FormatInt(msg.APINodeId, 10)+"'")
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -544,7 +544,7 @@ func (this *HTTPRequest) Format(source string) string {
 | 
				
			|||||||
		if prefix == "node" {
 | 
							if prefix == "node" {
 | 
				
			||||||
			switch suffix {
 | 
								switch suffix {
 | 
				
			||||||
			case "id":
 | 
								case "id":
 | 
				
			||||||
				return sharedNodeConfig.Id
 | 
									return strconv.FormatInt(sharedNodeConfig.Id, 10)
 | 
				
			||||||
			case "name":
 | 
								case "name":
 | 
				
			||||||
				return sharedNodeConfig.Name
 | 
									return sharedNodeConfig.Name
 | 
				
			||||||
			case "role":
 | 
								case "role":
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -3,7 +3,7 @@ package nodes
 | 
				
			|||||||
import (
 | 
					import (
 | 
				
			||||||
	"bytes"
 | 
						"bytes"
 | 
				
			||||||
	"github.com/TeaOSLab/EdgeNode/internal/caches"
 | 
						"github.com/TeaOSLab/EdgeNode/internal/caches"
 | 
				
			||||||
	"github.com/iwind/TeaGo/logs"
 | 
						"github.com/TeaOSLab/EdgeNode/internal/logs"
 | 
				
			||||||
	"github.com/iwind/TeaGo/types"
 | 
						"github.com/iwind/TeaGo/types"
 | 
				
			||||||
	"net/http"
 | 
						"net/http"
 | 
				
			||||||
	"strconv"
 | 
						"strconv"
 | 
				
			||||||
@@ -139,7 +139,7 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
 | 
				
			|||||||
			return
 | 
								return
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		logs.Println("read from cache failed: " + err.Error())
 | 
							logs.Error("REQUEST_CACHE", "read from cache failed: "+err.Error())
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -4,8 +4,8 @@ import (
 | 
				
			|||||||
	"context"
 | 
						"context"
 | 
				
			||||||
	"errors"
 | 
						"errors"
 | 
				
			||||||
	"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/shared"
 | 
						"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/shared"
 | 
				
			||||||
 | 
						"github.com/TeaOSLab/EdgeNode/internal/logs"
 | 
				
			||||||
	"github.com/TeaOSLab/EdgeNode/internal/utils"
 | 
						"github.com/TeaOSLab/EdgeNode/internal/utils"
 | 
				
			||||||
	"github.com/iwind/TeaGo/logs"
 | 
					 | 
				
			||||||
	"io"
 | 
						"io"
 | 
				
			||||||
	"net/url"
 | 
						"net/url"
 | 
				
			||||||
	"strconv"
 | 
						"strconv"
 | 
				
			||||||
@@ -30,7 +30,7 @@ func (this *HTTPRequest) doReverseProxy() {
 | 
				
			|||||||
	origin := this.reverseProxy.NextOrigin(requestCall)
 | 
						origin := this.reverseProxy.NextOrigin(requestCall)
 | 
				
			||||||
	if origin == nil {
 | 
						if origin == nil {
 | 
				
			||||||
		err := errors.New(this.requestPath() + ": no available backends for reverse proxy")
 | 
							err := errors.New(this.requestPath() + ": no available backends for reverse proxy")
 | 
				
			||||||
		logs.Error(err)
 | 
							logs.Error("REQUEST_REVERSE_PROXY", err.Error())
 | 
				
			||||||
		this.write500(err)
 | 
							this.write500(err)
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
@@ -50,7 +50,7 @@ func (this *HTTPRequest) doReverseProxy() {
 | 
				
			|||||||
	// 处理Scheme
 | 
						// 处理Scheme
 | 
				
			||||||
	if origin.Addr == nil {
 | 
						if origin.Addr == nil {
 | 
				
			||||||
		err := errors.New(this.requestPath() + ": origin '" + strconv.FormatInt(origin.Id, 10) + "' does not has a address")
 | 
							err := errors.New(this.requestPath() + ": origin '" + strconv.FormatInt(origin.Id, 10) + "' does not has a address")
 | 
				
			||||||
		logs.Error(err)
 | 
							logs.Error("REQUEST_REVERSE_PROXY", err.Error())
 | 
				
			||||||
		this.write500(err)
 | 
							this.write500(err)
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
@@ -127,7 +127,7 @@ func (this *HTTPRequest) doReverseProxy() {
 | 
				
			|||||||
	// 获取请求客户端
 | 
						// 获取请求客户端
 | 
				
			||||||
	client, addr, err := SharedHTTPClientPool.Client(this, origin)
 | 
						client, addr, err := SharedHTTPClientPool.Client(this, origin)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		logs.Error(err)
 | 
							logs.Error("REQUEST_REVERSE_PROXY", err.Error())
 | 
				
			||||||
		this.write500(err)
 | 
							this.write500(err)
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
@@ -143,7 +143,7 @@ func (this *HTTPRequest) doReverseProxy() {
 | 
				
			|||||||
			// TODO 如果超过最大失败次数,则下线
 | 
								// TODO 如果超过最大失败次数,则下线
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			this.write500(err)
 | 
								this.write500(err)
 | 
				
			||||||
			logs.Println("[proxy]'" + this.RawReq.URL.String() + "': " + err.Error())
 | 
								logs.Println("REQUEST_REVERSE_PROXY", this.RawReq.URL.String()+"': "+err.Error())
 | 
				
			||||||
		} else {
 | 
							} else {
 | 
				
			||||||
			// 是否为客户端方面的错误
 | 
								// 是否为客户端方面的错误
 | 
				
			||||||
			isClientError := false
 | 
								isClientError := false
 | 
				
			||||||
@@ -170,7 +170,7 @@ func (this *HTTPRequest) doReverseProxy() {
 | 
				
			|||||||
		if this.doWAFResponse(resp) {
 | 
							if this.doWAFResponse(resp) {
 | 
				
			||||||
			err = resp.Body.Close()
 | 
								err = resp.Body.Close()
 | 
				
			||||||
			if err != nil {
 | 
								if err != nil {
 | 
				
			||||||
				logs.Error(err)
 | 
									logs.Error("REQUEST_REVERSE_PROXY", err.Error())
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
			return
 | 
								return
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
@@ -182,7 +182,7 @@ func (this *HTTPRequest) doReverseProxy() {
 | 
				
			|||||||
	if len(this.web.Pages) > 0 && this.doPage(resp.StatusCode) {
 | 
						if len(this.web.Pages) > 0 && this.doPage(resp.StatusCode) {
 | 
				
			||||||
		err = resp.Body.Close()
 | 
							err = resp.Body.Close()
 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			logs.Error(err)
 | 
								logs.Error("REQUEST_REVERSE_PROXY", err.Error())
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
@@ -237,11 +237,11 @@ func (this *HTTPRequest) doReverseProxy() {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	err1 := resp.Body.Close()
 | 
						err1 := resp.Body.Close()
 | 
				
			||||||
	if err1 != nil {
 | 
						if err1 != nil {
 | 
				
			||||||
		logs.Error(err1)
 | 
							logs.Error("REQUEST_REVERSE_PROXY", err1.Error())
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if err != nil && err != io.EOF {
 | 
						if err != nil && err != io.EOF {
 | 
				
			||||||
		logs.Error(err)
 | 
							logs.Error("REQUEST_REVERSE_PROXY", err.Error())
 | 
				
			||||||
		this.addError(err)
 | 
							this.addError(err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -6,9 +6,9 @@ import (
 | 
				
			|||||||
	"compress/gzip"
 | 
						"compress/gzip"
 | 
				
			||||||
	"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
 | 
						"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
 | 
				
			||||||
	"github.com/TeaOSLab/EdgeNode/internal/caches"
 | 
						"github.com/TeaOSLab/EdgeNode/internal/caches"
 | 
				
			||||||
 | 
						"github.com/TeaOSLab/EdgeNode/internal/logs"
 | 
				
			||||||
	"github.com/TeaOSLab/EdgeNode/internal/utils"
 | 
						"github.com/TeaOSLab/EdgeNode/internal/utils"
 | 
				
			||||||
	"github.com/iwind/TeaGo/lists"
 | 
						"github.com/iwind/TeaGo/lists"
 | 
				
			||||||
	"github.com/iwind/TeaGo/logs"
 | 
					 | 
				
			||||||
	"net"
 | 
						"net"
 | 
				
			||||||
	"net/http"
 | 
						"net/http"
 | 
				
			||||||
	"strings"
 | 
						"strings"
 | 
				
			||||||
@@ -115,7 +115,7 @@ func (this *HTTPWriter) Write(data []byte) (n int, err error) {
 | 
				
			|||||||
			if err != nil {
 | 
								if err != nil {
 | 
				
			||||||
				_ = this.cacheWriter.Discard()
 | 
									_ = this.cacheWriter.Discard()
 | 
				
			||||||
				this.cacheWriter = nil
 | 
									this.cacheWriter = nil
 | 
				
			||||||
				logs.Println("write cache failed: " + err.Error())
 | 
									logs.Error("REQUEST_WRITER", "write cache failed: "+err.Error())
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	} else {
 | 
						} else {
 | 
				
			||||||
@@ -127,7 +127,7 @@ func (this *HTTPWriter) Write(data []byte) (n int, err error) {
 | 
				
			|||||||
		if this.gzipBodyWriter != nil {
 | 
							if this.gzipBodyWriter != nil {
 | 
				
			||||||
			_, err := this.gzipBodyWriter.Write(data)
 | 
								_, err := this.gzipBodyWriter.Write(data)
 | 
				
			||||||
			if err != nil {
 | 
								if err != nil {
 | 
				
			||||||
				logs.Error(err)
 | 
									logs.Error("REQUEST_WRITER", err.Error())
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		} else {
 | 
							} else {
 | 
				
			||||||
			this.body = append(this.body, data...)
 | 
								this.body = append(this.body, data...)
 | 
				
			||||||
@@ -281,7 +281,7 @@ func (this *HTTPWriter) prepareGzip(size int64) {
 | 
				
			|||||||
	var err error = nil
 | 
						var err error = nil
 | 
				
			||||||
	this.gzipWriter, err = gzip.NewWriterLevel(this.writer, int(this.gzipConfig.Level))
 | 
						this.gzipWriter, err = gzip.NewWriterLevel(this.writer, int(this.gzipConfig.Level))
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		logs.Error(err)
 | 
							logs.Error("REQUEST_WRITER", err.Error())
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -290,7 +290,7 @@ func (this *HTTPWriter) prepareGzip(size int64) {
 | 
				
			|||||||
		this.gzipBodyBuffer = bytes.NewBuffer([]byte{})
 | 
							this.gzipBodyBuffer = bytes.NewBuffer([]byte{})
 | 
				
			||||||
		this.gzipBodyWriter, err = gzip.NewWriterLevel(this.gzipBodyBuffer, int(this.gzipConfig.Level))
 | 
							this.gzipBodyWriter, err = gzip.NewWriterLevel(this.gzipBodyBuffer, int(this.gzipConfig.Level))
 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			logs.Error(err)
 | 
								logs.Error("REQUEST_WRITER", err.Error())
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -357,7 +357,7 @@ func (this *HTTPWriter) prepareCache(size int64) {
 | 
				
			|||||||
	expiredAt := utils.UnixTime() + life
 | 
						expiredAt := utils.UnixTime() + life
 | 
				
			||||||
	cacheWriter, err := storage.Open(this.req.cacheKey, expiredAt)
 | 
						cacheWriter, err := storage.Open(this.req.cacheKey, expiredAt)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		logs.Println("write cache failed: " + err.Error())
 | 
							logs.Error("REQUEST_WRITER", "write cache failed: "+err.Error())
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	this.cacheWriter = cacheWriter
 | 
						this.cacheWriter = cacheWriter
 | 
				
			||||||
@@ -369,7 +369,7 @@ func (this *HTTPWriter) prepareCache(size int64) {
 | 
				
			|||||||
	headerData := this.HeaderData()
 | 
						headerData := this.HeaderData()
 | 
				
			||||||
	_, err = cacheWriter.Write(headerData)
 | 
						_, err = cacheWriter.Write(headerData)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		logs.Println("write cache failed: " + err.Error())
 | 
							logs.Error("REQUEST_WRITER", "write cache failed: "+err.Error())
 | 
				
			||||||
		_ = this.cacheWriter.Discard()
 | 
							_ = this.cacheWriter.Discard()
 | 
				
			||||||
		this.cacheWriter = nil
 | 
							this.cacheWriter = nil
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -4,7 +4,7 @@ import (
 | 
				
			|||||||
	"context"
 | 
						"context"
 | 
				
			||||||
	"errors"
 | 
						"errors"
 | 
				
			||||||
	"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
 | 
						"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
 | 
				
			||||||
	"github.com/iwind/TeaGo/logs"
 | 
						"github.com/TeaOSLab/EdgeNode/internal/logs"
 | 
				
			||||||
	"net"
 | 
						"net"
 | 
				
			||||||
	"sync"
 | 
						"sync"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
@@ -88,7 +88,7 @@ func (this *Listener) Listen() error {
 | 
				
			|||||||
	go func() {
 | 
						go func() {
 | 
				
			||||||
		err := this.listener.Serve()
 | 
							err := this.listener.Serve()
 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			logs.Println("[LISTENER]" + err.Error())
 | 
								logs.Error("LISTENER", err.Error())
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}()
 | 
						}()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -2,7 +2,7 @@ package nodes
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
 | 
						"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
 | 
				
			||||||
	"github.com/iwind/TeaGo/logs"
 | 
						"github.com/TeaOSLab/EdgeNode/internal/logs"
 | 
				
			||||||
	"golang.org/x/net/http2"
 | 
						"golang.org/x/net/http2"
 | 
				
			||||||
	"net"
 | 
						"net"
 | 
				
			||||||
	"net/http"
 | 
						"net/http"
 | 
				
			||||||
@@ -53,7 +53,7 @@ func (this *HTTPListener) Serve() error {
 | 
				
			|||||||
		// support http/2
 | 
							// support http/2
 | 
				
			||||||
		err := http2.ConfigureServer(this.httpServer, nil)
 | 
							err := http2.ConfigureServer(this.httpServer, nil)
 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			logs.Println("[HTTP_LISTENER]configure http2 error: " + err.Error())
 | 
								logs.Error("HTTP_LISTENER", "configure http2 error: "+err.Error())
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		err = this.httpServer.ServeTLS(this.Listener, "", "")
 | 
							err = this.httpServer.ServeTLS(this.Listener, "", "")
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -2,8 +2,8 @@ package nodes
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
 | 
						"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
 | 
				
			||||||
 | 
						"github.com/TeaOSLab/EdgeNode/internal/logs"
 | 
				
			||||||
	"github.com/iwind/TeaGo/lists"
 | 
						"github.com/iwind/TeaGo/lists"
 | 
				
			||||||
	"github.com/iwind/TeaGo/logs"
 | 
					 | 
				
			||||||
	"net/url"
 | 
						"net/url"
 | 
				
			||||||
	"regexp"
 | 
						"regexp"
 | 
				
			||||||
	"sync"
 | 
						"sync"
 | 
				
			||||||
@@ -47,7 +47,7 @@ func (this *ListenerManager) Start(node *nodeconfigs.NodeConfig) error {
 | 
				
			|||||||
	availableServerGroups := node.AvailableGroups()
 | 
						availableServerGroups := node.AvailableGroups()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if len(availableServerGroups) == 0 {
 | 
						if len(availableServerGroups) == 0 {
 | 
				
			||||||
		logs.Println("[LISTENER_MANAGER]no available servers to startup")
 | 
							logs.Println("LISTENER_MANAGER", "no available servers to startup")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	for _, group := range availableServerGroups {
 | 
						for _, group := range availableServerGroups {
 | 
				
			||||||
@@ -59,7 +59,7 @@ func (this *ListenerManager) Start(node *nodeconfigs.NodeConfig) error {
 | 
				
			|||||||
	for listenerKey, listener := range this.listenersMap {
 | 
						for listenerKey, listener := range this.listenersMap {
 | 
				
			||||||
		addr := listener.FullAddr()
 | 
							addr := listener.FullAddr()
 | 
				
			||||||
		if !lists.ContainsString(groupAddrs, addr) {
 | 
							if !lists.ContainsString(groupAddrs, addr) {
 | 
				
			||||||
			logs.Println("[LISTENER_MANAGER]close '" + addr + "'")
 | 
								logs.Println("LISTENER_MANAGER", "close '"+addr+"'")
 | 
				
			||||||
			_ = listener.Close()
 | 
								_ = listener.Close()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			delete(this.listenersMap, listenerKey)
 | 
								delete(this.listenersMap, listenerKey)
 | 
				
			||||||
@@ -71,15 +71,15 @@ func (this *ListenerManager) Start(node *nodeconfigs.NodeConfig) error {
 | 
				
			|||||||
		addr := group.FullAddr()
 | 
							addr := group.FullAddr()
 | 
				
			||||||
		listener, ok := this.listenersMap[addr]
 | 
							listener, ok := this.listenersMap[addr]
 | 
				
			||||||
		if ok {
 | 
							if ok {
 | 
				
			||||||
			logs.Println("[LISTENER_MANAGER]reload '" + this.prettyAddress(addr) + "'")
 | 
								logs.Println("LISTENER_MANAGER", "reload '"+this.prettyAddress(addr)+"'")
 | 
				
			||||||
			listener.Reload(group)
 | 
								listener.Reload(group)
 | 
				
			||||||
		} else {
 | 
							} else {
 | 
				
			||||||
			logs.Println("[LISTENER_MANAGER]listen '" + this.prettyAddress(addr) + "'")
 | 
								logs.Println("LISTENER_MANAGER", "listen '"+this.prettyAddress(addr)+"'")
 | 
				
			||||||
			listener = NewListener()
 | 
								listener = NewListener()
 | 
				
			||||||
			listener.Reload(group)
 | 
								listener.Reload(group)
 | 
				
			||||||
			err := listener.Listen()
 | 
								err := listener.Listen()
 | 
				
			||||||
			if err != nil {
 | 
								if err != nil {
 | 
				
			||||||
				logs.Println("[LISTENER_MANAGER]" + err.Error())
 | 
									logs.Error("LISTENER_MANAGER", err.Error())
 | 
				
			||||||
				continue
 | 
									continue
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
			this.listenersMap[addr] = listener
 | 
								this.listenersMap[addr] = listener
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -4,7 +4,7 @@ import (
 | 
				
			|||||||
	"crypto/tls"
 | 
						"crypto/tls"
 | 
				
			||||||
	"errors"
 | 
						"errors"
 | 
				
			||||||
	"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
 | 
						"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
 | 
				
			||||||
	"github.com/iwind/TeaGo/logs"
 | 
						"github.com/TeaOSLab/EdgeNode/internal/logs"
 | 
				
			||||||
	"net"
 | 
						"net"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -27,7 +27,7 @@ func (this *TCPListener) Serve() error {
 | 
				
			|||||||
		}
 | 
							}
 | 
				
			||||||
		err = this.handleConn(conn)
 | 
							err = this.handleConn(conn)
 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			logs.Println("[TCP_LISTENER]" + err.Error())
 | 
								logs.Error("TCP_LISTENER", err.Error())
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -112,7 +112,7 @@ func (this *TCPListener) connectOrigin(reverseProxy *serverconfigs.ReverseProxyC
 | 
				
			|||||||
		}
 | 
							}
 | 
				
			||||||
		conn, err = OriginConnect(origin)
 | 
							conn, err = OriginConnect(origin)
 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			logs.Println("[TCP_LISTENER]unable to connect origin: " + origin.Addr.Host + ":" + origin.Addr.PortRange + ": " + err.Error())
 | 
								logs.Error("TCP_LISTENER", "unable to connect origin: "+origin.Addr.Host+":"+origin.Addr.PortRange+": "+err.Error())
 | 
				
			||||||
			continue
 | 
								continue
 | 
				
			||||||
		} else {
 | 
							} else {
 | 
				
			||||||
			return
 | 
								return
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -6,9 +6,9 @@ import (
 | 
				
			|||||||
	"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
 | 
						"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
 | 
				
			||||||
	"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
 | 
						"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
 | 
				
			||||||
	"github.com/TeaOSLab/EdgeNode/internal/caches"
 | 
						"github.com/TeaOSLab/EdgeNode/internal/caches"
 | 
				
			||||||
 | 
						"github.com/TeaOSLab/EdgeNode/internal/logs"
 | 
				
			||||||
	"github.com/TeaOSLab/EdgeNode/internal/rpc"
 | 
						"github.com/TeaOSLab/EdgeNode/internal/rpc"
 | 
				
			||||||
	"github.com/TeaOSLab/EdgeNode/internal/utils"
 | 
						"github.com/TeaOSLab/EdgeNode/internal/utils"
 | 
				
			||||||
	"github.com/iwind/TeaGo/logs"
 | 
					 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -27,7 +27,7 @@ func (this *Node) Start() {
 | 
				
			|||||||
	// 读取API配置
 | 
						// 读取API配置
 | 
				
			||||||
	err := this.syncConfig(false)
 | 
						err := this.syncConfig(false)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		logs.Println(err.Error())
 | 
							logs.Error("NODE", err.Error())
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// 启动同步计时器
 | 
						// 启动同步计时器
 | 
				
			||||||
@@ -39,12 +39,12 @@ func (this *Node) Start() {
 | 
				
			|||||||
	// 读取配置
 | 
						// 读取配置
 | 
				
			||||||
	nodeConfig, err := nodeconfigs.SharedNodeConfig()
 | 
						nodeConfig, err := nodeconfigs.SharedNodeConfig()
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		logs.Println("[NODE]start failed: read node config failed: " + err.Error())
 | 
							logs.Error("NODE", "start failed: read node config failed: "+err.Error())
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	err = nodeConfig.Init()
 | 
						err = nodeConfig.Init()
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		logs.Println("[NODE]init node config failed: " + err.Error())
 | 
							logs.Error("NODE", "init node config failed: "+err.Error())
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	sharedNodeConfig = nodeConfig
 | 
						sharedNodeConfig = nodeConfig
 | 
				
			||||||
@@ -58,7 +58,7 @@ func (this *Node) Start() {
 | 
				
			|||||||
	// 启动端口
 | 
						// 启动端口
 | 
				
			||||||
	err = sharedListenerManager.Start(nodeConfig)
 | 
						err = sharedListenerManager.Start(nodeConfig)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		logs.Println("[NODE]start failed: " + err.Error())
 | 
							logs.Error("NODE", "start failed: "+err.Error())
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// hold住进程
 | 
						// hold住进程
 | 
				
			||||||
@@ -101,7 +101,7 @@ func (this *Node) syncConfig(isFirstTime bool) error {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// 刷新配置
 | 
						// 刷新配置
 | 
				
			||||||
	logs.Println("[NODE]reload config ...")
 | 
						logs.Println("NODE", "reload config ...")
 | 
				
			||||||
	nodeconfigs.ResetNodeConfig(nodeConfig)
 | 
						nodeconfigs.ResetNodeConfig(nodeConfig)
 | 
				
			||||||
	caches.SharedManager.UpdatePolicies(nodeConfig.AllCachePolicies())
 | 
						caches.SharedManager.UpdatePolicies(nodeConfig.AllCachePolicies())
 | 
				
			||||||
	sharedWAFManager.UpdatePolicies(nodeConfig.AllHTTPFirewallPolicies())
 | 
						sharedWAFManager.UpdatePolicies(nodeConfig.AllHTTPFirewallPolicies())
 | 
				
			||||||
@@ -122,7 +122,7 @@ func (this *Node) startSyncTimer() {
 | 
				
			|||||||
		for range ticker.C {
 | 
							for range ticker.C {
 | 
				
			||||||
			err := this.syncConfig(false)
 | 
								err := this.syncConfig(false)
 | 
				
			||||||
			if err != nil {
 | 
								if err != nil {
 | 
				
			||||||
				logs.Println("[NODE]sync config error: " + err.Error())
 | 
									logs.Error("NODE", "sync config error: "+err.Error())
 | 
				
			||||||
				continue
 | 
									continue
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -4,9 +4,9 @@ import (
 | 
				
			|||||||
	"encoding/json"
 | 
						"encoding/json"
 | 
				
			||||||
	"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
 | 
						"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
 | 
				
			||||||
	teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
 | 
						teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
 | 
				
			||||||
 | 
						"github.com/TeaOSLab/EdgeNode/internal/logs"
 | 
				
			||||||
	"github.com/TeaOSLab/EdgeNode/internal/rpc"
 | 
						"github.com/TeaOSLab/EdgeNode/internal/rpc"
 | 
				
			||||||
	"github.com/iwind/TeaGo/lists"
 | 
						"github.com/iwind/TeaGo/lists"
 | 
				
			||||||
	"github.com/iwind/TeaGo/logs"
 | 
					 | 
				
			||||||
	"github.com/shirou/gopsutil/cpu"
 | 
						"github.com/shirou/gopsutil/cpu"
 | 
				
			||||||
	"github.com/shirou/gopsutil/disk"
 | 
						"github.com/shirou/gopsutil/disk"
 | 
				
			||||||
	"os"
 | 
						"os"
 | 
				
			||||||
@@ -62,19 +62,19 @@ func (this *NodeStatusExecutor) update() {
 | 
				
			|||||||
	//  发送数据
 | 
						//  发送数据
 | 
				
			||||||
	jsonData, err := json.Marshal(status)
 | 
						jsonData, err := json.Marshal(status)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		logs.Println("[NODE]serial NodeStatus fail: " + err.Error())
 | 
							logs.Error("NODE_STATUS", "serial NodeStatus fail: "+err.Error())
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	rpcClient, err := rpc.SharedRPC()
 | 
						rpcClient, err := rpc.SharedRPC()
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		logs.Println("[NODE]failed to open rpc: " + err.Error())
 | 
							logs.Error("NODE_STATUS", "failed to open rpc: "+err.Error())
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	_, err = rpcClient.NodeRPC().UpdateNodeStatus(rpcClient.Context(), &pb.UpdateNodeStatusRequest{
 | 
						_, err = rpcClient.NodeRPC().UpdateNodeStatus(rpcClient.Context(), &pb.UpdateNodeStatusRequest{
 | 
				
			||||||
		StatusJSON: jsonData,
 | 
							StatusJSON: jsonData,
 | 
				
			||||||
	})
 | 
						})
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		logs.Println("[NODE]rpc UpdateNodeStatus() failed: " + err.Error())
 | 
							logs.Error("NODE_STATUS", "rpc UpdateNodeStatus() failed: "+err.Error())
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
@@ -120,7 +120,7 @@ func (this *NodeStatusExecutor) updateCPU(status *NodeStatus) {
 | 
				
			|||||||
func (this *NodeStatusExecutor) updateDisk(status *NodeStatus) {
 | 
					func (this *NodeStatusExecutor) updateDisk(status *NodeStatus) {
 | 
				
			||||||
	partitions, err := disk.Partitions(false)
 | 
						partitions, err := disk.Partitions(false)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		logs.Error(err)
 | 
							logs.Error("NODE_STATUS", err.Error())
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	lists.Sort(partitions, func(i int, j int) bool {
 | 
						lists.Sort(partitions, func(i int, j int) bool {
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -3,8 +3,8 @@ package nodes
 | 
				
			|||||||
import (
 | 
					import (
 | 
				
			||||||
	"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/firewallconfigs"
 | 
						"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/firewallconfigs"
 | 
				
			||||||
	"github.com/TeaOSLab/EdgeNode/internal/errors"
 | 
						"github.com/TeaOSLab/EdgeNode/internal/errors"
 | 
				
			||||||
 | 
						"github.com/TeaOSLab/EdgeNode/internal/logs"
 | 
				
			||||||
	"github.com/TeaOSLab/EdgeNode/internal/waf"
 | 
						"github.com/TeaOSLab/EdgeNode/internal/waf"
 | 
				
			||||||
	"github.com/iwind/TeaGo/logs"
 | 
					 | 
				
			||||||
	"strconv"
 | 
						"strconv"
 | 
				
			||||||
	"sync"
 | 
						"sync"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
@@ -33,7 +33,7 @@ func (this *WAFManager) UpdatePolicies(policies []*firewallconfigs.HTTPFirewallP
 | 
				
			|||||||
	for _, p := range policies {
 | 
						for _, p := range policies {
 | 
				
			||||||
		w, err := this.convertWAF(p)
 | 
							w, err := this.convertWAF(p)
 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			logs.Println("[WAF]initialize policy '" + strconv.FormatInt(p.Id, 10) + "' failed: " + err.Error())
 | 
								logs.Error("WAF", "initialize policy '"+strconv.FormatInt(p.Id, 10)+"' failed: "+err.Error())
 | 
				
			||||||
			continue
 | 
								continue
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		if w == nil {
 | 
							if w == nil {
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -48,6 +48,10 @@ func (this *RPCClient) NodeRPC() pb.NodeServiceClient {
 | 
				
			|||||||
	return pb.NewNodeServiceClient(this.pickConn())
 | 
						return pb.NewNodeServiceClient(this.pickConn())
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (this *RPCClient) NodeLogRPC() pb.NodeLogServiceClient {
 | 
				
			||||||
 | 
						return pb.NewNodeLogServiceClient(this.pickConn())
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (this *RPCClient) Context() context.Context {
 | 
					func (this *RPCClient) Context() context.Context {
 | 
				
			||||||
	ctx := context.Background()
 | 
						ctx := context.Background()
 | 
				
			||||||
	m := maps.Map{
 | 
						m := maps.Map{
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user