package api

import (
	"context"
	"fmt"
	"io"
	"io/fs"
	"mime/multipart"
	"os"
	"path/filepath"
	"sort"
	"strings"
	"sync"
	"time"

	"mayfly-go/internal/machine/api/form"
	"mayfly-go/internal/machine/api/vo"
	"mayfly-go/internal/machine/application"
	"mayfly-go/internal/machine/application/dto"
	"mayfly-go/internal/machine/config"
	"mayfly-go/internal/machine/domain/entity"
	"mayfly-go/internal/machine/imsg"
	"mayfly-go/internal/machine/mcm"
	msgdto "mayfly-go/internal/msg/application/dto"
	"mayfly-go/internal/pkg/event"
	"mayfly-go/pkg/biz"
	"mayfly-go/pkg/contextx"
	"mayfly-go/pkg/global"
	"mayfly-go/pkg/gox"
	"mayfly-go/pkg/logx"
	"mayfly-go/pkg/model"
	"mayfly-go/pkg/req"
	"mayfly-go/pkg/utils/collx"
	"mayfly-go/pkg/utils/timex"

	"github.com/pkg/sftp"
	"github.com/spf13/cast"
)

// MachineFile groups the HTTP handlers for machine file configuration and
// for remote file operations (read, write, upload, copy, move, ...).
type MachineFile struct {
	machineFileApp application.MachineFile `inject:"T"`
}

// ReqConfs returns the route table for all machine-file endpoints, registered
// under the "machines" group.
func (m *MachineFile) ReqConfs() *req.Confs {
	reqs := [...]*req.Conf{
		// List the file configurations of the given machine.
		req.NewGet(":machineId/files", m.MachineFiles),

		req.NewPost(":machineId/files", m.SaveMachineFiles).Log(req.NewLogSaveI(imsg.LogMachineFileConfSave)).RequiredPermissionCode("machine:file:add"),

		req.NewDelete(":machineId/files/:fileId", m.DeleteFile).Log(req.NewLogSaveI(imsg.LogMachineFileConfDelete)).RequiredPermissionCode("machine:file:del"),

		req.NewGet(":machineId/files/:fileId/read", m.ReadFileContent).Log(req.NewLogSaveI(imsg.LogMachineFileRead)),

		req.NewGet(":machineId/files/:fileId/download", m.DownloadFile).NoRes().Log(req.NewLogSaveI(imsg.LogMachineFileDownload)),

		req.NewGet(":machineId/files/:fileId/read-dir", m.GetDirEntry),

		req.NewGet(":machineId/files/:fileId/dir-size", m.GetDirSize),

		req.NewGet(":machineId/files/:fileId/file-stat", m.GetFileStat),

		req.NewPost(":machineId/files/:fileId/write", m.WriteFileContent).Log(req.NewLogSaveI(imsg.LogMachineFileModify)).RequiredPermissionCode("machine:file:write"),

		req.NewPost(":machineId/files/:fileId/create-file", m.CreateFile).Log(req.NewLogSaveI(imsg.LogMachineFileCreate)),

		req.NewPost(":machineId/files/:fileId/upload", m.UploadFile).Log(req.NewLogSaveI(imsg.LogMachineFileUpload)).RequiredPermissionCode("machine:file:upload"),

		req.NewPost(":machineId/files/:fileId/upload-folder", m.UploadFolder).Log(req.NewLogSaveI(imsg.LogMachineFileUploadFolder)).RequiredPermissionCode("machine:file:upload"),

		req.NewPost(":machineId/files/:fileId/remove", m.RemoveFile).Log(req.NewLogSaveI(imsg.LogMachineFileDelete)).RequiredPermissionCode("machine:file:rm"),

		req.NewPost(":machineId/files/:fileId/cp", m.CopyFile).Log(req.NewLogSaveI(imsg.LogMachineFileCopy)).RequiredPermissionCode("machine:file:rm"),

		req.NewPost(":machineId/files/:fileId/mv", m.MvFile).Log(req.NewLogSaveI(imsg.LogMachineFileMove)).RequiredPermissionCode("machine:file:rm"),

		req.NewPost(":machineId/files/:fileId/rename", m.Rename).Log(req.NewLogSaveI(imsg.LogMachineFileRename)).RequiredPermissionCode("machine:file:write"),
	}

	return req.NewConfs("machines", reqs[:]...)
}

const (
	// File type markers returned to the client in MachineFileInfo.Type.
	file = "-"
	dir  = "d"
	link = "l"

	// max_read_size is the exclusive upper bound (in bytes) for files that
	// may be read inline by ReadFileContent; larger files must be downloaded.
	max_read_size = 1 * 1024 * 1024
)

// progressReader wraps an io.Reader and reports the cumulative number of
// bytes read through onProgress; it is used to push HTTP upload progress.
type progressReader struct {
	reader     io.Reader
	total      int64
	readSize   int64
	uploadId   string
	filename   string
	path       string
	ctx        context.Context
	startTime  time.Time
	onProgress func(readSize int64) // progress callback, may be nil
}

// Read reads from the wrapped reader, adds the bytes read to readSize and
// invokes onProgress (when set) with the new cumulative total.
func (r *progressReader) Read(p []byte) (n int, err error) {
	n, err = r.reader.Read(p)
	if n > 0 {
		r.readSize += int64(n)
		if r.onProgress != nil {
			r.onProgress(r.readSize)
		}
	}
	return n, err
}

// MachineFiles returns a page of file configurations for the machine
// identified in the request path.
func (m *MachineFile) MachineFiles(rc *req.Ctx) {
	condition := &entity.MachineFile{MachineId: GetMachineId(rc)}
	res, err := m.machineFileApp.GetPageList(condition, rc.GetPageParam())
	biz.ErrIsNil(err)
	rc.ResData = model.PageResultConv[*entity.MachineFile, *vo.MachineFileVO](res)
}

// SaveMachineFiles binds the JSON body to a file configuration and saves it.
func (m *MachineFile) SaveMachineFiles(rc *req.Ctx) {
	fileForm, entity := req.BindJsonAndCopyTo[form.MachineFileForm, entity.MachineFile](rc)
	rc.ReqParam = fileForm

	biz.ErrIsNil(m.machineFileApp.Save(rc.MetaCtx, entity))
}

// DeleteFile deletes the file configuration identified by the fileId path
// parameter.
func (m *MachineFile) DeleteFile(rc *req.Ctx) {
	biz.ErrIsNil(m.machineFileApp.DeleteById(rc.MetaCtx, GetMachineFileId(rc)))
}

/*** SFTP related operations */

// CreateFile creates a directory (when the form type is dir) or an empty
// file at the requested path.
func (m *MachineFile) CreateFile(rc *req.Ctx) {
	opForm := req.BindJson[form.CreateFileForm](rc)
	path := opForm.Path

	attrs := collx.Kvs("path", path)
	var mi *mcm.MachineInfo
	var err error
	if opForm.Type == dir {
		attrs["type"] = "Folder"
		mi, err = m.machineFileApp.MkDir(rc.MetaCtx, opForm.MachineFileOp)
	} else {
		attrs["type"] = "File"
		mi, err = m.machineFileApp.CreateFile(rc.MetaCtx, opForm.MachineFileOp)
	}
	// Record the request parameters before asserting so that failed
	// operations are logged with them as well.
	attrs["machine"] = mi
	rc.ReqParam = attrs
	biz.ErrIsNil(err)
}

// ReadFileContent returns the content of a file as a string. Files of
// max_read_size bytes or more are rejected with ErrFileTooLargeUseDownload.
func (m *MachineFile) ReadFileContent(rc *req.Ctx) {
	opForm := req.BindQuery[dto.MachineFileOp](rc)
	readPath := opForm.Path
	ctx := rc.MetaCtx

	// RDP files are read from the local path returned by GetRdpFilePath.
	if opForm.Protocol == entity.MachineProtocolRdp {
		path := m.machineFileApp.GetRdpFilePath(rc.GetLoginAccount(), opForm.Path)
		fi, err := os.Stat(path)
		biz.ErrIsNil(err)
		biz.IsTrueI(ctx, fi.Size() < max_read_size, imsg.ErrFileTooLargeUseDownload)
		datas, err := os.ReadFile(path)
		biz.ErrIsNil(err)
		rc.ResData = string(datas)
		return
	}

	sftpFile, mi, err := m.machineFileApp.ReadFile(rc.MetaCtx, opForm)
	rc.ReqParam = collx.Kvs("machine", mi, "path", readPath)
	biz.ErrIsNil(err)
	defer sftpFile.Close()

	// A failed stat must be reported; using the nil FileInfo would panic.
	fileInfo, err := sftpFile.Stat()
	biz.ErrIsNilAppendErr(err, "stat file error: %s")
	biz.IsTrueI(ctx, fileInfo.Size() < max_read_size, imsg.ErrFileTooLargeUseDownload)

	datas, err := io.ReadAll(sftpFile)
	biz.ErrIsNil(err)

	rc.ResData = string(datas)
}

// DownloadFile streams the requested file to the client as a download named
// after the last path segment.
func (m *MachineFile) DownloadFile(rc *req.Ctx) {
	opForm := req.BindQuery[dto.MachineFileOp](rc)
	readPath := opForm.Path

	// Take the file name, e.g. /usr/local/test.java -> test.java.
	fileName := readPath[strings.LastIndex(readPath, "/")+1:]

	if opForm.Protocol == entity.MachineProtocolRdp {
		localPath := m.machineFileApp.GetRdpFilePath(rc.GetLoginAccount(), opForm.Path)
		f, err := os.Open(localPath)
		// Report the failure instead of silently returning an empty response.
		biz.ErrIsNilAppendErr(err, "open file error: %s")
		defer f.Close()
		rc.Download(f, fileName)
		return
	}

	sftpFile, mi, err := m.machineFileApp.ReadFile(rc.MetaCtx, opForm)
	rc.ReqParam = collx.Kvs("machine", mi, "path", readPath)
	biz.ErrIsNilAppendErr(err, "open file error: %s")
	defer sftpFile.Close()

	rc.Download(sftpFile, fileName)
}

// GetDirEntry lists the entries of a directory, sorted using the ordering
// defined by vo.MachineFileInfos.
func (m *MachineFile) GetDirEntry(rc *req.Ctx) {
	opForm := req.BindQuery[dto.MachineFileOp](rc)
	readPath := opForm.Path
	rc.ReqParam = fmt.Sprintf("path: %s", readPath)

	fis, err := m.machineFileApp.ReadDir(rc.MetaCtx, opForm)
	biz.ErrIsNilAppendErr(err, "read dir error: %s")

	// Keep the slice non-nil so an empty directory still yields a list.
	fisVO := make([]vo.MachineFileInfo, 0, len(fis))
	for _, fi := range fis {
		name := fi.Name()
		if !strings.HasPrefix(name, "/") {
			name = "/" + name
		}

		// Entries directly under the root (or an empty path) are not
		// prefixed, which avoids a leading "//".
		path := name
		if readPath != "/" && readPath != "" {
			path = readPath + name
		}

		mfi := vo.MachineFileInfo{
			Name:    fi.Name(),
			Size:    fi.Size(),
			Path:    path,
			Type:    getFileType(fi.Mode()),
			Mode:    fi.Mode().String(),
			ModTime: timex.DefaultFormat(fi.ModTime()),
		}

		// UID/GID are only available when the entry comes from SFTP.
		if sftpFs, ok := fi.Sys().(*sftp.FileStat); ok {
			mfi.UID = sftpFs.UID
			mfi.GID = sftpFs.GID
		}

		fisVO = append(fisVO, mfi)
	}
	sort.Sort(vo.MachineFileInfos(fisVO))
	rc.ResData = fisVO
}

// GetDirSize returns the size of the requested directory.
func (m *MachineFile) GetDirSize(rc *req.Ctx) {
	opForm := req.BindQuery[dto.MachineFileOp](rc)

	size, err := m.machineFileApp.GetDirSize(rc.MetaCtx, opForm)
	biz.ErrIsNil(err)
	rc.ResData = size
}

// GetFileStat returns the stat output for the requested path.
func (m *MachineFile) GetFileStat(rc *req.Ctx) {
	opForm := req.BindQuery[dto.MachineFileOp](rc)
	res, err := m.machineFileApp.FileStat(rc.MetaCtx, opForm)
	// NOTE(review): res is passed as the assertion message, presumably so
	// the command output is surfaced on failure - confirm against biz.ErrIsNil.
	biz.ErrIsNil(err, res)
	rc.ResData = res
}

// WriteFileContent overwrites the requested file with the submitted content.
func (m *MachineFile) WriteFileContent(rc *req.Ctx) {
	opForm := req.BindJson[form.WriteFileContentForm](rc)
	path := opForm.Path

	mi, err := m.machineFileApp.WriteFileContent(rc.MetaCtx, opForm.MachineFileOp, []byte(opForm.Content))
	rc.ReqParam = collx.Kvs("machine", mi, "path", path)
	biz.ErrIsNilAppendErr(err, "open file error: %s")
}

// UploadFile uploads a single multipart file to the given path. When the
// client supplies an uploadId, progress events are published while the file
// is being read; a success/failure message is always published at the end.
func (m *MachineFile) UploadFile(rc *req.Ctx) {
	path := rc.PostForm("path")
	protocol := cast.ToInt(rc.PostForm("protocol"))
	machineId := cast.ToUint64(rc.PostForm("machineId"))
	authCertName := rc.PostForm("authCertName")
	uploadId := rc.PostForm("uploadId") // uploadId passed by the frontend

	fileheader, err := rc.FormFile("file")
	biz.ErrIsNilAppendErr(err, "read form file error: %s")

	ctx := rc.MetaCtx

	maxUploadFileSize := config.GetMachine().UploadMaxFileSize
	biz.IsTrueI(ctx, fileheader.Size <= maxUploadFileSize, imsg.ErrUploadFileOutOfLimit, "size", maxUploadFileSize)

	// The open error must be checked: deferring Close on a nil file and
	// reading from it would panic.
	file, err := fileheader.Open()
	biz.ErrIsNilAppendErr(err, "open form file error: %s")
	defer file.Close()

	// Progress notifications are only pushed when an uploadId was supplied.
	hasProgressNotify := uploadId != ""
	startTime := time.Now()

	var mi *mcm.MachineInfo
	var reader io.Reader = file

	if hasProgressNotify {
		// Wrap the file in a reader that publishes progress on every read.
		reader = &progressReader{
			reader:    file,
			total:     fileheader.Size,
			uploadId:  uploadId,
			filename:  fileheader.Filename,
			path:      path,
			ctx:       ctx,
			startTime: startTime,
			onProgress: func(readSize int64) {
				progressMsgEvent := &msgdto.MsgTmplSendEvent{
					TmplChannel: msgdto.MsgTmplMachineFileUploadProgress,
					Params: collx.M{
						"uploadId":     uploadId,
						"filename":     fileheader.Filename,
						"uploadedSize": readSize,
						"totalSize":    fileheader.Size,
						"status":       "uploading",
						"timestamp":    time.Now().UnixMilli(),
					},
					ReceiverIds: []uint64{contextx.GetLoginAccount(ctx).Id},
				}
				global.EventBus.Publish(ctx, event.EventTopicMsgTmplSend, progressMsgEvent)
			},
		}
	}

	opForm := &dto.MachineFileOp{
		MachineId:    machineId,
		AuthCertName: authCertName,
		Protocol:     protocol,
		Path:         path,
	}

	mi, err = m.machineFileApp.UploadFile(ctx, opForm, fileheader.Filename, reader)
	rc.ReqParam = collx.Kvs("machine", mi, "path", fmt.Sprintf("%s/%s", path, fileheader.Filename))

	// Publish the completion notification.
	if hasProgressNotify && err == nil {
		progressMsgEvent := &msgdto.MsgTmplSendEvent{
			TmplChannel: msgdto.MsgTmplMachineFileUploadProgress,
			Params: collx.M{
				"uploadId": uploadId,
				"status":   "complete",
			},
			ReceiverIds: []uint64{contextx.GetLoginAccount(ctx).Id},
		}
		global.EventBus.Publish(ctx, event.EventTopicMsgTmplSend, progressMsgEvent)
	}

	// Publish the upload result message (success or failure).
	msgEvent := &msgdto.MsgTmplSendEvent{
		TmplChannel: msgdto.MsgTmplMachineFileUploadSuccess,
		Params: collx.M{
			"filename": fileheader.Filename,
			"path":     path,
		},
		ReceiverIds: []uint64{rc.GetLoginAccount().Id},
	}
	if err != nil {
		msgEvent.Params["error"] = err.Error()
		msgEvent.TmplChannel = msgdto.MsgTmplMachineFileUploadFail
	}
	if mi != nil {
		msgEvent.Params["machineName"] = mi.Name
		msgEvent.Params["machineCode"] = mi.Code
	}
	global.EventBus.Publish(ctx, event.EventTopicMsgTmplSend, msgEvent)

	biz.ErrIsNilAppendErr(err, "upload file error: %s")
}

// FolderFile pairs an uploaded file with the relative directory it belongs
// to inside the uploaded folder.
type FolderFile struct {
	Dir        string
	Fileheader *multipart.FileHeader
}

// firstFormValue returns the first value of a multipart form field, or ""
// when the field is absent, so optional fields never cause an index panic.
func firstFormValue(values map[string][]string, key string) string {
	if vs := values[key]; len(vs) > 0 {
		return vs[0]
	}
	return ""
}

// UploadFolder uploads a whole folder submitted as a multipart form. The
// form carries "files" together with a parallel "paths" list (the relative
// path of each file) and the fields basePath, authCertName, machineId,
// protocol and the optional uploadId.
//
// For the RDP protocol the upload is delegated to UploadFiles. Otherwise the
// directories are created over SFTP first and the files are then copied by
// several goroutines. A failure of one file is logged and does not stop the
// remaining files. When uploadId is set, progress events are published. A
// final success or failure message is always published.
func (m *MachineFile) UploadFolder(rc *req.Ctx) {
	mf, err := rc.MultipartForm()
	biz.ErrIsNilAppendErr(err, "get multipart form error: %s")
	basePath := firstFormValue(mf.Value, "basePath")
	biz.NotEmpty(basePath, "basePath cannot be empty")

	fileheaders := mf.File["files"]
	biz.IsTrue(len(fileheaders) > 0, "files cannot be empty")
	totalSize := collx.ArrayReduce(fileheaders, 0, func(i int64, fh *multipart.FileHeader) int64 {
		return i + fh.Size
	})

	ctx := rc.MetaCtx

	maxUploadFileSize := config.GetMachine().UploadMaxFileSize
	biz.IsTrueI(ctx, totalSize <= maxUploadFileSize, imsg.ErrUploadFileOutOfLimit, "size", maxUploadFileSize)

	// paths[i] is the relative path of fileheaders[i]; both lists are
	// indexed in parallel below, so they must have the same length.
	paths := mf.Value["paths"]
	biz.IsTrue(len(paths) == len(fileheaders), "paths and files count mismatch")

	authCertName := firstFormValue(mf.Value, "authCertName")
	machineId := cast.ToUint64(firstFormValue(mf.Value, "machineId"))
	protocol := cast.ToInt(firstFormValue(mf.Value, "protocol"))
	uploadId := firstFormValue(mf.Value, "uploadId") // optional uploadId passed by the frontend

	opForm := &dto.MachineFileOp{
		MachineId:    machineId,
		Protocol:     protocol,
		AuthCertName: authCertName,
	}

	// NOTE(review): filepath uses the server OS separator while the paths
	// here are slash-separated upload paths; fine on Unix-like hosts.
	folderName := filepath.Dir(paths[0])
	totalFiles := len(fileheaders)
	uploadedFiles := 0

	// Progress notifications are only pushed when an uploadId was supplied.
	hasProgressNotify := uploadId != ""

	// Parameters of the final success/failure message.
	resultParams := collx.M{
		"filename": folderName,
		"path":     basePath,
	}

	// Publish the start notification.
	if hasProgressNotify {
		startMsgEvent := &msgdto.MsgTmplSendEvent{
			TmplChannel: msgdto.MsgTmplMachineFolderUploadProgress,
			Params: collx.M{
				"uploadId":      uploadId,
				"folderName":    folderName,
				"totalFiles":    totalFiles,
				"uploadedFiles": 0,
				"totalSize":     totalSize,
				"uploadedSize":  0,
				"percent":       0,
				"status":        "uploading",
			},
			ReceiverIds: []uint64{contextx.GetLoginAccount(ctx).Id},
		}
		global.EventBus.Publish(ctx, event.EventTopicMsgTmplSend, startMsgEvent)
	}

	if protocol == entity.MachineProtocolRdp {
		// RDP upload.
		// NOTE(review): any result of UploadFiles is not inspected here, so
		// the upload is assumed to have succeeded - confirm its signature.
		m.machineFileApp.UploadFiles(ctx, opForm, basePath, fileheaders, paths)
		uploadedFiles = totalFiles
	} else {
		// SSH upload.
		mcli, err := m.machineFileApp.GetMachineCli(rc.MetaCtx, authCertName)
		biz.ErrIsNil(err)
		mi := mcli.Info

		sftpCli, err := mcli.GetSftpCli()
		biz.ErrIsNil(err)
		rc.ReqParam = collx.Kvs("machine", mi, "path", fmt.Sprintf("%s/%s", basePath, folderName))

		// Create the directories first and wrap every file as a FolderFile.
		folderFiles := make([]FolderFile, len(paths))
		mkdirs := make(map[string]bool)
		for i, p := range paths {
			dir := filepath.Dir(p)
			// Skip directories that were already created.
			if !mkdirs[dir] {
				biz.ErrIsNilAppendErr(sftpCli.MkdirAll(basePath+"/"+dir), "create dir error: %s")
				mkdirs[dir] = true
			}
			folderFiles[i] = FolderFile{
				Dir:        dir,
				Fileheader: fileheaders[i],
			}
		}

		// Split the files into groups that are uploaded concurrently.
		groupNum := 3
		chunks := collx.ArraySplit(folderFiles, groupNum)

		var (
			wg               sync.WaitGroup
			mu               sync.Mutex // guards currentUploading, uploadedSize and uploadedFiles
			currentUploading []string   // files currently being uploaded
			uploadedSize     int64      // total size of the files uploaded so far
		)

		// uploadOne copies a single file to the remote machine and updates
		// the shared progress state once it is done.
		uploadOne := func(ff FolderFile) (err error) {
			fileHeader := ff.Fileheader
			fullPath := ff.Dir + "/" + fileHeader.Filename

			if hasProgressNotify {
				mu.Lock()
				currentUploading = append(currentUploading, fullPath)
				mu.Unlock()
			}
			// Registered first so it runs last: drop the file from the
			// in-flight list and count it only when it was fully uploaded.
			defer func() {
				mu.Lock()
				for i, p := range currentUploading {
					if p == fullPath {
						currentUploading = append(currentUploading[:i], currentUploading[i+1:]...)
						break
					}
				}
				if err == nil {
					uploadedSize += fileHeader.Size
					uploadedFiles++
				}
				mu.Unlock()
			}()

			src, err := fileHeader.Open()
			if err != nil {
				return fmt.Errorf("open file error: %s: %w", fullPath, err)
			}
			defer src.Close()

			dst, err := sftpCli.Create(fmt.Sprintf("%s/%s/%s", basePath, ff.Dir, fileHeader.Filename))
			if err != nil {
				return fmt.Errorf("create file error: %s: %w", fullPath, err)
			}

			// Wrap the source to track the progress of this single file.
			var reader io.Reader = src
			if hasProgressNotify {
				reader = &progressReader{
					reader:    src,
					total:     fileHeader.Size,
					uploadId:  uploadId,
					filename:  fileHeader.Filename,
					path:      fullPath,
					ctx:       ctx,
					startTime: time.Now(),
					// Callback: publish the overall folder progress.
					onProgress: func(readBytes int64) {
						// Snapshot all shared state under the lock.
						mu.Lock()
						currentTotalUploaded := uploadedSize + readBytes
						doneFiles := uploadedFiles
						uploadingFiles := make([]string, len(currentUploading))
						copy(uploadingFiles, currentUploading)
						mu.Unlock()

						progressMsgEvent := &msgdto.MsgTmplSendEvent{
							TmplChannel: msgdto.MsgTmplMachineFolderUploadProgress,
							Params: collx.M{
								"uploadId":       uploadId,
								"folderName":     folderName,
								"lastFile":       fullPath,
								"totalFiles":     totalFiles,
								"uploadedFiles":  doneFiles,
								"totalSize":      totalSize,
								"uploadedSize":   currentTotalUploaded,
								"status":         "uploading",
								"uploadingFiles": uploadingFiles,
								"timestamp":      time.Now().UnixMilli(),
							},
							ReceiverIds: []uint64{contextx.GetLoginAccount(ctx).Id},
						}
						global.EventBus.Publish(ctx, event.EventTopicMsgTmplSend, progressMsgEvent)
					},
				}
			}

			_, copyErr := io.Copy(dst, reader)
			closeErr := dst.Close()
			if copyErr != nil {
				return fmt.Errorf("copy file error: %s: %w", fullPath, copyErr)
			}
			if closeErr != nil {
				return fmt.Errorf("close file error: %s: %w", fullPath, closeErr)
			}
			return nil
		}

		for _, chunk := range chunks {
			wg.Go(func() {
				defer gox.Recover(func(e error) {
					logx.Errorf("upload folder error: %s", e)
				})

				for _, ff := range chunk {
					// A failed file must not abort the rest of the chunk.
					if err := uploadOne(ff); err != nil {
						logx.Errorf("upload folder error: %s", err)
					}
				}
			})
		}

		// Wait for all upload goroutines to finish.
		wg.Wait()

		resultParams["machineName"] = mi.Name
		resultParams["machineCode"] = mi.Code
	}

	failedFiles := totalFiles - uploadedFiles

	// Publish the completion notification.
	if hasProgressNotify {
		status := "complete"
		if failedFiles > 0 {
			status = "error"
		}

		completeMsgEvent := &msgdto.MsgTmplSendEvent{
			TmplChannel: msgdto.MsgTmplMachineFolderUploadProgress,
			Params: collx.M{
				"uploadId":      uploadId,
				"folderName":    folderName,
				"totalFiles":    totalFiles,
				"uploadedFiles": uploadedFiles,
				"totalSize":     totalSize,
				"uploadedSize":  totalSize, // reported as the total size once finished
				"percent":       100,
				"status":        status,
			},
			ReceiverIds: []uint64{contextx.GetLoginAccount(ctx).Id},
		}
		global.EventBus.Publish(ctx, event.EventTopicMsgTmplSend, completeMsgEvent)
	}

	// Publish the success/failure message.
	msgEvent := &msgdto.MsgTmplSendEvent{
		TmplChannel: msgdto.MsgTmplMachineFileUploadSuccess,
		Params:      resultParams,
		ReceiverIds: []uint64{rc.GetLoginAccount().Id},
	}
	if failedFiles > 0 {
		msgEvent.TmplChannel = msgdto.MsgTmplMachineFileUploadFail
		resultParams["error"] = fmt.Sprintf("%d of %d files failed to upload", failedFiles, totalFiles)
	}
	global.EventBus.Publish(ctx, event.EventTopicMsgTmplSend, msgEvent)
}

// RemoveFile removes the submitted paths.
func (m *MachineFile) RemoveFile(rc *req.Ctx) {
	opForm := req.BindJson[form.RemoveFileForm](rc)

	mi, err := m.machineFileApp.RemoveFile(rc.MetaCtx, opForm.MachineFileOp, opForm.Paths...)
	rc.ReqParam = collx.Kvs("machine", mi, "path", opForm)
	biz.ErrIsNilAppendErr(err, "remove file error: %s")
}

// CopyFile copies the submitted paths to the target path.
func (m *MachineFile) CopyFile(rc *req.Ctx) {
	opForm := req.BindJson[form.CopyFileForm](rc)
	mi, err := m.machineFileApp.Copy(rc.MetaCtx, opForm.MachineFileOp, opForm.ToPath, opForm.Paths...)
	// Set the request parameters before asserting, as the sibling handlers
	// do, so that failed copies are logged with them as well.
	rc.ReqParam = collx.Kvs("machine", mi, "cp", opForm)
	biz.ErrIsNilAppendErr(err, "file copy error: %s")
}

// MvFile moves the submitted paths to the target path.
func (m *MachineFile) MvFile(rc *req.Ctx) {
	opForm := req.BindJson[form.CopyFileForm](rc)
	mi, err := m.machineFileApp.Mv(rc.MetaCtx, opForm.MachineFileOp, opForm.ToPath, opForm.Paths...)
	rc.ReqParam = collx.Kvs("machine", mi, "mv", opForm)
	biz.ErrIsNilAppendErr(err, "file move error: %s")
}

// Rename renames the requested path to the submitted new name.
func (m *MachineFile) Rename(rc *req.Ctx) {
	renameForm := req.BindJson[form.RenameForm](rc)
	mi, err := m.machineFileApp.Rename(rc.MetaCtx, renameForm.MachineFileOp, renameForm.Newname)
	rc.ReqParam = collx.Kvs("machine", mi, "rename", renameForm)
	biz.ErrIsNilAppendErr(err, "file rename error: %s")
}

// getFileType maps a file mode to the type marker sent to the client:
// file for regular files and dir for everything else (directories as well
// as non-regular entries such as symlinks).
func getFileType(fm fs.FileMode) string {
	if fm.IsDir() {
		return dir
	}
	if fm.IsRegular() {
		return file
	}
	return dir
}

// GetMachineFileId reads the fileId path parameter and asserts that it is
// non-zero.
func GetMachineFileId(rc *req.Ctx) uint64 {
	fileId := rc.PathParamInt("fileId")
	biz.IsTrue(fileId != 0, "fileId error")
	return uint64(fileId)
}