mirror of
				https://gitee.com/dromara/mayfly-go
				synced 2025-11-04 00:10:25 +08:00 
			
		
		
		
	fix: 删除机器、数据库时关联的标签未删除
This commit is contained in:
		@@ -14,7 +14,7 @@
 | 
			
		||||
                            <span>:</span>
 | 
			
		||||
                        </template>
 | 
			
		||||
 | 
			
		||||
                        <SearchFormItem v-if="!item.slot" :item="item" v-model="searchParam[item.prop]" />
 | 
			
		||||
                        <SearchFormItem @keyup.enter="handleItemKeyupEnter(item)" v-if="!item.slot" :item="item" v-model="searchParam[item.prop]" />
 | 
			
		||||
 | 
			
		||||
                        <slot v-else :name="item.slot"></slot>
 | 
			
		||||
                    </el-form-item>
 | 
			
		||||
@@ -98,6 +98,12 @@ const showCollapse = computed(() => {
 | 
			
		||||
    }, 0);
 | 
			
		||||
    return show;
 | 
			
		||||
});
 | 
			
		||||
 | 
			
		||||
const handleItemKeyupEnter = (item: SearchItem) => {
 | 
			
		||||
    if (item.type == 'input') {
 | 
			
		||||
        props.search(searchParam);
 | 
			
		||||
    }
 | 
			
		||||
};
 | 
			
		||||
</script>
 | 
			
		||||
<style lang="scss">
 | 
			
		||||
.search-form {
 | 
			
		||||
 
 | 
			
		||||
@@ -1,12 +1,14 @@
 | 
			
		||||
<template>
 | 
			
		||||
    <div>
 | 
			
		||||
        <!-- 查询表单 -->
 | 
			
		||||
        <SearchForm v-show="isShowSearch" :items="searchItems" v-model="queryForm_" :search="queryData" :reset="reset" :search-col="searchCol">
 | 
			
		||||
            <!-- 遍历父组件传入的 solts 透传给子组件 -->
 | 
			
		||||
            <template v-for="(_, key) in useSlots()" v-slot:[key]>
 | 
			
		||||
                <slot :name="key"></slot>
 | 
			
		||||
            </template>
 | 
			
		||||
        </SearchForm>
 | 
			
		||||
        <transition name="el-zoom-in-top">
 | 
			
		||||
            <!-- 查询表单 -->
 | 
			
		||||
            <SearchForm v-show="isShowSearch" :items="searchItems" v-model="queryForm_" :search="queryData" :reset="reset" :search-col="searchCol">
 | 
			
		||||
                <!-- 遍历父组件传入的 solts 透传给子组件 -->
 | 
			
		||||
                <template v-for="(_, key) in useSlots()" v-slot:[key]>
 | 
			
		||||
                    <slot :name="key"></slot>
 | 
			
		||||
                </template>
 | 
			
		||||
            </SearchForm>
 | 
			
		||||
        </transition>
 | 
			
		||||
 | 
			
		||||
        <el-card>
 | 
			
		||||
            <div class="table-main">
 | 
			
		||||
 
 | 
			
		||||
@@ -20,4 +20,7 @@ const (
 | 
			
		||||
	TagResourceTypeDb      = 2
 | 
			
		||||
	TagResourceTypeRedis   = 3
 | 
			
		||||
	TagResourceTypeMongo   = 4
 | 
			
		||||
 | 
			
		||||
	// 删除机器的事件主题名
 | 
			
		||||
	DeleteMachineEventTopic = "machine:delete"
 | 
			
		||||
)
 | 
			
		||||
 
 | 
			
		||||
@@ -124,9 +124,18 @@ func (d *dbAppImpl) Delete(ctx context.Context, id uint64) error {
 | 
			
		||||
		// 关闭连接
 | 
			
		||||
		dbm.CloseDb(id, v)
 | 
			
		||||
	}
 | 
			
		||||
	// 删除该库下用户保存的所有sql信息
 | 
			
		||||
	d.dbSqlRepo.DeleteByCond(ctx, &entity.DbSql{DbId: id})
 | 
			
		||||
	return d.DeleteById(ctx, id)
 | 
			
		||||
 | 
			
		||||
	return d.Tx(ctx,
 | 
			
		||||
		func(ctx context.Context) error {
 | 
			
		||||
			return d.DeleteById(ctx, id)
 | 
			
		||||
		},
 | 
			
		||||
		func(ctx context.Context) error {
 | 
			
		||||
			// 删除该库下用户保存的所有sql信息
 | 
			
		||||
			return d.dbSqlRepo.DeleteByCond(ctx, &entity.DbSql{DbId: id})
 | 
			
		||||
		}, func(ctx context.Context) error {
 | 
			
		||||
			var tagIds []uint64
 | 
			
		||||
			return d.tagApp.RelateResource(ctx, db.Code, consts.TagResourceTypeDb, tagIds)
 | 
			
		||||
		})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (d *dbAppImpl) GetDbConn(dbId uint64, dbName string) (*dbm.DbConn, error) {
 | 
			
		||||
 
 | 
			
		||||
@@ -59,7 +59,7 @@ func (m *MachineFile) SaveMachineFiles(rc *req.Ctx) {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m *MachineFile) DeleteFile(rc *req.Ctx) {
 | 
			
		||||
	biz.ErrIsNil(m.MachineFileApp.Delete(rc.MetaCtx, GetMachineFileId(rc.GinCtx)))
 | 
			
		||||
	biz.ErrIsNil(m.MachineFileApp.DeleteById(rc.MetaCtx, GetMachineFileId(rc.GinCtx)))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/***      sftp相关操作      */
 | 
			
		||||
 
 | 
			
		||||
@@ -12,14 +12,12 @@ import (
 | 
			
		||||
	tagapp "mayfly-go/internal/tag/application"
 | 
			
		||||
	"mayfly-go/pkg/base"
 | 
			
		||||
	"mayfly-go/pkg/errorx"
 | 
			
		||||
	"mayfly-go/pkg/gormx"
 | 
			
		||||
	"mayfly-go/pkg/global"
 | 
			
		||||
	"mayfly-go/pkg/logx"
 | 
			
		||||
	"mayfly-go/pkg/model"
 | 
			
		||||
	"mayfly-go/pkg/scheduler"
 | 
			
		||||
	"mayfly-go/pkg/utils/stringx"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"gorm.io/gorm"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type Machine interface {
 | 
			
		||||
@@ -51,11 +49,14 @@ type Machine interface {
 | 
			
		||||
	GetMachineStats(machineId uint64) (*mcm.Stats, error)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func newMachineApp(machineRepo repository.Machine, authCertApp AuthCert, tagApp tagapp.TagTree) Machine {
 | 
			
		||||
func newMachineApp(machineRepo repository.Machine,
 | 
			
		||||
	authCertApp AuthCert,
 | 
			
		||||
	tagApp tagapp.TagTree) Machine {
 | 
			
		||||
	app := &machineAppImpl{
 | 
			
		||||
		authCertApp: authCertApp,
 | 
			
		||||
		tagApp:      tagApp,
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	app.Repo = machineRepo
 | 
			
		||||
	return app
 | 
			
		||||
}
 | 
			
		||||
@@ -141,22 +142,22 @@ func (m *machineAppImpl) ChangeStatus(ctx context.Context, id uint64, status int
 | 
			
		||||
 | 
			
		||||
// 根据条件获取机器信息
 | 
			
		||||
func (m *machineAppImpl) Delete(ctx context.Context, id uint64) error {
 | 
			
		||||
	machine, err := m.GetById(new(entity.Machine), id)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return errorx.NewBiz("机器信息不存在")
 | 
			
		||||
	}
 | 
			
		||||
	// 关闭连接
 | 
			
		||||
	mcm.DeleteCli(id)
 | 
			
		||||
	return gormx.Tx(
 | 
			
		||||
		func(db *gorm.DB) error {
 | 
			
		||||
			// 删除machine表信息
 | 
			
		||||
			return gormx.DeleteByIdWithDb(db, new(entity.Machine), id)
 | 
			
		||||
		},
 | 
			
		||||
		func(db *gorm.DB) error {
 | 
			
		||||
			// 删除machine_file
 | 
			
		||||
			return gormx.DeleteByWithDb(db, &entity.MachineFile{MachineId: id})
 | 
			
		||||
		},
 | 
			
		||||
		func(db *gorm.DB) error {
 | 
			
		||||
			// 删除machine_script
 | 
			
		||||
			return gormx.DeleteByWithDb(db, &entity.MachineScript{MachineId: id})
 | 
			
		||||
		},
 | 
			
		||||
	)
 | 
			
		||||
 | 
			
		||||
	// 发布机器删除事件
 | 
			
		||||
	global.EventBus.Publish(ctx, consts.DeleteMachineEventTopic, machine)
 | 
			
		||||
	return m.Tx(ctx,
 | 
			
		||||
		func(ctx context.Context) error {
 | 
			
		||||
			return m.DeleteById(ctx, id)
 | 
			
		||||
		}, func(ctx context.Context) error {
 | 
			
		||||
			var tagIds []uint64
 | 
			
		||||
			return m.tagApp.RelateResource(ctx, machine.Code, consts.TagResourceTypeMachine, tagIds)
 | 
			
		||||
		})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m *machineAppImpl) GetCli(machineId uint64) (*mcm.Cli, error) {
 | 
			
		||||
 
 | 
			
		||||
@@ -137,6 +137,11 @@ func (m *machineCropJobAppImpl) CronJobRelateMachines(ctx context.Context, cronJ
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m *machineCropJobAppImpl) MachineRelateCronJobs(ctx context.Context, machineId uint64, cronJobs []uint64) {
 | 
			
		||||
	if len(cronJobs) == 0 {
 | 
			
		||||
		m.machineCropJobRelateRepo.DeleteByCond(ctx, &entity.MachineCronJobRelate{MachineId: machineId})
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	oldCronIds := m.machineCropJobRelateRepo.GetCronJobIds(machineId)
 | 
			
		||||
	addIds, delIds, _ := collx.ArrayCompare[uint64](cronJobs, oldCronIds, func(u1, u2 uint64) bool { return u1 == u2 })
 | 
			
		||||
	addVals := make([]*entity.MachineCronJobRelate, 0)
 | 
			
		||||
 
 | 
			
		||||
@@ -9,6 +9,7 @@ import (
 | 
			
		||||
	"mayfly-go/internal/machine/domain/entity"
 | 
			
		||||
	"mayfly-go/internal/machine/domain/repository"
 | 
			
		||||
	"mayfly-go/internal/machine/mcm"
 | 
			
		||||
	"mayfly-go/pkg/base"
 | 
			
		||||
	"mayfly-go/pkg/errorx"
 | 
			
		||||
	"mayfly-go/pkg/logx"
 | 
			
		||||
	"mayfly-go/pkg/model"
 | 
			
		||||
@@ -19,19 +20,16 @@ import (
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type MachineFile interface {
 | 
			
		||||
	base.App[*entity.MachineFile]
 | 
			
		||||
 | 
			
		||||
	// 分页获取机器文件信息列表
 | 
			
		||||
	GetPageList(condition *entity.MachineFile, pageParam *model.PageParam, toEntity any, orderBy ...string) (*model.PageResult[any], error)
 | 
			
		||||
 | 
			
		||||
	// 根据条件获取
 | 
			
		||||
	GetMachineFile(condition *entity.MachineFile, cols ...string) error
 | 
			
		||||
 | 
			
		||||
	// 根据id获取
 | 
			
		||||
	GetById(id uint64, cols ...string) *entity.MachineFile
 | 
			
		||||
 | 
			
		||||
	Save(ctx context.Context, entity *entity.MachineFile) error
 | 
			
		||||
 | 
			
		||||
	Delete(ctx context.Context, id uint64) error
 | 
			
		||||
 | 
			
		||||
	// 获取文件关联的机器信息,主要用于记录日志使用
 | 
			
		||||
	// GetMachine(fileId uint64) *mcm.Info
 | 
			
		||||
 | 
			
		||||
@@ -75,11 +73,14 @@ type MachineFile interface {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func newMachineFileApp(machineFileRepo repository.MachineFile, machineApp Machine) MachineFile {
 | 
			
		||||
	return &machineFileAppImpl{machineApp: machineApp, machineFileRepo: machineFileRepo}
 | 
			
		||||
 | 
			
		||||
	app := &machineFileAppImpl{machineApp: machineApp, machineFileRepo: machineFileRepo}
 | 
			
		||||
	app.Repo = machineFileRepo
 | 
			
		||||
	return app
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type machineFileAppImpl struct {
 | 
			
		||||
	base.AppImpl[*entity.MachineFile, repository.MachineFile]
 | 
			
		||||
 | 
			
		||||
	machineFileRepo repository.MachineFile
 | 
			
		||||
 | 
			
		||||
	machineApp Machine
 | 
			
		||||
@@ -95,15 +96,6 @@ func (m *machineFileAppImpl) GetMachineFile(condition *entity.MachineFile, cols
 | 
			
		||||
	return m.machineFileRepo.GetBy(condition, cols...)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// 根据id获取
 | 
			
		||||
func (m *machineFileAppImpl) GetById(id uint64, cols ...string) *entity.MachineFile {
 | 
			
		||||
	mf := new(entity.MachineFile)
 | 
			
		||||
	if err := m.machineFileRepo.GetById(mf, id, cols...); err == nil {
 | 
			
		||||
		return mf
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// 保存机器文件配置
 | 
			
		||||
func (m *machineFileAppImpl) Save(ctx context.Context, mf *entity.MachineFile) error {
 | 
			
		||||
	_, err := m.machineApp.GetById(new(entity.Machine), mf.MachineId, "Name")
 | 
			
		||||
@@ -118,10 +110,6 @@ func (m *machineFileAppImpl) Save(ctx context.Context, mf *entity.MachineFile) e
 | 
			
		||||
	return m.machineFileRepo.Insert(ctx, mf)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m *machineFileAppImpl) Delete(ctx context.Context, id uint64) error {
 | 
			
		||||
	return m.machineFileRepo.DeleteById(ctx, id)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m *machineFileAppImpl) ReadDir(fid uint64, path string) ([]fs.FileInfo, error) {
 | 
			
		||||
	if !strings.HasSuffix(path, "/") {
 | 
			
		||||
		path = path + "/"
 | 
			
		||||
@@ -309,8 +297,8 @@ func (m *machineFileAppImpl) Rename(fileId uint64, oldname string, newname strin
 | 
			
		||||
 | 
			
		||||
// 获取文件机器cli
 | 
			
		||||
func (m *machineFileAppImpl) GetMachineCli(fid uint64, inputPath ...string) (*mcm.Cli, error) {
 | 
			
		||||
	mf := m.GetById(fid)
 | 
			
		||||
	if mf == nil {
 | 
			
		||||
	mf, err := m.GetById(new(entity.MachineFile), fid)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, errorx.NewBiz("文件不存在")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -1,8 +1,32 @@
 | 
			
		||||
package init
 | 
			
		||||
 | 
			
		||||
import "mayfly-go/internal/machine/application"
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"mayfly-go/internal/common/consts"
 | 
			
		||||
	"mayfly-go/internal/machine/application"
 | 
			
		||||
	"mayfly-go/internal/machine/domain/entity"
 | 
			
		||||
	"mayfly-go/pkg/eventbus"
 | 
			
		||||
	"mayfly-go/pkg/global"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func Init() {
 | 
			
		||||
	application.GetMachineCronJobApp().InitCronJob()
 | 
			
		||||
	application.GetMachineApp().TimerUpdateStats()
 | 
			
		||||
 | 
			
		||||
	global.EventBus.Subscribe(consts.DeleteMachineEventTopic, "machineFile", func(ctx context.Context, event *eventbus.Event) error {
 | 
			
		||||
		me := event.Val.(*entity.Machine)
 | 
			
		||||
		return application.GetMachineFileApp().DeleteByCond(ctx, &entity.MachineFile{MachineId: me.Id})
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	global.EventBus.Subscribe(consts.DeleteMachineEventTopic, "machineScript", func(ctx context.Context, event *eventbus.Event) error {
 | 
			
		||||
		me := event.Val.(*entity.Machine)
 | 
			
		||||
		return application.GetMachineScriptApp().DeleteByCond(ctx, &entity.MachineScript{MachineId: me.Id})
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	global.EventBus.Subscribe(consts.DeleteMachineEventTopic, "machineCronJob", func(ctx context.Context, event *eventbus.Event) error {
 | 
			
		||||
		me := event.Val.(*entity.Machine)
 | 
			
		||||
		var jobIds []uint64
 | 
			
		||||
		application.GetMachineCronJobApp().MachineRelateCronJobs(ctx, me.Id, jobIds)
 | 
			
		||||
		return nil
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -51,8 +51,20 @@ func (d *mongoAppImpl) GetPageList(condition *entity.MongoQuery, pageParam *mode
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (d *mongoAppImpl) Delete(ctx context.Context, id uint64) error {
 | 
			
		||||
	mongoEntity, err := d.GetById(new(entity.Mongo), id)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return errorx.NewBiz("mongo信息不存在")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	mgm.CloseConn(id)
 | 
			
		||||
	return d.GetRepo().DeleteById(ctx, id)
 | 
			
		||||
	return d.Tx(ctx,
 | 
			
		||||
		func(ctx context.Context) error {
 | 
			
		||||
			return d.DeleteById(ctx, id)
 | 
			
		||||
		},
 | 
			
		||||
		func(ctx context.Context) error {
 | 
			
		||||
			var tagIds []uint64
 | 
			
		||||
			return d.tagApp.RelateResource(ctx, mongoEntity.Code, consts.TagResourceTypeMongo, tagIds)
 | 
			
		||||
		})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (d *mongoAppImpl) TestConn(me *entity.Mongo) error {
 | 
			
		||||
 
 | 
			
		||||
@@ -123,7 +123,13 @@ func (r *redisAppImpl) Delete(ctx context.Context, id uint64) error {
 | 
			
		||||
		db, _ := strconv.Atoi(dbStr)
 | 
			
		||||
		rdm.CloseConn(re.Id, db)
 | 
			
		||||
	}
 | 
			
		||||
	return r.DeleteById(ctx, id)
 | 
			
		||||
 | 
			
		||||
	return r.Tx(ctx, func(ctx context.Context) error {
 | 
			
		||||
		return r.DeleteById(ctx, id)
 | 
			
		||||
	}, func(ctx context.Context) error {
 | 
			
		||||
		var tagIds []uint64
 | 
			
		||||
		return r.tagApp.RelateResource(ctx, re.Code, consts.TagResourceTypeRedis, tagIds)
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// 获取数据库连接实例
 | 
			
		||||
 
 | 
			
		||||
@@ -147,6 +147,14 @@ func (p *tagTreeAppImpl) GetAccountResourceCodes(accountId uint64, resourceType
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (p *tagTreeAppImpl) RelateResource(ctx context.Context, resourceCode string, resourceType int8, tagIds []uint64) error {
 | 
			
		||||
	// 如果tagIds为空数组,则为解绑该标签资源关联关系
 | 
			
		||||
	if len(tagIds) == 0 {
 | 
			
		||||
		return p.tagResourceApp.DeleteByCond(ctx, &entity.TagResource{
 | 
			
		||||
			ResourceCode: resourceCode,
 | 
			
		||||
			ResourceType: resourceType,
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	var oldTagResources []*entity.TagResource
 | 
			
		||||
	p.tagResourceApp.ListByQuery(&entity.TagResourceQuery{ResourceType: resourceType, ResourceCode: resourceCode}, &oldTagResources)
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										191
									
								
								server/pkg/eventbus/eventbus.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										191
									
								
								server/pkg/eventbus/eventbus.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,191 @@
 | 
			
		||||
package eventbus
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"errors"
 | 
			
		||||
	"mayfly-go/pkg/logx"
 | 
			
		||||
	"sync"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// BusSubscriber defines subscription-related bus behavior
 | 
			
		||||
type BusSubscriber interface {
 | 
			
		||||
	Subscribe(topic string, subId string, fn EventHandleFunc) error
 | 
			
		||||
	SubscribeAsync(topic string, subId string, fn EventHandleFunc, transactional bool) error
 | 
			
		||||
	SubscribeOnce(topic string, subId string, fn EventHandleFunc) error
 | 
			
		||||
	Unsubscribe(topic string, subId string) error
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// BusPublisher defines publishing-related bus behavior
 | 
			
		||||
type BusPublisher interface {
 | 
			
		||||
	Publish(ctx context.Context, topic string, val any)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// BusController defines bus control behavior (checking handler's presence, synchronization)
 | 
			
		||||
type BusController interface {
 | 
			
		||||
	WaitAsync()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Bus englobes global (subscribe, publish, control) bus behavior
 | 
			
		||||
type Bus interface {
 | 
			
		||||
	BusController
 | 
			
		||||
	BusSubscriber
 | 
			
		||||
	BusPublisher
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// EventBus - box for handlers and callbacks.
 | 
			
		||||
type EventBus struct {
 | 
			
		||||
	subscriberManager map[string]*SubscriberManager // topic -> SubscriberManager
 | 
			
		||||
	lock              sync.Mutex                    // a lock for the map
 | 
			
		||||
	wg                sync.WaitGroup
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func New() Bus {
 | 
			
		||||
	b := &EventBus{
 | 
			
		||||
		make(map[string]*SubscriberManager),
 | 
			
		||||
		sync.Mutex{},
 | 
			
		||||
		sync.WaitGroup{},
 | 
			
		||||
	}
 | 
			
		||||
	return Bus(b)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type Event struct {
 | 
			
		||||
	Topic string
 | 
			
		||||
	Val   any
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// 订阅者们的事件处理器
 | 
			
		||||
type SubscriberManager struct {
 | 
			
		||||
	// 事件处理器 subId -> handler
 | 
			
		||||
	handlers map[string]*eventHandler
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewSubscriberManager() *SubscriberManager {
 | 
			
		||||
	return &SubscriberManager{
 | 
			
		||||
		handlers: make(map[string]*eventHandler),
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (sm *SubscriberManager) addSubscriber(subId string, eventHandler *eventHandler) {
 | 
			
		||||
	sm.handlers[subId] = eventHandler
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (sm *SubscriberManager) delSubscriber(subId string) {
 | 
			
		||||
	delete(sm.handlers, subId)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// 事件处理函数
 | 
			
		||||
type EventHandleFunc func(ctx context.Context, event *Event) error
 | 
			
		||||
 | 
			
		||||
type eventHandler struct {
 | 
			
		||||
	handlerFunc   EventHandleFunc // 事件处理函数
 | 
			
		||||
	once          bool            // 是否只执行一次
 | 
			
		||||
	async         bool
 | 
			
		||||
	transactional bool
 | 
			
		||||
 | 
			
		||||
	sync.Mutex // lock for an event handler - useful for running async callbacks serially
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (bus *EventBus) Subscribe(topic string, subId string, fn EventHandleFunc) error {
 | 
			
		||||
	eh := &eventHandler{}
 | 
			
		||||
	eh.handlerFunc = fn
 | 
			
		||||
	return bus.doSubscribe(topic, subId, eh)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SubscribeAsync subscribes to a topic with an asynchronous callback
 | 
			
		||||
// Transactional determines whether subsequent callbacks for a topic are
 | 
			
		||||
func (bus *EventBus) SubscribeAsync(topic string, subId string, fn EventHandleFunc, transactional bool) error {
 | 
			
		||||
	eh := &eventHandler{}
 | 
			
		||||
	eh.handlerFunc = fn
 | 
			
		||||
	eh.async = true
 | 
			
		||||
	return bus.doSubscribe(topic, subId, eh)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SubscribeOnce subscribes to a topic once. Handler will be removed after executing.
 | 
			
		||||
func (bus *EventBus) SubscribeOnce(topic string, subId string, fn EventHandleFunc) error {
 | 
			
		||||
	eh := &eventHandler{}
 | 
			
		||||
	eh.handlerFunc = fn
 | 
			
		||||
	eh.once = true
 | 
			
		||||
	return bus.doSubscribe(topic, subId, eh)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (bus *EventBus) Unsubscribe(topic string, subId string) error {
 | 
			
		||||
	bus.lock.Lock()
 | 
			
		||||
	defer bus.lock.Unlock()
 | 
			
		||||
	subManager := bus.subscriberManager[topic]
 | 
			
		||||
	if subManager == nil {
 | 
			
		||||
		return errors.New("该主题不存在订阅者")
 | 
			
		||||
	}
 | 
			
		||||
	subManager.delSubscriber(subId)
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (bus *EventBus) Publish(ctx context.Context, topic string, val any) {
 | 
			
		||||
	bus.lock.Lock() // will unlock if handler is not found or always after setUpPublish
 | 
			
		||||
	defer bus.lock.Unlock()
 | 
			
		||||
	logx.Debugf("主题-[%s]-发布了事件", topic)
 | 
			
		||||
	event := &Event{
 | 
			
		||||
		Topic: topic,
 | 
			
		||||
		Val:   val,
 | 
			
		||||
	}
 | 
			
		||||
	subscriberManager := bus.subscriberManager[topic]
 | 
			
		||||
	if subscriberManager == nil {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	handlers := subscriberManager.handlers
 | 
			
		||||
	if len(handlers) == 0 {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for subId, handler := range handlers {
 | 
			
		||||
		logx.Debugf("订阅者-[%s]-开始执行主题-[%s]-发布的事件", subId, topic)
 | 
			
		||||
		if handler.once {
 | 
			
		||||
			subscriberManager.delSubscriber(subId)
 | 
			
		||||
		}
 | 
			
		||||
		if !handler.async {
 | 
			
		||||
			bus.doPublish(ctx, handler, event)
 | 
			
		||||
		} else {
 | 
			
		||||
			bus.wg.Add(1)
 | 
			
		||||
			if handler.transactional {
 | 
			
		||||
				bus.lock.Unlock()
 | 
			
		||||
				handler.Lock()
 | 
			
		||||
				bus.lock.Lock()
 | 
			
		||||
			}
 | 
			
		||||
			go bus.doPublishAsync(ctx, handler, event)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// WaitAsync waits for all async callbacks to complete
 | 
			
		||||
func (bus *EventBus) WaitAsync() {
 | 
			
		||||
	bus.wg.Wait()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (bus *EventBus) doSubscribe(topic string, subId string, handler *eventHandler) error {
 | 
			
		||||
	bus.lock.Lock()
 | 
			
		||||
	defer bus.lock.Unlock()
 | 
			
		||||
	logx.Debugf("订阅者-[%s]-订阅了主题-[%s]", subId, topic)
 | 
			
		||||
	subManager := bus.subscriberManager[topic]
 | 
			
		||||
	if subManager == nil {
 | 
			
		||||
		subManager = NewSubscriberManager()
 | 
			
		||||
		bus.subscriberManager[topic] = subManager
 | 
			
		||||
	}
 | 
			
		||||
	subManager.addSubscriber(subId, handler)
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (bus *EventBus) doPublish(ctx context.Context, handler *eventHandler, event *Event) error {
 | 
			
		||||
	err := handler.handlerFunc(ctx, event)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		logx.Errorf("订阅者执行主题[%s]失败: %s", event.Topic, err.Error())
 | 
			
		||||
	}
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (bus *EventBus) doPublishAsync(ctx context.Context, handler *eventHandler, event *Event) {
 | 
			
		||||
	defer bus.wg.Done()
 | 
			
		||||
	if handler.transactional {
 | 
			
		||||
		defer handler.Unlock()
 | 
			
		||||
	}
 | 
			
		||||
	bus.doPublish(ctx, handler, event)
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										44
									
								
								server/pkg/eventbus/eventbus_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										44
									
								
								server/pkg/eventbus/eventbus_test.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,44 @@
 | 
			
		||||
package eventbus
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"errors"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"testing"
 | 
			
		||||
	"time"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func TestSubscribe(t *testing.T) {
 | 
			
		||||
	bus := New()
 | 
			
		||||
 | 
			
		||||
	bus.SubscribeAsync("topic", "sub5", func(ctx context.Context, event *Event) error {
 | 
			
		||||
		time.Sleep(5 * time.Second)
 | 
			
		||||
		fmt.Printf("%s -> %s -> %d\n", "sub5", event.Topic, event.Val)
 | 
			
		||||
		return nil
 | 
			
		||||
	}, true)
 | 
			
		||||
 | 
			
		||||
	bus.SubscribeOnce("topic", "sub1", func(ctx context.Context, event *Event) error {
 | 
			
		||||
		fmt.Printf("%s -> %s -> %d\n", "sub1", event.Topic, event.Val)
 | 
			
		||||
		return nil
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	bus.Subscribe("topic", "sub2", func(ctx context.Context, event *Event) error {
 | 
			
		||||
		time.Sleep(5 * time.Second)
 | 
			
		||||
		return errors.New("失败。。。。")
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	bus.SubscribeAsync("topic", "sub3", func(ctx context.Context, event *Event) error {
 | 
			
		||||
		fmt.Printf("%s -> %s -> %d\n", "sub3", event.Topic, event.Val)
 | 
			
		||||
		return nil
 | 
			
		||||
	}, false)
 | 
			
		||||
 | 
			
		||||
	bus.SubscribeAsync("topic", "sub4", func(ctx context.Context, event *Event) error {
 | 
			
		||||
		time.Sleep(5 * time.Second)
 | 
			
		||||
		fmt.Printf("%s -> %s -> %d\n", "sub4", event.Topic, event.Val)
 | 
			
		||||
		return nil
 | 
			
		||||
	}, false)
 | 
			
		||||
 | 
			
		||||
	bus.Publish(context.Background(), "topic", 10)
 | 
			
		||||
	bus.Publish(context.Background(), "topic", 20)
 | 
			
		||||
	bus.WaitAsync()
 | 
			
		||||
}
 | 
			
		||||
@@ -1,9 +1,13 @@
 | 
			
		||||
package global
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"mayfly-go/pkg/eventbus"
 | 
			
		||||
 | 
			
		||||
	"gorm.io/gorm"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var (
 | 
			
		||||
	Db *gorm.DB // gorm
 | 
			
		||||
 | 
			
		||||
	EventBus eventbus.Bus = eventbus.New()
 | 
			
		||||
)
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user