From 550631c03b5fbd7fa0aa94aee0338fefc46ed40c Mon Sep 17 00:00:00 2001 From: "meilin.huang" <954537473@qq.com> Date: Wed, 20 Dec 2023 23:01:51 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20=E8=BE=BE=E6=A2=A6ssh=E8=BF=9E?= =?UTF-8?q?=E6=8E=A5=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/components/pagetable/PageTable.vue | 14 +++- mayfly_go_web/src/hooks/usePageTable.ts | 4 +- server/internal/db/dbm/conn.go | 5 ++ server/internal/db/dbm/dialect_dm.go | 83 ++----------------- .../internal/machine/application/machine.go | 4 + server/internal/machine/mcm/client.go | 2 +- server/internal/machine/mcm/machine.go | 8 +- server/internal/machine/mcm/sshtunnel.go | 12 +-- server/internal/mongo/application/mongo.go | 6 +- server/internal/redis/application/redis.go | 4 + 10 files changed, 53 insertions(+), 89 deletions(-) diff --git a/mayfly_go_web/src/components/pagetable/PageTable.vue b/mayfly_go_web/src/components/pagetable/PageTable.vue index 30dfaa0c..fdd41ffa 100644 --- a/mayfly_go_web/src/components/pagetable/PageTable.vue +++ b/mayfly_go_web/src/components/pagetable/PageTable.vue @@ -43,9 +43,14 @@ - + - + @@ -314,6 +319,11 @@ const calcuTableHeight = () => { state.tableMaxHeight = window.innerHeight - headerHeight + 'px'; }; +const searchFormItemKeyUpEnter = (event: any) => { + event.preventDefault(); + search(); +}; + const formatText = (data: any) => { state.formatVal = ''; try { diff --git a/mayfly_go_web/src/hooks/usePageTable.ts b/mayfly_go_web/src/hooks/usePageTable.ts index c5e3c503..7d874f34 100644 --- a/mayfly_go_web/src/hooks/usePageTable.ts +++ b/mayfly_go_web/src/hooks/usePageTable.ts @@ -1,6 +1,6 @@ import Api from '@/common/Api'; import { ElMessage } from 'element-plus'; -import { reactive, toRefs } from 'vue'; +import { reactive, toRefs, toValue } from 'vue'; /** * @description table 页面操作方法封装 @@ -39,7 +39,7 @@ export const usePageTable = ( if (!api) return; try { state.loading = true; - let sp = { ...state.searchParams }; + let sp = toValue(state.searchParams); if (beforeQueryFn) { sp = beforeQueryFn(sp); state.searchParams = sp; diff --git a/server/internal/db/dbm/conn.go b/server/internal/db/dbm/conn.go index 99f3a331..1c6c5eee 100644 --- a/server/internal/db/dbm/conn.go +++ b/server/internal/db/dbm/conn.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "fmt" + "mayfly-go/internal/machine/mcm" "mayfly-go/pkg/errorx" "mayfly-go/pkg/logx" "reflect" @@ -102,6 +103,10 @@ func (d *DbConn) Close() { if err := d.db.Close(); err != nil { logx.Errorf("关闭数据库实例[%s]连接失败: %s", d.Id, err.Error()) } + // 如果是达梦并且使用了ssh隧道,则需要手动将其关闭 + if d.Info.Type == DM && d.Info.SshTunnelMachineId > 0 { + mcm.CloseSshTunnelMachine(d.Info.SshTunnelMachineId, fmt.Sprintf("db:%d", d.Info.Id)) + } d.db = nil } } diff --git a/server/internal/db/dbm/dialect_dm.go b/server/internal/db/dbm/dialect_dm.go index dfe3f9a7..52e0b922 100644 --- a/server/internal/db/dbm/dialect_dm.go +++ b/server/internal/db/dbm/dialect_dm.go @@ -4,93 +4,20 @@ import ( "context" "database/sql" "fmt" - "io" machineapp "mayfly-go/internal/machine/application" "mayfly-go/pkg/errorx" - "mayfly-go/pkg/logx" "mayfly-go/pkg/utils/anyx" - "net" "strings" _ "gitee.com/chunanyong/dm" ) -type ConnectionInfo struct { - Port int - Listener net.Listener - remoteConn net.Conn -} - -var connectionMap = make(map[string]ConnectionInfo) - -func getLocalListener() (net.Listener, int, error) { - // Setup localListener (type net.Listener) - localListener, err := net.Listen("tcp", "localhost:0") - if err != nil { - return nil, 0, err - } - - // 获取本地端口 - localPort := localListener.Addr().(*net.TCPAddr).Port - return localListener, localPort, nil -} - -func acceptConn(listener net.Listener, sshConn net.Conn) { - for { - localConn, err := listener.Accept() - if err != nil { - logx.Warn("端口转发出错", err) - return - } - go forward(localConn, sshConn) - } -} - -func forward(localConn net.Conn, remoteConn net.Conn) { - copyConn := func(writer, reader net.Conn) { - _, err := io.Copy(writer, reader) - if err != nil { - logx.Warnf("io.Copy error: %s", err) - } - } - go copyConn(localConn, remoteConn) - go copyConn(remoteConn, localConn) -} - -func openSsh(d *DbInfo) error { - - sshTunnelMachine, err := machineapp.GetMachineApp().GetSshTunnelMachine(d.SshTunnelMachineId) - if err != nil { - return err - } - remoteConn, err := sshTunnelMachine.GetDialConn("tcp", fmt.Sprintf("%s:%d", d.Host, d.Port)) - if err != nil { - return err - } - // 获取sshConn的本地端口 - localLister, localPort, err := getLocalListener() - // defer localLister.Close() - go acceptConn(localLister, remoteConn) - connectionMap[d.Network] = ConnectionInfo{ - Port: localPort, - Listener: localLister, - remoteConn: remoteConn, - } - d.Host = "127.0.0.1" - d.Port = localPort - - return nil -} - -// 创建一个成员变量存放ssh隧道转发对应的本地连接 - func getDmDB(d *DbInfo) (*sql.DB, error) { driverName := "dm" - // SSH Conect 暂时不支持隧道连接 db := d.Database var dbParam string if db != "" { - // postgres database可以使用db/schema表示,方便连接指定schema, 若不存在schema则使用默认schema + // dm database可以使用db/schema表示,方便连接指定schema, 若不存在schema则使用默认schema ss := strings.Split(db, "/") if len(ss) > 1 { dbParam = fmt.Sprintf("%s?schema=%s", ss[0], ss[len(ss)-1]) @@ -101,10 +28,16 @@ func getDmDB(d *DbInfo) (*sql.DB, error) { // 开启ssh隧道 if d.SshTunnelMachineId > 0 { - err := openSsh(d) + sshTunnelMachine, err := machineapp.GetMachineApp().GetSshTunnelMachine(d.SshTunnelMachineId) if err != nil { return nil, err } + exposedIp, exposedPort, err := sshTunnelMachine.OpenSshTunnel(fmt.Sprintf("db:%d", d.Id), d.Host, d.Port) + if err != nil { + return nil, err + } + d.Host = exposedIp + d.Port = exposedPort } dsn := fmt.Sprintf("dm://%s:%s@%s:%d/%s", d.Username, d.Password, d.Host, d.Port, dbParam) diff --git a/server/internal/machine/application/machine.go b/server/internal/machine/application/machine.go index 35e5d656..6482d744 100644 --- a/server/internal/machine/application/machine.go +++ b/server/internal/machine/application/machine.go @@ -105,6 +105,10 @@ func (m *machineAppImpl) Save(ctx context.Context, me *entity.Machine, tagIds .. if err == nil && oldMachine.Id != me.Id { return errorx.NewBiz("该机器信息已存在") } + // 如果调整了ssh username等会查不到旧数据,故需要根据id获取旧信息将code赋值给标签进行关联 + if oldMachine.Code == "" { + oldMachine, _ = m.GetById(new(entity.Machine), me.Id) + } // 关闭连接 mcm.DeleteCli(me.Id) diff --git a/server/internal/machine/mcm/client.go b/server/internal/machine/mcm/client.go index 00cd9698..d2c792a1 100644 --- a/server/internal/machine/mcm/client.go +++ b/server/internal/machine/mcm/client.go @@ -106,6 +106,6 @@ func (c *Cli) Close() { } if c.Info.SshTunnelMachine != nil { logx.Infof("关闭机器的隧道信息: machineId=%d, sshTunnelMachineId=%d", c.Info.Id, c.Info.SshTunnelMachine.Id) - CloseSshTunnelMachine(int(c.Info.SshTunnelMachine.Id), c.Info.Id) + CloseSshTunnelMachine(int(c.Info.SshTunnelMachine.Id), c.Info.GetTunnelId()) } } diff --git a/server/internal/machine/mcm/machine.go b/server/internal/machine/mcm/machine.go index 99d72969..81db78d5 100644 --- a/server/internal/machine/mcm/machine.go +++ b/server/internal/machine/mcm/machine.go @@ -32,6 +32,10 @@ func (m *MachineInfo) UseSshTunnel() bool { return m.SshTunnelMachine != nil } +func (m *MachineInfo) GetTunnelId() string { + return fmt.Sprintf("machine:%d", m.Id) +} + // 连接 func (mi *MachineInfo) Conn() (*Cli, error) { logx.Infof("[%s]机器连接:%s:%d", mi.Name, mi.Ip, mi.Port) @@ -46,7 +50,7 @@ func (mi *MachineInfo) Conn() (*Cli, error) { sshClient, err := GetSshClient(mi) if err != nil { if mi.UseSshTunnel() { - CloseSshTunnelMachine(int(mi.SshTunnelMachine.Id), mi.Id) + CloseSshTunnelMachine(int(mi.SshTunnelMachine.Id), mi.GetTunnelId()) } return nil, err } @@ -72,7 +76,7 @@ func (me *MachineInfo) IfUseSshTunnelChangeIpPort() error { if err != nil { return err } - exposeIp, exposePort, err := sshTunnelMachine.OpenSshTunnel(me.Id, me.Ip, me.Port) + exposeIp, exposePort, err := sshTunnelMachine.OpenSshTunnel(me.GetTunnelId(), me.Ip, me.Port) if err != nil { return err } diff --git a/server/internal/machine/mcm/sshtunnel.go b/server/internal/machine/mcm/sshtunnel.go index 9a71124a..b8519623 100644 --- a/server/internal/machine/mcm/sshtunnel.go +++ b/server/internal/machine/mcm/sshtunnel.go @@ -68,10 +68,10 @@ type SshTunnelMachine struct { machineId int // 隧道机器id SshClient *ssh.Client mutex sync.Mutex - tunnels map[uint64]*Tunnel // 机器id -> 隧道 + tunnels map[string]*Tunnel // 隧道id -> 隧道 } -func (stm *SshTunnelMachine) OpenSshTunnel(id uint64, ip string, port int) (exposedIp string, exposedPort int, err error) { +func (stm *SshTunnelMachine) OpenSshTunnel(id string, ip string, port int) (exposedIp string, exposedPort int, err error) { stm.mutex.Lock() defer stm.mutex.Unlock() @@ -154,7 +154,7 @@ func GetSshTunnelMachine(machineId int, getMachine func(uint64) (*MachineInfo, e if err != nil { return nil, err } - sshTunnelMachine = &SshTunnelMachine{SshClient: sshClient, machineId: machineId, tunnels: map[uint64]*Tunnel{}} + sshTunnelMachine = &SshTunnelMachine{SshClient: sshClient, machineId: machineId, tunnels: map[string]*Tunnel{}} logx.Infof("初次连接ssh隧道机器[%d][%s:%d]", machineId, me.Ip, me.Port) sshTunnelMachines[machineId] = sshTunnelMachine @@ -168,7 +168,7 @@ func GetSshTunnelMachine(machineId int, getMachine func(uint64) (*MachineInfo, e } // 关闭ssh隧道机器的指定隧道 -func CloseSshTunnelMachine(machineId int, tunnelId uint64) { +func CloseSshTunnelMachine(machineId int, tunnelId string) { sshTunnelMachine := sshTunnelMachines[machineId] if sshTunnelMachine == nil { return @@ -184,7 +184,7 @@ func CloseSshTunnelMachine(machineId int, tunnelId uint64) { } type Tunnel struct { - id uint64 // 唯一标识 + id string // 唯一标识 machineId int // 隧道机器id localHost string // 本地监听地址 localPort int // 本地端口 @@ -237,7 +237,7 @@ func (r *Tunnel) Close() { } r.remoteConnections = nil _ = r.listener.Close() - logx.Debugf("隧道 %d 监听器关闭", r.id) + logx.Debugf("隧道 %s 监听器关闭", r.id) } func copyConn(writer, reader net.Conn) { diff --git a/server/internal/mongo/application/mongo.go b/server/internal/mongo/application/mongo.go index 5b56826e..f4367537 100644 --- a/server/internal/mongo/application/mongo.go +++ b/server/internal/mongo/application/mongo.go @@ -77,7 +77,7 @@ func (d *mongoAppImpl) TestConn(me *entity.Mongo) error { } func (d *mongoAppImpl) Save(ctx context.Context, m *entity.Mongo, tagIds ...uint64) error { - oldMongo := &entity.Mongo{Name: m.Name} + oldMongo := &entity.Mongo{Name: m.Name, SshTunnelMachineId: m.SshTunnelMachineId} err := d.GetBy(oldMongo) if m.Id == 0 { @@ -99,6 +99,10 @@ func (d *mongoAppImpl) Save(ctx context.Context, m *entity.Mongo, tagIds ...uint if err == nil && oldMongo.Id != m.Id { return errorx.NewBiz("该名称已存在") } + // 如果调整了ssh等会查不到旧数据,故需要根据id获取旧信息将code赋值给标签进行关联 + if oldMongo.Code == "" { + oldMongo, _ = d.GetById(new(entity.Mongo), m.Id) + } // 先关闭连接 mgm.CloseConn(m.Id) diff --git a/server/internal/redis/application/redis.go b/server/internal/redis/application/redis.go index c596ca60..1b7d74ad 100644 --- a/server/internal/redis/application/redis.go +++ b/server/internal/redis/application/redis.go @@ -103,6 +103,10 @@ func (r *redisAppImpl) Save(ctx context.Context, re *entity.Redis, tagIds ...uin rdm.CloseConn(re.Id, db) } } + // 如果调整了ssh等会查不到旧数据,故需要根据id获取旧信息将code赋值给标签进行关联 + if oldRedis.Code == "" { + oldRedis, _ = r.GetById(new(entity.Redis), re.Id) + } re.PwdEncrypt() return r.Tx(ctx, func(ctx context.Context) error {