Files
EdgeAPI/internal/nodes/api_node.go

235 lines
8.1 KiB
Go
Raw Normal View History

2020-10-04 14:27:14 +08:00
package nodes
import (
"crypto/tls"
"errors"
"github.com/TeaOSLab/EdgeAPI/internal/configs"
teaconst "github.com/TeaOSLab/EdgeAPI/internal/const"
2020-10-04 14:27:14 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/rpc/services"
"github.com/TeaOSLab/EdgeAPI/internal/setup"
2020-10-04 14:27:14 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/utils"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
2020-10-13 20:05:13 +08:00
"github.com/iwind/TeaGo/dbs"
2020-10-04 14:27:14 +08:00
"github.com/iwind/TeaGo/logs"
stringutil "github.com/iwind/TeaGo/utils/string"
2020-10-04 14:27:14 +08:00
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"net"
"os"
"strconv"
)
// sharedAPIConfig holds the API configuration loaded by APINode.Start.
// It stays nil until Start has read the configuration successfully.
var sharedAPIConfig *configs.APIConfig
// APINode is the API node process entry type. It carries no fields; its
// methods start the node, serve RPC and upgrade the database.
type APINode struct{}
// NewAPINode returns a pointer to a freshly allocated APINode.
func NewAPINode() *APINode {
	return new(APINode)
}
func (this *APINode) Start() {
logs.Println("[API_NODE]start api node, pid: " + strconv.Itoa(os.Getpid()))
// 自动升级
err := this.autoUpgrade()
if err != nil {
logs.Println("[API_NODE]auto upgrade failed: " + err.Error())
return
}
2020-10-04 14:27:14 +08:00
2020-10-13 20:05:13 +08:00
// 数据库通知启动
dbs.NotifyReady()
2020-10-04 14:27:14 +08:00
// 读取配置
config, err := configs.SharedAPIConfig()
if err != nil {
logs.Println("[API_NODE]start failed: " + err.Error())
2020-10-04 14:27:14 +08:00
return
}
sharedAPIConfig = config
// 校验
apiNode, err := models.SharedAPINodeDAO.FindEnabledAPINodeWithUniqueIdAndSecret(config.NodeId, config.Secret)
if err != nil {
logs.Println("[API_NODE]start failed: read api node from database failed: " + err.Error())
2020-10-04 14:27:14 +08:00
return
}
if apiNode == nil {
logs.Println("[API_NODE]can not start node, wrong 'nodeId' or 'secret'")
2020-10-04 14:27:14 +08:00
return
}
config.SetNumberId(int64(apiNode.Id))
// 设置rlimit
_ = utils.SetRLimit(1024 * 1024)
// 监听RPC服务
logs.Println("[API_NODE]starting rpc ...")
2020-10-04 14:27:14 +08:00
// HTTP
httpConfig, err := apiNode.DecodeHTTP()
if err != nil {
logs.Println("[API_NODE]decode http config: " + err.Error())
2020-10-04 14:27:14 +08:00
return
}
isListening := false
if httpConfig != nil && httpConfig.IsOn && len(httpConfig.Listen) > 0 {
for _, listen := range httpConfig.Listen {
for _, addr := range listen.Addresses() {
listener, err := net.Listen("tcp", addr)
if err != nil {
logs.Println("[API_NODE]listening '" + addr + "' failed: " + err.Error())
2020-10-04 14:27:14 +08:00
continue
}
go func() {
err := this.listenRPC(listener, nil)
if err != nil {
logs.Println("[API_NODE]listening '" + addr + "' rpc: " + err.Error())
2020-10-04 14:27:14 +08:00
return
}
}()
isListening = true
}
}
}
// HTTPS
httpsConfig, err := apiNode.DecodeHTTPS()
if err != nil {
logs.Println("[API_NODE]decode https config: " + err.Error())
2020-10-04 14:27:14 +08:00
return
}
if httpsConfig != nil &&
httpsConfig.IsOn &&
len(httpsConfig.Listen) > 0 &&
httpsConfig.SSLPolicy != nil &&
httpsConfig.SSLPolicy.IsOn &&
len(httpsConfig.SSLPolicy.Certs) > 0 {
certs := []tls.Certificate{}
for _, cert := range httpsConfig.SSLPolicy.Certs {
certs = append(certs, *cert.CertObject())
}
for _, listen := range httpsConfig.Listen {
for _, addr := range listen.Addresses() {
listener, err := net.Listen("tcp", addr)
if err != nil {
logs.Println("[API_NODE]listening '" + addr + "' failed: " + err.Error())
2020-10-04 14:27:14 +08:00
continue
}
go func() {
err := this.listenRPC(listener, &tls.Config{
Certificates: certs,
})
if err != nil {
logs.Println("[API_NODE]listening '" + addr + "' rpc: " + err.Error())
2020-10-04 14:27:14 +08:00
return
}
}()
isListening = true
}
}
}
if !isListening {
logs.Println("[API_NODE]the api node does have a listening address")
2020-10-04 14:27:14 +08:00
return
}
// 保持进程
select {}
}
// 启动RPC监听
func (this *APINode) listenRPC(listener net.Listener, tlsConfig *tls.Config) error {
var rpcServer *grpc.Server
if tlsConfig == nil {
logs.Println("[API_NODE]listening http://" + listener.Addr().String() + " ...")
2020-10-04 14:27:14 +08:00
rpcServer = grpc.NewServer()
} else {
logs.Println("[API_NODE]listening https://" + listener.Addr().String() + " ...")
2020-10-04 14:27:14 +08:00
rpcServer = grpc.NewServer(grpc.Creds(credentials.NewTLS(tlsConfig)))
}
pb.RegisterAdminServiceServer(rpcServer, &services.AdminService{})
pb.RegisterNodeGrantServiceServer(rpcServer, &services.NodeGrantService{})
pb.RegisterServerServiceServer(rpcServer, &services.ServerService{})
pb.RegisterNodeServiceServer(rpcServer, &services.NodeService{})
pb.RegisterNodeClusterServiceServer(rpcServer, &services.NodeClusterService{})
pb.RegisterNodeIPAddressServiceServer(rpcServer, &services.NodeIPAddressService{})
pb.RegisterAPINodeServiceServer(rpcServer, &services.APINodeService{})
pb.RegisterOriginServiceServer(rpcServer, &services.OriginService{})
pb.RegisterHTTPWebServiceServer(rpcServer, &services.HTTPWebService{})
pb.RegisterReverseProxyServiceServer(rpcServer, &services.ReverseProxyService{})
pb.RegisterHTTPGzipServiceServer(rpcServer, &services.HTTPGzipService{})
pb.RegisterHTTPHeaderPolicyServiceServer(rpcServer, &services.HTTPHeaderPolicyService{})
pb.RegisterHTTPHeaderServiceServer(rpcServer, &services.HTTPHeaderService{})
pb.RegisterHTTPPageServiceServer(rpcServer, &services.HTTPPageService{})
pb.RegisterHTTPAccessLogPolicyServiceServer(rpcServer, &services.HTTPAccessLogPolicyService{})
pb.RegisterHTTPCachePolicyServiceServer(rpcServer, &services.HTTPCachePolicyService{})
pb.RegisterHTTPFirewallPolicyServiceServer(rpcServer, &services.HTTPFirewallPolicyService{})
pb.RegisterHTTPLocationServiceServer(rpcServer, &services.HTTPLocationService{})
pb.RegisterHTTPWebsocketServiceServer(rpcServer, &services.HTTPWebsocketService{})
pb.RegisterHTTPRewriteRuleServiceServer(rpcServer, &services.HTTPRewriteRuleService{})
pb.RegisterSSLCertServiceServer(rpcServer, &services.SSLCertService{})
pb.RegisterSSLPolicyServiceServer(rpcServer, &services.SSLPolicyService{})
pb.RegisterSysSettingServiceServer(rpcServer, &services.SysSettingService{})
2020-10-07 11:18:12 +08:00
pb.RegisterHTTPFirewallRuleGroupServiceServer(rpcServer, &services.HTTPFirewallRuleGroupService{})
2020-10-08 11:11:49 +08:00
pb.RegisterHTTPFirewallRuleSetServiceServer(rpcServer, &services.HTTPFirewallRuleSetService{})
2020-10-08 17:55:10 +08:00
pb.RegisterDBNodeServiceServer(rpcServer, &services.DBNodeService{})
2020-10-09 11:06:37 +08:00
pb.RegisterNodeLogServiceServer(rpcServer, &services.NodeLogService{})
2020-10-10 11:49:21 +08:00
pb.RegisterHTTPAccessLogServiceServer(rpcServer, &services.HTTPAccessLogService{})
2020-10-20 20:18:06 +08:00
pb.RegisterMessageServiceServer(rpcServer, &services.MessageService{})
2020-10-28 18:21:15 +08:00
pb.RegisterNodeGroupServiceServer(rpcServer, &services.NodeGroupService{})
pb.RegisterServerGroupServiceServer(rpcServer, &services.ServerGroupService{})
2020-11-04 15:51:32 +08:00
pb.RegisterIPLibraryServiceServer(rpcServer, &services.IPLibraryService{})
pb.RegisterFileChunkServiceServer(rpcServer, &services.FileChunkService{})
pb.RegisterFileServiceServer(rpcServer, &services.FileService{})
2020-11-06 11:02:53 +08:00
pb.RegisterRegionCountryServiceServer(rpcServer, &services.RegionCountryService{})
pb.RegisterRegionProvinceServiceServer(rpcServer, &services.RegionProvinceService{})
2020-11-07 19:40:24 +08:00
pb.RegisterIPListServiceServer(rpcServer, &services.IPListService{})
pb.RegisterIPItemServiceServer(rpcServer, &services.IPItemService{})
2020-11-10 20:30:55 +08:00
pb.RegisterLogServiceServer(rpcServer, &services.LogService{})
2020-11-11 21:32:25 +08:00
pb.RegisterDNSProviderServiceServer(rpcServer, &services.DNSProviderService{})
2020-11-12 14:41:28 +08:00
pb.RegisterDNSDomainServiceServer(rpcServer, &services.DNSDomainService{})
2020-11-14 21:28:07 +08:00
pb.RegisterDNSServiceServer(rpcServer, &services.DNSService{})
2020-10-04 14:27:14 +08:00
err := rpcServer.Serve(listener)
if err != nil {
return errors.New("[API_NODE]start rpc failed: " + err.Error())
}
return nil
}
// 自动升级
func (this *APINode) autoUpgrade() error {
db, err := dbs.Default()
if err != nil {
return errors.New("load database config failed: " + err.Error())
}
one, err := db.FindOne("SELECT version FROM edgeVersions LIMIT 1")
if err != nil {
return errors.New("query version failed: " + err.Error())
}
if one != nil {
// 如果是同样的版本,则直接认为是最新版本
version := one.GetString("version")
if stringutil.VersionCompare(version, teaconst.Version) >= 0 {
return nil
}
2020-10-04 14:27:14 +08:00
}
dbConfig, err := db.Config()
if err != nil {
return errors.New("read db config failed: " + err.Error())
}
logs.Println("[API_NODE]upgrade database starting ...")
err = setup.NewSQLExecutor(dbConfig).Run()
if err != nil {
return errors.New("execute sql failed: " + err.Error())
}
logs.Println("[API_NODE]upgrade database done")
2020-10-04 14:27:14 +08:00
return nil
}