支持设置单节点最大线程数、单节点TCP最大连接数

This commit is contained in:
刘祥超
2021-12-09 18:49:51 +08:00
parent 63316b6b23
commit 46dc74195b
4 changed files with 113 additions and 86 deletions

View File

@@ -178,7 +178,7 @@ func (this *NodeClusterDAO) CreateCluster(tx *dbs.Tx, adminId int64, name string
} }
// UpdateCluster 修改集群 // UpdateCluster 修改集群
func (this *NodeClusterDAO) UpdateCluster(tx *dbs.Tx, clusterId int64, name string, grantId int64, installDir string, timezone string) error { func (this *NodeClusterDAO) UpdateCluster(tx *dbs.Tx, clusterId int64, name string, grantId int64, installDir string, timezone string, nodeMaxThreads int32, nodeTCPMaxConnections int32) error {
if clusterId <= 0 { if clusterId <= 0 {
return errors.New("invalid clusterId") return errors.New("invalid clusterId")
} }
@@ -188,6 +188,17 @@ func (this *NodeClusterDAO) UpdateCluster(tx *dbs.Tx, clusterId int64, name stri
op.GrantId = grantId op.GrantId = grantId
op.InstallDir = installDir op.InstallDir = installDir
op.TimeZone = timezone op.TimeZone = timezone
if nodeMaxThreads < 0 {
nodeMaxThreads = 0
}
op.NodeMaxThreads = nodeMaxThreads
if nodeTCPMaxConnections < 0 {
nodeTCPMaxConnections = 0
}
op.NodeTCPMaxConnections = nodeTCPMaxConnections
err := this.Save(tx, op) err := this.Save(tx, op)
if err != nil { if err != nil {
return err return err
@@ -864,27 +875,27 @@ func (this *NodeClusterDAO) ExistsEnabledCluster(tx *dbs.Tx, clusterId int64) (b
Exist() Exist()
} }
// FindClusterTimezone 查找时区 // FindClusterBasicInfo 查找集群基础信息
func (this *NodeClusterDAO) FindClusterTimezone(tx *dbs.Tx, clusterId int64, cacheMap *utils.CacheMap) (string, error) { func (this *NodeClusterDAO) FindClusterBasicInfo(tx *dbs.Tx, clusterId int64, cacheMap *utils.CacheMap) (*NodeCluster, error) {
var cacheKey = this.Table + ":FindEnabledTimeZone:" + types.String(clusterId) var cacheKey = this.Table + ":FindClusterBasicInfo:" + types.String(clusterId)
if cacheMap != nil { if cacheMap != nil {
cache, ok := cacheMap.Get(cacheKey) cache, ok := cacheMap.Get(cacheKey)
if ok { if ok {
return cache.(string), nil return cache.(*NodeCluster), nil
} }
} }
timeZone, err := this.Query(tx). cluster, err := this.Query(tx).
Pk(clusterId). Pk(clusterId).
Result("timeZone"). Result("timeZone", "nodeMaxThreads", "nodeTCPMaxConnections", "cachePolicyId", "httpFirewallPolicyId").
FindStringCol("") Find()
if err != nil { if err != nil || cluster == nil {
return "", err return nil, err
} }
if cacheMap != nil { if cacheMap != nil {
cacheMap.Put(cacheKey, timeZone) cacheMap.Put(cacheKey, cluster)
} }
return timeZone, nil return cluster.(*NodeCluster), nil
} }
// NotifyUpdate 通知更新 // NotifyUpdate 通知更新

View File

@@ -27,6 +27,8 @@ type NodeCluster struct {
AccessLog string `field:"accessLog"` // 访问日志设置 AccessLog string `field:"accessLog"` // 访问日志设置
SystemServices string `field:"systemServices"` // 系统服务设置 SystemServices string `field:"systemServices"` // 系统服务设置
TimeZone string `field:"timeZone"` // 时区 TimeZone string `field:"timeZone"` // 时区
NodeMaxThreads uint32 `field:"nodeMaxThreads"` // 节点最大线程数
NodeTCPMaxConnections uint32 `field:"nodeTCPMaxConnections"` // TCP最大连接数
} }
type NodeClusterOperator struct { type NodeClusterOperator struct {
@@ -55,6 +57,8 @@ type NodeClusterOperator struct {
AccessLog interface{} // 访问日志设置 AccessLog interface{} // 访问日志设置
SystemServices interface{} // 系统服务设置 SystemServices interface{} // 系统服务设置
TimeZone interface{} // 时区 TimeZone interface{} // 时区
NodeMaxThreads interface{} // 节点最大线程数
NodeTCPMaxConnections interface{} // TCP最大连接数
} }
func NewNodeClusterOperator() *NodeClusterOperator { func NewNodeClusterOperator() *NodeClusterOperator {

View File

@@ -782,11 +782,17 @@ func (this *NodeDAO) ComposeNodeConfig(tx *dbs.Tx, nodeId int64, cacheMap *utils
var primaryClusterId = int64(node.ClusterId) var primaryClusterId = int64(node.ClusterId)
var clusterIds = []int64{primaryClusterId} var clusterIds = []int64{primaryClusterId}
clusterIds = append(clusterIds, node.DecodeSecondaryClusterIds()...) clusterIds = append(clusterIds, node.DecodeSecondaryClusterIds()...)
var clusterIndex = 0
for _, clusterId := range clusterIds { for _, clusterId := range clusterIds {
httpFirewallPolicyId, err := SharedNodeClusterDAO.FindClusterHTTPFirewallPolicyId(tx, clusterId, cacheMap) nodeCluster, err := SharedNodeClusterDAO.FindClusterBasicInfo(tx, clusterId, cacheMap)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if nodeCluster == nil {
continue
}
var httpFirewallPolicyId = int64(nodeCluster.HttpFirewallPolicyId)
if httpFirewallPolicyId > 0 { if httpFirewallPolicyId > 0 {
firewallPolicy, err := SharedHTTPFirewallPolicyDAO.ComposeFirewallPolicy(tx, httpFirewallPolicyId, cacheMap) firewallPolicy, err := SharedHTTPFirewallPolicyDAO.ComposeFirewallPolicy(tx, httpFirewallPolicyId, cacheMap)
if err != nil { if err != nil {
@@ -798,10 +804,7 @@ func (this *NodeDAO) ComposeNodeConfig(tx *dbs.Tx, nodeId int64, cacheMap *utils
} }
// 缓存策略 // 缓存策略
httpCachePolicyId, err := SharedNodeClusterDAO.FindClusterHTTPCachePolicyId(tx, clusterId, cacheMap) var httpCachePolicyId = int64(nodeCluster.CachePolicyId)
if err != nil {
return nil, err
}
if httpCachePolicyId > 0 { if httpCachePolicyId > 0 {
cachePolicy, err := SharedHTTPCachePolicyDAO.ComposeCachePolicy(tx, httpCachePolicyId, cacheMap) cachePolicy, err := SharedHTTPCachePolicyDAO.ComposeCachePolicy(tx, httpCachePolicyId, cacheMap)
if err != nil { if err != nil {
@@ -813,15 +816,22 @@ func (this *NodeDAO) ComposeNodeConfig(tx *dbs.Tx, nodeId int64, cacheMap *utils
} }
// 时区 // 时区
timeZone, err := SharedNodeClusterDAO.FindClusterTimezone(tx, clusterId, cacheMap) if len(config.TimeZone) == 0 {
if err != nil { var timeZone = nodeCluster.TimeZone
return nil, err
}
if len(timeZone) > 0 { if len(timeZone) > 0 {
config.TimeZone = timeZone config.TimeZone = timeZone
} }
} }
// 最大线程数、TCP连接数
if clusterIndex == 0 {
config.MaxThreads = int(nodeCluster.NodeMaxThreads)
config.TCPMaxConnections = int(nodeCluster.NodeTCPMaxConnections)
}
clusterIndex++
}
// 缓存最大容量设置 // 缓存最大容量设置
if len(node.MaxCacheDiskCapacity) > 0 { if len(node.MaxCacheDiskCapacity) > 0 {
capacity := &shared.SizeCapacity{} capacity := &shared.SizeCapacity{}

View File

@@ -83,7 +83,7 @@ func (this *NodeClusterService) UpdateNodeCluster(ctx context.Context, req *pb.U
tx := this.NullTx() tx := this.NullTx()
err = models.SharedNodeClusterDAO.UpdateCluster(tx, req.NodeClusterId, req.Name, req.NodeGrantId, req.InstallDir, req.TimeZone) err = models.SharedNodeClusterDAO.UpdateCluster(tx, req.NodeClusterId, req.Name, req.NodeGrantId, req.InstallDir, req.TimeZone, req.NodeMaxThreads, req.NodeTCPMaxConnections)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -161,6 +161,8 @@ func (this *NodeClusterService) FindEnabledNodeCluster(ctx context.Context, req
DnsDomainId: int64(cluster.DnsDomainId), DnsDomainId: int64(cluster.DnsDomainId),
IsOn: cluster.IsOn == 1, IsOn: cluster.IsOn == 1,
TimeZone: cluster.TimeZone, TimeZone: cluster.TimeZone,
NodeMaxThreads: int32(cluster.NodeMaxThreads),
NodeTCPMaxConnections: int32(cluster.NodeTCPMaxConnections),
}}, nil }}, nil
} }