2020-10-04 14:30:42 +08:00
package nodes
import (
2021-06-10 19:19:15 +08:00
"context"
2021-08-26 15:48:09 +08:00
"crypto/tls"
2020-10-04 14:30:42 +08:00
"encoding/json"
2020-10-04 16:10:01 +08:00
"fmt"
2020-10-04 14:30:42 +08:00
"github.com/TeaOSLab/EdgeCommon/pkg/messageconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeNode/internal/caches"
2021-11-20 18:57:46 +08:00
"github.com/TeaOSLab/EdgeNode/internal/configs"
2021-01-11 18:16:15 +08:00
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
2020-10-04 14:30:42 +08:00
"github.com/TeaOSLab/EdgeNode/internal/errors"
2020-10-28 11:19:06 +08:00
"github.com/TeaOSLab/EdgeNode/internal/events"
2021-12-08 15:17:45 +08:00
"github.com/TeaOSLab/EdgeNode/internal/goman"
2020-12-17 17:36:10 +08:00
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
2020-10-04 14:30:42 +08:00
"github.com/TeaOSLab/EdgeNode/internal/rpc"
2021-01-11 18:16:15 +08:00
"github.com/TeaOSLab/EdgeNode/internal/utils"
2021-11-20 18:57:46 +08:00
"github.com/iwind/TeaGo/Tea"
2020-10-04 16:10:01 +08:00
"io"
2021-08-26 15:48:09 +08:00
"net"
2020-10-04 16:10:01 +08:00
"net/http"
2021-11-20 18:57:46 +08:00
"net/url"
2021-01-11 18:16:15 +08:00
"os/exec"
2020-10-04 14:30:42 +08:00
"strconv"
2020-10-04 16:10:01 +08:00
"strings"
"sync"
2020-10-04 14:30:42 +08:00
"time"
)
// APIStream receives messages pushed by the API node over a node stream and
// answers them on the same stream.
type APIStream struct {
	// stream is the stream most recently opened by loop; replyOk and
	// replyFail send their responses through it.
	stream pb.NodeService_NodeStreamClient
}
// NewAPIStream returns an APIStream without an open stream; the stream is
// assigned once the receive loop has connected.
func NewAPIStream() *APIStream {
	apiStream := new(APIStream)
	return apiStream
}
func ( this * APIStream ) Start ( ) {
2020-10-28 11:19:06 +08:00
isQuiting := false
events . On ( events . EventQuit , func ( ) {
isQuiting = true
} )
2020-10-04 14:30:42 +08:00
for {
2020-10-28 11:19:06 +08:00
if isQuiting {
return
}
2020-10-04 14:30:42 +08:00
err := this . loop ( )
if err != nil {
2021-11-04 11:14:27 +08:00
remotelogs . Warn ( "API_STREAM" , err . Error ( ) )
2020-10-04 14:30:42 +08:00
time . Sleep ( 10 * time . Second )
continue
}
time . Sleep ( 1 * time . Second )
}
}
func ( this * APIStream ) loop ( ) error {
rpcClient , err := rpc . SharedRPC ( )
if err != nil {
return errors . Wrap ( err )
}
2020-10-28 11:19:06 +08:00
isQuiting := false
2021-06-10 19:19:15 +08:00
ctx , cancelFunc := context . WithCancel ( rpcClient . Context ( ) )
nodeStream , err := rpcClient . NodeRPC ( ) . NodeStream ( ctx )
2020-10-28 11:19:06 +08:00
events . On ( events . EventQuit , func ( ) {
isQuiting = true
2021-06-10 19:19:15 +08:00
remotelogs . Println ( "API_STREAM" , "quiting" )
if nodeStream != nil {
cancelFunc ( )
}
2020-10-28 11:19:06 +08:00
} )
2020-10-04 14:30:42 +08:00
if err != nil {
2020-10-28 11:19:06 +08:00
if isQuiting {
return nil
}
2020-10-04 14:30:42 +08:00
return errors . Wrap ( err )
}
this . stream = nodeStream
2020-10-17 11:14:40 +08:00
2020-10-04 14:30:42 +08:00
for {
2020-10-28 11:19:06 +08:00
if isQuiting {
2021-11-04 11:14:27 +08:00
remotelogs . Println ( "API_STREAM" , "quit" )
2020-10-28 11:19:06 +08:00
break
}
2020-10-04 14:30:42 +08:00
message , err := nodeStream . Recv ( )
if err != nil {
2020-10-28 11:19:06 +08:00
if isQuiting {
2021-06-10 19:19:15 +08:00
remotelogs . Println ( "API_STREAM" , "quit" )
2020-10-28 11:19:06 +08:00
return nil
}
2020-10-04 14:30:42 +08:00
return errors . Wrap ( err )
}
// 处理消息
switch message . Code {
case messageconfigs . MessageCodeConnectedAPINode : // 连接API节点成功
err = this . handleConnectedAPINode ( message )
case messageconfigs . MessageCodeWriteCache : // 写入缓存
err = this . handleWriteCache ( message )
case messageconfigs . MessageCodeReadCache : // 读取缓存
err = this . handleReadCache ( message )
2020-10-04 16:10:01 +08:00
case messageconfigs . MessageCodeStatCache : // 统计缓存
err = this . handleStatCache ( message )
case messageconfigs . MessageCodeCleanCache : // 清理缓存
err = this . handleCleanCache ( message )
case messageconfigs . MessageCodePurgeCache : // 删除缓存
err = this . handlePurgeCache ( message )
case messageconfigs . MessageCodePreheatCache : // 预热缓存
err = this . handlePreheatCache ( message )
2021-01-17 16:47:37 +08:00
case messageconfigs . MessageCodeNewNodeTask : // 有新的任务
err = this . handleNewNodeTask ( message )
2021-01-11 18:16:15 +08:00
case messageconfigs . MessageCodeCheckSystemdService : // 检查Systemd服务
err = this . handleCheckSystemdService ( message )
2021-11-20 18:57:46 +08:00
case messageconfigs . MessageCodeChangeAPINode : // 修改API节点地址
err = this . handleChangeAPINode ( message )
2020-10-04 14:30:42 +08:00
default :
err = this . handleUnknownMessage ( message )
}
if err != nil {
2020-12-17 17:36:10 +08:00
remotelogs . Error ( "API_STREAM" , "handle message failed: " + err . Error ( ) )
2020-10-04 14:30:42 +08:00
}
}
2020-10-28 11:19:06 +08:00
return nil
2020-10-04 14:30:42 +08:00
}
// 连接API节点成功
func ( this * APIStream ) handleConnectedAPINode ( message * pb . NodeStreamMessage ) error {
// 更改连接的APINode信息
if len ( message . DataJSON ) == 0 {
return nil
}
msg := & messageconfigs . ConnectedAPINodeMessage { }
err := json . Unmarshal ( message . DataJSON , msg )
if err != nil {
return errors . Wrap ( err )
}
2021-12-15 20:46:10 +08:00
_ , err = rpc . SharedRPC ( )
2020-10-04 14:30:42 +08:00
if err != nil {
return errors . Wrap ( err )
}
2020-12-17 17:36:10 +08:00
remotelogs . Println ( "API_STREAM" , "connected to api node '" + strconv . FormatInt ( msg . APINodeId , 10 ) + "'" )
2021-10-01 11:13:36 +08:00
// 重新读取配置
if nodeConfigUpdatedAt == 0 {
select {
case nodeConfigChangedNotify <- true :
default :
}
}
2020-10-04 14:30:42 +08:00
return nil
}
// 写入缓存
func ( this * APIStream ) handleWriteCache ( message * pb . NodeStreamMessage ) error {
msg := & messageconfigs . WriteCacheMessage { }
err := json . Unmarshal ( message . DataJSON , msg )
if err != nil {
this . replyFail ( message . RequestId , "decode message data failed: " + err . Error ( ) )
return err
}
2020-10-04 16:10:01 +08:00
storage , shouldStop , err := this . cacheStorage ( message , msg . CachePolicyJSON )
2020-10-04 14:30:42 +08:00
if err != nil {
return err
}
2020-10-04 16:10:01 +08:00
if shouldStop {
2020-10-04 14:30:42 +08:00
defer func ( ) {
storage . Stop ( )
} ( )
}
2020-10-05 16:55:14 +08:00
expiredAt := time . Now ( ) . Unix ( ) + msg . LifeSeconds
2021-01-13 12:02:50 +08:00
writer , err := storage . OpenWriter ( msg . Key , expiredAt , 200 )
2020-10-04 14:30:42 +08:00
if err != nil {
this . replyFail ( message . RequestId , "prepare writing failed: " + err . Error ( ) )
return err
}
2021-01-13 12:02:50 +08:00
// 写入一个空的Header
_ , err = writer . WriteHeader ( [ ] byte ( ":" ) )
if err != nil {
this . replyFail ( message . RequestId , "write failed: " + err . Error ( ) )
return err
}
// 写入数据
2020-10-05 16:55:14 +08:00
_ , err = writer . Write ( msg . Value )
2020-10-04 14:30:42 +08:00
if err != nil {
this . replyFail ( message . RequestId , "write failed: " + err . Error ( ) )
return err
}
2021-01-13 12:02:50 +08:00
2020-10-05 16:55:14 +08:00
err = writer . Close ( )
2021-01-13 12:02:50 +08:00
if err != nil {
this . replyFail ( message . RequestId , "write failed: " + err . Error ( ) )
return err
2020-10-05 16:55:14 +08:00
}
2021-01-13 12:02:50 +08:00
storage . AddToList ( & caches . Item {
2021-03-02 19:43:05 +08:00
Type : writer . ItemType ( ) ,
2021-01-13 12:02:50 +08:00
Key : msg . Key ,
ExpiredAt : expiredAt ,
HeaderSize : writer . HeaderSize ( ) ,
BodySize : writer . BodySize ( ) ,
} )
2020-10-04 14:30:42 +08:00
this . replyOk ( message . RequestId , "write ok" )
return nil
}
// 读取缓存
func ( this * APIStream ) handleReadCache ( message * pb . NodeStreamMessage ) error {
msg := & messageconfigs . ReadCacheMessage { }
err := json . Unmarshal ( message . DataJSON , msg )
if err != nil {
this . replyFail ( message . RequestId , "decode message data failed: " + err . Error ( ) )
return err
}
2020-10-04 16:10:01 +08:00
storage , shouldStop , err := this . cacheStorage ( message , msg . CachePolicyJSON )
2020-10-04 14:30:42 +08:00
if err != nil {
return err
}
2020-10-04 16:10:01 +08:00
if shouldStop {
2020-10-04 14:30:42 +08:00
defer func ( ) {
storage . Stop ( )
} ( )
}
2021-01-13 12:02:50 +08:00
reader , err := storage . OpenReader ( msg . Key )
2020-10-04 14:30:42 +08:00
if err != nil {
if err == caches . ErrNotFound {
this . replyFail ( message . RequestId , "key not found" )
return nil
}
this . replyFail ( message . RequestId , "read key failed: " + err . Error ( ) )
2021-01-13 12:02:50 +08:00
return nil
2020-10-04 14:30:42 +08:00
}
2021-01-13 12:02:50 +08:00
defer func ( ) {
_ = reader . Close ( )
} ( )
2020-10-04 14:30:42 +08:00
2021-01-13 12:02:50 +08:00
this . replyOk ( message . RequestId , "value " + strconv . FormatInt ( reader . BodySize ( ) , 10 ) + " bytes" )
2020-10-04 14:30:42 +08:00
return nil
}
2020-10-04 16:10:01 +08:00
// handleStatCache handles a request for statistics of the cache storage
// selected by the policy carried in the message. It replies with the total
// size, formatted in Bytes/KB/MB/GB using 1024-based units, and the item count.
func (this *APIStream) handleStatCache(message *pb.NodeStreamMessage) error {
	statMsg := &messageconfigs.ReadCacheMessage{}
	if decodeErr := json.Unmarshal(message.DataJSON, statMsg); decodeErr != nil {
		this.replyFail(message.RequestId, "decode message data failed: "+decodeErr.Error())
		return decodeErr
	}

	// cacheStorage has already replied to the requester when it fails.
	storage, shouldStop, err := this.cacheStorage(message, statMsg.CachePolicyJSON)
	if err != nil {
		return err
	}
	if shouldStop {
		// The storage was created only for this request.
		defer storage.Stop()
	}

	stat, err := storage.Stat()
	if err != nil {
		this.replyFail(message.RequestId, "stat failed: "+err.Error())
		return err
	}

	// Pick the largest unit that keeps the number at or above 1.
	var sizeText string
	switch size := stat.Size; {
	case size < 1024:
		sizeText = strconv.FormatInt(size, 10) + " Bytes"
	case size < 1024*1024:
		sizeText = fmt.Sprintf("%.2f KB", float64(size)/1024)
	case size < 1024*1024*1024:
		sizeText = fmt.Sprintf("%.2f MB", float64(size)/1024/1024)
	default:
		sizeText = fmt.Sprintf("%.2f GB", float64(size)/1024/1024/1024)
	}

	this.replyOk(message.RequestId, "size:"+sizeText+", count:"+strconv.Itoa(stat.Count))
	return nil
}
// handleCleanCache handles a request to remove everything from the cache
// storage selected by the policy carried in the message, and reports the
// outcome to the requester.
func (this *APIStream) handleCleanCache(message *pb.NodeStreamMessage) error {
	cleanMsg := &messageconfigs.ReadCacheMessage{}
	if decodeErr := json.Unmarshal(message.DataJSON, cleanMsg); decodeErr != nil {
		this.replyFail(message.RequestId, "decode message data failed: "+decodeErr.Error())
		return decodeErr
	}

	// cacheStorage has already replied to the requester when it fails.
	storage, shouldStop, err := this.cacheStorage(message, cleanMsg.CachePolicyJSON)
	if err != nil {
		return err
	}
	if shouldStop {
		// The storage was created only for this request.
		defer storage.Stop()
	}

	if cleanErr := storage.CleanAll(); cleanErr != nil {
		this.replyFail(message.RequestId, "clean cache failed: "+cleanErr.Error())
		return cleanErr
	}

	this.replyOk(message.RequestId, "ok")
	return nil
}
// 删除缓存
func ( this * APIStream ) handlePurgeCache ( message * pb . NodeStreamMessage ) error {
msg := & messageconfigs . PurgeCacheMessage { }
err := json . Unmarshal ( message . DataJSON , msg )
if err != nil {
this . replyFail ( message . RequestId , "decode message data failed: " + err . Error ( ) )
return err
}
storage , shouldStop , err := this . cacheStorage ( message , msg . CachePolicyJSON )
if err != nil {
return err
}
if shouldStop {
defer func ( ) {
storage . Stop ( )
} ( )
}
2021-10-03 18:00:57 +08:00
// WEBP缓存
if msg . Type == "file" {
var keys = msg . Keys
for _ , key := range keys {
keys = append ( keys , key + webpSuffix )
}
msg . Keys = keys
}
2020-12-23 21:28:50 +08:00
err = storage . Purge ( msg . Keys , msg . Type )
2020-10-04 16:10:01 +08:00
if err != nil {
this . replyFail ( message . RequestId , "purge keys failed: " + err . Error ( ) )
return err
}
this . replyOk ( message . RequestId , "ok" )
return nil
}
// 预热缓存
func ( this * APIStream ) handlePreheatCache ( message * pb . NodeStreamMessage ) error {
msg := & messageconfigs . PreheatCacheMessage { }
err := json . Unmarshal ( message . DataJSON , msg )
if err != nil {
this . replyFail ( message . RequestId , "decode message data failed: " + err . Error ( ) )
return err
}
storage , shouldStop , err := this . cacheStorage ( message , msg . CachePolicyJSON )
if err != nil {
return err
}
if shouldStop {
defer func ( ) {
storage . Stop ( )
} ( )
}
if len ( msg . Keys ) == 0 {
this . replyOk ( message . RequestId , "ok" )
return nil
}
wg := sync . WaitGroup { }
wg . Add ( len ( msg . Keys ) )
2021-08-26 15:48:09 +08:00
client := & http . Client {
Timeout : 30 * time . Second , // TODO 可以设置请求超时时间
Transport : & http . Transport {
DialContext : func ( ctx context . Context , network , addr string ) ( net . Conn , error ) {
_ , port , err := net . SplitHostPort ( addr )
if err != nil {
return nil , err
}
return net . Dial ( network , "127.0.0.1:" + port )
} ,
MaxIdleConns : 4096 ,
MaxIdleConnsPerHost : 32 ,
MaxConnsPerHost : 32 ,
IdleConnTimeout : 2 * time . Minute ,
ExpectContinueTimeout : 1 * time . Second ,
TLSHandshakeTimeout : 0 ,
TLSClientConfig : & tls . Config {
InsecureSkipVerify : true ,
} ,
} ,
}
defer client . CloseIdleConnections ( )
2020-10-04 16:10:01 +08:00
errorMessages := [ ] string { }
locker := sync . Mutex { }
for _ , key := range msg . Keys {
go func ( key string ) {
defer wg . Done ( )
req , err := http . NewRequest ( http . MethodGet , key , nil )
if err != nil {
locker . Lock ( )
errorMessages = append ( errorMessages , "invalid url: " + key + ": " + err . Error ( ) )
locker . Unlock ( )
return
}
2021-08-26 15:48:09 +08:00
2020-10-05 16:55:14 +08:00
// TODO 可以在管理界面自定义Header
2021-08-26 15:48:09 +08:00
req . Header . Set ( "X-Cache-Action" , "preheat" )
2020-10-04 16:10:01 +08:00
req . Header . Set ( "User-Agent" , "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.121 Safari/537.36" )
req . Header . Set ( "Accept-Encoding" , "gzip, deflate, br" ) // TODO 这里需要记录下缓存是否为gzip的
resp , err := client . Do ( req )
if err != nil {
locker . Lock ( )
errorMessages = append ( errorMessages , "request failed: " + key + ": " + err . Error ( ) )
locker . Unlock ( )
return
}
2021-01-13 12:02:50 +08:00
if resp . StatusCode != 200 {
locker . Lock ( )
errorMessages = append ( errorMessages , "request failed: " + key + ": status code '" + strconv . Itoa ( resp . StatusCode ) + "'" )
locker . Unlock ( )
return
}
2020-10-04 16:10:01 +08:00
defer func ( ) {
_ = resp . Body . Close ( )
} ( )
2021-01-11 11:51:05 +08:00
// 检查最大内容长度
2021-01-11 18:16:15 +08:00
// TODO 需要解决Chunked Transfer Encoding的长度判断问题
2021-01-11 11:51:05 +08:00
maxSize := storage . Policy ( ) . MaxSizeBytes ( )
if maxSize > 0 && resp . ContentLength > maxSize {
locker . Lock ( )
errorMessages = append ( errorMessages , "request failed: the content is too larger than policy setting" )
locker . Unlock ( )
return
}
2020-10-05 16:55:14 +08:00
expiredAt := time . Now ( ) . Unix ( ) + 8600
2021-01-13 12:02:50 +08:00
writer , err := storage . OpenWriter ( key , expiredAt , 200 ) // TODO 可以设置缓存过期时间
2020-10-04 16:10:01 +08:00
if err != nil {
locker . Lock ( )
errorMessages = append ( errorMessages , "open cache writer failed: " + key + ": " + err . Error ( ) )
locker . Unlock ( )
return
}
buf := make ( [ ] byte , 16 * 1024 )
isClosed := false
2021-01-13 12:02:50 +08:00
// 写入Header
for k , v := range resp . Header {
for _ , v1 := range v {
_ , err = writer . WriteHeader ( [ ] byte ( k + ":" + v1 + "\n" ) )
if err != nil {
locker . Lock ( )
errorMessages = append ( errorMessages , "write failed: " + key + ": " + err . Error ( ) )
locker . Unlock ( )
return
}
}
}
// 写入Body
2020-10-04 16:10:01 +08:00
for {
n , err := resp . Body . Read ( buf )
if n > 0 {
2020-10-05 16:55:14 +08:00
_ , writerErr := writer . Write ( buf [ : n ] )
2020-10-04 16:10:01 +08:00
if writerErr != nil {
locker . Lock ( )
errorMessages = append ( errorMessages , "write failed: " + key + ": " + writerErr . Error ( ) )
locker . Unlock ( )
break
}
}
if err != nil {
if err == io . EOF {
2021-01-13 12:02:50 +08:00
2020-10-05 16:55:14 +08:00
err = writer . Close ( )
if err == nil {
storage . AddToList ( & caches . Item {
2021-03-02 19:43:05 +08:00
Type : writer . ItemType ( ) ,
2020-10-05 16:55:14 +08:00
Key : key ,
ExpiredAt : expiredAt ,
} )
}
2020-10-04 16:10:01 +08:00
isClosed = true
} else {
locker . Lock ( )
errorMessages = append ( errorMessages , "read url failed: " + key + ": " + err . Error ( ) )
locker . Unlock ( )
}
break
}
}
if ! isClosed {
_ = writer . Close ( )
}
} ( key )
}
wg . Wait ( )
if len ( errorMessages ) > 0 {
this . replyFail ( message . RequestId , strings . Join ( errorMessages , ", " ) )
return nil
}
this . replyOk ( message . RequestId , "ok" )
return nil
}
2020-10-09 12:03:53 +08:00
// 处理配置变化
2021-01-17 16:47:37 +08:00
func ( this * APIStream ) handleNewNodeTask ( message * pb . NodeStreamMessage ) error {
2020-10-09 12:03:53 +08:00
select {
2021-01-17 16:47:37 +08:00
case nodeTaskNotify <- true :
2020-11-10 09:22:17 +08:00
default :
}
this . replyOk ( message . RequestId , "ok" )
return nil
}
2021-01-11 18:16:15 +08:00
// 检查Systemd服务
func ( this * APIStream ) handleCheckSystemdService ( message * pb . NodeStreamMessage ) error {
systemctl , err := exec . LookPath ( "systemctl" )
if err != nil {
this . replyFail ( message . RequestId , "'systemctl' not found" )
return nil
}
if len ( systemctl ) == 0 {
this . replyFail ( message . RequestId , "'systemctl' not found" )
return nil
}
cmd := utils . NewCommandExecutor ( )
shortName := teaconst . SystemdServiceName
cmd . Add ( systemctl , "is-enabled" , shortName )
output , err := cmd . Run ( )
if err != nil {
2021-01-13 12:02:50 +08:00
this . replyFail ( message . RequestId , "'systemctl' command error: " + err . Error ( ) )
2021-01-11 18:16:15 +08:00
return nil
}
if output == "enabled" {
this . replyOk ( message . RequestId , "ok" )
} else {
this . replyFail ( message . RequestId , "not installed" )
}
return nil
}
2021-11-20 18:57:46 +08:00
// 修改API地址
func ( this * APIStream ) handleChangeAPINode ( message * pb . NodeStreamMessage ) error {
config , err := configs . LoadAPIConfig ( )
if err != nil {
this . replyFail ( message . RequestId , "read config error: " + err . Error ( ) )
return nil
}
var messageData = & messageconfigs . ChangeAPINodeMessage { }
err = json . Unmarshal ( message . DataJSON , messageData )
if err != nil {
this . replyFail ( message . RequestId , "unmarshal message failed: " + err . Error ( ) )
return nil
}
_ , err = url . Parse ( messageData . Addr )
if err != nil {
this . replyFail ( message . RequestId , "invalid new api node address: '" + messageData . Addr + "'" )
return nil
}
config . RPC . Endpoints = [ ] string { messageData . Addr }
// 保存到文件
err = config . WriteFile ( Tea . ConfigFile ( "api.yaml" ) )
if err != nil {
this . replyFail ( message . RequestId , "save config file failed: " + err . Error ( ) )
return nil
}
this . replyOk ( message . RequestId , "" )
2021-12-08 15:17:45 +08:00
goman . New ( func ( ) {
2021-11-20 18:57:46 +08:00
// 延后生效, 防止变更前的API无法读取到状态
time . Sleep ( 1 * time . Second )
rpcClient , err := rpc . SharedRPC ( )
if err != nil {
remotelogs . Error ( "API_STREAM" , "change rpc endpoint to '" +
messageData . Addr + "' failed: " + err . Error ( ) )
return
}
rpcClient . Close ( )
err = rpcClient . UpdateConfig ( config )
if err != nil {
remotelogs . Error ( "API_STREAM" , "change rpc endpoint to '" +
messageData . Addr + "' failed: " + err . Error ( ) )
return
}
remotelogs . Println ( "API_STREAM" , "change rpc endpoint to '" +
messageData . Addr + "' successfully" )
2021-12-08 15:17:45 +08:00
} )
2021-11-20 18:57:46 +08:00
return nil
}
2020-10-04 14:30:42 +08:00
// handleUnknownMessage answers a message whose code has no handler with a
// failure reply that names the code. It always returns nil.
func (this *APIStream) handleUnknownMessage(message *pb.NodeStreamMessage) error {
	reason := "unknown message code '" + message.Code + "'"
	this.replyFail(message.RequestId, reason)
	return nil
}
// replyFail sends a failure reply (IsOk false) for the given request over the
// current stream. The send error is ignored.
func (this *APIStream) replyFail(requestId int64, message string) {
	reply := &pb.NodeStreamMessage{
		RequestId: requestId,
		IsOk:      false,
		Message:   message,
	}
	_ = this.stream.Send(reply)
}
// replyOk sends a success reply (IsOk true) for the given request over the
// current stream. The send error is ignored.
func (this *APIStream) replyOk(requestId int64, message string) {
	reply := &pb.NodeStreamMessage{
		RequestId: requestId,
		IsOk:      true,
		Message:   message,
	}
	_ = this.stream.Send(reply)
}
2020-10-04 16:10:01 +08:00
// 获取缓存存取对象
func ( this * APIStream ) cacheStorage ( message * pb . NodeStreamMessage , cachePolicyJSON [ ] byte ) ( storage caches . StorageInterface , shouldStop bool , err error ) {
cachePolicy := & serverconfigs . HTTPCachePolicy { }
err = json . Unmarshal ( cachePolicyJSON , cachePolicy )
if err != nil {
this . replyFail ( message . RequestId , "decode cache policy config failed: " + err . Error ( ) )
return nil , false , err
}
storage = caches . SharedManager . FindStorageWithPolicy ( cachePolicy . Id )
if storage == nil {
storage = caches . SharedManager . NewStorageWithPolicy ( cachePolicy )
if storage == nil {
this . replyFail ( message . RequestId , "invalid storage type '" + cachePolicy . Type + "'" )
2020-10-05 19:15:35 +08:00
return nil , false , errors . New ( "invalid storage type '" + cachePolicy . Type + "'" )
2020-10-04 16:10:01 +08:00
}
err = storage . Init ( )
if err != nil {
this . replyFail ( message . RequestId , "storage init failed: " + err . Error ( ) )
return nil , false , err
}
shouldStop = true
}
return
}