diff --git a/internal/db/models/message_media_dao.go b/internal/db/models/message_media_dao.go new file mode 100644 index 00000000..cadccdcb --- /dev/null +++ b/internal/db/models/message_media_dao.go @@ -0,0 +1,155 @@ +package models + +import ( + _ "github.com/go-sql-driver/mysql" + "github.com/iwind/TeaGo/Tea" + "github.com/iwind/TeaGo/dbs" + "github.com/iwind/TeaGo/lists" + "github.com/iwind/TeaGo/maps" +) + +const ( + MessageMediaStateEnabled = 1 // 已启用 + MessageMediaStateDisabled = 0 // 已禁用 +) + +type MessageMediaDAO dbs.DAO + +func NewMessageMediaDAO() *MessageMediaDAO { + return dbs.NewDAO(&MessageMediaDAO{ + DAOObject: dbs.DAOObject{ + DB: Tea.Env, + Table: "edgeMessageMedias", + Model: new(MessageMedia), + PkName: "id", + }, + }).(*MessageMediaDAO) +} + +var SharedMessageMediaDAO *MessageMediaDAO + +func init() { + dbs.OnReady(func() { + SharedMessageMediaDAO = NewMessageMediaDAO() + }) +} + +// 启用条目 +func (this *MessageMediaDAO) EnableMessageMedia(tx *dbs.Tx, id int64) error { + _, err := this.Query(tx). + Pk(id). + Set("state", MessageMediaStateEnabled). + Update() + return err +} + +// 禁用条目 +func (this *MessageMediaDAO) DisableMessageMedia(tx *dbs.Tx, id int64) error { + _, err := this.Query(tx). + Pk(id). + Set("state", MessageMediaStateDisabled). + Update() + return err +} + +// 查找启用中的条目 +func (this *MessageMediaDAO) FindEnabledMessageMedia(tx *dbs.Tx, id int64) (*MessageMedia, error) { + result, err := this.Query(tx). + Pk(id). + Attr("state", MessageMediaStateEnabled). + Find() + if result == nil { + return nil, err + } + return result.(*MessageMedia), err +} + +// 根据主键查找名称 +func (this *MessageMediaDAO) FindMessageMediaName(tx *dbs.Tx, id int64) (string, error) { + return this.Query(tx). + Pk(id). + Result("name"). + FindStringCol("") +} + +// 查询所有可用媒介 +func (this *MessageMediaDAO) FindAllEnabledMessageMedias(tx *dbs.Tx) (result []*MessageMedia, err error) { + _, err = this.Query(tx). + State(MessageMediaStateEnabled). + Desc("order"). + AscPk(). + Slice(&result). + FindAll() + return +} + +// 设置当前所有可用的媒介 +func (this *MessageMediaDAO) UpdateMessageMedias(tx *dbs.Tx, mediaMaps []maps.Map) error { + // 新的媒介信息 + mediaTypes := []string{} + for index, m := range mediaMaps { + order := len(mediaMaps) - index + mediaType := m.GetString("type") + mediaTypes = append(mediaTypes, mediaType) + + name := m.GetString("name") + description := m.GetString("description") + userDescription := m.GetString("userDescription") + isOn := m.GetBool("isOn") + + mediaId, err := this.Query(tx). + ResultPk(). + Attr("type", mediaType). + FindInt64Col(0) + if err != nil { + return err + } + op := NewMessageMediaOperator() + if mediaId > 0 { + op.Id = mediaId + } + op.Name = name + op.Type = mediaType + op.Description = description + op.UserDescription = userDescription + op.IsOn = isOn + op.Order = order + op.State = MessageMediaStateEnabled + err = this.Save(tx, op) + if err != nil { + return err + } + } + + // 老的媒介信息 + ones, err := this.Query(tx). + FindAll() + if err != nil { + return err + } + for _, one := range ones { + mediaType := one.(*MessageMedia).Type + if !lists.ContainsString(mediaTypes, mediaType) { + err := this.Query(tx). + Pk(one.(*MessageMedia).Id). + Set("state", MessageMediaStateDisabled). + UpdateQuickly() + if err != nil { + return err + } + } + } + return nil +} + +// 根据类型查找媒介 +func (this *MessageMediaDAO) FindEnabledMediaWithType(tx *dbs.Tx, mediaType string) (*MessageMedia, error) { + one, err := this.Query(tx). + Attr("type", mediaType). + State(MessageMediaStateEnabled). + Find() + if one == nil || err != nil { + return nil, err + } + return one.(*MessageMedia), nil +} diff --git a/internal/db/models/message_media_dao_test.go b/internal/db/models/message_media_dao_test.go new file mode 100644 index 00000000..224e9db7 --- /dev/null +++ b/internal/db/models/message_media_dao_test.go @@ -0,0 +1,6 @@ +package models + +import ( + _ "github.com/go-sql-driver/mysql" + _ "github.com/iwind/TeaGo/bootstrap" +) diff --git a/internal/db/models/message_media_instance_dao.go b/internal/db/models/message_media_instance_dao.go new file mode 100644 index 00000000..7c8b1565 --- /dev/null +++ b/internal/db/models/message_media_instance_dao.go @@ -0,0 +1,152 @@ +package models + +import ( + "encoding/json" + "github.com/TeaOSLab/EdgeAPI/internal/errors" + _ "github.com/go-sql-driver/mysql" + "github.com/iwind/TeaGo/Tea" + "github.com/iwind/TeaGo/dbs" + "github.com/iwind/TeaGo/maps" +) + +const ( + MessageMediaInstanceStateEnabled = 1 // 已启用 + MessageMediaInstanceStateDisabled = 0 // 已禁用 +) + +type MessageMediaInstanceDAO dbs.DAO + +func NewMessageMediaInstanceDAO() *MessageMediaInstanceDAO { + return dbs.NewDAO(&MessageMediaInstanceDAO{ + DAOObject: dbs.DAOObject{ + DB: Tea.Env, + Table: "edgeMessageMediaInstances", + Model: new(MessageMediaInstance), + PkName: "id", + }, + }).(*MessageMediaInstanceDAO) +} + +var SharedMessageMediaInstanceDAO *MessageMediaInstanceDAO + +func init() { + dbs.OnReady(func() { + SharedMessageMediaInstanceDAO = NewMessageMediaInstanceDAO() + }) +} + +// 启用条目 +func (this *MessageMediaInstanceDAO) EnableMessageMediaInstance(tx *dbs.Tx, id int64) error { + _, err := this.Query(tx). + Pk(id). + Set("state", MessageMediaInstanceStateEnabled). + Update() + return err +} + +// 禁用条目 +func (this *MessageMediaInstanceDAO) DisableMessageMediaInstance(tx *dbs.Tx, id int64) error { + _, err := this.Query(tx). + Pk(id). + Set("state", MessageMediaInstanceStateDisabled). + Update() + return err +} + +// 查找启用中的条目 +func (this *MessageMediaInstanceDAO) FindEnabledMessageMediaInstance(tx *dbs.Tx, id int64) (*MessageMediaInstance, error) { + result, err := this.Query(tx). + Pk(id). + Attr("state", MessageMediaInstanceStateEnabled). + Find() + if result == nil { + return nil, err + } + return result.(*MessageMediaInstance), err +} + +// 创建媒介实例 +func (this *MessageMediaInstanceDAO) CreateMediaInstance(tx *dbs.Tx, name string, mediaType string, params maps.Map, description string) (int64, error) { + op := NewMessageMediaInstanceOperator() + op.Name = name + op.MediaType = mediaType + + // 参数 + if params == nil { + params = maps.Map{} + } + paramsJSON, err := json.Marshal(params) + if err != nil { + return 0, err + } + op.Params = paramsJSON + + op.Description = description + + op.IsOn = true + op.State = MessageMediaInstanceStateEnabled + return this.SaveInt64(tx, op) +} + +// 修改媒介实例 +func (this *MessageMediaInstanceDAO) UpdateMediaInstance(tx *dbs.Tx, instanceId int64, name string, mediaType string, params maps.Map, description string, isOn bool) error { + if instanceId <= 0 { + return errors.New("invalid instanceId") + } + + op := NewMessageMediaInstanceOperator() + op.Id = instanceId + op.Name = name + op.MediaType = mediaType + + // 参数 + if params == nil { + params = maps.Map{} + } + paramsJSON, err := json.Marshal(params) + if err != nil { + return err + } + op.Params = paramsJSON + + op.Description = description + op.IsOn = isOn + return this.Save(tx, op) +} + +// 计算接收人数量 +func (this *MessageMediaInstanceDAO) CountAllEnabledMediaInstances(tx *dbs.Tx, mediaType string, keyword string) (int64, error) { + query := this.Query(tx) + if len(mediaType) > 0 { + query.Attr("mediaType", mediaType) + } + if len(keyword) > 0 { + query.Where("(name LIKE :keyword OR description LIKE :keyword)"). + Param("keyword", "%"+keyword+"%") + } + return query. + State(MessageMediaInstanceStateEnabled). + Where("mediaType IN (SELECT `type` FROM " + SharedMessageMediaDAO.Table + " WHERE state=1)"). + Count() +} + +// 列出单页接收人 +func (this *MessageMediaInstanceDAO) ListAllEnabledMediaInstances(tx *dbs.Tx, mediaType string, keyword string, offset int64, size int64) (result []*MessageMediaInstance, err error) { + query := this.Query(tx) + if len(mediaType) > 0 { + query.Attr("mediaType", mediaType) + } + if len(keyword) > 0 { + query.Where("(name LIKE :keyword OR description LIKE :keyword)"). + Param("keyword", "%"+keyword+"%") + } + _, err = query. + State(MessageMediaInstanceStateEnabled). + Where("mediaType IN (SELECT `type` FROM " + SharedMessageMediaDAO.Table + " WHERE state=1)"). + DescPk(). + Offset(offset). + Limit(size). + Slice(&result). + FindAll() + return +} diff --git a/internal/db/models/message_media_instance_dao_test.go b/internal/db/models/message_media_instance_dao_test.go new file mode 100644 index 00000000..224e9db7 --- /dev/null +++ b/internal/db/models/message_media_instance_dao_test.go @@ -0,0 +1,6 @@ +package models + +import ( + _ "github.com/go-sql-driver/mysql" + _ "github.com/iwind/TeaGo/bootstrap" +) diff --git a/internal/db/models/message_media_instance_model.go b/internal/db/models/message_media_instance_model.go new file mode 100644 index 00000000..2bd31687 --- /dev/null +++ b/internal/db/models/message_media_instance_model.go @@ -0,0 +1,26 @@ +package models + +// 消息媒介接收人 +type MessageMediaInstance struct { + Id uint32 `field:"id"` // ID + Name string `field:"name"` // 名称 + IsOn uint8 `field:"isOn"` // 是否启用 + MediaType string `field:"mediaType"` // 媒介类型 + Params string `field:"params"` // 媒介参数 + Description string `field:"description"` // 备注 + State uint8 `field:"state"` // 状态 +} + +type MessageMediaInstanceOperator struct { + Id interface{} // ID + Name interface{} // 名称 + IsOn interface{} // 是否启用 + MediaType interface{} // 媒介类型 + Params interface{} // 媒介参数 + Description interface{} // 备注 + State interface{} // 状态 +} + +func NewMessageMediaInstanceOperator() *MessageMediaInstanceOperator { + return &MessageMediaInstanceOperator{} +} diff --git a/internal/db/models/message_media_instance_model_ext.go b/internal/db/models/message_media_instance_model_ext.go new file mode 100644 index 00000000..2640e7f9 --- /dev/null +++ b/internal/db/models/message_media_instance_model_ext.go @@ -0,0 +1 @@ +package models diff --git a/internal/db/models/message_media_model.go b/internal/db/models/message_media_model.go new file mode 100644 index 00000000..f4d25b12 --- /dev/null +++ b/internal/db/models/message_media_model.go @@ -0,0 +1,28 @@ +package models + +// 消息媒介 +type MessageMedia struct { + Id uint32 `field:"id"` // ID + Name string `field:"name"` // 名称 + Type string `field:"type"` // 类型 + Description string `field:"description"` // 描述 + UserDescription string `field:"userDescription"` // 用户描述 + IsOn uint8 `field:"isOn"` // 是否启用 + Order uint32 `field:"order"` // 排序 + State uint8 `field:"state"` // 状态 +} + +type MessageMediaOperator struct { + Id interface{} // ID + Name interface{} // 名称 + Type interface{} // 类型 + Description interface{} // 描述 + UserDescription interface{} // 用户描述 + IsOn interface{} // 是否启用 + Order interface{} // 排序 + State interface{} // 状态 +} + +func NewMessageMediaOperator() *MessageMediaOperator { + return &MessageMediaOperator{} +} diff --git a/internal/db/models/message_media_model_ext.go b/internal/db/models/message_media_model_ext.go new file mode 100644 index 00000000..2640e7f9 --- /dev/null +++ b/internal/db/models/message_media_model_ext.go @@ -0,0 +1 @@ +package models diff --git a/internal/db/models/message_recipient_dao.go b/internal/db/models/message_recipient_dao.go new file mode 100644 index 00000000..1430cea4 --- /dev/null +++ b/internal/db/models/message_recipient_dao.go @@ -0,0 +1,168 @@ +package models + +import ( + "encoding/json" + "github.com/TeaOSLab/EdgeAPI/internal/errors" + _ "github.com/go-sql-driver/mysql" + "github.com/iwind/TeaGo/Tea" + "github.com/iwind/TeaGo/dbs" +) + +const ( + MessageRecipientStateEnabled = 1 // 已启用 + MessageRecipientStateDisabled = 0 // 已禁用 +) + +type MessageRecipientDAO dbs.DAO + +func NewMessageRecipientDAO() *MessageRecipientDAO { + return dbs.NewDAO(&MessageRecipientDAO{ + DAOObject: dbs.DAOObject{ + DB: Tea.Env, + Table: "edgeMessageRecipients", + Model: new(MessageRecipient), + PkName: "id", + }, + }).(*MessageRecipientDAO) +} + +var SharedMessageRecipientDAO *MessageRecipientDAO + +func init() { + dbs.OnReady(func() { + SharedMessageRecipientDAO = NewMessageRecipientDAO() + }) +} + +// 启用条目 +func (this *MessageRecipientDAO) EnableMessageRecipient(tx *dbs.Tx, id int64) error { + _, err := this.Query(tx). + Pk(id). + Set("state", MessageRecipientStateEnabled). + Update() + return err +} + +// 禁用条目 +func (this *MessageRecipientDAO) DisableMessageRecipient(tx *dbs.Tx, id int64) error { + _, err := this.Query(tx). + Pk(id). + Set("state", MessageRecipientStateDisabled). + Update() + return err +} + +// 查找启用中的条目 +func (this *MessageRecipientDAO) FindEnabledMessageRecipient(tx *dbs.Tx, id int64) (*MessageRecipient, error) { + result, err := this.Query(tx). + Pk(id). + Attr("state", MessageRecipientStateEnabled). + Find() + if result == nil { + return nil, err + } + return result.(*MessageRecipient), err +} + +// 创建接收人 +func (this *MessageRecipientDAO) CreateRecipient(tx *dbs.Tx, adminId int64, instanceId int64, user string, groupIds []int64, description string) (int64, error) { + op := NewMessageRecipientOperator() + op.AdminId = adminId + op.InstanceId = instanceId + op.User = user + op.Description = description + + // 分组 + if len(groupIds) == 0 { + groupIds = []int64{} + } + groupIdsJSON, err := json.Marshal(groupIds) + if err != nil { + return 0, err + } + op.GroupIds = groupIdsJSON + + op.IsOn = true + op.State = MessageRecipientStateEnabled + return this.SaveInt64(tx, op) +} + +// 修改接收人 +func (this *MessageRecipientDAO) UpdateRecipient(tx *dbs.Tx, recipientId int64, adminId int64, instanceId int64, user string, groupIds []int64, description string, isOn bool) error { + if recipientId <= 0 { + return errors.New("invalid recipientId") + } + + op := NewMessageRecipientOperator() + op.Id = recipientId + op.AdminId = adminId + op.InstanceId = instanceId + op.User = user + + // 分组 + if len(groupIds) == 0 { + groupIds = []int64{} + } + groupIdsJSON, err := json.Marshal(groupIds) + if err != nil { + return err + } + op.GroupIds = groupIdsJSON + + op.Description = description + op.IsOn = isOn + return this.Save(tx, op) +} + +// 计算接收人数量 +func (this *MessageRecipientDAO) CountAllEnabledRecipients(tx *dbs.Tx, adminId int64, groupId int64, mediaType string, keyword string) (int64, error) { + query := this.Query(tx) + if adminId > 0 { + query.Attr("adminId", adminId) + } + if groupId > 0 { + query.Attr("groupId", groupId) + } + if len(mediaType) > 0 { + query.Where("instanceId IN (SELECT id FROM "+SharedMessageMediaInstanceDAO.Table+" WHERE state=1 AND mediaType=:mediaType)"). + Param("mediaType", mediaType) + } + if len(keyword) > 0 { + query.Where("(`user` LIKE :keyword OR description LIKE :keyword)"). + Param("keyword", "%"+keyword+"%") + } + return query. + State(MessageRecipientStateEnabled). + Where("adminId IN (SELECT id FROM " + SharedAdminDAO.Table + " WHERE state=1)"). + Where("instanceId IN (SELECT id FROM " + SharedMessageMediaInstanceDAO.Table + " WHERE state=1)"). + Count() +} + +// 列出单页接收人 +func (this *MessageRecipientDAO) ListAllEnabledRecipients(tx *dbs.Tx, adminId int64, groupId int64, mediaType string, keyword string, offset int64, size int64) (result []*MessageRecipient, err error) { + query := this.Query(tx) + if adminId > 0 { + query.Attr("adminId", adminId) + } + if groupId > 0 { + query.Attr("groupId", groupId) + } + if len(mediaType) > 0 { + query.Where("instanceId IN (SELECT id FROM "+SharedMessageMediaInstanceDAO.Table+" WHERE state=1 AND mediaType=:mediaType)"). + Param("mediaType", mediaType) + } + if len(keyword) > 0 { + query.Where("(`user` LIKE :keyword OR description LIKE :keyword)"). + Param("keyword", "%"+keyword+"%") + } + _, err = query. + State(MessageRecipientStateEnabled). + Where("adminId IN (SELECT id FROM " + SharedAdminDAO.Table + " WHERE state=1)"). + Where("instanceId IN (SELECT id FROM " + SharedMessageMediaInstanceDAO.Table + " WHERE state=1)"). + DescPk(). + Offset(offset). + Limit(size). + Slice(&result). + FindAll() + return +} diff --git a/internal/db/models/message_recipient_dao_test.go b/internal/db/models/message_recipient_dao_test.go new file mode 100644 index 00000000..224e9db7 --- /dev/null +++ b/internal/db/models/message_recipient_dao_test.go @@ -0,0 +1,6 @@ +package models + +import ( + _ "github.com/go-sql-driver/mysql" + _ "github.com/iwind/TeaGo/bootstrap" +) diff --git a/internal/db/models/message_recipient_group_dao.go b/internal/db/models/message_recipient_group_dao.go new file mode 100644 index 00000000..77729d90 --- /dev/null +++ b/internal/db/models/message_recipient_group_dao.go @@ -0,0 +1,104 @@ +package models + +import ( + "github.com/TeaOSLab/EdgeAPI/internal/errors" + _ "github.com/go-sql-driver/mysql" + "github.com/iwind/TeaGo/Tea" + "github.com/iwind/TeaGo/dbs" +) + +const ( + MessageRecipientGroupStateEnabled = 1 // 已启用 + MessageRecipientGroupStateDisabled = 0 // 已禁用 +) + +type MessageRecipientGroupDAO dbs.DAO + +func NewMessageRecipientGroupDAO() *MessageRecipientGroupDAO { + return dbs.NewDAO(&MessageRecipientGroupDAO{ + DAOObject: dbs.DAOObject{ + DB: Tea.Env, + Table: "edgeMessageRecipientGroups", + Model: new(MessageRecipientGroup), + PkName: "id", + }, + }).(*MessageRecipientGroupDAO) +} + +var SharedMessageRecipientGroupDAO *MessageRecipientGroupDAO + +func init() { + dbs.OnReady(func() { + SharedMessageRecipientGroupDAO = NewMessageRecipientGroupDAO() + }) +} + +// 启用条目 +func (this *MessageRecipientGroupDAO) EnableMessageRecipientGroup(tx *dbs.Tx, id int64) error { + _, err := this.Query(tx). + Pk(id). + Set("state", MessageRecipientGroupStateEnabled). + Update() + return err +} + +// 禁用条目 +func (this *MessageRecipientGroupDAO) DisableMessageRecipientGroup(tx *dbs.Tx, id int64) error { + _, err := this.Query(tx). + Pk(id). + Set("state", MessageRecipientGroupStateDisabled). + Update() + return err +} + +// 查找启用中的条目 +func (this *MessageRecipientGroupDAO) FindEnabledMessageRecipientGroup(tx *dbs.Tx, id int64) (*MessageRecipientGroup, error) { + result, err := this.Query(tx). + Pk(id). + Attr("state", MessageRecipientGroupStateEnabled). + Find() + if result == nil { + return nil, err + } + return result.(*MessageRecipientGroup), err +} + +// 根据主键查找名称 +func (this *MessageRecipientGroupDAO) FindMessageRecipientGroupName(tx *dbs.Tx, id int64) (string, error) { + return this.Query(tx). + Pk(id). + Result("name"). + FindStringCol("") +} + +// 创建分组 +func (this *MessageRecipientGroupDAO) CreateGroup(tx *dbs.Tx, name string) (int64, error) { + op := NewMessageRecipientGroupOperator() + op.Name = name + op.IsOn = true + op.State = MessageRecipientStateEnabled + return this.SaveInt64(tx, op) +} + +// 修改分组 +func (this *MessageRecipientGroupDAO) UpdateGroup(tx *dbs.Tx, groupId int64, name string, isOn bool) error { + if groupId <= 0 { + return errors.New("invalid groupId") + } + op := NewMessageRecipientGroupOperator() + op.Id = groupId + op.Name = name + op.IsOn = isOn + return this.Save(tx, op) +} + +// 查找所有分组 +func (this *MessageRecipientGroupDAO) FindAllEnabledGroups(tx *dbs.Tx) (result []*MessageRecipientGroup, err error) { + _, err = this.Query(tx). + State(MessageRecipientStateEnabled). + Slice(&result). + Desc("order"). + AscPk(). + FindAll() + return +} diff --git a/internal/db/models/message_recipient_group_dao_test.go b/internal/db/models/message_recipient_group_dao_test.go new file mode 100644 index 00000000..224e9db7 --- /dev/null +++ b/internal/db/models/message_recipient_group_dao_test.go @@ -0,0 +1,6 @@ +package models + +import ( + _ "github.com/go-sql-driver/mysql" + _ "github.com/iwind/TeaGo/bootstrap" +) diff --git a/internal/db/models/message_recipient_group_model.go b/internal/db/models/message_recipient_group_model.go new file mode 100644 index 00000000..ddcf64c4 --- /dev/null +++ b/internal/db/models/message_recipient_group_model.go @@ -0,0 +1,22 @@ +package models + +// 消息接收人分组 +type MessageRecipientGroup struct { + Id uint32 `field:"id"` // ID + Name string `field:"name"` // 分组名 + Order uint32 `field:"order"` // 排序 + IsOn uint8 `field:"isOn"` // 是否启用 + State uint8 `field:"state"` // 状态 +} + +type MessageRecipientGroupOperator struct { + Id interface{} // ID + Name interface{} // 分组名 + Order interface{} // 排序 + IsOn interface{} // 是否启用 + State interface{} // 状态 +} + +func NewMessageRecipientGroupOperator() *MessageRecipientGroupOperator { + return &MessageRecipientGroupOperator{} +} diff --git a/internal/db/models/message_recipient_group_model_ext.go b/internal/db/models/message_recipient_group_model_ext.go new file mode 100644 index 00000000..2640e7f9 --- /dev/null +++ b/internal/db/models/message_recipient_group_model_ext.go @@ -0,0 +1 @@ +package models diff --git a/internal/db/models/message_recipient_model.go b/internal/db/models/message_recipient_model.go new file mode 100644 index 00000000..c3fbd477 --- /dev/null +++ b/internal/db/models/message_recipient_model.go @@ -0,0 +1,28 @@ +package models + +// 消息媒介接收人 +type MessageRecipient struct { + Id uint32 `field:"id"` // ID + AdminId uint32 `field:"adminId"` // 管理员ID + IsOn uint8 `field:"isOn"` // 是否启用 + InstanceId uint32 `field:"instanceId"` // 实例ID + User string `field:"user"` // 接收人信息 + GroupIds string `field:"groupIds"` // 分组ID + State uint8 `field:"state"` // 状态 + Description string `field:"description"` // 备注 +} + +type MessageRecipientOperator struct { + Id interface{} // ID + AdminId interface{} // 管理员ID + IsOn interface{} // 是否启用 + InstanceId interface{} // 实例ID + User interface{} // 接收人信息 + GroupIds interface{} // 分组ID + State interface{} // 状态 + Description interface{} // 备注 +} + +func NewMessageRecipientOperator() *MessageRecipientOperator { + return &MessageRecipientOperator{} +} diff --git a/internal/db/models/message_recipient_model_ext.go b/internal/db/models/message_recipient_model_ext.go new file mode 100644 index 00000000..fe454a6d --- /dev/null +++ b/internal/db/models/message_recipient_model_ext.go @@ -0,0 +1,20 @@ +package models + +import ( + "encoding/json" + "github.com/iwind/TeaGo/logs" +) + +// 解析分组ID +func (this *MessageRecipient) DecodeGroupIds() []int64 { + if len(this.GroupIds) == 0 { + return []int64{} + } + result := []int64{} + err := json.Unmarshal([]byte(this.GroupIds), &result) + if err != nil { + logs.Println("MessageRecipient.DecodeGroupIds(): " + err.Error()) + // 不阻断执行 + } + return result +} diff --git a/internal/db/models/message_task_dao.go b/internal/db/models/message_task_dao.go new file mode 100644 index 00000000..34db17ac --- /dev/null +++ b/internal/db/models/message_task_dao.go @@ -0,0 +1,117 @@ +package models + +import ( + "github.com/TeaOSLab/EdgeAPI/internal/errors" + _ "github.com/go-sql-driver/mysql" + "github.com/iwind/TeaGo/Tea" + "github.com/iwind/TeaGo/dbs" + "time" +) + +type MessageTaskStatus = int + +const ( + MessageTaskStateEnabled = 1 // 已启用 + MessageTaskStateDisabled = 0 // 已禁用 + + MessageTaskStatusNone MessageTaskStatus = 0 // 普通状态 + MessageTaskStatusSending MessageTaskStatus = 1 // 发送中 + MessageTaskStatusSuccess MessageTaskStatus = 2 // 发送成功 + MessageTaskStatusFailed MessageTaskStatus = 3 // 发送失败 +) + +type MessageTaskDAO dbs.DAO + +func NewMessageTaskDAO() *MessageTaskDAO { + return dbs.NewDAO(&MessageTaskDAO{ + DAOObject: dbs.DAOObject{ + DB: Tea.Env, + Table: "edgeMessageTasks", + Model: new(MessageTask), + PkName: "id", + }, + }).(*MessageTaskDAO) +} + +var SharedMessageTaskDAO *MessageTaskDAO + +func init() { + dbs.OnReady(func() { + SharedMessageTaskDAO = NewMessageTaskDAO() + }) +} + +// 启用条目 +func (this *MessageTaskDAO) EnableMessageTask(tx *dbs.Tx, id int64) error { + _, err := this.Query(tx). + Pk(id). + Set("state", MessageTaskStateEnabled). + Update() + return err +} + +// 禁用条目 +func (this *MessageTaskDAO) DisableMessageTask(tx *dbs.Tx, id int64) error { + _, err := this.Query(tx). + Pk(id). + Set("state", MessageTaskStateDisabled). + Update() + return err +} + +// 查找启用中的条目 +func (this *MessageTaskDAO) FindEnabledMessageTask(tx *dbs.Tx, id int64) (*MessageTask, error) { + result, err := this.Query(tx). + Pk(id). + Attr("state", MessageTaskStateEnabled). + Find() + if result == nil { + return nil, err + } + return result.(*MessageTask), err +} + +// 创建任务 +func (this *MessageTaskDAO) CreateMessageTask(tx *dbs.Tx, recipientId int64, instanceId int64, user string, subject string, body string, isPrimary bool) (int64, error) { + op := NewMessageTaskOperator() + op.RecipientId = recipientId + op.InstanceId = instanceId + op.User = user + op.Subject = subject + op.Body = body + op.IsPrimary = isPrimary + op.Status = MessageTaskStatusNone + op.State = MessageTaskStateEnabled + return this.SaveInt64(tx, op) +} + +// 查找需要发送的任务 +func (this *MessageTaskDAO) FindSendingMessageTasks(tx *dbs.Tx, size int64) (result []*MessageTask, err error) { + if size <= 0 { + return nil, nil + } + _, err = this.Query(tx). + State(MessageTaskStateEnabled). + Attr("status", MessageTaskStatusNone). + Desc("isPrimary"). + AscPk(). + Limit(size). + Slice(&result). + FindAll() + return +} + +// 设置发送的状态 +func (this *MessageTaskDAO) UpdateMessageTaskStatus(tx *dbs.Tx, taskId int64, status MessageTaskStatus, result []byte) error { + if taskId <= 0 { + return errors.New("invalid taskId") + } + op := NewMessageTaskOperator() + op.Id = taskId + op.Status = status + op.SentAt = time.Now().Unix() + if len(result) > 0 { + op.Result = result + } + return this.Save(tx, op) +} diff --git a/internal/db/models/message_task_dao_test.go b/internal/db/models/message_task_dao_test.go new file mode 100644 index 00000000..224e9db7 --- /dev/null +++ b/internal/db/models/message_task_dao_test.go @@ -0,0 +1,6 @@ +package models + +import ( + _ "github.com/go-sql-driver/mysql" + _ "github.com/iwind/TeaGo/bootstrap" +) diff --git a/internal/db/models/message_task_log_dao.go b/internal/db/models/message_task_log_dao.go new file mode 100644 index 00000000..a3fdeebd --- /dev/null +++ b/internal/db/models/message_task_log_dao.go @@ -0,0 +1,55 @@ +package models + +import ( + _ "github.com/go-sql-driver/mysql" + "github.com/iwind/TeaGo/Tea" + "github.com/iwind/TeaGo/dbs" +) + +type MessageTaskLogDAO dbs.DAO + +func NewMessageTaskLogDAO() *MessageTaskLogDAO { + return dbs.NewDAO(&MessageTaskLogDAO{ + DAOObject: dbs.DAOObject{ + DB: Tea.Env, + Table: "edgeMessageTaskLogs", + Model: new(MessageTaskLog), + PkName: "id", + }, + }).(*MessageTaskLogDAO) +} + +var SharedMessageTaskLogDAO *MessageTaskLogDAO + +func init() { + dbs.OnReady(func() { + SharedMessageTaskLogDAO = NewMessageTaskLogDAO() + }) +} + +// 创建日志 +func (this *MessageTaskLogDAO) CreateLog(tx *dbs.Tx, taskId int64, isOk bool, errMsg string, response string) error { + op := NewMessageTaskLogOperator() + op.TaskId = taskId + op.IsOk = isOk + op.Error = errMsg + op.Response = response + return this.Save(tx, op) +} + +// 计算日志数量 +func (this *MessageTaskLogDAO) CountLogs(tx *dbs.Tx) (int64, error) { + return this.Query(tx). + Count() +} + +// 列出单页日志 +func (this *MessageTaskLogDAO) ListLogs(tx *dbs.Tx, offset int64, size int64) (result []*MessageTaskLog, err error) { + _, err = this.Query(tx). + Offset(offset). + Limit(size). + DescPk(). + Slice(&result). + FindAll() + return +} diff --git a/internal/db/models/message_task_log_dao_test.go b/internal/db/models/message_task_log_dao_test.go new file mode 100644 index 00000000..224e9db7 --- /dev/null +++ b/internal/db/models/message_task_log_dao_test.go @@ -0,0 +1,6 @@ +package models + +import ( + _ "github.com/go-sql-driver/mysql" + _ "github.com/iwind/TeaGo/bootstrap" +) diff --git a/internal/db/models/message_task_log_model.go b/internal/db/models/message_task_log_model.go new file mode 100644 index 00000000..042158ad --- /dev/null +++ b/internal/db/models/message_task_log_model.go @@ -0,0 +1,24 @@ +package models + +// 消息发送日志 +type MessageTaskLog struct { + Id uint64 `field:"id"` // ID + TaskId uint64 `field:"taskId"` // 任务ID + CreatedAt uint64 `field:"createdAt"` // 创建时间 + IsOk uint8 `field:"isOk"` // 是否成功 + Error string `field:"error"` // 错误信息 + Response string `field:"response"` // 响应信息 +} + +type MessageTaskLogOperator struct { + Id interface{} // ID + TaskId interface{} // 任务ID + CreatedAt interface{} // 创建时间 + IsOk interface{} // 是否成功 + Error interface{} // 错误信息 + Response interface{} // 响应信息 +} + +func NewMessageTaskLogOperator() *MessageTaskLogOperator { + return &MessageTaskLogOperator{} +} diff --git a/internal/db/models/message_task_log_model_ext.go b/internal/db/models/message_task_log_model_ext.go new file mode 100644 index 00000000..2640e7f9 --- /dev/null +++ b/internal/db/models/message_task_log_model_ext.go @@ -0,0 +1 @@ +package models diff --git a/internal/db/models/message_task_model.go b/internal/db/models/message_task_model.go new file mode 100644 index 00000000..c691253f --- /dev/null +++ b/internal/db/models/message_task_model.go @@ -0,0 +1,36 @@ +package models + +// +type MessageTask struct { + Id uint64 `field:"id"` // ID + RecipientId uint32 `field:"recipientId"` // 接收人ID + InstanceId uint32 `field:"instanceId"` // 媒介实例ID + User string `field:"user"` // 接收用户标识 + Subject string `field:"subject"` // 标题 + Body string `field:"body"` // 内容 + CreatedAt uint64 `field:"createdAt"` // 创建时间 + Status uint8 `field:"status"` // 发送状态 + SentAt uint64 `field:"sentAt"` // 最后一次发送时间 + State uint8 `field:"state"` // 状态 + Result string `field:"result"` // 结果 + IsPrimary uint8 `field:"isPrimary"` // 是否优先 +} + +type MessageTaskOperator struct { + Id interface{} // ID + RecipientId interface{} // 接收人ID + InstanceId interface{} // 媒介实例ID + User interface{} // 接收用户标识 + Subject interface{} // 标题 + Body interface{} // 内容 + CreatedAt interface{} // 创建时间 + Status interface{} // 发送状态 + SentAt interface{} // 最后一次发送时间 + State interface{} // 状态 + Result interface{} // 结果 + IsPrimary interface{} // 是否优先 +} + +func NewMessageTaskOperator() *MessageTaskOperator { + return &MessageTaskOperator{} +} diff --git a/internal/db/models/message_task_model_ext.go b/internal/db/models/message_task_model_ext.go new file mode 100644 index 00000000..2640e7f9 --- /dev/null +++ b/internal/db/models/message_task_model_ext.go @@ -0,0 +1 @@ +package models diff --git a/internal/db/models/monitor_node_dao.go b/internal/db/models/monitor_node_dao.go new file mode 100644 index 00000000..694e6675 --- /dev/null +++ b/internal/db/models/monitor_node_dao.go @@ -0,0 +1,227 @@ +package models + +import ( + "encoding/json" + "github.com/TeaOSLab/EdgeAPI/internal/errors" + _ "github.com/go-sql-driver/mysql" + "github.com/iwind/TeaGo/Tea" + "github.com/iwind/TeaGo/dbs" + "github.com/iwind/TeaGo/maps" + "github.com/iwind/TeaGo/rands" + "github.com/iwind/TeaGo/types" + "strconv" +) + +const ( + MonitorNodeStateEnabled = 1 // 已启用 + MonitorNodeStateDisabled = 0 // 已禁用 +) + +type MonitorNodeDAO dbs.DAO + +func NewMonitorNodeDAO() *MonitorNodeDAO { + return dbs.NewDAO(&MonitorNodeDAO{ + DAOObject: dbs.DAOObject{ + DB: Tea.Env, + Table: "edgeMonitorNodes", + Model: new(MonitorNode), + PkName: "id", + }, + }).(*MonitorNodeDAO) +} + +var SharedMonitorNodeDAO *MonitorNodeDAO + +func init() { + dbs.OnReady(func() { + SharedMonitorNodeDAO = NewMonitorNodeDAO() + }) +} + +// 启用条目 +func (this *MonitorNodeDAO) EnableMonitorNode(tx *dbs.Tx, id int64) error { + _, err := this.Query(tx). + Pk(id). + Set("state", MonitorNodeStateEnabled). + Update() + return err +} + +// 禁用条目 +func (this *MonitorNodeDAO) DisableMonitorNode(tx *dbs.Tx, id int64) error { + _, err := this.Query(tx). + Pk(id). + Set("state", MonitorNodeStateDisabled). + Update() + return err +} + +// 查找启用中的条目 +func (this *MonitorNodeDAO) FindEnabledMonitorNode(tx *dbs.Tx, id int64) (*MonitorNode, error) { + result, err := this.Query(tx). + Pk(id). + Attr("state", MonitorNodeStateEnabled). + Find() + if result == nil { + return nil, err + } + return result.(*MonitorNode), err +} + +// 根据主键查找名称 +func (this *MonitorNodeDAO) FindMonitorNodeName(tx *dbs.Tx, id int64) (string, error) { + return this.Query(tx). + Pk(id). + Result("name"). + FindStringCol("") +} + +// 列出所有可用监控节点 +func (this *MonitorNodeDAO) FindAllEnabledMonitorNodes(tx *dbs.Tx) (result []*MonitorNode, err error) { + _, err = this.Query(tx). + State(MonitorNodeStateEnabled). + Desc("order"). + AscPk(). + Slice(&result). + FindAll() + return +} + +// 计算监控节点数量 +func (this *MonitorNodeDAO) CountAllEnabledMonitorNodes(tx *dbs.Tx) (int64, error) { + return this.Query(tx). + State(MonitorNodeStateEnabled). + Count() +} + +// 列出单页的监控节点 +func (this *MonitorNodeDAO) ListEnabledMonitorNodes(tx *dbs.Tx, offset int64, size int64) (result []*MonitorNode, err error) { + _, err = this.Query(tx). + State(MonitorNodeStateEnabled). + Offset(offset). + Limit(size). + Desc("order"). + DescPk(). + Slice(&result). + FindAll() + return +} + +// 根据主机名和端口获取ID +func (this *MonitorNodeDAO) FindEnabledMonitorNodeIdWithAddr(tx *dbs.Tx, protocol string, host string, port int) (int64, error) { + addr := maps.Map{ + "protocol": protocol, + "host": host, + "portRange": strconv.Itoa(port), + } + addrJSON, err := json.Marshal(addr) + if err != nil { + return 0, err + } + + one, err := this.Query(tx). + State(MonitorNodeStateEnabled). + Where("JSON_CONTAINS(accessAddrs, :addr)"). + Param("addr", string(addrJSON)). + ResultPk(). + Find() + if err != nil { + return 0, err + } + if one == nil { + return 0, nil + } + return int64(one.(*MonitorNode).Id), nil +} + +// 创建监控节点 +func (this *MonitorNodeDAO) CreateMonitorNode(tx *dbs.Tx, name string, description string, isOn bool) (nodeId int64, err error) { + uniqueId, err := this.GenUniqueId(tx) + if err != nil { + return 0, err + } + secret := rands.String(32) + err = NewApiTokenDAO().CreateAPIToken(tx, uniqueId, secret, NodeRoleMonitor) + if err != nil { + return + } + + op := NewMonitorNodeOperator() + op.IsOn = isOn + op.UniqueId = uniqueId + op.Secret = secret + op.Name = name + op.Description = description + op.State = NodeStateEnabled + err = this.Save(tx, op) + if err != nil { + return + } + + return types.Int64(op.Id), nil +} + +// 修改监控节点 +func (this *MonitorNodeDAO) UpdateMonitorNode(tx *dbs.Tx, nodeId int64, name string, description string, isOn bool) error { + if nodeId <= 0 { + return errors.New("invalid nodeId") + } + + op := NewMonitorNodeOperator() + op.Id = nodeId + op.Name = name + op.Description = description + op.IsOn = isOn + err := this.Save(tx, op) + return err +} + +// 根据唯一ID获取节点信息 +func (this *MonitorNodeDAO) FindEnabledMonitorNodeWithUniqueId(tx *dbs.Tx, uniqueId string) (*MonitorNode, error) { + result, err := this.Query(tx). + Attr("uniqueId", uniqueId). + Attr("state", MonitorNodeStateEnabled). + Find() + if result == nil { + return nil, err + } + return result.(*MonitorNode), err +} + +// 根据唯一ID获取节点ID +func (this *MonitorNodeDAO) FindEnabledMonitorNodeIdWithUniqueId(tx *dbs.Tx, uniqueId string) (int64, error) { + return this.Query(tx). + Attr("uniqueId", uniqueId). + Attr("state", MonitorNodeStateEnabled). + ResultPk(). + FindInt64Col(0) +} + +// 生成唯一ID +func (this *MonitorNodeDAO) GenUniqueId(tx *dbs.Tx) (string, error) { + for { + uniqueId := rands.HexString(32) + ok, err := this.Query(tx). + Attr("uniqueId", uniqueId). + Exist() + if err != nil { + return "", err + } + if ok { + continue + } + return uniqueId, nil + } +} + +// 更改节点状态 +func (this *MonitorNodeDAO) UpdateNodeStatus(tx *dbs.Tx, nodeId int64, statusJSON []byte) error { + if statusJSON == nil { + return nil + } + _, err := this.Query(tx). + Pk(nodeId). + Set("status", string(statusJSON)). + Update() + return err +} diff --git a/internal/db/models/monitor_node_dao_test.go b/internal/db/models/monitor_node_dao_test.go new file mode 100644 index 00000000..224e9db7 --- /dev/null +++ b/internal/db/models/monitor_node_dao_test.go @@ -0,0 +1,6 @@ +package models + +import ( + _ "github.com/go-sql-driver/mysql" + _ "github.com/iwind/TeaGo/bootstrap" +) diff --git a/internal/db/models/monitor_node_model.go b/internal/db/models/monitor_node_model.go new file mode 100644 index 00000000..02b928ed --- /dev/null +++ b/internal/db/models/monitor_node_model.go @@ -0,0 +1,36 @@ +package models + +// 监控节点 +type MonitorNode struct { + Id uint32 `field:"id"` // ID + IsOn uint8 `field:"isOn"` // 是否启用 + UniqueId string `field:"uniqueId"` // 唯一ID + Secret string `field:"secret"` // 密钥 + Name string `field:"name"` // 名称 + Description string `field:"description"` // 描述 + Order uint32 `field:"order"` // 排序 + State uint8 `field:"state"` // 状态 + CreatedAt uint64 `field:"createdAt"` // 创建时间 + AdminId uint32 `field:"adminId"` // 管理员ID + Weight uint32 `field:"weight"` // 权重 + Status string `field:"status"` // 运行状态 +} + +type MonitorNodeOperator struct { + Id interface{} // ID + IsOn interface{} // 是否启用 + UniqueId interface{} // 唯一ID + Secret interface{} // 密钥 + Name interface{} // 名称 + Description interface{} // 描述 + Order interface{} // 排序 + State interface{} // 状态 + CreatedAt interface{} // 创建时间 + AdminId interface{} // 管理员ID + Weight interface{} // 权重 + Status interface{} // 运行状态 +} + +func NewMonitorNodeOperator() *MonitorNodeOperator { + return &MonitorNodeOperator{} +} diff --git a/internal/db/models/monitor_node_model_ext.go b/internal/db/models/monitor_node_model_ext.go new file mode 100644 index 00000000..2640e7f9 --- /dev/null +++ b/internal/db/models/monitor_node_model_ext.go @@ -0,0 +1 @@ +package models diff --git a/internal/db/models/user_node_dao.go b/internal/db/models/user_node_dao.go index 8a478060..df215482 100644 --- a/internal/db/models/user_node_dao.go +++ b/internal/db/models/user_node_dao.go @@ -244,6 +244,9 @@ func (this *UserNodeDAO) GenUniqueId(tx *dbs.Tx) (string, error) { // 更改节点状态 func (this *UserNodeDAO) UpdateNodeStatus(tx *dbs.Tx, nodeId int64, statusJSON []byte) error { + if len(statusJSON) == 0 { + return nil + } _, err := this.Query(tx). Pk(nodeId). Set("status", string(statusJSON)). diff --git a/internal/nodes/api_node.go b/internal/nodes/api_node.go index 9d6c7655..d804f46e 100644 --- a/internal/nodes/api_node.go +++ b/internal/nodes/api_node.go @@ -209,6 +209,12 @@ func (this *APINode) listenRPC(listener net.Listener, tlsConfig *tls.Config) err pb.RegisterNodeLogServiceServer(rpcServer, &services.NodeLogService{}) pb.RegisterHTTPAccessLogServiceServer(rpcServer, &services.HTTPAccessLogService{}) pb.RegisterMessageServiceServer(rpcServer, &services.MessageService{}) + pb.RegisterMessageRecipientServiceServer(rpcServer, &services.MessageRecipientService{}) + pb.RegisterMessageMediaServiceServer(rpcServer, &services.MessageMediaService{}) + pb.RegisterMessageRecipientGroupServiceServer(rpcServer, &services.MessageRecipientGroupService{}) + pb.RegisterMessageMediaInstanceServiceServer(rpcServer, &services.MessageMediaInstanceService{}) + pb.RegisterMessageTaskServiceServer(rpcServer, &services.MessageTaskService{}) + pb.RegisterMessageTaskLogServiceServer(rpcServer, &services.MessageTaskLogService{}) pb.RegisterNodeGroupServiceServer(rpcServer, &services.NodeGroupService{}) pb.RegisterNodeRegionServiceServer(rpcServer, &services.NodeRegionService{}) pb.RegisterNodePriceItemServiceServer(rpcServer, &services.NodePriceItemService{}) @@ -245,6 +251,7 @@ func (this *APINode) listenRPC(listener net.Listener, tlsConfig *tls.Config) err pb.RegisterServerHTTPFirewallDailyStatServiceServer(rpcServer, &services.ServerHTTPFirewallDailyStatService{}) pb.RegisterDNSTaskServiceServer(rpcServer, &services.DNSTaskService{}) pb.RegisterNodeClusterFirewallActionServiceServer(rpcServer, &services.NodeClusterFirewallActionService{}) + pb.RegisterMonitorNodeServiceServer(rpcServer, &services.MonitorNodeService{}) err := rpcServer.Serve(listener) if err != nil { return errors.New("[API_NODE]start rpc failed: " + err.Error()) diff --git a/internal/rpc/services/service_api_node.go b/internal/rpc/services/service_api_node.go index f591bd2e..e85c3604 100644 --- a/internal/rpc/services/service_api_node.go +++ b/internal/rpc/services/service_api_node.go @@ -65,7 +65,7 @@ func (this *APINodeService) DeleteAPINode(ctx context.Context, req *pb.DeleteAPI // 列出所有可用API节点 func (this *APINodeService) FindAllEnabledAPINodes(ctx context.Context, req *pb.FindAllEnabledAPINodesRequest) (*pb.FindAllEnabledAPINodesResponse, error) { - _, _, err := rpcutils.ValidateRequest(ctx, rpcutils.UserTypeAdmin, rpcutils.UserTypeUser, rpcutils.UserTypeNode) + _, _, err := rpcutils.ValidateRequest(ctx, rpcutils.UserTypeAdmin, rpcutils.UserTypeUser, rpcutils.UserTypeNode, rpcutils.UserTypeMonitor, rpcutils.UserTypeDNS) if err != nil { return nil, err } diff --git a/internal/rpc/services/service_base.go b/internal/rpc/services/service_base.go index d83622d9..141c7b8f 100644 --- a/internal/rpc/services/service_base.go +++ b/internal/rpc/services/service_base.go @@ -82,6 +82,12 @@ func (this *BaseService) ValidateUser(ctx context.Context) (userId int64, err er return } +// 校验监控节点 +func (this *BaseService) ValidateMonitor(ctx context.Context) (nodeId int64, err error) { + _, nodeId, err = rpcutils.ValidateRequest(ctx, rpcutils.UserTypeMonitor) + return +} + // 获取节点ID func (this *BaseService) ValidateNodeId(ctx context.Context, roles ...rpcutils.UserType) (role rpcutils.UserType, nodeIntId int64, err error) { if ctx == nil { @@ -170,6 +176,8 @@ func (this *BaseService) ValidateNodeId(ctx context.Context, roles ...rpcutils.U nodeIntId, err = models.SharedUserNodeDAO.FindEnabledUserNodeIdWithUniqueId(nil, nodeId) case rpcutils.UserTypeAdmin: nodeIntId = 0 + case rpcutils.UserTypeMonitor: + nodeIntId, err = models.SharedMonitorNodeDAO.FindEnabledMonitorNodeIdWithUniqueId(nil, nodeId) default: err = errors.New("unsupported user role '" + apiToken.Role + "'") } diff --git a/internal/rpc/services/service_message_media.go b/internal/rpc/services/service_message_media.go new file mode 100644 index 00000000..cd841b42 --- /dev/null +++ b/internal/rpc/services/service_message_media.go @@ -0,0 +1,64 @@ +package services + +import ( + "context" + "github.com/TeaOSLab/EdgeAPI/internal/db/models" + "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "github.com/iwind/TeaGo/maps" +) + +// 消息媒介服务 +type MessageMediaService struct { + BaseService +} + +// 获取所有支持的媒介 +func (this *MessageMediaService) FindAllMessageMedias(ctx context.Context, req *pb.FindAllMessageMediasRequest) (*pb.FindAllMessageMediasResponse, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + var tx = this.NullTx() + medias, err := models.SharedMessageMediaDAO.FindAllEnabledMessageMedias(tx) + if err != nil { + return nil, err + } + pbMedias := []*pb.MessageMedia{} + for _, media := range medias { + pbMedias = append(pbMedias, &pb.MessageMedia{ + Id: int64(media.Id), + Type: media.Type, + Name: media.Name, + Description: media.Description, + UserDescription: media.UserDescription, + IsOn: media.IsOn == 1, + }) + } + return &pb.FindAllMessageMediasResponse{MessageMedias: pbMedias}, nil +} + +// 设置所有支持的媒介 +func (this *MessageMediaService) UpdateMessageMedias(ctx context.Context, req *pb.UpdateMessageMediasRequest) (*pb.RPCSuccess, error) { + _, err := this.ValidateMonitor(ctx) + if err != nil { + return nil, err + } + + mediaMaps := []maps.Map{} + for _, media := range req.MessageMedias { + mediaMaps = append(mediaMaps, maps.Map{ + "name": media.Name, + "type": media.Type, + "description": media.Description, + "userDescription": media.UserDescription, + "isOn": media.IsOn, + }) + } + + var tx = this.NullTx() + err = models.SharedMessageMediaDAO.UpdateMessageMedias(tx, mediaMaps) + if err != nil { + return nil, err + } + return this.Success() +} diff --git a/internal/rpc/services/service_message_media_instance.go b/internal/rpc/services/service_message_media_instance.go new file mode 100644 index 00000000..da39c2f8 --- /dev/null +++ b/internal/rpc/services/service_message_media_instance.go @@ -0,0 +1,181 @@ +package services + +import ( + "context" + "encoding/json" + "github.com/TeaOSLab/EdgeAPI/internal/db/models" + "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "github.com/iwind/TeaGo/maps" +) + +// 消息媒介实例服务 +type MessageMediaInstanceService struct { + BaseService +} + +// 创建消息媒介实例 +func (this *MessageMediaInstanceService) CreateMessageMediaInstance(ctx context.Context, req *pb.CreateMessageMediaInstanceRequest) (*pb.CreateMessageMediaInstanceResponse, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + var tx = this.NullTx() + + params := maps.Map{} + if len(req.ParamsJSON) > 0 { + err = json.Unmarshal(req.ParamsJSON, ¶ms) + if err != nil { + return nil, err + } + } + + instanceId, err := models.SharedMessageMediaInstanceDAO.CreateMediaInstance(tx, req.Name, req.MediaType, params, req.Description) + if err != nil { + return nil, err + } + + return &pb.CreateMessageMediaInstanceResponse{MessageMediaInstanceId: instanceId}, nil +} + +// 修改消息实例 +func (this *MessageMediaInstanceService) UpdateMessageMediaInstance(ctx context.Context, req *pb.UpdateMessageMediaInstanceRequest) (*pb.RPCSuccess, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + params := maps.Map{} + if len(req.ParamsJSON) > 0 { + err = json.Unmarshal(req.ParamsJSON, ¶ms) + if err != nil { + return nil, err + } + } + + var tx = this.NullTx() + err = models.SharedMessageMediaInstanceDAO.UpdateMediaInstance(tx, req.MessageMediaInstanceId, req.Name, req.MediaType, params, req.Description, req.IsOn) + if err != nil { + return nil, err + } + + return this.Success() +} + +// 删除媒介实例 +func (this *MessageMediaInstanceService) DeleteMessageMediaInstance(ctx context.Context, req *pb.DeleteMessageMediaInstanceRequest) (*pb.RPCSuccess, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + var tx = this.NullTx() + err = models.SharedMessageMediaInstanceDAO.DisableMessageMediaInstance(tx, req.MessageMediaInstanceId) + if err != nil { + return nil, err + } + return this.Success() +} + +// 计算媒介实例数量 +func (this *MessageMediaInstanceService) CountAllEnabledMessageMediaInstances(ctx context.Context, req *pb.CountAllEnabledMessageMediaInstancesRequest) (*pb.RPCCountResponse, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + var tx = this.NullTx() + count, err := models.SharedMessageMediaInstanceDAO.CountAllEnabledMediaInstances(tx, req.MediaType, req.Keyword) + if err != nil { + return nil, err + } + + return this.SuccessCount(count) +} + +// 列出单页媒介实例 +func (this *MessageMediaInstanceService) ListEnabledMessageMediaInstances(ctx context.Context, req *pb.ListEnabledMessageMediaInstancesRequest) (*pb.ListEnabledMessageMediaInstancesResponse, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + var tx = this.NullTx() + instances, err := models.SharedMessageMediaInstanceDAO.ListAllEnabledMediaInstances(tx, req.MediaType, req.Keyword, req.Offset, req.Size) + if err != nil { + return nil, err + } + pbInstances := []*pb.MessageMediaInstance{} + for _, instance := range instances { + // 媒介 + media, err := models.SharedMessageMediaDAO.FindEnabledMediaWithType(tx, instance.MediaType) + if err != nil { + return nil, err + } + if media == nil { + continue + } + pbMedia := &pb.MessageMedia{ + Id: int64(media.Id), + Type: media.Type, + Name: media.Name, + Description: media.Description, + UserDescription: media.UserDescription, + IsOn: media.IsOn == 1, + } + + pbInstances = append(pbInstances, &pb.MessageMediaInstance{ + Id: int64(instance.Id), + Name: instance.Name, + IsOn: instance.IsOn == 1, + MessageMedia: pbMedia, + ParamsJSON: []byte(instance.Params), + Description: instance.Description, + }) + } + + return &pb.ListEnabledMessageMediaInstancesResponse{MessageMediaInstances: pbInstances}, nil +} + +// 查找单个媒介实例信息 +func (this *MessageMediaInstanceService) FindEnabledMessageMediaInstance(ctx context.Context, req *pb.FindEnabledMessageMediaInstanceRequest) (*pb.FindEnabledMessageMediaInstanceResponse, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + var tx = this.NullTx() + instance, err := models.SharedMessageMediaInstanceDAO.FindEnabledMessageMediaInstance(tx, req.MessageMediaInstanceId) + if err != nil { + return nil, err + } + if instance == nil { + return &pb.FindEnabledMessageMediaInstanceResponse{MessageMediaInstance: nil}, nil + } + + // 媒介 + media, err := models.SharedMessageMediaDAO.FindEnabledMediaWithType(tx, instance.MediaType) + if err != nil { + return nil, err + } + if media == nil { + return &pb.FindEnabledMessageMediaInstanceResponse{MessageMediaInstance: nil}, nil + } + pbMedia := &pb.MessageMedia{ + Id: int64(media.Id), + Type: media.Type, + Name: media.Name, + Description: media.Description, + UserDescription: media.UserDescription, + IsOn: media.IsOn == 1, + } + + return &pb.FindEnabledMessageMediaInstanceResponse{MessageMediaInstance: &pb.MessageMediaInstance{ + Id: int64(instance.Id), + Name: instance.Name, + IsOn: instance.IsOn == 1, + MessageMedia: pbMedia, + ParamsJSON: []byte(instance.Params), + Description: instance.Description, + }}, nil +} diff --git a/internal/rpc/services/service_message_recipient.go b/internal/rpc/services/service_message_recipient.go new file mode 100644 index 00000000..6507dc7b --- /dev/null +++ b/internal/rpc/services/service_message_recipient.go @@ -0,0 +1,228 @@ +package services + +import ( + "context" + "github.com/TeaOSLab/EdgeAPI/internal/db/models" + "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" +) + +// 消息接收人服务 +type MessageRecipientService struct { + BaseService +} + +// 创建接收人 +func (this *MessageRecipientService) CreateMessageRecipient(ctx context.Context, req *pb.CreateMessageRecipientRequest) (*pb.CreateMessageRecipientResponse, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + var tx = this.NullTx() + recipientId, err := models.SharedMessageRecipientDAO.CreateRecipient(tx, req.AdminId, req.InstanceId, req.User, req.GroupIds, req.Description) + if err != nil { + return nil, err + } + + return &pb.CreateMessageRecipientResponse{MessageRecipientId: recipientId}, nil +} + +// 修改接收人 +func (this *MessageRecipientService) UpdateMessageRecipient(ctx context.Context, req *pb.UpdateMessageRecipientRequest) (*pb.RPCSuccess, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + var tx = this.NullTx() + err = models.SharedMessageRecipientDAO.UpdateRecipient(tx, req.MessageRecipientId, req.AdminId, req.InstanceId, req.User, req.GroupIds, req.Description, req.IsOn) + if err != nil { + return nil, err + } + + return this.Success() +} + +// 删除接收人 +func (this *MessageRecipientService) DeleteMessageRecipient(ctx context.Context, req *pb.DeleteMessageRecipientRequest) (*pb.RPCSuccess, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + var tx = this.NullTx() + err = models.SharedMessageRecipientDAO.DisableMessageRecipient(tx, req.MessageRecipientId) + if err != nil { + return nil, err + } + return this.Success() +} + +// 计算接收人数量 +func (this *MessageRecipientService) CountAllEnabledMessageRecipients(ctx context.Context, req *pb.CountAllEnabledMessageRecipientsRequest) (*pb.RPCCountResponse, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + var tx = this.NullTx() + count, err := models.SharedMessageRecipientDAO.CountAllEnabledRecipients(tx, req.AdminId, req.GroupId, req.MediaType, req.Keyword) + if err != nil { + return nil, err + } + + return this.SuccessCount(count) +} + +// 列出单页接收人 +func (this *MessageRecipientService) ListEnabledMessageRecipients(ctx context.Context, req *pb.ListEnabledMessageRecipientsRequest) (*pb.ListEnabledMessageRecipientsResponse, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + var tx = this.NullTx() + recipients, err := models.SharedMessageRecipientDAO.ListAllEnabledRecipients(tx, req.AdminId, req.GroupId, req.MediaType, req.Keyword, req.Offset, req.Size) + if err != nil { + return nil, err + } + pbRecipients := []*pb.MessageRecipient{} + for _, recipient := range recipients { + // admin + admin, err := models.SharedAdminDAO.FindEnabledAdmin(tx, int64(recipient.AdminId)) + if err != nil { + return nil, err + } + if admin == nil { + continue + } + pbAdmin := &pb.Admin{ + Id: int64(admin.Id), + Fullname: admin.Fullname, + Username: admin.Username, + IsOn: admin.IsOn == 1, + } + + // 媒介实例 + instance, err := models.SharedMessageMediaInstanceDAO.FindEnabledMessageMediaInstance(tx, int64(recipient.InstanceId)) + if err != nil { + return nil, err + } + if instance == nil { + continue + } + pbInstance := &pb.MessageMediaInstance{ + Id: int64(instance.Id), + IsOn: instance.IsOn == 1, + Name: instance.Name, + Description: instance.Description, + } + + // 分组 + pbGroups := []*pb.MessageRecipientGroup{} + groupIds := recipient.DecodeGroupIds() + if len(groupIds) > 0 { + for _, groupId := range groupIds { + group, err := models.SharedMessageRecipientGroupDAO.FindEnabledMessageRecipientGroup(tx, groupId) + if err != nil { + return nil, err + } + if group != nil { + pbGroups = append(pbGroups, &pb.MessageRecipientGroup{ + Id: int64(group.Id), + Name: group.Name, + IsOn: group.IsOn == 1, + }) + } + } + } + + pbRecipients = append(pbRecipients, &pb.MessageRecipient{ + Id: int64(recipient.Id), + Admin: pbAdmin, + User: recipient.User, + MessageMediaInstance: pbInstance, + IsOn: recipient.IsOn == 1, + MessageRecipientGroups: pbGroups, + Description: recipient.Description, + }) + } + + return &pb.ListEnabledMessageRecipientsResponse{MessageRecipients: pbRecipients}, nil +} + +// 查找单个接收人信息 +func (this *MessageRecipientService) FindEnabledMessageRecipient(ctx context.Context, req *pb.FindEnabledMessageRecipientRequest) (*pb.FindEnabledMessageRecipientResponse, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + var tx = this.NullTx() + recipient, err := models.SharedMessageRecipientDAO.FindEnabledMessageRecipient(tx, req.MessageRecipientId) + if err != nil { + return nil, err + } + if recipient == nil { + return &pb.FindEnabledMessageRecipientResponse{MessageRecipient: nil}, nil + } + + // admin + admin, err := models.SharedAdminDAO.FindEnabledAdmin(tx, int64(recipient.AdminId)) + if err != nil { + return nil, err + } + if admin == nil { + return &pb.FindEnabledMessageRecipientResponse{MessageRecipient: nil}, nil + } + pbAdmin := &pb.Admin{ + Id: int64(admin.Id), + Fullname: admin.Fullname, + Username: admin.Username, + IsOn: admin.IsOn == 1, + } + + // 媒介实例 + instance, err := models.SharedMessageMediaInstanceDAO.FindEnabledMessageMediaInstance(tx, int64(recipient.InstanceId)) + if err != nil { + return nil, err + } + if instance == nil { + return &pb.FindEnabledMessageRecipientResponse{MessageRecipient: nil}, nil + } + pbInstance := &pb.MessageMediaInstance{ + Id: int64(instance.Id), + IsOn: instance.IsOn == 1, + Name: instance.Name, + Description: instance.Description, + } + + // 分组 + pbGroups := []*pb.MessageRecipientGroup{} + groupIds := recipient.DecodeGroupIds() + if len(groupIds) > 0 { + for _, groupId := range groupIds { + group, err := models.SharedMessageRecipientGroupDAO.FindEnabledMessageRecipientGroup(tx, groupId) + if err != nil { + return nil, err + } + if group != nil { + pbGroups = append(pbGroups, &pb.MessageRecipientGroup{ + Id: int64(group.Id), + Name: group.Name, + IsOn: group.IsOn == 1, + }) + } + } + } + + return &pb.FindEnabledMessageRecipientResponse{MessageRecipient: &pb.MessageRecipient{ + Id: int64(recipient.Id), + User: recipient.User, + Admin: pbAdmin, + MessageMediaInstance: pbInstance, + IsOn: recipient.IsOn == 1, + MessageRecipientGroups: pbGroups, + Description: recipient.Description, + }}, nil +} diff --git a/internal/rpc/services/service_message_recipient_group.go b/internal/rpc/services/service_message_recipient_group.go new file mode 100644 index 00000000..3d9db320 --- /dev/null +++ b/internal/rpc/services/service_message_recipient_group.go @@ -0,0 +1,106 @@ +package services + +import ( + "context" + "github.com/TeaOSLab/EdgeAPI/internal/db/models" + "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" +) + +// 消息接收人分组 +type MessageRecipientGroupService struct { + BaseService +} + +// 创建分组 +func (this *MessageRecipientGroupService) CreateMessageRecipientGroup(ctx context.Context, req *pb.CreateMessageRecipientGroupRequest) (*pb.CreateMessageRecipientGroupResponse, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + var tx = this.NullTx() + groupId, err := models.SharedMessageRecipientGroupDAO.CreateGroup(tx, req.Name) + if err != nil { + return nil, err + } + + return &pb.CreateMessageRecipientGroupResponse{MessageRecipientGroupId: groupId}, nil +} + +// 修改分组 +func (this *MessageRecipientGroupService) UpdateMessageRecipientGroup(ctx context.Context, req *pb.UpdateMessageRecipientGroupRequest) (*pb.RPCSuccess, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + var tx = this.NullTx() + err = models.SharedMessageRecipientGroupDAO.UpdateGroup(tx, req.MessageRecipientGroupId, req.Name, req.IsOn) + if err != nil { + return nil, err + } + + return this.Success() +} + +// 查找所有可用的分组 +func (this *MessageRecipientGroupService) FindAllEnabledMessageRecipientGroups(ctx context.Context, req *pb.FindAllEnabledMessageRecipientGroupsRequest) (*pb.FindAllEnabledMessageRecipientGroupsResponse, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + var tx = this.NullTx() + groups, err := models.SharedMessageRecipientGroupDAO.FindAllEnabledGroups(tx) + if err != nil { + return nil, err + } + pbGroups := []*pb.MessageRecipientGroup{} + for _, group := range groups { + pbGroups = append(pbGroups, &pb.MessageRecipientGroup{ + Id: int64(group.Id), + Name: group.Name, + IsOn: group.IsOn == 1, + }) + } + + return &pb.FindAllEnabledMessageRecipientGroupsResponse{MessageRecipientGroups: pbGroups}, nil +} + +// 删除分组 +func (this *MessageRecipientGroupService) DeleteMessageRecipientGroup(ctx context.Context, req *pb.DeleteMessageRecipientGroupRequest) (*pb.RPCSuccess, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + var tx = this.NullTx() + err = models.SharedMessageRecipientGroupDAO.DisableMessageRecipientGroup(tx, req.MessageRecipientGroupId) + if err != nil { + return nil, err + } + return this.Success() +} + +// 查找单个分组信息 +func (this *MessageRecipientGroupService) FindEnabledMessageRecipientGroup(ctx context.Context, req *pb.FindEnabledMessageRecipientGroupRequest) (*pb.FindEnabledMessageRecipientGroupResponse, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + var tx = this.NullTx() + group, err := models.SharedMessageRecipientGroupDAO.FindEnabledMessageRecipientGroup(tx, req.MessageRecipientGroupId) + if err != nil { + return nil, err + } + if group == nil { + return &pb.FindEnabledMessageRecipientGroupResponse{MessageRecipientGroup: nil}, nil + } + pbGroup := &pb.MessageRecipientGroup{ + Id: int64(group.Id), + IsOn: group.IsOn == 1, + Name: group.Name, + } + return &pb.FindEnabledMessageRecipientGroupResponse{MessageRecipientGroup: pbGroup}, nil +} diff --git a/internal/rpc/services/service_message_task.go b/internal/rpc/services/service_message_task.go new file mode 100644 index 00000000..98f29c2b --- /dev/null +++ b/internal/rpc/services/service_message_task.go @@ -0,0 +1,284 @@ +package services + +import ( + "context" + "encoding/json" + "github.com/TeaOSLab/EdgeAPI/internal/db/models" + "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "github.com/iwind/TeaGo/maps" + "github.com/iwind/TeaGo/types" +) + +// 消息发送任务服务 +type MessageTaskService struct { + BaseService +} + +// 创建任务 +func (this *MessageTaskService) CreateMessageTask(ctx context.Context, req *pb.CreateMessageTaskRequest) (*pb.CreateMessageTaskResponse, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + var tx = this.NullTx() + taskId, err := models.SharedMessageTaskDAO.CreateMessageTask(tx, req.RecipientId, req.InstanceId, req.User, req.Subject, req.Body, req.IsPrimary) + if err != nil { + return nil, err + } + return &pb.CreateMessageTaskResponse{MessageTaskId: taskId}, nil +} + +// 查找要发送的任务 +func (this *MessageTaskService) FindSendingMessageTasks(ctx context.Context, req *pb.FindSendingMessageTasksRequest) (*pb.FindSendingMessageTasksResponse, error) { + _, err := this.ValidateMonitor(ctx) + if err != nil { + return nil, err + } + + var tx = this.NullTx() + tasks, err := models.SharedMessageTaskDAO.FindSendingMessageTasks(tx, req.Size) + if err != nil { + return nil, err + } + pbTasks := []*pb.MessageTask{} + for _, task := range tasks { + var pbRecipient *pb.MessageRecipient + if task.RecipientId > 0 { + // TODO 需要缓存以提升性能 + recipient, err := models.SharedMessageRecipientDAO.FindEnabledMessageRecipient(tx, int64(task.RecipientId)) + if err != nil { + return nil, err + } + if recipient == nil || recipient.IsOn == 0 { + // 如果发送人已经删除或者禁用,则删除此消息 + err = models.SharedMessageTaskDAO.DisableMessageTask(tx, int64(task.Id)) + if err != nil { + return nil, err + } + continue + } + + // 媒介 + // TODO 需要缓存以提升性能 + instance, err := models.SharedMessageMediaInstanceDAO.FindEnabledMessageMediaInstance(tx, int64(recipient.InstanceId)) + if err != nil { + return nil, err + } + if instance == nil || instance.IsOn == 0 { + // 如果媒介实例已经删除或者禁用,则删除此消息 + err = models.SharedMessageTaskDAO.DisableMessageTask(tx, int64(task.Id)) + if err != nil { + return nil, err + } + continue + } + + pbRecipient = &pb.MessageRecipient{ + Id: int64(recipient.Id), + User: recipient.User, + MessageMediaInstance: &pb.MessageMediaInstance{ + Id: int64(instance.Id), + MessageMedia: &pb.MessageMedia{ + Type: instance.MediaType, + }, + ParamsJSON: []byte(instance.Params), + }, + } + } else { // 没有指定既定的接收人 + // 媒介 + // TODO 需要缓存以提升性能 + instance, err := models.SharedMessageMediaInstanceDAO.FindEnabledMessageMediaInstance(tx, int64(task.InstanceId)) + if err != nil { + return nil, err + } + if instance == nil || instance.IsOn == 0 { + // 如果媒介实例已经删除或者禁用,则删除此消息 + err = models.SharedMessageTaskDAO.DisableMessageTask(tx, int64(task.Id)) + if err != nil { + return nil, err + } + continue + } + pbRecipient = &pb.MessageRecipient{ + Id: 0, + MessageMediaInstance: &pb.MessageMediaInstance{ + Id: int64(instance.Id), + MessageMedia: &pb.MessageMedia{ + Type: instance.MediaType, + }, + ParamsJSON: []byte(instance.Params), + }, + } + } + + pbTasks = append(pbTasks, &pb.MessageTask{ + Id: int64(task.Id), + MessageRecipient: pbRecipient, + User: task.User, + Subject: task.Subject, + Body: task.Body, + CreatedAt: int64(task.CreatedAt), + Status: types.Int32(task.Status), + SentAt: int64(task.SentAt), + }) + } + return &pb.FindSendingMessageTasksResponse{MessageTasks: pbTasks}, nil +} + +// 修改任务状态 +func (this *MessageTaskService) UpdateMessageTaskStatus(ctx context.Context, req *pb.UpdateMessageTaskStatusRequest) (*pb.RPCSuccess, error) { + _, err := this.ValidateMonitor(ctx) + if err != nil { + return nil, err + } + + var tx = this.NullTx() + resultJSON := []byte{} + if req.Result != nil { + resultJSON, err = json.Marshal(maps.Map{ + "isOk": req.Result.IsOk, + "error": req.Result.Error, + "response": req.Result.Response, + }) + if err != nil { + return nil, err + } + } + + err = models.SharedMessageTaskDAO.UpdateMessageTaskStatus(tx, req.MessageTaskId, int(req.Status), resultJSON) + if err != nil { + return nil, err + } + + // 创建发送记录 + if (int(req.Status) == models.MessageTaskStatusSuccess || int(req.Status) == models.MessageTaskStatusFailed) && req.Result != nil { + err = models.SharedMessageTaskLogDAO.CreateLog(tx, req.MessageTaskId, req.Result.IsOk, req.Result.Error, req.Result.Response) + if err != nil { + return nil, err + } + } + + return this.Success() +} + +// 删除消息任务 +func (this *MessageTaskService) DeleteMessageTask(ctx context.Context, req *pb.DeleteMessageTaskRequest) (*pb.RPCSuccess, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + var tx = this.NullTx() + err = models.SharedMessageTaskDAO.DisableMessageTask(tx, req.MessageTaskId) + if err != nil { + return nil, err + } + + return this.Success() +} + +// 读取消息任务状态 +func (this *MessageTaskService) FindEnabledMessageTask(ctx context.Context, req *pb.FindEnabledMessageTaskRequest) (*pb.FindEnabledMessageTaskResponse, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + var tx = this.NullTx() + task, err := models.SharedMessageTaskDAO.FindEnabledMessageTask(tx, req.MessageTaskId) + if err != nil { + return nil, err + } + if task == nil { + return &pb.FindEnabledMessageTaskResponse{MessageTask: nil}, nil + } + + // TODO 需要缓存以提升性能 + var pbRecipient *pb.MessageRecipient + if task.RecipientId > 0 { + recipient, err := models.SharedMessageRecipientDAO.FindEnabledMessageRecipient(tx, int64(task.RecipientId)) + if err != nil { + return nil, err + } + if recipient == nil || recipient.IsOn == 0 { + // 如果发送人已经删除或者禁用,则删除此消息 + err = models.SharedMessageTaskDAO.DisableMessageTask(tx, int64(task.Id)) + if err != nil { + return nil, err + } + return &pb.FindEnabledMessageTaskResponse{MessageTask: nil}, nil + } + + // 媒介 + // TODO 需要缓存以提升性能 + instance, err := models.SharedMessageMediaInstanceDAO.FindEnabledMessageMediaInstance(tx, int64(recipient.InstanceId)) + if err != nil { + return nil, err + } + if instance == nil || instance.IsOn == 0 { + // 如果媒介实例已经删除或者禁用,则删除此消息 + err = models.SharedMessageTaskDAO.DisableMessageTask(tx, int64(task.Id)) + if err != nil { + return nil, err + } + return &pb.FindEnabledMessageTaskResponse{MessageTask: nil}, nil + } + + pbRecipient = &pb.MessageRecipient{ + + MessageMediaInstance: &pb.MessageMediaInstance{ + Id: int64(instance.Id), + MessageMedia: &pb.MessageMedia{ + Type: instance.MediaType, + }, + ParamsJSON: []byte(instance.Params), + }, + } + } else { // 没有指定既定的接收人 + // 媒介 + // TODO 需要缓存以提升性能 + instance, err := models.SharedMessageMediaInstanceDAO.FindEnabledMessageMediaInstance(tx, int64(task.InstanceId)) + if err != nil { + return nil, err + } + if instance == nil || instance.IsOn == 0 { + // 如果媒介实例已经删除或者禁用,则删除此消息 + err = models.SharedMessageTaskDAO.DisableMessageTask(tx, int64(task.Id)) + if err != nil { + return nil, err + } + return &pb.FindEnabledMessageTaskResponse{MessageTask: nil}, nil + } + pbRecipient = &pb.MessageRecipient{ + Id: 0, + MessageMediaInstance: &pb.MessageMediaInstance{ + Id: int64(instance.Id), + MessageMedia: &pb.MessageMedia{ + Type: instance.MediaType, + }, + ParamsJSON: []byte(instance.Params), + }, + } + } + + var result = &pb.MessageTaskResult{} + if len(task.Result) > 0 { + err = json.Unmarshal([]byte(task.Result), result) + if err != nil { + return nil, err + } + } + + return &pb.FindEnabledMessageTaskResponse{MessageTask: &pb.MessageTask{ + Id: int64(task.Id), + MessageRecipient: pbRecipient, + User: task.User, + Subject: task.Subject, + Body: task.Body, + CreatedAt: int64(task.CreatedAt), + Status: int32(task.Status), + SentAt: int64(task.SentAt), + Result: result, + }}, nil +} diff --git a/internal/rpc/services/service_message_task_log.go b/internal/rpc/services/service_message_task_log.go new file mode 100644 index 00000000..7eb69e3d --- /dev/null +++ b/internal/rpc/services/service_message_task_log.go @@ -0,0 +1,96 @@ +package services + +import ( + "context" + "github.com/TeaOSLab/EdgeAPI/internal/db/models" + "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" +) + +// 消息发送日志相关服务 +type MessageTaskLogService struct { + BaseService +} + +// 计算日志数量 +func (this *MessageTaskLogService) CountMessageTaskLogs(ctx context.Context, req *pb.CountMessageTaskLogsRequest) (*pb.RPCCountResponse, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + var tx = this.NullTx() + count, err := models.SharedMessageTaskLogDAO.CountLogs(tx) + if err != nil { + return nil, err + } + return this.SuccessCount(count) +} + +// 列出当页日志 +func (this *MessageTaskLogService) ListMessageTaskLogs(ctx context.Context, req *pb.ListMessageTaskLogsRequest) (*pb.ListMessageTaskLogsResponse, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + var tx = this.NullTx() + logs, err := models.SharedMessageTaskLogDAO.ListLogs(tx, req.Offset, req.Size) + if err != nil { + return nil, err + } + + pbLogs := []*pb.MessageTaskLog{} + for _, log := range logs { + task, err := models.SharedMessageTaskDAO.FindEnabledMessageTask(tx, int64(log.TaskId)) + if err != nil { + return nil, err + } + if task == nil { + continue + } + + var pbRecipient *pb.MessageRecipient + if task.RecipientId > 0 { + recipient, err := models.SharedMessageRecipientDAO.FindEnabledMessageRecipient(tx, int64(task.RecipientId)) + if err != nil { + return nil, err + } + if recipient != nil { + pbRecipient = &pb.MessageRecipient{ + Id: int64(recipient.Id), + User: recipient.User, + } + } + } + + instance, err := models.SharedMessageMediaInstanceDAO.FindEnabledMessageMediaInstance(tx, int64(task.InstanceId)) + if err != nil { + return nil, err + } + if instance == nil { + continue + } + + pbLogs = append(pbLogs, &pb.MessageTaskLog{ + Id: int64(log.Id), + CreatedAt: int64(log.CreatedAt), + IsOk: log.IsOk == 1, + Error: log.Error, + Response: log.Response, + MessageTask: &pb.MessageTask{ + Id: int64(task.Id), + MessageRecipient: pbRecipient, + MessageMediaInstance: &pb.MessageMediaInstance{ + Id: int64(instance.Id), + Name: instance.Name, + }, + User: task.User, + Subject: task.Subject, + Body: task.Body, + CreatedAt: int64(task.CreatedAt), + Status: int32(task.Status), + SentAt: int64(task.SentAt), + Result: nil, + }, + }) + } + return &pb.ListMessageTaskLogsResponse{MessageTaskLogs: pbLogs}, nil +} diff --git a/internal/rpc/services/service_monitor_node.go b/internal/rpc/services/service_monitor_node.go new file mode 100644 index 00000000..13b39b0f --- /dev/null +++ b/internal/rpc/services/service_monitor_node.go @@ -0,0 +1,233 @@ +package services + +import ( + "context" + "github.com/TeaOSLab/EdgeAPI/internal/db/models" + "github.com/TeaOSLab/EdgeAPI/internal/errors" + rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils" + "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "google.golang.org/grpc/metadata" +) + +type MonitorNodeService struct { + BaseService +} + +// 创建监控节点 +func (this *MonitorNodeService) CreateMonitorNode(ctx context.Context, req *pb.CreateMonitorNodeRequest) (*pb.CreateMonitorNodeResponse, error) { + _, _, err := rpcutils.ValidateRequest(ctx, rpcutils.UserTypeAdmin) + if err != nil { + return nil, err + } + + tx := this.NullTx() + + nodeId, err := models.SharedMonitorNodeDAO.CreateMonitorNode(tx, req.Name, req.Description, req.IsOn) + if err != nil { + return nil, err + } + + return &pb.CreateMonitorNodeResponse{NodeId: nodeId}, nil +} + +// 修改监控节点 +func (this *MonitorNodeService) UpdateMonitorNode(ctx context.Context, req *pb.UpdateMonitorNodeRequest) (*pb.RPCSuccess, error) { + _, _, err := rpcutils.ValidateRequest(ctx, rpcutils.UserTypeAdmin) + if err != nil { + return nil, err + } + + tx := this.NullTx() + + err = models.SharedMonitorNodeDAO.UpdateMonitorNode(tx, req.NodeId, req.Name, req.Description, req.IsOn) + if err != nil { + return nil, err + } + + return this.Success() +} + +// 删除监控节点 +func (this *MonitorNodeService) DeleteMonitorNode(ctx context.Context, req *pb.DeleteMonitorNodeRequest) (*pb.RPCSuccess, error) { + _, _, err := rpcutils.ValidateRequest(ctx, rpcutils.UserTypeAdmin) + if err != nil { + return nil, err + } + + tx := this.NullTx() + + err = models.SharedMonitorNodeDAO.DisableMonitorNode(tx, req.NodeId) + if err != nil { + return nil, err + } + + return this.Success() +} + +// 列出所有可用监控节点 +func (this *MonitorNodeService) FindAllEnabledMonitorNodes(ctx context.Context, req *pb.FindAllEnabledMonitorNodesRequest) (*pb.FindAllEnabledMonitorNodesResponse, error) { + _, _, err := rpcutils.ValidateRequest(ctx, rpcutils.UserTypeAdmin) + if err != nil { + return nil, err + } + + tx := this.NullTx() + + nodes, err := models.SharedMonitorNodeDAO.FindAllEnabledMonitorNodes(tx) + if err != nil { + return nil, err + } + + result := []*pb.MonitorNode{} + for _, node := range nodes { + result = append(result, &pb.MonitorNode{ + Id: int64(node.Id), + IsOn: node.IsOn == 1, + UniqueId: node.UniqueId, + Secret: node.Secret, + Name: node.Name, + Description: node.Description, + }) + } + + return &pb.FindAllEnabledMonitorNodesResponse{Nodes: result}, nil +} + +// 计算监控节点数量 +func (this *MonitorNodeService) CountAllEnabledMonitorNodes(ctx context.Context, req *pb.CountAllEnabledMonitorNodesRequest) (*pb.RPCCountResponse, error) { + _, _, err := rpcutils.ValidateRequest(ctx, rpcutils.UserTypeAdmin) + if err != nil { + return nil, err + } + + tx := this.NullTx() + + count, err := models.SharedMonitorNodeDAO.CountAllEnabledMonitorNodes(tx) + if err != nil { + return nil, err + } + + return this.SuccessCount(count) +} + +// 列出单页的监控节点 +func (this *MonitorNodeService) ListEnabledMonitorNodes(ctx context.Context, req *pb.ListEnabledMonitorNodesRequest) (*pb.ListEnabledMonitorNodesResponse, error) { + _, _, err := rpcutils.ValidateRequest(ctx, rpcutils.UserTypeAdmin) + if err != nil { + return nil, err + } + + tx := this.NullTx() + + nodes, err := models.SharedMonitorNodeDAO.ListEnabledMonitorNodes(tx, req.Offset, req.Size) + if err != nil { + return nil, err + } + + result := []*pb.MonitorNode{} + for _, node := range nodes { + result = append(result, &pb.MonitorNode{ + Id: int64(node.Id), + IsOn: node.IsOn == 1, + UniqueId: node.UniqueId, + Secret: node.Secret, + Name: node.Name, + Description: node.Description, + StatusJSON: []byte(node.Status), + }) + } + + return &pb.ListEnabledMonitorNodesResponse{Nodes: result}, nil +} + +// 根据ID查找节点 +func (this *MonitorNodeService) FindEnabledMonitorNode(ctx context.Context, req *pb.FindEnabledMonitorNodeRequest) (*pb.FindEnabledMonitorNodeResponse, error) { + _, _, err := rpcutils.ValidateRequest(ctx, rpcutils.UserTypeAdmin) + if err != nil { + return nil, err + } + + tx := this.NullTx() + + node, err := models.SharedMonitorNodeDAO.FindEnabledMonitorNode(tx, req.NodeId) + if err != nil { + return nil, err + } + + if node == nil { + return &pb.FindEnabledMonitorNodeResponse{Node: nil}, nil + } + + result := &pb.MonitorNode{ + Id: int64(node.Id), + IsOn: node.IsOn == 1, + UniqueId: node.UniqueId, + Secret: node.Secret, + Name: node.Name, + Description: node.Description, + } + return &pb.FindEnabledMonitorNodeResponse{Node: result}, nil +} + +// 获取当前监控节点的版本 +func (this *MonitorNodeService) FindCurrentMonitorNode(ctx context.Context, req *pb.FindCurrentMonitorNodeRequest) (*pb.FindCurrentMonitorNodeResponse, error) { + _, err := this.ValidateMonitor(ctx) + if err != nil { + return nil, err + } + + tx := this.NullTx() + + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return nil, errors.New("context: need 'nodeId'") + } + nodeIds := md.Get("nodeid") + if len(nodeIds) == 0 { + return nil, errors.New("invalid 'nodeId'") + } + nodeId := nodeIds[0] + node, err := models.SharedMonitorNodeDAO.FindEnabledMonitorNodeWithUniqueId(tx, nodeId) + if err != nil { + return nil, err + } + + if node == nil { + return &pb.FindCurrentMonitorNodeResponse{Node: nil}, nil + } + + result := &pb.MonitorNode{ + Id: int64(node.Id), + IsOn: node.IsOn == 1, + UniqueId: node.UniqueId, + Secret: node.Secret, + Name: node.Name, + Description: node.Description, + } + return &pb.FindCurrentMonitorNodeResponse{Node: result}, nil +} + +// 更新节点状态 +func (this *MonitorNodeService) UpdateMonitorNodeStatus(ctx context.Context, req *pb.UpdateMonitorNodeStatusRequest) (*pb.RPCSuccess, error) { + // 校验节点 + _, nodeId, err := this.ValidateNodeId(ctx, rpcutils.UserTypeMonitor) + if err != nil { + return nil, err + } + + if req.NodeId > 0 { + nodeId = req.NodeId + } + + if nodeId <= 0 { + return nil, errors.New("'nodeId' should be greater than 0") + } + + tx := this.NullTx() + + err = models.SharedMonitorNodeDAO.UpdateNodeStatus(tx, nodeId, req.StatusJSON) + if err != nil { + return nil, err + } + return this.Success() +} diff --git a/internal/rpc/services/service_sys_locker.go b/internal/rpc/services/service_sys_locker.go index 23f6c8ad..fdd5cd38 100644 --- a/internal/rpc/services/service_sys_locker.go +++ b/internal/rpc/services/service_sys_locker.go @@ -15,7 +15,10 @@ type SysLockerService struct { func (this *SysLockerService) SysLockerLock(ctx context.Context, req *pb.SysLockerLockRequest) (*pb.SysLockerLockResponse, error) { _, userId, err := this.ValidateAdminAndUser(ctx, 0, 0) if err != nil { - return nil, err + _, err = this.ValidateMonitor(ctx) + if err != nil { + return nil, err + } } key := req.Key @@ -42,7 +45,10 @@ func (this *SysLockerService) SysLockerLock(ctx context.Context, req *pb.SysLock func (this *SysLockerService) SysLockerUnlock(ctx context.Context, req *pb.SysLockerUnlockRequest) (*pb.RPCSuccess, error) { _, userId, err := this.ValidateAdminAndUser(ctx, 0, 0) if err != nil { - return nil, err + _, err = this.ValidateMonitor(ctx) + if err != nil { + return nil, err + } } key := req.Key diff --git a/internal/rpc/utils/utils.go b/internal/rpc/utils/utils.go index c77575ad..df26f3ed 100644 --- a/internal/rpc/utils/utils.go +++ b/internal/rpc/utils/utils.go @@ -154,6 +154,8 @@ func ValidateRequest(ctx context.Context, userTypes ...UserType) (userType UserT } nodeUserId = clusterId case UserTypeUser: + case UserTypeMonitor: + case UserTypeDNS: } if nodeUserId > 0 {