2020-10-04 14:27:14 +08:00
package nodes
import (
2022-01-19 16:53:52 +08:00
"context"
2020-10-04 14:27:14 +08:00
"crypto/tls"
2022-06-08 15:13:24 +08:00
"encoding/json"
2020-10-04 14:27:14 +08:00
"errors"
2021-04-15 11:16:58 +08:00
"fmt"
2021-07-29 16:50:59 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/accesslogs"
2020-10-04 14:27:14 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/configs"
2020-11-17 10:26:31 +08:00
teaconst "github.com/TeaOSLab/EdgeAPI/internal/const"
2020-10-04 14:27:14 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
2021-01-12 11:49:14 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/events"
2021-12-14 10:49:29 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/goman"
2020-12-30 22:01:01 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
2022-01-19 16:53:52 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/rpc"
2020-11-17 10:26:31 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/setup"
2020-10-04 14:27:14 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/utils"
2022-06-08 15:13:24 +08:00
"github.com/go-sql-driver/mysql"
2020-11-17 15:42:42 +08:00
"github.com/iwind/TeaGo/Tea"
2020-10-13 20:05:13 +08:00
"github.com/iwind/TeaGo/dbs"
2021-01-12 11:49:14 +08:00
"github.com/iwind/TeaGo/lists"
2020-10-04 14:27:14 +08:00
"github.com/iwind/TeaGo/logs"
2021-07-25 17:46:47 +08:00
"github.com/iwind/TeaGo/maps"
2021-03-25 11:48:45 +08:00
"github.com/iwind/TeaGo/types"
2020-11-17 10:26:31 +08:00
stringutil "github.com/iwind/TeaGo/utils/string"
2021-07-25 17:46:47 +08:00
"github.com/iwind/gosock/pkg/gosock"
2020-10-04 14:27:14 +08:00
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
2022-03-04 12:31:44 +08:00
"gopkg.in/yaml.v3"
2020-11-17 15:42:42 +08:00
"io/ioutil"
2021-01-12 11:49:14 +08:00
"log"
2020-10-04 14:27:14 +08:00
"net"
"os"
2021-01-12 11:49:14 +08:00
"os/exec"
2021-03-25 11:48:45 +08:00
"regexp"
2021-12-14 10:49:29 +08:00
"runtime"
"sort"
2020-10-04 14:27:14 +08:00
"strconv"
2022-06-08 15:13:24 +08:00
"strings"
2021-06-20 19:22:24 +08:00
"sync"
2021-01-12 11:49:14 +08:00
"time"
2022-03-20 11:28:45 +08:00
// grpc decompression
_ "google.golang.org/grpc/encoding/gzip"
2020-10-04 14:27:14 +08:00
)
// sharedAPIConfig is the API configuration of the running node. It is nil
// until (*APINode).Start has loaded the configuration successfully.
var sharedAPIConfig *configs.APIConfig
type APINode struct {
2021-06-20 19:22:24 +08:00
serviceInstanceMap map [ string ] interface { }
serviceInstanceLocker sync . Mutex
2021-07-25 17:46:47 +08:00
sock * gosock . Sock
2021-11-21 19:27:27 +08:00
isStarting bool
2022-06-08 15:13:24 +08:00
issues [ ] * StartIssue
issuesFile string
2020-10-04 14:27:14 +08:00
}
func NewAPINode ( ) * APINode {
2021-06-20 19:22:24 +08:00
return & APINode {
serviceInstanceMap : map [ string ] interface { } { } ,
2021-07-25 17:46:47 +08:00
sock : gosock . NewTmpSock ( teaconst . ProcessName ) ,
2022-06-08 15:13:24 +08:00
issues : [ ] * StartIssue { } ,
issuesFile : Tea . LogFile ( "issues.log" ) ,
2021-06-20 19:22:24 +08:00
}
2020-10-04 14:27:14 +08:00
}
func ( this * APINode ) Start ( ) {
2021-11-21 19:27:27 +08:00
this . isStarting = true
2020-11-17 10:26:31 +08:00
logs . Println ( "[API_NODE]start api node, pid: " + strconv . Itoa ( os . Getpid ( ) ) )
2022-06-08 15:13:24 +08:00
// 保存启动过程中的问题,以便于查看
defer func ( ) {
this . saveIssues ( )
} ( )
// 本地Sock
logs . Println ( "[API_NODE]listening sock ..." )
err := this . listenSock ( )
2021-04-15 11:16:58 +08:00
if err != nil {
2022-06-08 15:13:24 +08:00
var errString = "start local sock failed: " + err . Error ( )
logs . Println ( "[API_NODE]" + errString )
this . addStartIssue ( "sock" , errString , "" )
2021-04-15 11:16:58 +08:00
return
}
2022-06-08 15:13:24 +08:00
// 检查数据库连接
err = this . checkDB ( )
2021-01-12 11:49:14 +08:00
if err != nil {
2022-06-08 15:13:24 +08:00
var errString = "check database connection failed: " + err . Error ( )
logs . Println ( "[API_NODE]" + errString )
this . addStartIssue ( "db" , errString , this . dbIssueSuggestion ( err . Error ( ) ) )
2021-01-12 11:49:14 +08:00
return
}
2020-11-17 10:26:31 +08:00
// 自动升级
2021-11-21 19:27:27 +08:00
logs . Println ( "[API_NODE]auto upgrading ..." )
2021-01-12 11:49:14 +08:00
err = this . autoUpgrade ( )
2020-11-17 10:26:31 +08:00
if err != nil {
2022-06-08 15:13:24 +08:00
var errString = "auto upgrade failed: " + err . Error ( )
logs . Println ( "[API_NODE]" + errString )
this . addStartIssue ( "db" , errString , this . dbIssueSuggestion ( err . Error ( ) ) )
2020-11-17 10:26:31 +08:00
return
}
2020-10-04 14:27:14 +08:00
2021-03-25 11:48:45 +08:00
// 自动设置数据库
2021-11-21 19:27:27 +08:00
logs . Println ( "[API_NODE]setup database ..." )
2021-03-25 11:48:45 +08:00
err = this . setupDB ( )
if err != nil {
logs . Println ( "[API_NODE]setup database '" + err . Error ( ) + "'" )
// 不阻断执行
}
2020-10-13 20:05:13 +08:00
// 数据库通知启动
2021-11-21 19:27:27 +08:00
logs . Println ( "[API_NODE]notify ready ..." )
2020-10-13 20:05:13 +08:00
dbs . NotifyReady ( )
2020-10-04 14:27:14 +08:00
// 读取配置
2021-11-21 19:27:27 +08:00
logs . Println ( "[API_NODE]reading api config ..." )
2020-10-04 14:27:14 +08:00
config , err := configs . SharedAPIConfig ( )
if err != nil {
2022-06-08 15:13:24 +08:00
var errString = "read api config failed: " + err . Error ( )
logs . Println ( "[API_NODE]" + errString )
this . addStartIssue ( "config" , errString , "" )
2020-10-04 14:27:14 +08:00
return
}
sharedAPIConfig = config
// 校验
2021-01-01 23:31:30 +08:00
apiNode , err := models . SharedAPINodeDAO . FindEnabledAPINodeWithUniqueIdAndSecret ( nil , config . NodeId , config . Secret )
2020-10-04 14:27:14 +08:00
if err != nil {
2022-06-08 15:13:24 +08:00
var errString = "start failed: read api node from database failed: " + err . Error ( )
logs . Println ( "[API_NODE]" + errString )
this . addStartIssue ( "db" , errString , "" )
2020-10-04 14:27:14 +08:00
return
}
if apiNode == nil {
2022-06-08 15:13:24 +08:00
var errString = "can not start node, wrong 'nodeId' or 'secret'"
logs . Println ( "[API_NODE]" + errString )
this . addStartIssue ( "config" , errString , "请在api.yaml配置文件中填写正确的`nodeId`和`secret`, 如果数据库或者管理节点或API节点是从别的服务器迁移过来的, 请将老的系统配置拷贝到当前节点配置下" )
2020-10-04 14:27:14 +08:00
return
}
config . SetNumberId ( int64 ( apiNode . Id ) )
2022-06-08 15:13:24 +08:00
// 清除上一次启动错误
// 这个错误文件可能不存在,不需要处理错误
_ = os . Remove ( this . issuesFile )
2020-10-04 14:27:14 +08:00
// 设置rlimit
_ = utils . SetRLimit ( 1024 * 1024 )
2020-12-29 18:28:07 +08:00
// 状态变更计时器
2021-12-14 10:49:29 +08:00
goman . New ( func ( ) {
NewNodeStatusExecutor ( ) . Listen ( )
} )
2020-12-29 18:28:07 +08:00
2021-07-29 16:50:59 +08:00
// 访问日志存储管理器
2021-12-14 10:49:29 +08:00
goman . New ( func ( ) {
accesslogs . SharedStorageManager . Start ( )
} )
2021-07-29 16:50:59 +08:00
2020-10-04 14:27:14 +08:00
// 监听RPC服务
2020-12-30 22:01:01 +08:00
remotelogs . Println ( "API_NODE" , "starting RPC server ..." )
2020-10-04 14:27:14 +08:00
2022-06-08 15:13:24 +08:00
var isListening = this . listenPorts ( apiNode )
2020-12-29 18:28:07 +08:00
2020-10-04 14:27:14 +08:00
if ! isListening {
2022-06-08 15:13:24 +08:00
var errString = "the api node require at least one listening address"
remotelogs . Error ( "API_NODE" , errString )
this . addStartIssue ( "config" , errString , "请给当前API节点设置一个监听端口" )
2020-10-04 14:27:14 +08:00
return
}
2021-11-21 19:27:27 +08:00
// 结束启动
this . isStarting = false
2020-10-04 14:27:14 +08:00
// 保持进程
select { }
}
2021-04-15 11:16:58 +08:00
// Daemon 实现守护进程
2021-01-12 11:49:14 +08:00
func ( this * APINode ) Daemon ( ) {
2022-06-08 15:13:24 +08:00
var path = os . TempDir ( ) + "/" + teaconst . ProcessName + ".sock"
var isDebug = lists . ContainsString ( os . Args , "debug" )
2021-01-12 11:49:14 +08:00
for {
conn , err := net . DialTimeout ( "unix" , path , 1 * time . Second )
if err != nil {
if isDebug {
log . Println ( "[DAEMON]starting ..." )
}
// 尝试启动
err = func ( ) error {
exe , err := os . Executable ( )
if err != nil {
return err
}
cmd := exec . Command ( exe )
err = cmd . Start ( )
if err != nil {
return err
}
err = cmd . Wait ( )
if err != nil {
return err
}
return nil
} ( )
if err != nil {
if isDebug {
log . Println ( "[DAEMON]" , err )
}
time . Sleep ( 1 * time . Second )
} else {
time . Sleep ( 5 * time . Second )
}
} else {
_ = conn . Close ( )
time . Sleep ( 5 * time . Second )
}
}
}
2021-04-15 11:16:58 +08:00
// InstallSystemService 安装系统服务
2021-01-12 11:49:14 +08:00
func ( this * APINode ) InstallSystemService ( ) error {
2022-06-08 15:13:24 +08:00
var shortName = teaconst . SystemdServiceName
2021-01-12 11:49:14 +08:00
exe , err := os . Executable ( )
if err != nil {
return err
}
2022-06-08 15:13:24 +08:00
var manager = utils . NewServiceManager ( shortName , teaconst . ProductName )
2021-01-12 11:49:14 +08:00
err = manager . Install ( exe , [ ] string { } )
if err != nil {
return err
}
return nil
}
2020-10-04 14:27:14 +08:00
// 启动RPC监听
func ( this * APINode ) listenRPC ( listener net . Listener , tlsConfig * tls . Config ) error {
var rpcServer * grpc . Server
if tlsConfig == nil {
2021-01-01 20:49:09 +08:00
remotelogs . Println ( "API_NODE" , "listening GRPC http://" + listener . Addr ( ) . String ( ) + " ..." )
2022-01-19 16:53:52 +08:00
rpcServer = grpc . NewServer ( grpc . MaxRecvMsgSize ( 128 * 1024 * 1024 ) , grpc . UnaryInterceptor ( this . unaryInterceptor ) )
2020-10-04 14:27:14 +08:00
} else {
2021-01-01 20:49:09 +08:00
logs . Println ( "[API_NODE]listening GRPC https://" + listener . Addr ( ) . String ( ) + " ..." )
2022-01-19 16:53:52 +08:00
rpcServer = grpc . NewServer ( grpc . Creds ( credentials . NewTLS ( tlsConfig ) ) , grpc . MaxRecvMsgSize ( 128 * 1024 * 1024 ) , grpc . UnaryInterceptor ( this . unaryInterceptor ) )
2020-10-04 14:27:14 +08:00
}
2021-05-25 15:49:13 +08:00
this . registerServices ( rpcServer )
2020-10-04 14:27:14 +08:00
err := rpcServer . Serve ( listener )
if err != nil {
2020-11-17 10:26:31 +08:00
return errors . New ( "[API_NODE]start rpc failed: " + err . Error ( ) )
}
return nil
}
2021-04-15 11:16:58 +08:00
// 检查数据库
func ( this * APINode ) checkDB ( ) error {
2021-04-15 11:18:40 +08:00
logs . Println ( "[API_NODE]checking database connection ..." )
2021-04-15 11:16:58 +08:00
db , err := dbs . Default ( )
if err != nil {
return err
}
2022-06-08 15:13:24 +08:00
// 第一次测试连接
_ , err = db . Exec ( "SELECT 1" )
if err != nil {
var errString = "check database connection failed: " + err . Error ( )
logs . Println ( "[API_NODE]" + errString )
this . addStartIssue ( "db" , errString , this . dbIssueSuggestion ( errString ) )
// 多次尝试
var maxTries = 600
if Tea . IsTesting ( ) {
maxTries = 600
}
for i := 0 ; i <= maxTries ; i ++ {
_ , err := db . Exec ( "SELECT 1" )
if err != nil {
if i == maxTries - 1 {
return err
} else {
if i % 10 == 0 { // 这让提示不会太多
logs . Println ( "[API_NODE]reconnecting to database (" + fmt . Sprintf ( "%.1f" , float32 ( i * 100 ) / float32 ( maxTries + 1 ) ) + "%) ..." )
}
time . Sleep ( 1 * time . Second )
2021-04-15 11:16:58 +08:00
}
2022-06-08 15:13:24 +08:00
} else {
logs . Println ( "[API_NODE]database connected" )
return nil
2021-04-15 11:16:58 +08:00
}
}
}
return nil
}
2020-11-17 10:26:31 +08:00
// 自动升级
func ( this * APINode ) autoUpgrade ( ) error {
2020-11-17 15:42:42 +08:00
if Tea . IsTesting ( ) {
return nil
}
// 执行SQL
2022-06-08 15:13:24 +08:00
var config = & dbs . Config { }
2020-11-17 15:42:42 +08:00
configData , err := ioutil . ReadFile ( Tea . ConfigFile ( "db.yaml" ) )
2020-11-17 10:26:31 +08:00
if err != nil {
2020-11-17 15:42:42 +08:00
return errors . New ( "read database config file failed: " + err . Error ( ) )
}
err = yaml . Unmarshal ( configData , config )
if err != nil {
return errors . New ( "decode database config failed: " + err . Error ( ) )
}
2022-03-14 16:06:25 +08:00
var dbConfig = config . DBs [ Tea . Env ]
2020-11-17 15:42:42 +08:00
db , err := dbs . NewInstanceFromConfig ( dbConfig )
if err != nil {
return errors . New ( "load database failed: " + err . Error ( ) )
2020-11-17 10:26:31 +08:00
}
2022-04-08 14:15:45 +08:00
defer func ( ) {
_ = db . Close ( )
} ( )
2020-11-17 10:26:31 +08:00
one , err := db . FindOne ( "SELECT version FROM edgeVersions LIMIT 1" )
if err != nil {
return errors . New ( "query version failed: " + err . Error ( ) )
}
if one != nil {
// 如果是同样的版本,则直接认为是最新版本
2022-03-14 16:06:25 +08:00
var version = one . GetString ( "version" )
if stringutil . VersionCompare ( version , setup . ComposeSQLVersion ( ) ) >= 0 {
2020-11-17 10:26:31 +08:00
return nil
}
2020-10-04 14:27:14 +08:00
}
2021-07-21 08:08:31 +08:00
// 不使用remotelog(),因为此时还没有启动完成
2020-11-17 10:26:31 +08:00
logs . Println ( "[API_NODE]upgrade database starting ..." )
2021-08-05 19:53:54 +08:00
err = setup . NewSQLExecutor ( dbConfig ) . Run ( false )
2020-11-17 10:26:31 +08:00
if err != nil {
return errors . New ( "execute sql failed: " + err . Error ( ) )
}
2021-07-21 08:08:31 +08:00
// 不使用remotelog
2020-11-17 10:26:31 +08:00
logs . Println ( "[API_NODE]upgrade database done" )
2020-10-04 14:27:14 +08:00
return nil
}
2021-01-01 20:49:09 +08:00
2021-03-25 11:48:45 +08:00
// 自动设置数据库
func ( this * APINode ) setupDB ( ) error {
db , err := dbs . Default ( )
if err != nil {
return err
}
// 调整预处理语句数量
{
result , err := db . FindOne ( "SHOW VARIABLES WHERE variable_name='max_prepared_stmt_count'" )
if err != nil {
return err
}
value := result . GetString ( "Value" )
if regexp . MustCompile ( ` ^\d+$ ` ) . MatchString ( value ) {
valueInt := types . Int ( value )
if valueInt < 65535 {
_ , err := db . Exec ( "SET GLOBAL max_prepared_stmt_count=65535" )
if err != nil {
2021-07-21 09:01:37 +08:00
return errors . New ( "run 'SET GLOBAL max_prepared_stmt_count' on database failed: " + err . Error ( ) + ", \nyou can change the variable in 'my.cnf': \n~~~\n" + ` [ mysqld ]
max_prepared_stmt_count = 65535
~ ~ ~
then restart mysqld . ` )
2021-03-25 11:48:45 +08:00
}
}
}
}
return nil
}
2021-01-01 20:49:09 +08:00
// 启动端口
func ( this * APINode ) listenPorts ( apiNode * models . APINode ) ( isListening bool ) {
// HTTP
httpConfig , err := apiNode . DecodeHTTP ( )
if err != nil {
remotelogs . Error ( "API_NODE" , "decode http config: " + err . Error ( ) )
return
}
2022-04-19 19:35:50 +08:00
var ports = [ ] int { }
2021-01-01 20:49:09 +08:00
isListening = false
if httpConfig != nil && httpConfig . IsOn && len ( httpConfig . Listen ) > 0 {
for _ , listen := range httpConfig . Listen {
for _ , addr := range listen . Addresses ( ) {
2022-04-19 19:35:50 +08:00
// 收集Port
2022-04-19 19:48:37 +08:00
_ , portString , _ := net . SplitHostPort ( addr )
var port = types . Int ( portString )
if port > 0 && ! lists . ContainsInt ( ports , port ) {
ports = append ( ports , port )
2022-04-19 19:35:50 +08:00
}
2021-01-01 20:49:09 +08:00
listener , err := net . Listen ( "tcp" , addr )
if err != nil {
2021-07-21 09:01:37 +08:00
remotelogs . Error ( "API_NODE" , "listening '" + addr + "' failed: " + err . Error ( ) + ", we will try to listen port only" )
2021-07-06 15:19:39 +08:00
// 试着只监听端口
_ , port , err := net . SplitHostPort ( addr )
if err != nil {
continue
}
remotelogs . Println ( "API_NODE" , "retry listening port ':" + port + "' only ..." )
listener , err = net . Listen ( "tcp" , ":" + port )
if err != nil {
remotelogs . Error ( "API_NODE" , "listening ':" + port + "' failed: " + err . Error ( ) )
continue
}
remotelogs . Println ( "API_NODE" , "retry listening port ':" + port + "' only ok" )
2021-01-01 20:49:09 +08:00
}
2021-12-14 10:49:29 +08:00
goman . New ( func ( ) {
2021-01-01 20:49:09 +08:00
err := this . listenRPC ( listener , nil )
if err != nil {
remotelogs . Error ( "API_NODE" , "listening '" + addr + "' rpc: " + err . Error ( ) )
return
}
2021-12-14 10:49:29 +08:00
} )
2021-01-01 20:49:09 +08:00
isListening = true
}
}
}
// HTTPS
2021-08-22 11:35:33 +08:00
httpsConfig , err := apiNode . DecodeHTTPS ( nil , nil )
2021-01-01 20:49:09 +08:00
if err != nil {
remotelogs . Error ( "API_NODE" , "decode https config: " + err . Error ( ) )
return
}
if httpsConfig != nil &&
httpsConfig . IsOn &&
len ( httpsConfig . Listen ) > 0 &&
httpsConfig . SSLPolicy != nil &&
httpsConfig . SSLPolicy . IsOn &&
len ( httpsConfig . SSLPolicy . Certs ) > 0 {
certs := [ ] tls . Certificate { }
for _ , cert := range httpsConfig . SSLPolicy . Certs {
certs = append ( certs , * cert . CertObject ( ) )
}
for _ , listen := range httpsConfig . Listen {
for _ , addr := range listen . Addresses ( ) {
2022-04-19 19:35:50 +08:00
// 收集Port
2022-04-19 19:48:37 +08:00
_ , portString , _ := net . SplitHostPort ( addr )
var port = types . Int ( portString )
if port > 0 && ! lists . ContainsInt ( ports , port ) {
ports = append ( ports , port )
2022-04-19 19:35:50 +08:00
}
2021-01-01 20:49:09 +08:00
listener , err := net . Listen ( "tcp" , addr )
if err != nil {
2021-07-21 09:01:37 +08:00
remotelogs . Error ( "API_NODE" , "listening '" + addr + "' failed: " + err . Error ( ) + ", we will try to listen port only" )
2021-07-06 15:19:39 +08:00
// 试着只监听端口
_ , port , err := net . SplitHostPort ( addr )
if err != nil {
continue
}
remotelogs . Println ( "API_NODE" , "retry listening port ':" + port + "' only ..." )
listener , err = net . Listen ( "tcp" , ":" + port )
if err != nil {
remotelogs . Error ( "API_NODE" , "listening ':" + port + "' failed: " + err . Error ( ) )
continue
}
remotelogs . Println ( "API_NODE" , "retry listening port ':" + port + "' only ok" )
2021-01-01 20:49:09 +08:00
}
2021-12-14 10:49:29 +08:00
goman . New ( func ( ) {
2021-01-01 20:49:09 +08:00
err := this . listenRPC ( listener , & tls . Config {
Certificates : certs ,
} )
if err != nil {
remotelogs . Error ( "API_NODE" , "listening '" + addr + "' rpc: " + err . Error ( ) )
return
}
2021-12-14 10:49:29 +08:00
} )
2021-01-01 20:49:09 +08:00
isListening = true
}
}
}
// Rest HTTP
restHTTPConfig , err := apiNode . DecodeRestHTTP ( )
if err != nil {
remotelogs . Error ( "API_NODE" , "decode REST http config: " + err . Error ( ) )
return
}
if restHTTPConfig != nil && restHTTPConfig . IsOn && len ( restHTTPConfig . Listen ) > 0 {
for _ , listen := range restHTTPConfig . Listen {
for _ , addr := range listen . Addresses ( ) {
2022-04-19 19:35:50 +08:00
// 收集Port
2022-04-19 19:48:37 +08:00
_ , portString , _ := net . SplitHostPort ( addr )
var port = types . Int ( portString )
if port > 0 && ! lists . ContainsInt ( ports , port ) {
ports = append ( ports , port )
2022-04-19 19:35:50 +08:00
}
2021-01-01 20:49:09 +08:00
listener , err := net . Listen ( "tcp" , addr )
if err != nil {
remotelogs . Error ( "API_NODE" , "listening REST 'http://" + addr + "' failed: " + err . Error ( ) )
continue
}
2021-12-14 10:49:29 +08:00
goman . New ( func ( ) {
2021-01-01 20:49:09 +08:00
remotelogs . Println ( "API_NODE" , "listening REST http://" + addr + " ..." )
server := & RestServer { }
err := server . Listen ( listener )
if err != nil {
remotelogs . Error ( "API_NODE" , "listening REST 'http://" + addr + "' failed: " + err . Error ( ) )
return
}
2021-12-14 10:49:29 +08:00
} )
2021-01-01 20:49:09 +08:00
isListening = true
}
}
}
// Rest HTTPS
2021-08-22 11:35:33 +08:00
restHTTPSConfig , err := apiNode . DecodeRestHTTPS ( nil , nil )
2021-01-01 20:49:09 +08:00
if err != nil {
remotelogs . Error ( "API_NODE" , "decode REST https config: " + err . Error ( ) )
return
}
if restHTTPSConfig != nil &&
restHTTPSConfig . IsOn &&
len ( restHTTPSConfig . Listen ) > 0 &&
restHTTPSConfig . SSLPolicy != nil &&
restHTTPSConfig . SSLPolicy . IsOn &&
len ( restHTTPSConfig . SSLPolicy . Certs ) > 0 {
for _ , listen := range restHTTPSConfig . Listen {
for _ , addr := range listen . Addresses ( ) {
2022-04-19 19:35:50 +08:00
// 收集Port
2022-04-19 19:48:37 +08:00
_ , portString , _ := net . SplitHostPort ( addr )
var port = types . Int ( portString )
if port > 0 && ! lists . ContainsInt ( ports , port ) {
ports = append ( ports , port )
2022-04-19 19:35:50 +08:00
}
2021-01-01 20:49:09 +08:00
listener , err := net . Listen ( "tcp" , addr )
if err != nil {
remotelogs . Error ( "API_NODE" , "listening REST 'https://" + addr + "' failed: " + err . Error ( ) )
continue
}
2021-12-14 10:49:29 +08:00
goman . New ( func ( ) {
2021-01-01 20:49:09 +08:00
remotelogs . Println ( "API_NODE" , "listening REST https://" + addr + " ..." )
server := & RestServer { }
certs := [ ] tls . Certificate { }
for _ , cert := range httpsConfig . SSLPolicy . Certs {
certs = append ( certs , * cert . CertObject ( ) )
}
err := server . ListenHTTPS ( listener , & tls . Config {
Certificates : certs ,
} )
if err != nil {
remotelogs . Error ( "API_NODE" , "listening REST 'https://" + addr + "' failed: " + err . Error ( ) )
return
}
2021-12-14 10:49:29 +08:00
} )
2021-01-01 20:49:09 +08:00
isListening = true
}
}
}
2022-04-19 19:35:50 +08:00
// add to local firewall
if len ( ports ) > 0 {
utils . AddPortsToFirewall ( ports )
}
2021-01-01 20:49:09 +08:00
return
}
2021-01-12 11:49:14 +08:00
// 监听本地sock
func ( this * APINode ) listenSock ( ) error {
2021-07-25 17:46:47 +08:00
// 检查是否在运行
if this . sock . IsListening ( ) {
reply , err := this . sock . Send ( & gosock . Command { Code : "pid" } )
if err == nil {
return errors . New ( "error: the process is already running, pid: " + maps . NewMap ( reply . Params ) . GetString ( "pid" ) )
2021-01-12 11:49:14 +08:00
} else {
2021-07-25 17:46:47 +08:00
return errors . New ( "error: the process is already running" )
2021-01-12 11:49:14 +08:00
}
}
2021-07-25 17:46:47 +08:00
// 启动监听
2021-12-14 10:49:29 +08:00
goman . New ( func ( ) {
2021-07-25 17:46:47 +08:00
this . sock . OnCommand ( func ( cmd * gosock . Command ) {
switch cmd . Code {
case "pid" :
_ = cmd . Reply ( & gosock . Command {
Code : "pid" ,
Params : map [ string ] interface { } {
"pid" : os . Getpid ( ) ,
} ,
} )
2021-11-04 11:15:22 +08:00
case "info" :
exePath , _ := os . Executable ( )
_ = cmd . Reply ( & gosock . Command {
Code : "info" ,
Params : map [ string ] interface { } {
"pid" : os . Getpid ( ) ,
"version" : teaconst . Version ,
"path" : exePath ,
} ,
} )
2021-07-25 17:46:47 +08:00
case "stop" :
_ = cmd . ReplyOk ( )
// 退出主进程
events . Notify ( events . EventQuit )
os . Exit ( 0 )
2021-11-21 19:27:27 +08:00
case "starting" : // 是否正在启动
_ = cmd . Reply ( & gosock . Command {
Code : "starting" ,
Params : map [ string ] interface { } {
"isStarting" : this . isStarting ,
} ,
} )
2021-12-14 10:49:29 +08:00
case "goman" :
var posMap = map [ string ] maps . Map { } // file#line => Map
for _ , instance := range goman . List ( ) {
var pos = instance . File + "#" + types . String ( instance . Line )
m , ok := posMap [ pos ]
if ok {
m [ "count" ] = m [ "count" ] . ( int ) + 1
} else {
m = maps . Map {
"pos" : pos ,
"count" : 1 ,
}
posMap [ pos ] = m
}
}
var result = [ ] maps . Map { }
for _ , m := range posMap {
result = append ( result , m )
}
sort . Slice ( result , func ( i , j int ) bool {
return result [ i ] [ "count" ] . ( int ) > result [ j ] [ "count" ] . ( int )
} )
_ = cmd . Reply ( & gosock . Command {
Params : map [ string ] interface { } {
"total" : runtime . NumGoroutine ( ) ,
"result" : result ,
} ,
} )
2022-01-19 16:53:52 +08:00
case "debug" :
teaconst . Debug = ! teaconst . Debug
_ = cmd . Reply ( & gosock . Command {
Params : map [ string ] interface { } { "debug" : teaconst . Debug } ,
} )
2022-04-08 15:09:33 +08:00
case "db.stmt.prepare" :
dbs . ShowPreparedStatements = ! dbs . ShowPreparedStatements
_ = cmd . Reply ( & gosock . Command {
Params : map [ string ] interface { } { "isOn" : dbs . ShowPreparedStatements } ,
} )
case "db.stmt.count" :
db , _ := dbs . Default ( )
if db != nil {
_ = cmd . Reply ( & gosock . Command {
Params : map [ string ] interface { } { "count" : db . StmtManager ( ) . Len ( ) } ,
} )
} else {
_ = cmd . Reply ( & gosock . Command {
Params : map [ string ] interface { } { "count" : 0 } ,
} )
}
2021-01-12 11:49:14 +08:00
}
2021-07-25 17:46:47 +08:00
} )
err := this . sock . Listen ( )
if err != nil {
2021-07-26 11:23:21 +08:00
logs . Println ( "API_NODE" , err . Error ( ) )
2021-01-12 11:49:14 +08:00
}
2021-12-14 10:49:29 +08:00
} )
2021-01-12 11:49:14 +08:00
2021-07-25 17:46:47 +08:00
events . On ( events . EventQuit , func ( ) {
2021-07-26 11:23:21 +08:00
logs . Println ( "API_NODE" , "quit unix sock" )
2021-07-25 17:46:47 +08:00
_ = this . sock . Close ( )
} )
2021-01-12 11:49:14 +08:00
return nil
}
2022-01-19 16:53:52 +08:00
// 服务过滤器
func ( this * APINode ) unaryInterceptor ( ctx context . Context , req interface { } , info * grpc . UnaryServerInfo , handler grpc . UnaryHandler ) ( resp interface { } , err error ) {
if teaconst . Debug {
var before = time . Now ( )
var traceCtx = rpc . NewContext ( ctx )
resp , err = handler ( traceCtx , req )
var costMs = time . Since ( before ) . Seconds ( ) * 1000
statErr := models . SharedAPIMethodStatDAO . CreateStat ( nil , info . FullMethod , "" , costMs )
if statErr != nil {
remotelogs . Error ( "API_NODE" , "create method stat failed: " + statErr . Error ( ) )
}
var tagMap = traceCtx . TagMap ( )
for tag , tagCostMs := range tagMap {
statErr = models . SharedAPIMethodStatDAO . CreateStat ( nil , info . FullMethod , tag , tagCostMs )
if statErr != nil {
remotelogs . Error ( "API_NODE" , "create method stat failed: " + statErr . Error ( ) )
}
}
return
}
2022-03-31 15:30:04 +08:00
result , err := handler ( ctx , req )
if err != nil {
err = errors . New ( "'" + info . FullMethod + "()' says: " + err . Error ( ) )
}
return result , err
2022-01-19 16:53:52 +08:00
}
2022-06-08 15:13:24 +08:00
// addStartIssue records a start-up issue made of code, message and suggestion
// and immediately writes the whole issue list to the issues file.
func (this *APINode) addStartIssue(code string, message string, suggestion string) {
	var issue = NewStartIssue(code, message, suggestion)
	this.issues = append(this.issues, issue)
	this.saveIssues()
}
// dbIssueSuggestion turns a database error text into a hint for the operator.
//
// It recognizes three situations by substring: "connection refused" (with a
// different hint for a local and a remote address), "Error 1045" (access
// denied) and "Error 1049" (unknown database). Any other error, or a failure
// to load or parse the database configuration, yields an empty string.
//
// NOTE(review): the returned text embeds the full DSN and, for "Error 1045",
// the configured password in clear text.
func (this *APINode) dbIssueSuggestion(errString string) string {
	// Database configuration.
	db, err := dbs.Default()
	if err != nil {
		return ""
	}
	config, err := db.Config()
	if err != nil {
		return ""
	}

	var dsn = config.Dsn
	dsnConfig, err := mysql.ParseDSN(dsn)
	if err != nil {
		return ""
	}

	var (
		addr = dsnConfig.Addr

		// Location of the configuration file.
		dbConfigPath = Tea.ConfigFile("db.yaml")
	)

	switch {
	case strings.Contains(errString, "connection refused"):
		// Connection refused.
		var isLocal = strings.HasPrefix(addr, "127.0.0.1:") || strings.HasPrefix(addr, "localhost:")
		if isLocal {
			return "试图连接到数据库被拒绝, 请检查: 1) 本地数据库服务是否已经启动; 2) 数据库IP和端口( " + addr + ")是否正确;(当前数据库配置为:" + dsn + ",配置文件位置:" + dbConfigPath + ")。"
		}
		return "试图连接到数据库被拒绝, 请检查: 1) 数据库服务是否已经启动; 2) 数据库IP和端口( " + addr + ") 是否正确; 3) 防火墙设置; ( 当前数据库配置为: " + dsn + ",配置文件位置:" + dbConfigPath + ")。"

	case strings.Contains(errString, "Error 1045"):
		// Permission error.
		return "使用的用户和密码没有权限连接到指定数据库,请检查:数据库配置文件中的用户名(" + dsnConfig.User + ")和密码(" + dsnConfig.Passwd + ")是否正确;(当前数据库配置为:" + dsn + ",配置文件位置:" + dbConfigPath + ")。"

	case strings.Contains(errString, "Error 1049"):
		// Wrong database name.
		return "数据库名称配置错误,请检查:数据库配置文件中数据库名称(" + dsnConfig.DBName + ")是否正确;(当前数据库配置为:" + dsn + ",配置文件位置:" + dbConfigPath + ")。"
	}

	return ""
}
// saveIssues writes the collected issues as JSON to the issues file.
// Encoding and write failures are ignored: the file is a best-effort aid.
func (this *APINode) saveIssues() {
	data, err := json.Marshal(this.issues)
	if err != nil {
		return
	}
	_ = ioutil.WriteFile(this.issuesFile, data, 0666)
}