diff --git a/internal/nodes/ip_library_updater.go b/internal/nodes/ip_library_updater.go new file mode 100644 index 0000000..df0c303 --- /dev/null +++ b/internal/nodes/ip_library_updater.go @@ -0,0 +1,103 @@ +// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package nodes + +import ( + "errors" + "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "github.com/TeaOSLab/EdgeNode/internal/remotelogs" + "github.com/TeaOSLab/EdgeNode/internal/rpc" + "github.com/iwind/TeaGo/Tea" + "github.com/iwind/TeaGo/types" + "io" + "os" +) + +type IPLibraryUpdater struct { +} + +func NewIPLibraryUpdater() *IPLibraryUpdater { + return &IPLibraryUpdater{} +} + +// DataDir 文件目录 +func (this *IPLibraryUpdater) DataDir() string { + // data/ + var dir = Tea.Root + "/data" + stat, err := os.Stat(dir) + if err == nil && stat.IsDir() { + return dir + } + + err = os.Mkdir(dir, 0666) + if err == nil { + return dir + } + + remotelogs.Error("IP_LIBRARY_UPDATER", "create directory '"+dir+"' failed: "+err.Error()) + + // 如果不能创建 data/ 目录,那么使用临时目录 + return os.TempDir() +} + +// FindLatestFile 检查最新的IP库文件 +func (this *IPLibraryUpdater) FindLatestFile() (code string, fileId int64, err error) { + rpcClient, err := rpc.SharedRPC() + if err != nil { + return "", 0, err + } + resp, err := rpcClient.IPLibraryArtifactRPC().FindPublicIPLibraryArtifact(rpcClient.Context(), &pb.FindPublicIPLibraryArtifactRequest{}) + if err != nil { + return "", 0, err + } + var artifact = resp.IpLibraryArtifact + if artifact == nil { + return + } + return artifact.Code, artifact.FileId, nil +} + +// DownloadFile 下载文件 +func (this *IPLibraryUpdater) DownloadFile(fileId int64, writer io.Writer) error { + if fileId <= 0 { + return errors.New("invalid fileId: " + types.String(fileId)) + } + + rpcClient, err := rpc.SharedRPC() + if err != nil { + return err + } + + chunkIdsResp, err := rpcClient.FileChunkRPC().FindAllFileChunkIds(rpcClient.Context(), &pb.FindAllFileChunkIdsRequest{FileId: fileId}) + if err != nil { + return err + } + for _, chunkId := range chunkIdsResp.FileChunkIds { + chunkResp, err := rpcClient.FileChunkRPC().DownloadFileChunk(rpcClient.Context(), &pb.DownloadFileChunkRequest{FileChunkId: chunkId}) + if err != nil { + return err + } + var chunk = chunkResp.FileChunk + if chunk == nil { + return errors.New("can not find file chunk with chunk id '" + types.String(chunkId) + "'") + } + _, err = writer.Write(chunk.Data) + if err != nil { + return err + } + } + return nil +} + +// LogInfo 普通日志 +func (this *IPLibraryUpdater) LogInfo(message string) { + remotelogs.Println("IP_LIBRARY_UPDATER", message) +} + +// LogError 错误日志 +func (this *IPLibraryUpdater) LogError(err error) { + if err == nil { + return + } + remotelogs.Error("IP_LIBRARY_UPDATER", err.Error()) +} diff --git a/internal/nodes/node.go b/internal/nodes/node.go index d785415..7ee4791 100644 --- a/internal/nodes/node.go +++ b/internal/nodes/node.go @@ -111,10 +111,13 @@ func (this *Node) Start() { // 启动IP库 remotelogs.Println("NODE", "initializing ip library ...") - err = iplib.Init() + err = iplib.InitDefault() if err != nil { remotelogs.Error("NODE", "initialize ip library failed: "+err.Error()) } + goman.New(func() { + iplib.NewUpdater(NewIPLibraryUpdater(), 10*time.Minute).Start() + }) // 检查硬盘类型 this.checkDisk() diff --git a/internal/rpc/rpc_client.go b/internal/rpc/rpc_client.go index d1c2df2..6dfe139 100644 --- a/internal/rpc/rpc_client.go +++ b/internal/rpc/rpc_client.go @@ -79,6 +79,10 @@ func (this *RPCClient) IPLibraryRPC() pb.IPLibraryServiceClient { return pb.NewIPLibraryServiceClient(this.pickConn()) } +func (this *RPCClient) IPLibraryArtifactRPC() pb.IPLibraryArtifactServiceClient { + return pb.NewIPLibraryArtifactServiceClient(this.pickConn()) +} + func (this *RPCClient) RegionCountryRPC() pb.RegionCountryServiceClient { return pb.NewRegionCountryServiceClient(this.pickConn()) } @@ -141,7 +145,7 @@ func (this *RPCClient) SSLCertRPC() pb.SSLCertServiceClient { // Context 节点上下文信息 func (this *RPCClient) Context() context.Context { - ctx := context.Background() + var ctx = context.Background() m := maps.Map{ "timestamp": time.Now().Unix(), "type": "node", @@ -157,7 +161,7 @@ func (this *RPCClient) Context() context.Context { utils.PrintError(err) return context.Background() } - token := base64.StdEncoding.EncodeToString(data) + var token = base64.StdEncoding.EncodeToString(data) ctx = metadata.AppendToOutgoingContext(ctx, "nodeId", this.apiConfig.NodeId, "token", token) return ctx }