实现基础的DDoS防护

This commit is contained in:
刘祥超
2022-05-18 21:03:51 +08:00
parent 45620dcdb7
commit 9bdd9a433c
31 changed files with 2605 additions and 58 deletions

View File

@@ -1,6 +1,7 @@
package nodes
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
@@ -14,16 +15,20 @@ import (
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/errors"
"github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/firewalls"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/rpc"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/maps"
"io"
"net"
"net/http"
"net/url"
"os/exec"
"regexp"
"runtime"
"strconv"
"strings"
"sync"
@@ -119,6 +124,8 @@ func (this *APIStream) loop() error {
err = this.handleNewNodeTask(message)
case messageconfigs.MessageCodeCheckSystemdService: // 检查Systemd服务
err = this.handleCheckSystemdService(message)
case messageconfigs.MessageCodeCheckLocalFirewall: // 检查本地防火墙
err = this.handleCheckLocalFirewall(message)
case messageconfigs.MessageCodeChangeAPINode: // 修改API节点地址
err = this.handleChangeAPINode(message)
default:
@@ -569,7 +576,7 @@ func (this *APIStream) handleCheckSystemdService(message *pb.NodeStreamMessage)
return nil
}
cmd := utils.NewCommandExecutor()
var cmd = utils.NewCommandExecutor()
shortName := teaconst.SystemdServiceName
cmd.Add(systemctl, "is-enabled", shortName)
output, err := cmd.Run()
@@ -585,6 +592,63 @@ func (this *APIStream) handleCheckSystemdService(message *pb.NodeStreamMessage)
return nil
}
// 检查本地防火墙
func (this *APIStream) handleCheckLocalFirewall(message *pb.NodeStreamMessage) error {
var dataMessage = &messageconfigs.CheckLocalFirewallMessage{}
err := json.Unmarshal(message.DataJSON, dataMessage)
if err != nil {
this.replyFail(message.RequestId, "decode message data failed: "+err.Error())
return nil
}
// nft
if dataMessage.Name == "nftables" {
if runtime.GOOS != "linux" {
this.replyFail(message.RequestId, "not Linux system")
return nil
}
nft, err := exec.LookPath("nft")
if err != nil {
this.replyFail(message.RequestId, "'nft' not found: "+err.Error())
return nil
}
var cmd = exec.Command(nft, "--version")
var output = &bytes.Buffer{}
cmd.Stdout = output
err = cmd.Run()
if err != nil {
this.replyFail(message.RequestId, "get version failed: "+err.Error())
return nil
}
var outputString = output.String()
var versionMatches = regexp.MustCompile(`nftables v([\d.]+)`).FindStringSubmatch(outputString)
if len(versionMatches) <= 1 {
this.replyFail(message.RequestId, "can not get nft version")
return nil
}
var version = versionMatches[1]
var result = maps.Map{
"version": version,
}
var protectionConfig = sharedNodeConfig.DDOSProtection
err = firewalls.SharedDDoSProtectionManager.Apply(protectionConfig)
if err != nil {
this.replyFail(message.RequestId, dataMessage.Name+"was installed, but apply DDoS protection config failed: "+err.Error())
} else {
this.replyOk(message.RequestId, string(result.AsJSON()))
}
} else {
this.replyFail(message.RequestId, "invalid firewall name '"+dataMessage.Name+"'")
}
return nil
}
// 修改API地址
func (this *APIStream) handleChangeAPINode(message *pb.NodeStreamMessage) error {
config, err := configs.LoadAPIConfig()
@@ -660,6 +724,11 @@ func (this *APIStream) replyOk(requestId int64, message string) {
_ = this.stream.Send(&pb.NodeStreamMessage{RequestId: requestId, IsOk: true, Message: message})
}
// 回复成功并包含数据
func (this *APIStream) replyOkData(requestId int64, message string, dataJSON []byte) {
_ = this.stream.Send(&pb.NodeStreamMessage{RequestId: requestId, IsOk: true, Message: message, DataJSON: dataJSON})
}
// 获取缓存存取对象
func (this *APIStream) cacheStorage(message *pb.NodeStreamMessage, cachePolicyJSON []byte) (storage caches.StorageInterface, shouldStop bool, err error) {
cachePolicy := &serverconfigs.HTTPCachePolicy{}

View File

@@ -7,7 +7,6 @@ import (
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/firewallconfigs"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/iplibrary"
"github.com/TeaOSLab/EdgeNode/internal/ratelimit"
"github.com/TeaOSLab/EdgeNode/internal/ttlcache"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/TeaOSLab/EdgeNode/internal/waf"
@@ -21,8 +20,7 @@ import (
// ClientConn 客户端连接
type ClientConn struct {
once sync.Once
globalLimiter *ratelimit.Counter
once sync.Once
isTLS bool
hasDeadline bool
@@ -33,7 +31,7 @@ type ClientConn struct {
BaseClientConn
}
func NewClientConn(conn net.Conn, isTLS bool, quickClose bool, globalLimiter *ratelimit.Counter) net.Conn {
func NewClientConn(conn net.Conn, isTLS bool, quickClose bool) net.Conn {
if quickClose {
// TCP
tcpConn, ok := conn.(*net.TCPConn)
@@ -43,7 +41,7 @@ func NewClientConn(conn net.Conn, isTLS bool, quickClose bool, globalLimiter *ra
}
}
return &ClientConn{BaseClientConn: BaseClientConn{rawConn: conn}, isTLS: isTLS, globalLimiter: globalLimiter}
return &ClientConn{BaseClientConn: BaseClientConn{rawConn: conn}, isTLS: isTLS}
}
func (this *ClientConn) Read(b []byte) (n int, err error) {
@@ -96,13 +94,6 @@ func (this *ClientConn) Close() error {
err := this.rawConn.Close()
// 全局并发数限制
this.once.Do(func() {
if this.globalLimiter != nil {
this.globalLimiter.Release()
}
})
// 单个服务并发数限制
sharedClientConnLimiter.Remove(this.rawConn.RemoteAddr().String())

View File

@@ -3,16 +3,12 @@
package nodes
import (
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/firewallconfigs"
"github.com/TeaOSLab/EdgeNode/internal/iplibrary"
"github.com/TeaOSLab/EdgeNode/internal/ratelimit"
"github.com/TeaOSLab/EdgeNode/internal/waf"
"net"
)
var sharedConnectionsLimiter = ratelimit.NewCounter(nodeconfigs.DefaultTCPMaxConnections)
// ClientListener 客户端网络监听
type ClientListener struct {
rawListener net.Listener
@@ -36,13 +32,8 @@ func (this *ClientListener) IsTLS() bool {
}
func (this *ClientListener) Accept() (net.Conn, error) {
// 限制并发连接数
var limiter = sharedConnectionsLimiter
limiter.Ack()
conn, err := this.rawListener.Accept()
if err != nil {
limiter.Release()
return nil, err
}
@@ -60,12 +51,11 @@ func (this *ClientListener) Accept() (net.Conn, error) {
}
_ = conn.Close()
limiter.Release()
return this.Accept()
}
}
return NewClientConn(conn, this.isTLS, this.quickClose, limiter), nil
return NewClientConn(conn, this.isTLS, this.quickClose), nil
}
func (this *ClientListener) Close() error {

View File

@@ -7,6 +7,7 @@ import (
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/ddosconfigs"
"github.com/TeaOSLab/EdgeNode/internal/caches"
"github.com/TeaOSLab/EdgeNode/internal/configs"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
@@ -15,7 +16,6 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/iplibrary"
"github.com/TeaOSLab/EdgeNode/internal/metrics"
"github.com/TeaOSLab/EdgeNode/internal/ratelimit"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/rpc"
"github.com/TeaOSLab/EdgeNode/internal/stats"
@@ -368,6 +368,38 @@ func (this *Node) loop() error {
}
sharedNodeConfig.ParentNodes = parentNodes
// 修改为已同步
_, err = rpcClient.NodeTaskRPC().ReportNodeTaskDone(nodeCtx, &pb.ReportNodeTaskDoneRequest{
NodeTaskId: task.Id,
IsOk: true,
Error: "",
})
if err != nil {
return err
}
case "ddosProtectionChanged":
resp, err := rpcClient.NodeRPC().FindNodeDDoSProtection(nodeCtx, &pb.FindNodeDDoSProtectionRequest{})
if err != nil {
return err
}
if len(resp.DdosProtectionJSON) == 0 {
if sharedNodeConfig != nil {
sharedNodeConfig.DDOSProtection = nil
}
} else {
var ddosProtectionConfig = &ddosconfigs.ProtectionConfig{}
err = json.Unmarshal(resp.DdosProtectionJSON, ddosProtectionConfig)
if err != nil {
return errors.New("decode DDoS protection config failed: " + err.Error())
}
err = firewalls.SharedDDoSProtectionManager.Apply(ddosProtectionConfig)
if err != nil {
// 不阻塞
remotelogs.Error("NODE", "apply DDoS protection failed: "+err.Error())
}
}
// 修改为已同步
_, err = rpcClient.NodeTaskRPC().ReportNodeTaskDone(nodeCtx, &pb.ReportNodeTaskDoneRequest{
NodeTaskId: task.Id,
@@ -730,7 +762,6 @@ func (this *Node) listenSock() error {
"ipConns": ipConns,
"serverConns": serverConns,
"total": sharedListenerManager.TotalActiveConnections(),
"limiter": sharedConnectionsLimiter.Len(),
},
})
case "dropIP":
@@ -854,17 +885,6 @@ func (this *Node) onReload(config *nodeconfigs.NodeConfig) {
this.maxThreads = config.MaxThreads
}
// max tcp connections
if config.TCPMaxConnections <= 0 {
config.TCPMaxConnections = nodeconfigs.DefaultTCPMaxConnections
}
if config.TCPMaxConnections != sharedConnectionsLimiter.Count() {
remotelogs.Println("NODE", "[TCP]changed tcp max connections to '"+types.String(config.TCPMaxConnections)+"'")
sharedConnectionsLimiter.Close()
sharedConnectionsLimiter = ratelimit.NewCounter(config.TCPMaxConnections)
}
// timezone
var timeZone = config.TimeZone
if len(timeZone) == 0 {