Files
EdgeNode/internal/monitor/value_queue.go

85 lines
1.8 KiB
Go
Raw Normal View History

2021-04-29 16:48:47 +08:00
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package monitor
import (
"encoding/json"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/rpc"
"github.com/iwind/TeaGo/maps"
"time"
)
var SharedValueQueue = NewValueQueue()
func init() {
events.On(events.EventLoaded, func() {
2021-04-29 16:48:47 +08:00
go SharedValueQueue.Start()
})
}
// ValueQueue 数据记录队列
type ValueQueue struct {
valuesChan chan *ItemValue
}
func NewValueQueue() *ValueQueue {
return &ValueQueue{
valuesChan: make(chan *ItemValue, 1024),
}
}
// Start 启动队列
func (this *ValueQueue) Start() {
// 这里单次循环就行因为Loop里已经使用了Range通道
err := this.Loop()
if err != nil {
2021-11-10 21:51:56 +08:00
remotelogs.ErrorObject("MONITOR_QUEUE", err)
2021-04-29 16:48:47 +08:00
}
}
// Add 添加数据
func (this *ValueQueue) Add(item string, value maps.Map) {
valueJSON, err := json.Marshal(value)
if err != nil {
remotelogs.Error("MONITOR_QUEUE", "marshal value error: "+err.Error())
return
}
select {
case this.valuesChan <- &ItemValue{
Item: item,
ValueJSON: valueJSON,
CreatedAt: time.Now().Unix(),
}:
default:
}
}
// Loop 单次循环
func (this *ValueQueue) Loop() error {
rpcClient, err := rpc.SharedRPC()
if err != nil {
return err
}
for value := range this.valuesChan {
_, err = rpcClient.NodeValueRPC().CreateNodeValue(rpcClient.Context(), &pb.CreateNodeValueRequest{
Item: value.Item,
ValueJSON: value.ValueJSON,
CreatedAt: value.CreatedAt,
})
if err != nil {
2021-11-10 21:51:56 +08:00
if rpc.IsConnError(err) {
remotelogs.Warn("MONITOR", err.Error())
} else {
remotelogs.Error("MONITOR", err.Error())
}
continue
2021-04-29 16:48:47 +08:00
}
}
return nil
}