diff --git a/cmd/edge-node/main.go b/cmd/edge-node/main.go index 0e0741f..8f3dffc 100644 --- a/cmd/edge-node/main.go +++ b/cmd/edge-node/main.go @@ -6,14 +6,21 @@ import ( teaconst "github.com/TeaOSLab/EdgeNode/internal/const" "github.com/TeaOSLab/EdgeNode/internal/nodes" _ "github.com/iwind/TeaGo/bootstrap" + "os" ) func main() { app := apps.NewAppCmd(). Version(teaconst.Version). Product(teaconst.ProductName). - Usage(teaconst.ProcessName + " [-v|start|stop|restart|sync|update]") + Usage(teaconst.ProcessName + " [-v|start|stop|restart|sync|update|test]") + app.On("test", func() { + err := nodes.NewNode().Test() + if err != nil { + _, _ = os.Stderr.WriteString(err.Error()) + } + }) app.On("sync", func() { // TODO fmt.Println("not implemented yet") diff --git a/go.mod b/go.mod index f06ec66..ee1ce1d 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/dchest/siphash v1.2.1 github.com/go-ole/go-ole v1.2.4 // indirect github.com/go-yaml/yaml v2.1.0+incompatible + github.com/golang/protobuf v1.4.2 github.com/iwind/TeaGo v0.0.0-20201020081413-7cf62d6f420f github.com/shirou/gopsutil v2.20.9+incompatible golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7 diff --git a/internal/nodes/node.go b/internal/nodes/node.go index 383d34f..0fb01c5 100644 --- a/internal/nodes/node.go +++ b/internal/nodes/node.go @@ -14,6 +14,7 @@ import ( "github.com/iwind/TeaGo/Tea" tealogs "github.com/iwind/TeaGo/logs" "io/ioutil" + "net" "os" "runtime" "time" @@ -31,11 +32,35 @@ func NewNode() *Node { return &Node{} } +// 检查配置 +func (this *Node) Test() error { + // 检查是否能连接API + rpcClient, err := rpc.SharedRPC() + if err != nil { + return errors.New("test rpc failed: " + err.Error()) + } + _, err = rpcClient.APINodeRPC().FindCurrentAPINodeVersion(rpcClient.Context(), &pb.FindCurrentAPINodeVersionRequest{}) + if err != nil { + return errors.New("test rpc failed: " + err.Error()) + } + + return nil +} + +// 启动 func (this *Node) Start() { - // 读取API配置 - err := this.syncConfig(false) + // 本地Sock + err := this.listenSock() if err != nil { logs.Error("NODE", err.Error()) + return + } + + // 读取API配置 + err = this.syncConfig(false) + if err != nil { + logs.Error("NODE", err.Error()) + return } // 启动同步计时器 @@ -226,3 +251,36 @@ func (this *Node) checkClusterConfig() error { return nil } + +// 监听本地sock +func (this *Node) listenSock() error { + path := os.TempDir() + "/edge-node.sock" + + // 检查是否已经存在 + _, err := os.Stat(path) + if err == nil { + conn, err := net.Dial("unix", path) + if err != nil { + _ = os.Remove(path) + } else { + _ = conn.Close() + } + } + + // 新的监听任务 + listener, err := net.Listen("unix", path) + if err != nil { + return err + } + + go func() { + for { + _, err := listener.Accept() + if err != nil { + return + } + } + }() + + return nil +} diff --git a/internal/nodes/node_test.go b/internal/nodes/node_test.go index 5704540..affa782 100644 --- a/internal/nodes/node_test.go +++ b/internal/nodes/node_test.go @@ -1,11 +1,50 @@ package nodes import ( + "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "github.com/golang/protobuf/proto" _ "github.com/iwind/TeaGo/bootstrap" + "io" + "strconv" "testing" ) -func TestNode(t *testing.T) { +func TestNode_Start(t *testing.T) { node := NewNode() node.Start() } + +func TestNode_Test(t *testing.T) { + node := NewNode() + err := node.Test() + if err != nil { + t.Fatal(err) + } + t.Log("ok") +} + +func TestNode_Proto_Buffer(t *testing.T) { + buff := proto.NewBuffer([]byte{}) + for i := 0; i < 10; i++ { + err := buff.EncodeMessage(&pb.NodeStreamMessage{ + RequestId: int64(i), + Code: "msg" + strconv.Itoa(i), + }) + if err != nil { + t.Fatal(err) + } + } + + for i := 0; i < 11; i++ { + msg := &pb.NodeStreamMessage{} + err := buff.DecodeMessage(msg) + if err != nil { + if err == io.EOF || err == io.ErrUnexpectedEOF { + break + } else { + t.Fatal(err) + } + } + t.Log(msg.Code, msg.RequestId) + } +} diff --git a/internal/rpc/rpc_client.go b/internal/rpc/rpc_client.go index c6b53de..2b77a77 100644 --- a/internal/rpc/rpc_client.go +++ b/internal/rpc/rpc_client.go @@ -72,6 +72,10 @@ func (this *RPCClient) HTTPAccessLogRPC() pb.HTTPAccessLogServiceClient { return pb.NewHTTPAccessLogServiceClient(this.pickConn()) } +func (this *RPCClient) APINodeRPC() pb.APINodeServiceClient { + return pb.NewAPINodeServiceClient(this.pickConn()) +} + // 节点上下文信息 func (this *RPCClient) Context() context.Context { ctx := context.Background()