EdgeDNS:访问日志增加集群和记录类型筛选

This commit is contained in:
GoEdgeLab
2022-07-27 20:19:29 +08:00
parent 6e03bdc505
commit 03861b59b0
3 changed files with 54 additions and 12 deletions

View File

@@ -111,7 +111,7 @@ func (this *NSAccessLogDAO) CreateNSAccessLogsWithDAO(tx *dbs.Tx, daoWrapper *NS
} }
// ListAccessLogs 读取往前的 单页访问日志 // ListAccessLogs 读取往前的 单页访问日志
func (this *NSAccessLogDAO) ListAccessLogs(tx *dbs.Tx, lastRequestId string, size int64, day string, nodeId int64, domainId int64, recordId int64, keyword string, reverse bool) (result []*NSAccessLog, nextLastRequestId string, hasMore bool, err error) { func (this *NSAccessLogDAO) ListAccessLogs(tx *dbs.Tx, lastRequestId string, size int64, day string, clusterId int64, nodeId int64, domainId int64, recordId int64, recordType string, keyword string, reverse bool) (result []*NSAccessLog, nextLastRequestId string, hasMore bool, err error) {
if len(day) != 8 { if len(day) != 8 {
return return
} }
@@ -121,24 +121,24 @@ func (this *NSAccessLogDAO) ListAccessLogs(tx *dbs.Tx, lastRequestId string, siz
size = 1000 size = 1000
} }
result, nextLastRequestId, err = this.listAccessLogs(tx, lastRequestId, size, day, nodeId, domainId, recordId, keyword, reverse) result, nextLastRequestId, err = this.listAccessLogs(tx, lastRequestId, size, day, clusterId, nodeId, domainId, recordId, recordType, keyword, reverse)
if err != nil || int64(len(result)) < size { if err != nil || int64(len(result)) < size {
return return
} }
moreResult, _, _ := this.listAccessLogs(tx, nextLastRequestId, 1, day, nodeId, domainId, recordId, keyword, reverse) moreResult, _, _ := this.listAccessLogs(tx, nextLastRequestId, 1, day, clusterId, nodeId, domainId, recordId, recordType, keyword, reverse)
hasMore = len(moreResult) > 0 hasMore = len(moreResult) > 0
return return
} }
// 读取往前的单页访问日志 // 读取往前的单页访问日志
func (this *NSAccessLogDAO) listAccessLogs(tx *dbs.Tx, lastRequestId string, size int64, day string, nodeId int64, domainId int64, recordId int64, keyword string, reverse bool) (result []*NSAccessLog, nextLastRequestId string, err error) { func (this *NSAccessLogDAO) listAccessLogs(tx *dbs.Tx, lastRequestId string, size int64, day string, clusterId int64, nodeId int64, domainId int64, recordId int64, recordType string, keyword string, reverse bool) (result []*NSAccessLog, nextLastRequestId string, err error) {
if size <= 0 { if size <= 0 {
return nil, lastRequestId, nil return nil, lastRequestId, nil
} }
accessLogLocker.RLock() accessLogLocker.RLock()
daoList := []*NSAccessLogDAOWrapper{} var daoList = []*NSAccessLogDAOWrapper{}
for _, daoWrapper := range nsAccessLogDAOMapping { for _, daoWrapper := range nsAccessLogDAOMapping {
daoList = append(daoList, daoWrapper) daoList = append(daoList, daoWrapper)
} }
@@ -151,10 +151,23 @@ func (this *NSAccessLogDAO) listAccessLogs(tx *dbs.Tx, lastRequestId string, siz
}} }}
} }
locker := sync.Mutex{} // 检查是否有集群筛选条件
var nodeIds []int64
if clusterId > 0 && nodeId <= 0 {
nodeIds, err = SharedNSNodeDAO.FindEnabledNodeIdsWithClusterId(tx, clusterId)
if err != nil {
return
}
if len(nodeIds) == 0 {
// 没有任何节点则直接返回空
return nil, "", nil
}
}
count := len(daoList) var locker = sync.Mutex{}
wg := &sync.WaitGroup{}
var count = len(daoList)
var wg = &sync.WaitGroup{}
wg.Add(count) wg.Add(count)
for _, daoWrapper := range daoList { for _, daoWrapper := range daoList {
go func(daoWrapper *NSAccessLogDAOWrapper) { go func(daoWrapper *NSAccessLogDAOWrapper) {
@@ -172,11 +185,14 @@ func (this *NSAccessLogDAO) listAccessLogs(tx *dbs.Tx, lastRequestId string, siz
return return
} }
query := dao.Query(tx) var query = dao.Query(tx)
// 条件 // 条件
if nodeId > 0 { if nodeId > 0 {
query.Attr("nodeId", nodeId) query.Attr("nodeId", nodeId)
} else if clusterId > 0 {
query.Attr("nodeId", nodeIds)
query.Reuse(false)
} }
if domainId > 0 { if domainId > 0 {
query.Attr("domainId", domainId) query.Attr("domainId", domainId)
@@ -202,6 +218,12 @@ func (this *NSAccessLogDAO) listAccessLogs(tx *dbs.Tx, lastRequestId string, siz
Param("keyword", dbutils.QuoteLike(keyword)) Param("keyword", dbutils.QuoteLike(keyword))
} }
// record type
if len(recordType) > 0 {
query.Where("JSON_EXTRACT(content, '$.questionType')=:recordType")
query.Param("recordType", recordType)
}
if !reverse { if !reverse {
query.Desc("requestId") query.Desc("requestId")
} else { } else {
@@ -244,7 +266,7 @@ func (this *NSAccessLogDAO) listAccessLogs(tx *dbs.Tx, lastRequestId string, siz
result = result[:size] result = result[:size]
} }
requestId := result[len(result)-1].RequestId var requestId = result[len(result)-1].RequestId
if reverse { if reverse {
lists.Reverse(result) lists.Reverse(result)
} }

View File

@@ -609,6 +609,26 @@ func (this *NSNodeDAO) UpdateNodeInstallStatus(tx *dbs.Tx, nodeId int64, status
return err return err
} }
// FindEnabledNodeIdsWithClusterId 查找集群下的所有节点
func (this *NSNodeDAO) FindEnabledNodeIdsWithClusterId(tx *dbs.Tx, clusterId int64) ([]int64, error) {
if clusterId <= 0 {
return nil, nil
}
ones, err := this.Query(tx).
ResultPk().
Attr("clusterId", clusterId).
State(NSNodeStateEnabled).
FindAll()
if err != nil {
return nil, err
}
var result = []int64{}
for _, one := range ones {
result = append(result, int64(one.(*NSNode).Id))
}
return result, nil
}
// NotifyUpdate 通知更新 // NotifyUpdate 通知更新
func (this *NSNodeDAO) NotifyUpdate(tx *dbs.Tx, nodeId int64) error { func (this *NSNodeDAO) NotifyUpdate(tx *dbs.Tx, nodeId int64) error {
// TODO 先什么都不做 // TODO 先什么都不做

View File

@@ -52,12 +52,12 @@ func (this *NSAccessLogService) ListNSAccessLogs(ctx context.Context, req *pb.Li
// TODO // TODO
} }
accessLogs, requestId, hasMore, err := models.SharedNSAccessLogDAO.ListAccessLogs(tx, req.RequestId, req.Size, req.Day, req.NsNodeId, req.NsDomainId, req.NsRecordId, req.Keyword, req.Reverse) accessLogs, requestId, hasMore, err := models.SharedNSAccessLogDAO.ListAccessLogs(tx, req.RequestId, req.Size, req.Day, req.NsClusterId, req.NsNodeId, req.NsDomainId, req.NsRecordId, req.RecordType, req.Keyword, req.Reverse)
if err != nil { if err != nil {
return nil, err return nil, err
} }
result := []*pb.NSAccessLog{} var result = []*pb.NSAccessLog{}
for _, accessLog := range accessLogs { for _, accessLog := range accessLogs {
a, err := accessLog.ToPB() a, err := accessLog.ToPB()
if err != nil { if err != nil {