mirror of
https://github.com/TeaOSLab/EdgeAPI.git
synced 2025-12-30 09:56:35 +08:00
[API节点]支持HTTP API
This commit is contained in:
@@ -75,72 +75,7 @@ func (this *APINode) Start() {
|
||||
// 监听RPC服务
|
||||
remotelogs.Println("API_NODE", "starting RPC server ...")
|
||||
|
||||
// HTTP
|
||||
httpConfig, err := apiNode.DecodeHTTP()
|
||||
if err != nil {
|
||||
remotelogs.Error("API_NODE", "decode http config: "+err.Error())
|
||||
return
|
||||
}
|
||||
isListening := false
|
||||
if httpConfig != nil && httpConfig.IsOn && len(httpConfig.Listen) > 0 {
|
||||
for _, listen := range httpConfig.Listen {
|
||||
for _, addr := range listen.Addresses() {
|
||||
listener, err := net.Listen("tcp", addr)
|
||||
if err != nil {
|
||||
remotelogs.Error("API_NODE", "listening '"+addr+"' failed: "+err.Error())
|
||||
continue
|
||||
}
|
||||
go func() {
|
||||
err := this.listenRPC(listener, nil)
|
||||
if err != nil {
|
||||
remotelogs.Error("API_NODE", "listening '"+addr+"' rpc: "+err.Error())
|
||||
return
|
||||
}
|
||||
}()
|
||||
isListening = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// HTTPS
|
||||
httpsConfig, err := apiNode.DecodeHTTPS()
|
||||
if err != nil {
|
||||
remotelogs.Error("API_NODE", "decode https config: "+err.Error())
|
||||
return
|
||||
}
|
||||
if httpsConfig != nil &&
|
||||
httpsConfig.IsOn &&
|
||||
len(httpsConfig.Listen) > 0 &&
|
||||
httpsConfig.SSLPolicy != nil &&
|
||||
httpsConfig.SSLPolicy.IsOn &&
|
||||
len(httpsConfig.SSLPolicy.Certs) > 0 {
|
||||
certs := []tls.Certificate{}
|
||||
for _, cert := range httpsConfig.SSLPolicy.Certs {
|
||||
certs = append(certs, *cert.CertObject())
|
||||
}
|
||||
|
||||
for _, listen := range httpsConfig.Listen {
|
||||
for _, addr := range listen.Addresses() {
|
||||
listener, err := net.Listen("tcp", addr)
|
||||
if err != nil {
|
||||
remotelogs.Error("API_NODE", "listening '"+addr+"' failed: "+err.Error())
|
||||
continue
|
||||
}
|
||||
go func() {
|
||||
err := this.listenRPC(listener, &tls.Config{
|
||||
Certificates: certs,
|
||||
})
|
||||
if err != nil {
|
||||
remotelogs.Error("API_NODE", "listening '"+addr+"' rpc: "+err.Error())
|
||||
return
|
||||
}
|
||||
}()
|
||||
isListening = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// HTTP接口
|
||||
isListening := this.listenPorts(apiNode)
|
||||
|
||||
if !isListening {
|
||||
remotelogs.Error("API_NODE", "the api node require at least one listening address")
|
||||
@@ -155,10 +90,10 @@ func (this *APINode) Start() {
|
||||
func (this *APINode) listenRPC(listener net.Listener, tlsConfig *tls.Config) error {
|
||||
var rpcServer *grpc.Server
|
||||
if tlsConfig == nil {
|
||||
remotelogs.Println("API_NODE", "listening http://"+listener.Addr().String()+" ...")
|
||||
remotelogs.Println("API_NODE", "listening GRPC http://"+listener.Addr().String()+" ...")
|
||||
rpcServer = grpc.NewServer()
|
||||
} else {
|
||||
logs.Println("[API_NODE]listening https://" + listener.Addr().String() + " ...")
|
||||
logs.Println("[API_NODE]listening GRPC https://" + listener.Addr().String() + " ...")
|
||||
rpcServer = grpc.NewServer(grpc.Creds(credentials.NewTLS(tlsConfig)))
|
||||
}
|
||||
pb.RegisterAdminServiceServer(rpcServer, &services.AdminService{})
|
||||
@@ -264,3 +199,142 @@ func (this *APINode) autoUpgrade() error {
|
||||
logs.Println("[API_NODE]upgrade database done")
|
||||
return nil
|
||||
}
|
||||
|
||||
// 启动端口
|
||||
func (this *APINode) listenPorts(apiNode *models.APINode) (isListening bool) {
|
||||
// HTTP
|
||||
httpConfig, err := apiNode.DecodeHTTP()
|
||||
if err != nil {
|
||||
remotelogs.Error("API_NODE", "decode http config: "+err.Error())
|
||||
return
|
||||
}
|
||||
isListening = false
|
||||
if httpConfig != nil && httpConfig.IsOn && len(httpConfig.Listen) > 0 {
|
||||
for _, listen := range httpConfig.Listen {
|
||||
for _, addr := range listen.Addresses() {
|
||||
listener, err := net.Listen("tcp", addr)
|
||||
if err != nil {
|
||||
remotelogs.Error("API_NODE", "listening '"+addr+"' failed: "+err.Error())
|
||||
continue
|
||||
}
|
||||
go func() {
|
||||
err := this.listenRPC(listener, nil)
|
||||
if err != nil {
|
||||
remotelogs.Error("API_NODE", "listening '"+addr+"' rpc: "+err.Error())
|
||||
return
|
||||
}
|
||||
}()
|
||||
isListening = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// HTTPS
|
||||
httpsConfig, err := apiNode.DecodeHTTPS()
|
||||
if err != nil {
|
||||
remotelogs.Error("API_NODE", "decode https config: "+err.Error())
|
||||
return
|
||||
}
|
||||
if httpsConfig != nil &&
|
||||
httpsConfig.IsOn &&
|
||||
len(httpsConfig.Listen) > 0 &&
|
||||
httpsConfig.SSLPolicy != nil &&
|
||||
httpsConfig.SSLPolicy.IsOn &&
|
||||
len(httpsConfig.SSLPolicy.Certs) > 0 {
|
||||
certs := []tls.Certificate{}
|
||||
for _, cert := range httpsConfig.SSLPolicy.Certs {
|
||||
certs = append(certs, *cert.CertObject())
|
||||
}
|
||||
|
||||
for _, listen := range httpsConfig.Listen {
|
||||
for _, addr := range listen.Addresses() {
|
||||
listener, err := net.Listen("tcp", addr)
|
||||
if err != nil {
|
||||
remotelogs.Error("API_NODE", "listening '"+addr+"' failed: "+err.Error())
|
||||
continue
|
||||
}
|
||||
go func() {
|
||||
err := this.listenRPC(listener, &tls.Config{
|
||||
Certificates: certs,
|
||||
})
|
||||
if err != nil {
|
||||
remotelogs.Error("API_NODE", "listening '"+addr+"' rpc: "+err.Error())
|
||||
return
|
||||
}
|
||||
}()
|
||||
isListening = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Rest HTTP
|
||||
restHTTPConfig, err := apiNode.DecodeRestHTTP()
|
||||
if err != nil {
|
||||
remotelogs.Error("API_NODE", "decode REST http config: "+err.Error())
|
||||
return
|
||||
}
|
||||
if restHTTPConfig != nil && restHTTPConfig.IsOn && len(restHTTPConfig.Listen) > 0 {
|
||||
for _, listen := range restHTTPConfig.Listen {
|
||||
for _, addr := range listen.Addresses() {
|
||||
listener, err := net.Listen("tcp", addr)
|
||||
if err != nil {
|
||||
remotelogs.Error("API_NODE", "listening REST 'http://"+addr+"' failed: "+err.Error())
|
||||
continue
|
||||
}
|
||||
go func() {
|
||||
remotelogs.Println("API_NODE", "listening REST http://"+addr+" ...")
|
||||
server := &RestServer{}
|
||||
err := server.Listen(listener)
|
||||
if err != nil {
|
||||
remotelogs.Error("API_NODE", "listening REST 'http://"+addr+"' failed: "+err.Error())
|
||||
return
|
||||
}
|
||||
}()
|
||||
isListening = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Rest HTTPS
|
||||
restHTTPSConfig, err := apiNode.DecodeRestHTTPS()
|
||||
if err != nil {
|
||||
remotelogs.Error("API_NODE", "decode REST https config: "+err.Error())
|
||||
return
|
||||
}
|
||||
if restHTTPSConfig != nil &&
|
||||
restHTTPSConfig.IsOn &&
|
||||
len(restHTTPSConfig.Listen) > 0 &&
|
||||
restHTTPSConfig.SSLPolicy != nil &&
|
||||
restHTTPSConfig.SSLPolicy.IsOn &&
|
||||
len(restHTTPSConfig.SSLPolicy.Certs) > 0 {
|
||||
for _, listen := range restHTTPSConfig.Listen {
|
||||
for _, addr := range listen.Addresses() {
|
||||
listener, err := net.Listen("tcp", addr)
|
||||
if err != nil {
|
||||
remotelogs.Error("API_NODE", "listening REST 'https://"+addr+"' failed: "+err.Error())
|
||||
continue
|
||||
}
|
||||
go func() {
|
||||
remotelogs.Println("API_NODE", "listening REST https://"+addr+" ...")
|
||||
server := &RestServer{}
|
||||
|
||||
certs := []tls.Certificate{}
|
||||
for _, cert := range httpsConfig.SSLPolicy.Certs {
|
||||
certs = append(certs, *cert.CertObject())
|
||||
}
|
||||
|
||||
err := server.ListenHTTPS(listener, &tls.Config{
|
||||
Certificates: certs,
|
||||
})
|
||||
if err != nil {
|
||||
remotelogs.Error("API_NODE", "listening REST 'https://"+addr+"' failed: "+err.Error())
|
||||
return
|
||||
}
|
||||
}()
|
||||
isListening = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
189
internal/nodes/rest_server.go
Normal file
189
internal/nodes/rest_server.go
Normal file
@@ -0,0 +1,189 @@
|
||||
package nodes
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/rpc/services"
|
||||
rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils"
|
||||
"github.com/iwind/TeaGo/maps"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
"reflect"
|
||||
"regexp"
|
||||
"time"
|
||||
)
|
||||
|
||||
var servicePathReg = regexp.MustCompile(`^/([a-zA-Z0-9]+)/([a-zA-Z0-9]+)$`)
|
||||
var servicesMap = map[string]reflect.Value{
|
||||
"APIAccessTokenService": reflect.ValueOf(new(services.APIAccessTokenService)),
|
||||
"HTTPAccessLogService": reflect.ValueOf(new(services.HTTPAccessLogService)),
|
||||
}
|
||||
|
||||
type RestServer struct{}
|
||||
|
||||
func (this *RestServer) Listen(listener net.Listener) error {
|
||||
mux := http.NewServeMux()
|
||||
mux.HandleFunc("/", this.handle)
|
||||
server := &http.Server{}
|
||||
server.Handler = mux
|
||||
return server.Serve(listener)
|
||||
}
|
||||
|
||||
func (this *RestServer) ListenHTTPS(listener net.Listener, tlsConfig *tls.Config) error {
|
||||
mux := http.NewServeMux()
|
||||
mux.HandleFunc("/", this.handle)
|
||||
server := &http.Server{}
|
||||
server.Handler = mux
|
||||
server.TLSConfig = tlsConfig
|
||||
return server.ServeTLS(listener, "", "")
|
||||
}
|
||||
|
||||
func (this *RestServer) handle(writer http.ResponseWriter, req *http.Request) {
|
||||
path := req.URL.Path
|
||||
matches := servicePathReg.FindStringSubmatch(path)
|
||||
if len(matches) != 3 {
|
||||
writer.WriteHeader(http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
serviceName := matches[1]
|
||||
methodName := matches[2]
|
||||
|
||||
serviceType, ok := servicesMap[serviceName]
|
||||
if !ok {
|
||||
writer.WriteHeader(http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
method := serviceType.MethodByName(methodName)
|
||||
if !method.IsValid() {
|
||||
writer.WriteHeader(http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
if method.Type().NumIn() != 2 || method.Type().NumOut() != 2 {
|
||||
writer.WriteHeader(http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
if method.Type().In(0).Name() != "Context" {
|
||||
writer.WriteHeader(http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
// 是否显示Pretty后的JSON
|
||||
shouldPretty := req.Header.Get("Edge-Response-Pretty") == "on"
|
||||
|
||||
// 上下文
|
||||
ctx := context.Background()
|
||||
|
||||
if serviceName != "APIAccessTokenService" || methodName != "GetAPIAccessToken" {
|
||||
// 校验TOKEN
|
||||
token := req.Header.Get("Edge-Access-Token")
|
||||
if len(token) == 0 {
|
||||
this.writeJSON(writer, maps.Map{
|
||||
"code": 400,
|
||||
"data": maps.Map{},
|
||||
"message": "require 'Edge-Access-Token' header",
|
||||
}, shouldPretty)
|
||||
return
|
||||
}
|
||||
|
||||
accessToken, err := models.SharedAPIAccessTokenDAO.FindAccessToken(token)
|
||||
if err != nil {
|
||||
this.writeJSON(writer, maps.Map{
|
||||
"code": 400,
|
||||
"data": maps.Map{},
|
||||
"message": "server error: " + err.Error(),
|
||||
}, shouldPretty)
|
||||
return
|
||||
}
|
||||
|
||||
if accessToken == nil || int64(accessToken.ExpiredAt) < time.Now().Unix() {
|
||||
this.writeJSON(writer, maps.Map{
|
||||
"code": 400,
|
||||
"data": maps.Map{},
|
||||
"message": "invalid access token",
|
||||
}, shouldPretty)
|
||||
return
|
||||
}
|
||||
|
||||
if accessToken.UserId > 0 {
|
||||
ctx = rpcutils.NewPlainContext("user", int64(accessToken.UserId))
|
||||
} else {
|
||||
// TODO 支持更多类型的角色
|
||||
this.writeJSON(writer, maps.Map{
|
||||
"code": 400,
|
||||
"data": maps.Map{},
|
||||
"message": "not supported role",
|
||||
}, shouldPretty)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// TODO 需要防止BODY过大攻击
|
||||
body, err := ioutil.ReadAll(req.Body)
|
||||
if err != nil {
|
||||
writer.WriteHeader(http.StatusBadRequest)
|
||||
_, _ = writer.Write([]byte(err.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
// 请求数据
|
||||
reqValue := reflect.New(method.Type().In(1).Elem()).Interface()
|
||||
err = json.Unmarshal(body, reqValue)
|
||||
if err != nil {
|
||||
writer.WriteHeader(http.StatusBadRequest)
|
||||
_, _ = writer.Write([]byte(err.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
result := method.Call([]reflect.Value{reflect.ValueOf(ctx), reflect.ValueOf(reqValue)})
|
||||
resultErr := result[1].Interface()
|
||||
if resultErr != nil {
|
||||
e, ok := resultErr.(error)
|
||||
if ok {
|
||||
this.writeJSON(writer, maps.Map{
|
||||
"code": 400,
|
||||
"message": e.Error(),
|
||||
"data": maps.Map{},
|
||||
}, shouldPretty)
|
||||
} else {
|
||||
this.writeJSON(writer, maps.Map{
|
||||
"code": 500,
|
||||
"message": "server error: server should return a error object, but return a " + result[1].Type().String(),
|
||||
"data": maps.Map{},
|
||||
}, shouldPretty)
|
||||
}
|
||||
} else { // 没有返回错误
|
||||
data := maps.Map{
|
||||
"code": 200,
|
||||
"message": "ok",
|
||||
"data": result[0].Interface(),
|
||||
}
|
||||
var dataJSON []byte
|
||||
if shouldPretty {
|
||||
dataJSON = data.AsPrettyJSON()
|
||||
} else {
|
||||
dataJSON = data.AsJSON()
|
||||
}
|
||||
if err != nil {
|
||||
this.writeJSON(writer, maps.Map{
|
||||
"code": 500,
|
||||
"message": "server error: marshal json failed: " + err.Error(),
|
||||
"data": maps.Map{},
|
||||
}, shouldPretty)
|
||||
} else {
|
||||
_, _ = writer.Write(dataJSON)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (this *RestServer) writeJSON(writer http.ResponseWriter, v maps.Map, pretty bool) {
|
||||
if pretty {
|
||||
_, _ = writer.Write(v.AsPrettyJSON())
|
||||
} else {
|
||||
_, _ = writer.Write(v.AsJSON())
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user