From 0f54d4a472df599961fbdc1ad41da1087e947956 Mon Sep 17 00:00:00 2001 From: "meilin.huang" <954537473@qq.com> Date: Sat, 13 Aug 2022 19:31:16 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20code=20rewiew&=E5=8A=9F=E8=83=BD?= =?UTF-8?q?=E5=B0=8F=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- mayfly_go_web/package.json | 2 +- mayfly_go_web/src/store/interface/index.ts | 1 + .../src/store/modules/themeConfig.ts | 1 + .../layout/navBars/breadcrumb/setings.vue | 4 +- .../src/views/ops/machine/ServiceManage.vue | 4 +- .../src/views/ops/machine/SshTerminal.vue | 71 +++---- mayfly_go_web/yarn.lock | 8 +- server/internal/devops/api/machine.go | 17 +- .../devops/infrastructure/machine/shell.go | 19 -- .../devops/infrastructure/machine/stats.go | 18 ++ .../devops/infrastructure/machine/terminal.go | 74 +++++++ .../machine/terminal_session.go | 165 +++++++++++++++ .../machine/ws_shell_session.go | 195 ------------------ server/pkg/ws/ws.go | 2 +- 14 files changed, 308 insertions(+), 273 deletions(-) delete mode 100644 server/internal/devops/infrastructure/machine/shell.go create mode 100644 server/internal/devops/infrastructure/machine/terminal.go create mode 100644 server/internal/devops/infrastructure/machine/terminal_session.go delete mode 100644 server/internal/devops/infrastructure/machine/ws_shell_session.go diff --git a/mayfly_go_web/package.json b/mayfly_go_web/package.json index b4e23837..c0b762a6 100644 --- a/mayfly_go_web/package.json +++ b/mayfly_go_web/package.json @@ -13,7 +13,7 @@ "countup.js": "^2.0.7", "cropperjs": "^1.5.11", "echarts": "^5.3.3", - "element-plus": "^2.2.12", + "element-plus": "^2.2.13", "jsencrypt": "^3.2.1", "jsoneditor": "^9.9.0", "lodash": "^4.17.21", diff --git a/mayfly_go_web/src/store/interface/index.ts b/mayfly_go_web/src/store/interface/index.ts index 9e1c915a..adb25ec4 100644 --- a/mayfly_go_web/src/store/interface/index.ts +++ b/mayfly_go_web/src/store/interface/index.ts @@ -52,6 +52,7 @@ export interface ThemeConfigState { terminalBackground: string; terminalCursor: string; terminalFontSize: number; + terminalFontWeight: string; }; } diff --git a/mayfly_go_web/src/store/modules/themeConfig.ts b/mayfly_go_web/src/store/modules/themeConfig.ts index 50ec7425..a9e7d8c4 100644 --- a/mayfly_go_web/src/store/modules/themeConfig.ts +++ b/mayfly_go_web/src/store/modules/themeConfig.ts @@ -113,6 +113,7 @@ const themeConfigModule: Module = { // ssh终端cursor色 terminalCursor: '#f0cc09', terminalFontSize: 15, + terminalFontWeight: 'normal', /* 后端控制路由 diff --git a/mayfly_go_web/src/views/layout/navBars/breadcrumb/setings.vue b/mayfly_go_web/src/views/layout/navBars/breadcrumb/setings.vue index 1b113cfe..71ed7839 100644 --- a/mayfly_go_web/src/views/layout/navBars/breadcrumb/setings.vue +++ b/mayfly_go_web/src/views/layout/navBars/breadcrumb/setings.vue @@ -40,7 +40,7 @@ - + 全局主题 diff --git a/mayfly_go_web/src/views/ops/machine/ServiceManage.vue b/mayfly_go_web/src/views/ops/machine/ServiceManage.vue index c17444d2..621bfbc9 100644 --- a/mayfly_go_web/src/views/ops/machine/ServiceManage.vue +++ b/mayfly_go_web/src/views/ops/machine/ServiceManage.vue @@ -97,12 +97,12 @@ v-if="terminalDialog.visible" title="终端" v-model="terminalDialog.visible" - width="70%" + width="80%" :close-on-click-modal="false" :modal="false" @close="closeTermnial" > - + { state.machineId = newValue.machineId; state.cmd = newValue.cmd; state.height = newValue.height; - if (state.machineId) { - initSocket(); - } }); onMounted(() => { state.machineId = props.machineId; state.height = props.height; state.cmd = props.cmd; - if (state.machineId) { - initSocket(); - } }); onBeforeUnmount(() => { @@ -56,14 +53,19 @@ export default defineComponent({ return store.state.themeConfig.themeConfig; }); + nextTick(() => { + initXterm(); + initSocket(); + }); + function initXterm() { const term: any = new Terminal({ fontSize: getThemeConfig.value.terminalFontSize || 15, - // fontWeight: getThemeConfig.value.terminalFontWeight || 'normal', - fontFamily: 'JetBrainsMono, Consolas, Menlo, Monaco', + fontWeight: getThemeConfig.value.terminalFontWeight || 'normal', + fontFamily: 'JetBrainsMono, monaco, Consolas, Lucida Console, monospace', cursorBlink: true, - // cursorStyle: 'underline', //光标样式 disableStdin: false, + letterSpacing: -1, theme: { foreground: getThemeConfig.value.terminalForeground || '#c5c8c6', //字体 background: getThemeConfig.value.terminalBackground || '#121212', //背景色 @@ -82,6 +84,14 @@ export default defineComponent({ try { // 窗口大小改变时,触发xterm的resize方法使自适应 fitAddon.fit(); + if (state.term) { + state.term.focus(); + send({ + type: resize, + Cols: parseInt(state.term.cols), + Rows: parseInt(state.term.rows), + }); + } } catch (e) { console.log(e); } @@ -104,20 +114,14 @@ export default defineComponent({ term.onData((key: any) => { sendCmd(key); }); - // 为解决窗体resize方法才会向后端发送列数和行数,所以页面加载时也要触发此方法 - send({ - type: 'resize', - Cols: parseInt(term.cols), - Rows: parseInt(term.rows), - }); - // 如果有初始要执行的命令,则发送执行命令 - if (state.cmd) { - sendCmd(state.cmd + ' \r'); - } } function initSocket() { - state.socket = new WebSocket(`${config.baseWsUrl}/machines/${state.machineId}/terminal?token=${getSession('token')}`); + state.socket = new WebSocket( + `${config.baseWsUrl}/machines/${state.machineId}/terminal?token=${getSession('token')}&cols=${state.term.cols}&rows=${ + state.term.rows + }` + ); // 监听socket连接 state.socket.onopen = open; // 监听socket错误信息 @@ -129,8 +133,10 @@ export default defineComponent({ } function open() { - console.log('socket连接成功'); - initXterm(); + // 如果有初始要执行的命令,则发送执行命令 + if (state.cmd) { + sendCmd(state.cmd + ' \r'); + } //开启心跳 // this.start(); } @@ -151,20 +157,9 @@ export default defineComponent({ // this.reconnect() } - function getMessage(msg: string) { - // console.log(msg) - state.term.write(msg['data']); - //msg是返回的数据 - // msg = JSON.parse(msg.data); - // this.socket.send("ping");//有事没事ping一下,看看ws还活着没 - // //switch用于处理返回的数据,根据返回数据的格式去判断 - // switch (msg["operation"]) { - // case "stdout": - // this.term.write(msg["data"]);//这里write也许不是固定的,失败后找后端看一下该怎么往term里面write - // break; - // default: - // console.error("Unexpected message type:", msg);//但是错误是固定的。。。。 - // } + function getMessage(msg: any) { + // msg.data是真正后端返回的数据 + state.term.write(msg.data); //收到服务器信息,心跳重置 // this.reset(); } @@ -175,7 +170,7 @@ export default defineComponent({ function sendCmd(key: any) { send({ - type: 'cmd', + type: data, msg: key, }); } diff --git a/mayfly_go_web/yarn.lock b/mayfly_go_web/yarn.lock index af44ef02..221cab84 100644 --- a/mayfly_go_web/yarn.lock +++ b/mayfly_go_web/yarn.lock @@ -633,10 +633,10 @@ echarts@^5.3.3: tslib "2.3.0" zrender "5.3.2" -element-plus@^2.2.12: - version "2.2.12" - resolved "https://registry.npmmirror.com/element-plus/-/element-plus-2.2.12.tgz#b6c4e298e02ba9b904d70daa54def27b2de8c43c" - integrity sha512-g/hIHj3b+dND2R3YRvyvCJtJhQvR7lWvXqhJaoxaQmajjNWedoe4rttxG26fOSv9YCC2wN4iFDcJHs70YFNgrA== +element-plus@^2.2.13: + version "2.2.13" + resolved "https://registry.npmmirror.com/element-plus/-/element-plus-2.2.13.tgz#9ec3a9fa6587c93a87bb0d30c200ac8ee4f69c8b" + integrity sha512-dKQ7BPZC8deUPhv+6s4GgOL0GyGj3KpUarywxm6s1nWnHjH6FqeZlUcxPqBvJd7W/d81POayx3B13GP+rfkG9g== dependencies: "@ctrl/tinycolor" "^3.4.1" "@element-plus/icons-vue" "^2.0.6" diff --git a/server/internal/devops/api/machine.go b/server/internal/devops/api/machine.go index 1013dedd..9692e302 100644 --- a/server/internal/devops/api/machine.go +++ b/server/internal/devops/api/machine.go @@ -160,21 +160,16 @@ func (m *Machine) WsSSH(g *gin.Context) { panic(biz.NewBizErr("\033[1;31m您没有权限操作该机器终端,请重新登录后再试~\033[0m")) } - cols := ginx.QueryInt(g, "cols", 80) - rows := ginx.QueryInt(g, "rows", 40) - cli := m.MachineApp.GetCli(GetMachineId(g)) biz.ErrIsNilAppendErr(m.ProjectApp.CanAccess(rc.LoginAccount.Id, cli.GetMachine().ProjectId), "%s") - sws, err := machine.NewLogicSshWsSession(cols, rows, cli, wsConn) - biz.ErrIsNilAppendErr(err, "\033[1;31m连接失败:%s\033[0m") - defer sws.Close() + cols := ginx.QueryInt(g, "cols", 80) + rows := ginx.QueryInt(g, "rows", 40) - quitChan := make(chan bool, 3) - sws.Start(quitChan) - go sws.Wait(quitChan) - - <-quitChan + mts, err := machine.NewTerminalSession(utils.RandString(16), wsConn, cli, rows, cols) + biz.ErrIsNilAppendErr(err, "\033[1;31m连接失败: %s\033[0m") + mts.Start() + defer mts.Stop() } func GetMachineId(g *gin.Context) uint64 { diff --git a/server/internal/devops/infrastructure/machine/shell.go b/server/internal/devops/infrastructure/machine/shell.go deleted file mode 100644 index d8a069c2..00000000 --- a/server/internal/devops/infrastructure/machine/shell.go +++ /dev/null @@ -1,19 +0,0 @@ -package machine - -const StatsShell = ` -cat /proc/uptime -echo '-----' -/bin/hostname -f -echo '-----' -cat /proc/loadavg -echo '-----' -cat /proc/meminfo -echo '-----' -df -B1 -echo '-----' -/sbin/ip -o addr -echo '-----' -/bin/cat /proc/net/dev -echo '-----' -top -b -n 1 | grep Cpu -` diff --git a/server/internal/devops/infrastructure/machine/stats.go b/server/internal/devops/infrastructure/machine/stats.go index b4372b20..5aa38ac7 100644 --- a/server/internal/devops/infrastructure/machine/stats.go +++ b/server/internal/devops/infrastructure/machine/stats.go @@ -53,6 +53,24 @@ type Stats struct { CPU CPUInfo // or []CPUInfo to get all the cpu-core's stats? } +const StatsShell = ` +cat /proc/uptime +echo '-----' +/bin/hostname -f +echo '-----' +cat /proc/loadavg +echo '-----' +cat /proc/meminfo +echo '-----' +df -B1 +echo '-----' +/sbin/ip -o addr +echo '-----' +/bin/cat /proc/net/dev +echo '-----' +top -b -n 1 | grep Cpu +` + func (c *Cli) GetAllStats() *Stats { res, _ := c.Run(StatsShell) infos := strings.Split(*res, "-----") diff --git a/server/internal/devops/infrastructure/machine/terminal.go b/server/internal/devops/infrastructure/machine/terminal.go new file mode 100644 index 00000000..97d3b8ef --- /dev/null +++ b/server/internal/devops/infrastructure/machine/terminal.go @@ -0,0 +1,74 @@ +package machine + +import ( + "bufio" + "io" + + "golang.org/x/crypto/ssh" +) + +type Terminal struct { + SshSession *ssh.Session + StdinPipe io.WriteCloser + StdoutReader *bufio.Reader +} + +// 新建机器ssh终端 +func NewTerminal(cli *Cli) (*Terminal, error) { + sshSession, err := cli.GetSession() + if err != nil { + return nil, err + } + + stdoutPipe, err := sshSession.StdoutPipe() + if err != nil { + return nil, err + } + stdoutReader := bufio.NewReader(stdoutPipe) + + stdinPipe, err := sshSession.StdinPipe() + if err != nil { + return nil, err + } + + terminal := Terminal{ + SshSession: sshSession, + StdinPipe: stdinPipe, + StdoutReader: stdoutReader, + } + + return &terminal, nil +} + +func (t *Terminal) Write(p []byte) (int, error) { + return t.StdinPipe.Write(p) +} + +func (t *Terminal) ReadRune() (r rune, size int, err error) { + return t.StdoutReader.ReadRune() +} + +func (t *Terminal) Close() error { + if t.SshSession != nil { + return t.SshSession.Close() + } + return nil +} + +func (t *Terminal) WindowChange(h int, w int) error { + return t.SshSession.WindowChange(h, w) +} + +func (t *Terminal) RequestPty(term string, h, w int) error { + modes := ssh.TerminalModes{ + ssh.ECHO: 1, + ssh.TTY_OP_ISPEED: 14400, + ssh.TTY_OP_OSPEED: 14400, + } + + return t.SshSession.RequestPty(term, h, w, modes) +} + +func (t *Terminal) Shell() error { + return t.SshSession.Shell() +} diff --git a/server/internal/devops/infrastructure/machine/terminal_session.go b/server/internal/devops/infrastructure/machine/terminal_session.go new file mode 100644 index 00000000..fede0bba --- /dev/null +++ b/server/internal/devops/infrastructure/machine/terminal_session.go @@ -0,0 +1,165 @@ +package machine + +import ( + "context" + "encoding/json" + "io" + "mayfly-go/pkg/global" + "time" + "unicode/utf8" + + "github.com/gorilla/websocket" +) + +const ( + Resize = 1 + Data = 2 +) + +type TerminalSession struct { + ID string + wsConn *websocket.Conn + terminal *Terminal + ctx context.Context + cancel context.CancelFunc + dataChan chan rune + tick *time.Ticker +} + +func NewTerminalSession(sessionId string, ws *websocket.Conn, cli *Cli, rows, cols int) (*TerminalSession, error) { + terminal, err := NewTerminal(cli) + if err != nil { + return nil, err + } + err = terminal.RequestPty("xterm-256color", rows, cols) + if err != nil { + return nil, err + } + err = terminal.Shell() + if err != nil { + return nil, err + } + + ctx, cancel := context.WithCancel(context.Background()) + tick := time.NewTicker(time.Millisecond * time.Duration(60)) + ts := &TerminalSession{ + ID: sessionId, + wsConn: ws, + terminal: terminal, + ctx: ctx, + cancel: cancel, + dataChan: make(chan rune), + tick: tick, + } + return ts, nil +} + +func (r TerminalSession) Start() { + go r.readFormTerminal() + go r.writeToWebsocket() + r.receiveWsMsg() +} + +func (r TerminalSession) Stop() { + global.Log.Debug("close machine ssh terminal session") + r.tick.Stop() + r.cancel() + if r.terminal != nil { + if err := r.terminal.Close(); err != nil { + global.Log.Errorf("关闭机器ssh终端失败: %s", err.Error()) + } + } +} + +func (ts TerminalSession) readFormTerminal() { + for { + select { + case <-ts.ctx.Done(): + return + default: + rn, size, err := ts.terminal.ReadRune() + if err != nil { + if err != io.EOF { + global.Log.Error("机器ssh终端读取消息失败: ", err) + } + return + } + if size > 0 { + ts.dataChan <- rn + } + } + } +} + +func (ts TerminalSession) writeToWebsocket() { + var buf []byte + for { + select { + case <-ts.ctx.Done(): + return + case <-ts.tick.C: + if len(buf) > 0 { + s := string(buf) + if err := WriteMessage(ts.wsConn, s); err != nil { + global.Log.Error("机器ssh终端发送消息至websocket失败: ", err) + return + } + buf = []byte{} + } + case data := <-ts.dataChan: + if data != utf8.RuneError { + p := make([]byte, utf8.RuneLen(data)) + utf8.EncodeRune(p, data) + buf = append(buf, p...) + } else { + buf = append(buf, []byte("@")...) + } + } + } +} + +type WsMsg struct { + Type int `json:"type"` + Msg string `json:"msg"` + Cols int `json:"cols"` + Rows int `json:"rows"` +} + +func (ts *TerminalSession) receiveWsMsg() { + wsConn := ts.wsConn + for { + select { + case <-ts.ctx.Done(): + return + default: + // read websocket msg + _, wsData, err := wsConn.ReadMessage() + if err != nil { + global.Log.Debug("机器ssh终端读取websocket消息失败: ", err) + return + } + // 解析消息 + msgObj := WsMsg{} + if err := json.Unmarshal(wsData, &msgObj); err != nil { + global.Log.Error("机器ssh终端消息解析失败: ", err) + } + switch msgObj.Type { + case Resize: + if msgObj.Cols > 0 && msgObj.Rows > 0 { + if err := ts.terminal.WindowChange(msgObj.Rows, msgObj.Cols); err != nil { + global.Log.Error("ssh pty change windows size failed") + } + } + case Data: + _, err := ts.terminal.Write([]byte(msgObj.Msg)) + if err != nil { + global.Log.Debug("机器ssh终端写入消息失败: %s", err) + } + } + } + } +} + +func WriteMessage(ws *websocket.Conn, msg string) error { + return ws.WriteMessage(websocket.TextMessage, []byte(msg)) +} diff --git a/server/internal/devops/infrastructure/machine/ws_shell_session.go b/server/internal/devops/infrastructure/machine/ws_shell_session.go deleted file mode 100644 index 5d1a1044..00000000 --- a/server/internal/devops/infrastructure/machine/ws_shell_session.go +++ /dev/null @@ -1,195 +0,0 @@ -package machine - -import ( - "bytes" - "encoding/json" - "io" - "mayfly-go/pkg/global" - "sync" - "time" - - "github.com/gorilla/websocket" - "golang.org/x/crypto/ssh" -) - -type safeBuffer struct { - buffer bytes.Buffer - mu sync.Mutex -} - -func (w *safeBuffer) Write(p []byte) (int, error) { - w.mu.Lock() - defer w.mu.Unlock() - return w.buffer.Write(p) -} -func (w *safeBuffer) Bytes() []byte { - w.mu.Lock() - defer w.mu.Unlock() - return w.buffer.Bytes() -} -func (w *safeBuffer) Reset() { - w.mu.Lock() - defer w.mu.Unlock() - w.buffer.Reset() -} - -const ( - wsMsgCmd = "cmd" - wsMsgResize = "resize" -) - -type WsMsg struct { - Type string `json:"type"` - Msg string `json:"msg"` - Cols int `json:"cols"` - Rows int `json:"rows"` -} - -type LogicSshWsSession struct { - stdinPipe io.WriteCloser - comboOutput *safeBuffer //ssh 终端混合输出 - inputFilterBuff *safeBuffer //用来过滤输入的命令和ssh_filter配置对比的 - session *ssh.Session - wsConn *websocket.Conn -} - -func NewLogicSshWsSession(cols, rows int, cli *Cli, wsConn *websocket.Conn) (*LogicSshWsSession, error) { - sshSession, err := cli.GetSession() - if err != nil { - return nil, err - } - - stdinP, err := sshSession.StdinPipe() - if err != nil { - return nil, err - } - - comboWriter := new(safeBuffer) - inputBuf := new(safeBuffer) - //ssh.stdout and stderr will write output into comboWriter - sshSession.Stdout = comboWriter - sshSession.Stderr = comboWriter - - modes := ssh.TerminalModes{ - ssh.ECHO: 1, // disable echo - ssh.TTY_OP_ISPEED: 14400, // input speed = 14.4kbaud - ssh.TTY_OP_OSPEED: 14400, // output speed = 14.4kbaud - } - // Request pseudo terminal - if err := sshSession.RequestPty("xterm-256color", rows, cols, modes); err != nil { - return nil, err - } - // Start remote shell - if err := sshSession.Shell(); err != nil { - return nil, err - } - return &LogicSshWsSession{ - stdinPipe: stdinP, - comboOutput: comboWriter, - inputFilterBuff: inputBuf, - session: sshSession, - wsConn: wsConn, - }, nil -} - -//Close 关闭 -func (sws *LogicSshWsSession) Close() { - if sws.session != nil { - sws.session.Close() - } - if sws.comboOutput != nil { - sws.comboOutput = nil - } -} - -func (sws *LogicSshWsSession) Start(quitChan chan bool) { - go sws.receiveWsMsg(quitChan) - go sws.sendComboOutput(quitChan) -} - -//receiveWsMsg receive websocket msg do some handling then write into ssh.session.stdin -func (sws *LogicSshWsSession) receiveWsMsg(exitCh chan bool) { - wsConn := sws.wsConn - //tells other go routine quit - defer setQuit(exitCh) - for { - select { - case <-exitCh: - return - default: - //read websocket msg - _, wsData, err := wsConn.ReadMessage() - if err != nil { - if websocket.IsCloseError(err, websocket.CloseGoingAway, websocket.CloseNoStatusReceived) { - return - } - global.Log.Error("reading webSocket message failed: ", err) - return - } - //unmashal bytes into struct - msgObj := WsMsg{} - if err := json.Unmarshal(wsData, &msgObj); err != nil { - global.Log.Error("unmarshal websocket message failed:", err) - } - switch msgObj.Type { - case wsMsgResize: - //handle xterm.js size change - if msgObj.Cols > 0 && msgObj.Rows > 0 { - if err := sws.session.WindowChange(msgObj.Rows, msgObj.Cols); err != nil { - global.Log.Error("ssh pty change windows size failed") - } - } - case wsMsgCmd: - sws.sendWebsocketInputCommandToSshSessionStdinPipe([]byte(msgObj.Msg)) - } - } - } -} - -//sendWebsocketInputCommandToSshSessionStdinPipe -func (sws *LogicSshWsSession) sendWebsocketInputCommandToSshSessionStdinPipe(cmdBytes []byte) { - if _, err := sws.stdinPipe.Write(cmdBytes); err != nil { - global.Log.Error("ws cmd bytes write to ssh.stdin pipe failed") - } -} - -func (sws *LogicSshWsSession) sendComboOutput(exitCh chan bool) { - wsConn := sws.wsConn - //todo 优化成一个方法 - //tells other go routine quit - defer setQuit(exitCh) - - //every 120ms write combine output bytes into websocket response - tick := time.NewTicker(time.Millisecond * time.Duration(60)) - //for range time.Tick(120 * time.Millisecond){} - defer tick.Stop() - for { - select { - case <-tick.C: - if sws.comboOutput == nil { - return - } - bs := sws.comboOutput.Bytes() - if len(bs) > 0 { - err := wsConn.WriteMessage(websocket.TextMessage, bs) - if err != nil { - global.Log.Error("ssh sending combo output to webSocket failed") - } - sws.comboOutput.buffer.Reset() - } - - case <-exitCh: - return - } - } -} - -func (sws *LogicSshWsSession) Wait(quitChan chan bool) { - if err := sws.session.Wait(); err != nil { - setQuit(quitChan) - } -} - -func setQuit(ch chan bool) { - ch <- true -} diff --git a/server/pkg/ws/ws.go b/server/pkg/ws/ws.go index 33d4d356..d346eeec 100644 --- a/server/pkg/ws/ws.go +++ b/server/pkg/ws/ws.go @@ -56,7 +56,7 @@ func checkConn() { // 删除ws连接 func Delete(userid uint64) { - global.Log.Info("移除websocket连接:uid = ", userid) + global.Log.Debug("移除websocket连接:uid = ", userid) conn := conns[userid] if conn != nil { conn.Close()