diff --git a/internal/const/const.go b/internal/const/const.go index 4649154..06b37c0 100644 --- a/internal/const/const.go +++ b/internal/const/const.go @@ -10,4 +10,7 @@ const ( EncryptKey = "8f983f4d69b83aaa0d74b21a212f6967" EncryptMethod = "aes-256-cfb" + + // systemd + SystemdServiceName = "edge-node" ) diff --git a/internal/nodes/api_stream.go b/internal/nodes/api_stream.go index c1b9ef8..417f219 100644 --- a/internal/nodes/api_stream.go +++ b/internal/nodes/api_stream.go @@ -7,13 +7,16 @@ import ( "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeNode/internal/caches" + teaconst "github.com/TeaOSLab/EdgeNode/internal/const" "github.com/TeaOSLab/EdgeNode/internal/errors" "github.com/TeaOSLab/EdgeNode/internal/events" "github.com/TeaOSLab/EdgeNode/internal/iplibrary" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/rpc" + "github.com/TeaOSLab/EdgeNode/internal/utils" "io" "net/http" + "os/exec" "strconv" "strings" "sync" @@ -98,6 +101,8 @@ func (this *APIStream) loop() error { err = this.handleConfigChanged(message) case messageconfigs.MessageCodeIPListChanged: // IPList变化 err = this.handleIPListChanged(message) + case messageconfigs.MessageCodeCheckSystemdService: // 检查Systemd服务 + err = this.handleCheckSystemdService(message) default: err = this.handleUnknownMessage(message) } @@ -373,6 +378,7 @@ func (this *APIStream) handlePreheatCache(message *pb.NodeStreamMessage) error { }() // 检查最大内容长度 + // TODO 需要解决Chunked Transfer Encoding的长度判断问题 maxSize := storage.Policy().MaxSizeBytes() if maxSize > 0 && resp.ContentLength > maxSize { locker.Lock() @@ -461,6 +467,34 @@ func (this *APIStream) handleIPListChanged(message *pb.NodeStreamMessage) error return nil } +// 检查Systemd服务 +func (this *APIStream) handleCheckSystemdService(message *pb.NodeStreamMessage) error { + systemctl, err := exec.LookPath("systemctl") + if err != nil { + this.replyFail(message.RequestId, "'systemctl' not found") + return nil + } + if len(systemctl) == 0 { + this.replyFail(message.RequestId, "'systemctl' not found") + return nil + } + + cmd := utils.NewCommandExecutor() + shortName := teaconst.SystemdServiceName + cmd.Add(systemctl, "is-enabled", shortName) + output, err := cmd.Run() + if err != nil { + this.replyFail(message.RequestId, "'systemctl' command error: " + err.Error()) + return nil + } + if output == "enabled" { + this.replyOk(message.RequestId, "ok") + } else { + this.replyFail(message.RequestId, "not installed") + } + return nil +} + // 处理未知消息 func (this *APIStream) handleUnknownMessage(message *pb.NodeStreamMessage) error { this.replyFail(message.RequestId, "unknown message code '"+message.Code+"'") diff --git a/internal/nodes/node.go b/internal/nodes/node.go index d897e1c..1346b2a 100644 --- a/internal/nodes/node.go +++ b/internal/nodes/node.go @@ -16,7 +16,7 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/go-yaml/yaml" "github.com/iwind/TeaGo/Tea" - tealogs "github.com/iwind/TeaGo/logs" + "github.com/iwind/TeaGo/logs" "io/ioutil" "net" "os" @@ -69,10 +69,24 @@ func (this *Node) Start() { } // 读取API配置 - err = this.syncConfig(false) - if err != nil { - remotelogs.Error("NODE", err.Error()) - return + tryTimes := 0 + for { + err = this.syncConfig(false) + if err != nil { + tryTimes++ + + if tryTimes%10 == 0 { + remotelogs.Error("NODE", err.Error()) + } + time.Sleep(1 * time.Second) + + // 不做长时间的无意义的重试 + if tryTimes > 1000 { + return + } + } else { + break + } } // 启动同步计时器 @@ -292,12 +306,12 @@ func (this *Node) checkClusterConfig() error { return err } - tealogs.Println("[NODE]registering node ...") + logs.Println("[NODE]registering node ...") resp, err := rpcClient.NodeRPC().RegisterClusterNode(rpcClient.ClusterContext(config.ClusterId, config.Secret), &pb.RegisterClusterNodeRequest{Name: HOSTNAME}) if err != nil { return err } - tealogs.Println("[NODE]registered successfully") + logs.Println("[NODE]registered successfully") // 写入到配置文件中 if len(resp.Endpoints) == 0 { @@ -312,12 +326,12 @@ func (this *Node) checkClusterConfig() error { NodeId: resp.UniqueId, Secret: resp.Secret, } - tealogs.Println("[NODE]writing 'configs/api.yaml' ...") + logs.Println("[NODE]writing 'configs/api.yaml' ...") err = apiConfig.WriteFile(Tea.ConfigFile("api.yaml")) if err != nil { return err } - tealogs.Println("[NODE]wrote 'configs/api.yaml' successfully") + logs.Println("[NODE]wrote 'configs/api.yaml' successfully") return nil } diff --git a/internal/nodes/system_services.go b/internal/nodes/system_services.go new file mode 100644 index 0000000..96999cf --- /dev/null +++ b/internal/nodes/system_services.go @@ -0,0 +1,118 @@ +package nodes + +import ( + "bytes" + "encoding/json" + "errors" + "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" + teaconst "github.com/TeaOSLab/EdgeNode/internal/const" + "github.com/TeaOSLab/EdgeNode/internal/events" + "github.com/TeaOSLab/EdgeNode/internal/remotelogs" + "github.com/TeaOSLab/EdgeNode/internal/utils" + "github.com/iwind/TeaGo/maps" + "io/ioutil" + "os" + "os/exec" + "runtime" +) + +func init() { + var manager = NewSystemServiceManager() + events.On(events.EventReload, func() { + err := manager.Setup() + if err != nil { + remotelogs.Error("SYSTEM_SERVICE", "setup system services failed: "+err.Error()) + } + }) +} + +// 系统服务管理 +type SystemServiceManager struct { +} + +func NewSystemServiceManager() *SystemServiceManager { + return &SystemServiceManager{} +} + +func (this *SystemServiceManager) Setup() error { + if sharedNodeConfig == nil || !sharedNodeConfig.IsOn { + return nil + } + + if len(sharedNodeConfig.SystemServices) == 0 { + return nil + } + + systemdParams, ok := sharedNodeConfig.SystemServices[nodeconfigs.SystemServiceTypeSystemd] + if ok { + err := this.setupSystemd(systemdParams) + if err != nil { + return err + } + } + + return nil +} + +func (this *SystemServiceManager) setupSystemd(params maps.Map) error { + // 只有在Linux下运行 + if runtime.GOOS != "linux" { + return nil + } + + if params == nil { + params = maps.Map{} + } + data, err := json.Marshal(params) + if err != nil { + return err + } + config := &nodeconfigs.SystemdServiceConfig{} + err = json.Unmarshal(data, config) + if err != nil { + return err + } + + // 检查当前的service + systemctl, err := exec.LookPath("systemctl") + if err != nil { + return err + } + if len(systemctl) == 0 { + return errors.New("can not find 'systemctl' on the system") + } + cmd := utils.NewCommandExecutor() + shortName := teaconst.SystemdServiceName + cmd.Add(systemctl, "is-enabled", shortName) + output, err := cmd.Run() + if err != nil { + return err + } + if config.IsOn { + exe, err := os.Executable() + if err != nil { + return err + } + + if output == "enabled" { + // 检查文件路径是否变化 + data, err := ioutil.ReadFile("/etc/systemd/system/" + teaconst.SystemdServiceName + ".service") + if err == nil && bytes.Index(data, []byte(exe)) > 0 { + return nil + } + } + manager := utils.NewServiceManager(shortName, teaconst.ProductName) + err = manager.Install(exe, []string{}) + if err != nil { + return err + } + } else { + manager := utils.NewServiceManager(shortName, teaconst.ProductName) + err = manager.Uninstall() + if err != nil { + return err + } + } + + return nil +} diff --git a/internal/utils/command.go b/internal/utils/command.go new file mode 100644 index 0000000..5ab60e8 --- /dev/null +++ b/internal/utils/command.go @@ -0,0 +1,7 @@ +package utils + +// 命令定义 +type Command struct { + Name string + Args []string +} diff --git a/internal/utils/command_executor.go b/internal/utils/command_executor.go new file mode 100644 index 0000000..8d7acc3 --- /dev/null +++ b/internal/utils/command_executor.go @@ -0,0 +1,61 @@ +package utils + +import ( + "bytes" + "errors" + "os/exec" +) + +// 命令执行器 +type CommandExecutor struct { + commands []*Command +} + +// 获取新对象 +func NewCommandExecutor() *CommandExecutor { + return &CommandExecutor{} +} + +// 添加命令 +func (this *CommandExecutor) Add(command string, arg ...string) { + this.commands = append(this.commands, &Command{ + Name: command, + Args: arg, + }) +} + +// 执行命令 +func (this *CommandExecutor) Run() (output string, err error) { + if len(this.commands) == 0 { + return "", errors.New("no commands no run") + } + var lastCmd *exec.Cmd = nil + var lastData []byte = nil + for _, command := range this.commands { + cmd := exec.Command(command.Name, command.Args...) + stdout := bytes.NewBuffer([]byte{}) + cmd.Stdout = stdout + if lastCmd != nil { + cmd.Stdin = bytes.NewBuffer(lastData) + } + err = cmd.Start() + if err != nil { + return "", err + } + + err = cmd.Wait() + if err != nil { + _, ok := err.(*exec.ExitError) + if ok { + return "", nil + } + + return "", err + } + lastData = stdout.Bytes() + + lastCmd = cmd + } + + return string(bytes.TrimSpace(lastData)), nil +} diff --git a/internal/utils/service.go b/internal/utils/service.go new file mode 100644 index 0000000..c5f7436 --- /dev/null +++ b/internal/utils/service.go @@ -0,0 +1,111 @@ +package utils + +import ( + "github.com/iwind/TeaGo/Tea" + "github.com/iwind/TeaGo/files" + "github.com/iwind/TeaGo/logs" + "log" + "os" + "path/filepath" + "runtime" + "sync" +) + +// 服务管理器 +type ServiceManager struct { + Name string + Description string + + fp *os.File + logger *log.Logger + onceLocker sync.Once +} + +// 获取对象 +func NewServiceManager(name, description string) *ServiceManager { + manager := &ServiceManager{ + Name: name, + Description: description, + } + + // root + manager.resetRoot() + + return manager +} + +// 设置服务 +func (this *ServiceManager) setup() { + this.onceLocker.Do(func() { + logFile := files.NewFile(Tea.Root + "/logs/service.log") + if logFile.Exists() { + logFile.Delete() + } + + //logger + fp, err := os.OpenFile(Tea.Root+"/logs/service.log", os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666) + if err != nil { + logs.Error(err) + return + } + this.fp = fp + this.logger = log.New(fp, "", log.LstdFlags) + }) +} + +//记录普通日志 +func (this *ServiceManager) Log(msg string) { + this.setup() + if this.logger == nil { + return + } + this.logger.Println("[info]" + msg) +} + +// 记录错误日志 +func (this *ServiceManager) LogError(msg string) { + this.setup() + if this.logger == nil { + return + } + this.logger.Println("[error]" + msg) +} + +// 关闭 +func (this *ServiceManager) Close() error { + if this.fp != nil { + return this.fp.Close() + } + return nil +} + +// 重置Root +func (this *ServiceManager) resetRoot() { + if !Tea.IsTesting() { + exePath, err := os.Executable() + if err != nil { + exePath = os.Args[0] + } + link, err := filepath.EvalSymlinks(exePath) + if err == nil { + exePath = link + } + fullPath, err := filepath.Abs(exePath) + if err == nil { + Tea.UpdateRoot(filepath.Dir(filepath.Dir(fullPath))) + } + } + Tea.SetPublicDir(Tea.Root + Tea.DS + "web" + Tea.DS + "public") + Tea.SetViewsDir(Tea.Root + Tea.DS + "web" + Tea.DS + "views") + Tea.SetTmpDir(Tea.Root + Tea.DS + "web" + Tea.DS + "tmp") +} + +// 保持命令行窗口是打开的 +func (this *ServiceManager) PauseWindow() { + if runtime.GOOS != "windows" { + return + } + + b := make([]byte, 1) + _, _ = os.Stdin.Read(b) +} diff --git a/internal/utils/service_linux.go b/internal/utils/service_linux.go new file mode 100644 index 0000000..86f6489 --- /dev/null +++ b/internal/utils/service_linux.go @@ -0,0 +1,151 @@ +// +build linux + +package utils + +import ( + "errors" + teaconst "github.com/TeaOSLab/EdgeNode/internal/const" + "github.com/iwind/TeaGo/Tea" + "github.com/iwind/TeaGo/files" + "io/ioutil" + "os" + "os/exec" + "regexp" +) + +var systemdServiceFile = "/etc/systemd/system/edge-node.service" +var initServiceFile = "/etc/init.d/" + teaconst.SystemdServiceName + +// 安装服务 +func (this *ServiceManager) Install(exePath string, args []string) error { + if os.Getgid() != 0 { + return errors.New("only root users can install the service") + } + + systemd, err := exec.LookPath("systemctl") + if err != nil { + return this.installInitService(exePath, args) + } + + return this.installSystemdService(systemd, exePath, args) +} + +// 启动服务 +func (this *ServiceManager) Start() error { + if os.Getgid() != 0 { + return errors.New("only root users can start the service") + } + + if files.NewFile(systemdServiceFile).Exists() { + systemd, err := exec.LookPath("systemctl") + if err != nil { + return err + } + + return exec.Command(systemd, "start", teaconst.SystemdServiceName+".service").Start() + } + return exec.Command("service", teaconst.ProcessName, "start").Start() +} + +// 删除服务 +func (this *ServiceManager) Uninstall() error { + if os.Getgid() != 0 { + return errors.New("only root users can uninstall the service") + } + + if files.NewFile(systemdServiceFile).Exists() { + systemd, err := exec.LookPath("systemctl") + if err != nil { + return err + } + + // disable service + exec.Command(systemd, "disable", teaconst.SystemdServiceName+".service").Start() + + // reload + exec.Command(systemd, "daemon-reload") + + return files.NewFile(systemdServiceFile).Delete() + } + + f := files.NewFile(initServiceFile) + if f.Exists() { + return f.Delete() + } + return nil +} + +// install init service +func (this *ServiceManager) installInitService(exePath string, args []string) error { + shortName := teaconst.SystemdServiceName + scriptFile := Tea.Root + "/scripts/" + shortName + if !files.NewFile(scriptFile).Exists() { + return errors.New("'scripts/" + shortName + "' file not exists") + } + + data, err := ioutil.ReadFile(scriptFile) + if err != nil { + return err + } + + data = regexp.MustCompile("INSTALL_DIR=.+").ReplaceAll(data, []byte("INSTALL_DIR="+Tea.Root)) + err = ioutil.WriteFile(initServiceFile, data, 0777) + if err != nil { + return err + } + + chkCmd, err := exec.LookPath("chkconfig") + if err != nil { + return err + } + + err = exec.Command(chkCmd, "--add", teaconst.ProcessName).Start() + if err != nil { + return err + } + + return nil +} + +// install systemd service +func (this *ServiceManager) installSystemdService(systemd, exePath string, args []string) error { + shortName := teaconst.SystemdServiceName + longName := "GoEdge Node" // TODO 将来可以修改 + + desc := `# Provides: ` + shortName + ` +# Required-Start: $all +# Required-Stop: +# Default-Start: 2 3 4 5 +# Default-Stop: +# Short-Description: ` + longName + ` Service +### END INIT INFO + +[Unit] +Description=` + longName + ` Service +Before=shutdown.target + +[Service] +Type=forking +ExecStart=` + exePath + ` start +ExecStop=` + exePath + ` stop +ExecReload=` + exePath + ` reload + +[Install] +WantedBy=multi-user.target` + + // write file + err := ioutil.WriteFile(systemdServiceFile, []byte(desc), 0777) + if err != nil { + return err + } + + // stop current systemd service if running + exec.Command(systemd, "stop", shortName+".service") + + // reload + exec.Command(systemd, "daemon-reload") + + // enable + cmd := exec.Command(systemd, "enable", shortName+".service") + return cmd.Run() +} diff --git a/internal/utils/service_others.go b/internal/utils/service_others.go new file mode 100644 index 0000000..43755de --- /dev/null +++ b/internal/utils/service_others.go @@ -0,0 +1,18 @@ +// +build !linux,!windows + +package utils + +// 安装服务 +func (this *ServiceManager) Install(exePath string, args []string) error { + return nil +} + +// 启动服务 +func (this *ServiceManager) Start() error { + return nil +} + +// 删除服务 +func (this *ServiceManager) Uninstall() error { + return nil +} diff --git a/internal/utils/service_test.go b/internal/utils/service_test.go new file mode 100644 index 0000000..8f1c416 --- /dev/null +++ b/internal/utils/service_test.go @@ -0,0 +1,12 @@ +package utils + +import ( + teaconst "github.com/TeaOSLab/EdgeNode/internal/const" + "testing" +) + +func TestServiceManager_Log(t *testing.T) { + manager := NewServiceManager(teaconst.ProductName, teaconst.ProductName+" Server") + manager.Log("Hello, World") + manager.LogError("Hello, World") +} diff --git a/internal/utils/service_windows.go b/internal/utils/service_windows.go new file mode 100644 index 0000000..8faabb4 --- /dev/null +++ b/internal/utils/service_windows.go @@ -0,0 +1,173 @@ +// +build windows + +package utils + +import ( + "fmt" + "github.com/iwind/TeaGo/Tea" + "golang.org/x/sys/windows" + "golang.org/x/sys/windows/svc" + "golang.org/x/sys/windows/svc/mgr" + "os/exec" +) + +// 安装服务 +func (this *ServiceManager) Install(exePath string, args []string) error { + m, err := mgr.Connect() + if err != nil { + return fmt.Errorf("connecting: %s please 'Run as administrator' again", err.Error()) + } + defer m.Disconnect() + s, err := m.OpenService(this.Name) + if err == nil { + s.Close() + return fmt.Errorf("service %s already exists", this.Name) + } + + s, err = m.CreateService(this.Name, exePath, mgr.Config{ + DisplayName: this.Name, + Description: this.Description, + StartType: windows.SERVICE_AUTO_START, + }, args...) + if err != nil { + return fmt.Errorf("creating: %s", err.Error()) + } + defer s.Close() + + return nil +} + +// 启动服务 +func (this *ServiceManager) Start() error { + m, err := mgr.Connect() + if err != nil { + return err + } + defer m.Disconnect() + s, err := m.OpenService(this.Name) + if err != nil { + return fmt.Errorf("could not access service: %v", err) + } + defer s.Close() + err = s.Start("service") + if err != nil { + return fmt.Errorf("could not start service: %v", err) + } + + return nil +} + +// 删除服务 +func (this *ServiceManager) Uninstall() error { + m, err := mgr.Connect() + if err != nil { + return fmt.Errorf("connecting: %s please 'Run as administrator' again", err.Error()) + } + defer m.Disconnect() + s, err := m.OpenService(this.Name) + if err != nil { + return fmt.Errorf("open service: %s", err.Error()) + } + + // shutdown service + _, err = s.Control(svc.Stop) + if err != nil { + fmt.Printf("shutdown service: %s\n", err.Error()) + } + + defer s.Close() + err = s.Delete() + if err != nil { + return fmt.Errorf("deleting: %s", err.Error()) + } + return nil +} + +// 运行 +func (this *ServiceManager) Run() { + err := svc.Run(this.Name, this) + if err != nil { + this.LogError(err.Error()) + } +} + +// 同服务管理器的交互 +func (this *ServiceManager) Execute(args []string, r <-chan svc.ChangeRequest, changes chan<- svc.Status) (ssec bool, errno uint32) { + const cmdsAccepted = svc.AcceptStop | svc.AcceptShutdown | svc.AcceptPauseAndContinue + + changes <- svc.Status{ + State: svc.StartPending, + } + + changes <- svc.Status{ + State: svc.Running, + Accepts: cmdsAccepted, + } + + // start service + this.Log("start") + this.cmdStart() + +loop: + for { + select { + case c := <-r: + switch c.Cmd { + case svc.Interrogate: + this.Log("cmd: Interrogate") + changes <- c.CurrentStatus + case svc.Stop, svc.Shutdown: + this.Log("cmd: Stop|Shutdown") + + // stop service + this.cmdStop() + + break loop + case svc.Pause: + this.Log("cmd: Pause") + + // stop service + this.cmdStop() + + changes <- svc.Status{ + State: svc.Paused, + Accepts: cmdsAccepted, + } + case svc.Continue: + this.Log("cmd: Continue") + + // start service + this.cmdStart() + + changes <- svc.Status{ + State: svc.Running, + Accepts: cmdsAccepted, + } + default: + this.LogError(fmt.Sprintf("unexpected control request #%d\r\n", c)) + } + } + } + changes <- svc.Status{ + State: svc.StopPending, + } + return +} + +// 启动Web服务 +func (this *ServiceManager) cmdStart() { + cmd := exec.Command(Tea.Root+Tea.DS+"bin"+Tea.DS+teaconst.SystemdServiceName+".exe", "start") + err := cmd.Start() + if err != nil { + this.LogError(err.Error()) + } +} + +// 停止Web服务 +func (this *ServiceManager) cmdStop() { + cmd := exec.Command(Tea.Root+Tea.DS+"bin"+Tea.DS+teaconst.SystemdServiceName+".exe", "stop") + err := cmd.Start() + if err != nil { + this.LogError(err.Error()) + } +}