2020-10-04 14:30:42 +08:00
package nodes
import (
"encoding/json"
2020-10-04 16:10:01 +08:00
"fmt"
2020-10-04 14:30:42 +08:00
"github.com/TeaOSLab/EdgeCommon/pkg/messageconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeNode/internal/caches"
"github.com/TeaOSLab/EdgeNode/internal/errors"
2020-10-28 11:19:06 +08:00
"github.com/TeaOSLab/EdgeNode/internal/events"
2020-10-09 11:06:43 +08:00
"github.com/TeaOSLab/EdgeNode/internal/logs"
2020-10-04 14:30:42 +08:00
"github.com/TeaOSLab/EdgeNode/internal/rpc"
2020-10-04 16:10:01 +08:00
"io"
"net/http"
2020-10-04 14:30:42 +08:00
"strconv"
2020-10-04 16:10:01 +08:00
"strings"
"sync"
2020-10-04 14:30:42 +08:00
"time"
)
type APIStream struct {
stream pb . NodeService_NodeStreamClient
}
func NewAPIStream ( ) * APIStream {
return & APIStream { }
}
func ( this * APIStream ) Start ( ) {
2020-10-28 11:19:06 +08:00
isQuiting := false
events . On ( events . EventQuit , func ( ) {
isQuiting = true
} )
2020-10-04 14:30:42 +08:00
for {
2020-10-28 11:19:06 +08:00
if isQuiting {
return
}
2020-10-04 14:30:42 +08:00
err := this . loop ( )
if err != nil {
2020-10-09 11:06:43 +08:00
logs . Error ( "API_STREAM" , err . Error ( ) )
2020-10-04 14:30:42 +08:00
time . Sleep ( 10 * time . Second )
continue
}
time . Sleep ( 1 * time . Second )
}
}
func ( this * APIStream ) loop ( ) error {
rpcClient , err := rpc . SharedRPC ( )
if err != nil {
return errors . Wrap ( err )
}
2020-10-28 11:19:06 +08:00
isQuiting := false
events . On ( events . EventQuit , func ( ) {
isQuiting = true
} )
2020-10-04 14:30:42 +08:00
nodeStream , err := rpcClient . NodeRPC ( ) . NodeStream ( rpcClient . Context ( ) )
if err != nil {
2020-10-28 11:19:06 +08:00
if isQuiting {
return nil
}
2020-10-04 14:30:42 +08:00
return errors . Wrap ( err )
}
this . stream = nodeStream
2020-10-17 11:14:40 +08:00
2020-10-04 14:30:42 +08:00
for {
2020-10-28 11:19:06 +08:00
if isQuiting {
break
}
2020-10-04 14:30:42 +08:00
message , err := nodeStream . Recv ( )
if err != nil {
2020-10-28 11:19:06 +08:00
if isQuiting {
return nil
}
2020-10-04 14:30:42 +08:00
return errors . Wrap ( err )
}
// 处理消息
switch message . Code {
case messageconfigs . MessageCodeConnectedAPINode : // 连接API节点成功
err = this . handleConnectedAPINode ( message )
case messageconfigs . MessageCodeWriteCache : // 写入缓存
err = this . handleWriteCache ( message )
case messageconfigs . MessageCodeReadCache : // 读取缓存
err = this . handleReadCache ( message )
2020-10-04 16:10:01 +08:00
case messageconfigs . MessageCodeStatCache : // 统计缓存
err = this . handleStatCache ( message )
case messageconfigs . MessageCodeCleanCache : // 清理缓存
err = this . handleCleanCache ( message )
case messageconfigs . MessageCodePurgeCache : // 删除缓存
err = this . handlePurgeCache ( message )
case messageconfigs . MessageCodePreheatCache : // 预热缓存
err = this . handlePreheatCache ( message )
2020-10-09 12:03:53 +08:00
case messageconfigs . MessageCodeConfigChanged : // 配置变化
err = this . handleConfigChanged ( message )
2020-10-04 14:30:42 +08:00
default :
err = this . handleUnknownMessage ( message )
}
if err != nil {
2020-10-09 11:06:43 +08:00
logs . Error ( "API_STREAM" , "handle message failed: " + err . Error ( ) )
2020-10-04 14:30:42 +08:00
}
}
2020-10-28 11:19:06 +08:00
return nil
2020-10-04 14:30:42 +08:00
}
// 连接API节点成功
func ( this * APIStream ) handleConnectedAPINode ( message * pb . NodeStreamMessage ) error {
// 更改连接的APINode信息
if len ( message . DataJSON ) == 0 {
return nil
}
msg := & messageconfigs . ConnectedAPINodeMessage { }
err := json . Unmarshal ( message . DataJSON , msg )
if err != nil {
return errors . Wrap ( err )
}
rpcClient , err := rpc . SharedRPC ( )
if err != nil {
return errors . Wrap ( err )
}
_ , err = rpcClient . NodeRPC ( ) . UpdateNodeConnectedAPINodes ( rpcClient . Context ( ) , & pb . UpdateNodeConnectedAPINodesRequest { ApiNodeIds : [ ] int64 { msg . APINodeId } } )
if err != nil {
return errors . Wrap ( err )
}
2020-10-09 11:06:43 +08:00
logs . Println ( "API_STREAM" , "connected to api node '" + strconv . FormatInt ( msg . APINodeId , 10 ) + "'" )
2020-10-04 14:30:42 +08:00
return nil
}
// 写入缓存
func ( this * APIStream ) handleWriteCache ( message * pb . NodeStreamMessage ) error {
msg := & messageconfigs . WriteCacheMessage { }
err := json . Unmarshal ( message . DataJSON , msg )
if err != nil {
this . replyFail ( message . RequestId , "decode message data failed: " + err . Error ( ) )
return err
}
2020-10-04 16:10:01 +08:00
storage , shouldStop , err := this . cacheStorage ( message , msg . CachePolicyJSON )
2020-10-04 14:30:42 +08:00
if err != nil {
return err
}
2020-10-04 16:10:01 +08:00
if shouldStop {
2020-10-04 14:30:42 +08:00
defer func ( ) {
storage . Stop ( )
} ( )
}
2020-10-05 16:55:14 +08:00
expiredAt := time . Now ( ) . Unix ( ) + msg . LifeSeconds
writer , err := storage . Open ( msg . Key , expiredAt )
2020-10-04 14:30:42 +08:00
if err != nil {
this . replyFail ( message . RequestId , "prepare writing failed: " + err . Error ( ) )
return err
}
2020-10-05 16:55:14 +08:00
_ , err = writer . Write ( msg . Value )
2020-10-04 14:30:42 +08:00
if err != nil {
2020-10-05 16:55:14 +08:00
_ = writer . Discard ( )
2020-10-04 14:30:42 +08:00
this . replyFail ( message . RequestId , "write failed: " + err . Error ( ) )
return err
}
2020-10-05 16:55:14 +08:00
err = writer . Close ( )
if err == nil {
storage . AddToList ( & caches . Item {
Key : msg . Key ,
ExpiredAt : expiredAt ,
} )
}
2020-10-04 14:30:42 +08:00
this . replyOk ( message . RequestId , "write ok" )
return nil
}
// 读取缓存
func ( this * APIStream ) handleReadCache ( message * pb . NodeStreamMessage ) error {
msg := & messageconfigs . ReadCacheMessage { }
err := json . Unmarshal ( message . DataJSON , msg )
if err != nil {
this . replyFail ( message . RequestId , "decode message data failed: " + err . Error ( ) )
return err
}
2020-10-04 16:10:01 +08:00
storage , shouldStop , err := this . cacheStorage ( message , msg . CachePolicyJSON )
2020-10-04 14:30:42 +08:00
if err != nil {
return err
}
2020-10-04 16:10:01 +08:00
if shouldStop {
2020-10-04 14:30:42 +08:00
defer func ( ) {
storage . Stop ( )
} ( )
}
buf := make ( [ ] byte , 1024 )
size := 0
2020-10-05 16:55:14 +08:00
err = storage . Read ( msg . Key , buf , func ( data [ ] byte , valueSize int64 , expiredAt int64 , isEOF bool ) {
2020-10-04 14:30:42 +08:00
size += len ( data )
} )
if err != nil {
if err == caches . ErrNotFound {
this . replyFail ( message . RequestId , "key not found" )
return nil
}
this . replyFail ( message . RequestId , "read key failed: " + err . Error ( ) )
return err
}
this . replyOk ( message . RequestId , "value " + strconv . Itoa ( size ) + " bytes" )
return nil
}
2020-10-04 16:10:01 +08:00
// 统计缓存
func ( this * APIStream ) handleStatCache ( message * pb . NodeStreamMessage ) error {
msg := & messageconfigs . ReadCacheMessage { }
err := json . Unmarshal ( message . DataJSON , msg )
if err != nil {
this . replyFail ( message . RequestId , "decode message data failed: " + err . Error ( ) )
return err
}
storage , shouldStop , err := this . cacheStorage ( message , msg . CachePolicyJSON )
if err != nil {
return err
}
if shouldStop {
defer func ( ) {
storage . Stop ( )
} ( )
}
stat , err := storage . Stat ( )
if err != nil {
this . replyFail ( message . RequestId , "stat failed: " + err . Error ( ) )
return err
}
sizeFormat := ""
if stat . Size < 1024 {
sizeFormat = strconv . FormatInt ( stat . Size , 10 ) + " Bytes"
} else if stat . Size < 1024 * 1024 {
sizeFormat = fmt . Sprintf ( "%.2f KB" , float64 ( stat . Size ) / 1024 )
} else if stat . Size < 1024 * 1024 * 1024 {
sizeFormat = fmt . Sprintf ( "%.2f MB" , float64 ( stat . Size ) / 1024 / 1024 )
} else {
sizeFormat = fmt . Sprintf ( "%.2f GB" , float64 ( stat . Size ) / 1024 / 1024 / 1024 )
}
this . replyOk ( message . RequestId , "size:" + sizeFormat + ", count:" + strconv . Itoa ( stat . Count ) )
return nil
}
// 清理缓存
func ( this * APIStream ) handleCleanCache ( message * pb . NodeStreamMessage ) error {
msg := & messageconfigs . ReadCacheMessage { }
err := json . Unmarshal ( message . DataJSON , msg )
if err != nil {
this . replyFail ( message . RequestId , "decode message data failed: " + err . Error ( ) )
return err
}
storage , shouldStop , err := this . cacheStorage ( message , msg . CachePolicyJSON )
if err != nil {
return err
}
if shouldStop {
defer func ( ) {
storage . Stop ( )
} ( )
}
err = storage . CleanAll ( )
if err != nil {
this . replyFail ( message . RequestId , "clean cache failed: " + err . Error ( ) )
return err
}
this . replyOk ( message . RequestId , "ok" )
return nil
}
// 删除缓存
func ( this * APIStream ) handlePurgeCache ( message * pb . NodeStreamMessage ) error {
msg := & messageconfigs . PurgeCacheMessage { }
err := json . Unmarshal ( message . DataJSON , msg )
if err != nil {
this . replyFail ( message . RequestId , "decode message data failed: " + err . Error ( ) )
return err
}
storage , shouldStop , err := this . cacheStorage ( message , msg . CachePolicyJSON )
if err != nil {
return err
}
if shouldStop {
defer func ( ) {
storage . Stop ( )
} ( )
}
err = storage . Purge ( msg . Keys )
if err != nil {
this . replyFail ( message . RequestId , "purge keys failed: " + err . Error ( ) )
return err
}
this . replyOk ( message . RequestId , "ok" )
return nil
}
// 预热缓存
func ( this * APIStream ) handlePreheatCache ( message * pb . NodeStreamMessage ) error {
msg := & messageconfigs . PreheatCacheMessage { }
err := json . Unmarshal ( message . DataJSON , msg )
if err != nil {
this . replyFail ( message . RequestId , "decode message data failed: " + err . Error ( ) )
return err
}
storage , shouldStop , err := this . cacheStorage ( message , msg . CachePolicyJSON )
if err != nil {
return err
}
if shouldStop {
defer func ( ) {
storage . Stop ( )
} ( )
}
if len ( msg . Keys ) == 0 {
this . replyOk ( message . RequestId , "ok" )
return nil
}
wg := sync . WaitGroup { }
wg . Add ( len ( msg . Keys ) )
client := http . Client { } // TODO 可以设置请求超时事件
errorMessages := [ ] string { }
locker := sync . Mutex { }
for _ , key := range msg . Keys {
go func ( key string ) {
defer wg . Done ( )
req , err := http . NewRequest ( http . MethodGet , key , nil )
if err != nil {
locker . Lock ( )
errorMessages = append ( errorMessages , "invalid url: " + key + ": " + err . Error ( ) )
locker . Unlock ( )
return
}
2020-10-05 16:55:14 +08:00
// TODO 可以在管理界面自定义Header
2020-10-04 16:10:01 +08:00
req . Header . Set ( "User-Agent" , "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.121 Safari/537.36" )
req . Header . Set ( "Accept-Encoding" , "gzip, deflate, br" ) // TODO 这里需要记录下缓存是否为gzip的
resp , err := client . Do ( req )
if err != nil {
locker . Lock ( )
errorMessages = append ( errorMessages , "request failed: " + key + ": " + err . Error ( ) )
locker . Unlock ( )
return
}
defer func ( ) {
_ = resp . Body . Close ( )
} ( )
2020-10-05 16:55:14 +08:00
expiredAt := time . Now ( ) . Unix ( ) + 8600
writer , err := storage . Open ( key , expiredAt ) // TODO 可以设置缓存过期事件
2020-10-04 16:10:01 +08:00
if err != nil {
locker . Lock ( )
errorMessages = append ( errorMessages , "open cache writer failed: " + key + ": " + err . Error ( ) )
locker . Unlock ( )
return
}
buf := make ( [ ] byte , 16 * 1024 )
isClosed := false
for {
n , err := resp . Body . Read ( buf )
if n > 0 {
2020-10-05 16:55:14 +08:00
_ , writerErr := writer . Write ( buf [ : n ] )
2020-10-04 16:10:01 +08:00
if writerErr != nil {
locker . Lock ( )
errorMessages = append ( errorMessages , "write failed: " + key + ": " + writerErr . Error ( ) )
locker . Unlock ( )
break
}
}
if err != nil {
if err == io . EOF {
2020-10-05 16:55:14 +08:00
err = writer . Close ( )
if err == nil {
storage . AddToList ( & caches . Item {
Key : key ,
ExpiredAt : expiredAt ,
} )
}
2020-10-04 16:10:01 +08:00
isClosed = true
} else {
locker . Lock ( )
errorMessages = append ( errorMessages , "read url failed: " + key + ": " + err . Error ( ) )
locker . Unlock ( )
}
break
}
}
if ! isClosed {
_ = writer . Close ( )
}
} ( key )
}
wg . Wait ( )
if len ( errorMessages ) > 0 {
this . replyFail ( message . RequestId , strings . Join ( errorMessages , ", " ) )
return nil
}
this . replyOk ( message . RequestId , "ok" )
return nil
}
2020-10-09 12:03:53 +08:00
// 处理配置变化
func ( this * APIStream ) handleConfigChanged ( message * pb . NodeStreamMessage ) error {
select {
case changeNotify <- true :
default :
}
this . replyOk ( message . RequestId , "ok" )
return nil
}
2020-10-04 14:30:42 +08:00
// 处理未知消息
func ( this * APIStream ) handleUnknownMessage ( message * pb . NodeStreamMessage ) error {
this . replyFail ( message . RequestId , "unknown message code '" + message . Code + "'" )
return nil
}
// 回复失败
func ( this * APIStream ) replyFail ( requestId int64 , message string ) {
_ = this . stream . Send ( & pb . NodeStreamMessage { RequestId : requestId , IsOk : false , Message : message } )
}
// 回复成功
func ( this * APIStream ) replyOk ( requestId int64 , message string ) {
_ = this . stream . Send ( & pb . NodeStreamMessage { RequestId : requestId , IsOk : true , Message : message } )
}
2020-10-04 16:10:01 +08:00
// 获取缓存存取对象
func ( this * APIStream ) cacheStorage ( message * pb . NodeStreamMessage , cachePolicyJSON [ ] byte ) ( storage caches . StorageInterface , shouldStop bool , err error ) {
cachePolicy := & serverconfigs . HTTPCachePolicy { }
err = json . Unmarshal ( cachePolicyJSON , cachePolicy )
if err != nil {
this . replyFail ( message . RequestId , "decode cache policy config failed: " + err . Error ( ) )
return nil , false , err
}
storage = caches . SharedManager . FindStorageWithPolicy ( cachePolicy . Id )
if storage == nil {
storage = caches . SharedManager . NewStorageWithPolicy ( cachePolicy )
if storage == nil {
this . replyFail ( message . RequestId , "invalid storage type '" + cachePolicy . Type + "'" )
2020-10-05 19:15:35 +08:00
return nil , false , errors . New ( "invalid storage type '" + cachePolicy . Type + "'" )
2020-10-04 16:10:01 +08:00
}
err = storage . Init ( )
if err != nil {
this . replyFail ( message . RequestId , "storage init failed: " + err . Error ( ) )
return nil , false , err
}
shouldStop = true
}
return
}