feat: linux支持ssh隧道访问&其他优化

This commit is contained in:
meilin.huang
2022-07-23 16:41:04 +08:00
parent f0540559bb
commit 76d6fc3ba5
26 changed files with 2003 additions and 1556 deletions

View File

@@ -5,6 +5,7 @@ import (
"database/sql"
"errors"
"fmt"
"mayfly-go/internal/constant"
"mayfly-go/internal/devops/domain/entity"
"mayfly-go/internal/devops/domain/repository"
"mayfly-go/internal/devops/infrastructure/machine"
@@ -23,7 +24,6 @@ import (
"github.com/go-sql-driver/mysql"
"github.com/lib/pq"
"golang.org/x/crypto/ssh"
)
type Db interface {
@@ -47,6 +47,9 @@ type Db interface {
// @param id 数据库实例id
// @param db 数据库
GetDbInstance(id uint64, db string) *DbInstance
// 获取数据库实例的所有数据库列表
GetDatabases(entity *entity.Db) []string
}
type dbAppImpl struct {
@@ -141,11 +144,34 @@ func (d *dbAppImpl) Delete(id uint64) {
d.dbSqlRepo.DeleteBy(&entity.DbSql{DbId: id})
}
func (d *dbAppImpl) GetDatabases(ed *entity.Db) []string {
databases := make([]string, 0)
var dbConn *sql.DB
var metaDb string
var getDatabasesSql string
if ed.Type == entity.DbTypeMysql {
metaDb = "information_schema"
getDatabasesSql = "SELECT SCHEMA_NAME AS dbname FROM SCHEMATA"
} else {
metaDb = "postgres"
getDatabasesSql = "SELECT datname AS dbname FROM pg_database"
}
dbConn, err := GetDbConn(ed, metaDb)
biz.ErrIsNilAppendErr(err, "数据库连接失败: %s")
defer dbConn.Close()
_, res, err := SelectDataByDb(dbConn, getDatabasesSql)
biz.ErrIsNilAppendErr(err, "获取数据库列表失败")
for _, re := range res {
databases = append(databases, re["dbname"].(string))
}
return databases
}
var mutex sync.Mutex
func (da *dbAppImpl) GetDbInstance(id uint64, db string) *DbInstance {
mutex.Lock()
defer mutex.Unlock()
// Id不为0则为需要缓存
needCache := id != 0
if needCache {
@@ -154,43 +180,17 @@ func (da *dbAppImpl) GetDbInstance(id uint64, db string) *DbInstance {
return load.(*DbInstance)
}
}
biz.IsTrue(mutex.TryLock(), "有数据库实例在连接中...请稍后重试")
defer mutex.Unlock()
d := da.GetById(id)
biz.NotNil(d, "数据库信息不存在")
biz.IsTrue(strings.Contains(d.Database, db), "未配置该库的操作权限")
cacheKey := GetDbCacheKey(id, db)
dbi := &DbInstance{Id: cacheKey, Type: d.Type, ProjectId: d.ProjectId}
dbi := &DbInstance{Id: cacheKey, Type: d.Type, ProjectId: d.ProjectId, sshTunnelMachineId: d.SshTunnelMachineId}
//SSH Conect
if d.EnableSshTunnel == 1 && d.SshTunnelMachineId != 0 {
me := MachineApp.GetById(d.SshTunnelMachineId)
biz.NotNil(me, "隧道机器信息不存在")
sshClient, err := machine.GetSshClient(me)
biz.ErrIsNilAppendErr(err, "ssh隧道连接失败: %s")
dbi.sshTunnel = sshClient
if d.Type == entity.DbTypeMysql {
mysql.RegisterDialContext(d.Network, func(ctx context.Context, addr string) (net.Conn, error) {
return sshClient.Dial("tcp", addr)
})
} else if d.Type == entity.DbTypePostgres {
_, err := pq.DialOpen(&PqSqlDialer{sshTunnel: sshClient}, getDsn(d))
if err != nil {
dbi.Close()
panic(biz.NewBizErr(fmt.Sprintf("postgres隧道连接失败: %s", err.Error())))
}
}
}
// 将数据库替换为要访问的数据库,原本数据库为空格拼接的所有库
d.Database = db
DB, err := sql.Open(d.Type, getDsn(d))
if err != nil {
dbi.Close()
panic(biz.NewBizErr(fmt.Sprintf("Open %s failed, err:%v\n", d.Type, err)))
}
err = DB.Ping()
DB, err := GetDbConn(d, db)
if err != nil {
dbi.Close()
global.Log.Errorf("连接db失败: %s:%d/%s", d.Host, d.Port, db)
@@ -212,32 +212,29 @@ func (da *dbAppImpl) GetDbInstance(id uint64, db string) *DbInstance {
return dbi
}
type PqSqlDialer struct {
sshTunnel *ssh.Client
}
func (pd *PqSqlDialer) Dial(network, address string) (net.Conn, error) {
if sshConn, err := pd.sshTunnel.Dial(network, address); err == nil {
// 将ssh conn包装否则redis内部设置超时会报错,ssh conn不支持设置超时会返回错误: ssh: tcpChan: deadline not supported
return &utils.WrapSshConn{Conn: sshConn}, nil
} else {
return nil, err
}
}
func (pd *PqSqlDialer) DialTimeout(network, address string, timeout time.Duration) (net.Conn, error) {
return pd.Dial(network, address)
}
//------------------------------------------------------------------------------
// 客户端连接缓存,30分钟内没有访问则会被关闭, key为数据库实例id:数据库
var dbCache = cache.NewTimedCache(30*time.Minute, 5*time.Second).
// 客户端连接缓存,指定时间内没有访问则会被关闭, key为数据库实例id:数据库
var dbCache = cache.NewTimedCache(constant.DbConnExpireTime, 5*time.Second).
WithUpdateAccessTime(true).
OnEvicted(func(key interface{}, value interface{}) {
global.Log.Info(fmt.Sprintf("删除db连接缓存 id = %s", key))
value.(*DbInstance).Close()
})
func init() {
machine.AddCheckSshTunnelMachineUseFunc(func(machineId uint64) bool {
// 遍历所有db连接实例若存在redis实例使用该ssh隧道机器则返回true表示还在使用中...
items := dbCache.Items()
for _, v := range items {
if v.Value.(*DbInstance).sshTunnelMachineId == machineId {
return true
}
}
return false
})
}
func GetDbCacheKey(dbId uint64, db string) string {
return fmt.Sprintf("%d:%s", dbId, db)
}
@@ -250,55 +247,45 @@ func GetDbInstanceByCache(id string) *DbInstance {
}
func TestConnection(d *entity.Db) {
//SSH Conect
// 验证第一个库是否可以连接即可
DB, err := GetDbConn(d, strings.Split(d.Database, " ")[0])
biz.ErrIsNilAppendErr(err, "数据库连接失败: %s")
defer DB.Close()
}
// 获取数据库连接
func GetDbConn(d *entity.Db, db string) (*sql.DB, error) {
// SSH Conect
if d.EnableSshTunnel == 1 && d.SshTunnelMachineId != 0 {
me := MachineApp.GetById(d.SshTunnelMachineId)
sshClient, err := machine.GetSshClient(me)
biz.ErrIsNilAppendErr(err, "ssh隧道连接失败: %s")
defer sshClient.Close()
sshTunnelMachine := MachineApp.GetSshTunnelMachine(d.SshTunnelMachineId)
defer machine.CloseSshTunnelMachine(d.SshTunnelMachineId, 0)
if d.Type == entity.DbTypeMysql {
mysql.RegisterDialContext(d.Network, func(ctx context.Context, addr string) (net.Conn, error) {
return sshClient.Dial("tcp", addr)
return MachineApp.GetSshTunnelMachine(d.SshTunnelMachineId).GetDialConn("tcp", addr)
})
} else if d.Type == entity.DbTypePostgres {
_, err := pq.DialOpen(&PqSqlDialer{sshTunnel: sshClient}, getDsn(d))
_, err := pq.DialOpen(&PqSqlDialer{sshTunnelMachine: sshTunnelMachine}, getDsn(d, db))
if err != nil {
panic(biz.NewBizErr(fmt.Sprintf("postgres隧道连接失败: %s", err.Error())))
}
}
}
// 验证第一个库是否可以连接即可
d.Database = strings.Split(d.Database, " ")[0]
DB, err := sql.Open(d.Type, getDsn(d))
biz.ErrIsNil(err, "Open %s failed, err:%v\n", d.Type, err)
defer DB.Close()
perr := DB.Ping()
biz.ErrIsNilAppendErr(perr, "数据库连接失败: %s")
}
// db实例
type DbInstance struct {
Id string
Type string
ProjectId uint64
db *sql.DB
sshTunnel *ssh.Client
}
// 执行查询语句
// 依次返回 列名数组结果map错误
func (d *DbInstance) SelectData(execSql string) ([]string, []map[string]interface{}, error) {
execSql = strings.Trim(execSql, " ")
isSelect := strings.HasPrefix(execSql, "SELECT") || strings.HasPrefix(execSql, "select")
isShow := strings.HasPrefix(execSql, "show")
isExplain := strings.HasPrefix(execSql, "explain")
if !isSelect && !isShow && !isExplain {
return nil, nil, errors.New("该sql非查询语句")
DB, err := sql.Open(d.Type, getDsn(d, db))
if err != nil {
return nil, err
}
err = DB.Ping()
if err != nil {
DB.Close()
return nil, err
}
rows, err := d.db.Query(execSql)
return DB, nil
}
func SelectDataByDb(db *sql.DB, selectSql string) ([]string, []map[string]interface{}, error) {
rows, err := db.Query(selectSql)
if err != nil {
return nil, nil, err
}
@@ -383,6 +370,45 @@ func (d *DbInstance) SelectData(execSql string) ([]string, []map[string]interfac
return colNames, result, nil
}
type PqSqlDialer struct {
sshTunnelMachine *machine.SshTunnelMachine
}
func (pd *PqSqlDialer) Dial(network, address string) (net.Conn, error) {
if sshConn, err := pd.sshTunnelMachine.GetDialConn("tcp", address); err == nil {
// 将ssh conn包装否则redis内部设置超时会报错,ssh conn不支持设置超时会返回错误: ssh: tcpChan: deadline not supported
return &utils.WrapSshConn{Conn: sshConn}, nil
} else {
return nil, err
}
}
func (pd *PqSqlDialer) DialTimeout(network, address string, timeout time.Duration) (net.Conn, error) {
return pd.Dial(network, address)
}
// db实例
type DbInstance struct {
Id string
Type string
ProjectId uint64
db *sql.DB
sshTunnelMachineId uint64
}
// 执行查询语句
// 依次返回 列名数组结果map错误
func (d *DbInstance) SelectData(execSql string) ([]string, []map[string]interface{}, error) {
execSql = strings.Trim(execSql, " ")
isSelect := strings.HasPrefix(execSql, "SELECT") || strings.HasPrefix(execSql, "select")
isShow := strings.HasPrefix(execSql, "show")
isExplain := strings.HasPrefix(execSql, "explain")
if !isSelect && !isShow && !isExplain {
return nil, nil, errors.New("该sql非查询语句")
}
return SelectDataByDb(d.db, execSql)
}
// 执行 update, insert, delete建表等sql
// 返回影响条数和错误
func (d *DbInstance) Exec(sql string) (int64, error) {
@@ -399,19 +425,15 @@ func (d *DbInstance) Close() {
if err := d.db.Close(); err != nil {
global.Log.Errorf("关闭数据库实例[%s]连接失败: %s", d.Id, err.Error())
}
}
if d.sshTunnel != nil {
if err := d.sshTunnel.Close(); err != nil {
global.Log.Errorf("关闭数据库实例[%s]的ssh隧道失败: %s", d.Id, err.Error())
}
d.db = nil
}
}
// 获取dataSourceName
func getDsn(d *entity.Db) string {
func getDsn(d *entity.Db, db string) string {
var dsn string
if d.Type == entity.DbTypeMysql {
dsn = fmt.Sprintf("%s:%s@%s(%s:%d)/%s?timeout=8s", d.Username, d.Password, d.Network, d.Host, d.Port, d.Database)
dsn = fmt.Sprintf("%s:%s@%s(%s:%d)/%s?timeout=8s", d.Username, d.Password, d.Network, d.Host, d.Port, db)
if d.Params != "" {
dsn = fmt.Sprintf("%s&%s", dsn, d.Params)
}
@@ -419,7 +441,7 @@ func getDsn(d *entity.Db) string {
}
if d.Type == entity.DbTypePostgres {
dsn = fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable", d.Host, d.Port, d.Username, d.Password, d.Database)
dsn = fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable", d.Host, d.Port, d.Username, d.Password, db)
if d.Params != "" {
dsn = fmt.Sprintf("%s %s", dsn, strings.Join(strings.Split(d.Params, "&"), " "))
}

View File

@@ -32,6 +32,9 @@ type Machine interface {
// 获取机器连接
GetCli(id uint64) *machine.Cli
// 获取ssh隧道机器连接
GetSshTunnelMachine(id uint64) *machine.SshTunnelMachine
}
type machineAppImpl struct {
@@ -53,7 +56,7 @@ func (m *machineAppImpl) Count(condition *entity.Machine) int64 {
func (m *machineAppImpl) Save(me *entity.Machine) {
// ’修改机器信息且密码不为空‘ or ‘新增’需要测试是否可连接
if (me.Id != 0 && me.Password != "") || me.Id == 0 {
biz.ErrIsNilAppendErr(machine.TestConn(me), "该机器无法连接: %s")
biz.ErrIsNilAppendErr(machine.TestConn(*me, func(u uint64) *entity.Machine { return m.GetById(u) }), "该机器无法连接: %s")
}
oldMachine := &entity.Machine{Ip: me.Ip, Port: me.Port, Username: me.Username}
@@ -126,3 +129,13 @@ func (m *machineAppImpl) GetCli(id uint64) *machine.Cli {
biz.ErrIsNilAppendErr(err, "获取客户端错误: %s")
return cli
}
func (m *machineAppImpl) GetSshTunnelMachine(id uint64) *machine.SshTunnelMachine {
sshTunnel, err := machine.GetSshTunnelMachine(id, func(machineId uint64) *entity.Machine {
machine := m.GetById(machineId)
biz.IsTrue(machine.Status == entity.MachineStatusEnable, "该机器已被停用")
return machine
})
biz.ErrIsNilAppendErr(err, "获取ssh隧道连接失败: %s")
return sshTunnel
}

View File

@@ -2,6 +2,7 @@ package application
import (
"context"
"mayfly-go/internal/constant"
"mayfly-go/internal/devops/domain/entity"
"mayfly-go/internal/devops/domain/repository"
"mayfly-go/internal/devops/infrastructure/machine"
@@ -16,7 +17,6 @@ import (
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"golang.org/x/crypto/ssh"
)
type Mongo interface {
@@ -95,14 +95,27 @@ func (d *mongoAppImpl) GetMongoCli(id uint64) *mongo.Client {
// -----------------------------------------------------------
//mongo客户端连接缓存30分钟内没有访问则会被关闭
var mongoCliCache = cache.NewTimedCache(30*time.Minute, 5*time.Second).
//mongo客户端连接缓存指定时间内没有访问则会被关闭
var mongoCliCache = cache.NewTimedCache(constant.MongoConnExpireTime, 5*time.Second).
WithUpdateAccessTime(true).
OnEvicted(func(key interface{}, value interface{}) {
global.Log.Info("删除mongo连接缓存: id = ", key)
value.(*MongoInstance).Close()
})
func init() {
machine.AddCheckSshTunnelMachineUseFunc(func(machineId uint64) bool {
// 遍历所有mongo连接实例若存在redis实例使用该ssh隧道机器则返回true表示还在使用中...
items := mongoCliCache.Items()
for _, v := range items {
if v.Value.(*MongoInstance).sshTunnelMachineId == machineId {
return true
}
}
return false
})
}
// 获取mongo的连接实例
func GetMongoInstance(mongoId uint64, getMongoEntity func(uint64) *entity.Mongo) (*MongoInstance, error) {
mi, err := mongoCliCache.ComputeIfAbsent(mongoId, func(_ interface{}) (interface{}, error) {
@@ -124,10 +137,10 @@ func DeleteMongoCache(mongoId uint64) {
}
type MongoInstance struct {
Id uint64
ProjectId uint64
Cli *mongo.Client
sshTunnel *ssh.Client
Id uint64
ProjectId uint64
Cli *mongo.Client
sshTunnelMachineId uint64
}
func (mi *MongoInstance) Close() {
@@ -135,11 +148,7 @@ func (mi *MongoInstance) Close() {
if err := mi.Cli.Disconnect(context.Background()); err != nil {
global.Log.Errorf("关闭mongo实例[%d]连接失败: %s", mi.Id, err)
}
}
if mi.sshTunnel != nil {
if err := mi.sshTunnel.Close(); err != nil {
global.Log.Errorf("关闭mongo实例[%d]的ssh隧道失败: %s", mi.Id, err.Error())
}
mi.Cli = nil
}
}
@@ -154,19 +163,17 @@ func connect(me *entity.Mongo) (*MongoInstance, error) {
SetMaxPoolSize(1)
// 启用ssh隧道则连接隧道机器
if me.EnableSshTunnel == 1 {
machineEntity := MachineApp.GetById(4)
sshClient, err := machine.GetSshClient(machineEntity)
biz.ErrIsNilAppendErr(err, "ssh隧道连接失败: %s")
mongoInstance.sshTunnel = sshClient
mongoOptions.SetDialer(&MongoSshDialer{sshTunnel: sshClient})
mongoInstance.sshTunnelMachineId = me.SshTunnelMachineId
mongoOptions.SetDialer(&MongoSshDialer{machineId: me.SshTunnelMachineId})
}
client, err := mongo.Connect(ctx, mongoOptions)
if err != nil {
mongoInstance.Close()
return nil, err
}
if err = client.Ping(context.TODO(), nil); err != nil {
mongoInstance.Close()
return nil, err
}
@@ -176,11 +183,11 @@ func connect(me *entity.Mongo) (*MongoInstance, error) {
}
type MongoSshDialer struct {
sshTunnel *ssh.Client
machineId uint64
}
func (sd *MongoSshDialer) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
if sshConn, err := sd.sshTunnel.Dial(network, address); err == nil {
if sshConn, err := MachineApp.GetSshTunnelMachine(sd.machineId).GetDialConn(network, address); err == nil {
// 将ssh conn包装否则内部部设置超时会报错,ssh conn不支持设置超时会返回错误: ssh: tcpChan: deadline not supported
return &utils.WrapSshConn{Conn: sshConn}, nil
} else {

View File

@@ -3,6 +3,7 @@ package application
import (
"context"
"fmt"
"mayfly-go/internal/constant"
"mayfly-go/internal/devops/domain/entity"
"mayfly-go/internal/devops/domain/repository"
"mayfly-go/internal/devops/infrastructure/machine"
@@ -17,7 +18,6 @@ import (
"time"
"github.com/go-redis/redis/v8"
"golang.org/x/crypto/ssh"
)
type Redis interface {
@@ -151,9 +151,8 @@ func getRedisCient(re *entity.Redis) *RedisInstance {
WriteTimeout: -1,
}
if re.EnableSshTunnel == 1 {
sshClient, dialerFunc := getRedisDialer(re.SshTunnelMachineId)
ri.sshTunnel = sshClient
redisOptions.Dialer = dialerFunc
ri.sshTunnelMachineId = re.SshTunnelMachineId
redisOptions.Dialer = getRedisDialer(re.SshTunnelMachineId)
}
ri.Cli = redis.NewClient(redisOptions)
return ri
@@ -168,21 +167,17 @@ func getRedisClusterClient(re *entity.Redis) *RedisInstance {
DialTimeout: 8 * time.Second,
}
if re.EnableSshTunnel == 1 {
sshClient, dialerFunc := getRedisDialer(re.SshTunnelMachineId)
ri.sshTunnel = sshClient
redisClusterOptions.Dialer = dialerFunc
ri.sshTunnelMachineId = re.SshTunnelMachineId
redisClusterOptions.Dialer = getRedisDialer(re.SshTunnelMachineId)
}
ri.ClusterCli = redis.NewClusterClient(redisClusterOptions)
return ri
}
func getRedisDialer(machineId uint64) (*ssh.Client, func(ctx context.Context, network, addr string) (net.Conn, error)) {
me := MachineApp.GetById(machineId)
sshClient, err := machine.GetSshClient(me)
biz.ErrIsNilAppendErr(err, "ssh隧道连接失败: %s")
return sshClient, func(_ context.Context, network, addr string) (net.Conn, error) {
if sshConn, err := sshClient.Dial(network, addr); err == nil {
func getRedisDialer(machineId uint64) func(ctx context.Context, network, addr string) (net.Conn, error) {
sshTunnel := MachineApp.GetSshTunnelMachine(machineId)
return func(_ context.Context, network, addr string) (net.Conn, error) {
if sshConn, err := sshTunnel.GetDialConn(network, addr); err == nil {
// 将ssh conn包装否则redis内部设置超时会报错,ssh conn不支持设置超时会返回错误: ssh: tcpChan: deadline not supported
return &utils.WrapSshConn{Conn: sshConn}, nil
} else {
@@ -193,8 +188,8 @@ func getRedisDialer(machineId uint64) (*ssh.Client, func(ctx context.Context, ne
//------------------------------------------------------------------------------
// redis客户端连接缓存30分钟内没有访问则会被关闭
var redisCache = cache.NewTimedCache(30*time.Minute, 5*time.Second).
// redis客户端连接缓存指定时间内没有访问则会被关闭
var redisCache = cache.NewTimedCache(constant.RedisConnExpireTime, 5*time.Second).
WithUpdateAccessTime(true).
OnEvicted(func(key interface{}, value interface{}) {
global.Log.Info(fmt.Sprintf("删除redis连接缓存 id = %d", key))
@@ -206,6 +201,19 @@ func CloseRedis(id uint64) {
redisCache.Delete(id)
}
func init() {
machine.AddCheckSshTunnelMachineUseFunc(func(machineId uint64) bool {
// 遍历所有redis连接实例若存在redis实例使用该ssh隧道机器则返回true表示还在使用中...
items := redisCache.Items()
for _, v := range items {
if v.Value.(*RedisInstance).sshTunnelMachineId == machineId {
return true
}
}
return false
})
}
func TestRedisConnection(re *entity.Redis) {
var cmd redis.Cmdable
if re.Mode == "" || re.Mode == entity.RedisModeStandalone {
@@ -225,12 +233,12 @@ func TestRedisConnection(re *entity.Redis) {
// redis实例
type RedisInstance struct {
Id uint64
ProjectId uint64
Mode string
Cli *redis.Client
ClusterCli *redis.ClusterClient
sshTunnel *ssh.Client
Id uint64
ProjectId uint64
Mode string
Cli *redis.Client
ClusterCli *redis.ClusterClient
sshTunnelMachineId uint64
}
// 获取命令执行接口的具体实现
@@ -256,15 +264,12 @@ func (r *RedisInstance) Close() {
if err := r.Cli.Close(); err != nil {
global.Log.Errorf("关闭redis单机实例[%d]连接失败: %s", r.Id, err.Error())
}
r.Cli = nil
}
if r.Mode == entity.RedisModeCluster {
if err := r.ClusterCli.Close(); err != nil {
global.Log.Errorf("关闭redis集群实例[%d]连接失败: %s", r.Id, err.Error())
}
}
if r.sshTunnel != nil {
if err := r.sshTunnel.Close(); err != nil {
global.Log.Errorf("关闭redis实例[%d]的ssh隧道失败: %s", r.Id, err.Error())
}
r.ClusterCli = nil
}
}