实现HTTP部分功能

This commit is contained in:
刘祥超
2020-09-26 08:06:40 +08:00
parent e12ecca2f6
commit 50731984ec
45 changed files with 1117 additions and 456 deletions

4
go.mod
View File

@@ -9,9 +9,7 @@ require (
github.com/go-sql-driver/mysql v1.5.0
github.com/go-yaml/yaml v2.1.0+incompatible
github.com/golang/protobuf v1.4.2
github.com/iwind/TeaGo v0.0.0-20200923021120-f5d76441fe9e
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/iwind/TeaGo v0.0.0-20200924024009-d088df3778a6
github.com/pkg/sftp v1.12.0
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a
google.golang.org/grpc v1.32.0

12
go.sum
View File

@@ -4,7 +4,6 @@ dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/DataDog/sketches-go v0.0.0-20190923095040-43f19ad77ff7/go.mod h1:Q5DbzQ+3AkgGwymQO7aZFNP7ns2lZKGtvRBzRXfdi60=
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
@@ -22,8 +21,6 @@ github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM=
github.com/go-redis/redis v6.15.8+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/go-redis/redis/v8 v8.0.0-beta.7/go.mod h1:FGJAWDWFht1sQ4qxyJHZZbVyvnVcKQN0E3u5/5lRz+g=
github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs=
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
@@ -53,13 +50,10 @@ github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w=
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/iwind/TeaGo v0.0.0-20200822074248-b1cf7248c98a/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc=
github.com/iwind/TeaGo v0.0.0-20200910072805-729cffe36729 h1:/v0WhSFVeNay/dA5zU9iCBXlgVDfxnztuanlauXE0gM=
github.com/iwind/TeaGo v0.0.0-20200910072805-729cffe36729/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc=
github.com/iwind/TeaGo v0.0.0-20200916035436-dbdbf25f8524 h1:WnARCxusBjX5vJ8E71AjhuxSeAMGfEiYvi42XVK/Yf8=
github.com/iwind/TeaGo v0.0.0-20200916035436-dbdbf25f8524/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc=
github.com/iwind/TeaGo v0.0.0-20200923021120-f5d76441fe9e h1:/xn7wUvlwaoA5IkdBUctv2OQbJSZ0/Dw8qRJmn55sJk=
github.com/iwind/TeaGo v0.0.0-20200923021120-f5d76441fe9e/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc=
github.com/iwind/TeaGo v0.0.0-20200924024009-d088df3778a6 h1:7OZC/Qy7Z/hK9vG6YQOwHNOUPunSImYYJMiIfvuDQZ0=
github.com/iwind/TeaGo v0.0.0-20200924024009-d088df3778a6/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc=
github.com/json-iterator/go v1.1.10 h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr68=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/kr/fs v0.1.0 h1:Jskdu9ieNAYnjxsi0LbQp1ulIKZV1LAFgK1tWhpZgl8=
@@ -93,9 +87,7 @@ github.com/pkg/sftp v1.12.0 h1:/f3b24xrDhkhddlaobPe2JgBqfdt+gC/NYl0QY9IOuI=
github.com/pkg/sftp v1.12.0/go.mod h1:fUqqXB5vEgVCZ131L+9say31RAri6aF6KDViawhxKK8=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pquerna/ffjson v0.0.0-20190930134022-aa0246cd15f7/go.mod h1:YARuvh7BUWHNhzDq2OM5tzR2RiCcN2D7sapiKyCel/M=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/shirou/gopsutil v2.20.7+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=

View File

@@ -29,6 +29,20 @@ func NewHTTPAccessLogPolicyDAO() *HTTPAccessLogPolicyDAO {
var SharedHTTPAccessLogPolicyDAO = NewHTTPAccessLogPolicyDAO()
// 初始化
func (this *HTTPAccessLogPolicyDAO) Init() {
this.DAOObject.Init()
this.DAOObject.OnUpdate(func() error {
return SharedSysEventDAO.CreateEvent(NewServerChangeEvent())
})
this.DAOObject.OnInsert(func() error {
return SharedSysEventDAO.CreateEvent(NewServerChangeEvent())
})
this.DAOObject.OnDelete(func() error {
return SharedSysEventDAO.CreateEvent(NewServerChangeEvent())
})
}
// 启用条目
func (this *HTTPAccessLogPolicyDAO) EnableHTTPAccessLogPolicy(id int64) error {
_, err := this.Query().
@@ -104,14 +118,13 @@ func (this *HTTPAccessLogPolicyDAO) ComposeAccessLogPolicyConfig(policyId int64)
}
// 条件
if IsNotNull(policy.Conds) {
// TODO 需要用更全面的条件管理器来代替RequestCond
conds := []*shared.RequestCond{}
err = json.Unmarshal([]byte(policy.Conds), &conds)
if IsNotNull(policy.CondGroups) {
condGroups := []*shared.HTTPRequestCondGroup{}
err = json.Unmarshal([]byte(policy.CondGroups), &condGroups)
if err != nil {
return nil, err
}
config.Conds = conds
config.CondGroups = condGroups
}
return config, nil

View File

@@ -12,7 +12,7 @@ type HTTPAccessLogPolicy struct {
IsOn uint8 `field:"isOn"` // 是否启用
Type string `field:"type"` // 存储类型
Options string `field:"options"` // 存储选项
Conds string `field:"conds"` // 请求条件
CondGroups string `field:"condGroups"` // 请求条件
}
type HTTPAccessLogPolicyOperator struct {
@@ -26,7 +26,7 @@ type HTTPAccessLogPolicyOperator struct {
IsOn interface{} // 是否启用
Type interface{} // 存储类型
Options interface{} // 存储选项
Conds interface{} // 请求条件
CondGroups interface{} // 请求条件
}
func NewHTTPAccessLogPolicyOperator() *HTTPAccessLogPolicyOperator {

View File

@@ -26,6 +26,20 @@ func NewHTTPCachePolicyDAO() *HTTPCachePolicyDAO {
var SharedHTTPCachePolicyDAO = NewHTTPCachePolicyDAO()
// 初始化
func (this *HTTPCachePolicyDAO) Init() {
this.DAOObject.Init()
this.DAOObject.OnUpdate(func() error {
return SharedSysEventDAO.CreateEvent(NewServerChangeEvent())
})
this.DAOObject.OnInsert(func() error {
return SharedSysEventDAO.CreateEvent(NewServerChangeEvent())
})
this.DAOObject.OnDelete(func() error {
return SharedSysEventDAO.CreateEvent(NewServerChangeEvent())
})
}
// 启用条目
func (this *HTTPCachePolicyDAO) EnableHTTPCachePolicy(id int64) error {
_, err := this.Query().

View File

@@ -16,7 +16,7 @@ type HTTPCachePolicy struct {
SkipCacheControlValues string `field:"skipCacheControlValues"` // 忽略的cache-control
SkipSetCookie uint8 `field:"skipSetCookie"` // 是否忽略Set-Cookie Header
EnableRequestCachePragma uint8 `field:"enableRequestCachePragma"` // 是否支持客户端的Pragma: no-cache
Conds string `field:"conds"` // 请求条件
CondGroups string `field:"condGroups"` // 请求条件
CreatedAt uint64 `field:"createdAt"` // 创建时间
State uint8 `field:"state"` // 状态
}
@@ -36,7 +36,7 @@ type HTTPCachePolicyOperator struct {
SkipCacheControlValues interface{} // 忽略的cache-control
SkipSetCookie interface{} // 是否忽略Set-Cookie Header
EnableRequestCachePragma interface{} // 是否支持客户端的Pragma: no-cache
Conds interface{} // 请求条件
CondGroups interface{} // 请求条件
CreatedAt interface{} // 创建时间
State interface{} // 状态
}

View File

@@ -26,6 +26,20 @@ func NewHTTPFirewallPolicyDAO() *HTTPFirewallPolicyDAO {
var SharedHTTPFirewallPolicyDAO = NewHTTPFirewallPolicyDAO()
// 初始化
func (this *HTTPFirewallPolicyDAO) Init() {
this.DAOObject.Init()
this.DAOObject.OnUpdate(func() error {
return SharedSysEventDAO.CreateEvent(NewServerChangeEvent())
})
this.DAOObject.OnInsert(func() error {
return SharedSysEventDAO.CreateEvent(NewServerChangeEvent())
})
this.DAOObject.OnDelete(func() error {
return SharedSysEventDAO.CreateEvent(NewServerChangeEvent())
})
}
// 启用条目
func (this *HTTPFirewallPolicyDAO) EnableHTTPFirewallPolicy(id int64) error {
_, err := this.Query().

View File

@@ -12,7 +12,7 @@ type HTTPFirewallPolicy struct {
Name string `field:"name"` // 名称
Inbound string `field:"inbound"` // 入站规则
Outbound string `field:"outbound"` // 出站规则
Conds string `field:"conds"` // 条件
CondGroups string `field:"condGroups"` // 条件
}
type HTTPFirewallPolicyOperator struct {
@@ -26,7 +26,7 @@ type HTTPFirewallPolicyOperator struct {
Name interface{} // 名称
Inbound interface{} // 入站规则
Outbound interface{} // 出站规则
Conds interface{} // 条件
CondGroups interface{} // 条件
}
func NewHTTPFirewallPolicyOperator() *HTTPFirewallPolicyOperator {

View File

@@ -26,6 +26,20 @@ func NewHTTPFirewallRuleDAO() *HTTPFirewallRuleDAO {
var SharedHTTPFirewallRuleDAO = NewHTTPFirewallRuleDAO()
// 初始化
func (this *HTTPFirewallRuleDAO) Init() {
this.DAOObject.Init()
this.DAOObject.OnUpdate(func() error {
return SharedSysEventDAO.CreateEvent(NewServerChangeEvent())
})
this.DAOObject.OnInsert(func() error {
return SharedSysEventDAO.CreateEvent(NewServerChangeEvent())
})
this.DAOObject.OnDelete(func() error {
return SharedSysEventDAO.CreateEvent(NewServerChangeEvent())
})
}
// 启用条目
func (this *HTTPFirewallRuleDAO) EnableHTTPFirewallRule(id uint32) error {
_, err := this.Query().

View File

@@ -26,6 +26,20 @@ func NewHTTPFirewallRuleGroupDAO() *HTTPFirewallRuleGroupDAO {
var SharedHTTPFirewallRuleGroupDAO = NewHTTPFirewallRuleGroupDAO()
// 初始化
func (this *HTTPFirewallRuleGroupDAO) Init() {
this.DAOObject.Init()
this.DAOObject.OnUpdate(func() error {
return SharedSysEventDAO.CreateEvent(NewServerChangeEvent())
})
this.DAOObject.OnInsert(func() error {
return SharedSysEventDAO.CreateEvent(NewServerChangeEvent())
})
this.DAOObject.OnDelete(func() error {
return SharedSysEventDAO.CreateEvent(NewServerChangeEvent())
})
}
// 启用条目
func (this *HTTPFirewallRuleGroupDAO) EnableHTTPFirewallRuleGroup(id uint32) error {
_, err := this.Query().

View File

@@ -26,6 +26,20 @@ func NewHTTPFirewallRuleSetDAO() *HTTPFirewallRuleSetDAO {
var SharedHTTPFirewallRuleSetDAO = NewHTTPFirewallRuleSetDAO()
// 初始化
func (this *HTTPFirewallRuleSetDAO) Init() {
this.DAOObject.Init()
this.DAOObject.OnUpdate(func() error {
return SharedSysEventDAO.CreateEvent(NewServerChangeEvent())
})
this.DAOObject.OnInsert(func() error {
return SharedSysEventDAO.CreateEvent(NewServerChangeEvent())
})
this.DAOObject.OnDelete(func() error {
return SharedSysEventDAO.CreateEvent(NewServerChangeEvent())
})
}
// 启用条目
func (this *HTTPFirewallRuleSetDAO) EnableHTTPFirewallRuleSet(id uint32) error {
_, err := this.Query().

View File

@@ -31,6 +31,20 @@ func NewHTTPGzipDAO() *HTTPGzipDAO {
var SharedHTTPGzipDAO = NewHTTPGzipDAO()
// 初始化
func (this *HTTPGzipDAO) Init() {
this.DAOObject.Init()
this.DAOObject.OnUpdate(func() error {
return SharedSysEventDAO.CreateEvent(NewServerChangeEvent())
})
this.DAOObject.OnInsert(func() error {
return SharedSysEventDAO.CreateEvent(NewServerChangeEvent())
})
this.DAOObject.OnDelete(func() error {
return SharedSysEventDAO.CreateEvent(NewServerChangeEvent())
})
}
// 启用条目
func (this *HTTPGzipDAO) EnableHTTPGzip(id int64) error {
_, err := this.Query().

View File

@@ -2,29 +2,29 @@ package models
// Gzip配置
type HTTPGzip struct {
Id uint32 `field:"id"` // ID
AdminId uint32 `field:"adminId"` // 管理员ID
UserId uint32 `field:"userId"` // 用户ID
IsOn uint8 `field:"isOn"` // 是否启用
Level uint32 `field:"level"` // 压缩级别
MinLength string `field:"minLength"` // 可压缩最小值
MaxLength string `field:"maxLength"` // 可压缩最大值
State uint8 `field:"state"` // 状态
CreatedAt uint64 `field:"createdAt"` // 创建时间
Conds string `field:"conds"` // 条件
Id uint32 `field:"id"` // ID
AdminId uint32 `field:"adminId"` // 管理员ID
UserId uint32 `field:"userId"` // 用户ID
IsOn uint8 `field:"isOn"` // 是否启用
Level uint32 `field:"level"` // 压缩级别
MinLength string `field:"minLength"` // 可压缩最小值
MaxLength string `field:"maxLength"` // 可压缩最大值
State uint8 `field:"state"` // 状态
CreatedAt uint64 `field:"createdAt"` // 创建时间
CondGroups string `field:"condGroups"` // 条件
}
type HTTPGzipOperator struct {
Id interface{} // ID
AdminId interface{} // 管理员ID
UserId interface{} // 用户ID
IsOn interface{} // 是否启用
Level interface{} // 压缩级别
MinLength interface{} // 可压缩最小值
MaxLength interface{} // 可压缩最大值
State interface{} // 状态
CreatedAt interface{} // 创建时间
Conds interface{} // 条件
Id interface{} // ID
AdminId interface{} // 管理员ID
UserId interface{} // 用户ID
IsOn interface{} // 是否启用
Level interface{} // 压缩级别
MinLength interface{} // 可压缩最小值
MaxLength interface{} // 可压缩最大值
State interface{} // 状态
CreatedAt interface{} // 创建时间
CondGroups interface{} // 条件
}
func NewHTTPGzipOperator() *HTTPGzipOperator {

View File

@@ -30,6 +30,20 @@ func NewHTTPHeaderDAO() *HTTPHeaderDAO {
var SharedHTTPHeaderDAO = NewHTTPHeaderDAO()
// 初始化
func (this *HTTPHeaderDAO) Init() {
this.DAOObject.Init()
this.DAOObject.OnUpdate(func() error {
return SharedSysEventDAO.CreateEvent(NewServerChangeEvent())
})
this.DAOObject.OnInsert(func() error {
return SharedSysEventDAO.CreateEvent(NewServerChangeEvent())
})
this.DAOObject.OnDelete(func() error {
return SharedSysEventDAO.CreateEvent(NewServerChangeEvent())
})
}
// 启用条目
func (this *HTTPHeaderDAO) EnableHTTPHeader(id int64) error {
_, err := this.Query().
@@ -75,7 +89,17 @@ func (this *HTTPHeaderDAO) CreateHeader(name string, value string) (int64, error
op.IsOn = true
op.Name = name
op.Value = value
_, err := this.Save(op)
statusConfig := &shared.HTTPStatusConfig{
Always: true,
}
statusJSON, err := json.Marshal(statusConfig)
if err != nil {
return 0, err
}
op.Status = statusJSON
_, err = this.Save(op)
if err != nil {
return 0, err
}

View File

@@ -30,6 +30,20 @@ func NewHTTPHeaderPolicyDAO() *HTTPHeaderPolicyDAO {
var SharedHTTPHeaderPolicyDAO = NewHTTPHeaderPolicyDAO()
// 初始化
func (this *HTTPHeaderPolicyDAO) Init() {
this.DAOObject.Init()
this.DAOObject.OnUpdate(func() error {
return SharedSysEventDAO.CreateEvent(NewServerChangeEvent())
})
this.DAOObject.OnInsert(func() error {
return SharedSysEventDAO.CreateEvent(NewServerChangeEvent())
})
this.DAOObject.OnDelete(func() error {
return SharedSysEventDAO.CreateEvent(NewServerChangeEvent())
})
}
// 启用条目
func (this *HTTPHeaderPolicyDAO) EnableHTTPHeaderPolicy(id int64) error {
_, err := this.Query().
@@ -83,8 +97,6 @@ func (this *HTTPHeaderPolicyDAO) UpdateAddingHeaders(policyId int64, headersJSON
op.AddHeaders = headersJSON
_, err := this.Save(op)
// TODO 更新相关配置
return err
}
@@ -99,8 +111,6 @@ func (this *HTTPHeaderPolicyDAO) UpdateSettingHeaders(policyId int64, headersJSO
op.SetHeaders = headersJSON
_, err := this.Save(op)
// TODO 更新相关配置
return err
}
@@ -115,8 +125,6 @@ func (this *HTTPHeaderPolicyDAO) UpdateReplacingHeaders(policyId int64, headersJ
op.ReplaceHeaders = headersJSON
_, err := this.Save(op)
// TODO 更新相关配置
return err
}
@@ -131,8 +139,6 @@ func (this *HTTPHeaderPolicyDAO) UpdateAddingTrailers(policyId int64, headersJSO
op.AddTrailers = headersJSON
_, err := this.Save(op)
// TODO 更新相关配置
return err
}
@@ -152,8 +158,6 @@ func (this *HTTPHeaderPolicyDAO) UpdateDeletingHeaders(policyId int64, headerNam
op.DeleteHeaders = string(namesJSON)
_, err = this.Save(op)
// TODO 更新相关配置
return err
}
@@ -173,78 +177,91 @@ func (this *HTTPHeaderPolicyDAO) ComposeHeaderPolicyConfig(headerPolicyId int64)
// AddHeaders
if len(policy.AddHeaders) > 0 {
headers := []*shared.HTTPHeaderConfig{}
err = json.Unmarshal([]byte(policy.AddHeaders), &headers)
refs := []*shared.HTTPHeaderRef{}
err = json.Unmarshal([]byte(policy.AddHeaders), &refs)
if err != nil {
return nil, err
}
if len(headers) > 0 {
for k, header := range headers {
headerConfig, err := SharedHTTPHeaderDAO.ComposeHeaderConfig(header.Id)
if len(refs) > 0 {
for _, ref := range refs {
headerConfig, err := SharedHTTPHeaderDAO.ComposeHeaderConfig(ref.HeaderId)
if err != nil {
return nil, err
}
headers[k] = headerConfig
config.AddHeaders = append(config.AddHeaders, headerConfig)
}
config.AddHeaders = headers
}
}
// AddTrailers
if len(policy.AddTrailers) > 0 {
headers := []*shared.HTTPHeaderConfig{}
err = json.Unmarshal([]byte(policy.AddTrailers), &headers)
refs := []*shared.HTTPHeaderRef{}
err = json.Unmarshal([]byte(policy.AddTrailers), &refs)
if err != nil {
return nil, err
}
if len(headers) > 0 {
for k, header := range headers {
headerConfig, err := SharedHTTPHeaderDAO.ComposeHeaderConfig(header.Id)
if len(refs) > 0 {
resultRefs := []*shared.HTTPHeaderRef{}
for _, ref := range refs {
headerConfig, err := SharedHTTPHeaderDAO.ComposeHeaderConfig(ref.HeaderId)
if err != nil {
return nil, err
}
headers[k] = headerConfig
if headerConfig == nil {
continue
}
resultRefs = append(resultRefs, ref)
config.AddTrailers = append(config.AddTrailers, headerConfig)
}
config.AddTrailers = headers
config.AddHeaderRefs = resultRefs
}
}
// SetHeaders
if len(policy.SetHeaders) > 0 {
headers := []*shared.HTTPHeaderConfig{}
err = json.Unmarshal([]byte(policy.SetHeaders), &headers)
refs := []*shared.HTTPHeaderRef{}
err = json.Unmarshal([]byte(policy.SetHeaders), &refs)
if err != nil {
return nil, err
}
if len(headers) > 0 {
for k, header := range headers {
headerConfig, err := SharedHTTPHeaderDAO.ComposeHeaderConfig(header.Id)
if len(refs) > 0 {
resultRefs := []*shared.HTTPHeaderRef{}
for _, ref := range refs {
headerConfig, err := SharedHTTPHeaderDAO.ComposeHeaderConfig(ref.HeaderId)
if err != nil {
return nil, err
}
headers[k] = headerConfig
if headerConfig == nil {
continue
}
resultRefs = append(resultRefs, ref)
config.SetHeaders = append(config.SetHeaders, headerConfig)
}
config.SetHeaders = headers
config.SetHeaderRefs = resultRefs
}
}
// ReplaceHeaders
if len(policy.ReplaceHeaders) > 0 {
headers := []*shared.HTTPHeaderConfig{}
err = json.Unmarshal([]byte(policy.ReplaceHeaders), &headers)
refs := []*shared.HTTPHeaderRef{}
err = json.Unmarshal([]byte(policy.ReplaceHeaders), &refs)
if err != nil {
return nil, err
}
if len(headers) > 0 {
for k, header := range headers {
headerConfig, err := SharedHTTPHeaderDAO.ComposeHeaderConfig(header.Id)
if len(refs) > 0 {
resultRefs := []*shared.HTTPHeaderRef{}
for _, ref := range refs {
headerConfig, err := SharedHTTPHeaderDAO.ComposeHeaderConfig(ref.HeaderId)
if err != nil {
return nil, err
}
headers[k] = headerConfig
if headerConfig == nil {
continue
}
resultRefs = append(resultRefs, ref)
config.ReplaceHeaders = append(config.ReplaceHeaders, headerConfig)
}
config.ReplaceHeaders = headers
config.ReplaceHeaderRefs = resultRefs
}
}
@@ -255,7 +272,7 @@ func (this *HTTPHeaderPolicyDAO) ComposeHeaderPolicyConfig(headerPolicyId int64)
if err != nil {
return nil, err
}
config.DeletedHeaders = headers
config.DeleteHeaders = headers
}
// Expires

View File

@@ -30,6 +30,20 @@ func NewHTTPLocationDAO() *HTTPLocationDAO {
var SharedHTTPLocationDAO = NewHTTPLocationDAO()
// 初始化
func (this *HTTPLocationDAO) Init() {
this.DAOObject.Init()
this.DAOObject.OnUpdate(func() error {
return SharedSysEventDAO.CreateEvent(NewServerChangeEvent())
})
this.DAOObject.OnInsert(func() error {
return SharedSysEventDAO.CreateEvent(NewServerChangeEvent())
})
this.DAOObject.OnDelete(func() error {
return SharedSysEventDAO.CreateEvent(NewServerChangeEvent())
})
}
// 启用条目
func (this *HTTPLocationDAO) EnableHTTPLocation(id int64) error {
_, err := this.Query().

View File

@@ -17,6 +17,7 @@ type HTTPLocation struct {
ReverseProxy string `field:"reverseProxy"` // 反向代理
UrlPrefix string `field:"urlPrefix"` // URL前缀
IsBreak uint8 `field:"isBreak"` // 是否终止匹配
CondGroups string `field:"condGroups"` // 匹配条件
}
type HTTPLocationOperator struct {
@@ -35,6 +36,7 @@ type HTTPLocationOperator struct {
ReverseProxy interface{} // 反向代理
UrlPrefix interface{} // URL前缀
IsBreak interface{} // 是否终止匹配
CondGroups interface{} // 匹配条件
}
func NewHTTPLocationOperator() *HTTPLocationOperator {

View File

@@ -30,6 +30,20 @@ func NewHTTPPageDAO() *HTTPPageDAO {
var SharedHTTPPageDAO = NewHTTPPageDAO()
// 初始化
func (this *HTTPPageDAO) Init() {
this.DAOObject.Init()
this.DAOObject.OnUpdate(func() error {
return SharedSysEventDAO.CreateEvent(NewServerChangeEvent())
})
this.DAOObject.OnInsert(func() error {
return SharedSysEventDAO.CreateEvent(NewServerChangeEvent())
})
this.DAOObject.OnDelete(func() error {
return SharedSysEventDAO.CreateEvent(NewServerChangeEvent())
})
}
// 启用条目
func (this *HTTPPageDAO) EnableHTTPPage(id int64) error {
_, err := this.Query().
@@ -107,7 +121,6 @@ func (this *HTTPPageDAO) UpdatePage(pageId int64, statusList []string, url strin
op.NewStatus = newStatus
_, err = this.Save(op)
// TODO 修改相关引用的对象
return err
}

View File

@@ -31,6 +31,19 @@ func NewHTTPWebDAO() *HTTPWebDAO {
var SharedHTTPWebDAO = NewHTTPWebDAO()
func (this *HTTPWebDAO) Init() {
this.DAOObject.Init()
this.DAOObject.OnUpdate(func() error {
return SharedSysEventDAO.CreateEvent(NewServerChangeEvent())
})
this.DAOObject.OnInsert(func() error {
return SharedSysEventDAO.CreateEvent(NewServerChangeEvent())
})
this.DAOObject.OnDelete(func() error {
return SharedSysEventDAO.CreateEvent(NewServerChangeEvent())
})
}
// 启用条目
func (this *HTTPWebDAO) EnableHTTPWeb(id int64) error {
_, err := this.Query().
@@ -238,8 +251,6 @@ func (this *HTTPWebDAO) ComposeWebConfig(webId int64) (*serverconfigs.HTTPWebCon
config.RedirectToHttps = redirectToHTTPSConfig
}
// TODO 更多配置
return config, nil
}
@@ -264,11 +275,7 @@ func (this *HTTPWebDAO) UpdateWeb(webId int64, root string) error {
op.Id = webId
op.Root = root
_, err := this.Save(op)
if err != nil {
return err
}
return this.NotifyUpdating(webId)
return err
}
// 修改Gzip配置
@@ -280,11 +287,7 @@ func (this *HTTPWebDAO) UpdateWebGzip(webId int64, gzipJSON []byte) error {
op.Id = webId
op.Gzip = gzipJSON
_, err := this.Save(op)
if err != nil {
return err
}
return this.NotifyUpdating(webId)
return err
}
// 修改字符编码
@@ -296,11 +299,7 @@ func (this *HTTPWebDAO) UpdateWebCharset(webId int64, charsetJSON []byte) error
op.Id = webId
op.Charset = charsetJSON
_, err := this.Save(op)
if err != nil {
return err
}
return this.NotifyUpdating(webId)
return err
}
// 更改请求Header策略
@@ -312,11 +311,7 @@ func (this *HTTPWebDAO) UpdateWebRequestHeaderPolicy(webId int64, headerPolicyJS
op.Id = webId
op.RequestHeader = JSONBytes(headerPolicyJSON)
_, err := this.Save(op)
if err != nil {
return err
}
return this.NotifyUpdating(webId)
return err
}
// 更改响应Header策略
@@ -328,11 +323,7 @@ func (this *HTTPWebDAO) UpdateWebResponseHeaderPolicy(webId int64, headerPolicyJ
op.Id = webId
op.ResponseHeader = JSONBytes(headerPolicyJSON)
_, err := this.Save(op)
if err != nil {
return err
}
return this.NotifyUpdating(webId)
return err
}
// 更改特殊页面配置
@@ -344,11 +335,7 @@ func (this *HTTPWebDAO) UpdateWebPages(webId int64, pagesJSON []byte) error {
op.Id = webId
op.Pages = JSONBytes(pagesJSON)
_, err := this.Save(op)
if err != nil {
return err
}
return this.NotifyUpdating(webId)
return err
}
// 更改Shutdown配置
@@ -360,11 +347,7 @@ func (this *HTTPWebDAO) UpdateWebShutdown(webId int64, shutdownJSON []byte) erro
op.Id = webId
op.Shutdown = JSONBytes(shutdownJSON)
_, err := this.Save(op)
if err != nil {
return err
}
return this.NotifyUpdating(webId)
return err
}
// 更改访问日志策略
@@ -376,11 +359,7 @@ func (this *HTTPWebDAO) UpdateWebAccessLogConfig(webId int64, accessLogJSON []by
op.Id = webId
op.AccessLog = JSONBytes(accessLogJSON)
_, err := this.Save(op)
if err != nil {
return err
}
return this.NotifyUpdating(webId)
return err
}
// 更改统计配置
@@ -392,11 +371,7 @@ func (this *HTTPWebDAO) UpdateWebStat(webId int64, statJSON []byte) error {
op.Id = webId
op.Stat = JSONBytes(statJSON)
_, err := this.Save(op)
if err != nil {
return err
}
return this.NotifyUpdating(webId)
return err
}
// 更改缓存配置
@@ -408,11 +383,7 @@ func (this *HTTPWebDAO) UpdateWebCache(webId int64, cacheJSON []byte) error {
op.Id = webId
op.Cache = JSONBytes(cacheJSON)
_, err := this.Save(op)
if err != nil {
return err
}
return this.NotifyUpdating(webId)
return err
}
// 更改防火墙配置
@@ -424,11 +395,7 @@ func (this *HTTPWebDAO) UpdateWebFirewall(webId int64, firewallJSON []byte) erro
op.Id = webId
op.Firewall = JSONBytes(firewallJSON)
_, err := this.Save(op)
if err != nil {
return err
}
return this.NotifyUpdating(webId)
return err
}
// 更改路径规则配置
@@ -440,11 +407,7 @@ func (this *HTTPWebDAO) UpdateWebLocations(webId int64, locationsJSON []byte) er
op.Id = webId
op.Locations = JSONBytes(locationsJSON)
_, err := this.Save(op)
if err != nil {
return err
}
return this.NotifyUpdating(webId)
return err
}
// 更改跳转到HTTPS设置
@@ -456,21 +419,5 @@ func (this *HTTPWebDAO) UpdateWebRedirectToHTTPS(webId int64, redirectToHTTPSJSO
op.Id = webId
op.RedirectToHttps = JSONBytes(redirectToHTTPSJSON)
_, err := this.Save(op)
if err != nil {
return err
}
return this.NotifyUpdating(webId)
}
// 通知更新
func (this *HTTPWebDAO) NotifyUpdating(webId int64) error {
err := SharedServerDAO.UpdateServerIsUpdatingWithWebId(webId)
if err != nil {
return err
}
// TODO 更新所有使用此Web配置的Location所在服务
return nil
return err
}

View File

@@ -3,20 +3,20 @@ package models
import (
"encoding/json"
"errors"
"github.com/TeaOSLab/EdgeCommon/pkg/configutils"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
_ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/rands"
"github.com/iwind/TeaGo/types"
"strconv"
)
const (
NodeStateEnabled = 1 // 已启用
NodeStateDisabled = 0 // 已禁用
NodeInstallStateAll = 0 // 全部
NodeInstallStateInstalled = 1 // 已安装
NodeInstallStateNotInstalled = 2 // 未安装
)
type NodeDAO dbs.DAO
@@ -182,7 +182,7 @@ func (this *NodeDAO) CountAllEnabledNodes() (int64, error) {
}
// 列出单页节点
func (this *NodeDAO) ListEnabledNodesMatch(offset int64, size int64, clusterId int64, installState int8) (result []*Node, err error) {
func (this *NodeDAO) ListEnabledNodesMatch(offset int64, size int64, clusterId int64, installState configutils.BoolState, activeState configutils.BoolState) (result []*Node, err error) {
query := this.Query().
State(NodeStateEnabled).
Offset(offset).
@@ -197,14 +197,24 @@ func (this *NodeDAO) ListEnabledNodesMatch(offset int64, size int64, clusterId i
// 安装状态
switch installState {
case NodeInstallStateAll:
// 不做任何事情
case NodeInstallStateInstalled:
case configutils.BoolStateAll:
// 所有
case configutils.BoolStateYes:
query.Attr("isInstalled", 1)
case NodeInstallStateNotInstalled:
case configutils.BoolStateNo:
query.Attr("isInstalled", 0)
}
// 在线状态
switch activeState {
case configutils.BoolStateAll:
// 所有
case configutils.BoolStateYes:
query.Where("JSON_EXTRACT(status, '$.isActive') AND JSON_EXTRACT(status, '$.updatedAt')-UNIX_TIMESTAMP()<=60")
case configutils.BoolStateNo:
query.Where("(status IS NULL OR NOT JSON_EXTRACT(status, '$.isActive') OR JSON_EXTRACT(status, '$.updatedAt')-UNIX_TIMESTAMP()>60)")
}
_, err = query.FindAll()
return
}
@@ -266,7 +276,7 @@ func (this *NodeDAO) FindAllNodeIdsMatch(clusterId int64) (result []int64, err e
}
// 计算节点数量
func (this *NodeDAO) CountAllEnabledNodesMatch(clusterId int64, installState int8) (int64, error) {
func (this *NodeDAO) CountAllEnabledNodesMatch(clusterId int64, installState configutils.BoolState, activeState configutils.BoolState) (int64, error) {
query := this.Query()
query.State(NodeStateEnabled)
@@ -277,14 +287,24 @@ func (this *NodeDAO) CountAllEnabledNodesMatch(clusterId int64, installState int
// 安装状态
switch installState {
case NodeInstallStateAll:
// 不做任何事情
case NodeInstallStateInstalled:
case configutils.BoolStateAll:
// 所有
case configutils.BoolStateYes:
query.Attr("isInstalled", 1)
case NodeInstallStateNotInstalled:
case configutils.BoolStateNo:
query.Attr("isInstalled", 0)
}
// 在线状态
switch activeState {
case configutils.BoolStateAll:
// 所有
case configutils.BoolStateYes:
query.Where("JSON_EXTRACT(status, '$.isActive') AND JSON_EXTRACT(status, '$.updatedAt')-UNIX_TIMESTAMP()<=60")
case configutils.BoolStateNo:
query.Where("(status IS NULL OR NOT JSON_EXTRACT(status, '$.isActive') OR JSON_EXTRACT(status, '$.updatedAt')-UNIX_TIMESTAMP()>60)")
}
return query.Count()
}
@@ -347,6 +367,61 @@ func (this *NodeDAO) UpdateNodeInstallStatus(nodeId int64, status *NodeInstallSt
return err
}
// 组合配置
func (this *NodeDAO) ComposeNodeConfig(nodeId int64) (*nodeconfigs.NodeConfig, error) {
node, err := this.FindEnabledNode(nodeId)
if err != nil {
return nil, err
}
if node == nil {
return nil, errors.New("node not found '" + strconv.FormatInt(nodeId, 10) + "'")
}
config := &nodeconfigs.NodeConfig{
Id: node.UniqueId,
IsOn: node.IsOn == 1,
Servers: nil,
Version: int64(node.Version),
Name: node.Name,
}
// 获取所有的服务
servers, err := SharedServerDAO.FindAllEnabledServersWithNode(int64(node.Id))
if err != nil {
return nil, err
}
for _, server := range servers {
if len(server.Config) == 0 {
continue
}
serverConfig := &serverconfigs.ServerConfig{}
err = json.Unmarshal([]byte(server.Config), serverConfig)
if err != nil {
return nil, err
}
config.Servers = append(config.Servers, serverConfig)
}
// 全局设置
// TODO 根据用户的不同读取不同的全局设置
settingJSON, err := SharedSysSettingDAO.ReadSetting(SettingCodeGlobalConfig)
if err != nil {
return nil, err
}
if len(settingJSON) > 0 {
globalConfig := &serverconfigs.GlobalConfig{}
err = json.Unmarshal(settingJSON, globalConfig)
if err != nil {
return nil, err
}
config.GlobalConfig = globalConfig
}
return config, nil
}
// 生成唯一ID
func (this *NodeDAO) genUniqueId() (string, error) {
for {

View File

@@ -32,6 +32,20 @@ func NewOriginDAO() *OriginDAO {
var SharedOriginDAO = NewOriginDAO()
// 初始化
func (this *OriginDAO) Init() {
this.DAOObject.Init()
this.DAOObject.OnUpdate(func() error {
return SharedSysEventDAO.CreateEvent(NewServerChangeEvent())
})
this.DAOObject.OnInsert(func() error {
return SharedSysEventDAO.CreateEvent(NewServerChangeEvent())
})
this.DAOObject.OnDelete(func() error {
return SharedSysEventDAO.CreateEvent(NewServerChangeEvent())
})
}
// 启用条目
func (this *OriginDAO) EnableOrigin(id int64) error {
_, err := this.Query().
@@ -167,7 +181,7 @@ func (this *OriginDAO) ComposeOriginConfig(originId int64) (*serverconfigs.Origi
return nil, err
}
if policyConfig != nil {
config.RequestHeaders = policyConfig
config.RequestHeaderPolicy = policyConfig
}
}
@@ -177,7 +191,7 @@ func (this *OriginDAO) ComposeOriginConfig(originId int64) (*serverconfigs.Origi
return nil, err
}
if policyConfig != nil {
config.ResponseHeaders = policyConfig
config.ResponseHeaderPolicy = policyConfig
}
}

View File

@@ -6,7 +6,7 @@ import (
)
func TestOriginServerDAO_ComposeOriginConfig(t *testing.T) {
config, err := SharedOriginServerDAO.ComposeOriginConfig(1)
config, err := SharedOriginDAO.ComposeOriginConfig(1)
if err != nil {
t.Fatal(err)
}

View File

@@ -30,13 +30,30 @@ func NewReverseProxyDAO() *ReverseProxyDAO {
var SharedReverseProxyDAO = NewReverseProxyDAO()
// 初始化
func (this *ReverseProxyDAO) Init() {
this.DAOObject.Init()
this.DAOObject.OnUpdate(func() error {
return SharedSysEventDAO.CreateEvent(NewServerChangeEvent())
})
this.DAOObject.OnInsert(func() error {
return SharedSysEventDAO.CreateEvent(NewServerChangeEvent())
})
this.DAOObject.OnDelete(func() error {
return SharedSysEventDAO.CreateEvent(NewServerChangeEvent())
})
}
// 启用条目
func (this *ReverseProxyDAO) EnableReverseProxy(id int64) error {
_, err := this.Query().
Pk(id).
Set("state", ReverseProxyStateEnabled).
Update()
return err
if err != nil {
return err
}
return this.CreateEvent()
}
// 禁用条目
@@ -45,7 +62,10 @@ func (this *ReverseProxyDAO) DisableReverseProxy(id int64) error {
Pk(id).
Set("state", ReverseProxyStateDisabled).
Update()
return err
if err != nil {
return err
}
return this.CreateEvent()
}
// 查找启用中的条目
@@ -106,7 +126,7 @@ func (this *ReverseProxyDAO) ComposeReverseProxyConfig(reverseProxyId int64) (*s
return nil, err
}
for _, originConfig := range originRefs {
originConfig, err := SharedOriginDAO.ComposeOriginConfig(int64(originConfig.OriginId))
originConfig, err := SharedOriginDAO.ComposeOriginConfig(originConfig.OriginId)
if err != nil {
return nil, err
}
@@ -155,8 +175,6 @@ func (this *ReverseProxyDAO) UpdateReverseProxyScheduling(reverseProxyId int64,
}
_, err := this.Save(op)
// TODO 更新所有使用此反向代理的服务
return err
}
@@ -174,8 +192,6 @@ func (this *ReverseProxyDAO) UpdateReverseProxyPrimaryOrigins(reverseProxyId int
}
_, err := this.Save(op)
// TODO 更新所有使用此反向代理的服务
return err
}
@@ -193,8 +209,6 @@ func (this *ReverseProxyDAO) UpdateReverseProxyBackupOrigins(reverseProxyId int6
}
_, err := this.Save(op)
// TODO 更新所有使用此反向代理的服务
return err
}
@@ -206,3 +220,8 @@ func (this *ReverseProxyDAO) UpdateReverseProxyIsOn(reverseProxyId int64, isOn b
Update()
return err
}
// 通知更新
func (this *ReverseProxyDAO) CreateEvent() error {
return SharedSysEventDAO.CreateEvent(NewServerChangeEvent())
}

View File

@@ -1,13 +1,14 @@
package models
import (
"crypto/md5"
"encoding/json"
"errors"
"fmt"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
_ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/rands"
"github.com/iwind/TeaGo/types"
)
@@ -31,6 +32,13 @@ func NewServerDAO() *ServerDAO {
var SharedServerDAO = NewServerDAO()
// 初始化
func (this *ServerDAO) Init() {
this.DAOObject.Init()
// 这里不处理增删改事件是为了避免Server修改本身的时候也要触发别的Server变更
}
// 启用条目
func (this *ServerDAO) EnableServer(id uint32) (rowsAffected int64, err error) {
return this.Query().
@@ -70,13 +78,7 @@ func (this *ServerDAO) FindEnabledServerType(serverId int64) (string, error) {
// 创建服务
func (this *ServerDAO) CreateServer(adminId int64, userId int64, serverType serverconfigs.ServerType, name string, description string, serverNamesJSON string, httpJSON string, httpsJSON string, tcpJSON string, tlsJSON string, unixJSON string, udpJSON string, webId int64, reverseProxyJSON []byte, clusterId int64, includeNodesJSON string, excludeNodesJSON string) (serverId int64, err error) {
uniqueId, err := this.genUniqueId()
if err != nil {
return 0, err
}
op := NewServerOperator()
op.UniqueId = uniqueId
op.UserId = userId
op.AdminId = adminId
op.Name = name
@@ -127,8 +129,7 @@ func (this *ServerDAO) CreateServer(adminId int64, userId int64, serverType serv
}
serverId = types.Int64(op.Id)
err = this.RenewServerConfig(serverId)
return serverId, err
return serverId, nil
}
// 修改服务基本信息
@@ -141,25 +142,47 @@ func (this *ServerDAO) UpdateServerBasic(serverId int64, name string, descriptio
op.Name = name
op.Description = description
op.ClusterId = clusterId
op.Version = dbs.SQL("version=version+1")
_, err := this.Save(op)
return err
if err != nil {
return err
}
return this.createEvent()
}
// 修改服务配置
func (this *ServerDAO) UpdateServerConfig(serverId int64, config []byte) error {
func (this *ServerDAO) UpdateServerConfig(serverId int64, configJSON []byte) (isChanged bool, err error) {
if serverId <= 0 {
return errors.New("serverId should not be smaller than 0")
return false, errors.New("serverId should not be smaller than 0")
}
if len(config) == 0 {
config = []byte("null")
}
_, err := this.Query().
// 查询以前的md5
oldConfigMd5, err := this.Query().
Pk(serverId).
Set("config", string(config)).
Set("version", dbs.SQL("version+1")).
Update()
return err
Result("configMd5").
FindStringCol("")
if err != nil {
return false, err
}
m := md5.New()
_, _ = m.Write(configJSON)
h := m.Sum(nil)
newConfigMd5 := fmt.Sprintf("%x", h)
// 如果配置相同则不更改
if oldConfigMd5 == newConfigMd5 {
return false, nil
}
op := NewServerOperator()
op.Id = serverId
op.Config = JSONBytes(configJSON)
op.Version = dbs.SQL("version+1")
op.ConfigMd5 = newConfigMd5
_, err = this.Save(op)
return true, err
}
// 修改HTTP配置
@@ -177,7 +200,8 @@ func (this *ServerDAO) UpdateServerHTTP(serverId int64, config []byte) error {
if err != nil {
return err
}
return this.RenewServerConfig(serverId)
return this.createEvent()
}
// 修改HTTPS配置
@@ -195,7 +219,8 @@ func (this *ServerDAO) UpdateServerHTTPS(serverId int64, config []byte) error {
if err != nil {
return err
}
return this.RenewServerConfig(serverId)
return this.createEvent()
}
// 修改TCP配置
@@ -213,7 +238,8 @@ func (this *ServerDAO) UpdateServerTCP(serverId int64, config []byte) error {
if err != nil {
return err
}
return this.RenewServerConfig(serverId)
return this.createEvent()
}
// 修改TLS配置
@@ -231,7 +257,8 @@ func (this *ServerDAO) UpdateServerTLS(serverId int64, config []byte) error {
if err != nil {
return err
}
return this.RenewServerConfig(serverId)
return this.createEvent()
}
// 修改Unix配置
@@ -249,7 +276,8 @@ func (this *ServerDAO) UpdateServerUnix(serverId int64, config []byte) error {
if err != nil {
return err
}
return this.RenewServerConfig(serverId)
return this.createEvent()
}
// 修改UDP配置
@@ -267,7 +295,8 @@ func (this *ServerDAO) UpdateServerUDP(serverId int64, config []byte) error {
if err != nil {
return err
}
return this.RenewServerConfig(serverId)
return this.createEvent()
}
// 修改Web配置
@@ -282,7 +311,7 @@ func (this *ServerDAO) UpdateServerWeb(serverId int64, webId int64) error {
if err != nil {
return err
}
return this.RenewServerConfig(serverId)
return this.createEvent()
}
// 初始化Web配置
@@ -304,6 +333,11 @@ func (this *ServerDAO) InitServerWeb(serverId int64) (int64, error) {
return 0, err
}
err = this.createEvent()
if err != nil {
return webId, err
}
return webId, nil
}
@@ -322,7 +356,8 @@ func (this *ServerDAO) UpdateServerNames(serverId int64, config []byte) error {
if err != nil {
return err
}
return this.RenewServerConfig(serverId)
return this.createEvent()
}
// 修改反向代理配置
@@ -337,7 +372,8 @@ func (this *ServerDAO) UpdateServerReverseProxy(serverId int64, config []byte) e
if err != nil {
return err
}
return this.RenewServerConfig(serverId)
return this.createEvent()
}
// 计算所有可用服务数量
@@ -379,6 +415,36 @@ func (this *ServerDAO) FindAllEnabledServersWithNode(nodeId int64) (result []*Se
return
}
// 获取所有的服务ID
func (this *ServerDAO) FindAllEnabledServerIds() (serverIds []int64, err error) {
ones, err := this.Query().
State(ServerStateEnabled).
AscPk().
ResultPk().
FindAll()
for _, one := range ones {
serverIds = append(serverIds, int64(one.(*Server).Id))
}
return
}
// 查找服务的搜索条件
func (this *ServerDAO) FindServerNodeFilters(serverId int64) (isOk bool, clusterId int64, err error) {
one, err := this.Query().
Pk(serverId).
Result("clusterId").
Find()
if err != nil {
return false, 0, err
}
if one == nil {
isOk = false
return
}
server := one.(*Server)
return true, int64(server.ClusterId), nil
}
// 构造服务的Config
func (this *ServerDAO) ComposeServerConfig(serverId int64) (*serverconfigs.ServerConfig, error) {
server, err := this.FindEnabledServer(serverId)
@@ -396,12 +462,6 @@ func (this *ServerDAO) ComposeServerConfig(serverId int64) (*serverconfigs.Serve
config.Name = server.Name
config.Description = server.Description
// Components
// TODO
// Filters
// TODO
// ServerNames
if len(server.ServerNames) > 0 && server.ServerNames != "null" {
serverNames := []*serverconfigs.ServerNameConfig{}
@@ -505,14 +565,14 @@ func (this *ServerDAO) ComposeServerConfig(serverId int64) (*serverconfigs.Serve
}
// 更新服务的Config配置
func (this *ServerDAO) RenewServerConfig(serverId int64) error {
func (this *ServerDAO) RenewServerConfig(serverId int64) (isChanged bool, err error) {
serverConfig, err := this.ComposeServerConfig(serverId)
if err != nil {
return err
return false, err
}
data, err := serverConfig.AsJSON()
data, err := json.Marshal(serverConfig)
if err != nil {
return err
return false, err
}
return this.UpdateServerConfig(serverId, data)
}
@@ -534,32 +594,6 @@ func (this *ServerDAO) FindReverseProxyRef(serverId int64) (*serverconfigs.Rever
return config, err
}
// 查找需要更新的Server
func (this *ServerDAO) FindUpdatingServerIds() (serverIds []int64, err error) {
ones, err := this.Query().
State(ServerStateEnabled).
Attr("isUpdating", true).
ResultPk().
FindAll()
if err != nil {
return nil, err
}
for _, one := range ones {
serverIds = append(serverIds, int64(one.(*Server).Id))
}
return
}
// 修改服务是否需要更新
func (this *ServerDAO) UpdateServerIsUpdating(serverId int64, isUpdating bool) error {
_, err := this.Query().
Pk(serverId).
Set("isUpdating", isUpdating).
Update()
return err
}
// 查找WebId
func (this *ServerDAO) FindServerWebId(serverId int64) (int64, error) {
webId, err := this.Query().
Pk(serverId).
@@ -571,28 +605,7 @@ func (this *ServerDAO) FindServerWebId(serverId int64) (int64, error) {
return int64(webId), nil
}
// 更新所有Web相关的处于更新状态
func (this *ServerDAO) UpdateServerIsUpdatingWithWebId(webId int64) error {
_, err := this.Query().
Attr("webId", webId).
Set("isUpdating", true).
Update()
return err
}
// 生成唯一ID
func (this *ServerDAO) genUniqueId() (string, error) {
for {
uniqueId := rands.HexString(32)
ok, err := this.Query().
Attr("uniqueId", uniqueId).
Exist()
if err != nil {
return "", err
}
if ok {
continue
}
return uniqueId, nil
}
// 创建事件
func (this *ServerDAO) createEvent() error {
return SharedSysEventDAO.CreateEvent(NewServerChangeEvent())
}

View File

@@ -1,6 +1,8 @@
package models
import (
"crypto/md5"
"encoding/json"
_ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/logs"
"testing"
@@ -13,3 +15,30 @@ func TestServerDAO_ComposeServerConfig(t *testing.T) {
}
logs.PrintAsJSON(config, t)
}
func TestServerDAO_UpdateServerConfig(t *testing.T) {
config, err := SharedServerDAO.ComposeServerConfig(1)
if err != nil {
t.Fatal(err)
}
configJSON, err := json.Marshal(config)
if err != nil {
t.Fatal(err)
}
_, err = SharedServerDAO.UpdateServerConfig(1, configJSON)
if err != nil {
t.Fatal(err)
}
t.Log("ok")
}
func TestNewServerDAO_md5(t *testing.T) {
m := md5.New()
_, err := m.Write([]byte("123456"))
if err != nil {
t.Fatal(err)
}
h := m.Sum(nil)
t.Logf("%x", h)
}

View File

@@ -4,7 +4,6 @@ package models
type Server struct {
Id uint32 `field:"id"` // ID
IsOn uint8 `field:"isOn"` // 是否启用
UniqueId string `field:"uniqueId"` // 唯一ID
UserId uint32 `field:"userId"` // 用户ID
AdminId uint32 `field:"adminId"` // 管理员ID
Type string `field:"type"` // 服务类型
@@ -21,19 +20,18 @@ type Server struct {
ReverseProxy string `field:"reverseProxy"` // 反向代理配置
GroupIds string `field:"groupIds"` // 分组ID列表
Config string `field:"config"` // 服务配置,自动生成
ConfigMd5 string `field:"configMd5"` // Md5
ClusterId uint32 `field:"clusterId"` // 集群ID
IncludeNodes string `field:"includeNodes"` // 部署条件
ExcludeNodes string `field:"excludeNodes"` // 节点排除条件
Version uint32 `field:"version"` // 版本号
CreatedAt uint64 `field:"createdAt"` // 创建时间
IsUpdating uint8 `field:"isUpdating"` // 是否正在更新
State uint8 `field:"state"` // 状态
}
type ServerOperator struct {
Id interface{} // ID
IsOn interface{} // 是否启用
UniqueId interface{} // 唯一ID
UserId interface{} // 用户ID
AdminId interface{} // 管理员ID
Type interface{} // 服务类型
@@ -50,12 +48,12 @@ type ServerOperator struct {
ReverseProxy interface{} // 反向代理配置
GroupIds interface{} // 分组ID列表
Config interface{} // 服务配置,自动生成
ConfigMd5 interface{} // Md5
ClusterId interface{} // 集群ID
IncludeNodes interface{} // 部署条件
ExcludeNodes interface{} // 节点排除条件
Version interface{} // 版本号
CreatedAt interface{} // 创建时间
IsUpdating interface{} // 是否正在更新
State interface{} // 状态
}

View File

@@ -0,0 +1,61 @@
package models
import (
"encoding/json"
"errors"
_ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs"
)
type SysEventDAO dbs.DAO
func NewSysEventDAO() *SysEventDAO {
return dbs.NewDAO(&SysEventDAO{
DAOObject: dbs.DAOObject{
DB: Tea.Env,
Table: "edgeSysEvents",
Model: new(SysEvent),
PkName: "id",
},
}).(*SysEventDAO)
}
var SharedSysEventDAO = NewSysEventDAO()
// 创建事件
func (this *SysEventDAO) CreateEvent(event EventInterface) error {
if event == nil {
return errors.New("event should not be nil")
}
op := NewSysEventOperator()
op.Type = event.Type()
eventJSON, err := json.Marshal(event)
if err != nil {
return err
}
op.Params = eventJSON
_, err = this.Save(op)
return err
}
// 查找事件
func (this *SysEventDAO) FindEvents(size int64) (result []*SysEvent, err error) {
_, err = this.Query().
Asc().
Limit(size).
Slice(&result).
FindAll()
return
}
// 删除事件
func (this *SysEventDAO) DeleteEvent(eventId int64) error {
_, err := this.Query().
Pk(eventId).
Delete()
return err
}

View File

@@ -0,0 +1,17 @@
package models
import (
_ "github.com/go-sql-driver/mysql"
"testing"
)
func TestSysEvent_DecodeEvent(t *testing.T) {
event := &SysEvent{
Type: "serverChange",
}
eventObj, err := event.DecodeEvent()
if err != nil {
t.Fatal(err)
}
t.Log(eventObj)
}

View File

@@ -0,0 +1,20 @@
package models
// 系统事件
type SysEvent struct {
Id uint64 `field:"id"` // ID
Type string `field:"type"` // 类型
Params string `field:"params"` // 参数
CreatedAt uint64 `field:"createdAt"` // 创建时间
}
type SysEventOperator struct {
Id interface{} // ID
Type interface{} // 类型
Params interface{} // 参数
CreatedAt interface{} // 创建时间
}
func NewSysEventOperator() *SysEventOperator {
return &SysEventOperator{}
}

View File

@@ -0,0 +1,27 @@
package models
import (
"encoding/json"
"errors"
"reflect"
)
// 解码事件
func (this *SysEvent) DecodeEvent() (EventInterface, error) {
// 解析数据类型
t, isOk := eventTypeMapping[this.Type]
if !isOk {
return nil, errors.New("can not found event type '" + this.Type + "'")
}
ptr := reflect.New(t).Interface().(EventInterface)
// 解析参数
if IsNotNull(this.Params) {
err := json.Unmarshal([]byte(this.Params), ptr)
if err != nil {
return nil, err
}
}
return ptr, nil
}

View File

@@ -0,0 +1,64 @@
package models
import (
"reflect"
)
var eventTypeMapping = map[string]reflect.Type{} // eventType => reflect type
func init() {
for _, event := range []EventInterface{
NewServerChangeEvent(),
} {
eventTypeMapping[event.Type()] = reflect.ValueOf(event).Elem().Type()
}
}
// 接口
type EventInterface interface {
Type() string
Run() error
}
// 服务变化
type ServerChangeEvent struct {
}
func NewServerChangeEvent() *ServerChangeEvent {
return &ServerChangeEvent{}
}
func (this *ServerChangeEvent) Type() string {
return "serverChange"
}
func (this *ServerChangeEvent) Run() error {
serverIds, err := SharedServerDAO.FindAllEnabledServerIds()
if err != nil {
return err
}
for _, serverId := range serverIds {
isChanged, err := SharedServerDAO.RenewServerConfig(serverId)
if err != nil {
return err
}
if !isChanged {
continue
}
// 检查节点是否需要更新
isOk, clusterId, err := SharedServerDAO.FindServerNodeFilters(serverId)
if err != nil {
return err
}
if !isOk {
continue
}
err = SharedNodeDAO.UpdateAllNodesLatestVersionMatch(clusterId)
if err != nil {
return err
}
}
return nil
}

View File

@@ -0,0 +1,106 @@
package models
import (
_ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/types"
"time"
)
type SysLockerDAO dbs.DAO
func NewSysLockerDAO() *SysLockerDAO {
return dbs.NewDAO(&SysLockerDAO{
DAOObject: dbs.DAOObject{
DB: Tea.Env,
Table: "edgeSysLockers",
Model: new(SysLocker),
PkName: "id",
},
}).(*SysLockerDAO)
}
var SharedSysLockerDAO = NewSysLockerDAO()
// 开锁
func (this *SysLockerDAO) Lock(key string, timeout int64) (bool, error) {
maxErrors := 5
for {
one, err := this.Query().
Attr("key", key).
Find()
if err != nil {
maxErrors--
if maxErrors < 0 {
return false, err
}
continue
}
// 如果没有锁,则创建
if one == nil {
op := NewSysLockerOperator()
op.Key = key
op.TimeoutAt = time.Now().Unix() + timeout
op.Version = 1
_, err := this.Save(op)
if err != nil {
maxErrors--
if maxErrors < 0 {
return false, err
}
continue
}
return true, nil
}
// 如果已经有锁
locker := one.(*SysLocker)
if time.Now().Unix() <= int64(locker.TimeoutAt) {
return false, nil
}
// 修改
op := NewSysLockerOperator()
op.Id = locker.Id
op.Version = locker.Version + 1
op.TimeoutAt = time.Now().Unix() + timeout
_, err = this.Save(op)
if err != nil {
maxErrors--
if maxErrors < 0 {
return false, err
}
continue
}
// 再次查询版本
version, err := this.Query().
Attr("key", key).
Result("version").
FindCol("0")
if err != nil {
maxErrors--
if maxErrors < 0 {
return false, err
}
continue
}
if types.Int64(version) != int64(locker.Version)+1 {
return false, nil
}
return true, nil
}
}
// 解锁
func (this *SysLockerDAO) Unlock(key string) error {
_, err := this.Query().
Attr("key", key).
Set("timeoutAt", time.Now().Unix()-86400*365).
Update()
return err
}

View File

@@ -0,0 +1,21 @@
package models
import (
_ "github.com/go-sql-driver/mysql"
"testing"
)
func TestSysLockerDAO_Lock(t *testing.T) {
isOk, err := SharedSysLockerDAO.Lock("test", 600)
if err != nil {
t.Fatal(err)
}
t.Log(isOk)
if isOk {
err = SharedSysLockerDAO.Unlock("test")
if err != nil {
t.Fatal(err)
}
}
}

View File

@@ -0,0 +1,20 @@
package models
// 并发锁
type SysLocker struct {
Id uint64 `field:"id"` // ID
Key string `field:"key"` // 键值
Version uint64 `field:"version"` // 版本号
TimeoutAt uint64 `field:"timeoutAt"` // 超时时间
}
type SysLockerOperator struct {
Id interface{} // ID
Key interface{} // 键值
Version interface{} // 版本号
TimeoutAt interface{} // 超时时间
}
func NewSysLockerOperator() *SysLockerOperator {
return &SysLockerOperator{}
}

View File

@@ -0,0 +1 @@
package models

View File

@@ -0,0 +1,86 @@
package models
import (
"fmt"
_ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs"
)
type SysSettingDAO dbs.DAO
type SettingCode = string
const (
SettingCodeGlobalConfig SettingCode = "globalConfig"
)
func NewSysSettingDAO() *SysSettingDAO {
return dbs.NewDAO(&SysSettingDAO{
DAOObject: dbs.DAOObject{
DB: Tea.Env,
Table: "edgeSysSettings",
Model: new(SysSetting),
PkName: "id",
},
}).(*SysSettingDAO)
}
var SharedSysSettingDAO = NewSysSettingDAO()
// 设置配置
func (this *SysSettingDAO) UpdateSetting(code string, valueJSON []byte, args ...interface{}) error {
if len(args) > 0 {
code = fmt.Sprintf(code, args...)
}
countRetries := 3
var lastErr error
for i := 0; i < countRetries; i++ {
settingId, err := this.Query().
Attr("code", code).
ResultPk().
FindInt64Col(0)
if err != nil {
return err
}
if settingId == 0 {
// 新建
op := NewSysSettingOperator()
op.Code = code
op.Value = valueJSON
_, err = this.Save(op)
if err != nil {
lastErr = err
// 因为错误的原因可能是因为code冲突所以这里我们继续执行
continue
}
return nil
}
// 修改
op := NewSysSettingOperator()
op.Id = settingId
op.Value = valueJSON
_, err = this.Save(op)
if err != nil {
return err
}
}
return lastErr
}
// 读取配置
func (this *SysSettingDAO) ReadSetting(code string, args ...interface{}) (valueJSON []byte, err error) {
if len(args) > 0 {
code = fmt.Sprintf(code, args...)
}
col, err := this.Query().
Attr("code", code).
Result("value").
FindStringCol("")
return []byte(col), err
}

View File

@@ -0,0 +1,32 @@
package models
import (
_ "github.com/go-sql-driver/mysql"
"testing"
)
func TestSysSettingDAO_UpdateSetting(t *testing.T) {
err := SharedSysSettingDAO.UpdateSetting("hello", []byte(`"world"`))
if err != nil {
t.Fatal(err)
}
value, err := SharedSysSettingDAO.ReadSetting("hello")
if err != nil {
t.Fatal(err)
}
t.Log("value:", string(value))
}
func TestSysSettingDAO_UpdateSetting_Args(t *testing.T) {
err := SharedSysSettingDAO.UpdateSetting("hello %d", []byte(`"world 123"`), 123)
if err != nil {
t.Fatal(err)
}
value, err := SharedSysSettingDAO.ReadSetting("hello %d", 123)
if err != nil {
t.Fatal(err)
}
t.Log("value:", string(value))
}

View File

@@ -0,0 +1,20 @@
package models
// 系统配置
type SysSetting struct {
Id uint32 `field:"id"` // ID
UserId uint32 `field:"userId"` // 用户ID
Code string `field:"code"` // 代号
Value string `field:"value"` // 配置值
}
type SysSettingOperator struct {
Id interface{} // ID
UserId interface{} // 用户ID
Code interface{} // 代号
Value interface{} // 配置值
}
func NewSysSettingOperator() *SysSettingOperator {
return &SysSettingOperator{}
}

View File

@@ -0,0 +1 @@
package models

View File

@@ -20,3 +20,17 @@ func NewTCPFirewallPolicyDAO() *TCPFirewallPolicyDAO {
}
var SharedTCPFirewallPolicyDAO = NewTCPFirewallPolicyDAO()
// 初始化
func (this *TCPFirewallPolicyDAO) Init() {
this.DAOObject.Init()
this.DAOObject.OnUpdate(func() error {
return SharedSysEventDAO.CreateEvent(NewServerChangeEvent())
})
this.DAOObject.OnInsert(func() error {
return SharedSysEventDAO.CreateEvent(NewServerChangeEvent())
})
this.DAOObject.OnDelete(func() error {
return SharedSysEventDAO.CreateEvent(NewServerChangeEvent())
})
}

View File

@@ -31,7 +31,7 @@ func (this *HTTPAccessLogPolicyService) FindAllEnabledHTTPAccessLogPolicies(ctx
IsOn: policy.IsOn == 1,
Type: policy.Name,
OptionsJSON: []byte(policy.Options),
CondsJSON: []byte(policy.Conds),
CondsJSON: []byte(policy.CondGroups),
})
}

View File

@@ -7,10 +7,9 @@ import (
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/installers"
rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils"
"github.com/TeaOSLab/EdgeCommon/pkg/configutils"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/iwind/TeaGo/logs"
"github.com/iwind/TeaGo/maps"
"github.com/iwind/TeaGo/types"
)
type NodeService struct {
@@ -62,7 +61,7 @@ func (this *NodeService) CountAllEnabledNodesMatch(ctx context.Context, req *pb.
if err != nil {
return nil, err
}
count, err := models.SharedNodeDAO.CountAllEnabledNodesMatch(req.ClusterId, types.Int8(req.InstallState))
count, err := models.SharedNodeDAO.CountAllEnabledNodesMatch(req.ClusterId, configutils.ToBoolState(req.InstallState), configutils.ToBoolState(req.ActiveState))
if err != nil {
return nil, err
}
@@ -75,7 +74,7 @@ func (this *NodeService) ListEnabledNodesMatch(ctx context.Context, req *pb.List
if err != nil {
return nil, err
}
nodes, err := models.SharedNodeDAO.ListEnabledNodesMatch(req.Offset, req.Size, req.ClusterId, types.Int8(req.InstallState))
nodes, err := models.SharedNodeDAO.ListEnabledNodesMatch(req.Offset, req.Size, req.ClusterId, configutils.ToBoolState(req.InstallState), configutils.ToBoolState(req.ActiveState))
if err != nil {
return nil, err
}
@@ -106,6 +105,7 @@ func (this *NodeService) ListEnabledNodesMatch(ctx context.Context, req *pb.List
result = append(result, &pb.Node{
Id: int64(node.Id),
Name: node.Name,
Version: int64(node.Version),
IsInstalled: node.IsInstalled == 1,
Status: node.Status,
Cluster: &pb.NodeCluster{
@@ -223,13 +223,15 @@ func (this *NodeService) FindEnabledNode(ctx context.Context, req *pb.FindEnable
}
return &pb.FindEnabledNodeResponse{Node: &pb.Node{
Id: int64(node.Id),
Name: node.Name,
Status: node.Status,
UniqueId: node.UniqueId,
Secret: node.Secret,
InstallDir: node.InstallDir,
IsInstalled: node.IsInstalled == 1,
Id: int64(node.Id),
Name: node.Name,
Status: node.Status,
UniqueId: node.UniqueId,
Version: int64(node.Version),
LatestVersion: int64(node.LatestVersion),
Secret: node.Secret,
InstallDir: node.InstallDir,
IsInstalled: node.IsInstalled == 1,
Cluster: &pb.NodeCluster{
Id: int64(node.ClusterId),
Name: clusterName,
@@ -241,50 +243,19 @@ func (this *NodeService) FindEnabledNode(ctx context.Context, req *pb.FindEnable
// 组合节点配置
func (this *NodeService) ComposeNodeConfig(ctx context.Context, req *pb.ComposeNodeConfigRequest) (*pb.ComposeNodeConfigResponse, error) {
_ = req
// 校验节点
_, nodeId, err := rpcutils.ValidateRequest(ctx, rpcutils.UserTypeNode)
if err != nil {
return nil, err
}
node, err := models.SharedNodeDAO.FindEnabledNode(nodeId)
if err != nil {
return nil, err
}
if node == nil {
return nil, errors.New("node validate failed, please check 'nodeId' or 'secret'")
}
nodeMap := maps.Map{
"id": node.UniqueId,
"isOn": node.IsOn == 1,
"servers": []maps.Map{},
"version": node.Version,
}
// 获取所有的服务
servers, err := models.SharedServerDAO.FindAllEnabledServersWithNode(int64(node.Id))
nodeConfig, err := models.SharedNodeDAO.ComposeNodeConfig(nodeId)
if err != nil {
return nil, err
}
serverMaps := []maps.Map{}
for _, server := range servers {
if len(server.Config) == 0 {
continue
}
configMap := maps.Map{}
err = json.Unmarshal([]byte(server.Config), &configMap)
if err != nil {
return nil, err
}
configMap["id"] = server.UniqueId
configMap["version"] = server.Version
serverMaps = append(serverMaps, configMap)
}
nodeMap["servers"] = serverMaps
data, err := json.Marshal(nodeMap)
data, err := json.Marshal(nodeConfig)
if err != nil {
return nil, err
}
@@ -294,6 +265,7 @@ func (this *NodeService) ComposeNodeConfig(ctx context.Context, req *pb.ComposeN
// 节点stream
func (this *NodeService) NodeStream(server pb.NodeService_NodeStreamServer) error {
// TODO 使用此stream快速通知边缘节点更新
// 校验节点
_, nodeId, err := rpcutils.ValidateRequest(server.Context(), rpcutils.UserTypeNode)
if err != nil {
@@ -301,6 +273,8 @@ func (this *NodeService) NodeStream(server pb.NodeService_NodeStreamServer) erro
}
logs.Println("nodeId:", nodeId)
_ = server.Send(&pb.NodeStreamResponse{})
for {
req, err := server.Recv()
if err != nil {

View File

@@ -101,12 +101,6 @@ func (this *ServerService) UpdateServerHTTP(ctx context.Context, req *pb.UpdateS
return nil, err
}
// 更新新的节点版本
err = models.SharedNodeDAO.UpdateAllNodesLatestVersionMatch(int64(server.ClusterId))
if err != nil {
return nil, err
}
return &pb.RPCUpdateSuccess{}, nil
}
@@ -136,12 +130,6 @@ func (this *ServerService) UpdateServerHTTPS(ctx context.Context, req *pb.Update
return nil, err
}
// 更新新的节点版本
err = models.SharedNodeDAO.UpdateAllNodesLatestVersionMatch(int64(server.ClusterId))
if err != nil {
return nil, err
}
return &pb.RPCUpdateSuccess{}, nil
}
@@ -171,12 +159,6 @@ func (this *ServerService) UpdateServerTCP(ctx context.Context, req *pb.UpdateSe
return nil, err
}
// 更新新的节点版本
err = models.SharedNodeDAO.UpdateAllNodesLatestVersionMatch(int64(server.ClusterId))
if err != nil {
return nil, err
}
return &pb.RPCUpdateSuccess{}, nil
}
@@ -206,12 +188,6 @@ func (this *ServerService) UpdateServerTLS(ctx context.Context, req *pb.UpdateSe
return nil, err
}
// 更新新的节点版本
err = models.SharedNodeDAO.UpdateAllNodesLatestVersionMatch(int64(server.ClusterId))
if err != nil {
return nil, err
}
return &pb.RPCUpdateSuccess{}, nil
}
@@ -241,12 +217,6 @@ func (this *ServerService) UpdateServerUnix(ctx context.Context, req *pb.UpdateS
return nil, err
}
// 更新新的节点版本
err = models.SharedNodeDAO.UpdateAllNodesLatestVersionMatch(int64(server.ClusterId))
if err != nil {
return nil, err
}
return &pb.RPCUpdateSuccess{}, nil
}
@@ -276,12 +246,6 @@ func (this *ServerService) UpdateServerUDP(ctx context.Context, req *pb.UpdateSe
return nil, err
}
// 更新新的节点版本
err = models.SharedNodeDAO.UpdateAllNodesLatestVersionMatch(int64(server.ClusterId))
if err != nil {
return nil, err
}
return &pb.RPCUpdateSuccess{}, nil
}
@@ -311,12 +275,6 @@ func (this *ServerService) UpdateServerWeb(ctx context.Context, req *pb.UpdateSe
return nil, err
}
// 更新新的节点版本
err = models.SharedNodeDAO.UpdateAllNodesLatestVersionMatch(int64(server.ClusterId))
if err != nil {
return nil, err
}
return &pb.RPCUpdateSuccess{}, nil
}
@@ -346,12 +304,6 @@ func (this *ServerService) UpdateServerReverseProxy(ctx context.Context, req *pb
return nil, err
}
// 更新新的节点版本
err = models.SharedNodeDAO.UpdateAllNodesLatestVersionMatch(int64(server.ClusterId))
if err != nil {
return nil, err
}
return &pb.RPCUpdateSuccess{}, nil
}
@@ -381,12 +333,6 @@ func (this *ServerService) UpdateServerNames(ctx context.Context, req *pb.Update
return nil, err
}
// 更新新的节点版本
err = models.SharedNodeDAO.UpdateAllNodesLatestVersionMatch(int64(server.ClusterId))
if err != nil {
return nil, err
}
return &pb.RPCUpdateSuccess{}, nil
}

View File

@@ -0,0 +1,69 @@
package tasks
import (
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/iwind/TeaGo/logs"
"time"
)
func init() {
looper := NewEventLooper()
go looper.Start()
}
type EventLooper struct {
}
func NewEventLooper() *EventLooper {
return &EventLooper{}
}
func (this *EventLooper) Start() {
ticker := time.NewTicker(2 * time.Second)
for range ticker.C {
err := this.loop()
if err != nil {
logs.Println("[EVENT_LOOPER]" + err.Error())
}
}
}
func (this *EventLooper) loop() error {
lockerKey := "eventLooper"
isOk, err := models.SharedSysLockerDAO.Lock(lockerKey, 3600)
if err != nil {
return err
}
defer func() {
err = models.SharedSysLockerDAO.Unlock(lockerKey)
if err != nil {
logs.Println("[EVENT_LOOPER]" + err.Error())
}
}()
if !isOk {
return nil
}
events, err := models.SharedSysEventDAO.FindEvents(100)
if err != nil {
return err
}
for _, eventOne := range events {
event, err := eventOne.DecodeEvent()
if err != nil {
logs.Println("[EVENT_LOOPER]" + err.Error())
continue
}
err = event.Run()
if err != nil {
logs.Println("[EVENT_LOOPER]" + err.Error())
continue
}
err = models.SharedSysEventDAO.DeleteEvent(int64(eventOne.Id))
if err != nil {
return err
}
}
return nil
}

View File

@@ -1,70 +0,0 @@
package tasks
import (
"encoding/json"
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/iwind/TeaGo/logs"
"time"
)
// TODO 考虑多个API服务同时运行的冲突
func init() {
task := &ServerUpdateTask{}
go task.Run()
}
// 更新服务配置
type ServerUpdateTask struct {
}
func (this *ServerUpdateTask) Run() {
ticker := time.NewTicker(1 * time.Second)
for range ticker.C {
this.loop()
}
}
func (this *ServerUpdateTask) loop() {
serverIds, err := models.SharedServerDAO.FindUpdatingServerIds()
if err != nil {
logs.Println("[ServerUpdateTask]" + err.Error())
return
}
if len(serverIds) == 0 {
return
}
for _, serverId := range serverIds {
// 查找配置
config, err := models.SharedServerDAO.ComposeServerConfig(serverId)
if err != nil {
logs.Println("[ServerUpdateTask]" + err.Error())
continue
}
if config == nil {
err = models.SharedServerDAO.UpdateServerIsUpdating(serverId, false)
if err != nil {
logs.Println("[ServerUpdateTask]" + err.Error())
continue
}
}
configData, err := json.Marshal(config)
if err != nil {
logs.Println("[ServerUpdateTask]" + err.Error())
continue
}
// 修改配置
err = models.SharedServerDAO.UpdateServerConfig(serverId, configData)
if err != nil {
logs.Println("[ServerUpdateTask]" + err.Error())
continue
}
// 修改更新状态
err = models.SharedServerDAO.UpdateServerIsUpdating(serverId, false)
if err != nil {
logs.Println("[ServerUpdateTask]" + err.Error())
continue
}
}
}