diff --git a/go.mod b/go.mod index 973340d7..9327115c 100644 --- a/go.mod +++ b/go.mod @@ -9,9 +9,7 @@ require ( github.com/go-sql-driver/mysql v1.5.0 github.com/go-yaml/yaml v2.1.0+incompatible github.com/golang/protobuf v1.4.2 - github.com/iwind/TeaGo v0.0.0-20200923021120-f5d76441fe9e - github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect - github.com/modern-go/reflect2 v1.0.1 // indirect + github.com/iwind/TeaGo v0.0.0-20200924024009-d088df3778a6 github.com/pkg/sftp v1.12.0 golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a google.golang.org/grpc v1.32.0 diff --git a/go.sum b/go.sum index 3b408569..b6751b0b 100644 --- a/go.sum +++ b/go.sum @@ -4,7 +4,6 @@ dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/DataDog/sketches-go v0.0.0-20190923095040-43f19ad77ff7/go.mod h1:Q5DbzQ+3AkgGwymQO7aZFNP7ns2lZKGtvRBzRXfdi60= -github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= @@ -22,8 +21,6 @@ github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7 github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= -github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM= -github.com/go-redis/redis v6.15.8+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= github.com/go-redis/redis/v8 v8.0.0-beta.7/go.mod h1:FGJAWDWFht1sQ4qxyJHZZbVyvnVcKQN0E3u5/5lRz+g= github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs= github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= @@ -53,13 +50,10 @@ github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= -github.com/iwind/TeaGo v0.0.0-20200822074248-b1cf7248c98a/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc= -github.com/iwind/TeaGo v0.0.0-20200910072805-729cffe36729 h1:/v0WhSFVeNay/dA5zU9iCBXlgVDfxnztuanlauXE0gM= -github.com/iwind/TeaGo v0.0.0-20200910072805-729cffe36729/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc= -github.com/iwind/TeaGo v0.0.0-20200916035436-dbdbf25f8524 h1:WnARCxusBjX5vJ8E71AjhuxSeAMGfEiYvi42XVK/Yf8= -github.com/iwind/TeaGo v0.0.0-20200916035436-dbdbf25f8524/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc= github.com/iwind/TeaGo v0.0.0-20200923021120-f5d76441fe9e h1:/xn7wUvlwaoA5IkdBUctv2OQbJSZ0/Dw8qRJmn55sJk= github.com/iwind/TeaGo v0.0.0-20200923021120-f5d76441fe9e/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc= +github.com/iwind/TeaGo v0.0.0-20200924024009-d088df3778a6 h1:7OZC/Qy7Z/hK9vG6YQOwHNOUPunSImYYJMiIfvuDQZ0= +github.com/iwind/TeaGo v0.0.0-20200924024009-d088df3778a6/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc= github.com/json-iterator/go v1.1.10 h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr68= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/kr/fs v0.1.0 h1:Jskdu9ieNAYnjxsi0LbQp1ulIKZV1LAFgK1tWhpZgl8= @@ -93,9 +87,7 @@ github.com/pkg/sftp v1.12.0 h1:/f3b24xrDhkhddlaobPe2JgBqfdt+gC/NYl0QY9IOuI= github.com/pkg/sftp v1.12.0/go.mod h1:fUqqXB5vEgVCZ131L+9say31RAri6aF6KDViawhxKK8= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/pquerna/ffjson v0.0.0-20190930134022-aa0246cd15f7/go.mod h1:YARuvh7BUWHNhzDq2OM5tzR2RiCcN2D7sapiKyCel/M= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= -github.com/shirou/gopsutil v2.20.7+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= diff --git a/internal/db/models/http_access_log_policy_dao.go b/internal/db/models/http_access_log_policy_dao.go index 849b792f..cdabbfef 100644 --- a/internal/db/models/http_access_log_policy_dao.go +++ b/internal/db/models/http_access_log_policy_dao.go @@ -29,6 +29,20 @@ func NewHTTPAccessLogPolicyDAO() *HTTPAccessLogPolicyDAO { var SharedHTTPAccessLogPolicyDAO = NewHTTPAccessLogPolicyDAO() +// 初始化 +func (this *HTTPAccessLogPolicyDAO) Init() { + this.DAOObject.Init() + this.DAOObject.OnUpdate(func() error { + return SharedSysEventDAO.CreateEvent(NewServerChangeEvent()) + }) + this.DAOObject.OnInsert(func() error { + return SharedSysEventDAO.CreateEvent(NewServerChangeEvent()) + }) + this.DAOObject.OnDelete(func() error { + return SharedSysEventDAO.CreateEvent(NewServerChangeEvent()) + }) +} + // 启用条目 func (this *HTTPAccessLogPolicyDAO) EnableHTTPAccessLogPolicy(id int64) error { _, err := this.Query(). @@ -104,14 +118,13 @@ func (this *HTTPAccessLogPolicyDAO) ComposeAccessLogPolicyConfig(policyId int64) } // 条件 - if IsNotNull(policy.Conds) { - // TODO 需要用更全面的条件管理器来代替RequestCond - conds := []*shared.RequestCond{} - err = json.Unmarshal([]byte(policy.Conds), &conds) + if IsNotNull(policy.CondGroups) { + condGroups := []*shared.HTTPRequestCondGroup{} + err = json.Unmarshal([]byte(policy.CondGroups), &condGroups) if err != nil { return nil, err } - config.Conds = conds + config.CondGroups = condGroups } return config, nil diff --git a/internal/db/models/http_access_log_policy_model.go b/internal/db/models/http_access_log_policy_model.go index b4fef975..a2690280 100644 --- a/internal/db/models/http_access_log_policy_model.go +++ b/internal/db/models/http_access_log_policy_model.go @@ -12,7 +12,7 @@ type HTTPAccessLogPolicy struct { IsOn uint8 `field:"isOn"` // 是否启用 Type string `field:"type"` // 存储类型 Options string `field:"options"` // 存储选项 - Conds string `field:"conds"` // 请求条件 + CondGroups string `field:"condGroups"` // 请求条件 } type HTTPAccessLogPolicyOperator struct { @@ -26,7 +26,7 @@ type HTTPAccessLogPolicyOperator struct { IsOn interface{} // 是否启用 Type interface{} // 存储类型 Options interface{} // 存储选项 - Conds interface{} // 请求条件 + CondGroups interface{} // 请求条件 } func NewHTTPAccessLogPolicyOperator() *HTTPAccessLogPolicyOperator { diff --git a/internal/db/models/http_cache_policy_dao.go b/internal/db/models/http_cache_policy_dao.go index a253336a..f65e6516 100644 --- a/internal/db/models/http_cache_policy_dao.go +++ b/internal/db/models/http_cache_policy_dao.go @@ -26,6 +26,20 @@ func NewHTTPCachePolicyDAO() *HTTPCachePolicyDAO { var SharedHTTPCachePolicyDAO = NewHTTPCachePolicyDAO() +// 初始化 +func (this *HTTPCachePolicyDAO) Init() { + this.DAOObject.Init() + this.DAOObject.OnUpdate(func() error { + return SharedSysEventDAO.CreateEvent(NewServerChangeEvent()) + }) + this.DAOObject.OnInsert(func() error { + return SharedSysEventDAO.CreateEvent(NewServerChangeEvent()) + }) + this.DAOObject.OnDelete(func() error { + return SharedSysEventDAO.CreateEvent(NewServerChangeEvent()) + }) +} + // 启用条目 func (this *HTTPCachePolicyDAO) EnableHTTPCachePolicy(id int64) error { _, err := this.Query(). diff --git a/internal/db/models/http_cache_policy_model.go b/internal/db/models/http_cache_policy_model.go index 0c1817b0..c5da28c9 100644 --- a/internal/db/models/http_cache_policy_model.go +++ b/internal/db/models/http_cache_policy_model.go @@ -16,7 +16,7 @@ type HTTPCachePolicy struct { SkipCacheControlValues string `field:"skipCacheControlValues"` // 忽略的cache-control SkipSetCookie uint8 `field:"skipSetCookie"` // 是否忽略Set-Cookie Header EnableRequestCachePragma uint8 `field:"enableRequestCachePragma"` // 是否支持客户端的Pragma: no-cache - Conds string `field:"conds"` // 请求条件 + CondGroups string `field:"condGroups"` // 请求条件 CreatedAt uint64 `field:"createdAt"` // 创建时间 State uint8 `field:"state"` // 状态 } @@ -36,7 +36,7 @@ type HTTPCachePolicyOperator struct { SkipCacheControlValues interface{} // 忽略的cache-control SkipSetCookie interface{} // 是否忽略Set-Cookie Header EnableRequestCachePragma interface{} // 是否支持客户端的Pragma: no-cache - Conds interface{} // 请求条件 + CondGroups interface{} // 请求条件 CreatedAt interface{} // 创建时间 State interface{} // 状态 } diff --git a/internal/db/models/http_firewall_policy_dao.go b/internal/db/models/http_firewall_policy_dao.go index 812e6e9d..448e523b 100644 --- a/internal/db/models/http_firewall_policy_dao.go +++ b/internal/db/models/http_firewall_policy_dao.go @@ -26,6 +26,20 @@ func NewHTTPFirewallPolicyDAO() *HTTPFirewallPolicyDAO { var SharedHTTPFirewallPolicyDAO = NewHTTPFirewallPolicyDAO() +// 初始化 +func (this *HTTPFirewallPolicyDAO) Init() { + this.DAOObject.Init() + this.DAOObject.OnUpdate(func() error { + return SharedSysEventDAO.CreateEvent(NewServerChangeEvent()) + }) + this.DAOObject.OnInsert(func() error { + return SharedSysEventDAO.CreateEvent(NewServerChangeEvent()) + }) + this.DAOObject.OnDelete(func() error { + return SharedSysEventDAO.CreateEvent(NewServerChangeEvent()) + }) +} + // 启用条目 func (this *HTTPFirewallPolicyDAO) EnableHTTPFirewallPolicy(id int64) error { _, err := this.Query(). diff --git a/internal/db/models/http_firewall_policy_model.go b/internal/db/models/http_firewall_policy_model.go index 69de8b56..13702ce6 100644 --- a/internal/db/models/http_firewall_policy_model.go +++ b/internal/db/models/http_firewall_policy_model.go @@ -12,7 +12,7 @@ type HTTPFirewallPolicy struct { Name string `field:"name"` // 名称 Inbound string `field:"inbound"` // 入站规则 Outbound string `field:"outbound"` // 出站规则 - Conds string `field:"conds"` // 条件 + CondGroups string `field:"condGroups"` // 条件 } type HTTPFirewallPolicyOperator struct { @@ -26,7 +26,7 @@ type HTTPFirewallPolicyOperator struct { Name interface{} // 名称 Inbound interface{} // 入站规则 Outbound interface{} // 出站规则 - Conds interface{} // 条件 + CondGroups interface{} // 条件 } func NewHTTPFirewallPolicyOperator() *HTTPFirewallPolicyOperator { diff --git a/internal/db/models/http_firewall_rule_dao.go b/internal/db/models/http_firewall_rule_dao.go index 176a0db3..884c4f7d 100644 --- a/internal/db/models/http_firewall_rule_dao.go +++ b/internal/db/models/http_firewall_rule_dao.go @@ -26,6 +26,20 @@ func NewHTTPFirewallRuleDAO() *HTTPFirewallRuleDAO { var SharedHTTPFirewallRuleDAO = NewHTTPFirewallRuleDAO() +// 初始化 +func (this *HTTPFirewallRuleDAO) Init() { + this.DAOObject.Init() + this.DAOObject.OnUpdate(func() error { + return SharedSysEventDAO.CreateEvent(NewServerChangeEvent()) + }) + this.DAOObject.OnInsert(func() error { + return SharedSysEventDAO.CreateEvent(NewServerChangeEvent()) + }) + this.DAOObject.OnDelete(func() error { + return SharedSysEventDAO.CreateEvent(NewServerChangeEvent()) + }) +} + // 启用条目 func (this *HTTPFirewallRuleDAO) EnableHTTPFirewallRule(id uint32) error { _, err := this.Query(). diff --git a/internal/db/models/http_firewall_rule_group_dao.go b/internal/db/models/http_firewall_rule_group_dao.go index 1effb379..b2d0348a 100644 --- a/internal/db/models/http_firewall_rule_group_dao.go +++ b/internal/db/models/http_firewall_rule_group_dao.go @@ -26,6 +26,20 @@ func NewHTTPFirewallRuleGroupDAO() *HTTPFirewallRuleGroupDAO { var SharedHTTPFirewallRuleGroupDAO = NewHTTPFirewallRuleGroupDAO() +// 初始化 +func (this *HTTPFirewallRuleGroupDAO) Init() { + this.DAOObject.Init() + this.DAOObject.OnUpdate(func() error { + return SharedSysEventDAO.CreateEvent(NewServerChangeEvent()) + }) + this.DAOObject.OnInsert(func() error { + return SharedSysEventDAO.CreateEvent(NewServerChangeEvent()) + }) + this.DAOObject.OnDelete(func() error { + return SharedSysEventDAO.CreateEvent(NewServerChangeEvent()) + }) +} + // 启用条目 func (this *HTTPFirewallRuleGroupDAO) EnableHTTPFirewallRuleGroup(id uint32) error { _, err := this.Query(). diff --git a/internal/db/models/http_firewall_rule_set_dao.go b/internal/db/models/http_firewall_rule_set_dao.go index 2856d080..e4e210ee 100644 --- a/internal/db/models/http_firewall_rule_set_dao.go +++ b/internal/db/models/http_firewall_rule_set_dao.go @@ -26,6 +26,20 @@ func NewHTTPFirewallRuleSetDAO() *HTTPFirewallRuleSetDAO { var SharedHTTPFirewallRuleSetDAO = NewHTTPFirewallRuleSetDAO() +// 初始化 +func (this *HTTPFirewallRuleSetDAO) Init() { + this.DAOObject.Init() + this.DAOObject.OnUpdate(func() error { + return SharedSysEventDAO.CreateEvent(NewServerChangeEvent()) + }) + this.DAOObject.OnInsert(func() error { + return SharedSysEventDAO.CreateEvent(NewServerChangeEvent()) + }) + this.DAOObject.OnDelete(func() error { + return SharedSysEventDAO.CreateEvent(NewServerChangeEvent()) + }) +} + // 启用条目 func (this *HTTPFirewallRuleSetDAO) EnableHTTPFirewallRuleSet(id uint32) error { _, err := this.Query(). diff --git a/internal/db/models/http_gzip_dao.go b/internal/db/models/http_gzip_dao.go index 8ee5772b..7135efbd 100644 --- a/internal/db/models/http_gzip_dao.go +++ b/internal/db/models/http_gzip_dao.go @@ -31,6 +31,20 @@ func NewHTTPGzipDAO() *HTTPGzipDAO { var SharedHTTPGzipDAO = NewHTTPGzipDAO() +// 初始化 +func (this *HTTPGzipDAO) Init() { + this.DAOObject.Init() + this.DAOObject.OnUpdate(func() error { + return SharedSysEventDAO.CreateEvent(NewServerChangeEvent()) + }) + this.DAOObject.OnInsert(func() error { + return SharedSysEventDAO.CreateEvent(NewServerChangeEvent()) + }) + this.DAOObject.OnDelete(func() error { + return SharedSysEventDAO.CreateEvent(NewServerChangeEvent()) + }) +} + // 启用条目 func (this *HTTPGzipDAO) EnableHTTPGzip(id int64) error { _, err := this.Query(). diff --git a/internal/db/models/http_gzip_model.go b/internal/db/models/http_gzip_model.go index 00f5bc1a..7a9441c7 100644 --- a/internal/db/models/http_gzip_model.go +++ b/internal/db/models/http_gzip_model.go @@ -2,29 +2,29 @@ package models // Gzip配置 type HTTPGzip struct { - Id uint32 `field:"id"` // ID - AdminId uint32 `field:"adminId"` // 管理员ID - UserId uint32 `field:"userId"` // 用户ID - IsOn uint8 `field:"isOn"` // 是否启用 - Level uint32 `field:"level"` // 压缩级别 - MinLength string `field:"minLength"` // 可压缩最小值 - MaxLength string `field:"maxLength"` // 可压缩最大值 - State uint8 `field:"state"` // 状态 - CreatedAt uint64 `field:"createdAt"` // 创建时间 - Conds string `field:"conds"` // 条件 + Id uint32 `field:"id"` // ID + AdminId uint32 `field:"adminId"` // 管理员ID + UserId uint32 `field:"userId"` // 用户ID + IsOn uint8 `field:"isOn"` // 是否启用 + Level uint32 `field:"level"` // 压缩级别 + MinLength string `field:"minLength"` // 可压缩最小值 + MaxLength string `field:"maxLength"` // 可压缩最大值 + State uint8 `field:"state"` // 状态 + CreatedAt uint64 `field:"createdAt"` // 创建时间 + CondGroups string `field:"condGroups"` // 条件 } type HTTPGzipOperator struct { - Id interface{} // ID - AdminId interface{} // 管理员ID - UserId interface{} // 用户ID - IsOn interface{} // 是否启用 - Level interface{} // 压缩级别 - MinLength interface{} // 可压缩最小值 - MaxLength interface{} // 可压缩最大值 - State interface{} // 状态 - CreatedAt interface{} // 创建时间 - Conds interface{} // 条件 + Id interface{} // ID + AdminId interface{} // 管理员ID + UserId interface{} // 用户ID + IsOn interface{} // 是否启用 + Level interface{} // 压缩级别 + MinLength interface{} // 可压缩最小值 + MaxLength interface{} // 可压缩最大值 + State interface{} // 状态 + CreatedAt interface{} // 创建时间 + CondGroups interface{} // 条件 } func NewHTTPGzipOperator() *HTTPGzipOperator { diff --git a/internal/db/models/http_header_dao.go b/internal/db/models/http_header_dao.go index fca24023..3df781a7 100644 --- a/internal/db/models/http_header_dao.go +++ b/internal/db/models/http_header_dao.go @@ -30,6 +30,20 @@ func NewHTTPHeaderDAO() *HTTPHeaderDAO { var SharedHTTPHeaderDAO = NewHTTPHeaderDAO() +// 初始化 +func (this *HTTPHeaderDAO) Init() { + this.DAOObject.Init() + this.DAOObject.OnUpdate(func() error { + return SharedSysEventDAO.CreateEvent(NewServerChangeEvent()) + }) + this.DAOObject.OnInsert(func() error { + return SharedSysEventDAO.CreateEvent(NewServerChangeEvent()) + }) + this.DAOObject.OnDelete(func() error { + return SharedSysEventDAO.CreateEvent(NewServerChangeEvent()) + }) +} + // 启用条目 func (this *HTTPHeaderDAO) EnableHTTPHeader(id int64) error { _, err := this.Query(). @@ -75,7 +89,17 @@ func (this *HTTPHeaderDAO) CreateHeader(name string, value string) (int64, error op.IsOn = true op.Name = name op.Value = value - _, err := this.Save(op) + + statusConfig := &shared.HTTPStatusConfig{ + Always: true, + } + statusJSON, err := json.Marshal(statusConfig) + if err != nil { + return 0, err + } + op.Status = statusJSON + + _, err = this.Save(op) if err != nil { return 0, err } diff --git a/internal/db/models/http_header_policy_dao.go b/internal/db/models/http_header_policy_dao.go index 96feb92c..619a471f 100644 --- a/internal/db/models/http_header_policy_dao.go +++ b/internal/db/models/http_header_policy_dao.go @@ -30,6 +30,20 @@ func NewHTTPHeaderPolicyDAO() *HTTPHeaderPolicyDAO { var SharedHTTPHeaderPolicyDAO = NewHTTPHeaderPolicyDAO() +// 初始化 +func (this *HTTPHeaderPolicyDAO) Init() { + this.DAOObject.Init() + this.DAOObject.OnUpdate(func() error { + return SharedSysEventDAO.CreateEvent(NewServerChangeEvent()) + }) + this.DAOObject.OnInsert(func() error { + return SharedSysEventDAO.CreateEvent(NewServerChangeEvent()) + }) + this.DAOObject.OnDelete(func() error { + return SharedSysEventDAO.CreateEvent(NewServerChangeEvent()) + }) +} + // 启用条目 func (this *HTTPHeaderPolicyDAO) EnableHTTPHeaderPolicy(id int64) error { _, err := this.Query(). @@ -83,8 +97,6 @@ func (this *HTTPHeaderPolicyDAO) UpdateAddingHeaders(policyId int64, headersJSON op.AddHeaders = headersJSON _, err := this.Save(op) - // TODO 更新相关配置 - return err } @@ -99,8 +111,6 @@ func (this *HTTPHeaderPolicyDAO) UpdateSettingHeaders(policyId int64, headersJSO op.SetHeaders = headersJSON _, err := this.Save(op) - // TODO 更新相关配置 - return err } @@ -115,8 +125,6 @@ func (this *HTTPHeaderPolicyDAO) UpdateReplacingHeaders(policyId int64, headersJ op.ReplaceHeaders = headersJSON _, err := this.Save(op) - // TODO 更新相关配置 - return err } @@ -131,8 +139,6 @@ func (this *HTTPHeaderPolicyDAO) UpdateAddingTrailers(policyId int64, headersJSO op.AddTrailers = headersJSON _, err := this.Save(op) - // TODO 更新相关配置 - return err } @@ -152,8 +158,6 @@ func (this *HTTPHeaderPolicyDAO) UpdateDeletingHeaders(policyId int64, headerNam op.DeleteHeaders = string(namesJSON) _, err = this.Save(op) - // TODO 更新相关配置 - return err } @@ -173,78 +177,91 @@ func (this *HTTPHeaderPolicyDAO) ComposeHeaderPolicyConfig(headerPolicyId int64) // AddHeaders if len(policy.AddHeaders) > 0 { - headers := []*shared.HTTPHeaderConfig{} - err = json.Unmarshal([]byte(policy.AddHeaders), &headers) + refs := []*shared.HTTPHeaderRef{} + err = json.Unmarshal([]byte(policy.AddHeaders), &refs) if err != nil { return nil, err } - if len(headers) > 0 { - for k, header := range headers { - headerConfig, err := SharedHTTPHeaderDAO.ComposeHeaderConfig(header.Id) + if len(refs) > 0 { + for _, ref := range refs { + headerConfig, err := SharedHTTPHeaderDAO.ComposeHeaderConfig(ref.HeaderId) if err != nil { return nil, err } - headers[k] = headerConfig + config.AddHeaders = append(config.AddHeaders, headerConfig) } - - config.AddHeaders = headers } } // AddTrailers if len(policy.AddTrailers) > 0 { - headers := []*shared.HTTPHeaderConfig{} - err = json.Unmarshal([]byte(policy.AddTrailers), &headers) + refs := []*shared.HTTPHeaderRef{} + err = json.Unmarshal([]byte(policy.AddTrailers), &refs) if err != nil { return nil, err } - if len(headers) > 0 { - for k, header := range headers { - headerConfig, err := SharedHTTPHeaderDAO.ComposeHeaderConfig(header.Id) + if len(refs) > 0 { + resultRefs := []*shared.HTTPHeaderRef{} + for _, ref := range refs { + headerConfig, err := SharedHTTPHeaderDAO.ComposeHeaderConfig(ref.HeaderId) if err != nil { return nil, err } - headers[k] = headerConfig + if headerConfig == nil { + continue + } + resultRefs = append(resultRefs, ref) + config.AddTrailers = append(config.AddTrailers, headerConfig) } - config.AddTrailers = headers + config.AddHeaderRefs = resultRefs } } // SetHeaders if len(policy.SetHeaders) > 0 { - headers := []*shared.HTTPHeaderConfig{} - err = json.Unmarshal([]byte(policy.SetHeaders), &headers) + refs := []*shared.HTTPHeaderRef{} + err = json.Unmarshal([]byte(policy.SetHeaders), &refs) if err != nil { return nil, err } - if len(headers) > 0 { - for k, header := range headers { - headerConfig, err := SharedHTTPHeaderDAO.ComposeHeaderConfig(header.Id) + if len(refs) > 0 { + resultRefs := []*shared.HTTPHeaderRef{} + for _, ref := range refs { + headerConfig, err := SharedHTTPHeaderDAO.ComposeHeaderConfig(ref.HeaderId) if err != nil { return nil, err } - headers[k] = headerConfig + if headerConfig == nil { + continue + } + resultRefs = append(resultRefs, ref) + config.SetHeaders = append(config.SetHeaders, headerConfig) } - config.SetHeaders = headers + config.SetHeaderRefs = resultRefs } } // ReplaceHeaders if len(policy.ReplaceHeaders) > 0 { - headers := []*shared.HTTPHeaderConfig{} - err = json.Unmarshal([]byte(policy.ReplaceHeaders), &headers) + refs := []*shared.HTTPHeaderRef{} + err = json.Unmarshal([]byte(policy.ReplaceHeaders), &refs) if err != nil { return nil, err } - if len(headers) > 0 { - for k, header := range headers { - headerConfig, err := SharedHTTPHeaderDAO.ComposeHeaderConfig(header.Id) + if len(refs) > 0 { + resultRefs := []*shared.HTTPHeaderRef{} + for _, ref := range refs { + headerConfig, err := SharedHTTPHeaderDAO.ComposeHeaderConfig(ref.HeaderId) if err != nil { return nil, err } - headers[k] = headerConfig + if headerConfig == nil { + continue + } + resultRefs = append(resultRefs, ref) + config.ReplaceHeaders = append(config.ReplaceHeaders, headerConfig) } - config.ReplaceHeaders = headers + config.ReplaceHeaderRefs = resultRefs } } @@ -255,7 +272,7 @@ func (this *HTTPHeaderPolicyDAO) ComposeHeaderPolicyConfig(headerPolicyId int64) if err != nil { return nil, err } - config.DeletedHeaders = headers + config.DeleteHeaders = headers } // Expires diff --git a/internal/db/models/http_location_dao.go b/internal/db/models/http_location_dao.go index 6c50f28b..1f81a116 100644 --- a/internal/db/models/http_location_dao.go +++ b/internal/db/models/http_location_dao.go @@ -30,6 +30,20 @@ func NewHTTPLocationDAO() *HTTPLocationDAO { var SharedHTTPLocationDAO = NewHTTPLocationDAO() +// 初始化 +func (this *HTTPLocationDAO) Init() { + this.DAOObject.Init() + this.DAOObject.OnUpdate(func() error { + return SharedSysEventDAO.CreateEvent(NewServerChangeEvent()) + }) + this.DAOObject.OnInsert(func() error { + return SharedSysEventDAO.CreateEvent(NewServerChangeEvent()) + }) + this.DAOObject.OnDelete(func() error { + return SharedSysEventDAO.CreateEvent(NewServerChangeEvent()) + }) +} + // 启用条目 func (this *HTTPLocationDAO) EnableHTTPLocation(id int64) error { _, err := this.Query(). diff --git a/internal/db/models/http_location_model.go b/internal/db/models/http_location_model.go index e569d673..95b971fb 100644 --- a/internal/db/models/http_location_model.go +++ b/internal/db/models/http_location_model.go @@ -17,6 +17,7 @@ type HTTPLocation struct { ReverseProxy string `field:"reverseProxy"` // 反向代理 UrlPrefix string `field:"urlPrefix"` // URL前缀 IsBreak uint8 `field:"isBreak"` // 是否终止匹配 + CondGroups string `field:"condGroups"` // 匹配条件 } type HTTPLocationOperator struct { @@ -35,6 +36,7 @@ type HTTPLocationOperator struct { ReverseProxy interface{} // 反向代理 UrlPrefix interface{} // URL前缀 IsBreak interface{} // 是否终止匹配 + CondGroups interface{} // 匹配条件 } func NewHTTPLocationOperator() *HTTPLocationOperator { diff --git a/internal/db/models/http_page_dao.go b/internal/db/models/http_page_dao.go index e86bb427..8f073126 100644 --- a/internal/db/models/http_page_dao.go +++ b/internal/db/models/http_page_dao.go @@ -30,6 +30,20 @@ func NewHTTPPageDAO() *HTTPPageDAO { var SharedHTTPPageDAO = NewHTTPPageDAO() +// 初始化 +func (this *HTTPPageDAO) Init() { + this.DAOObject.Init() + this.DAOObject.OnUpdate(func() error { + return SharedSysEventDAO.CreateEvent(NewServerChangeEvent()) + }) + this.DAOObject.OnInsert(func() error { + return SharedSysEventDAO.CreateEvent(NewServerChangeEvent()) + }) + this.DAOObject.OnDelete(func() error { + return SharedSysEventDAO.CreateEvent(NewServerChangeEvent()) + }) +} + // 启用条目 func (this *HTTPPageDAO) EnableHTTPPage(id int64) error { _, err := this.Query(). @@ -107,7 +121,6 @@ func (this *HTTPPageDAO) UpdatePage(pageId int64, statusList []string, url strin op.NewStatus = newStatus _, err = this.Save(op) - // TODO 修改相关引用的对象 return err } diff --git a/internal/db/models/http_web_dao.go b/internal/db/models/http_web_dao.go index 122b919b..a5a7cefa 100644 --- a/internal/db/models/http_web_dao.go +++ b/internal/db/models/http_web_dao.go @@ -31,6 +31,19 @@ func NewHTTPWebDAO() *HTTPWebDAO { var SharedHTTPWebDAO = NewHTTPWebDAO() +func (this *HTTPWebDAO) Init() { + this.DAOObject.Init() + this.DAOObject.OnUpdate(func() error { + return SharedSysEventDAO.CreateEvent(NewServerChangeEvent()) + }) + this.DAOObject.OnInsert(func() error { + return SharedSysEventDAO.CreateEvent(NewServerChangeEvent()) + }) + this.DAOObject.OnDelete(func() error { + return SharedSysEventDAO.CreateEvent(NewServerChangeEvent()) + }) +} + // 启用条目 func (this *HTTPWebDAO) EnableHTTPWeb(id int64) error { _, err := this.Query(). @@ -238,8 +251,6 @@ func (this *HTTPWebDAO) ComposeWebConfig(webId int64) (*serverconfigs.HTTPWebCon config.RedirectToHttps = redirectToHTTPSConfig } - // TODO 更多配置 - return config, nil } @@ -264,11 +275,7 @@ func (this *HTTPWebDAO) UpdateWeb(webId int64, root string) error { op.Id = webId op.Root = root _, err := this.Save(op) - if err != nil { - return err - } - - return this.NotifyUpdating(webId) + return err } // 修改Gzip配置 @@ -280,11 +287,7 @@ func (this *HTTPWebDAO) UpdateWebGzip(webId int64, gzipJSON []byte) error { op.Id = webId op.Gzip = gzipJSON _, err := this.Save(op) - if err != nil { - return err - } - - return this.NotifyUpdating(webId) + return err } // 修改字符编码 @@ -296,11 +299,7 @@ func (this *HTTPWebDAO) UpdateWebCharset(webId int64, charsetJSON []byte) error op.Id = webId op.Charset = charsetJSON _, err := this.Save(op) - if err != nil { - return err - } - - return this.NotifyUpdating(webId) + return err } // 更改请求Header策略 @@ -312,11 +311,7 @@ func (this *HTTPWebDAO) UpdateWebRequestHeaderPolicy(webId int64, headerPolicyJS op.Id = webId op.RequestHeader = JSONBytes(headerPolicyJSON) _, err := this.Save(op) - if err != nil { - return err - } - - return this.NotifyUpdating(webId) + return err } // 更改响应Header策略 @@ -328,11 +323,7 @@ func (this *HTTPWebDAO) UpdateWebResponseHeaderPolicy(webId int64, headerPolicyJ op.Id = webId op.ResponseHeader = JSONBytes(headerPolicyJSON) _, err := this.Save(op) - if err != nil { - return err - } - - return this.NotifyUpdating(webId) + return err } // 更改特殊页面配置 @@ -344,11 +335,7 @@ func (this *HTTPWebDAO) UpdateWebPages(webId int64, pagesJSON []byte) error { op.Id = webId op.Pages = JSONBytes(pagesJSON) _, err := this.Save(op) - if err != nil { - return err - } - - return this.NotifyUpdating(webId) + return err } // 更改Shutdown配置 @@ -360,11 +347,7 @@ func (this *HTTPWebDAO) UpdateWebShutdown(webId int64, shutdownJSON []byte) erro op.Id = webId op.Shutdown = JSONBytes(shutdownJSON) _, err := this.Save(op) - if err != nil { - return err - } - - return this.NotifyUpdating(webId) + return err } // 更改访问日志策略 @@ -376,11 +359,7 @@ func (this *HTTPWebDAO) UpdateWebAccessLogConfig(webId int64, accessLogJSON []by op.Id = webId op.AccessLog = JSONBytes(accessLogJSON) _, err := this.Save(op) - if err != nil { - return err - } - - return this.NotifyUpdating(webId) + return err } // 更改统计配置 @@ -392,11 +371,7 @@ func (this *HTTPWebDAO) UpdateWebStat(webId int64, statJSON []byte) error { op.Id = webId op.Stat = JSONBytes(statJSON) _, err := this.Save(op) - if err != nil { - return err - } - - return this.NotifyUpdating(webId) + return err } // 更改缓存配置 @@ -408,11 +383,7 @@ func (this *HTTPWebDAO) UpdateWebCache(webId int64, cacheJSON []byte) error { op.Id = webId op.Cache = JSONBytes(cacheJSON) _, err := this.Save(op) - if err != nil { - return err - } - - return this.NotifyUpdating(webId) + return err } // 更改防火墙配置 @@ -424,11 +395,7 @@ func (this *HTTPWebDAO) UpdateWebFirewall(webId int64, firewallJSON []byte) erro op.Id = webId op.Firewall = JSONBytes(firewallJSON) _, err := this.Save(op) - if err != nil { - return err - } - - return this.NotifyUpdating(webId) + return err } // 更改路径规则配置 @@ -440,11 +407,7 @@ func (this *HTTPWebDAO) UpdateWebLocations(webId int64, locationsJSON []byte) er op.Id = webId op.Locations = JSONBytes(locationsJSON) _, err := this.Save(op) - if err != nil { - return err - } - - return this.NotifyUpdating(webId) + return err } // 更改跳转到HTTPS设置 @@ -456,21 +419,5 @@ func (this *HTTPWebDAO) UpdateWebRedirectToHTTPS(webId int64, redirectToHTTPSJSO op.Id = webId op.RedirectToHttps = JSONBytes(redirectToHTTPSJSON) _, err := this.Save(op) - if err != nil { - return err - } - - return this.NotifyUpdating(webId) -} - -// 通知更新 -func (this *HTTPWebDAO) NotifyUpdating(webId int64) error { - err := SharedServerDAO.UpdateServerIsUpdatingWithWebId(webId) - if err != nil { - return err - } - - // TODO 更新所有使用此Web配置的Location所在服务 - - return nil + return err } diff --git a/internal/db/models/node_dao.go b/internal/db/models/node_dao.go index 697ebaf4..d0cd1402 100644 --- a/internal/db/models/node_dao.go +++ b/internal/db/models/node_dao.go @@ -3,20 +3,20 @@ package models import ( "encoding/json" "errors" + "github.com/TeaOSLab/EdgeCommon/pkg/configutils" + "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" + "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" _ "github.com/go-sql-driver/mysql" "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/rands" "github.com/iwind/TeaGo/types" + "strconv" ) const ( NodeStateEnabled = 1 // 已启用 NodeStateDisabled = 0 // 已禁用 - - NodeInstallStateAll = 0 // 全部 - NodeInstallStateInstalled = 1 // 已安装 - NodeInstallStateNotInstalled = 2 // 未安装 ) type NodeDAO dbs.DAO @@ -182,7 +182,7 @@ func (this *NodeDAO) CountAllEnabledNodes() (int64, error) { } // 列出单页节点 -func (this *NodeDAO) ListEnabledNodesMatch(offset int64, size int64, clusterId int64, installState int8) (result []*Node, err error) { +func (this *NodeDAO) ListEnabledNodesMatch(offset int64, size int64, clusterId int64, installState configutils.BoolState, activeState configutils.BoolState) (result []*Node, err error) { query := this.Query(). State(NodeStateEnabled). Offset(offset). @@ -197,14 +197,24 @@ func (this *NodeDAO) ListEnabledNodesMatch(offset int64, size int64, clusterId i // 安装状态 switch installState { - case NodeInstallStateAll: - // 不做任何事情 - case NodeInstallStateInstalled: + case configutils.BoolStateAll: + // 所有 + case configutils.BoolStateYes: query.Attr("isInstalled", 1) - case NodeInstallStateNotInstalled: + case configutils.BoolStateNo: query.Attr("isInstalled", 0) } + // 在线状态 + switch activeState { + case configutils.BoolStateAll: + // 所有 + case configutils.BoolStateYes: + query.Where("JSON_EXTRACT(status, '$.isActive') AND JSON_EXTRACT(status, '$.updatedAt')-UNIX_TIMESTAMP()<=60") + case configutils.BoolStateNo: + query.Where("(status IS NULL OR NOT JSON_EXTRACT(status, '$.isActive') OR JSON_EXTRACT(status, '$.updatedAt')-UNIX_TIMESTAMP()>60)") + } + _, err = query.FindAll() return } @@ -266,7 +276,7 @@ func (this *NodeDAO) FindAllNodeIdsMatch(clusterId int64) (result []int64, err e } // 计算节点数量 -func (this *NodeDAO) CountAllEnabledNodesMatch(clusterId int64, installState int8) (int64, error) { +func (this *NodeDAO) CountAllEnabledNodesMatch(clusterId int64, installState configutils.BoolState, activeState configutils.BoolState) (int64, error) { query := this.Query() query.State(NodeStateEnabled) @@ -277,14 +287,24 @@ func (this *NodeDAO) CountAllEnabledNodesMatch(clusterId int64, installState int // 安装状态 switch installState { - case NodeInstallStateAll: - // 不做任何事情 - case NodeInstallStateInstalled: + case configutils.BoolStateAll: + // 所有 + case configutils.BoolStateYes: query.Attr("isInstalled", 1) - case NodeInstallStateNotInstalled: + case configutils.BoolStateNo: query.Attr("isInstalled", 0) } + // 在线状态 + switch activeState { + case configutils.BoolStateAll: + // 所有 + case configutils.BoolStateYes: + query.Where("JSON_EXTRACT(status, '$.isActive') AND JSON_EXTRACT(status, '$.updatedAt')-UNIX_TIMESTAMP()<=60") + case configutils.BoolStateNo: + query.Where("(status IS NULL OR NOT JSON_EXTRACT(status, '$.isActive') OR JSON_EXTRACT(status, '$.updatedAt')-UNIX_TIMESTAMP()>60)") + } + return query.Count() } @@ -347,6 +367,61 @@ func (this *NodeDAO) UpdateNodeInstallStatus(nodeId int64, status *NodeInstallSt return err } +// 组合配置 +func (this *NodeDAO) ComposeNodeConfig(nodeId int64) (*nodeconfigs.NodeConfig, error) { + node, err := this.FindEnabledNode(nodeId) + if err != nil { + return nil, err + } + if node == nil { + return nil, errors.New("node not found '" + strconv.FormatInt(nodeId, 10) + "'") + } + + config := &nodeconfigs.NodeConfig{ + Id: node.UniqueId, + IsOn: node.IsOn == 1, + Servers: nil, + Version: int64(node.Version), + Name: node.Name, + } + + // 获取所有的服务 + servers, err := SharedServerDAO.FindAllEnabledServersWithNode(int64(node.Id)) + if err != nil { + return nil, err + } + + for _, server := range servers { + if len(server.Config) == 0 { + continue + } + + serverConfig := &serverconfigs.ServerConfig{} + err = json.Unmarshal([]byte(server.Config), serverConfig) + if err != nil { + return nil, err + } + config.Servers = append(config.Servers, serverConfig) + } + + // 全局设置 + // TODO 根据用户的不同读取不同的全局设置 + settingJSON, err := SharedSysSettingDAO.ReadSetting(SettingCodeGlobalConfig) + if err != nil { + return nil, err + } + if len(settingJSON) > 0 { + globalConfig := &serverconfigs.GlobalConfig{} + err = json.Unmarshal(settingJSON, globalConfig) + if err != nil { + return nil, err + } + config.GlobalConfig = globalConfig + } + + return config, nil +} + // 生成唯一ID func (this *NodeDAO) genUniqueId() (string, error) { for { diff --git a/internal/db/models/origin_dao.go b/internal/db/models/origin_dao.go index 849474b1..7f02e860 100644 --- a/internal/db/models/origin_dao.go +++ b/internal/db/models/origin_dao.go @@ -32,6 +32,20 @@ func NewOriginDAO() *OriginDAO { var SharedOriginDAO = NewOriginDAO() +// 初始化 +func (this *OriginDAO) Init() { + this.DAOObject.Init() + this.DAOObject.OnUpdate(func() error { + return SharedSysEventDAO.CreateEvent(NewServerChangeEvent()) + }) + this.DAOObject.OnInsert(func() error { + return SharedSysEventDAO.CreateEvent(NewServerChangeEvent()) + }) + this.DAOObject.OnDelete(func() error { + return SharedSysEventDAO.CreateEvent(NewServerChangeEvent()) + }) +} + // 启用条目 func (this *OriginDAO) EnableOrigin(id int64) error { _, err := this.Query(). @@ -167,7 +181,7 @@ func (this *OriginDAO) ComposeOriginConfig(originId int64) (*serverconfigs.Origi return nil, err } if policyConfig != nil { - config.RequestHeaders = policyConfig + config.RequestHeaderPolicy = policyConfig } } @@ -177,7 +191,7 @@ func (this *OriginDAO) ComposeOriginConfig(originId int64) (*serverconfigs.Origi return nil, err } if policyConfig != nil { - config.ResponseHeaders = policyConfig + config.ResponseHeaderPolicy = policyConfig } } diff --git a/internal/db/models/origin_dao_test.go b/internal/db/models/origin_dao_test.go index 2c8ab4a6..9e75391f 100644 --- a/internal/db/models/origin_dao_test.go +++ b/internal/db/models/origin_dao_test.go @@ -6,7 +6,7 @@ import ( ) func TestOriginServerDAO_ComposeOriginConfig(t *testing.T) { - config, err := SharedOriginServerDAO.ComposeOriginConfig(1) + config, err := SharedOriginDAO.ComposeOriginConfig(1) if err != nil { t.Fatal(err) } diff --git a/internal/db/models/reverse_proxy_dao.go b/internal/db/models/reverse_proxy_dao.go index 39d8d955..b9d9f5ec 100644 --- a/internal/db/models/reverse_proxy_dao.go +++ b/internal/db/models/reverse_proxy_dao.go @@ -30,13 +30,30 @@ func NewReverseProxyDAO() *ReverseProxyDAO { var SharedReverseProxyDAO = NewReverseProxyDAO() +// 初始化 +func (this *ReverseProxyDAO) Init() { + this.DAOObject.Init() + this.DAOObject.OnUpdate(func() error { + return SharedSysEventDAO.CreateEvent(NewServerChangeEvent()) + }) + this.DAOObject.OnInsert(func() error { + return SharedSysEventDAO.CreateEvent(NewServerChangeEvent()) + }) + this.DAOObject.OnDelete(func() error { + return SharedSysEventDAO.CreateEvent(NewServerChangeEvent()) + }) +} + // 启用条目 func (this *ReverseProxyDAO) EnableReverseProxy(id int64) error { _, err := this.Query(). Pk(id). Set("state", ReverseProxyStateEnabled). Update() - return err + if err != nil { + return err + } + return this.CreateEvent() } // 禁用条目 @@ -45,7 +62,10 @@ func (this *ReverseProxyDAO) DisableReverseProxy(id int64) error { Pk(id). Set("state", ReverseProxyStateDisabled). Update() - return err + if err != nil { + return err + } + return this.CreateEvent() } // 查找启用中的条目 @@ -106,7 +126,7 @@ func (this *ReverseProxyDAO) ComposeReverseProxyConfig(reverseProxyId int64) (*s return nil, err } for _, originConfig := range originRefs { - originConfig, err := SharedOriginDAO.ComposeOriginConfig(int64(originConfig.OriginId)) + originConfig, err := SharedOriginDAO.ComposeOriginConfig(originConfig.OriginId) if err != nil { return nil, err } @@ -155,8 +175,6 @@ func (this *ReverseProxyDAO) UpdateReverseProxyScheduling(reverseProxyId int64, } _, err := this.Save(op) - // TODO 更新所有使用此反向代理的服务 - return err } @@ -174,8 +192,6 @@ func (this *ReverseProxyDAO) UpdateReverseProxyPrimaryOrigins(reverseProxyId int } _, err := this.Save(op) - // TODO 更新所有使用此反向代理的服务 - return err } @@ -193,8 +209,6 @@ func (this *ReverseProxyDAO) UpdateReverseProxyBackupOrigins(reverseProxyId int6 } _, err := this.Save(op) - // TODO 更新所有使用此反向代理的服务 - return err } @@ -206,3 +220,8 @@ func (this *ReverseProxyDAO) UpdateReverseProxyIsOn(reverseProxyId int64, isOn b Update() return err } + +// 通知更新 +func (this *ReverseProxyDAO) CreateEvent() error { + return SharedSysEventDAO.CreateEvent(NewServerChangeEvent()) +} diff --git a/internal/db/models/server_dao.go b/internal/db/models/server_dao.go index db514fd5..8b3f9725 100644 --- a/internal/db/models/server_dao.go +++ b/internal/db/models/server_dao.go @@ -1,13 +1,14 @@ package models import ( + "crypto/md5" "encoding/json" "errors" + "fmt" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" _ "github.com/go-sql-driver/mysql" "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/dbs" - "github.com/iwind/TeaGo/rands" "github.com/iwind/TeaGo/types" ) @@ -31,6 +32,13 @@ func NewServerDAO() *ServerDAO { var SharedServerDAO = NewServerDAO() +// 初始化 +func (this *ServerDAO) Init() { + this.DAOObject.Init() + + // 这里不处理增删改事件,是为了避免Server修改本身的时候,也要触发别的Server变更 +} + // 启用条目 func (this *ServerDAO) EnableServer(id uint32) (rowsAffected int64, err error) { return this.Query(). @@ -70,13 +78,7 @@ func (this *ServerDAO) FindEnabledServerType(serverId int64) (string, error) { // 创建服务 func (this *ServerDAO) CreateServer(adminId int64, userId int64, serverType serverconfigs.ServerType, name string, description string, serverNamesJSON string, httpJSON string, httpsJSON string, tcpJSON string, tlsJSON string, unixJSON string, udpJSON string, webId int64, reverseProxyJSON []byte, clusterId int64, includeNodesJSON string, excludeNodesJSON string) (serverId int64, err error) { - uniqueId, err := this.genUniqueId() - if err != nil { - return 0, err - } - op := NewServerOperator() - op.UniqueId = uniqueId op.UserId = userId op.AdminId = adminId op.Name = name @@ -127,8 +129,7 @@ func (this *ServerDAO) CreateServer(adminId int64, userId int64, serverType serv } serverId = types.Int64(op.Id) - err = this.RenewServerConfig(serverId) - return serverId, err + return serverId, nil } // 修改服务基本信息 @@ -141,25 +142,47 @@ func (this *ServerDAO) UpdateServerBasic(serverId int64, name string, descriptio op.Name = name op.Description = description op.ClusterId = clusterId - op.Version = dbs.SQL("version=version+1") _, err := this.Save(op) - return err + if err != nil { + return err + } + + return this.createEvent() } // 修改服务配置 -func (this *ServerDAO) UpdateServerConfig(serverId int64, config []byte) error { +func (this *ServerDAO) UpdateServerConfig(serverId int64, configJSON []byte) (isChanged bool, err error) { if serverId <= 0 { - return errors.New("serverId should not be smaller than 0") + return false, errors.New("serverId should not be smaller than 0") } - if len(config) == 0 { - config = []byte("null") - } - _, err := this.Query(). + + // 查询以前的md5 + oldConfigMd5, err := this.Query(). Pk(serverId). - Set("config", string(config)). - Set("version", dbs.SQL("version+1")). - Update() - return err + Result("configMd5"). + FindStringCol("") + if err != nil { + return false, err + } + + m := md5.New() + _, _ = m.Write(configJSON) + h := m.Sum(nil) + newConfigMd5 := fmt.Sprintf("%x", h) + + // 如果配置相同则不更改 + if oldConfigMd5 == newConfigMd5 { + return false, nil + } + + op := NewServerOperator() + op.Id = serverId + op.Config = JSONBytes(configJSON) + op.Version = dbs.SQL("version+1") + + op.ConfigMd5 = newConfigMd5 + _, err = this.Save(op) + return true, err } // 修改HTTP配置 @@ -177,7 +200,8 @@ func (this *ServerDAO) UpdateServerHTTP(serverId int64, config []byte) error { if err != nil { return err } - return this.RenewServerConfig(serverId) + + return this.createEvent() } // 修改HTTPS配置 @@ -195,7 +219,8 @@ func (this *ServerDAO) UpdateServerHTTPS(serverId int64, config []byte) error { if err != nil { return err } - return this.RenewServerConfig(serverId) + + return this.createEvent() } // 修改TCP配置 @@ -213,7 +238,8 @@ func (this *ServerDAO) UpdateServerTCP(serverId int64, config []byte) error { if err != nil { return err } - return this.RenewServerConfig(serverId) + + return this.createEvent() } // 修改TLS配置 @@ -231,7 +257,8 @@ func (this *ServerDAO) UpdateServerTLS(serverId int64, config []byte) error { if err != nil { return err } - return this.RenewServerConfig(serverId) + + return this.createEvent() } // 修改Unix配置 @@ -249,7 +276,8 @@ func (this *ServerDAO) UpdateServerUnix(serverId int64, config []byte) error { if err != nil { return err } - return this.RenewServerConfig(serverId) + + return this.createEvent() } // 修改UDP配置 @@ -267,7 +295,8 @@ func (this *ServerDAO) UpdateServerUDP(serverId int64, config []byte) error { if err != nil { return err } - return this.RenewServerConfig(serverId) + + return this.createEvent() } // 修改Web配置 @@ -282,7 +311,7 @@ func (this *ServerDAO) UpdateServerWeb(serverId int64, webId int64) error { if err != nil { return err } - return this.RenewServerConfig(serverId) + return this.createEvent() } // 初始化Web配置 @@ -304,6 +333,11 @@ func (this *ServerDAO) InitServerWeb(serverId int64) (int64, error) { return 0, err } + err = this.createEvent() + if err != nil { + return webId, err + } + return webId, nil } @@ -322,7 +356,8 @@ func (this *ServerDAO) UpdateServerNames(serverId int64, config []byte) error { if err != nil { return err } - return this.RenewServerConfig(serverId) + + return this.createEvent() } // 修改反向代理配置 @@ -337,7 +372,8 @@ func (this *ServerDAO) UpdateServerReverseProxy(serverId int64, config []byte) e if err != nil { return err } - return this.RenewServerConfig(serverId) + + return this.createEvent() } // 计算所有可用服务数量 @@ -379,6 +415,36 @@ func (this *ServerDAO) FindAllEnabledServersWithNode(nodeId int64) (result []*Se return } +// 获取所有的服务ID +func (this *ServerDAO) FindAllEnabledServerIds() (serverIds []int64, err error) { + ones, err := this.Query(). + State(ServerStateEnabled). + AscPk(). + ResultPk(). + FindAll() + for _, one := range ones { + serverIds = append(serverIds, int64(one.(*Server).Id)) + } + return +} + +// 查找服务的搜索条件 +func (this *ServerDAO) FindServerNodeFilters(serverId int64) (isOk bool, clusterId int64, err error) { + one, err := this.Query(). + Pk(serverId). + Result("clusterId"). + Find() + if err != nil { + return false, 0, err + } + if one == nil { + isOk = false + return + } + server := one.(*Server) + return true, int64(server.ClusterId), nil +} + // 构造服务的Config func (this *ServerDAO) ComposeServerConfig(serverId int64) (*serverconfigs.ServerConfig, error) { server, err := this.FindEnabledServer(serverId) @@ -396,12 +462,6 @@ func (this *ServerDAO) ComposeServerConfig(serverId int64) (*serverconfigs.Serve config.Name = server.Name config.Description = server.Description - // Components - // TODO - - // Filters - // TODO - // ServerNames if len(server.ServerNames) > 0 && server.ServerNames != "null" { serverNames := []*serverconfigs.ServerNameConfig{} @@ -505,14 +565,14 @@ func (this *ServerDAO) ComposeServerConfig(serverId int64) (*serverconfigs.Serve } // 更新服务的Config配置 -func (this *ServerDAO) RenewServerConfig(serverId int64) error { +func (this *ServerDAO) RenewServerConfig(serverId int64) (isChanged bool, err error) { serverConfig, err := this.ComposeServerConfig(serverId) if err != nil { - return err + return false, err } - data, err := serverConfig.AsJSON() + data, err := json.Marshal(serverConfig) if err != nil { - return err + return false, err } return this.UpdateServerConfig(serverId, data) } @@ -534,32 +594,6 @@ func (this *ServerDAO) FindReverseProxyRef(serverId int64) (*serverconfigs.Rever return config, err } -// 查找需要更新的Server -func (this *ServerDAO) FindUpdatingServerIds() (serverIds []int64, err error) { - ones, err := this.Query(). - State(ServerStateEnabled). - Attr("isUpdating", true). - ResultPk(). - FindAll() - if err != nil { - return nil, err - } - for _, one := range ones { - serverIds = append(serverIds, int64(one.(*Server).Id)) - } - return -} - -// 修改服务是否需要更新 -func (this *ServerDAO) UpdateServerIsUpdating(serverId int64, isUpdating bool) error { - _, err := this.Query(). - Pk(serverId). - Set("isUpdating", isUpdating). - Update() - return err -} - -// 查找WebId func (this *ServerDAO) FindServerWebId(serverId int64) (int64, error) { webId, err := this.Query(). Pk(serverId). @@ -571,28 +605,7 @@ func (this *ServerDAO) FindServerWebId(serverId int64) (int64, error) { return int64(webId), nil } -// 更新所有Web相关的处于更新状态 -func (this *ServerDAO) UpdateServerIsUpdatingWithWebId(webId int64) error { - _, err := this.Query(). - Attr("webId", webId). - Set("isUpdating", true). - Update() - return err -} - -// 生成唯一ID -func (this *ServerDAO) genUniqueId() (string, error) { - for { - uniqueId := rands.HexString(32) - ok, err := this.Query(). - Attr("uniqueId", uniqueId). - Exist() - if err != nil { - return "", err - } - if ok { - continue - } - return uniqueId, nil - } +// 创建事件 +func (this *ServerDAO) createEvent() error { + return SharedSysEventDAO.CreateEvent(NewServerChangeEvent()) } diff --git a/internal/db/models/server_dao_test.go b/internal/db/models/server_dao_test.go index 9ab9abe6..d0f1842b 100644 --- a/internal/db/models/server_dao_test.go +++ b/internal/db/models/server_dao_test.go @@ -1,6 +1,8 @@ package models import ( + "crypto/md5" + "encoding/json" _ "github.com/go-sql-driver/mysql" "github.com/iwind/TeaGo/logs" "testing" @@ -13,3 +15,30 @@ func TestServerDAO_ComposeServerConfig(t *testing.T) { } logs.PrintAsJSON(config, t) } + +func TestServerDAO_UpdateServerConfig(t *testing.T) { + config, err := SharedServerDAO.ComposeServerConfig(1) + if err != nil { + t.Fatal(err) + } + + configJSON, err := json.Marshal(config) + if err != nil { + t.Fatal(err) + } + _, err = SharedServerDAO.UpdateServerConfig(1, configJSON) + if err != nil { + t.Fatal(err) + } + t.Log("ok") +} + +func TestNewServerDAO_md5(t *testing.T) { + m := md5.New() + _, err := m.Write([]byte("123456")) + if err != nil { + t.Fatal(err) + } + h := m.Sum(nil) + t.Logf("%x", h) +} diff --git a/internal/db/models/server_model.go b/internal/db/models/server_model.go index 81d1079b..bdd38f13 100644 --- a/internal/db/models/server_model.go +++ b/internal/db/models/server_model.go @@ -4,7 +4,6 @@ package models type Server struct { Id uint32 `field:"id"` // ID IsOn uint8 `field:"isOn"` // 是否启用 - UniqueId string `field:"uniqueId"` // 唯一ID UserId uint32 `field:"userId"` // 用户ID AdminId uint32 `field:"adminId"` // 管理员ID Type string `field:"type"` // 服务类型 @@ -21,19 +20,18 @@ type Server struct { ReverseProxy string `field:"reverseProxy"` // 反向代理配置 GroupIds string `field:"groupIds"` // 分组ID列表 Config string `field:"config"` // 服务配置,自动生成 + ConfigMd5 string `field:"configMd5"` // Md5 ClusterId uint32 `field:"clusterId"` // 集群ID IncludeNodes string `field:"includeNodes"` // 部署条件 ExcludeNodes string `field:"excludeNodes"` // 节点排除条件 Version uint32 `field:"version"` // 版本号 CreatedAt uint64 `field:"createdAt"` // 创建时间 - IsUpdating uint8 `field:"isUpdating"` // 是否正在更新 State uint8 `field:"state"` // 状态 } type ServerOperator struct { Id interface{} // ID IsOn interface{} // 是否启用 - UniqueId interface{} // 唯一ID UserId interface{} // 用户ID AdminId interface{} // 管理员ID Type interface{} // 服务类型 @@ -50,12 +48,12 @@ type ServerOperator struct { ReverseProxy interface{} // 反向代理配置 GroupIds interface{} // 分组ID列表 Config interface{} // 服务配置,自动生成 + ConfigMd5 interface{} // Md5 ClusterId interface{} // 集群ID IncludeNodes interface{} // 部署条件 ExcludeNodes interface{} // 节点排除条件 Version interface{} // 版本号 CreatedAt interface{} // 创建时间 - IsUpdating interface{} // 是否正在更新 State interface{} // 状态 } diff --git a/internal/db/models/sys_event_dao.go b/internal/db/models/sys_event_dao.go new file mode 100644 index 00000000..7c68f589 --- /dev/null +++ b/internal/db/models/sys_event_dao.go @@ -0,0 +1,61 @@ +package models + +import ( + "encoding/json" + "errors" + _ "github.com/go-sql-driver/mysql" + "github.com/iwind/TeaGo/Tea" + "github.com/iwind/TeaGo/dbs" +) + +type SysEventDAO dbs.DAO + +func NewSysEventDAO() *SysEventDAO { + return dbs.NewDAO(&SysEventDAO{ + DAOObject: dbs.DAOObject{ + DB: Tea.Env, + Table: "edgeSysEvents", + Model: new(SysEvent), + PkName: "id", + }, + }).(*SysEventDAO) +} + +var SharedSysEventDAO = NewSysEventDAO() + +// 创建事件 +func (this *SysEventDAO) CreateEvent(event EventInterface) error { + if event == nil { + return errors.New("event should not be nil") + } + + op := NewSysEventOperator() + op.Type = event.Type() + + eventJSON, err := json.Marshal(event) + if err != nil { + return err + } + op.Params = eventJSON + + _, err = this.Save(op) + return err +} + +// 查找事件 +func (this *SysEventDAO) FindEvents(size int64) (result []*SysEvent, err error) { + _, err = this.Query(). + Asc(). + Limit(size). + Slice(&result). + FindAll() + return +} + +// 删除事件 +func (this *SysEventDAO) DeleteEvent(eventId int64) error { + _, err := this.Query(). + Pk(eventId). + Delete() + return err +} diff --git a/internal/db/models/sys_event_dao_test.go b/internal/db/models/sys_event_dao_test.go new file mode 100644 index 00000000..1d540a80 --- /dev/null +++ b/internal/db/models/sys_event_dao_test.go @@ -0,0 +1,17 @@ +package models + +import ( + _ "github.com/go-sql-driver/mysql" + "testing" +) + +func TestSysEvent_DecodeEvent(t *testing.T) { + event := &SysEvent{ + Type: "serverChange", + } + eventObj, err := event.DecodeEvent() + if err != nil { + t.Fatal(err) + } + t.Log(eventObj) +} diff --git a/internal/db/models/sys_event_model.go b/internal/db/models/sys_event_model.go new file mode 100644 index 00000000..8286dc64 --- /dev/null +++ b/internal/db/models/sys_event_model.go @@ -0,0 +1,20 @@ +package models + +// 系统事件 +type SysEvent struct { + Id uint64 `field:"id"` // ID + Type string `field:"type"` // 类型 + Params string `field:"params"` // 参数 + CreatedAt uint64 `field:"createdAt"` // 创建时间 +} + +type SysEventOperator struct { + Id interface{} // ID + Type interface{} // 类型 + Params interface{} // 参数 + CreatedAt interface{} // 创建时间 +} + +func NewSysEventOperator() *SysEventOperator { + return &SysEventOperator{} +} diff --git a/internal/db/models/sys_event_model_ext.go b/internal/db/models/sys_event_model_ext.go new file mode 100644 index 00000000..5a076a92 --- /dev/null +++ b/internal/db/models/sys_event_model_ext.go @@ -0,0 +1,27 @@ +package models + +import ( + "encoding/json" + "errors" + "reflect" +) + +// 解码事件 +func (this *SysEvent) DecodeEvent() (EventInterface, error) { + // 解析数据类型 + t, isOk := eventTypeMapping[this.Type] + if !isOk { + return nil, errors.New("can not found event type '" + this.Type + "'") + } + ptr := reflect.New(t).Interface().(EventInterface) + + // 解析参数 + if IsNotNull(this.Params) { + err := json.Unmarshal([]byte(this.Params), ptr) + if err != nil { + return nil, err + } + } + + return ptr, nil +} diff --git a/internal/db/models/sys_event_types.go b/internal/db/models/sys_event_types.go new file mode 100644 index 00000000..9458f9f7 --- /dev/null +++ b/internal/db/models/sys_event_types.go @@ -0,0 +1,64 @@ +package models + +import ( + "reflect" +) + +var eventTypeMapping = map[string]reflect.Type{} // eventType => reflect type + +func init() { + for _, event := range []EventInterface{ + NewServerChangeEvent(), + } { + eventTypeMapping[event.Type()] = reflect.ValueOf(event).Elem().Type() + } +} + +// 接口 +type EventInterface interface { + Type() string + Run() error +} + +// 服务变化 +type ServerChangeEvent struct { +} + +func NewServerChangeEvent() *ServerChangeEvent { + return &ServerChangeEvent{} +} + +func (this *ServerChangeEvent) Type() string { + return "serverChange" +} + +func (this *ServerChangeEvent) Run() error { + serverIds, err := SharedServerDAO.FindAllEnabledServerIds() + if err != nil { + return err + } + for _, serverId := range serverIds { + isChanged, err := SharedServerDAO.RenewServerConfig(serverId) + if err != nil { + return err + } + if !isChanged { + continue + } + + // 检查节点是否需要更新 + isOk, clusterId, err := SharedServerDAO.FindServerNodeFilters(serverId) + if err != nil { + return err + } + if !isOk { + continue + } + err = SharedNodeDAO.UpdateAllNodesLatestVersionMatch(clusterId) + if err != nil { + return err + } + } + + return nil +} diff --git a/internal/db/models/sys_locker_dao.go b/internal/db/models/sys_locker_dao.go new file mode 100644 index 00000000..6ece112a --- /dev/null +++ b/internal/db/models/sys_locker_dao.go @@ -0,0 +1,106 @@ +package models + +import ( + _ "github.com/go-sql-driver/mysql" + "github.com/iwind/TeaGo/Tea" + "github.com/iwind/TeaGo/dbs" + "github.com/iwind/TeaGo/types" + "time" +) + +type SysLockerDAO dbs.DAO + +func NewSysLockerDAO() *SysLockerDAO { + return dbs.NewDAO(&SysLockerDAO{ + DAOObject: dbs.DAOObject{ + DB: Tea.Env, + Table: "edgeSysLockers", + Model: new(SysLocker), + PkName: "id", + }, + }).(*SysLockerDAO) +} + +var SharedSysLockerDAO = NewSysLockerDAO() + +// 开锁 +func (this *SysLockerDAO) Lock(key string, timeout int64) (bool, error) { + maxErrors := 5 + for { + one, err := this.Query(). + Attr("key", key). + Find() + if err != nil { + maxErrors-- + if maxErrors < 0 { + return false, err + } + continue + } + + // 如果没有锁,则创建 + if one == nil { + op := NewSysLockerOperator() + op.Key = key + op.TimeoutAt = time.Now().Unix() + timeout + op.Version = 1 + _, err := this.Save(op) + if err != nil { + maxErrors-- + if maxErrors < 0 { + return false, err + } + continue + } + + return true, nil + } + + // 如果已经有锁 + locker := one.(*SysLocker) + if time.Now().Unix() <= int64(locker.TimeoutAt) { + return false, nil + } + + // 修改 + op := NewSysLockerOperator() + op.Id = locker.Id + op.Version = locker.Version + 1 + op.TimeoutAt = time.Now().Unix() + timeout + _, err = this.Save(op) + if err != nil { + maxErrors-- + if maxErrors < 0 { + return false, err + } + continue + } + + // 再次查询版本 + version, err := this.Query(). + Attr("key", key). + Result("version"). + FindCol("0") + if err != nil { + maxErrors-- + if maxErrors < 0 { + return false, err + } + continue + } + if types.Int64(version) != int64(locker.Version)+1 { + return false, nil + } + + return true, nil + } +} + +// 解锁 +func (this *SysLockerDAO) Unlock(key string) error { + _, err := this.Query(). + Attr("key", key). + Set("timeoutAt", time.Now().Unix()-86400*365). + Update() + return err +} diff --git a/internal/db/models/sys_locker_dao_test.go b/internal/db/models/sys_locker_dao_test.go new file mode 100644 index 00000000..9f2faa4f --- /dev/null +++ b/internal/db/models/sys_locker_dao_test.go @@ -0,0 +1,21 @@ +package models + +import ( + _ "github.com/go-sql-driver/mysql" + "testing" +) + +func TestSysLockerDAO_Lock(t *testing.T) { + isOk, err := SharedSysLockerDAO.Lock("test", 600) + if err != nil { + t.Fatal(err) + } + t.Log(isOk) + + if isOk { + err = SharedSysLockerDAO.Unlock("test") + if err != nil { + t.Fatal(err) + } + } +} diff --git a/internal/db/models/sys_locker_model.go b/internal/db/models/sys_locker_model.go new file mode 100644 index 00000000..0bddd098 --- /dev/null +++ b/internal/db/models/sys_locker_model.go @@ -0,0 +1,20 @@ +package models + +// 并发锁 +type SysLocker struct { + Id uint64 `field:"id"` // ID + Key string `field:"key"` // 键值 + Version uint64 `field:"version"` // 版本号 + TimeoutAt uint64 `field:"timeoutAt"` // 超时时间 +} + +type SysLockerOperator struct { + Id interface{} // ID + Key interface{} // 键值 + Version interface{} // 版本号 + TimeoutAt interface{} // 超时时间 +} + +func NewSysLockerOperator() *SysLockerOperator { + return &SysLockerOperator{} +} diff --git a/internal/db/models/sys_locker_model_ext.go b/internal/db/models/sys_locker_model_ext.go new file mode 100644 index 00000000..2640e7f9 --- /dev/null +++ b/internal/db/models/sys_locker_model_ext.go @@ -0,0 +1 @@ +package models diff --git a/internal/db/models/sys_setting_dao.go b/internal/db/models/sys_setting_dao.go new file mode 100644 index 00000000..33c2ac9b --- /dev/null +++ b/internal/db/models/sys_setting_dao.go @@ -0,0 +1,86 @@ +package models + +import ( + "fmt" + _ "github.com/go-sql-driver/mysql" + "github.com/iwind/TeaGo/Tea" + "github.com/iwind/TeaGo/dbs" +) + +type SysSettingDAO dbs.DAO + +type SettingCode = string + +const ( + SettingCodeGlobalConfig SettingCode = "globalConfig" +) + +func NewSysSettingDAO() *SysSettingDAO { + return dbs.NewDAO(&SysSettingDAO{ + DAOObject: dbs.DAOObject{ + DB: Tea.Env, + Table: "edgeSysSettings", + Model: new(SysSetting), + PkName: "id", + }, + }).(*SysSettingDAO) +} + +var SharedSysSettingDAO = NewSysSettingDAO() + +// 设置配置 +func (this *SysSettingDAO) UpdateSetting(code string, valueJSON []byte, args ...interface{}) error { + if len(args) > 0 { + code = fmt.Sprintf(code, args...) + } + + countRetries := 3 + var lastErr error + for i := 0; i < countRetries; i++ { + settingId, err := this.Query(). + Attr("code", code). + ResultPk(). + FindInt64Col(0) + if err != nil { + return err + } + + if settingId == 0 { + // 新建 + op := NewSysSettingOperator() + op.Code = code + op.Value = valueJSON + _, err = this.Save(op) + if err != nil { + lastErr = err + + // 因为错误的原因可能是因为code冲突,所以这里我们继续执行 + continue + } + return nil + } + + // 修改 + op := NewSysSettingOperator() + op.Id = settingId + op.Value = valueJSON + _, err = this.Save(op) + if err != nil { + return err + } + } + + return lastErr +} + +// 读取配置 +func (this *SysSettingDAO) ReadSetting(code string, args ...interface{}) (valueJSON []byte, err error) { + if len(args) > 0 { + code = fmt.Sprintf(code, args...) + } + col, err := this.Query(). + Attr("code", code). + Result("value"). + FindStringCol("") + return []byte(col), err +} diff --git a/internal/db/models/sys_setting_dao_test.go b/internal/db/models/sys_setting_dao_test.go new file mode 100644 index 00000000..571233b8 --- /dev/null +++ b/internal/db/models/sys_setting_dao_test.go @@ -0,0 +1,32 @@ +package models + +import ( + _ "github.com/go-sql-driver/mysql" + "testing" +) + +func TestSysSettingDAO_UpdateSetting(t *testing.T) { + err := SharedSysSettingDAO.UpdateSetting("hello", []byte(`"world"`)) + if err != nil { + t.Fatal(err) + } + + value, err := SharedSysSettingDAO.ReadSetting("hello") + if err != nil { + t.Fatal(err) + } + t.Log("value:", string(value)) +} + +func TestSysSettingDAO_UpdateSetting_Args(t *testing.T) { + err := SharedSysSettingDAO.UpdateSetting("hello %d", []byte(`"world 123"`), 123) + if err != nil { + t.Fatal(err) + } + + value, err := SharedSysSettingDAO.ReadSetting("hello %d", 123) + if err != nil { + t.Fatal(err) + } + t.Log("value:", string(value)) +} diff --git a/internal/db/models/sys_setting_model.go b/internal/db/models/sys_setting_model.go new file mode 100644 index 00000000..22b132d3 --- /dev/null +++ b/internal/db/models/sys_setting_model.go @@ -0,0 +1,20 @@ +package models + +// 系统配置 +type SysSetting struct { + Id uint32 `field:"id"` // ID + UserId uint32 `field:"userId"` // 用户ID + Code string `field:"code"` // 代号 + Value string `field:"value"` // 配置值 +} + +type SysSettingOperator struct { + Id interface{} // ID + UserId interface{} // 用户ID + Code interface{} // 代号 + Value interface{} // 配置值 +} + +func NewSysSettingOperator() *SysSettingOperator { + return &SysSettingOperator{} +} diff --git a/internal/db/models/sys_setting_model_ext.go b/internal/db/models/sys_setting_model_ext.go new file mode 100644 index 00000000..2640e7f9 --- /dev/null +++ b/internal/db/models/sys_setting_model_ext.go @@ -0,0 +1 @@ +package models diff --git a/internal/db/models/tcp_firewall_policy_dao.go b/internal/db/models/tcp_firewall_policy_dao.go index d62be0c7..52ef2bfd 100644 --- a/internal/db/models/tcp_firewall_policy_dao.go +++ b/internal/db/models/tcp_firewall_policy_dao.go @@ -20,3 +20,17 @@ func NewTCPFirewallPolicyDAO() *TCPFirewallPolicyDAO { } var SharedTCPFirewallPolicyDAO = NewTCPFirewallPolicyDAO() + +// 初始化 +func (this *TCPFirewallPolicyDAO) Init() { + this.DAOObject.Init() + this.DAOObject.OnUpdate(func() error { + return SharedSysEventDAO.CreateEvent(NewServerChangeEvent()) + }) + this.DAOObject.OnInsert(func() error { + return SharedSysEventDAO.CreateEvent(NewServerChangeEvent()) + }) + this.DAOObject.OnDelete(func() error { + return SharedSysEventDAO.CreateEvent(NewServerChangeEvent()) + }) +} diff --git a/internal/rpc/services/service_http_access_log_policy.go b/internal/rpc/services/service_http_access_log_policy.go index 80b2a617..90624abf 100644 --- a/internal/rpc/services/service_http_access_log_policy.go +++ b/internal/rpc/services/service_http_access_log_policy.go @@ -31,7 +31,7 @@ func (this *HTTPAccessLogPolicyService) FindAllEnabledHTTPAccessLogPolicies(ctx IsOn: policy.IsOn == 1, Type: policy.Name, OptionsJSON: []byte(policy.Options), - CondsJSON: []byte(policy.Conds), + CondsJSON: []byte(policy.CondGroups), }) } diff --git a/internal/rpc/services/service_node.go b/internal/rpc/services/service_node.go index d616f64c..cd18312b 100644 --- a/internal/rpc/services/service_node.go +++ b/internal/rpc/services/service_node.go @@ -7,10 +7,9 @@ import ( "github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/installers" rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils" + "github.com/TeaOSLab/EdgeCommon/pkg/configutils" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/iwind/TeaGo/logs" - "github.com/iwind/TeaGo/maps" - "github.com/iwind/TeaGo/types" ) type NodeService struct { @@ -62,7 +61,7 @@ func (this *NodeService) CountAllEnabledNodesMatch(ctx context.Context, req *pb. if err != nil { return nil, err } - count, err := models.SharedNodeDAO.CountAllEnabledNodesMatch(req.ClusterId, types.Int8(req.InstallState)) + count, err := models.SharedNodeDAO.CountAllEnabledNodesMatch(req.ClusterId, configutils.ToBoolState(req.InstallState), configutils.ToBoolState(req.ActiveState)) if err != nil { return nil, err } @@ -75,7 +74,7 @@ func (this *NodeService) ListEnabledNodesMatch(ctx context.Context, req *pb.List if err != nil { return nil, err } - nodes, err := models.SharedNodeDAO.ListEnabledNodesMatch(req.Offset, req.Size, req.ClusterId, types.Int8(req.InstallState)) + nodes, err := models.SharedNodeDAO.ListEnabledNodesMatch(req.Offset, req.Size, req.ClusterId, configutils.ToBoolState(req.InstallState), configutils.ToBoolState(req.ActiveState)) if err != nil { return nil, err } @@ -106,6 +105,7 @@ func (this *NodeService) ListEnabledNodesMatch(ctx context.Context, req *pb.List result = append(result, &pb.Node{ Id: int64(node.Id), Name: node.Name, + Version: int64(node.Version), IsInstalled: node.IsInstalled == 1, Status: node.Status, Cluster: &pb.NodeCluster{ @@ -223,13 +223,15 @@ func (this *NodeService) FindEnabledNode(ctx context.Context, req *pb.FindEnable } return &pb.FindEnabledNodeResponse{Node: &pb.Node{ - Id: int64(node.Id), - Name: node.Name, - Status: node.Status, - UniqueId: node.UniqueId, - Secret: node.Secret, - InstallDir: node.InstallDir, - IsInstalled: node.IsInstalled == 1, + Id: int64(node.Id), + Name: node.Name, + Status: node.Status, + UniqueId: node.UniqueId, + Version: int64(node.Version), + LatestVersion: int64(node.LatestVersion), + Secret: node.Secret, + InstallDir: node.InstallDir, + IsInstalled: node.IsInstalled == 1, Cluster: &pb.NodeCluster{ Id: int64(node.ClusterId), Name: clusterName, @@ -241,50 +243,19 @@ func (this *NodeService) FindEnabledNode(ctx context.Context, req *pb.FindEnable // 组合节点配置 func (this *NodeService) ComposeNodeConfig(ctx context.Context, req *pb.ComposeNodeConfigRequest) (*pb.ComposeNodeConfigResponse, error) { + _ = req + // 校验节点 _, nodeId, err := rpcutils.ValidateRequest(ctx, rpcutils.UserTypeNode) if err != nil { return nil, err } - - node, err := models.SharedNodeDAO.FindEnabledNode(nodeId) - if err != nil { - return nil, err - } - if node == nil { - return nil, errors.New("node validate failed, please check 'nodeId' or 'secret'") - } - - nodeMap := maps.Map{ - "id": node.UniqueId, - "isOn": node.IsOn == 1, - "servers": []maps.Map{}, - "version": node.Version, - } - - // 获取所有的服务 - servers, err := models.SharedServerDAO.FindAllEnabledServersWithNode(int64(node.Id)) + nodeConfig, err := models.SharedNodeDAO.ComposeNodeConfig(nodeId) if err != nil { return nil, err } - serverMaps := []maps.Map{} - for _, server := range servers { - if len(server.Config) == 0 { - continue - } - configMap := maps.Map{} - err = json.Unmarshal([]byte(server.Config), &configMap) - if err != nil { - return nil, err - } - configMap["id"] = server.UniqueId - configMap["version"] = server.Version - serverMaps = append(serverMaps, configMap) - } - nodeMap["servers"] = serverMaps - - data, err := json.Marshal(nodeMap) + data, err := json.Marshal(nodeConfig) if err != nil { return nil, err } @@ -294,6 +265,7 @@ func (this *NodeService) ComposeNodeConfig(ctx context.Context, req *pb.ComposeN // 节点stream func (this *NodeService) NodeStream(server pb.NodeService_NodeStreamServer) error { + // TODO 使用此stream快速通知边缘节点更新 // 校验节点 _, nodeId, err := rpcutils.ValidateRequest(server.Context(), rpcutils.UserTypeNode) if err != nil { @@ -301,6 +273,8 @@ func (this *NodeService) NodeStream(server pb.NodeService_NodeStreamServer) erro } logs.Println("nodeId:", nodeId) + _ = server.Send(&pb.NodeStreamResponse{}) + for { req, err := server.Recv() if err != nil { diff --git a/internal/rpc/services/service_server.go b/internal/rpc/services/service_server.go index df75d2dd..08205ac2 100644 --- a/internal/rpc/services/service_server.go +++ b/internal/rpc/services/service_server.go @@ -101,12 +101,6 @@ func (this *ServerService) UpdateServerHTTP(ctx context.Context, req *pb.UpdateS return nil, err } - // 更新新的节点版本 - err = models.SharedNodeDAO.UpdateAllNodesLatestVersionMatch(int64(server.ClusterId)) - if err != nil { - return nil, err - } - return &pb.RPCUpdateSuccess{}, nil } @@ -136,12 +130,6 @@ func (this *ServerService) UpdateServerHTTPS(ctx context.Context, req *pb.Update return nil, err } - // 更新新的节点版本 - err = models.SharedNodeDAO.UpdateAllNodesLatestVersionMatch(int64(server.ClusterId)) - if err != nil { - return nil, err - } - return &pb.RPCUpdateSuccess{}, nil } @@ -171,12 +159,6 @@ func (this *ServerService) UpdateServerTCP(ctx context.Context, req *pb.UpdateSe return nil, err } - // 更新新的节点版本 - err = models.SharedNodeDAO.UpdateAllNodesLatestVersionMatch(int64(server.ClusterId)) - if err != nil { - return nil, err - } - return &pb.RPCUpdateSuccess{}, nil } @@ -206,12 +188,6 @@ func (this *ServerService) UpdateServerTLS(ctx context.Context, req *pb.UpdateSe return nil, err } - // 更新新的节点版本 - err = models.SharedNodeDAO.UpdateAllNodesLatestVersionMatch(int64(server.ClusterId)) - if err != nil { - return nil, err - } - return &pb.RPCUpdateSuccess{}, nil } @@ -241,12 +217,6 @@ func (this *ServerService) UpdateServerUnix(ctx context.Context, req *pb.UpdateS return nil, err } - // 更新新的节点版本 - err = models.SharedNodeDAO.UpdateAllNodesLatestVersionMatch(int64(server.ClusterId)) - if err != nil { - return nil, err - } - return &pb.RPCUpdateSuccess{}, nil } @@ -276,12 +246,6 @@ func (this *ServerService) UpdateServerUDP(ctx context.Context, req *pb.UpdateSe return nil, err } - // 更新新的节点版本 - err = models.SharedNodeDAO.UpdateAllNodesLatestVersionMatch(int64(server.ClusterId)) - if err != nil { - return nil, err - } - return &pb.RPCUpdateSuccess{}, nil } @@ -311,12 +275,6 @@ func (this *ServerService) UpdateServerWeb(ctx context.Context, req *pb.UpdateSe return nil, err } - // 更新新的节点版本 - err = models.SharedNodeDAO.UpdateAllNodesLatestVersionMatch(int64(server.ClusterId)) - if err != nil { - return nil, err - } - return &pb.RPCUpdateSuccess{}, nil } @@ -346,12 +304,6 @@ func (this *ServerService) UpdateServerReverseProxy(ctx context.Context, req *pb return nil, err } - // 更新新的节点版本 - err = models.SharedNodeDAO.UpdateAllNodesLatestVersionMatch(int64(server.ClusterId)) - if err != nil { - return nil, err - } - return &pb.RPCUpdateSuccess{}, nil } @@ -381,12 +333,6 @@ func (this *ServerService) UpdateServerNames(ctx context.Context, req *pb.Update return nil, err } - // 更新新的节点版本 - err = models.SharedNodeDAO.UpdateAllNodesLatestVersionMatch(int64(server.ClusterId)) - if err != nil { - return nil, err - } - return &pb.RPCUpdateSuccess{}, nil } diff --git a/internal/tasks/event_looper.go b/internal/tasks/event_looper.go new file mode 100644 index 00000000..0e6c3583 --- /dev/null +++ b/internal/tasks/event_looper.go @@ -0,0 +1,69 @@ +package tasks + +import ( + "github.com/TeaOSLab/EdgeAPI/internal/db/models" + "github.com/iwind/TeaGo/logs" + "time" +) + +func init() { + looper := NewEventLooper() + go looper.Start() +} + +type EventLooper struct { +} + +func NewEventLooper() *EventLooper { + return &EventLooper{} +} + +func (this *EventLooper) Start() { + ticker := time.NewTicker(2 * time.Second) + for range ticker.C { + err := this.loop() + if err != nil { + logs.Println("[EVENT_LOOPER]" + err.Error()) + } + } +} + +func (this *EventLooper) loop() error { + lockerKey := "eventLooper" + isOk, err := models.SharedSysLockerDAO.Lock(lockerKey, 3600) + if err != nil { + return err + } + defer func() { + err = models.SharedSysLockerDAO.Unlock(lockerKey) + if err != nil { + logs.Println("[EVENT_LOOPER]" + err.Error()) + } + }() + if !isOk { + return nil + } + + events, err := models.SharedSysEventDAO.FindEvents(100) + if err != nil { + return err + } + for _, eventOne := range events { + event, err := eventOne.DecodeEvent() + if err != nil { + logs.Println("[EVENT_LOOPER]" + err.Error()) + continue + } + err = event.Run() + if err != nil { + logs.Println("[EVENT_LOOPER]" + err.Error()) + continue + } + err = models.SharedSysEventDAO.DeleteEvent(int64(eventOne.Id)) + if err != nil { + return err + } + } + + return nil +} diff --git a/internal/tasks/task_server_update.go b/internal/tasks/task_server_update.go deleted file mode 100644 index 6061be03..00000000 --- a/internal/tasks/task_server_update.go +++ /dev/null @@ -1,70 +0,0 @@ -package tasks - -import ( - "encoding/json" - "github.com/TeaOSLab/EdgeAPI/internal/db/models" - "github.com/iwind/TeaGo/logs" - "time" -) - -// TODO 考虑多个API服务同时运行的冲突 -func init() { - task := &ServerUpdateTask{} - go task.Run() -} - -// 更新服务配置 -type ServerUpdateTask struct { -} - -func (this *ServerUpdateTask) Run() { - ticker := time.NewTicker(1 * time.Second) - for range ticker.C { - this.loop() - } -} - -func (this *ServerUpdateTask) loop() { - serverIds, err := models.SharedServerDAO.FindUpdatingServerIds() - if err != nil { - logs.Println("[ServerUpdateTask]" + err.Error()) - return - } - if len(serverIds) == 0 { - return - } - for _, serverId := range serverIds { - // 查找配置 - config, err := models.SharedServerDAO.ComposeServerConfig(serverId) - if err != nil { - logs.Println("[ServerUpdateTask]" + err.Error()) - continue - } - if config == nil { - err = models.SharedServerDAO.UpdateServerIsUpdating(serverId, false) - if err != nil { - logs.Println("[ServerUpdateTask]" + err.Error()) - continue - } - } - configData, err := json.Marshal(config) - if err != nil { - logs.Println("[ServerUpdateTask]" + err.Error()) - continue - } - - // 修改配置 - err = models.SharedServerDAO.UpdateServerConfig(serverId, configData) - if err != nil { - logs.Println("[ServerUpdateTask]" + err.Error()) - continue - } - - // 修改更新状态 - err = models.SharedServerDAO.UpdateServerIsUpdating(serverId, false) - if err != nil { - logs.Println("[ServerUpdateTask]" + err.Error()) - continue - } - } -}