2020-10-04 14:27:14 +08:00
package nodes
import (
"crypto/tls"
"errors"
2021-04-15 11:16:58 +08:00
"fmt"
2020-10-04 14:27:14 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/configs"
2020-11-17 10:26:31 +08:00
teaconst "github.com/TeaOSLab/EdgeAPI/internal/const"
2020-10-04 14:27:14 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
2021-01-12 11:49:14 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/events"
2020-12-30 22:01:01 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
2020-11-17 10:26:31 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/setup"
2020-10-04 14:27:14 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/utils"
2020-11-17 15:42:42 +08:00
"github.com/go-yaml/yaml"
"github.com/iwind/TeaGo/Tea"
2020-10-13 20:05:13 +08:00
"github.com/iwind/TeaGo/dbs"
2021-01-12 11:49:14 +08:00
"github.com/iwind/TeaGo/lists"
2020-10-04 14:27:14 +08:00
"github.com/iwind/TeaGo/logs"
2021-07-25 17:46:47 +08:00
"github.com/iwind/TeaGo/maps"
2021-03-25 11:48:45 +08:00
"github.com/iwind/TeaGo/types"
2020-11-17 10:26:31 +08:00
stringutil "github.com/iwind/TeaGo/utils/string"
2021-07-25 17:46:47 +08:00
"github.com/iwind/gosock/pkg/gosock"
2020-10-04 14:27:14 +08:00
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
2020-11-17 15:42:42 +08:00
"io/ioutil"
2021-01-12 11:49:14 +08:00
"log"
2020-10-04 14:27:14 +08:00
"net"
"os"
2021-01-12 11:49:14 +08:00
"os/exec"
2021-03-25 11:48:45 +08:00
"regexp"
2020-10-04 14:27:14 +08:00
"strconv"
2021-06-20 19:22:24 +08:00
"sync"
2021-01-12 11:49:14 +08:00
"time"
2020-10-04 14:27:14 +08:00
)
// sharedAPIConfig holds the API configuration loaded by APINode.Start.
// It remains nil until Start has read the configuration successfully.
var sharedAPIConfig *configs.APIConfig
type APINode struct {
2021-06-20 19:22:24 +08:00
serviceInstanceMap map [ string ] interface { }
serviceInstanceLocker sync . Mutex
2021-07-25 17:46:47 +08:00
sock * gosock . Sock
2020-10-04 14:27:14 +08:00
}
func NewAPINode ( ) * APINode {
2021-06-20 19:22:24 +08:00
return & APINode {
serviceInstanceMap : map [ string ] interface { } { } ,
2021-07-25 17:46:47 +08:00
sock : gosock . NewTmpSock ( teaconst . ProcessName ) ,
2021-06-20 19:22:24 +08:00
}
2020-10-04 14:27:14 +08:00
}
func ( this * APINode ) Start ( ) {
2020-11-17 10:26:31 +08:00
logs . Println ( "[API_NODE]start api node, pid: " + strconv . Itoa ( os . Getpid ( ) ) )
2021-04-15 11:16:58 +08:00
// 检查数据库连接
err := this . checkDB ( )
if err != nil {
logs . Println ( "[API_NODE]" + err . Error ( ) )
return
}
2021-01-12 11:49:14 +08:00
// 本地Sock
2021-04-15 11:16:58 +08:00
err = this . listenSock ( )
2021-01-12 11:49:14 +08:00
if err != nil {
logs . Println ( "[API_NODE]" + err . Error ( ) )
return
}
2020-11-17 10:26:31 +08:00
// 自动升级
2021-01-12 11:49:14 +08:00
err = this . autoUpgrade ( )
2020-11-17 10:26:31 +08:00
if err != nil {
logs . Println ( "[API_NODE]auto upgrade failed: " + err . Error ( ) )
return
}
2020-10-04 14:27:14 +08:00
2021-03-25 11:48:45 +08:00
// 自动设置数据库
err = this . setupDB ( )
if err != nil {
logs . Println ( "[API_NODE]setup database '" + err . Error ( ) + "'" )
// 不阻断执行
}
2020-10-13 20:05:13 +08:00
// 数据库通知启动
dbs . NotifyReady ( )
2020-10-04 14:27:14 +08:00
// 读取配置
config , err := configs . SharedAPIConfig ( )
if err != nil {
2020-11-17 10:26:31 +08:00
logs . Println ( "[API_NODE]start failed: " + err . Error ( ) )
2020-10-04 14:27:14 +08:00
return
}
sharedAPIConfig = config
// 校验
2021-01-01 23:31:30 +08:00
apiNode , err := models . SharedAPINodeDAO . FindEnabledAPINodeWithUniqueIdAndSecret ( nil , config . NodeId , config . Secret )
2020-10-04 14:27:14 +08:00
if err != nil {
2020-11-17 10:26:31 +08:00
logs . Println ( "[API_NODE]start failed: read api node from database failed: " + err . Error ( ) )
2020-10-04 14:27:14 +08:00
return
}
if apiNode == nil {
2020-11-17 10:26:31 +08:00
logs . Println ( "[API_NODE]can not start node, wrong 'nodeId' or 'secret'" )
2020-10-04 14:27:14 +08:00
return
}
config . SetNumberId ( int64 ( apiNode . Id ) )
// 设置rlimit
_ = utils . SetRLimit ( 1024 * 1024 )
2020-12-29 18:28:07 +08:00
// 状态变更计时器
go NewNodeStatusExecutor ( ) . Listen ( )
2020-10-04 14:27:14 +08:00
// 监听RPC服务
2020-12-30 22:01:01 +08:00
remotelogs . Println ( "API_NODE" , "starting RPC server ..." )
2020-10-04 14:27:14 +08:00
2021-01-01 20:49:09 +08:00
isListening := this . listenPorts ( apiNode )
2020-12-29 18:28:07 +08:00
2020-10-04 14:27:14 +08:00
if ! isListening {
2020-12-30 22:01:01 +08:00
remotelogs . Error ( "API_NODE" , "the api node require at least one listening address" )
2020-10-04 14:27:14 +08:00
return
}
// 保持进程
select { }
}
2021-04-15 11:16:58 +08:00
// Daemon 实现守护进程
2021-01-12 11:49:14 +08:00
func ( this * APINode ) Daemon ( ) {
path := os . TempDir ( ) + "/edge-api.sock"
isDebug := lists . ContainsString ( os . Args , "debug" )
isDebug = true
for {
conn , err := net . DialTimeout ( "unix" , path , 1 * time . Second )
if err != nil {
if isDebug {
log . Println ( "[DAEMON]starting ..." )
}
// 尝试启动
err = func ( ) error {
exe , err := os . Executable ( )
if err != nil {
return err
}
cmd := exec . Command ( exe )
err = cmd . Start ( )
if err != nil {
return err
}
err = cmd . Wait ( )
if err != nil {
return err
}
return nil
} ( )
if err != nil {
if isDebug {
log . Println ( "[DAEMON]" , err )
}
time . Sleep ( 1 * time . Second )
} else {
time . Sleep ( 5 * time . Second )
}
} else {
_ = conn . Close ( )
time . Sleep ( 5 * time . Second )
}
}
}
2021-04-15 11:16:58 +08:00
// InstallSystemService 安装系统服务
2021-01-12 11:49:14 +08:00
func ( this * APINode ) InstallSystemService ( ) error {
shortName := teaconst . SystemdServiceName
exe , err := os . Executable ( )
if err != nil {
return err
}
manager := utils . NewServiceManager ( shortName , teaconst . ProductName )
err = manager . Install ( exe , [ ] string { } )
if err != nil {
return err
}
return nil
}
2020-10-04 14:27:14 +08:00
// 启动RPC监听
func ( this * APINode ) listenRPC ( listener net . Listener , tlsConfig * tls . Config ) error {
var rpcServer * grpc . Server
if tlsConfig == nil {
2021-01-01 20:49:09 +08:00
remotelogs . Println ( "API_NODE" , "listening GRPC http://" + listener . Addr ( ) . String ( ) + " ..." )
2020-10-04 14:27:14 +08:00
rpcServer = grpc . NewServer ( )
} else {
2021-01-01 20:49:09 +08:00
logs . Println ( "[API_NODE]listening GRPC https://" + listener . Addr ( ) . String ( ) + " ..." )
2020-10-04 14:27:14 +08:00
rpcServer = grpc . NewServer ( grpc . Creds ( credentials . NewTLS ( tlsConfig ) ) )
}
2021-05-25 15:49:13 +08:00
this . registerServices ( rpcServer )
2020-10-04 14:27:14 +08:00
err := rpcServer . Serve ( listener )
if err != nil {
2020-11-17 10:26:31 +08:00
return errors . New ( "[API_NODE]start rpc failed: " + err . Error ( ) )
}
return nil
}
2021-04-15 11:16:58 +08:00
// 检查数据库
func ( this * APINode ) checkDB ( ) error {
2021-04-15 11:18:40 +08:00
logs . Println ( "[API_NODE]checking database connection ..." )
2021-04-15 11:16:58 +08:00
db , err := dbs . Default ( )
if err != nil {
return err
}
maxTries := 600
for i := 0 ; i <= maxTries ; i ++ {
_ , err := db . Exec ( "SELECT 1" )
if err != nil {
if i == maxTries - 1 {
return err
} else {
if i % 10 == 0 { // 这让提示不会太多
2021-04-29 16:48:09 +08:00
logs . Println ( "[API_NODE]reconnecting to database (" + fmt . Sprintf ( "%.1f" , float32 ( i * 100 ) / float32 ( maxTries + 1 ) ) + "%) ..." )
2021-04-15 11:16:58 +08:00
}
time . Sleep ( 1 * time . Second )
}
} else {
2021-04-15 11:18:40 +08:00
logs . Println ( "[API_NODE]database connected" )
2021-04-15 11:16:58 +08:00
return nil
}
}
return nil
}
2020-11-17 10:26:31 +08:00
// 自动升级
func ( this * APINode ) autoUpgrade ( ) error {
2020-11-17 15:42:42 +08:00
if Tea . IsTesting ( ) {
return nil
}
// 执行SQL
config := & dbs . Config { }
configData , err := ioutil . ReadFile ( Tea . ConfigFile ( "db.yaml" ) )
2020-11-17 10:26:31 +08:00
if err != nil {
2020-11-17 15:42:42 +08:00
return errors . New ( "read database config file failed: " + err . Error ( ) )
}
err = yaml . Unmarshal ( configData , config )
if err != nil {
return errors . New ( "decode database config failed: " + err . Error ( ) )
}
dbConfig := config . DBs [ Tea . Env ]
db , err := dbs . NewInstanceFromConfig ( dbConfig )
if err != nil {
return errors . New ( "load database failed: " + err . Error ( ) )
2020-11-17 10:26:31 +08:00
}
one , err := db . FindOne ( "SELECT version FROM edgeVersions LIMIT 1" )
if err != nil {
return errors . New ( "query version failed: " + err . Error ( ) )
}
if one != nil {
// 如果是同样的版本,则直接认为是最新版本
version := one . GetString ( "version" )
if stringutil . VersionCompare ( version , teaconst . Version ) >= 0 {
return nil
}
2020-10-04 14:27:14 +08:00
}
2021-07-21 08:08:31 +08:00
// 不使用remotelog(),因为此时还没有启动完成
2020-11-17 10:26:31 +08:00
logs . Println ( "[API_NODE]upgrade database starting ..." )
err = setup . NewSQLExecutor ( dbConfig ) . Run ( )
if err != nil {
return errors . New ( "execute sql failed: " + err . Error ( ) )
}
2021-07-21 08:08:31 +08:00
// 不使用remotelog
2020-11-17 10:26:31 +08:00
logs . Println ( "[API_NODE]upgrade database done" )
2020-10-04 14:27:14 +08:00
return nil
}
2021-01-01 20:49:09 +08:00
2021-03-25 11:48:45 +08:00
// 自动设置数据库
func ( this * APINode ) setupDB ( ) error {
db , err := dbs . Default ( )
if err != nil {
return err
}
// 调整预处理语句数量
{
result , err := db . FindOne ( "SHOW VARIABLES WHERE variable_name='max_prepared_stmt_count'" )
if err != nil {
return err
}
value := result . GetString ( "Value" )
if regexp . MustCompile ( ` ^\d+$ ` ) . MatchString ( value ) {
valueInt := types . Int ( value )
if valueInt < 65535 {
_ , err := db . Exec ( "SET GLOBAL max_prepared_stmt_count=65535" )
if err != nil {
2021-07-21 09:01:37 +08:00
return errors . New ( "run 'SET GLOBAL max_prepared_stmt_count' on database failed: " + err . Error ( ) + ", \nyou can change the variable in 'my.cnf': \n~~~\n" + ` [ mysqld ]
max_prepared_stmt_count = 65535
~ ~ ~
then restart mysqld . ` )
2021-03-25 11:48:45 +08:00
}
}
}
}
return nil
}
2021-01-01 20:49:09 +08:00
// 启动端口
func ( this * APINode ) listenPorts ( apiNode * models . APINode ) ( isListening bool ) {
// HTTP
httpConfig , err := apiNode . DecodeHTTP ( )
if err != nil {
remotelogs . Error ( "API_NODE" , "decode http config: " + err . Error ( ) )
return
}
isListening = false
if httpConfig != nil && httpConfig . IsOn && len ( httpConfig . Listen ) > 0 {
for _ , listen := range httpConfig . Listen {
for _ , addr := range listen . Addresses ( ) {
listener , err := net . Listen ( "tcp" , addr )
if err != nil {
2021-07-21 09:01:37 +08:00
remotelogs . Error ( "API_NODE" , "listening '" + addr + "' failed: " + err . Error ( ) + ", we will try to listen port only" )
2021-07-06 15:19:39 +08:00
// 试着只监听端口
_ , port , err := net . SplitHostPort ( addr )
if err != nil {
continue
}
remotelogs . Println ( "API_NODE" , "retry listening port ':" + port + "' only ..." )
listener , err = net . Listen ( "tcp" , ":" + port )
if err != nil {
remotelogs . Error ( "API_NODE" , "listening ':" + port + "' failed: " + err . Error ( ) )
continue
}
remotelogs . Println ( "API_NODE" , "retry listening port ':" + port + "' only ok" )
2021-01-01 20:49:09 +08:00
}
go func ( ) {
err := this . listenRPC ( listener , nil )
if err != nil {
remotelogs . Error ( "API_NODE" , "listening '" + addr + "' rpc: " + err . Error ( ) )
return
}
} ( )
isListening = true
}
}
}
// HTTPS
2021-01-01 23:31:30 +08:00
httpsConfig , err := apiNode . DecodeHTTPS ( nil )
2021-01-01 20:49:09 +08:00
if err != nil {
remotelogs . Error ( "API_NODE" , "decode https config: " + err . Error ( ) )
return
}
if httpsConfig != nil &&
httpsConfig . IsOn &&
len ( httpsConfig . Listen ) > 0 &&
httpsConfig . SSLPolicy != nil &&
httpsConfig . SSLPolicy . IsOn &&
len ( httpsConfig . SSLPolicy . Certs ) > 0 {
certs := [ ] tls . Certificate { }
for _ , cert := range httpsConfig . SSLPolicy . Certs {
certs = append ( certs , * cert . CertObject ( ) )
}
for _ , listen := range httpsConfig . Listen {
for _ , addr := range listen . Addresses ( ) {
listener , err := net . Listen ( "tcp" , addr )
if err != nil {
2021-07-21 09:01:37 +08:00
remotelogs . Error ( "API_NODE" , "listening '" + addr + "' failed: " + err . Error ( ) + ", we will try to listen port only" )
2021-07-06 15:19:39 +08:00
// 试着只监听端口
_ , port , err := net . SplitHostPort ( addr )
if err != nil {
continue
}
remotelogs . Println ( "API_NODE" , "retry listening port ':" + port + "' only ..." )
listener , err = net . Listen ( "tcp" , ":" + port )
if err != nil {
remotelogs . Error ( "API_NODE" , "listening ':" + port + "' failed: " + err . Error ( ) )
continue
}
remotelogs . Println ( "API_NODE" , "retry listening port ':" + port + "' only ok" )
2021-01-01 20:49:09 +08:00
}
go func ( ) {
err := this . listenRPC ( listener , & tls . Config {
Certificates : certs ,
} )
if err != nil {
remotelogs . Error ( "API_NODE" , "listening '" + addr + "' rpc: " + err . Error ( ) )
return
}
} ( )
isListening = true
}
}
}
// Rest HTTP
restHTTPConfig , err := apiNode . DecodeRestHTTP ( )
if err != nil {
remotelogs . Error ( "API_NODE" , "decode REST http config: " + err . Error ( ) )
return
}
if restHTTPConfig != nil && restHTTPConfig . IsOn && len ( restHTTPConfig . Listen ) > 0 {
for _ , listen := range restHTTPConfig . Listen {
for _ , addr := range listen . Addresses ( ) {
listener , err := net . Listen ( "tcp" , addr )
if err != nil {
remotelogs . Error ( "API_NODE" , "listening REST 'http://" + addr + "' failed: " + err . Error ( ) )
continue
}
go func ( ) {
remotelogs . Println ( "API_NODE" , "listening REST http://" + addr + " ..." )
server := & RestServer { }
err := server . Listen ( listener )
if err != nil {
remotelogs . Error ( "API_NODE" , "listening REST 'http://" + addr + "' failed: " + err . Error ( ) )
return
}
} ( )
isListening = true
}
}
}
// Rest HTTPS
2021-01-01 23:31:30 +08:00
restHTTPSConfig , err := apiNode . DecodeRestHTTPS ( nil )
2021-01-01 20:49:09 +08:00
if err != nil {
remotelogs . Error ( "API_NODE" , "decode REST https config: " + err . Error ( ) )
return
}
if restHTTPSConfig != nil &&
restHTTPSConfig . IsOn &&
len ( restHTTPSConfig . Listen ) > 0 &&
restHTTPSConfig . SSLPolicy != nil &&
restHTTPSConfig . SSLPolicy . IsOn &&
len ( restHTTPSConfig . SSLPolicy . Certs ) > 0 {
for _ , listen := range restHTTPSConfig . Listen {
for _ , addr := range listen . Addresses ( ) {
listener , err := net . Listen ( "tcp" , addr )
if err != nil {
remotelogs . Error ( "API_NODE" , "listening REST 'https://" + addr + "' failed: " + err . Error ( ) )
continue
}
go func ( ) {
remotelogs . Println ( "API_NODE" , "listening REST https://" + addr + " ..." )
server := & RestServer { }
certs := [ ] tls . Certificate { }
for _ , cert := range httpsConfig . SSLPolicy . Certs {
certs = append ( certs , * cert . CertObject ( ) )
}
err := server . ListenHTTPS ( listener , & tls . Config {
Certificates : certs ,
} )
if err != nil {
remotelogs . Error ( "API_NODE" , "listening REST 'https://" + addr + "' failed: " + err . Error ( ) )
return
}
} ( )
isListening = true
}
}
}
return
}
2021-01-12 11:49:14 +08:00
// 监听本地sock
func ( this * APINode ) listenSock ( ) error {
2021-07-25 17:46:47 +08:00
// 检查是否在运行
if this . sock . IsListening ( ) {
reply , err := this . sock . Send ( & gosock . Command { Code : "pid" } )
if err == nil {
return errors . New ( "error: the process is already running, pid: " + maps . NewMap ( reply . Params ) . GetString ( "pid" ) )
2021-01-12 11:49:14 +08:00
} else {
2021-07-25 17:46:47 +08:00
return errors . New ( "error: the process is already running" )
2021-01-12 11:49:14 +08:00
}
}
2021-07-25 17:46:47 +08:00
// 启动监听
2021-01-12 11:49:14 +08:00
go func ( ) {
2021-07-25 17:46:47 +08:00
this . sock . OnCommand ( func ( cmd * gosock . Command ) {
switch cmd . Code {
case "pid" :
_ = cmd . Reply ( & gosock . Command {
Code : "pid" ,
Params : map [ string ] interface { } {
"pid" : os . Getpid ( ) ,
} ,
} )
case "stop" :
_ = cmd . ReplyOk ( )
// 退出主进程
events . Notify ( events . EventQuit )
os . Exit ( 0 )
2021-01-12 11:49:14 +08:00
}
2021-07-25 17:46:47 +08:00
} )
err := this . sock . Listen ( )
if err != nil {
logs . Println ( "NODE" , err . Error ( ) )
2021-01-12 11:49:14 +08:00
}
} ( )
2021-07-25 17:46:47 +08:00
events . On ( events . EventQuit , func ( ) {
logs . Println ( "NODE" , "quit unix sock" )
_ = this . sock . Close ( )
} )
2021-01-12 11:49:14 +08:00
return nil
}