实现缓存策略的部分功能

This commit is contained in:
GoEdgeLab
2020-10-04 14:27:05 +08:00
parent bfa6227a82
commit de5ad48070
49 changed files with 1444 additions and 189 deletions

View File

@@ -13,6 +13,7 @@ import (
"github.com/iwind/TeaGo/rands"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"net/url"
"time"
)
@@ -30,7 +31,19 @@ func NewRPCClient(apiConfig *configs.APIConfig) (*RPCClient, error) {
conns := []*grpc.ClientConn{}
for _, endpoint := range apiConfig.RPC.Endpoints {
conn, err := grpc.Dial(endpoint, grpc.WithInsecure())
u, err := url.Parse(endpoint)
if err != nil {
return nil, errors.New("parse endpoint failed: " + err.Error())
}
var conn *grpc.ClientConn
if u.Scheme == "http" {
conn, err = grpc.Dial(u.Host, grpc.WithInsecure())
} else if u.Scheme == "https" {
// TODO 暂不支持HTTPS
conn, err = grpc.Dial(u.Host)
} else {
return nil, errors.New("parse endpoint failed: invalid scheme '" + u.Scheme + "'")
}
if err != nil {
return nil, err
}

View File

@@ -1,8 +1,8 @@
package rpc
import (
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeAdmin/internal/configs"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
_ "github.com/iwind/TeaGo/bootstrap"
stringutil "github.com/iwind/TeaGo/utils/string"
"testing"
@@ -31,3 +31,66 @@ func TestRPCClient_NodeRPC(t *testing.T) {
}
t.Log(resp)
}
func TestRPC_Dial_HTTP(t *testing.T) {
client, err := NewRPCClient(&configs.APIConfig{
RPC: struct {
Endpoints []string `yaml:"endpoints"`
}{
Endpoints: []string{"127.0.0.1:8003"},
},
NodeId: "a7e55782dab39bce0901058a1e14a0e6",
Secret: "lvyPobI3BszkJopz5nPTocOs0OLkEJ7y",
})
if err != nil {
t.Fatal(err)
}
resp, err := client.NodeRPC().FindEnabledNode(client.Context(1), &pb.FindEnabledNodeRequest{NodeId: 4})
if err != nil {
t.Fatal(err)
}
t.Log(resp.Node)
}
func TestRPC_Dial_HTTP_2(t *testing.T) {
client, err := NewRPCClient(&configs.APIConfig{
RPC: struct {
Endpoints []string `yaml:"endpoints"`
}{
Endpoints: []string{"http://127.0.0.1:8003"},
},
NodeId: "a7e55782dab39bce0901058a1e14a0e6",
Secret: "lvyPobI3BszkJopz5nPTocOs0OLkEJ7y",
})
if err != nil {
t.Fatal(err)
}
resp, err := client.NodeRPC().FindEnabledNode(client.Context(1), &pb.FindEnabledNodeRequest{NodeId: 4})
if err != nil {
t.Fatal(err)
}
t.Log(resp.Node)
}
func TestRPC_Dial_HTTPS(t *testing.T) {
client, err := NewRPCClient(&configs.APIConfig{
RPC: struct {
Endpoints []string `yaml:"endpoints"`
}{
Endpoints: []string{"https://127.0.0.1:8004"},
},
NodeId: "a7e55782dab39bce0901058a1e14a0e6",
Secret: "lvyPobI3BszkJopz5nPTocOs0OLkEJ7y",
})
if err != nil {
t.Fatal(err)
}
resp, err := client.NodeRPC().FindEnabledNode(client.Context(1), &pb.FindEnabledNodeRequest{NodeId: 4})
if err != nil {
t.Fatal(err)
}
t.Log(resp.Node)
}

View File

@@ -37,11 +37,10 @@ func (this *IndexAction) RunGet(params struct{}) {
for _, node := range nodesResp.Nodes {
nodeMaps = append(nodeMaps, maps.Map{
"id": node.Id,
"isOn": node.IsOn,
"name": node.Name,
"host": node.Host,
"port": node.Port,
"id": node.Id,
"isOn": node.IsOn,
"name": node.Name,
"accessAddrs": node.AccessAddrs,
})
}
}

View File

@@ -1,8 +1,11 @@
package node
import (
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"encoding/json"
"github.com/TeaOSLab/EdgeAdmin/internal/web/actions/actionutils"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/sslconfigs"
"github.com/iwind/TeaGo/actions"
)
@@ -19,27 +22,113 @@ func (this *CreateAction) RunGet(params struct{}) {
}
func (this *CreateAction) RunPost(params struct {
Name string
Host string
Port int
Description string
Name string
Description string
ListensJSON []byte
CertIdsJSON []byte
AccessAddrsJSON []byte
IsOn bool
Must *actions.Must
}) {
params.Must.
Field("name", params.Name).
Require("请输入API节点").
Field("host", params.Host).
Require("请输入主机地址").
Field("port", params.Port).
Gt(0, "端口不能小于1").
Lte(65535, "端口不能大于65535")
Require("请输入API节点名称")
_, err := this.RPC().APINodeRPC().CreateAPINode(this.AdminContext(), &pb.CreateAPINodeRequest{
Name: params.Name,
Description: params.Description,
Host: params.Host,
Port: int32(params.Port),
httpConfig := &serverconfigs.HTTPProtocolConfig{}
httpsConfig := &serverconfigs.HTTPSProtocolConfig{}
// 监听地址
listens := []*serverconfigs.NetworkAddressConfig{}
err := json.Unmarshal(params.ListensJSON, &listens)
if err != nil {
this.ErrorPage(err)
return
}
if len(listens) == 0 {
this.Fail("请添加至少一个进程监听地址")
}
for _, addr := range listens {
if addr.Protocol.IsHTTPFamily() {
httpConfig.IsOn = true
httpConfig.Listen = append(httpConfig.Listen, addr)
} else if addr.Protocol.IsHTTPSFamily() {
httpsConfig.IsOn = true
httpsConfig.Listen = append(httpsConfig.Listen, addr)
}
}
// 证书
certIds := []int64{}
if len(params.CertIdsJSON) > 0 {
err = json.Unmarshal(params.CertIdsJSON, &certIds)
if err != nil {
this.ErrorPage(err)
return
}
}
if httpsConfig.IsOn && len(httpsConfig.Listen) > 0 && len(certIds) == 0 {
this.Fail("请添加至少一个证书")
}
certRefs := []*sslconfigs.SSLCertRef{}
for _, certId := range certIds {
certRefs = append(certRefs, &sslconfigs.SSLCertRef{
IsOn: true,
CertId: certId,
})
}
certRefsJSON, err := json.Marshal(certRefs)
if err != nil {
this.ErrorPage(err)
return
}
// 创建策略
if len(certIds) > 0 {
sslPolicyCreateResp, err := this.RPC().SSLPolicyRPC().CreateSSLPolicy(this.AdminContext(), &pb.CreateSSLPolicyRequest{
CertsJSON: certRefsJSON,
})
if err != nil {
this.ErrorPage(err)
return
}
sslPolicyId := sslPolicyCreateResp.SslPolicyId
httpsConfig.SSLPolicyRef = &sslconfigs.SSLPolicyRef{
IsOn: true,
SSLPolicyId: sslPolicyId,
}
}
// 访问地址
accessAddrs := []*serverconfigs.NetworkAddressConfig{}
err = json.Unmarshal(params.AccessAddrsJSON, &accessAddrs)
if err != nil {
this.ErrorPage(err)
return
}
if len(accessAddrs) == 0 {
this.Fail("请添加至少一个外部访问地址")
}
httpJSON, err := json.Marshal(httpConfig)
if err != nil {
this.ErrorPage(err)
return
}
httpsJSON, err := json.Marshal(httpsConfig)
if err != nil {
this.ErrorPage(err)
return
}
_, err = this.RPC().APINodeRPC().CreateAPINode(this.AdminContext(), &pb.CreateAPINodeRequest{
Name: params.Name,
Description: params.Description,
HttpJSON: httpJSON,
HttpsJSON: httpsJSON,
AccessAddrsJSON: params.AccessAddrsJSON,
IsOn: params.IsOn,
})
if err != nil {
this.ErrorPage(err)

View File

@@ -0,0 +1,44 @@
package node
import (
"github.com/TeaOSLab/EdgeAdmin/internal/web/actions/actionutils"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/iwind/TeaGo/actions"
"net"
)
// 添加地址
type CreateAddrPopupAction struct {
actionutils.ParentAction
}
func (this *CreateAddrPopupAction) Init() {
this.Nav("", "", "")
}
func (this *CreateAddrPopupAction) RunGet(params struct {
}) {
this.Show()
}
func (this *CreateAddrPopupAction) RunPost(params struct {
Protocol string
Addr string
Must *actions.Must
}) {
params.Must.
Field("addr", params.Addr).
Require("请输入访问地址")
host, port, err := net.SplitHostPort(params.Addr)
if err != nil {
this.FailField("addr", "错误的访问地址")
}
addrConfig := &serverconfigs.NetworkAddressConfig{
Protocol: serverconfigs.Protocol(params.Protocol),
Host: host,
PortRange: port,
}
this.Data["addr"] = addrConfig
this.Success()
}

View File

@@ -2,8 +2,8 @@ package node
import (
"github.com/TeaOSLab/EdgeAdmin/internal/rpc"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeAdmin/internal/web/actions/actionutils"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/iwind/TeaGo/actions"
"github.com/iwind/TeaGo/logs"
"github.com/iwind/TeaGo/maps"
@@ -48,8 +48,12 @@ func (this *Helper) BeforeAction(action *actions.ActionObject) (goNext bool) {
// 顶部Tab栏
selectedTabbar, _ := action.Data["mainTab"]
tabbar := actionutils.NewTabbar()
tabbar.Add("当前节点:"+node.Name, "", "/api", "left long alternate arrow", false)
tabbar.Add("节点列表", "", "/api", "", false)
tabbar.Add("设置", "", "/api/node/settings?nodeId="+nodeIdString, "setting", selectedTabbar == "setting")
{
m := tabbar.Add("当前节点:"+node.Name, "", "", "", false)
m["right"] = true
}
actionutils.SetTabbar(action, tabbar)
// 左侧菜单栏

View File

@@ -9,12 +9,17 @@ func init() {
TeaGo.BeforeStart(func(server *TeaGo.Server) {
server.
Helper(helpers.NewUserMustAuth()).
Helper(NewHelper()).
Prefix("/api/node").
// 这里不受Helper的约束
GetPost("/createAddrPopup", new(CreateAddrPopupAction)).
GetPost("/updateAddrPopup", new(UpdateAddrPopupAction)).
// 节点相关
Helper(NewHelper()).
GetPost("/settings", new(SettingsAction)).
EndAll()
})
}

View File

@@ -1,8 +1,11 @@
package node
import (
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"encoding/json"
"github.com/TeaOSLab/EdgeAdmin/internal/web/actions/actionutils"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/sslconfigs"
"github.com/iwind/TeaGo/actions"
"github.com/iwind/TeaGo/maps"
)
@@ -32,12 +35,69 @@ func (this *SettingsAction) RunGet(params struct {
return
}
httpConfig := &serverconfigs.HTTPProtocolConfig{}
if len(node.HttpJSON) > 0 {
err = json.Unmarshal(node.HttpJSON, httpConfig)
if err != nil {
this.ErrorPage(err)
return
}
}
httpsConfig := &serverconfigs.HTTPSProtocolConfig{}
if len(node.HttpsJSON) > 0 {
err = json.Unmarshal(node.HttpsJSON, httpsConfig)
if err != nil {
this.ErrorPage(err)
return
}
}
// 监听地址
listens := []*serverconfigs.NetworkAddressConfig{}
listens = append(listens, httpConfig.Listen...)
listens = append(listens, httpsConfig.Listen...)
// 证书信息
certs := []*sslconfigs.SSLCertConfig{}
sslPolicyId := int64(0)
if httpsConfig.SSLPolicyRef != nil && httpsConfig.SSLPolicyRef.SSLPolicyId > 0 {
sslPolicyConfigResp, err := this.RPC().SSLPolicyRPC().FindEnabledSSLPolicyConfig(this.AdminContext(), &pb.FindEnabledSSLPolicyConfigRequest{SslPolicyId: httpsConfig.SSLPolicyRef.SSLPolicyId})
if err != nil {
this.ErrorPage(err)
return
}
sslPolicyConfigJSON := sslPolicyConfigResp.SslPolicyJSON
if len(sslPolicyConfigJSON) > 0 {
sslPolicyId = httpsConfig.SSLPolicyRef.SSLPolicyId
sslPolicy := &sslconfigs.SSLPolicy{}
err = json.Unmarshal(sslPolicyConfigJSON, sslPolicy)
if err != nil {
this.ErrorPage(err)
return
}
certs = sslPolicy.Certs
}
}
accessAddrs := []*serverconfigs.NetworkAddressConfig{}
if len(node.AccessAddrsJSON) > 0 {
err = json.Unmarshal(node.AccessAddrsJSON, &accessAddrs)
if err != nil {
this.ErrorPage(err)
return
}
}
this.Data["node"] = maps.Map{
"id": node.Id,
"name": node.Name,
"description": node.Description,
"host": node.Host,
"port": node.Port,
"isOn": node.IsOn,
"listens": listens,
"certs": certs,
"sslPolicyId": sslPolicyId,
"accessAddrs": accessAddrs,
}
this.Show()
@@ -45,29 +105,128 @@ func (this *SettingsAction) RunGet(params struct {
// 保存基础设置
func (this *SettingsAction) RunPost(params struct {
NodeId int64
Name string
Host string
Port int
Description string
NodeId int64
Name string
SslPolicyId int64
ListensJSON []byte
CertIdsJSON []byte
AccessAddrsJSON []byte
Description string
IsOn bool
Must *actions.Must
}) {
params.Must.
Field("name", params.Name).
Require("请输入API节点").
Field("host", params.Host).
Require("请输入主机地址").
Field("port", params.Port).
Gt(0, "端口不能小于1").
Lte(65535, "端口不能大于65535")
Require("请输入API节点名称")
_, err := this.RPC().APINodeRPC().UpdateAPINode(this.AdminContext(), &pb.UpdateAPINodeRequest{
NodeId: params.NodeId,
Name: params.Name,
Description: params.Description,
Host: params.Host,
Port: int32(params.Port),
httpConfig := &serverconfigs.HTTPProtocolConfig{}
httpsConfig := &serverconfigs.HTTPSProtocolConfig{}
// 监听地址
listens := []*serverconfigs.NetworkAddressConfig{}
err := json.Unmarshal(params.ListensJSON, &listens)
if err != nil {
this.ErrorPage(err)
return
}
if len(listens) == 0 {
this.Fail("请添加至少一个进程监听地址")
}
for _, addr := range listens {
if addr.Protocol.IsHTTPFamily() {
httpConfig.IsOn = true
httpConfig.Listen = append(httpConfig.Listen, addr)
} else if addr.Protocol.IsHTTPSFamily() {
httpsConfig.IsOn = true
httpsConfig.Listen = append(httpsConfig.Listen, addr)
}
}
// 证书
certIds := []int64{}
if len(params.CertIdsJSON) > 0 {
err = json.Unmarshal(params.CertIdsJSON, &certIds)
if err != nil {
this.ErrorPage(err)
return
}
}
if httpsConfig.IsOn && len(httpsConfig.Listen) > 0 && len(certIds) == 0 {
this.Fail("请添加至少一个证书")
}
certRefs := []*sslconfigs.SSLCertRef{}
for _, certId := range certIds {
certRefs = append(certRefs, &sslconfigs.SSLCertRef{
IsOn: true,
CertId: certId,
})
}
certRefsJSON, err := json.Marshal(certRefs)
if err != nil {
this.ErrorPage(err)
return
}
// 创建策略
sslPolicyId := params.SslPolicyId
if sslPolicyId == 0 {
if len(certIds) > 0 {
sslPolicyCreateResp, err := this.RPC().SSLPolicyRPC().CreateSSLPolicy(this.AdminContext(), &pb.CreateSSLPolicyRequest{
CertsJSON: certRefsJSON,
})
if err != nil {
this.ErrorPage(err)
return
}
sslPolicyId = sslPolicyCreateResp.SslPolicyId
}
} else {
_, err = this.RPC().SSLPolicyRPC().UpdateSSLPolicy(this.AdminContext(), &pb.UpdateSSLPolicyRequest{
SslPolicyId: sslPolicyId,
CertsJSON: certRefsJSON,
})
if err != nil {
this.ErrorPage(err)
return
}
}
httpsConfig.SSLPolicyRef = &sslconfigs.SSLPolicyRef{
IsOn: true,
SSLPolicyId: sslPolicyId,
}
// 访问地址
accessAddrs := []*serverconfigs.NetworkAddressConfig{}
err = json.Unmarshal(params.AccessAddrsJSON, &accessAddrs)
if err != nil {
this.ErrorPage(err)
return
}
if len(accessAddrs) == 0 {
this.Fail("请添加至少一个外部访问地址")
}
httpJSON, err := json.Marshal(httpConfig)
if err != nil {
this.ErrorPage(err)
return
}
httpsJSON, err := json.Marshal(httpsConfig)
if err != nil {
this.ErrorPage(err)
return
}
_, err = this.RPC().APINodeRPC().UpdateAPINode(this.AdminContext(), &pb.UpdateAPINodeRequest{
NodeId: params.NodeId,
Name: params.Name,
Description: params.Description,
HttpJSON: httpJSON,
HttpsJSON: httpsJSON,
AccessAddrsJSON: params.AccessAddrsJSON,
IsOn: params.IsOn,
})
if err != nil {
this.ErrorPage(err)

View File

@@ -0,0 +1,42 @@
package node
import (
"github.com/TeaOSLab/EdgeAdmin/internal/web/actions/actionutils"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/iwind/TeaGo/actions"
"net"
)
type UpdateAddrPopupAction struct {
actionutils.ParentAction
}
func (this *UpdateAddrPopupAction) Init() {
this.Nav("", "", "")
}
func (this *UpdateAddrPopupAction) RunGet(params struct{}) {
this.Show()
}
func (this *UpdateAddrPopupAction) RunPost(params struct {
Protocol string
Addr string
Must *actions.Must
}) {
params.Must.
Field("addr", params.Addr).
Require("请输入访问地址")
host, port, err := net.SplitHostPort(params.Addr)
if err != nil {
this.FailField("addr", "错误的访问地址")
}
addrConfig := &serverconfigs.NetworkAddressConfig{
Protocol: serverconfigs.Protocol(params.Protocol),
Host: host,
PortRange: port,
}
this.Data["addr"] = addrConfig
this.Success()
}

View File

@@ -1,8 +1,8 @@
package node
import (
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeAdmin/internal/web/actions/actionutils"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/iwind/TeaGo/actions"
"github.com/iwind/TeaGo/maps"
"strings"
@@ -76,7 +76,7 @@ func (this *InstallAction) RunGet(params struct {
apiNodes := apiNodesResp.Nodes
apiEndpoints := []string{}
for _, apiNode := range apiNodes {
apiEndpoints = append(apiEndpoints, apiNode.Address)
apiEndpoints = append(apiEndpoints, apiNode.AccessAddrs...)
}
this.Data["apiEndpoints"] = "\"" + strings.Join(apiEndpoints, "\", \"") + "\""

View File

@@ -0,0 +1,155 @@
package nodeutils
import (
"context"
"encoding/json"
"github.com/TeaOSLab/EdgeAdmin/internal/configs"
"github.com/TeaOSLab/EdgeAdmin/internal/rpc"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"strconv"
"sync"
)
type MessageResult struct {
NodeId int64 `json:"nodeId"`
NodeName string `json:"nodeName"`
IsOK bool `json:"isOk"`
Message string `json:"message"`
}
// 向集群发送命令消息
func SendMessageToCluster(ctx context.Context, clusterId int64, code string, msg interface{}, timeoutSeconds int32) (results []*MessageResult, err error) {
results = []*MessageResult{}
msgJSON, err := json.Marshal(msg)
if err != nil {
return results, err
}
defaultRPCClient, err := rpc.SharedRPC()
if err != nil {
return results, err
}
// 获取所有节点
nodesResp, err := defaultRPCClient.NodeRPC().FindAllEnabledNodesWithClusterId(ctx, &pb.FindAllEnabledNodesWithClusterIdRequest{ClusterId: clusterId})
if err != nil {
return results, err
}
nodes := nodesResp.Nodes
if len(nodes) == 0 {
return results, nil
}
rpcMap := map[int64]*rpc.RPCClient{} // apiNodeId => RPCClient
locker := &sync.Mutex{}
wg := &sync.WaitGroup{}
wg.Add(len(nodes))
for _, node := range nodes {
if len(node.ConnectedAPINodeIds) == 0 {
locker.Lock()
results = append(results, &MessageResult{
NodeId: node.Id,
NodeName: node.Name,
IsOK: false,
Message: "节点尚未连接到API",
})
locker.Unlock()
wg.Done()
continue
}
// 获取API节点信息
apiNodeId := node.ConnectedAPINodeIds[0]
rpcClient, ok := rpcMap[apiNodeId]
if !ok {
apiNodeResp, err := defaultRPCClient.APINodeRPC().FindEnabledAPINode(ctx, &pb.FindEnabledAPINodeRequest{NodeId: apiNodeId})
if err != nil {
locker.Lock()
results = append(results, &MessageResult{
NodeId: node.Id,
NodeName: node.Name,
IsOK: false,
Message: "无法读取对应的API节点信息" + err.Error(),
})
locker.Unlock()
wg.Done()
continue
}
if apiNodeResp.Node == nil {
locker.Lock()
results = append(results, &MessageResult{
NodeId: node.Id,
NodeName: node.Name,
IsOK: false,
Message: "无法读取对应的API节点信息API节点ID" + strconv.FormatInt(apiNodeId, 10),
})
locker.Unlock()
wg.Done()
continue
}
apiNode := apiNodeResp.Node
apiRPCClient, err := rpc.NewRPCClient(&configs.APIConfig{
RPC: struct {
Endpoints []string `yaml:"endpoints"`
}{
Endpoints: apiNode.AccessAddrs,
},
NodeId: apiNode.UniqueId,
Secret: apiNode.Secret,
})
if err != nil {
locker.Lock()
results = append(results, &MessageResult{
NodeId: node.Id,
NodeName: node.Name,
IsOK: false,
Message: "初始化API节点错误API节点ID" + strconv.FormatInt(apiNodeId, 10) + "" + err.Error(),
})
locker.Unlock()
wg.Done()
continue
}
rpcMap[apiNodeId] = apiRPCClient
rpcClient = apiRPCClient
}
// 发送消息
go func(node *pb.Node) {
defer wg.Done()
result, err := rpcClient.NodeRPC().SendCommandToNode(ctx, &pb.NodeStreamMessage{
NodeId: node.Id,
TimeoutSeconds: timeoutSeconds,
Code: code,
DataJSON: msgJSON,
})
if err != nil {
locker.Lock()
results = append(results, &MessageResult{
NodeId: node.Id,
NodeName: node.Name,
IsOK: false,
Message: "API返回错误" + err.Error(),
})
locker.Unlock()
return
}
locker.Lock()
results = append(results, &MessageResult{
NodeId: node.Id,
NodeName: node.Name,
IsOK: result.IsOk,
Message: result.Message,
})
locker.Unlock()
}(node)
}
wg.Wait()
return
}

View File

@@ -0,0 +1,22 @@
package nodeutils
import (
"github.com/TeaOSLab/EdgeAdmin/internal/rpc"
_ "github.com/iwind/TeaGo/bootstrap"
"github.com/iwind/TeaGo/logs"
"testing"
)
func TestSendMessageToCluster(t *testing.T) {
rpcClient, err := rpc.SharedRPC()
if err != nil {
t.Fatal(err)
}
ctx := rpcClient.Context(1)
results, err := SendMessageToCluster(ctx, 1, "test", nil, 30)
if err != nil {
t.Fatal(err)
}
logs.PrintAsJSON(results, t)
}

View File

@@ -0,0 +1,38 @@
package cacheutils
import (
"encoding/json"
"github.com/TeaOSLab/EdgeAdmin/internal/errors"
"github.com/TeaOSLab/EdgeAdmin/internal/web/actions/actionutils"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
)
// 查找缓存策略名称并忽略错误
func FindCachePolicyNameWithoutError(parent *actionutils.ParentAction, cachePolicyId int64) string {
policy, err := FindCachePolicy(parent, cachePolicyId)
if err != nil {
return ""
}
if policy == nil {
return ""
}
return policy.Name
}
// 查找缓存策略配置
func FindCachePolicy(parent *actionutils.ParentAction, cachePolicyId int64) (*serverconfigs.HTTPCachePolicy, error) {
resp, err := parent.RPC().HTTPCachePolicyRPC().FindEnabledHTTPCachePolicyConfig(parent.AdminContext(), &pb.FindEnabledHTTPCachePolicyConfigRequest{CachePolicyId: cachePolicyId})
if err != nil {
return nil, err
}
if len(resp.CachePolicyJSON) == 0 {
return nil, errors.New("cache policy not found")
}
config := &serverconfigs.HTTPCachePolicy{}
err = json.Unmarshal(resp.CachePolicyJSON, config)
if err != nil {
return nil, err
}
return config, nil
}

View File

@@ -7,7 +7,7 @@ type CleanAction struct {
}
func (this *CleanAction) Init() {
this.Nav("", "", "")
this.Nav("", "", "clean")
}
func (this *CleanAction) RunGet(params struct{}) {

View File

@@ -1,8 +1,11 @@
package cache
import (
"github.com/TeaOSLab/EdgeAdmin/internal/web/actions/actionutils"
"github.com/TeaOSLab/EdgeAdmin/internal/web/actions/default/servers/components/cache/cacheutils"
"github.com/iwind/TeaGo/actions"
"net/http"
"reflect"
)
type Helper struct {
@@ -12,11 +15,23 @@ func NewHelper() *Helper {
return &Helper{}
}
func (this *Helper) BeforeAction(action *actions.ActionObject) {
func (this *Helper) BeforeAction(actionPtr actions.ActionWrapper) {
action := actionPtr.Object()
if action.Request.Method != http.MethodGet {
return
}
action.Data["mainTab"] = "component"
action.Data["secondMenuItem"] = "cache"
cachePolicyId := action.ParamInt64("cachePolicyId")
action.Data["cachePolicyId"] = cachePolicyId
parentActionValue := reflect.ValueOf(actionPtr).Elem().FieldByName("ParentAction")
if parentActionValue.IsValid() {
parentAction, isOk := parentActionValue.Interface().(actionutils.ParentAction)
if isOk {
action.Data["cachePolicyName"] = cacheutils.FindCachePolicyNameWithoutError(&parentAction, cachePolicyId)
}
}
}

View File

@@ -16,13 +16,15 @@ func init() {
Get("", new(IndexAction)).
GetPost("/createPopup", new(CreatePopupAction)).
Get("/policy", new(PolicyAction)).
GetPost("/updatePopup", new(UpdatePopupAction)).
GetPost("/update", new(UpdateAction)).
GetPost("/clean", new(CleanAction)).
GetPost("/preheat", new(PreheatAction)).
GetPost("/purge", new(PurgeAction)).
GetPost("/stat", new(StatAction)).
GetPost("/test", new(TestAction)).
Post("/delete", new(DeleteAction)).
Post("/testRead", new(TestReadAction)).
Post("/testWrite", new(TestWriteAction)).
EndAll()
})
}

View File

@@ -1,15 +1,30 @@
package cache
import "github.com/TeaOSLab/EdgeAdmin/internal/web/actions/actionutils"
import (
"github.com/TeaOSLab/EdgeAdmin/internal/web/actions/actionutils"
"github.com/TeaOSLab/EdgeAdmin/internal/web/actions/default/servers/components/cache/cacheutils"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
)
type PolicyAction struct {
actionutils.ParentAction
}
func (this *PolicyAction) Init() {
this.Nav("", "", "")
this.Nav("", "", "index")
}
func (this *PolicyAction) RunGet(params struct{}) {
func (this *PolicyAction) RunGet(params struct {
CachePolicyId int64
}) {
cachePolicy, err := cacheutils.FindCachePolicy(this.Parent(), params.CachePolicyId)
if err != nil {
this.ErrorPage(err)
return
}
this.Data["cachePolicy"] = cachePolicy
this.Data["typeName"] = serverconfigs.FindCachePolicyTypeName(cachePolicy.Type)
this.Show()
}

View File

@@ -7,7 +7,7 @@ type PreheatAction struct {
}
func (this *PreheatAction) Init() {
this.Nav("", "", "")
this.Nav("", "", "preheat")
}
func (this *PreheatAction) RunGet(params struct{}) {

View File

@@ -7,7 +7,7 @@ type PurgeAction struct {
}
func (this *PurgeAction) Init() {
this.Nav("", "", "")
this.Nav("", "", "purge")
}
func (this *PurgeAction) RunGet(params struct{}) {

View File

@@ -7,7 +7,7 @@ type StatAction struct {
}
func (this *StatAction) Init() {
this.Nav("", "", "")
this.Nav("", "", "stat")
}
func (this *StatAction) RunGet(params struct{}) {

View File

@@ -1,15 +1,34 @@
package cache
import "github.com/TeaOSLab/EdgeAdmin/internal/web/actions/actionutils"
import (
"github.com/TeaOSLab/EdgeAdmin/internal/web/actions/actionutils"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/iwind/TeaGo/maps"
)
type TestAction struct {
actionutils.ParentAction
}
func (this *TestAction) Init() {
this.Nav("", "", "")
this.Nav("", "", "test")
}
func (this *TestAction) RunGet(params struct{}) {
// 集群列表
clustersResp, err := this.RPC().NodeClusterRPC().FindAllEnabledNodeClusters(this.AdminContext(), &pb.FindAllEnabledNodeClustersRequest{})
if err != nil {
this.ErrorPage(err)
return
}
clusterMaps := []maps.Map{}
for _, cluster := range clustersResp.Clusters {
clusterMaps = append(clusterMaps, maps.Map{
"id": cluster.Id,
"name": cluster.Name,
})
}
this.Data["clusters"] = clusterMaps
this.Show()
}

View File

@@ -0,0 +1,52 @@
package cache
import (
"github.com/TeaOSLab/EdgeAdmin/internal/web/actions/actionutils"
"github.com/TeaOSLab/EdgeAdmin/internal/web/actions/default/nodes/nodeutils"
"github.com/TeaOSLab/EdgeCommon/pkg/messageconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
)
type TestReadAction struct {
actionutils.ParentAction
}
func (this *TestReadAction) RunPost(params struct {
ClusterId int64
CachePolicyId int64
Key string
}) {
cachePolicyResp, err := this.RPC().HTTPCachePolicyRPC().FindEnabledHTTPCachePolicyConfig(this.AdminContext(), &pb.FindEnabledHTTPCachePolicyConfigRequest{CachePolicyId: params.CachePolicyId})
if err != nil {
this.ErrorPage(err)
return
}
cachePolicyJSON := cachePolicyResp.CachePolicyJSON
if len(cachePolicyJSON) == 0 {
this.Fail("找不到要操作的缓存策略")
}
// 发送命令
msg := &messageconfigs.ReadCacheMessage{
CachePolicyJSON: cachePolicyJSON,
Key: params.Key,
}
results, err := nodeutils.SendMessageToCluster(this.AdminContext(), params.ClusterId, messageconfigs.MessageCodeReadCache, msg, 10)
if err != nil {
this.ErrorPage(err)
return
}
isAllOk := true
for _, result := range results {
if !result.IsOK {
isAllOk = false
break
}
}
this.Data["isAllOk"] = isAllOk
this.Data["results"] = results
this.Success()
}

View File

@@ -0,0 +1,55 @@
package cache
import (
"github.com/TeaOSLab/EdgeAdmin/internal/web/actions/actionutils"
"github.com/TeaOSLab/EdgeAdmin/internal/web/actions/default/nodes/nodeutils"
"github.com/TeaOSLab/EdgeCommon/pkg/messageconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
)
type TestWriteAction struct {
actionutils.ParentAction
}
func (this *TestWriteAction) RunPost(params struct {
ClusterId int64
CachePolicyId int64
Key string
Value string
}) {
cachePolicyResp, err := this.RPC().HTTPCachePolicyRPC().FindEnabledHTTPCachePolicyConfig(this.AdminContext(), &pb.FindEnabledHTTPCachePolicyConfigRequest{CachePolicyId: params.CachePolicyId})
if err != nil {
this.ErrorPage(err)
return
}
cachePolicyJSON := cachePolicyResp.CachePolicyJSON
if len(cachePolicyJSON) == 0 {
this.Fail("找不到要操作的缓存策略")
}
// 发送命令
msg := &messageconfigs.WriteCacheMessage{
CachePolicyJSON: cachePolicyJSON,
Key: params.Key,
Value: []byte(params.Value),
LifeSeconds: 3600,
}
results, err := nodeutils.SendMessageToCluster(this.AdminContext(), params.ClusterId, messageconfigs.MessageCodeWriteCache, msg, 10)
if err != nil {
this.ErrorPage(err)
return
}
isAllOk := true
for _, result := range results {
if !result.IsOK {
isAllOk = false
break
}
}
this.Data["isAllOk"] = isAllOk
this.Data["results"] = results
this.Success()
}

View File

@@ -8,15 +8,15 @@ import (
"github.com/iwind/TeaGo/actions"
)
type UpdatePopupAction struct {
type UpdateAction struct {
actionutils.ParentAction
}
func (this *UpdatePopupAction) Init() {
this.Nav("", "", "")
func (this *UpdateAction) Init() {
this.Nav("", "", "update")
}
func (this *UpdatePopupAction) RunGet(params struct {
func (this *UpdateAction) RunGet(params struct {
CachePolicyId int64
}) {
configResp, err := this.RPC().HTTPCachePolicyRPC().FindEnabledHTTPCachePolicyConfig(this.AdminContext(), &pb.FindEnabledHTTPCachePolicyConfigRequest{CachePolicyId: params.CachePolicyId})
@@ -44,7 +44,7 @@ func (this *UpdatePopupAction) RunGet(params struct {
this.Show()
}
func (this *UpdatePopupAction) RunPost(params struct {
func (this *UpdateAction) RunPost(params struct {
CachePolicyId int64
Name string

View File

@@ -78,7 +78,7 @@ func (this *ServerHelper) createLeftMenu(action *actions.ActionObject) {
// TABBAR
selectedTabbar, _ := action.Data["mainTab"]
tabbar := actionutils.NewTabbar()
tabbar.Add("服务首页", "", "/servers", "", false)
tabbar.Add("服务列表", "", "/servers", "", false)
//tabbar.Add("看板", "", "/servers/server/board?serverId="+serverIdString, "dashboard", selectedTabbar == "board")
tabbar.Add("日志", "", "/servers/server/log?serverId="+serverIdString, "history", selectedTabbar == "log")
//tabbar.Add("统计", "", "/servers/server/stat?serverId="+serverIdString, "chart area", selectedTabbar == "stat")