feat: 标签支持拖拽移动与机器支持执行命令查看

This commit is contained in:
meilin.huang
2024-04-21 19:35:58 +08:00
parent 44805ce580
commit ebe73e2f19
21 changed files with 595 additions and 118 deletions

View File

@@ -13,6 +13,7 @@ import (
"mayfly-go/pkg/logx"
"mayfly-go/pkg/model"
"mayfly-go/pkg/scheduler"
"mayfly-go/pkg/utils/jsonx"
"mayfly-go/pkg/utils/stringx"
"os"
"path"
@@ -76,7 +77,24 @@ func (m *machineTermOpAppImpl) TermConn(ctx context.Context, cli *mcm.Cli, wsCon
recorder = mcm.NewRecorder(f)
}
mts, err := mcm.NewTerminalSession(stringx.Rand(16), wsConn, cli, rows, cols, recorder)
createTsParam := &mcm.CreateTerminalSessionParam{
SessionId: stringx.Rand(16),
Cli: cli,
WsConn: wsConn,
Rows: rows,
Cols: cols,
Recorder: recorder,
LogCmd: cli.Info.EnableRecorder == 1,
}
// createTsParam.CmdFilterFuncs = []mcm.CmdFilterFunc{func(cmd string) error {
// if strings.HasPrefix(cmd, "rm") {
// return errorx.NewBiz("该命令已被禁用...")
// }
// return nil
// }}
mts, err := mcm.NewTerminalSession(createTsParam)
if err != nil {
return err
}
@@ -87,6 +105,7 @@ func (m *machineTermOpAppImpl) TermConn(ctx context.Context, cli *mcm.Cli, wsCon
if termOpRecord != nil {
now := time.Now()
termOpRecord.EndTime = &now
termOpRecord.ExecCmds = jsonx.ToStr(mts.GetExecCmds())
return m.Insert(ctx, termOpRecord)
}
return nil

View File

@@ -11,6 +11,7 @@ type MachineTermOp struct {
MachineId uint64 `json:"machineId"`
Username string `json:"username"`
RecordFilePath string `json:"recordFilePath"` // 回放文件路径
ExecCmds string `json:"execCmds"` // 执行的命令
CreateTime *time.Time `json:"createTime"`
CreatorId uint64 `json:"creatorId"`

View File

@@ -8,7 +8,8 @@ import (
)
type Terminal struct {
SshSession *ssh.Session
SshSession *ssh.Session
StdinPipe io.WriteCloser
StdoutReader *bufio.Reader
}
@@ -44,7 +45,7 @@ func (t *Terminal) Write(p []byte) (int, error) {
return t.StdinPipe.Write(p)
}
func (t *Terminal) ReadRune() (r rune, size int, err error) {
func (t *Terminal) ReadRune() (rune, int, error) {
return t.StdoutReader.ReadRune()
}

View File

@@ -0,0 +1,214 @@
package mcm
import (
"bytes"
"fmt"
"mayfly-go/pkg/errorx"
"strings"
"time"
"github.com/veops/go-ansiterm"
)
// 命令过滤函数若返回error则不执行该命令
type CmdFilterFunc func(cmd string) error
const (
CR = 0x0d // 单个字节 13 通常表示发送一个 CRCarriage Return回车字符 \r
EOT = 0x03 // 通过向标准输入发送单个字节 3 通常表示发送一个 EOTEnd of Transmission信号。EOT 是一种控制字符,在通信中用于指示数据传输的结束。发送 EOT 信号可以被用来终止当前的交互或数据传输
)
type ExecutedCmd struct {
Cmd string `json:"cmd"` // 执行的命令
Time int64 `json:"time"` // 执行时间戳
}
// TerminalHandler 终端处理器
type TerminalHandler struct {
Filters []CmdFilterFunc
ExecutedCmds []*ExecutedCmd // 已执行的命令
Parser *Parser
}
// PreWriteHandle 写入数据至终端前的处理,可进行过滤等操作
func (tf *TerminalHandler) PreWriteHandle(p []byte) error {
tf.Parser.AppendInputData(p)
// 不是回车命令,则表示命令未结束
if bytes.LastIndex(p, []byte{CR}) != 0 {
return nil
}
// time.Sleep(time.Millisecond * 30)
command := tf.Parser.GetCmd()
// 重置终端输入输出
tf.Parser.Reset()
if command == "" {
return nil
}
// 执行命令过滤器
for _, filter := range tf.Filters {
if err := filter(command); err != nil {
msg := fmt.Sprintf("\r\n%s%s", tf.Parser.Ps1, GetErrorContent(err.Error()))
return errorx.NewBiz(msg)
}
}
// 记录执行命令
tf.ExecutedCmds = append(tf.ExecutedCmds, &ExecutedCmd{
Cmd: command,
Time: time.Now().Unix(),
})
return nil
}
// HandleRead 处理从终端读取的数据进行操作
func (tf *TerminalHandler) HandleRead(data []byte) error {
tf.Parser.AppendOutData(data)
return nil
}
type Parser struct {
Output *ansiterm.ByteStream
InputData []byte
OutputData []byte
Ps1 string
vimState bool
commandState bool
}
func NewParser(width, height int) *Parser {
return &Parser{
Output: NewParserByteStream(width, height),
vimState: false,
commandState: true,
}
}
func NewParserByteStream(width, height int) *ansiterm.ByteStream {
screen := ansiterm.NewScreen(width, height)
stream := ansiterm.InitByteStream(screen, false)
stream.Attach(screen)
return stream
}
var (
enterMarks = [][]byte{
[]byte("\x1b[?1049h"),
[]byte("\x1b[?1048h"),
[]byte("\x1b[?1047h"),
[]byte("\x1b[?47h"),
[]byte("\x1b[?25l"),
}
exitMarks = [][]byte{
[]byte("\x1b[?1049l"),
[]byte("\x1b[?1048l"),
[]byte("\x1b[?1047l"),
[]byte("\x1b[?47l"),
}
screenMarks = [][]byte{
{0x1b, 0x5b, 0x4b, 0x0d, 0x0a},
{0x1b, 0x5b, 0x34, 0x6c},
}
)
func (p *Parser) AppendInputData(data []byte) {
if len(p.InputData) == 0 {
// 如 "root@cloud-s0ervh-hh87:~# " 获取前一段用户名等提示内容
p.Ps1 = p.GetOutput()
}
p.InputData = append(p.InputData, data...)
}
func (p *Parser) AppendOutData(data []byte) {
// 非编辑等状态,则追加输出内容
if !p.State(data) {
p.OutputData = append(p.OutputData, data...)
}
}
// GetCmd 获取执行的命令
func (p *Parser) GetCmd() string {
// "root@cloud-s0ervh-hh87:~# ls"
s := p.GetOutput()
// Ps1 = "root@cloud-s0ervh-hh87:~# "
return strings.TrimPrefix(s, p.Ps1)
}
func (p *Parser) Reset() {
p.Output.Listener.Reset()
p.OutputData = nil
p.InputData = nil
}
func (p *Parser) GetOutput() string {
p.Output.Feed(p.OutputData)
res := parseOutput(p.Output.Listener.Display())
if len(res) == 0 {
return ""
}
return res[len(res)-1]
}
func parseOutput(data []string) (output []string) {
for _, line := range data {
if strings.TrimSpace(line) != "" {
output = append(output, line)
}
}
return output
}
func (p *Parser) State(b []byte) bool {
if !p.vimState && IsEditEnterMode(b) {
if !isNewScreen(b) {
p.vimState = true
p.commandState = false
}
}
if p.vimState && IsEditExitMode(b) {
// 重置终端输入输出
p.Reset()
p.vimState = false
p.commandState = true
}
return p.vimState
}
func isNewScreen(p []byte) bool {
return matchMark(p, screenMarks)
}
func IsEditEnterMode(p []byte) bool {
return matchMark(p, enterMarks)
}
func IsEditExitMode(p []byte) bool {
return matchMark(p, exitMarks)
}
func matchMark(p []byte, marks [][]byte) bool {
for _, item := range marks {
if bytes.Contains(p, item) {
return true
}
}
return false
}
// GetErrorContent 包装返回终端错误消息
func GetErrorContent(msg string) string {
return fmt.Sprintf("\033[1;31m%s\033[0m", msg)
}
// GetErrorContentRn 包装返回终端错误消息, 并自动回车换行
func GetErrorContentRn(msg string) string {
return fmt.Sprintf("\r\n%s", GetErrorContent(msg))
}

View File

@@ -2,6 +2,7 @@ package mcm
import (
"context"
"fmt"
"io"
"mayfly-go/pkg/errorx"
"mayfly-go/pkg/logx"
@@ -27,6 +28,7 @@ type TerminalSession struct {
ID string
wsConn *websocket.Conn
terminal *Terminal
handler *TerminalHandler
recorder *Recorder
ctx context.Context
cancel context.CancelFunc
@@ -34,7 +36,21 @@ type TerminalSession struct {
tick *time.Ticker
}
func NewTerminalSession(sessionId string, ws *websocket.Conn, cli *Cli, rows, cols int, recorder *Recorder) (*TerminalSession, error) {
type CreateTerminalSessionParam struct {
SessionId string
Cli *Cli
WsConn *websocket.Conn
Rows int
Cols int
Recorder *Recorder
LogCmd bool // 是否记录命令
CmdFilterFuncs []CmdFilterFunc // 命令过滤器
}
func NewTerminalSession(param *CreateTerminalSessionParam) (*TerminalSession, error) {
sessionId, rows, cols := param.SessionId, param.Rows, param.Cols
cli, ws, recorder := param.Cli, param.WsConn, param.Recorder
terminal, err := NewTerminal(cli)
if err != nil {
return nil, err
@@ -52,12 +68,19 @@ func NewTerminalSession(sessionId string, ws *websocket.Conn, cli *Cli, rows, co
recorder.WriteHeader(rows-3, cols)
}
var handler *TerminalHandler
// 记录命令或者存在命令过滤器时,则创建对应的终端处理器
if param.LogCmd || param.CmdFilterFuncs != nil {
handler = &TerminalHandler{Parser: NewParser(120, 40), Filters: param.CmdFilterFuncs}
}
ctx, cancel := context.WithCancel(context.Background())
tick := time.NewTicker(time.Millisecond * time.Duration(60))
ts := &TerminalSession{
ID: sessionId,
wsConn: ws,
terminal: terminal,
handler: handler,
recorder: recorder,
ctx: ctx,
cancel: cancel,
@@ -66,7 +89,7 @@ func NewTerminalSession(sessionId string, ws *websocket.Conn, cli *Cli, rows, co
}
// 清除终端内容
WriteMessage(ws, "\033[2J\033[3J\033[1;1H")
ts.WriteToWs("\033[2J\033[3J\033[1;1H")
return ts, nil
}
@@ -89,6 +112,14 @@ func (r TerminalSession) Stop() {
}
}
// 获取终端会话执行的所有命令
func (r TerminalSession) GetExecCmds() []*ExecutedCmd {
if r.handler != nil {
return r.handler.ExecutedCmds
}
return []*ExecutedCmd{}
}
func (ts TerminalSession) readFromTerminal() {
for {
select {
@@ -116,20 +147,27 @@ func (ts TerminalSession) writeToWebsocket() {
case <-ts.ctx.Done():
return
case <-ts.tick.C:
if len(buf) > 0 {
s := string(buf)
if err := WriteMessage(ts.wsConn, s); err != nil {
logx.Error("机器ssh终端发送消息至websocket失败: ", err)
return
}
// 如果记录器存在,则记录操作回放信息
if ts.recorder != nil {
ts.recorder.Lock()
ts.recorder.WriteData(OutPutType, s)
ts.recorder.Unlock()
}
buf = []byte{}
if len(buf) == 0 {
continue
}
if ts.handler != nil {
ts.handler.HandleRead(buf)
}
s := string(buf)
if err := ts.WriteToWs(s); err != nil {
logx.Error("机器ssh终端发送消息至websocket失败: ", err)
return
}
// 如果记录器存在,则记录操作回放信息
if ts.recorder != nil {
ts.recorder.Lock()
ts.recorder.WriteData(OutPutType, s)
ts.recorder.Unlock()
}
buf = []byte{}
case data := <-ts.dataChan:
if data != utf8.RuneError {
p := make([]byte, utf8.RuneLen(data))
@@ -149,16 +187,15 @@ type WsMsg struct {
Rows int `json:"rows"`
}
// 接收客户端ws发送过来的消息并写入终端会话中。
// receiveWsMsg 接收客户端ws发送过来的消息并写入终端会话中。
func (ts *TerminalSession) receiveWsMsg() {
wsConn := ts.wsConn
for {
select {
case <-ts.ctx.Done():
return
default:
// read websocket msg
_, wsData, err := wsConn.ReadMessage()
_, wsData, err := ts.wsConn.ReadMessage()
if err != nil {
logx.Debugf("机器ssh终端读取websocket消息失败: %s", err.Error())
return
@@ -166,7 +203,7 @@ func (ts *TerminalSession) receiveWsMsg() {
// 解析消息
msgObj, err := parseMsg(wsData)
if err != nil {
WriteMessage(wsConn, "\r\n\033[1;31m提示: 消息内容解析失败...\033[0m")
ts.WriteToWs(GetErrorContentRn("消息内容解析失败..."))
logx.Error("机器ssh终端消息解析失败: ", err)
return
}
@@ -179,14 +216,25 @@ func (ts *TerminalSession) receiveWsMsg() {
}
}
case Data:
data := []byte(msgObj.Msg)
if ts.handler != nil {
if err := ts.handler.PreWriteHandle(data); err != nil {
ts.WriteToWs(err.Error())
// 发送命令终止指令
ts.terminal.Write([]byte{EOT})
continue
}
}
_, err := ts.terminal.Write([]byte(msgObj.Msg))
if err != nil {
logx.Debugf("机器ssh终端写入消息失败: %s", err)
logx.Errorf("写入数据至ssh终端失败: %s", err)
ts.WriteToWs(GetErrorContentRn(fmt.Sprintf("写入数据至ssh终端失败: %s", err.Error())))
}
case Ping:
_, err := ts.terminal.SshSession.SendRequest("ping", true, nil)
if err != nil {
WriteMessage(wsConn, "\r\n\033[1;31m提示: 终端连接已断开...\033[0m")
ts.WriteToWs(GetErrorContentRn("终端连接已断开..."))
return
}
}
@@ -194,8 +242,9 @@ func (ts *TerminalSession) receiveWsMsg() {
}
}
func WriteMessage(ws *websocket.Conn, msg string) error {
return ws.WriteMessage(websocket.TextMessage, []byte(msg))
// WriteToWs 将消息写入websocket连接
func (ts *TerminalSession) WriteToWs(msg string) error {
return ts.wsConn.WriteMessage(websocket.TextMessage, []byte(msg))
}
// 解析消息