mirror of
https://gitee.com/dromara/mayfly-go
synced 2026-03-01 23:55:36 +08:00
refactor: code rewiew&功能小优化
This commit is contained in:
@@ -1,19 +0,0 @@
|
||||
package machine
|
||||
|
||||
const StatsShell = `
|
||||
cat /proc/uptime
|
||||
echo '-----'
|
||||
/bin/hostname -f
|
||||
echo '-----'
|
||||
cat /proc/loadavg
|
||||
echo '-----'
|
||||
cat /proc/meminfo
|
||||
echo '-----'
|
||||
df -B1
|
||||
echo '-----'
|
||||
/sbin/ip -o addr
|
||||
echo '-----'
|
||||
/bin/cat /proc/net/dev
|
||||
echo '-----'
|
||||
top -b -n 1 | grep Cpu
|
||||
`
|
||||
@@ -53,6 +53,24 @@ type Stats struct {
|
||||
CPU CPUInfo // or []CPUInfo to get all the cpu-core's stats?
|
||||
}
|
||||
|
||||
const StatsShell = `
|
||||
cat /proc/uptime
|
||||
echo '-----'
|
||||
/bin/hostname -f
|
||||
echo '-----'
|
||||
cat /proc/loadavg
|
||||
echo '-----'
|
||||
cat /proc/meminfo
|
||||
echo '-----'
|
||||
df -B1
|
||||
echo '-----'
|
||||
/sbin/ip -o addr
|
||||
echo '-----'
|
||||
/bin/cat /proc/net/dev
|
||||
echo '-----'
|
||||
top -b -n 1 | grep Cpu
|
||||
`
|
||||
|
||||
func (c *Cli) GetAllStats() *Stats {
|
||||
res, _ := c.Run(StatsShell)
|
||||
infos := strings.Split(*res, "-----")
|
||||
|
||||
74
server/internal/devops/infrastructure/machine/terminal.go
Normal file
74
server/internal/devops/infrastructure/machine/terminal.go
Normal file
@@ -0,0 +1,74 @@
|
||||
package machine
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"io"
|
||||
|
||||
"golang.org/x/crypto/ssh"
|
||||
)
|
||||
|
||||
type Terminal struct {
|
||||
SshSession *ssh.Session
|
||||
StdinPipe io.WriteCloser
|
||||
StdoutReader *bufio.Reader
|
||||
}
|
||||
|
||||
// 新建机器ssh终端
|
||||
func NewTerminal(cli *Cli) (*Terminal, error) {
|
||||
sshSession, err := cli.GetSession()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
stdoutPipe, err := sshSession.StdoutPipe()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
stdoutReader := bufio.NewReader(stdoutPipe)
|
||||
|
||||
stdinPipe, err := sshSession.StdinPipe()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
terminal := Terminal{
|
||||
SshSession: sshSession,
|
||||
StdinPipe: stdinPipe,
|
||||
StdoutReader: stdoutReader,
|
||||
}
|
||||
|
||||
return &terminal, nil
|
||||
}
|
||||
|
||||
func (t *Terminal) Write(p []byte) (int, error) {
|
||||
return t.StdinPipe.Write(p)
|
||||
}
|
||||
|
||||
func (t *Terminal) ReadRune() (r rune, size int, err error) {
|
||||
return t.StdoutReader.ReadRune()
|
||||
}
|
||||
|
||||
func (t *Terminal) Close() error {
|
||||
if t.SshSession != nil {
|
||||
return t.SshSession.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *Terminal) WindowChange(h int, w int) error {
|
||||
return t.SshSession.WindowChange(h, w)
|
||||
}
|
||||
|
||||
func (t *Terminal) RequestPty(term string, h, w int) error {
|
||||
modes := ssh.TerminalModes{
|
||||
ssh.ECHO: 1,
|
||||
ssh.TTY_OP_ISPEED: 14400,
|
||||
ssh.TTY_OP_OSPEED: 14400,
|
||||
}
|
||||
|
||||
return t.SshSession.RequestPty(term, h, w, modes)
|
||||
}
|
||||
|
||||
func (t *Terminal) Shell() error {
|
||||
return t.SshSession.Shell()
|
||||
}
|
||||
@@ -0,0 +1,165 @@
|
||||
package machine
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"mayfly-go/pkg/global"
|
||||
"time"
|
||||
"unicode/utf8"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
const (
|
||||
Resize = 1
|
||||
Data = 2
|
||||
)
|
||||
|
||||
type TerminalSession struct {
|
||||
ID string
|
||||
wsConn *websocket.Conn
|
||||
terminal *Terminal
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
dataChan chan rune
|
||||
tick *time.Ticker
|
||||
}
|
||||
|
||||
func NewTerminalSession(sessionId string, ws *websocket.Conn, cli *Cli, rows, cols int) (*TerminalSession, error) {
|
||||
terminal, err := NewTerminal(cli)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = terminal.RequestPty("xterm-256color", rows, cols)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = terminal.Shell()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
tick := time.NewTicker(time.Millisecond * time.Duration(60))
|
||||
ts := &TerminalSession{
|
||||
ID: sessionId,
|
||||
wsConn: ws,
|
||||
terminal: terminal,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
dataChan: make(chan rune),
|
||||
tick: tick,
|
||||
}
|
||||
return ts, nil
|
||||
}
|
||||
|
||||
func (r TerminalSession) Start() {
|
||||
go r.readFormTerminal()
|
||||
go r.writeToWebsocket()
|
||||
r.receiveWsMsg()
|
||||
}
|
||||
|
||||
func (r TerminalSession) Stop() {
|
||||
global.Log.Debug("close machine ssh terminal session")
|
||||
r.tick.Stop()
|
||||
r.cancel()
|
||||
if r.terminal != nil {
|
||||
if err := r.terminal.Close(); err != nil {
|
||||
global.Log.Errorf("关闭机器ssh终端失败: %s", err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (ts TerminalSession) readFormTerminal() {
|
||||
for {
|
||||
select {
|
||||
case <-ts.ctx.Done():
|
||||
return
|
||||
default:
|
||||
rn, size, err := ts.terminal.ReadRune()
|
||||
if err != nil {
|
||||
if err != io.EOF {
|
||||
global.Log.Error("机器ssh终端读取消息失败: ", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
if size > 0 {
|
||||
ts.dataChan <- rn
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (ts TerminalSession) writeToWebsocket() {
|
||||
var buf []byte
|
||||
for {
|
||||
select {
|
||||
case <-ts.ctx.Done():
|
||||
return
|
||||
case <-ts.tick.C:
|
||||
if len(buf) > 0 {
|
||||
s := string(buf)
|
||||
if err := WriteMessage(ts.wsConn, s); err != nil {
|
||||
global.Log.Error("机器ssh终端发送消息至websocket失败: ", err)
|
||||
return
|
||||
}
|
||||
buf = []byte{}
|
||||
}
|
||||
case data := <-ts.dataChan:
|
||||
if data != utf8.RuneError {
|
||||
p := make([]byte, utf8.RuneLen(data))
|
||||
utf8.EncodeRune(p, data)
|
||||
buf = append(buf, p...)
|
||||
} else {
|
||||
buf = append(buf, []byte("@")...)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type WsMsg struct {
|
||||
Type int `json:"type"`
|
||||
Msg string `json:"msg"`
|
||||
Cols int `json:"cols"`
|
||||
Rows int `json:"rows"`
|
||||
}
|
||||
|
||||
func (ts *TerminalSession) receiveWsMsg() {
|
||||
wsConn := ts.wsConn
|
||||
for {
|
||||
select {
|
||||
case <-ts.ctx.Done():
|
||||
return
|
||||
default:
|
||||
// read websocket msg
|
||||
_, wsData, err := wsConn.ReadMessage()
|
||||
if err != nil {
|
||||
global.Log.Debug("机器ssh终端读取websocket消息失败: ", err)
|
||||
return
|
||||
}
|
||||
// 解析消息
|
||||
msgObj := WsMsg{}
|
||||
if err := json.Unmarshal(wsData, &msgObj); err != nil {
|
||||
global.Log.Error("机器ssh终端消息解析失败: ", err)
|
||||
}
|
||||
switch msgObj.Type {
|
||||
case Resize:
|
||||
if msgObj.Cols > 0 && msgObj.Rows > 0 {
|
||||
if err := ts.terminal.WindowChange(msgObj.Rows, msgObj.Cols); err != nil {
|
||||
global.Log.Error("ssh pty change windows size failed")
|
||||
}
|
||||
}
|
||||
case Data:
|
||||
_, err := ts.terminal.Write([]byte(msgObj.Msg))
|
||||
if err != nil {
|
||||
global.Log.Debug("机器ssh终端写入消息失败: %s", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func WriteMessage(ws *websocket.Conn, msg string) error {
|
||||
return ws.WriteMessage(websocket.TextMessage, []byte(msg))
|
||||
}
|
||||
@@ -1,195 +0,0 @@
|
||||
package machine
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"mayfly-go/pkg/global"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
"golang.org/x/crypto/ssh"
|
||||
)
|
||||
|
||||
type safeBuffer struct {
|
||||
buffer bytes.Buffer
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func (w *safeBuffer) Write(p []byte) (int, error) {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
return w.buffer.Write(p)
|
||||
}
|
||||
func (w *safeBuffer) Bytes() []byte {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
return w.buffer.Bytes()
|
||||
}
|
||||
func (w *safeBuffer) Reset() {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
w.buffer.Reset()
|
||||
}
|
||||
|
||||
const (
|
||||
wsMsgCmd = "cmd"
|
||||
wsMsgResize = "resize"
|
||||
)
|
||||
|
||||
type WsMsg struct {
|
||||
Type string `json:"type"`
|
||||
Msg string `json:"msg"`
|
||||
Cols int `json:"cols"`
|
||||
Rows int `json:"rows"`
|
||||
}
|
||||
|
||||
type LogicSshWsSession struct {
|
||||
stdinPipe io.WriteCloser
|
||||
comboOutput *safeBuffer //ssh 终端混合输出
|
||||
inputFilterBuff *safeBuffer //用来过滤输入的命令和ssh_filter配置对比的
|
||||
session *ssh.Session
|
||||
wsConn *websocket.Conn
|
||||
}
|
||||
|
||||
func NewLogicSshWsSession(cols, rows int, cli *Cli, wsConn *websocket.Conn) (*LogicSshWsSession, error) {
|
||||
sshSession, err := cli.GetSession()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
stdinP, err := sshSession.StdinPipe()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
comboWriter := new(safeBuffer)
|
||||
inputBuf := new(safeBuffer)
|
||||
//ssh.stdout and stderr will write output into comboWriter
|
||||
sshSession.Stdout = comboWriter
|
||||
sshSession.Stderr = comboWriter
|
||||
|
||||
modes := ssh.TerminalModes{
|
||||
ssh.ECHO: 1, // disable echo
|
||||
ssh.TTY_OP_ISPEED: 14400, // input speed = 14.4kbaud
|
||||
ssh.TTY_OP_OSPEED: 14400, // output speed = 14.4kbaud
|
||||
}
|
||||
// Request pseudo terminal
|
||||
if err := sshSession.RequestPty("xterm-256color", rows, cols, modes); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Start remote shell
|
||||
if err := sshSession.Shell(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &LogicSshWsSession{
|
||||
stdinPipe: stdinP,
|
||||
comboOutput: comboWriter,
|
||||
inputFilterBuff: inputBuf,
|
||||
session: sshSession,
|
||||
wsConn: wsConn,
|
||||
}, nil
|
||||
}
|
||||
|
||||
//Close 关闭
|
||||
func (sws *LogicSshWsSession) Close() {
|
||||
if sws.session != nil {
|
||||
sws.session.Close()
|
||||
}
|
||||
if sws.comboOutput != nil {
|
||||
sws.comboOutput = nil
|
||||
}
|
||||
}
|
||||
|
||||
func (sws *LogicSshWsSession) Start(quitChan chan bool) {
|
||||
go sws.receiveWsMsg(quitChan)
|
||||
go sws.sendComboOutput(quitChan)
|
||||
}
|
||||
|
||||
//receiveWsMsg receive websocket msg do some handling then write into ssh.session.stdin
|
||||
func (sws *LogicSshWsSession) receiveWsMsg(exitCh chan bool) {
|
||||
wsConn := sws.wsConn
|
||||
//tells other go routine quit
|
||||
defer setQuit(exitCh)
|
||||
for {
|
||||
select {
|
||||
case <-exitCh:
|
||||
return
|
||||
default:
|
||||
//read websocket msg
|
||||
_, wsData, err := wsConn.ReadMessage()
|
||||
if err != nil {
|
||||
if websocket.IsCloseError(err, websocket.CloseGoingAway, websocket.CloseNoStatusReceived) {
|
||||
return
|
||||
}
|
||||
global.Log.Error("reading webSocket message failed: ", err)
|
||||
return
|
||||
}
|
||||
//unmashal bytes into struct
|
||||
msgObj := WsMsg{}
|
||||
if err := json.Unmarshal(wsData, &msgObj); err != nil {
|
||||
global.Log.Error("unmarshal websocket message failed:", err)
|
||||
}
|
||||
switch msgObj.Type {
|
||||
case wsMsgResize:
|
||||
//handle xterm.js size change
|
||||
if msgObj.Cols > 0 && msgObj.Rows > 0 {
|
||||
if err := sws.session.WindowChange(msgObj.Rows, msgObj.Cols); err != nil {
|
||||
global.Log.Error("ssh pty change windows size failed")
|
||||
}
|
||||
}
|
||||
case wsMsgCmd:
|
||||
sws.sendWebsocketInputCommandToSshSessionStdinPipe([]byte(msgObj.Msg))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//sendWebsocketInputCommandToSshSessionStdinPipe
|
||||
func (sws *LogicSshWsSession) sendWebsocketInputCommandToSshSessionStdinPipe(cmdBytes []byte) {
|
||||
if _, err := sws.stdinPipe.Write(cmdBytes); err != nil {
|
||||
global.Log.Error("ws cmd bytes write to ssh.stdin pipe failed")
|
||||
}
|
||||
}
|
||||
|
||||
func (sws *LogicSshWsSession) sendComboOutput(exitCh chan bool) {
|
||||
wsConn := sws.wsConn
|
||||
//todo 优化成一个方法
|
||||
//tells other go routine quit
|
||||
defer setQuit(exitCh)
|
||||
|
||||
//every 120ms write combine output bytes into websocket response
|
||||
tick := time.NewTicker(time.Millisecond * time.Duration(60))
|
||||
//for range time.Tick(120 * time.Millisecond){}
|
||||
defer tick.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-tick.C:
|
||||
if sws.comboOutput == nil {
|
||||
return
|
||||
}
|
||||
bs := sws.comboOutput.Bytes()
|
||||
if len(bs) > 0 {
|
||||
err := wsConn.WriteMessage(websocket.TextMessage, bs)
|
||||
if err != nil {
|
||||
global.Log.Error("ssh sending combo output to webSocket failed")
|
||||
}
|
||||
sws.comboOutput.buffer.Reset()
|
||||
}
|
||||
|
||||
case <-exitCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (sws *LogicSshWsSession) Wait(quitChan chan bool) {
|
||||
if err := sws.session.Wait(); err != nil {
|
||||
setQuit(quitChan)
|
||||
}
|
||||
}
|
||||
|
||||
func setQuit(ch chan bool) {
|
||||
ch <- true
|
||||
}
|
||||
Reference in New Issue
Block a user