diff --git a/internal/nodes/listener_base.go b/internal/nodes/listener_base.go index a48c75c..3bf8fdf 100644 --- a/internal/nodes/listener_base.go +++ b/internal/nodes/listener_base.go @@ -160,7 +160,7 @@ func (this *BaseListener) findNamedServer(name string) (serverConfig *serverconf // 严格查找域名 func (this *BaseListener) findNamedServerMatched(name string) (serverConfig *serverconfigs.ServerConfig, serverName string) { - group := this.Group + var group = this.Group if group == nil { return nil, "" } diff --git a/internal/nodes/listener_http.go b/internal/nodes/listener_http.go index 79e29fd..e09b72f 100644 --- a/internal/nodes/listener_http.go +++ b/internal/nodes/listener_http.go @@ -175,6 +175,15 @@ func (this *HTTPListener) ServeHTTP(rawWriter http.ResponseWriter, rawReq *http. } } + // 检查用户 + if server != nil && server.UserId > 0 { + if !SharedUserManager.CheckUserServersIsEnabled(server.UserId) { + rawWriter.WriteHeader(http.StatusNotFound) + _, _ = rawWriter.Write([]byte("The site owner is unavailable.")) + return + } + } + // 包装新请求对象 var req = &HTTPRequest{ RawReq: rawReq, diff --git a/internal/nodes/listener_tcp.go b/internal/nodes/listener_tcp.go index 0bd93ec..afe7b7c 100644 --- a/internal/nodes/listener_tcp.go +++ b/internal/nodes/listener_tcp.go @@ -92,7 +92,7 @@ func (this *TCPListener) handleConn(conn net.Conn) error { } // 是否已达到流量限制 - if this.reachedTrafficLimit() { + if this.reachedTrafficLimit() || (server.UserId > 0 && !SharedUserManager.CheckUserServersIsEnabled(server.UserId)) { // 关闭连接 tcpConn, ok := conn.(LingerConn) if ok { diff --git a/internal/nodes/listener_udp.go b/internal/nodes/listener_udp.go index 89a46a7..12e7c54 100644 --- a/internal/nodes/listener_udp.go +++ b/internal/nodes/listener_udp.go @@ -170,6 +170,11 @@ func (this *UDPListener) servePacketListener(listener UDPPacketListener) error { return nil } + // 检查用户状态 + if firstServer.UserId > 0 && !SharedUserManager.CheckUserServersIsEnabled(firstServer.UserId) { + return nil + } + n, cm, clientAddr, err := listener.ReadFrom(buffer) if err != nil { if this.isClosed { diff --git a/internal/nodes/node.go b/internal/nodes/node.go index 0505d42..91a158d 100644 --- a/internal/nodes/node.go +++ b/internal/nodes/node.go @@ -430,6 +430,22 @@ func (this *Node) execTask(rpcClient *rpc.RPCClient, nodeCtx context.Context, ta } } } + case "userServersStateChanged": + if task.UserId > 0 { + resp, err := rpcClient.UserRPC.CheckUserServersState(nodeCtx, &pb.CheckUserServersStateRequest{UserId: task.UserId}) + if err != nil { + return err + } + + SharedUserManager.UpdateUserServersIsEnabled(task.UserId, resp.IsEnabled) + + if resp.IsEnabled { + err = this.syncUserServersConfig(task.UserId) + if err != nil { + return err + } + } + } default: remotelogs.Error("NODE", "task '"+types.String(task.Id)+"', type '"+task.Type+"' has not been handled") } @@ -615,6 +631,36 @@ func (this *Node) syncServerConfig(serverId int64) error { return nil } +// 同步某个用户下的所有服务配置 +func (this *Node) syncUserServersConfig(userId int64) error { + rpcClient, err := rpc.SharedRPC() + if err != nil { + return err + } + serverConfigsResp, err := rpcClient.ServerRPC.ComposeAllUserServersConfig(rpcClient.Context(), &pb.ComposeAllUserServersConfigRequest{ + UserId: userId, + }) + if err != nil { + return err + } + if len(serverConfigsResp.ServersConfigJSON) == 0 { + return nil + } + var serverConfigs = []*serverconfigs.ServerConfig{} + err = json.Unmarshal(serverConfigsResp.ServersConfigJSON, &serverConfigs) + if err != nil { + return err + } + this.locker.Lock() + defer this.locker.Unlock() + + for _, config := range serverConfigs { + this.updatingServerMap[config.Id] = config + } + + return nil +} + // 启动同步计时器 func (this *Node) startSyncTimer() { // TODO 这个时间间隔可以自行设置 diff --git a/internal/nodes/user_manager.go b/internal/nodes/user_manager.go new file mode 100644 index 0000000..ae5c6ca --- /dev/null +++ b/internal/nodes/user_manager.go @@ -0,0 +1,49 @@ +// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package nodes + +import ( + "sync" +) + +var SharedUserManager = NewUserManager() + +type User struct { + ServersEnabled bool +} + +type UserManager struct { + userMap map[int64]*User // id => *User + + locker sync.RWMutex +} + +func NewUserManager() *UserManager { + return &UserManager{ + userMap: map[int64]*User{}, + } +} + +func (this *UserManager) UpdateUserServersIsEnabled(userId int64, isEnabled bool) { + this.locker.Lock() + u, ok := this.userMap[userId] + if ok { + u.ServersEnabled = isEnabled + } else { + u = &User{ServersEnabled: isEnabled} + this.userMap[userId] = u + } + this.locker.Unlock() +} + +func (this *UserManager) CheckUserServersIsEnabled(userId int64) (isEnabled bool) { + this.locker.RLock() + u, ok := this.userMap[userId] + if ok { + isEnabled = u.ServersEnabled + } else { + isEnabled = true + } + this.locker.RUnlock() + return +} diff --git a/internal/rpc/rpc_client.go b/internal/rpc/rpc_client.go index 3cadeac..ed04eb9 100644 --- a/internal/rpc/rpc_client.go +++ b/internal/rpc/rpc_client.go @@ -49,6 +49,7 @@ type RPCClient struct { FirewallRPC pb.FirewallServiceClient SSLCertRPC pb.SSLCertServiceClient ScriptRPC pb.ScriptServiceClient + UserRPC pb.UserServiceClient } func NewRPCClient(apiConfig *configs.APIConfig) (*RPCClient, error) { @@ -81,6 +82,7 @@ func NewRPCClient(apiConfig *configs.APIConfig) (*RPCClient, error) { client.FirewallRPC = pb.NewFirewallServiceClient(client) client.SSLCertRPC = pb.NewSSLCertServiceClient(client) client.ScriptRPC = pb.NewScriptServiceClient(client) + client.UserRPC = pb.NewUserServiceClient(client) err := client.init() if err != nil {