2020-10-04 14:27:14 +08:00
package nodes
import (
2022-01-19 16:53:52 +08:00
"context"
2020-10-04 14:27:14 +08:00
"crypto/tls"
"errors"
2021-04-15 11:16:58 +08:00
"fmt"
2021-07-29 16:50:59 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/accesslogs"
2020-10-04 14:27:14 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/configs"
2020-11-17 10:26:31 +08:00
teaconst "github.com/TeaOSLab/EdgeAPI/internal/const"
2020-10-04 14:27:14 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
2021-01-12 11:49:14 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/events"
2021-12-14 10:49:29 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/goman"
2020-12-30 22:01:01 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
2022-01-19 16:53:52 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/rpc"
2020-11-17 10:26:31 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/setup"
2020-10-04 14:27:14 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/utils"
2020-11-17 15:42:42 +08:00
"github.com/iwind/TeaGo/Tea"
2020-10-13 20:05:13 +08:00
"github.com/iwind/TeaGo/dbs"
2021-01-12 11:49:14 +08:00
"github.com/iwind/TeaGo/lists"
2020-10-04 14:27:14 +08:00
"github.com/iwind/TeaGo/logs"
2021-07-25 17:46:47 +08:00
"github.com/iwind/TeaGo/maps"
2021-03-25 11:48:45 +08:00
"github.com/iwind/TeaGo/types"
2020-11-17 10:26:31 +08:00
stringutil "github.com/iwind/TeaGo/utils/string"
2021-07-25 17:46:47 +08:00
"github.com/iwind/gosock/pkg/gosock"
2020-10-04 14:27:14 +08:00
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
2022-03-04 12:31:44 +08:00
"gopkg.in/yaml.v3"
2020-11-17 15:42:42 +08:00
"io/ioutil"
2021-01-12 11:49:14 +08:00
"log"
2020-10-04 14:27:14 +08:00
"net"
"os"
2021-01-12 11:49:14 +08:00
"os/exec"
2021-03-25 11:48:45 +08:00
"regexp"
2021-12-14 10:49:29 +08:00
"runtime"
"sort"
2020-10-04 14:27:14 +08:00
"strconv"
2021-06-20 19:22:24 +08:00
"sync"
2021-01-12 11:49:14 +08:00
"time"
2022-03-20 11:28:45 +08:00
// grpc decompression
_ "google.golang.org/grpc/encoding/gzip"
2020-10-04 14:27:14 +08:00
)
var sharedAPIConfig * configs . APIConfig = nil
type APINode struct {
2021-06-20 19:22:24 +08:00
serviceInstanceMap map [ string ] interface { }
serviceInstanceLocker sync . Mutex
2021-07-25 17:46:47 +08:00
sock * gosock . Sock
2021-11-21 19:27:27 +08:00
isStarting bool
2020-10-04 14:27:14 +08:00
}
func NewAPINode ( ) * APINode {
2021-06-20 19:22:24 +08:00
return & APINode {
serviceInstanceMap : map [ string ] interface { } { } ,
2021-07-25 17:46:47 +08:00
sock : gosock . NewTmpSock ( teaconst . ProcessName ) ,
2021-06-20 19:22:24 +08:00
}
2020-10-04 14:27:14 +08:00
}
func ( this * APINode ) Start ( ) {
2021-11-21 19:27:27 +08:00
this . isStarting = true
2020-11-17 10:26:31 +08:00
logs . Println ( "[API_NODE]start api node, pid: " + strconv . Itoa ( os . Getpid ( ) ) )
2021-04-15 11:16:58 +08:00
// 检查数据库连接
err := this . checkDB ( )
if err != nil {
logs . Println ( "[API_NODE]" + err . Error ( ) )
return
}
2021-01-12 11:49:14 +08:00
// 本地Sock
2021-11-21 19:27:27 +08:00
logs . Println ( "[API_NODE]listening sock ..." )
2021-04-15 11:16:58 +08:00
err = this . listenSock ( )
2021-01-12 11:49:14 +08:00
if err != nil {
logs . Println ( "[API_NODE]" + err . Error ( ) )
return
}
2020-11-17 10:26:31 +08:00
// 自动升级
2021-11-21 19:27:27 +08:00
logs . Println ( "[API_NODE]auto upgrading ..." )
2021-01-12 11:49:14 +08:00
err = this . autoUpgrade ( )
2020-11-17 10:26:31 +08:00
if err != nil {
logs . Println ( "[API_NODE]auto upgrade failed: " + err . Error ( ) )
return
}
2020-10-04 14:27:14 +08:00
2021-03-25 11:48:45 +08:00
// 自动设置数据库
2021-11-21 19:27:27 +08:00
logs . Println ( "[API_NODE]setup database ..." )
2021-03-25 11:48:45 +08:00
err = this . setupDB ( )
if err != nil {
logs . Println ( "[API_NODE]setup database '" + err . Error ( ) + "'" )
// 不阻断执行
}
2020-10-13 20:05:13 +08:00
// 数据库通知启动
2021-11-21 19:27:27 +08:00
logs . Println ( "[API_NODE]notify ready ..." )
2020-10-13 20:05:13 +08:00
dbs . NotifyReady ( )
2020-10-04 14:27:14 +08:00
// 读取配置
2021-11-21 19:27:27 +08:00
logs . Println ( "[API_NODE]reading api config ..." )
2020-10-04 14:27:14 +08:00
config , err := configs . SharedAPIConfig ( )
if err != nil {
2020-11-17 10:26:31 +08:00
logs . Println ( "[API_NODE]start failed: " + err . Error ( ) )
2020-10-04 14:27:14 +08:00
return
}
sharedAPIConfig = config
// 校验
2021-01-01 23:31:30 +08:00
apiNode , err := models . SharedAPINodeDAO . FindEnabledAPINodeWithUniqueIdAndSecret ( nil , config . NodeId , config . Secret )
2020-10-04 14:27:14 +08:00
if err != nil {
2020-11-17 10:26:31 +08:00
logs . Println ( "[API_NODE]start failed: read api node from database failed: " + err . Error ( ) )
2020-10-04 14:27:14 +08:00
return
}
if apiNode == nil {
2020-11-17 10:26:31 +08:00
logs . Println ( "[API_NODE]can not start node, wrong 'nodeId' or 'secret'" )
2020-10-04 14:27:14 +08:00
return
}
config . SetNumberId ( int64 ( apiNode . Id ) )
// 设置rlimit
_ = utils . SetRLimit ( 1024 * 1024 )
2020-12-29 18:28:07 +08:00
// 状态变更计时器
2021-12-14 10:49:29 +08:00
goman . New ( func ( ) {
NewNodeStatusExecutor ( ) . Listen ( )
} )
2020-12-29 18:28:07 +08:00
2021-07-29 16:50:59 +08:00
// 访问日志存储管理器
2021-12-14 10:49:29 +08:00
goman . New ( func ( ) {
accesslogs . SharedStorageManager . Start ( )
} )
2021-07-29 16:50:59 +08:00
2020-10-04 14:27:14 +08:00
// 监听RPC服务
2020-12-30 22:01:01 +08:00
remotelogs . Println ( "API_NODE" , "starting RPC server ..." )
2020-10-04 14:27:14 +08:00
2021-01-01 20:49:09 +08:00
isListening := this . listenPorts ( apiNode )
2020-12-29 18:28:07 +08:00
2020-10-04 14:27:14 +08:00
if ! isListening {
2020-12-30 22:01:01 +08:00
remotelogs . Error ( "API_NODE" , "the api node require at least one listening address" )
2020-10-04 14:27:14 +08:00
return
}
2021-11-21 19:27:27 +08:00
// 结束启动
this . isStarting = false
2020-10-04 14:27:14 +08:00
// 保持进程
select { }
}
2021-04-15 11:16:58 +08:00
// Daemon 实现守护进程
2021-01-12 11:49:14 +08:00
func ( this * APINode ) Daemon ( ) {
2022-05-07 20:41:53 +08:00
path := os . TempDir ( ) + "/" + teaconst . ProcessName + ".sock"
2021-01-12 11:49:14 +08:00
isDebug := lists . ContainsString ( os . Args , "debug" )
isDebug = true
for {
conn , err := net . DialTimeout ( "unix" , path , 1 * time . Second )
if err != nil {
if isDebug {
log . Println ( "[DAEMON]starting ..." )
}
// 尝试启动
err = func ( ) error {
exe , err := os . Executable ( )
if err != nil {
return err
}
cmd := exec . Command ( exe )
err = cmd . Start ( )
if err != nil {
return err
}
err = cmd . Wait ( )
if err != nil {
return err
}
return nil
} ( )
if err != nil {
if isDebug {
log . Println ( "[DAEMON]" , err )
}
time . Sleep ( 1 * time . Second )
} else {
time . Sleep ( 5 * time . Second )
}
} else {
_ = conn . Close ( )
time . Sleep ( 5 * time . Second )
}
}
}
2021-04-15 11:16:58 +08:00
// InstallSystemService 安装系统服务
2021-01-12 11:49:14 +08:00
func ( this * APINode ) InstallSystemService ( ) error {
shortName := teaconst . SystemdServiceName
exe , err := os . Executable ( )
if err != nil {
return err
}
manager := utils . NewServiceManager ( shortName , teaconst . ProductName )
err = manager . Install ( exe , [ ] string { } )
if err != nil {
return err
}
return nil
}
2020-10-04 14:27:14 +08:00
// 启动RPC监听
func ( this * APINode ) listenRPC ( listener net . Listener , tlsConfig * tls . Config ) error {
var rpcServer * grpc . Server
if tlsConfig == nil {
2021-01-01 20:49:09 +08:00
remotelogs . Println ( "API_NODE" , "listening GRPC http://" + listener . Addr ( ) . String ( ) + " ..." )
2022-01-19 16:53:52 +08:00
rpcServer = grpc . NewServer ( grpc . MaxRecvMsgSize ( 128 * 1024 * 1024 ) , grpc . UnaryInterceptor ( this . unaryInterceptor ) )
2020-10-04 14:27:14 +08:00
} else {
2021-01-01 20:49:09 +08:00
logs . Println ( "[API_NODE]listening GRPC https://" + listener . Addr ( ) . String ( ) + " ..." )
2022-01-19 16:53:52 +08:00
rpcServer = grpc . NewServer ( grpc . Creds ( credentials . NewTLS ( tlsConfig ) ) , grpc . MaxRecvMsgSize ( 128 * 1024 * 1024 ) , grpc . UnaryInterceptor ( this . unaryInterceptor ) )
2020-10-04 14:27:14 +08:00
}
2021-05-25 15:49:13 +08:00
this . registerServices ( rpcServer )
2020-10-04 14:27:14 +08:00
err := rpcServer . Serve ( listener )
if err != nil {
2020-11-17 10:26:31 +08:00
return errors . New ( "[API_NODE]start rpc failed: " + err . Error ( ) )
}
return nil
}
2021-04-15 11:16:58 +08:00
// 检查数据库
func ( this * APINode ) checkDB ( ) error {
2021-04-15 11:18:40 +08:00
logs . Println ( "[API_NODE]checking database connection ..." )
2021-04-15 11:16:58 +08:00
db , err := dbs . Default ( )
if err != nil {
return err
}
maxTries := 600
for i := 0 ; i <= maxTries ; i ++ {
_ , err := db . Exec ( "SELECT 1" )
if err != nil {
if i == maxTries - 1 {
return err
} else {
if i % 10 == 0 { // 这让提示不会太多
2021-04-29 16:48:09 +08:00
logs . Println ( "[API_NODE]reconnecting to database (" + fmt . Sprintf ( "%.1f" , float32 ( i * 100 ) / float32 ( maxTries + 1 ) ) + "%) ..." )
2021-04-15 11:16:58 +08:00
}
time . Sleep ( 1 * time . Second )
}
} else {
2021-04-15 11:18:40 +08:00
logs . Println ( "[API_NODE]database connected" )
2021-04-15 11:16:58 +08:00
return nil
}
}
return nil
}
2020-11-17 10:26:31 +08:00
// 自动升级
func ( this * APINode ) autoUpgrade ( ) error {
2020-11-17 15:42:42 +08:00
if Tea . IsTesting ( ) {
return nil
}
// 执行SQL
config := & dbs . Config { }
configData , err := ioutil . ReadFile ( Tea . ConfigFile ( "db.yaml" ) )
2020-11-17 10:26:31 +08:00
if err != nil {
2020-11-17 15:42:42 +08:00
return errors . New ( "read database config file failed: " + err . Error ( ) )
}
err = yaml . Unmarshal ( configData , config )
if err != nil {
return errors . New ( "decode database config failed: " + err . Error ( ) )
}
2022-03-14 16:06:25 +08:00
var dbConfig = config . DBs [ Tea . Env ]
2020-11-17 15:42:42 +08:00
db , err := dbs . NewInstanceFromConfig ( dbConfig )
if err != nil {
return errors . New ( "load database failed: " + err . Error ( ) )
2020-11-17 10:26:31 +08:00
}
2022-04-08 14:15:45 +08:00
defer func ( ) {
_ = db . Close ( )
} ( )
2020-11-17 10:26:31 +08:00
one , err := db . FindOne ( "SELECT version FROM edgeVersions LIMIT 1" )
if err != nil {
return errors . New ( "query version failed: " + err . Error ( ) )
}
if one != nil {
// 如果是同样的版本,则直接认为是最新版本
2022-03-14 16:06:25 +08:00
var version = one . GetString ( "version" )
if stringutil . VersionCompare ( version , setup . ComposeSQLVersion ( ) ) >= 0 {
2020-11-17 10:26:31 +08:00
return nil
}
2020-10-04 14:27:14 +08:00
}
2021-07-21 08:08:31 +08:00
// 不使用remotelog(),因为此时还没有启动完成
2020-11-17 10:26:31 +08:00
logs . Println ( "[API_NODE]upgrade database starting ..." )
2021-08-05 19:53:54 +08:00
err = setup . NewSQLExecutor ( dbConfig ) . Run ( false )
2020-11-17 10:26:31 +08:00
if err != nil {
return errors . New ( "execute sql failed: " + err . Error ( ) )
}
2021-07-21 08:08:31 +08:00
// 不使用remotelog
2020-11-17 10:26:31 +08:00
logs . Println ( "[API_NODE]upgrade database done" )
2020-10-04 14:27:14 +08:00
return nil
}
2021-01-01 20:49:09 +08:00
2021-03-25 11:48:45 +08:00
// 自动设置数据库
func ( this * APINode ) setupDB ( ) error {
db , err := dbs . Default ( )
if err != nil {
return err
}
// 调整预处理语句数量
{
result , err := db . FindOne ( "SHOW VARIABLES WHERE variable_name='max_prepared_stmt_count'" )
if err != nil {
return err
}
value := result . GetString ( "Value" )
if regexp . MustCompile ( ` ^\d+$ ` ) . MatchString ( value ) {
valueInt := types . Int ( value )
if valueInt < 65535 {
_ , err := db . Exec ( "SET GLOBAL max_prepared_stmt_count=65535" )
if err != nil {
2021-07-21 09:01:37 +08:00
return errors . New ( "run 'SET GLOBAL max_prepared_stmt_count' on database failed: " + err . Error ( ) + ", \nyou can change the variable in 'my.cnf': \n~~~\n" + ` [ mysqld ]
max_prepared_stmt_count = 65535
~ ~ ~
then restart mysqld . ` )
2021-03-25 11:48:45 +08:00
}
}
}
}
return nil
}
2021-01-01 20:49:09 +08:00
// 启动端口
func ( this * APINode ) listenPorts ( apiNode * models . APINode ) ( isListening bool ) {
// HTTP
httpConfig , err := apiNode . DecodeHTTP ( )
if err != nil {
remotelogs . Error ( "API_NODE" , "decode http config: " + err . Error ( ) )
return
}
2022-04-19 19:35:50 +08:00
var ports = [ ] int { }
2021-01-01 20:49:09 +08:00
isListening = false
if httpConfig != nil && httpConfig . IsOn && len ( httpConfig . Listen ) > 0 {
for _ , listen := range httpConfig . Listen {
for _ , addr := range listen . Addresses ( ) {
2022-04-19 19:35:50 +08:00
// 收集Port
2022-04-19 19:48:37 +08:00
_ , portString , _ := net . SplitHostPort ( addr )
var port = types . Int ( portString )
if port > 0 && ! lists . ContainsInt ( ports , port ) {
ports = append ( ports , port )
2022-04-19 19:35:50 +08:00
}
2021-01-01 20:49:09 +08:00
listener , err := net . Listen ( "tcp" , addr )
if err != nil {
2021-07-21 09:01:37 +08:00
remotelogs . Error ( "API_NODE" , "listening '" + addr + "' failed: " + err . Error ( ) + ", we will try to listen port only" )
2021-07-06 15:19:39 +08:00
// 试着只监听端口
_ , port , err := net . SplitHostPort ( addr )
if err != nil {
continue
}
remotelogs . Println ( "API_NODE" , "retry listening port ':" + port + "' only ..." )
listener , err = net . Listen ( "tcp" , ":" + port )
if err != nil {
remotelogs . Error ( "API_NODE" , "listening ':" + port + "' failed: " + err . Error ( ) )
continue
}
remotelogs . Println ( "API_NODE" , "retry listening port ':" + port + "' only ok" )
2021-01-01 20:49:09 +08:00
}
2021-12-14 10:49:29 +08:00
goman . New ( func ( ) {
2021-01-01 20:49:09 +08:00
err := this . listenRPC ( listener , nil )
if err != nil {
remotelogs . Error ( "API_NODE" , "listening '" + addr + "' rpc: " + err . Error ( ) )
return
}
2021-12-14 10:49:29 +08:00
} )
2021-01-01 20:49:09 +08:00
isListening = true
}
}
}
// HTTPS
2021-08-22 11:35:33 +08:00
httpsConfig , err := apiNode . DecodeHTTPS ( nil , nil )
2021-01-01 20:49:09 +08:00
if err != nil {
remotelogs . Error ( "API_NODE" , "decode https config: " + err . Error ( ) )
return
}
if httpsConfig != nil &&
httpsConfig . IsOn &&
len ( httpsConfig . Listen ) > 0 &&
httpsConfig . SSLPolicy != nil &&
httpsConfig . SSLPolicy . IsOn &&
len ( httpsConfig . SSLPolicy . Certs ) > 0 {
certs := [ ] tls . Certificate { }
for _ , cert := range httpsConfig . SSLPolicy . Certs {
certs = append ( certs , * cert . CertObject ( ) )
}
for _ , listen := range httpsConfig . Listen {
for _ , addr := range listen . Addresses ( ) {
2022-04-19 19:35:50 +08:00
// 收集Port
2022-04-19 19:48:37 +08:00
_ , portString , _ := net . SplitHostPort ( addr )
var port = types . Int ( portString )
if port > 0 && ! lists . ContainsInt ( ports , port ) {
ports = append ( ports , port )
2022-04-19 19:35:50 +08:00
}
2021-01-01 20:49:09 +08:00
listener , err := net . Listen ( "tcp" , addr )
if err != nil {
2021-07-21 09:01:37 +08:00
remotelogs . Error ( "API_NODE" , "listening '" + addr + "' failed: " + err . Error ( ) + ", we will try to listen port only" )
2021-07-06 15:19:39 +08:00
// 试着只监听端口
_ , port , err := net . SplitHostPort ( addr )
if err != nil {
continue
}
remotelogs . Println ( "API_NODE" , "retry listening port ':" + port + "' only ..." )
listener , err = net . Listen ( "tcp" , ":" + port )
if err != nil {
remotelogs . Error ( "API_NODE" , "listening ':" + port + "' failed: " + err . Error ( ) )
continue
}
remotelogs . Println ( "API_NODE" , "retry listening port ':" + port + "' only ok" )
2021-01-01 20:49:09 +08:00
}
2021-12-14 10:49:29 +08:00
goman . New ( func ( ) {
2021-01-01 20:49:09 +08:00
err := this . listenRPC ( listener , & tls . Config {
Certificates : certs ,
} )
if err != nil {
remotelogs . Error ( "API_NODE" , "listening '" + addr + "' rpc: " + err . Error ( ) )
return
}
2021-12-14 10:49:29 +08:00
} )
2021-01-01 20:49:09 +08:00
isListening = true
}
}
}
// Rest HTTP
restHTTPConfig , err := apiNode . DecodeRestHTTP ( )
if err != nil {
remotelogs . Error ( "API_NODE" , "decode REST http config: " + err . Error ( ) )
return
}
if restHTTPConfig != nil && restHTTPConfig . IsOn && len ( restHTTPConfig . Listen ) > 0 {
for _ , listen := range restHTTPConfig . Listen {
for _ , addr := range listen . Addresses ( ) {
2022-04-19 19:35:50 +08:00
// 收集Port
2022-04-19 19:48:37 +08:00
_ , portString , _ := net . SplitHostPort ( addr )
var port = types . Int ( portString )
if port > 0 && ! lists . ContainsInt ( ports , port ) {
ports = append ( ports , port )
2022-04-19 19:35:50 +08:00
}
2021-01-01 20:49:09 +08:00
listener , err := net . Listen ( "tcp" , addr )
if err != nil {
remotelogs . Error ( "API_NODE" , "listening REST 'http://" + addr + "' failed: " + err . Error ( ) )
continue
}
2021-12-14 10:49:29 +08:00
goman . New ( func ( ) {
2021-01-01 20:49:09 +08:00
remotelogs . Println ( "API_NODE" , "listening REST http://" + addr + " ..." )
server := & RestServer { }
err := server . Listen ( listener )
if err != nil {
remotelogs . Error ( "API_NODE" , "listening REST 'http://" + addr + "' failed: " + err . Error ( ) )
return
}
2021-12-14 10:49:29 +08:00
} )
2021-01-01 20:49:09 +08:00
isListening = true
}
}
}
// Rest HTTPS
2021-08-22 11:35:33 +08:00
restHTTPSConfig , err := apiNode . DecodeRestHTTPS ( nil , nil )
2021-01-01 20:49:09 +08:00
if err != nil {
remotelogs . Error ( "API_NODE" , "decode REST https config: " + err . Error ( ) )
return
}
if restHTTPSConfig != nil &&
restHTTPSConfig . IsOn &&
len ( restHTTPSConfig . Listen ) > 0 &&
restHTTPSConfig . SSLPolicy != nil &&
restHTTPSConfig . SSLPolicy . IsOn &&
len ( restHTTPSConfig . SSLPolicy . Certs ) > 0 {
for _ , listen := range restHTTPSConfig . Listen {
for _ , addr := range listen . Addresses ( ) {
2022-04-19 19:35:50 +08:00
// 收集Port
2022-04-19 19:48:37 +08:00
_ , portString , _ := net . SplitHostPort ( addr )
var port = types . Int ( portString )
if port > 0 && ! lists . ContainsInt ( ports , port ) {
ports = append ( ports , port )
2022-04-19 19:35:50 +08:00
}
2021-01-01 20:49:09 +08:00
listener , err := net . Listen ( "tcp" , addr )
if err != nil {
remotelogs . Error ( "API_NODE" , "listening REST 'https://" + addr + "' failed: " + err . Error ( ) )
continue
}
2021-12-14 10:49:29 +08:00
goman . New ( func ( ) {
2021-01-01 20:49:09 +08:00
remotelogs . Println ( "API_NODE" , "listening REST https://" + addr + " ..." )
server := & RestServer { }
certs := [ ] tls . Certificate { }
for _ , cert := range httpsConfig . SSLPolicy . Certs {
certs = append ( certs , * cert . CertObject ( ) )
}
err := server . ListenHTTPS ( listener , & tls . Config {
Certificates : certs ,
} )
if err != nil {
remotelogs . Error ( "API_NODE" , "listening REST 'https://" + addr + "' failed: " + err . Error ( ) )
return
}
2021-12-14 10:49:29 +08:00
} )
2021-01-01 20:49:09 +08:00
isListening = true
}
}
}
2022-04-19 19:35:50 +08:00
// add to local firewall
if len ( ports ) > 0 {
utils . AddPortsToFirewall ( ports )
}
2021-01-01 20:49:09 +08:00
return
}
2021-01-12 11:49:14 +08:00
// 监听本地sock
func ( this * APINode ) listenSock ( ) error {
2021-07-25 17:46:47 +08:00
// 检查是否在运行
if this . sock . IsListening ( ) {
reply , err := this . sock . Send ( & gosock . Command { Code : "pid" } )
if err == nil {
return errors . New ( "error: the process is already running, pid: " + maps . NewMap ( reply . Params ) . GetString ( "pid" ) )
2021-01-12 11:49:14 +08:00
} else {
2021-07-25 17:46:47 +08:00
return errors . New ( "error: the process is already running" )
2021-01-12 11:49:14 +08:00
}
}
2021-07-25 17:46:47 +08:00
// 启动监听
2021-12-14 10:49:29 +08:00
goman . New ( func ( ) {
2021-07-25 17:46:47 +08:00
this . sock . OnCommand ( func ( cmd * gosock . Command ) {
switch cmd . Code {
case "pid" :
_ = cmd . Reply ( & gosock . Command {
Code : "pid" ,
Params : map [ string ] interface { } {
"pid" : os . Getpid ( ) ,
} ,
} )
2021-11-04 11:15:22 +08:00
case "info" :
exePath , _ := os . Executable ( )
_ = cmd . Reply ( & gosock . Command {
Code : "info" ,
Params : map [ string ] interface { } {
"pid" : os . Getpid ( ) ,
"version" : teaconst . Version ,
"path" : exePath ,
} ,
} )
2021-07-25 17:46:47 +08:00
case "stop" :
_ = cmd . ReplyOk ( )
// 退出主进程
events . Notify ( events . EventQuit )
os . Exit ( 0 )
2021-11-21 19:27:27 +08:00
case "starting" : // 是否正在启动
_ = cmd . Reply ( & gosock . Command {
Code : "starting" ,
Params : map [ string ] interface { } {
"isStarting" : this . isStarting ,
} ,
} )
2021-12-14 10:49:29 +08:00
case "goman" :
var posMap = map [ string ] maps . Map { } // file#line => Map
for _ , instance := range goman . List ( ) {
var pos = instance . File + "#" + types . String ( instance . Line )
m , ok := posMap [ pos ]
if ok {
m [ "count" ] = m [ "count" ] . ( int ) + 1
} else {
m = maps . Map {
"pos" : pos ,
"count" : 1 ,
}
posMap [ pos ] = m
}
}
var result = [ ] maps . Map { }
for _ , m := range posMap {
result = append ( result , m )
}
sort . Slice ( result , func ( i , j int ) bool {
return result [ i ] [ "count" ] . ( int ) > result [ j ] [ "count" ] . ( int )
} )
_ = cmd . Reply ( & gosock . Command {
Params : map [ string ] interface { } {
"total" : runtime . NumGoroutine ( ) ,
"result" : result ,
} ,
} )
2022-01-19 16:53:52 +08:00
case "debug" :
teaconst . Debug = ! teaconst . Debug
_ = cmd . Reply ( & gosock . Command {
Params : map [ string ] interface { } { "debug" : teaconst . Debug } ,
} )
2022-04-08 15:09:33 +08:00
case "db.stmt.prepare" :
dbs . ShowPreparedStatements = ! dbs . ShowPreparedStatements
_ = cmd . Reply ( & gosock . Command {
Params : map [ string ] interface { } { "isOn" : dbs . ShowPreparedStatements } ,
} )
case "db.stmt.count" :
db , _ := dbs . Default ( )
if db != nil {
_ = cmd . Reply ( & gosock . Command {
Params : map [ string ] interface { } { "count" : db . StmtManager ( ) . Len ( ) } ,
} )
} else {
_ = cmd . Reply ( & gosock . Command {
Params : map [ string ] interface { } { "count" : 0 } ,
} )
}
2021-01-12 11:49:14 +08:00
}
2021-07-25 17:46:47 +08:00
} )
err := this . sock . Listen ( )
if err != nil {
2021-07-26 11:23:21 +08:00
logs . Println ( "API_NODE" , err . Error ( ) )
2021-01-12 11:49:14 +08:00
}
2021-12-14 10:49:29 +08:00
} )
2021-01-12 11:49:14 +08:00
2021-07-25 17:46:47 +08:00
events . On ( events . EventQuit , func ( ) {
2021-07-26 11:23:21 +08:00
logs . Println ( "API_NODE" , "quit unix sock" )
2021-07-25 17:46:47 +08:00
_ = this . sock . Close ( )
} )
2021-01-12 11:49:14 +08:00
return nil
}
2022-01-19 16:53:52 +08:00
// 服务过滤器
func ( this * APINode ) unaryInterceptor ( ctx context . Context , req interface { } , info * grpc . UnaryServerInfo , handler grpc . UnaryHandler ) ( resp interface { } , err error ) {
if teaconst . Debug {
var before = time . Now ( )
var traceCtx = rpc . NewContext ( ctx )
resp , err = handler ( traceCtx , req )
var costMs = time . Since ( before ) . Seconds ( ) * 1000
statErr := models . SharedAPIMethodStatDAO . CreateStat ( nil , info . FullMethod , "" , costMs )
if statErr != nil {
remotelogs . Error ( "API_NODE" , "create method stat failed: " + statErr . Error ( ) )
}
var tagMap = traceCtx . TagMap ( )
for tag , tagCostMs := range tagMap {
statErr = models . SharedAPIMethodStatDAO . CreateStat ( nil , info . FullMethod , tag , tagCostMs )
if statErr != nil {
remotelogs . Error ( "API_NODE" , "create method stat failed: " + statErr . Error ( ) )
}
}
return
}
2022-03-31 15:30:04 +08:00
result , err := handler ( ctx , req )
if err != nil {
err = errors . New ( "'" + info . FullMethod + "()' says: " + err . Error ( ) )
}
return result , err
2022-01-19 16:53:52 +08:00
}