refactor: machine包调整

This commit is contained in:
meilin.huang
2023-10-30 17:34:56 +08:00
parent 12f63ef3dd
commit f2b0f294d8
19 changed files with 382 additions and 390 deletions

View File

@@ -159,7 +159,7 @@
</template>
<script lang="ts" setup>
import { ref, toRefs, reactive, onMounted, defineAsyncComponent, nextTick } from 'vue';
import { ref, toRefs, reactive, onMounted, defineAsyncComponent } from 'vue';
import { useRouter } from 'vue-router';
import { ElMessage, ElMessageBox } from 'element-plus';
import { machineApi, getMachineTerminalSocketUrl } from './api';
@@ -358,7 +358,9 @@ const deleteMachine = async () => {
await machineApi.del.request({ id: state.selectionData.map((x: any) => x.id).join(',') });
ElMessage.success('操作成功');
search();
} catch (err) {}
} catch (err) {
//
}
};
const serviceManager = (row: any) => {

View File

@@ -3,7 +3,7 @@ package dbm
import (
"fmt"
"mayfly-go/internal/common/consts"
"mayfly-go/internal/machine/infrastructure/machine"
"mayfly-go/internal/machine/mcm"
"mayfly-go/pkg/cache"
"mayfly-go/pkg/logx"
"sync"
@@ -19,7 +19,7 @@ var connCache = cache.NewTimedCache(consts.DbConnExpireTime, 5*time.Second).
})
func init() {
machine.AddCheckSshTunnelMachineUseFunc(func(machineId int) bool {
mcm.AddCheckSshTunnelMachineUseFunc(func(machineId int) bool {
// 遍历所有db连接实例若存在db实例使用该ssh隧道机器则返回true表示还在使用中...
items := connCache.Items()
for _, v := range items {

View File

@@ -7,7 +7,7 @@ import (
"mayfly-go/internal/machine/api/vo"
"mayfly-go/internal/machine/application"
"mayfly-go/internal/machine/domain/entity"
"mayfly-go/internal/machine/infrastructure/machine"
"mayfly-go/internal/machine/mcm"
tagapp "mayfly-go/internal/tag/application"
"mayfly-go/pkg/biz"
"mayfly-go/pkg/config"
@@ -54,7 +54,7 @@ func (m *Machine) Machines(rc *req.Ctx) {
}
for _, mv := range *res.List {
mv.HasCli = machine.HasCli(mv.Id)
mv.HasCli = mcm.HasCli(mv.Id)
}
rc.ResData = res
}
@@ -109,7 +109,7 @@ func (m *Machine) DeleteMachine(rc *req.Ctx) {
// 关闭机器客户端
func (m *Machine) CloseCli(rc *req.Ctx) {
machine.DeleteCli(GetMachineId(rc.GinCtx))
mcm.DeleteCli(GetMachineId(rc.GinCtx))
}
// 获取进程列表信息
@@ -133,7 +133,7 @@ func (m *Machine) GetProcess(rc *req.Ctx) {
cli, err := m.MachineApp.GetCli(GetMachineId(rc.GinCtx))
biz.ErrIsNilAppendErr(err, "获取客户端连接失败: %s")
biz.ErrIsNilAppendErr(m.TagApp.CanAccess(rc.LoginAccount.Id, cli.GetMachine().TagPath), "%s")
biz.ErrIsNilAppendErr(m.TagApp.CanAccess(rc.LoginAccount.Id, cli.Info.TagPath), "%s")
res, err := cli.Run(cmd)
biz.ErrIsNilAppendErr(err, "获取进程信息失败: %s")
@@ -147,7 +147,7 @@ func (m *Machine) KillProcess(rc *req.Ctx) {
cli, err := m.MachineApp.GetCli(GetMachineId(rc.GinCtx))
biz.ErrIsNilAppendErr(err, "获取客户端连接失败: %s")
biz.ErrIsNilAppendErr(m.TagApp.CanAccess(rc.LoginAccount.Id, cli.GetMachine().TagPath), "%s")
biz.ErrIsNilAppendErr(m.TagApp.CanAccess(rc.LoginAccount.Id, cli.Info.TagPath), "%s")
res, err := cli.Run("sudo kill -9 " + pid)
biz.ErrIsNil(err, "终止进程失败: %s", res)
@@ -173,30 +173,30 @@ func (m *Machine) WsSSH(g *gin.Context) {
cli, err := m.MachineApp.GetCli(GetMachineId(g))
biz.ErrIsNilAppendErr(err, "获取客户端连接失败: %s")
biz.ErrIsNilAppendErr(m.TagApp.CanAccess(rc.LoginAccount.Id, cli.GetMachine().TagPath), "%s")
biz.ErrIsNilAppendErr(m.TagApp.CanAccess(rc.LoginAccount.Id, cli.Info.TagPath), "%s")
cols := ginx.QueryInt(g, "cols", 80)
rows := ginx.QueryInt(g, "rows", 40)
var recorder *machine.Recorder
if cli.GetMachine().EnableRecorder == 1 {
var recorder *mcm.Recorder
if cli.Info.EnableRecorder == 1 {
now := time.Now()
// 回放文件路径为: 基础配置路径/机器id/操作日期/操作者账号/操作时间.cast
recPath := fmt.Sprintf("%s/%d/%s/%s", config.Conf.Server.GetMachineRecPath(), cli.GetMachine().Id, now.Format("20060102"), rc.LoginAccount.Username)
recPath := fmt.Sprintf("%s/%d/%s/%s", config.Conf.Server.GetMachineRecPath(), cli.Info.Id, now.Format("20060102"), rc.LoginAccount.Username)
os.MkdirAll(recPath, 0766)
fileName := path.Join(recPath, fmt.Sprintf("%s.cast", now.Format("20060102_150405")))
f, err := os.OpenFile(fileName, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0766)
biz.ErrIsNilAppendErr(err, "创建终端回放记录文件失败: %s")
defer f.Close()
recorder = machine.NewRecorder(f)
recorder = mcm.NewRecorder(f)
}
mts, err := machine.NewTerminalSession(stringx.Rand(16), wsConn, cli, rows, cols, recorder)
mts, err := mcm.NewTerminalSession(stringx.Rand(16), wsConn, cli, rows, cols, recorder)
biz.ErrIsNilAppendErr(err, "\033[1;31m连接失败: %s\033[0m")
// 记录系统操作日志
rc.WithLog(req.NewLogSave("机器-终端操作"))
rc.ReqParam = cli.GetMachine()
rc.ReqParam = cli.Info
req.LogHandler(rc)
mts.Start()

View File

@@ -8,7 +8,7 @@ import (
"mayfly-go/internal/machine/api/vo"
"mayfly-go/internal/machine/application"
"mayfly-go/internal/machine/domain/entity"
"mayfly-go/internal/machine/infrastructure/machine"
"mayfly-go/internal/machine/mcm"
msgapp "mayfly-go/internal/msg/application"
msgdto "mayfly-go/internal/msg/application/dto"
"mayfly-go/pkg/biz"
@@ -72,7 +72,7 @@ func (m *MachineFile) CreateFile(rc *req.Ctx) {
path := form.Path
attrs := collx.Kvs("path", path)
var mi *machine.Info
var mi *mcm.MachineInfo
var err error
if form.Type == dir {
attrs["type"] = "目录"
@@ -231,7 +231,7 @@ func (m *MachineFile) UploadFolder(rc *req.Ctx) {
folderName := filepath.Dir(paths[0])
mcli, err := m.MachineFileApp.GetMachineCli(fid, basePath+"/"+folderName)
biz.ErrIsNil(err)
mi := mcli.GetMachine()
mi := mcli.Info
sftpCli, err := mcli.GetSftpCli()
biz.ErrIsNil(err)
@@ -261,6 +261,7 @@ func (m *MachineFile) UploadFolder(rc *req.Ctx) {
// 设置要等待的协程数量
wg.Add(len(chunks))
isSuccess := true
la := rc.LoginAccount
for _, chunk := range chunks {
go func(files []FolderFile, wg *sync.WaitGroup) {
@@ -268,6 +269,7 @@ func (m *MachineFile) UploadFolder(rc *req.Ctx) {
// 协程执行完成后调用Done方法
wg.Done()
if err := recover(); err != nil {
isSuccess = false
logx.Errorf("文件上传失败: %s", err)
switch t := err.(type) {
case errorx.BizError:
@@ -294,8 +296,10 @@ func (m *MachineFile) UploadFolder(rc *req.Ctx) {
// 等待所有协程执行完成
wg.Wait()
// 保存消息并发送文件上传成功通知
m.MsgApp.CreateAndSend(rc.LoginAccount, msgdto.SuccessSysMsg("文件上传成功", fmt.Sprintf("[%s]文件夹已成功上传至 %s[%s:%s]", folderName, mi.Name, mi.Ip, basePath)))
if isSuccess {
// 保存消息并发送文件上传成功通知
m.MsgApp.CreateAndSend(rc.LoginAccount, msgdto.SuccessSysMsg("文件上传成功", fmt.Sprintf("[%s]文件夹已成功上传至 %s[%s:%s]", folderName, mi.Name, mi.Ip, basePath)))
}
}
func (m *MachineFile) RemoveFile(rc *req.Ctx) {

View File

@@ -70,11 +70,11 @@ func (m *MachineScript) RunMachineScript(rc *req.Ctx) {
}
cli, err := m.MachineApp.GetCli(machineId)
biz.ErrIsNilAppendErr(err, "获取客户端连接失败: %s")
biz.ErrIsNilAppendErr(m.TagApp.CanAccess(rc.LoginAccount.Id, cli.GetMachine().TagPath), "%s")
biz.ErrIsNilAppendErr(m.TagApp.CanAccess(rc.LoginAccount.Id, cli.Info.TagPath), "%s")
res, err := cli.Run(script)
// 记录请求参数
rc.ReqParam = collx.Kvs("machine", cli.GetMachine(), "scriptId", scriptId, "name", ms.Name)
rc.ReqParam = collx.Kvs("machine", cli.Info, "scriptId", scriptId, "name", ms.Name)
if res == "" {
biz.ErrIsNilAppendErr(err, "执行命令失败:%s")
}

View File

@@ -4,7 +4,7 @@ import (
"mayfly-go/internal/machine/api/vo"
"mayfly-go/internal/machine/domain/entity"
"mayfly-go/internal/machine/domain/repository"
"mayfly-go/internal/machine/infrastructure/machine"
"mayfly-go/internal/machine/mcm"
"mayfly-go/pkg/base"
"mayfly-go/pkg/errorx"
"mayfly-go/pkg/gormx"
@@ -32,10 +32,10 @@ type Machine interface {
GetMachineList(condition *entity.MachineQuery, pageParam *model.PageParam, toEntity *[]*vo.MachineVO, orderBy ...string) (*model.PageResult[*[]*vo.MachineVO], error)
// 获取机器连接
GetCli(id uint64) (*machine.Cli, error)
GetCli(id uint64) (*mcm.Cli, error)
// 获取ssh隧道机器连接
GetSshTunnelMachine(id int) (*machine.SshTunnelMachine, error)
GetSshTunnelMachine(id int) (*mcm.SshTunnelMachine, error)
}
func newMachineApp(machineRepo repository.Machine, authCertApp AuthCert) Machine {
@@ -84,7 +84,7 @@ func (m *machineAppImpl) Save(me *entity.Machine) error {
}
// 关闭连接
machine.DeleteCli(me.Id)
mcm.DeleteCli(me.Id)
return m.UpdateById(me)
}
@@ -94,16 +94,18 @@ func (m *machineAppImpl) TestConn(me *entity.Machine) error {
if err != nil {
return err
}
return machine.TestConn(*mi, func(u uint64) (*machine.Info, error) {
return m.toMachineInfoById(u)
})
cli, err := mi.Conn()
if err != nil {
return err
}
cli.Close()
return nil
}
func (m *machineAppImpl) ChangeStatus(id uint64, status int8) error {
if status == entity.MachineStatusDisable {
// 关闭连接
machine.DeleteCli(id)
mcm.DeleteCli(id)
}
machine := new(entity.Machine)
machine.Id = id
@@ -114,7 +116,7 @@ func (m *machineAppImpl) ChangeStatus(id uint64, status int8) error {
// 根据条件获取机器信息
func (m *machineAppImpl) Delete(id uint64) error {
// 关闭连接
machine.DeleteCli(id)
mcm.DeleteCli(id)
return gormx.Tx(
func(db *gorm.DB) error {
// 删除machine表信息
@@ -131,20 +133,20 @@ func (m *machineAppImpl) Delete(id uint64) error {
)
}
func (m *machineAppImpl) GetCli(machineId uint64) (*machine.Cli, error) {
return machine.GetCli(machineId, func(mid uint64) (*machine.Info, error) {
func (m *machineAppImpl) GetCli(machineId uint64) (*mcm.Cli, error) {
return mcm.GetMachineCli(machineId, func(mid uint64) (*mcm.MachineInfo, error) {
return m.toMachineInfoById(mid)
})
}
func (m *machineAppImpl) GetSshTunnelMachine(machineId int) (*machine.SshTunnelMachine, error) {
return machine.GetSshTunnelMachine(machineId, func(mid uint64) (*machine.Info, error) {
func (m *machineAppImpl) GetSshTunnelMachine(machineId int) (*mcm.SshTunnelMachine, error) {
return mcm.GetSshTunnelMachine(machineId, func(mid uint64) (*mcm.MachineInfo, error) {
return m.toMachineInfoById(mid)
})
}
// 生成机器信息根据授权凭证id填充用户密码等
func (m *machineAppImpl) toMachineInfoById(machineId uint64) (*machine.Info, error) {
func (m *machineAppImpl) toMachineInfoById(machineId uint64) (*mcm.MachineInfo, error) {
me, err := m.GetById(new(entity.Machine), machineId)
if err != nil {
return nil, errorx.NewBiz("机器信息不存在")
@@ -153,20 +155,22 @@ func (m *machineAppImpl) toMachineInfoById(machineId uint64) (*machine.Info, err
return nil, errorx.NewBiz("该机器已被停用")
}
return m.toMachineInfo(me)
if mi, err := m.toMachineInfo(me); err != nil {
return nil, err
} else {
return mi, nil
}
}
func (m *machineAppImpl) toMachineInfo(me *entity.Machine) (*machine.Info, error) {
mi := new(machine.Info)
func (m *machineAppImpl) toMachineInfo(me *entity.Machine) (*mcm.MachineInfo, error) {
mi := new(mcm.MachineInfo)
mi.Id = me.Id
mi.Name = me.Name
mi.Ip = me.Ip
mi.Port = me.Port
mi.Username = me.Username
mi.TagId = me.TagId
mi.TagPath = me.TagPath
mi.EnableRecorder = me.EnableRecorder
mi.SshTunnelMachineId = me.SshTunnelMachineId
if me.UseAuthCert() {
ac, err := m.authCertApp.GetById(new(entity.AuthCert), uint64(me.AuthCertId))
@@ -184,5 +188,18 @@ func (m *machineAppImpl) toMachineInfo(me *entity.Machine) (*machine.Info, error
}
mi.Password = me.Password
}
// 使用了ssh隧道则将隧道机器信息也附上
if me.SshTunnelMachineId > 0 {
sshTunnelMe, err := m.GetById(new(entity.Machine), uint64(me.SshTunnelMachineId))
if err != nil {
return nil, errorx.NewBiz("隧道机器信息不存在")
}
sshTunnelMi, err := m.toMachineInfo(sshTunnelMe)
if err != nil {
return nil, err
}
mi.SshTunnelMachine = sshTunnelMi
}
return mi, nil
}

View File

@@ -7,7 +7,7 @@ import (
"io/fs"
"mayfly-go/internal/machine/domain/entity"
"mayfly-go/internal/machine/domain/repository"
"mayfly-go/internal/machine/infrastructure/machine"
"mayfly-go/internal/machine/mcm"
"mayfly-go/pkg/errorx"
"mayfly-go/pkg/logx"
"mayfly-go/pkg/model"
@@ -32,18 +32,18 @@ type MachineFile interface {
Delete(id uint64) error
// 获取文件关联的机器信息,主要用于记录日志使用
// GetMachine(fileId uint64) *machine.Info
// GetMachine(fileId uint64) *mcm.Info
// 检查文件路径并返回机器id
GetMachineCli(fileId uint64, path ...string) (*machine.Cli, error)
GetMachineCli(fileId uint64, path ...string) (*mcm.Cli, error)
/** sftp 相关操作 **/
// 创建目录
MkDir(fid uint64, path string) (*machine.Info, error)
MkDir(fid uint64, path string) (*mcm.MachineInfo, error)
// 创建文件
CreateFile(fid uint64, path string) (*machine.Info, error)
CreateFile(fid uint64, path string) (*mcm.MachineInfo, error)
// 读取目录
ReadDir(fid uint64, path string) ([]fs.FileInfo, error)
@@ -55,22 +55,22 @@ type MachineFile interface {
FileStat(fid uint64, path string) (string, error)
// 读取文件内容
ReadFile(fileId uint64, path string) (*sftp.File, *machine.Info, error)
ReadFile(fileId uint64, path string) (*sftp.File, *mcm.MachineInfo, error)
// 写文件
WriteFileContent(fileId uint64, path string, content []byte) (*machine.Info, error)
WriteFileContent(fileId uint64, path string, content []byte) (*mcm.MachineInfo, error)
// 文件上传
UploadFile(fileId uint64, path, filename string, reader io.Reader) (*machine.Info, error)
UploadFile(fileId uint64, path, filename string, reader io.Reader) (*mcm.MachineInfo, error)
// 移除文件
RemoveFile(fileId uint64, path ...string) (*machine.Info, error)
RemoveFile(fileId uint64, path ...string) (*mcm.MachineInfo, error)
Copy(fileId uint64, toPath string, paths ...string) (*machine.Info, error)
Copy(fileId uint64, toPath string, paths ...string) (*mcm.MachineInfo, error)
Mv(fileId uint64, toPath string, paths ...string) (*machine.Info, error)
Mv(fileId uint64, toPath string, paths ...string) (*mcm.MachineInfo, error)
Rename(fileId uint64, oldname string, newname string) (*machine.Info, error)
Rename(fileId uint64, oldname string, newname string) (*mcm.MachineInfo, error)
}
func newMachineFileApp(machineFileRepo repository.MachineFile, machineApp Machine) MachineFile {
@@ -163,7 +163,7 @@ func (m *machineFileAppImpl) FileStat(fid uint64, path string) (string, error) {
return mcli.Run(fmt.Sprintf("stat -L %s", path))
}
func (m *machineFileAppImpl) MkDir(fid uint64, path string) (*machine.Info, error) {
func (m *machineFileAppImpl) MkDir(fid uint64, path string) (*mcm.MachineInfo, error) {
if !strings.HasSuffix(path, "/") {
path = path + "/"
}
@@ -177,7 +177,7 @@ func (m *machineFileAppImpl) MkDir(fid uint64, path string) (*machine.Info, erro
return mi, err
}
func (m *machineFileAppImpl) CreateFile(fid uint64, path string) (*machine.Info, error) {
func (m *machineFileAppImpl) CreateFile(fid uint64, path string) (*mcm.MachineInfo, error) {
mi, sftpCli, err := m.GetMachineSftpCli(fid, path)
if err != nil {
return nil, err
@@ -191,7 +191,7 @@ func (m *machineFileAppImpl) CreateFile(fid uint64, path string) (*machine.Info,
return mi, err
}
func (m *machineFileAppImpl) ReadFile(fileId uint64, path string) (*sftp.File, *machine.Info, error) {
func (m *machineFileAppImpl) ReadFile(fileId uint64, path string) (*sftp.File, *mcm.MachineInfo, error) {
mi, sftpCli, err := m.GetMachineSftpCli(fileId, path)
if err != nil {
return nil, nil, err
@@ -203,7 +203,7 @@ func (m *machineFileAppImpl) ReadFile(fileId uint64, path string) (*sftp.File, *
}
// 写文件内容
func (m *machineFileAppImpl) WriteFileContent(fileId uint64, path string, content []byte) (*machine.Info, error) {
func (m *machineFileAppImpl) WriteFileContent(fileId uint64, path string, content []byte) (*mcm.MachineInfo, error) {
mi, sftpCli, err := m.GetMachineSftpCli(fileId, path)
if err != nil {
return nil, err
@@ -219,7 +219,7 @@ func (m *machineFileAppImpl) WriteFileContent(fileId uint64, path string, conten
}
// 上传文件
func (m *machineFileAppImpl) UploadFile(fileId uint64, path, filename string, reader io.Reader) (*machine.Info, error) {
func (m *machineFileAppImpl) UploadFile(fileId uint64, path, filename string, reader io.Reader) (*mcm.MachineInfo, error) {
if !strings.HasSuffix(path, "/") {
path = path + "/"
}
@@ -239,12 +239,12 @@ func (m *machineFileAppImpl) UploadFile(fileId uint64, path, filename string, re
}
// 删除文件
func (m *machineFileAppImpl) RemoveFile(fileId uint64, path ...string) (*machine.Info, error) {
func (m *machineFileAppImpl) RemoveFile(fileId uint64, path ...string) (*mcm.MachineInfo, error) {
mcli, err := m.GetMachineCli(fileId, path...)
if err != nil {
return nil, err
}
minfo := mcli.GetMachine()
minfo := mcli.Info
// 优先使用命令删除速度快sftp需要递归遍历删除子文件等
res, err := mcli.Run(fmt.Sprintf("rm -rf %s", strings.Join(path, " ")))
@@ -267,13 +267,13 @@ func (m *machineFileAppImpl) RemoveFile(fileId uint64, path ...string) (*machine
return minfo, err
}
func (m *machineFileAppImpl) Copy(fileId uint64, toPath string, paths ...string) (*machine.Info, error) {
func (m *machineFileAppImpl) Copy(fileId uint64, toPath string, paths ...string) (*mcm.MachineInfo, error) {
mcli, err := m.GetMachineCli(fileId, paths...)
if err != nil {
return nil, err
}
mi := mcli.GetMachine()
mi := mcli.Info
res, err := mcli.Run(fmt.Sprintf("cp -r %s %s", strings.Join(paths, " "), toPath))
if err != nil {
return mi, errors.New(res)
@@ -281,13 +281,13 @@ func (m *machineFileAppImpl) Copy(fileId uint64, toPath string, paths ...string)
return mi, err
}
func (m *machineFileAppImpl) Mv(fileId uint64, toPath string, paths ...string) (*machine.Info, error) {
func (m *machineFileAppImpl) Mv(fileId uint64, toPath string, paths ...string) (*mcm.MachineInfo, error) {
mcli, err := m.GetMachineCli(fileId, paths...)
if err != nil {
return nil, err
}
mi := mcli.GetMachine()
mi := mcli.Info
res, err := mcli.Run(fmt.Sprintf("mv %s %s", strings.Join(paths, " "), toPath))
if err != nil {
return mi, errorx.NewBiz(res)
@@ -295,7 +295,7 @@ func (m *machineFileAppImpl) Mv(fileId uint64, toPath string, paths ...string) (
return mi, err
}
func (m *machineFileAppImpl) Rename(fileId uint64, oldname string, newname string) (*machine.Info, error) {
func (m *machineFileAppImpl) Rename(fileId uint64, oldname string, newname string) (*mcm.MachineInfo, error) {
mi, sftpCli, err := m.GetMachineSftpCli(fileId, newname)
if err != nil {
return nil, err
@@ -304,7 +304,7 @@ func (m *machineFileAppImpl) Rename(fileId uint64, oldname string, newname strin
}
// 获取文件机器cli
func (m *machineFileAppImpl) GetMachineCli(fid uint64, inputPath ...string) (*machine.Cli, error) {
func (m *machineFileAppImpl) GetMachineCli(fid uint64, inputPath ...string) (*mcm.Cli, error) {
mf := m.GetById(fid)
if mf == nil {
return nil, errorx.NewBiz("文件不存在")
@@ -320,7 +320,7 @@ func (m *machineFileAppImpl) GetMachineCli(fid uint64, inputPath ...string) (*ma
}
// 获取文件机器 sftp cli
func (m *machineFileAppImpl) GetMachineSftpCli(fid uint64, inputPath ...string) (*machine.Info, *sftp.Client, error) {
func (m *machineFileAppImpl) GetMachineSftpCli(fid uint64, inputPath ...string) (*mcm.MachineInfo, *sftp.Client, error) {
mcli, err := m.GetMachineCli(fid, inputPath...)
if err != nil {
return nil, nil, err
@@ -331,5 +331,5 @@ func (m *machineFileAppImpl) GetMachineSftpCli(fid uint64, inputPath ...string)
return nil, nil, err
}
return mcli.GetMachine(), sftpCli, nil
return mcli.Info, sftpCli, nil
}

View File

@@ -1,291 +0,0 @@
package machine
import (
"fmt"
"mayfly-go/internal/common/consts"
"mayfly-go/internal/machine/domain/entity"
"mayfly-go/pkg/cache"
"mayfly-go/pkg/errorx"
"mayfly-go/pkg/logx"
"net"
"time"
"github.com/pkg/sftp"
"golang.org/x/crypto/ssh"
)
// 机器信息
type Info struct {
Id uint64 `json:"id"`
Name string `json:"name"`
Ip string `json:"ip"` // IP地址
Port int `json:"-"` // 端口号
AuthMethod int8 `json:"-"` // 授权认证方式
Username string `json:"-"` // 用户名
Password string `json:"-"`
Passphrase string `json:"-"` // 私钥口令
Status int8 `json:"-"` // 状态 1:启用2:停用
SshTunnelMachineId int `json:"-"` // ssh隧道机器id
EnableRecorder int8 `json:"-"` // 是否启用终端回放记录
TagId uint64 `json:"-"`
TagPath string `json:"tagPath"`
}
func (m *Info) UseSshTunnel() bool {
return m.SshTunnelMachineId > 0
}
// 获取记录日志的描述
func (m *Info) GetLogDesc() string {
return fmt.Sprintf("Machine[id=%d, tag=%s, name=%s, ip=%s:%d]", m.Id, m.TagPath, m.Name, m.Ip, m.Port)
}
// 客户端信息
type Cli struct {
machine *Info
client *ssh.Client // ssh客户端
sftpClient *sftp.Client // sftp客户端
sshTunnelMachineId int
}
// 连接
func (c *Cli) connect() error {
// 如果已经有client则直接返回
if c.client != nil {
return nil
}
m := c.machine
sshClient, err := GetSshClient(m)
if err != nil {
return err
}
c.client = sshClient
return nil
}
// 关闭client并从缓存中移除如果使用隧道则也关闭
func (c *Cli) Close() {
m := c.machine
logx.Info(fmt.Sprintf("关闭机器客户端连接-> id: %d, name: %s, ip: %s", m.Id, m.Name, m.Ip))
if c.client != nil {
c.client.Close()
c.client = nil
}
if c.sftpClient != nil {
c.sftpClient.Close()
c.sftpClient = nil
}
if c.sshTunnelMachineId > 0 {
CloseSshTunnelMachine(c.sshTunnelMachineId, c.machine.Id)
}
}
// 获取sftp client
func (c *Cli) GetSftpCli() (*sftp.Client, error) {
if c.client == nil {
if err := c.connect(); err != nil {
return nil, errorx.NewBiz("连接ssh失败: %s", err.Error())
}
}
sftpclient := c.sftpClient
// 如果sftpClient为nil则连接
if sftpclient == nil {
sc, serr := sftp.NewClient(c.client)
if serr != nil {
return nil, errorx.NewBiz("获取sftp client失败: %s", serr.Error())
}
sftpclient = sc
c.sftpClient = sftpclient
}
return sftpclient, nil
}
// 获取session
func (c *Cli) GetSession() (*ssh.Session, error) {
if c.client == nil {
if err := c.connect(); err != nil {
return nil, err
}
}
return c.client.NewSession()
}
// 执行shell
// @param shell shell脚本命令
// @return 返回执行成功或错误的消息
func (c *Cli) Run(shell string) (string, error) {
session, err := c.GetSession()
if err != nil {
c.Close()
return "", err
}
defer session.Close()
buf, err := session.CombinedOutput(shell)
if err != nil {
return string(buf), err
}
return string(buf), nil
}
func (c *Cli) GetMachine() *Info {
return c.machine
}
// 机器客户端连接缓存,指定时间内没有访问则会被关闭
var cliCache = cache.NewTimedCache(consts.MachineConnExpireTime, 5*time.Second).
WithUpdateAccessTime(true).
OnEvicted(func(_, value any) {
value.(*Cli).Close()
})
func init() {
AddCheckSshTunnelMachineUseFunc(func(machineId int) bool {
// 遍历所有机器连接实例若存在机器连接实例使用该ssh隧道机器则返回true表示还在使用中...
items := cliCache.Items()
for _, v := range items {
if v.Value.(*Cli).sshTunnelMachineId == machineId {
return true
}
}
return false
})
}
// 是否存在指定id的客户端连接
func HasCli(machineId uint64) bool {
if _, ok := cliCache.Get(machineId); ok {
return true
}
return false
}
// 删除指定机器客户端,并关闭客户端连接
func DeleteCli(id uint64) {
cliCache.Delete(id)
}
// 从缓存中获取客户端信息,不存在则回调获取机器信息函数,并新建
func GetCli(machineId uint64, getMachine func(uint64) (*Info, error)) (*Cli, error) {
if load, ok := cliCache.Get(machineId); ok {
return load.(*Cli), nil
}
me, err := getMachine(machineId)
if err != nil {
return nil, err
}
err = IfUseSshTunnelChangeIpPort(me, getMachine)
if err != nil {
return nil, errorx.NewBiz("ssh隧道连接失败: %s", err.Error())
}
c, err := newClient(me)
if err != nil {
CloseSshTunnelMachine(me.SshTunnelMachineId, me.Id)
return nil, err
}
c.sshTunnelMachineId = me.SshTunnelMachineId
cliCache.Put(machineId, c)
return c, nil
}
// 测试连接使用传值的方式而非引用。因为如果使用了ssh隧道则ip和端口会变为本地映射地址与端口
func TestConn(me Info, getSshTunnelMachine func(uint64) (*Info, error)) error {
originId := me.Id
if originId == 0 {
// 随机设置一个ip如果使用了隧道则用于临时保存隧道
me.Id = uint64(time.Now().Nanosecond())
}
err := IfUseSshTunnelChangeIpPort(&me, getSshTunnelMachine)
if err != nil {
return fmt.Errorf("ssh隧道连接失败: %s", err.Error())
}
if me.UseSshTunnel() {
defer CloseSshTunnelMachine(me.SshTunnelMachineId, me.Id)
}
sshClient, err := GetSshClient(&me)
if err != nil {
return err
}
defer sshClient.Close()
return nil
}
// 如果使用了ssh隧道则修改机器ip port为暴露的ip port
func IfUseSshTunnelChangeIpPort(me *Info, getMachine func(uint64) (*Info, error)) error {
if !me.UseSshTunnel() {
return nil
}
sshTunnelMachine, err := GetSshTunnelMachine(me.SshTunnelMachineId, func(u uint64) (*Info, error) {
return getMachine(u)
})
if err != nil {
return err
}
exposeIp, exposePort, err := sshTunnelMachine.OpenSshTunnel(me.Id, me.Ip, me.Port)
if err != nil {
return err
}
// 修改机器ip地址
me.Ip = exposeIp
me.Port = exposePort
return nil
}
func GetSshClient(m *Info) (*ssh.Client, error) {
config := &ssh.ClientConfig{
User: m.Username,
HostKeyCallback: func(hostname string, remote net.Addr, key ssh.PublicKey) error {
return nil
},
Timeout: 5 * time.Second,
}
if m.AuthMethod == entity.AuthCertAuthMethodPassword {
config.Auth = []ssh.AuthMethod{ssh.Password(m.Password)}
} else if m.AuthMethod == entity.MachineAuthMethodPublicKey {
var key ssh.Signer
var err error
if len(m.Passphrase) > 0 {
key, err = ssh.ParsePrivateKeyWithPassphrase([]byte(m.Password), []byte(m.Passphrase))
} else {
key, err = ssh.ParsePrivateKey([]byte(m.Password))
}
if err != nil {
return nil, err
}
config.Auth = []ssh.AuthMethod{ssh.PublicKeys(key)}
}
addr := fmt.Sprintf("%s:%d", m.Ip, m.Port)
sshClient, err := ssh.Dial("tcp", addr, config)
if err != nil {
return nil, err
}
return sshClient, nil
}
// 根据机器信息创建客户端对象
func newClient(machine *Info) (*Cli, error) {
if machine == nil {
return nil, errorx.NewBiz("机器不存在")
}
logx.Infof("[%s]机器连接:%s:%d", machine.Name, machine.Ip, machine.Port)
cli := new(Cli)
cli.machine = machine
err := cli.connect()
if err != nil {
return nil, err
}
return cli, nil
}

View File

@@ -0,0 +1,97 @@
package mcm
import (
"fmt"
"mayfly-go/pkg/errorx"
"mayfly-go/pkg/logx"
"strings"
"github.com/pkg/sftp"
"golang.org/x/crypto/ssh"
)
// 机器客户端
type Cli struct {
Info *MachineInfo // 机器信息
sshClient *ssh.Client // ssh客户端
sftpClient *sftp.Client // sftp客户端
}
// 获取sftp client
func (c *Cli) GetSftpCli() (*sftp.Client, error) {
if c.sshClient == nil {
return nil, errorx.NewBiz("请先进行机器客户端连接")
}
sftpclient := c.sftpClient
// 如果sftpClient为nil则连接
if sftpclient == nil {
sc, serr := sftp.NewClient(c.sshClient)
if serr != nil {
return nil, errorx.NewBiz("获取sftp client失败: %s", serr.Error())
}
sftpclient = sc
c.sftpClient = sftpclient
}
return sftpclient, nil
}
// 获取session
func (c *Cli) GetSession() (*ssh.Session, error) {
if c.sshClient == nil {
return nil, errorx.NewBiz("请先进行机器客户端连接")
}
return c.sshClient.NewSession()
}
// 执行shell
// @param shell shell脚本命令
// @return 返回执行成功或错误的消息
func (c *Cli) Run(shell string) (string, error) {
session, err := c.GetSession()
if err != nil {
c.Close()
return "", err
}
defer session.Close()
buf, err := session.CombinedOutput(shell)
if err != nil {
return string(buf), err
}
return string(buf), nil
}
// 获取机器的所有状态信息
func (c *Cli) GetAllStats() *Stats {
res, _ := c.Run(StatsShell)
infos := strings.Split(res, "-----")
stats := new(Stats)
getUptime(infos[0], stats)
getHostname(infos[1], stats)
getLoad(infos[2], stats)
getMemInfo(infos[3], stats)
getFSInfo(infos[4], stats)
getInterfaces(infos[5], stats)
getInterfaceInfo(infos[6], stats)
getCPU(infos[7], stats)
return stats
}
// 关闭client并从缓存中移除如果使用隧道则也关闭
func (c *Cli) Close() {
m := c.Info
logx.Info(fmt.Sprintf("关闭机器客户端连接-> id: %d, name: %s, ip: %s", m.Id, m.Name, m.Ip))
if c.sshClient != nil {
c.sshClient.Close()
c.sshClient = nil
}
if c.sftpClient != nil {
c.sftpClient.Close()
c.sftpClient = nil
}
if c.Info.SshTunnelMachine != nil {
logx.Infof("关闭机器的隧道信息: machineId=%d, sshTunnelMachineId=%d", c.Info.Id, c.Info.SshTunnelMachine.Id)
CloseSshTunnelMachine(int(c.Info.SshTunnelMachine.Id), c.Info.Id)
}
}

View File

@@ -0,0 +1,61 @@
package mcm
import (
"mayfly-go/internal/common/consts"
"mayfly-go/pkg/cache"
"time"
)
// 机器客户端连接缓存,指定时间内没有访问则会被关闭
var cliCache = cache.NewTimedCache(consts.MachineConnExpireTime, 5*time.Second).
WithUpdateAccessTime(true).
OnEvicted(func(_, value any) {
value.(*Cli).Close()
})
func init() {
AddCheckSshTunnelMachineUseFunc(func(machineId int) bool {
// 遍历所有机器连接实例若存在机器连接实例使用该ssh隧道机器则返回true表示还在使用中...
items := cliCache.Items()
for _, v := range items {
sshTunnelMachine := v.Value.(*Cli).Info.SshTunnelMachine
if sshTunnelMachine != nil && int(sshTunnelMachine.Id) == machineId {
return true
}
}
return false
})
}
// 从缓存中获取客户端信息,不存在则回调获取机器信息函数,并新建
func GetMachineCli(machineId uint64, getMachine func(uint64) (*MachineInfo, error)) (*Cli, error) {
if load, ok := cliCache.Get(machineId); ok {
return load.(*Cli), nil
}
me, err := getMachine(machineId)
if err != nil {
return nil, err
}
c, err := me.Conn()
if err != nil {
return nil, err
}
cliCache.Put(machineId, c)
return c, nil
}
// 是否存在指定id的客户端连接
func HasCli(machineId uint64) bool {
if _, ok := cliCache.Get(machineId); ok {
return true
}
return false
}
// 删除指定机器客户端,并关闭客户端连接
func DeleteCli(id uint64) {
cliCache.Delete(id)
}

View File

@@ -0,0 +1,117 @@
package mcm
import (
"fmt"
"mayfly-go/internal/machine/domain/entity"
"mayfly-go/pkg/errorx"
"mayfly-go/pkg/logx"
"net"
"time"
"golang.org/x/crypto/ssh"
)
// 机器信息
type MachineInfo struct {
Id uint64 `json:"id"`
Name string `json:"name"`
Ip string `json:"ip"` // IP地址
Port int `json:"-"` // 端口号
AuthMethod int8 `json:"-"` // 授权认证方式
Username string `json:"-"` // 用户名
Password string `json:"-"`
Passphrase string `json:"-"` // 私钥口令
SshTunnelMachine *MachineInfo `json:"-"` // ssh隧道机器
EnableRecorder int8 `json:"-"` // 是否启用终端回放记录
TagPath string `json:"tagPath"`
}
func (m *MachineInfo) UseSshTunnel() bool {
return m.SshTunnelMachine != nil
}
// 连接
func (mi *MachineInfo) Conn() (*Cli, error) {
logx.Infof("[%s]机器连接:%s:%d", mi.Name, mi.Ip, mi.Port)
// 如果使用了ssh隧道则修改机器ip port为暴露的ip port
err := mi.IfUseSshTunnelChangeIpPort()
if err != nil {
return nil, errorx.NewBiz("ssh隧道连接失败: %s", err.Error())
}
cli := &Cli{Info: mi}
sshClient, err := GetSshClient(mi)
if err != nil {
if mi.UseSshTunnel() {
CloseSshTunnelMachine(int(mi.SshTunnelMachine.Id), mi.Id)
}
return nil, err
}
cli.sshClient = sshClient
return cli, nil
}
// 如果使用了ssh隧道则修改机器ip port为暴露的ip port
func (me *MachineInfo) IfUseSshTunnelChangeIpPort() error {
if !me.UseSshTunnel() {
return nil
}
originId := me.Id
if originId == 0 {
// 随机设置一个id如果使用了隧道则用于临时保存隧道
me.Id = uint64(time.Now().Nanosecond())
}
sshTunnelMachine, err := GetSshTunnelMachine(int(me.SshTunnelMachine.Id), func(u uint64) (*MachineInfo, error) {
return me.SshTunnelMachine, nil
})
if err != nil {
return err
}
exposeIp, exposePort, err := sshTunnelMachine.OpenSshTunnel(me.Id, me.Ip, me.Port)
if err != nil {
return err
}
// 修改机器ip地址
me.Ip = exposeIp
me.Port = exposePort
return nil
}
func GetSshClient(m *MachineInfo) (*ssh.Client, error) {
config := &ssh.ClientConfig{
User: m.Username,
HostKeyCallback: func(hostname string, remote net.Addr, key ssh.PublicKey) error {
return nil
},
Timeout: 5 * time.Second,
}
if m.AuthMethod == entity.AuthCertAuthMethodPassword {
config.Auth = []ssh.AuthMethod{ssh.Password(m.Password)}
} else if m.AuthMethod == entity.MachineAuthMethodPublicKey {
var key ssh.Signer
var err error
if len(m.Passphrase) > 0 {
key, err = ssh.ParsePrivateKeyWithPassphrase([]byte(m.Password), []byte(m.Passphrase))
} else {
key, err = ssh.ParsePrivateKey([]byte(m.Password))
}
if err != nil {
return nil, err
}
config.Auth = []ssh.AuthMethod{ssh.PublicKeys(key)}
}
addr := fmt.Sprintf("%s:%d", m.Ip, m.Port)
sshClient, err := ssh.Dial("tcp", addr, config)
if err != nil {
return nil, err
}
return sshClient, nil
}

View File

@@ -1,4 +1,4 @@
package machine
package mcm
import (
"fmt"

View File

@@ -1,4 +1,4 @@
package machine
package mcm
import (
"encoding/json"

View File

@@ -1,4 +1,4 @@
package machine
package mcm
import (
"fmt"
@@ -136,15 +136,15 @@ func (stm *SshTunnelMachine) Close() {
}
// 获取ssh隧道机器方便统一管理充当ssh隧道的机器避免创建多个ssh client
func GetSshTunnelMachine(machineId int, getMachine func(uint64) (*Info, error)) (*SshTunnelMachine, error) {
func GetSshTunnelMachine(machineId int, getMachine func(uint64) (*MachineInfo, error)) (*SshTunnelMachine, error) {
mutex.Lock()
defer mutex.Unlock()
sshTunnelMachine := sshTunnelMachines[machineId]
if sshTunnelMachine != nil {
return sshTunnelMachine, nil
}
mutex.Lock()
defer mutex.Unlock()
me, err := getMachine(uint64(machineId))
if err != nil {
return nil, err

View File

@@ -1,4 +1,4 @@
package machine
package mcm
import (
"bufio"
@@ -71,21 +71,6 @@ echo '-----'
top -b -n 1 | grep Cpu
`
func (c *Cli) GetAllStats() *Stats {
res, _ := c.Run(StatsShell)
infos := strings.Split(res, "-----")
stats := new(Stats)
getUptime(infos[0], stats)
getHostname(infos[1], stats)
getLoad(infos[2], stats)
getMemInfo(infos[3], stats)
getFSInfo(infos[4], stats)
getInterfaces(infos[5], stats)
getInterfaceInfo(infos[6], stats)
getCPU(infos[7], stats)
return stats
}
func getUptime(uptime string, stats *Stats) (err error) {
parts := strings.Fields(uptime)
if len(parts) == 2 {

View File

@@ -1,4 +1,4 @@
package machine
package mcm
import (
"bufio"

View File

@@ -2,7 +2,7 @@ package mgm
import (
"mayfly-go/internal/common/consts"
"mayfly-go/internal/machine/infrastructure/machine"
"mayfly-go/internal/machine/mcm"
"mayfly-go/pkg/cache"
"mayfly-go/pkg/logx"
"sync"
@@ -18,7 +18,7 @@ var connCache = cache.NewTimedCache(consts.MongoConnExpireTime, 5*time.Second).
})
func init() {
machine.AddCheckSshTunnelMachineUseFunc(func(machineId int) bool {
mcm.AddCheckSshTunnelMachineUseFunc(func(machineId int) bool {
// 遍历所有mongo连接实例若存在redis实例使用该ssh隧道机器则返回true表示还在使用中...
items := connCache.Items()
for _, v := range items {

View File

@@ -3,7 +3,7 @@ package rdm
import (
"fmt"
"mayfly-go/internal/common/consts"
"mayfly-go/internal/machine/infrastructure/machine"
"mayfly-go/internal/machine/mcm"
"mayfly-go/pkg/cache"
"mayfly-go/pkg/logx"
"sync"
@@ -19,7 +19,7 @@ var connCache = cache.NewTimedCache(consts.RedisConnExpireTime, 5*time.Second).
})
func init() {
machine.AddCheckSshTunnelMachineUseFunc(func(machineId int) bool {
mcm.AddCheckSshTunnelMachineUseFunc(func(machineId int) bool {
// 遍历所有redis连接实例若存在redis实例使用该ssh隧道机器则返回true表示还在使用中...
items := connCache.Items()
for _, v := range items {