feat: 首页优化&机器文件/文件夹上传实时进度通知修复

This commit is contained in:
meilin.huang
2026-05-22 20:36:19 +08:00
parent daccc638a7
commit 871e9b8fdd
35 changed files with 1627 additions and 1231 deletions

View File

@@ -18,22 +18,17 @@ import (
"mayfly-go/pkg/biz"
"mayfly-go/pkg/contextx"
"mayfly-go/pkg/global"
"mayfly-go/pkg/gox"
"mayfly-go/pkg/logx"
"mayfly-go/pkg/model"
"mayfly-go/pkg/req"
"mayfly-go/pkg/utils/collx"
"mayfly-go/pkg/utils/timex"
"mime/multipart"
"os"
"path/filepath"
"sort"
"strings"
"sync"
"time"
"github.com/pkg/sftp"
"github.com/spf13/cast"
)
type MachineFile struct {
@@ -65,8 +60,6 @@ func (mf *MachineFile) ReqConfs() *req.Confs {
req.NewPost(":machineId/files/:fileId/upload", mf.UploadFile).Log(req.NewLogSaveI(imsg.LogMachineFileUpload)).RequiredPermissionCode("machine:file:upload"),
req.NewPost(":machineId/files/:fileId/upload-folder", mf.UploadFolder).Log(req.NewLogSaveI(imsg.LogMachineFileUploadFolder)).RequiredPermissionCode("machine:file:upload"),
req.NewPost(":machineId/files/:fileId/remove", mf.RemoveFile).Log(req.NewLogSaveI(imsg.LogMachineFileDelete)).RequiredPermissionCode("machine:file:rm"),
req.NewPost(":machineId/files/:fileId/cp", mf.CopyFile).Log(req.NewLogSaveI(imsg.LogMachineFileCopy)).RequiredPermissionCode("machine:file:rm"),
@@ -89,33 +82,31 @@ const (
// progressReader 用于 HTTP 上传时推送进度
type progressReader struct {
reader io.Reader
total int64
readSize int64
uploadId string
filename string
path string
ctx context.Context
startTime time.Time
onProgress func(readSize int64) // 进度回调函数
lastNotify time.Time // 上次发送通知的时间
}
func (r *progressReader) Read(p []byte) (n int, err error) {
// 先检查 context 是否已取消
select {
case <-r.ctx.Done():
return 0, r.ctx.Err()
default:
}
// 直接调用底层 Read不创建 goroutine
n, err = r.reader.Read(p)
if n > 0 {
r.readSize += int64(n)
// 如果有回调函数,调用它
// 如果有回调函数,检查是否需要发送进度通知
// 首次立即通知 + 之后每1秒通知一次确保小文件也能收到通知
if r.onProgress != nil {
r.onProgress(r.readSize)
now := time.Now()
// lastNotify 为零值表示首次读取,立即发送通知
if r.lastNotify.IsZero() || now.Sub(r.lastNotify) >= time.Second {
r.lastNotify = now
r.onProgress(r.readSize)
}
}
}
return n, err
}
@@ -282,401 +273,113 @@ func (m *MachineFile) WriteFileContent(rc *req.Ctx) {
}
func (m *MachineFile) UploadFile(rc *req.Ctx) {
path := rc.PostForm("path")
protocol := cast.ToInt(rc.PostForm("protocol"))
machineId := cast.ToUint64(rc.PostForm("machineId"))
authCertName := rc.PostForm("authCertName")
uploadId := rc.PostForm("uploadId") // 前端传递的 uploadId
// 从查询参数读取配置
opForm := req.BindQuery[dto.MachineFileOp](rc)
path := opForm.Path
authCertName := opForm.AuthCertName
uploadId := rc.Query("uploadId") // 前端传递的 uploadId
isFolderUpload := rc.Query("isFolderUpload") == "true" // 是否是文件夹上传的一部分
fileheader, err := rc.FormFile("file")
biz.ErrIsNilAppendErr(err, "read form file error: %s")
body := rc.GetRequest().Body
defer body.Close()
ctx := rc.MetaCtx
// 从查询参数获取文件名
filename := rc.QueryDefault("filename", "upload_file")
// 获取内容长度
contentLength := rc.GetRequest().ContentLength
// MetaCtx 用于业务逻辑(如获取登录账号、 EventBus 等)
metaCtx := rc.MetaCtx
maxUploadFileSize := config.GetMachine().UploadMaxFileSize
biz.IsTrueI(ctx, fileheader.Size <= maxUploadFileSize, imsg.ErrUploadFileOutOfLimit, "size", maxUploadFileSize)
file, _ := fileheader.Open()
defer file.Close()
biz.IsTrueI(metaCtx, contentLength <= maxUploadFileSize, imsg.ErrUploadFileOutOfLimit, "size", maxUploadFileSize)
// 是否需要推送进度通知
hasProgressNotify := uploadId != ""
startTime := time.Now()
// 进度通知消息模板
progressMsgTmplChannel := msgdto.MsgTmplMachineFileUploadProgress
if isFolderUpload {
progressMsgTmplChannel = msgdto.MsgTmplMachineFolderUploadProgress
}
// 推送进度
var progressMsgEvent *msgdto.MsgTmplSendEvent
if hasProgressNotify {
progressMsgEvent = &msgdto.MsgTmplSendEvent{
TmplChannel: progressMsgTmplChannel,
Params: collx.M{
"authCertName": authCertName,
"path": path,
"uploadId": uploadId,
"filename": filename,
"totalSize": contentLength,
"timestamp": time.Now().UnixMilli(),
},
ReceiverIds: []uint64{contextx.GetLoginAccount(metaCtx).Id},
}
}
var mi *mcm.MachineInfo
var reader io.Reader = file
var reader io.Reader = body
if hasProgressNotify {
// 创建带进度回调的 Reader
reader = &progressReader{
reader: file,
total: fileheader.Size,
uploadId: uploadId,
filename: fileheader.Filename,
path: path,
ctx: ctx,
startTime: startTime,
reader: body,
ctx: metaCtx,
onProgress: func(readSize int64) {
progressMsgEvent := &msgdto.MsgTmplSendEvent{
TmplChannel: msgdto.MsgTmplMachineFileUploadProgress,
Params: collx.M{
"authCertName": authCertName,
"path": path,
"uploadId": uploadId,
"filename": fileheader.Filename,
"uploadedSize": readSize,
"totalSize": fileheader.Size,
"status": "uploading",
"timestamp": time.Now().UnixMilli(),
},
ReceiverIds: []uint64{contextx.GetLoginAccount(ctx).Id},
}
global.EventBus.Publish(ctx, event.EventTopicMsgTmplSend, progressMsgEvent)
parmas := collx.CopyM(progressMsgEvent.Params)
parmas["uploadedSize"] = readSize
parmas["status"] = "uploading"
parmas["timestamp"] = time.Now().UnixMilli()
progressMsgEvent.Params = parmas
global.EventBus.Publish(metaCtx, event.EventTopicMsgTmplSend, progressMsgEvent)
},
}
}
opForm := &dto.MachineFileOp{
MachineId: machineId,
AuthCertName: authCertName,
Protocol: protocol,
Path: path,
uploadedSize, mi, err := m.machineFileApp.UploadFile(metaCtx, opForm, filename, reader)
rc.ReqParam = collx.Kvs("machine", mi, "path", fmt.Sprintf("%s/%s", path, filename))
// 检查实际上传的大小(已上传大小 == 总大小)
if err == nil && uploadedSize < contentLength {
err = fmt.Errorf("File upload canceld")
}
mi, err = m.machineFileApp.UploadFile(ctx, opForm, fileheader.Filename, reader)
rc.ReqParam = collx.Kvs("machine", mi, "path", fmt.Sprintf("%s/%s", path, fileheader.Filename))
// 检查是否是取消操作
if err != nil {
logx.ErrorfContext(ctx, "[UploadFile] Upload error: %v, uploadId: %s, ctx.Err: %v", err, uploadId, ctx.Err())
if ctx.Err() != nil {
logx.InfofContext(ctx, "File upload cancelled by client: %s, uploadId: %s", fileheader.Filename, uploadId)
// 发送取消通知
if hasProgressNotify {
progressMsgEvent := &msgdto.MsgTmplSendEvent{
TmplChannel: msgdto.MsgTmplMachineFileUploadProgress,
Params: collx.M{
"uploadId": uploadId,
"status": "error",
},
ReceiverIds: []uint64{contextx.GetLoginAccount(ctx).Id},
}
global.EventBus.Publish(ctx, event.EventTopicMsgTmplSend, progressMsgEvent)
}
return
if hasProgressNotify {
params := collx.CopyM(progressMsgEvent.Params)
if err != nil {
logx.WarnfContext(metaCtx, "[UploadFile] File upload incomplete: readSize=%d, total=%d, uploadId: %s", uploadedSize, contentLength, uploadId)
params["status"] = "error"
} else {
params["status"] = "complete"
}
}
// 发送完成通知
if hasProgressNotify && err == nil {
progressMsgEvent := &msgdto.MsgTmplSendEvent{
TmplChannel: msgdto.MsgTmplMachineFileUploadProgress,
Params: collx.M{
"uploadId": uploadId,
"status": "complete",
},
ReceiverIds: []uint64{contextx.GetLoginAccount(ctx).Id},
}
global.EventBus.Publish(ctx, event.EventTopicMsgTmplSend, progressMsgEvent)
progressMsgEvent.Params = params
global.EventBus.Publish(metaCtx, event.EventTopicMsgTmplSend, progressMsgEvent)
}
// 发送文件上传结果消息
msgEvent := &msgdto.MsgTmplSendEvent{
TmplChannel: msgdto.MsgTmplMachineFileUploadSuccess,
Params: collx.M{
"filename": fileheader.Filename,
"path": path,
},
ReceiverIds: []uint64{rc.GetLoginAccount().Id},
}
if err != nil {
msgEvent.Params["error"] = err.Error()
msgEvent.TmplChannel = msgdto.MsgTmplMachineFileUploadFail
}
if mi != nil {
msgEvent.Params["machineName"] = mi.Name
msgEvent.Params["machineCode"] = mi.Code
}
global.EventBus.Publish(ctx, event.EventTopicMsgTmplSend, msgEvent)
biz.ErrIsNilAppendErr(err, "upload file error: %s")
}
type FolderFile struct {
Dir string
Fileheader *multipart.FileHeader
}
func (m *MachineFile) UploadFolder(rc *req.Ctx) {
mf, err := rc.MultipartForm()
biz.ErrIsNilAppendErr(err, "get multipart form error: %s")
basePath := mf.Value["basePath"][0]
biz.NotEmpty(basePath, "basePath cannot be empty")
fileheaders := mf.File["files"]
biz.IsTrue(len(fileheaders) > 0, "files cannot be empty")
totalSize := collx.ArrayReduce(fileheaders, 0, func(i int64, fh *multipart.FileHeader) int64 {
return i + fh.Size
})
ctx := rc.MetaCtx
maxUploadFileSize := config.GetMachine().UploadMaxFileSize
biz.IsTrueI(ctx, totalSize <= maxUploadFileSize, imsg.ErrUploadFileOutOfLimit, "size", maxUploadFileSize)
paths := mf.Value["paths"]
authCertName := mf.Value["authCertName"][0]
machineId := cast.ToUint64(mf.Value["machineId"][0])
protocol := cast.ToInt(mf.Value["protocol"][0])
uploadId := mf.Value["uploadId"][0] // 前端传递的 uploadId
opForm := &dto.MachineFileOp{
MachineId: machineId,
Protocol: protocol,
AuthCertName: authCertName,
}
folderName := filepath.Dir(paths[0])
totalFiles := len(fileheaders)
uploadedFiles := 0
// 是否需要推送进度通知
hasProgressNotify := uploadId != ""
// 发送开始通知
if hasProgressNotify {
startMsgEvent := &msgdto.MsgTmplSendEvent{
TmplChannel: msgdto.MsgTmplMachineFolderUploadProgress,
Params: collx.M{
"authCertName": authCertName,
"path": basePath,
"uploadId": uploadId,
"folderName": folderName,
"totalFiles": totalFiles,
"uploadedFiles": 0,
"totalSize": totalSize,
"uploadedSize": 0,
"percent": 0,
"status": "uploading",
},
ReceiverIds: []uint64{contextx.GetLoginAccount(ctx).Id},
}
global.EventBus.Publish(ctx, event.EventTopicMsgTmplSend, startMsgEvent)
}
if protocol == entity.MachineProtocolRdp {
// RDP 协议上传
m.machineFileApp.UploadFiles(ctx, opForm, basePath, fileheaders, paths)
uploadedFiles = totalFiles
} else {
// SSH 协议上传
mcli, err := m.machineFileApp.GetMachineCli(rc.MetaCtx, authCertName)
biz.ErrIsNil(err)
mi := mcli.Info
sftpCli, err := mcli.GetSftpCli()
biz.ErrIsNil(err)
rc.ReqParam = collx.Kvs("machine", mi, "path", fmt.Sprintf("%s/%s", basePath, folderName))
folderFiles := make([]FolderFile, len(paths))
// 先创建目录并将其包装为folderFile结构
mkdirs := make(map[string]bool, 0)
for i, path := range paths {
dir := filepath.Dir(path)
// 目录已建,则无需重复建
if !mkdirs[dir] {
biz.ErrIsNilAppendErr(sftpCli.MkdirAll(basePath+"/"+dir), "create dir error: %s")
mkdirs[dir] = true
}
folderFiles[i] = FolderFile{
Dir: dir,
Fileheader: fileheaders[i],
}
}
// 分组处理
groupNum := 3
chunks := collx.ArraySplit(folderFiles, groupNum)
var wg sync.WaitGroup
var mu sync.Mutex // 保护并发访问
var currentUploading []string // 正在上传的文件列表
var uploadedSize int64 = 0 // 已上传的总大小
for _, chunk := range chunks {
wg.Go(func() {
defer gox.Recover(func(e error) {
logx.ErrorfContext(ctx, "upload folder error: %s", e)
})
for _, file := range chunk {
fileHeader := file.Fileheader
dir := file.Dir
fullPath := dir + "/" + fileHeader.Filename
file, _ := fileHeader.Open()
// 添加到正在上传列表
if hasProgressNotify {
mu.Lock()
currentUploading = append(currentUploading, fullPath)
mu.Unlock()
}
createfile, err := sftpCli.Create(fmt.Sprintf("%s/%s/%s", basePath, dir, fileHeader.Filename))
if err != nil {
logx.ErrorfContext(ctx, "create file error: %s", err)
file.Close()
// 从正在上传列表移除
if hasProgressNotify {
mu.Lock()
for i, p := range currentUploading {
if p == fullPath {
currentUploading = append(currentUploading[:i], currentUploading[i+1:]...)
break
}
}
mu.Unlock()
}
return
}
// 使用 progressReader 包装,追踪单个文件上传进度
var reader io.Reader = file
if hasProgressNotify {
reader = &progressReader{
reader: file,
total: fileHeader.Size,
uploadId: uploadId,
filename: fileHeader.Filename,
path: fullPath,
ctx: ctx,
startTime: time.Now(),
// 回调函数:更新全局进度
onProgress: func(readBytes int64) {
mu.Lock()
currentTotalUploaded := uploadedSize + readBytes
uploadingFiles := make([]string, len(currentUploading))
copy(uploadingFiles, currentUploading)
mu.Unlock()
progressMsgEvent := &msgdto.MsgTmplSendEvent{
TmplChannel: msgdto.MsgTmplMachineFolderUploadProgress,
Params: collx.M{
"authCertName": authCertName,
"path": basePath,
"uploadId": uploadId,
"folderName": folderName,
"totalFiles": totalFiles,
"uploadedFiles": uploadedFiles,
"totalSize": totalSize,
"uploadedSize": currentTotalUploaded,
"status": "uploading",
"uploadingFiles": uploadingFiles,
"timestamp": time.Now().UnixMilli(),
},
ReceiverIds: []uint64{contextx.GetLoginAccount(ctx).Id},
}
global.EventBus.Publish(ctx, event.EventTopicMsgTmplSend, progressMsgEvent)
},
}
}
_, err = io.Copy(createfile, reader)
if err != nil {
logx.ErrorfContext(ctx, "copy file error: %s, uploadId: %s", err, uploadId)
// 检查是否是取消操作
if ctx.Err() != nil {
logx.InfofContext(ctx, "Folder upload cancelled by client, uploadId: %s", uploadId)
file.Close()
createfile.Close()
return
}
}
// 累加已上传大小
mu.Lock()
uploadedSize += fileHeader.Size
mu.Unlock()
createfile.Close()
file.Close()
// 从正在上传列表移除,增加已完成计数
if hasProgressNotify {
mu.Lock()
for i, p := range currentUploading {
if p == fullPath {
currentUploading = append(currentUploading[:i], currentUploading[i+1:]...)
break
}
}
uploadedFiles++
mu.Unlock()
}
}
})
}
// 等待所有协程执行完成
wg.Wait()
}
// 发送完成通知
if hasProgressNotify {
status := "complete"
if uploadedFiles < totalFiles {
status = "error"
}
completeMsgEvent := &msgdto.MsgTmplSendEvent{
TmplChannel: msgdto.MsgTmplMachineFolderUploadProgress,
Params: collx.M{
"uploadId": uploadId,
"folderName": folderName,
"totalFiles": totalFiles,
"uploadedFiles": uploadedFiles,
"totalSize": totalSize,
"uploadedSize": totalSize, // 完成时已上传大小等于总大小
"percent": 100,
"status": status,
},
ReceiverIds: []uint64{contextx.GetLoginAccount(ctx).Id},
}
global.EventBus.Publish(ctx, event.EventTopicMsgTmplSend, completeMsgEvent)
}
// 发送成功/失败消息通知
if protocol != entity.MachineProtocolRdp {
// SSH 协议:使用 mcli 获取机器信息
mcli, err := m.machineFileApp.GetMachineCli(rc.MetaCtx, authCertName)
if err == nil && mcli != nil {
msgEvent := &msgdto.MsgTmplSendEvent{
TmplChannel: msgdto.MsgTmplMachineFileUploadSuccess,
Params: collx.M{
"filename": folderName,
"path": basePath,
"machineName": mcli.Info.Name,
"machineCode": mcli.Info.Code,
},
ReceiverIds: []uint64{rc.GetLoginAccount().Id},
}
global.EventBus.Publish(ctx, event.EventTopicMsgTmplSend, msgEvent)
}
} else {
// RDP 协议:直接发送通知
// 文件夹上传时不发送单个文件的成功通知,只在全部完成后由前端发送一次
if !isFolderUpload {
msgEvent := &msgdto.MsgTmplSendEvent{
TmplChannel: msgdto.MsgTmplMachineFileUploadSuccess,
Params: collx.M{
"filename": folderName,
"path": basePath,
"filename": filename,
"path": path,
},
ReceiverIds: []uint64{rc.GetLoginAccount().Id},
ReceiverIds: []uint64{contextx.GetLoginAccount(metaCtx).Id},
}
global.EventBus.Publish(ctx, event.EventTopicMsgTmplSend, msgEvent)
if err != nil {
msgEvent.Params["error"] = err.Error()
msgEvent.TmplChannel = msgdto.MsgTmplMachineFileUploadFail
}
if mi != nil {
msgEvent.Params["machineName"] = mi.Name
msgEvent.Params["machineCode"] = mi.Code
}
global.EventBus.Publish(metaCtx, event.EventTopicMsgTmplSend, msgEvent)
}
biz.ErrIsNilAppendErr(err, "upload file error: %s")
}
func (m *MachineFile) RemoveFile(rc *req.Ctx) {