mirror of
https://github.com/TeaOSLab/EdgeAPI.git
synced 2025-11-07 02:20:24 +08:00
删除不必要的文件
This commit is contained in:
@@ -1,71 +0,0 @@
|
||||
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||
|
||||
package accesslogs
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/configutils"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
type BaseStorage struct {
|
||||
isOk bool
|
||||
version int
|
||||
firewallOnly bool
|
||||
}
|
||||
|
||||
func (this *BaseStorage) SetVersion(version int) {
|
||||
this.version = version
|
||||
}
|
||||
|
||||
func (this *BaseStorage) Version() int {
|
||||
return this.version
|
||||
}
|
||||
|
||||
func (this *BaseStorage) IsOk() bool {
|
||||
return this.isOk
|
||||
}
|
||||
|
||||
func (this *BaseStorage) SetOk(isOk bool) {
|
||||
this.isOk = isOk
|
||||
}
|
||||
|
||||
func (this *BaseStorage) SetFirewallOnly(firewallOnly bool) {
|
||||
this.firewallOnly = firewallOnly
|
||||
}
|
||||
|
||||
// Marshal 对日志进行编码
|
||||
func (this *BaseStorage) Marshal(accessLog *pb.HTTPAccessLog) ([]byte, error) {
|
||||
return json.Marshal(accessLog)
|
||||
}
|
||||
|
||||
// FormatVariables 格式化字符串中的变量
|
||||
func (this *BaseStorage) FormatVariables(s string) string {
|
||||
var now = time.Now()
|
||||
return configutils.ParseVariables(s, func(varName string) (value string) {
|
||||
switch varName {
|
||||
case "year":
|
||||
return strconv.Itoa(now.Year())
|
||||
case "month":
|
||||
return fmt.Sprintf("%02d", now.Month())
|
||||
case "week":
|
||||
_, week := now.ISOWeek()
|
||||
return fmt.Sprintf("%02d", week)
|
||||
case "day":
|
||||
return fmt.Sprintf("%02d", now.Day())
|
||||
case "hour":
|
||||
return fmt.Sprintf("%02d", now.Hour())
|
||||
case "minute":
|
||||
return fmt.Sprintf("%02d", now.Minute())
|
||||
case "second":
|
||||
return fmt.Sprintf("%02d", now.Second())
|
||||
case "date":
|
||||
return fmt.Sprintf("%d%02d%02d", now.Year(), now.Month(), now.Day())
|
||||
}
|
||||
|
||||
return varName
|
||||
})
|
||||
}
|
||||
@@ -1,99 +0,0 @@
|
||||
package accesslogs
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
||||
"github.com/iwind/TeaGo/logs"
|
||||
"os/exec"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// CommandStorage 通过命令行存储
|
||||
type CommandStorage struct {
|
||||
BaseStorage
|
||||
|
||||
config *serverconfigs.AccessLogCommandStorageConfig
|
||||
|
||||
writeLocker sync.Mutex
|
||||
}
|
||||
|
||||
func NewCommandStorage(config *serverconfigs.AccessLogCommandStorageConfig) *CommandStorage {
|
||||
return &CommandStorage{config: config}
|
||||
}
|
||||
|
||||
func (this *CommandStorage) Config() interface{} {
|
||||
return this.config
|
||||
}
|
||||
|
||||
// Start 启动
|
||||
func (this *CommandStorage) Start() error {
|
||||
if len(this.config.Command) == 0 {
|
||||
return errors.New("'command' should not be empty")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// 写入日志
|
||||
func (this *CommandStorage) Write(accessLogs []*pb.HTTPAccessLog) error {
|
||||
if len(accessLogs) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
this.writeLocker.Lock()
|
||||
defer this.writeLocker.Unlock()
|
||||
|
||||
cmd := exec.Command(this.config.Command, this.config.Args...)
|
||||
if len(this.config.Dir) > 0 {
|
||||
cmd.Dir = this.config.Dir
|
||||
}
|
||||
|
||||
stdout := bytes.NewBuffer([]byte{})
|
||||
cmd.Stdout = stdout
|
||||
|
||||
w, err := cmd.StdinPipe()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = cmd.Start()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, accessLog := range accessLogs {
|
||||
if this.firewallOnly && accessLog.FirewallPolicyId == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
data, err := this.Marshal(accessLog)
|
||||
if err != nil {
|
||||
logs.Error(err)
|
||||
continue
|
||||
}
|
||||
_, err = w.Write(data)
|
||||
if err != nil {
|
||||
logs.Error(err)
|
||||
}
|
||||
|
||||
_, err = w.Write([]byte("\n"))
|
||||
if err != nil {
|
||||
logs.Error(err)
|
||||
}
|
||||
}
|
||||
_ = w.Close()
|
||||
err = cmd.Wait()
|
||||
if err != nil {
|
||||
logs.Error(err)
|
||||
|
||||
if stdout.Len() > 0 {
|
||||
logs.Error(errors.New(string(stdout.Bytes())))
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close 关闭
|
||||
func (this *CommandStorage) Close() error {
|
||||
return nil
|
||||
}
|
||||
@@ -1,63 +0,0 @@
|
||||
package accesslogs
|
||||
|
||||
import (
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
||||
"os"
|
||||
"os/exec"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestCommandStorage_Write(t *testing.T) {
|
||||
php, err := exec.LookPath("php")
|
||||
if err != nil { // not found php, so we can not test
|
||||
t.Log("php:", err)
|
||||
return
|
||||
}
|
||||
|
||||
cwd, err := os.Getwd()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
before := time.Now()
|
||||
|
||||
storage := NewCommandStorage(&serverconfigs.AccessLogCommandStorageConfig{
|
||||
Command: php,
|
||||
Args: []string{cwd + "/tests/command_storage.php"},
|
||||
})
|
||||
err = storage.Start()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = storage.Write([]*pb.HTTPAccessLog{
|
||||
{
|
||||
RequestMethod: "GET",
|
||||
RequestPath: "/hello",
|
||||
},
|
||||
{
|
||||
RequestMethod: "GET",
|
||||
RequestPath: "/world",
|
||||
},
|
||||
{
|
||||
RequestMethod: "GET",
|
||||
RequestPath: "/lu",
|
||||
},
|
||||
{
|
||||
RequestMethod: "GET",
|
||||
RequestPath: "/ping",
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = storage.Close()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
t.Log(time.Since(before).Seconds(), "seconds")
|
||||
}
|
||||
@@ -1,131 +0,0 @@
|
||||
package accesslogs
|
||||
|
||||
import (
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
teaconst "github.com/TeaOSLab/EdgeAPI/internal/const"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/utils"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
||||
"io"
|
||||
"net/http"
|
||||
"regexp"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// ESStorage ElasticSearch存储策略
|
||||
type ESStorage struct {
|
||||
BaseStorage
|
||||
|
||||
config *serverconfigs.AccessLogESStorageConfig
|
||||
}
|
||||
|
||||
func NewESStorage(config *serverconfigs.AccessLogESStorageConfig) *ESStorage {
|
||||
return &ESStorage{config: config}
|
||||
}
|
||||
|
||||
func (this *ESStorage) Config() interface{} {
|
||||
return this.config
|
||||
}
|
||||
|
||||
// Start 开启
|
||||
func (this *ESStorage) Start() error {
|
||||
if len(this.config.Endpoint) == 0 {
|
||||
return errors.New("'endpoint' should not be nil")
|
||||
}
|
||||
if !regexp.MustCompile(`(?i)^(http|https)://`).MatchString(this.config.Endpoint) {
|
||||
this.config.Endpoint = "http://" + this.config.Endpoint
|
||||
}
|
||||
if len(this.config.Index) == 0 {
|
||||
return errors.New("'index' should not be nil")
|
||||
}
|
||||
if len(this.config.MappingType) == 0 {
|
||||
return errors.New("'mappingType' should not be nil")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// 写入日志
|
||||
func (this *ESStorage) Write(accessLogs []*pb.HTTPAccessLog) error {
|
||||
if len(accessLogs) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
bulk := &strings.Builder{}
|
||||
indexName := this.FormatVariables(this.config.Index)
|
||||
typeName := this.FormatVariables(this.config.MappingType)
|
||||
for _, accessLog := range accessLogs {
|
||||
if this.firewallOnly && accessLog.FirewallPolicyId == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
if len(accessLog.RequestId) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
opData, err := json.Marshal(map[string]interface{}{
|
||||
"index": map[string]interface{}{
|
||||
"_index": indexName,
|
||||
"_type": typeName,
|
||||
"_id": accessLog.RequestId,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
remotelogs.Error("ACCESS_LOG_ES_STORAGE", "write failed: "+err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
data, err := this.Marshal(accessLog)
|
||||
if err != nil {
|
||||
remotelogs.Error("ACCESS_LOG_ES_STORAGE", "marshal data failed: "+err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
bulk.Write(opData)
|
||||
bulk.WriteString("\n")
|
||||
bulk.Write(data)
|
||||
bulk.WriteString("\n")
|
||||
}
|
||||
|
||||
if bulk.Len() == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
req, err := http.NewRequest(http.MethodPost, this.config.Endpoint+"/_bulk", strings.NewReader(bulk.String()))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.Header.Set("User-Agent", strings.ReplaceAll(teaconst.ProductName, " ", "-")+"/"+teaconst.Version)
|
||||
if len(this.config.Username) > 0 || len(this.config.Password) > 0 {
|
||||
req.Header.Set("Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(this.config.Username+":"+this.config.Password)))
|
||||
}
|
||||
client := utils.SharedHttpClient(10 * time.Second)
|
||||
defer func() {
|
||||
_ = req.Body.Close()
|
||||
}()
|
||||
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
_ = resp.Body.Close()
|
||||
}()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
bodyData, _ := io.ReadAll(resp.Body)
|
||||
return errors.New("ElasticSearch response status code: " + fmt.Sprintf("%d", resp.StatusCode) + " content: " + string(bodyData))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close 关闭
|
||||
func (this *ESStorage) Close() error {
|
||||
return nil
|
||||
}
|
||||
@@ -1,53 +0,0 @@
|
||||
package accesslogs
|
||||
|
||||
import (
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestESStorage_Write(t *testing.T) {
|
||||
storage := NewESStorage(&serverconfigs.AccessLogESStorageConfig{
|
||||
Endpoint: "http://127.0.0.1:9200",
|
||||
Index: "logs",
|
||||
MappingType: "accessLogs",
|
||||
Username: "hello",
|
||||
Password: "world",
|
||||
})
|
||||
err := storage.Start()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
{
|
||||
err = storage.Write([]*pb.HTTPAccessLog{
|
||||
{
|
||||
RequestMethod: "POST",
|
||||
RequestPath: "/1",
|
||||
TimeLocal: time.Now().Format("2/Jan/2006:15:04:05 -0700"),
|
||||
TimeISO8601: "2018-07-23T22:23:35+08:00",
|
||||
Header: map[string]*pb.Strings{
|
||||
"Content-Type": {Values: []string{"text/html"}},
|
||||
},
|
||||
},
|
||||
{
|
||||
RequestMethod: "GET",
|
||||
RequestPath: "/2",
|
||||
TimeLocal: time.Now().Format("2/Jan/2006:15:04:05 -0700"),
|
||||
TimeISO8601: "2018-07-23T22:23:35+08:00",
|
||||
Header: map[string]*pb.Strings{
|
||||
"Content-Type": {Values: []string{"text/css"}},
|
||||
},
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
err = storage.Close()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
@@ -1,130 +0,0 @@
|
||||
package accesslogs
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
||||
"github.com/iwind/TeaGo/logs"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// FileStorage 文件存储策略
|
||||
type FileStorage struct {
|
||||
BaseStorage
|
||||
|
||||
config *serverconfigs.AccessLogFileStorageConfig
|
||||
|
||||
writeLocker sync.Mutex
|
||||
|
||||
files map[string]*os.File // path => *File
|
||||
filesLocker sync.Mutex
|
||||
}
|
||||
|
||||
func NewFileStorage(config *serverconfigs.AccessLogFileStorageConfig) *FileStorage {
|
||||
return &FileStorage{
|
||||
config: config,
|
||||
}
|
||||
}
|
||||
|
||||
func (this *FileStorage) Config() interface{} {
|
||||
return this.config
|
||||
}
|
||||
|
||||
// Start 开启
|
||||
func (this *FileStorage) Start() error {
|
||||
if len(this.config.Path) == 0 {
|
||||
return errors.New("'path' should not be empty")
|
||||
}
|
||||
|
||||
this.files = map[string]*os.File{}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Write 写入日志
|
||||
func (this *FileStorage) Write(accessLogs []*pb.HTTPAccessLog) error {
|
||||
if len(accessLogs) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
fp := this.fp()
|
||||
if fp == nil {
|
||||
return errors.New("file pointer should not be nil")
|
||||
}
|
||||
this.writeLocker.Lock()
|
||||
defer this.writeLocker.Unlock()
|
||||
|
||||
for _, accessLog := range accessLogs {
|
||||
if this.firewallOnly && accessLog.FirewallPolicyId == 0 {
|
||||
continue
|
||||
}
|
||||
data, err := this.Marshal(accessLog)
|
||||
if err != nil {
|
||||
logs.Error(err)
|
||||
continue
|
||||
}
|
||||
_, err = fp.Write(data)
|
||||
if err != nil {
|
||||
_ = this.Close()
|
||||
break
|
||||
}
|
||||
_, _ = fp.WriteString("\n")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close 关闭
|
||||
func (this *FileStorage) Close() error {
|
||||
this.filesLocker.Lock()
|
||||
defer this.filesLocker.Unlock()
|
||||
|
||||
var resultErr error
|
||||
for _, f := range this.files {
|
||||
err := f.Close()
|
||||
if err != nil {
|
||||
resultErr = err
|
||||
}
|
||||
}
|
||||
return resultErr
|
||||
}
|
||||
|
||||
func (this *FileStorage) fp() *os.File {
|
||||
path := this.FormatVariables(this.config.Path)
|
||||
|
||||
this.filesLocker.Lock()
|
||||
defer this.filesLocker.Unlock()
|
||||
fp, ok := this.files[path]
|
||||
if ok {
|
||||
return fp
|
||||
}
|
||||
|
||||
// 关闭其他的文件
|
||||
for _, f := range this.files {
|
||||
_ = f.Close()
|
||||
}
|
||||
|
||||
// 是否创建文件目录
|
||||
if this.config.AutoCreate {
|
||||
dir := filepath.Dir(path)
|
||||
_, err := os.Stat(dir)
|
||||
if os.IsNotExist(err) {
|
||||
err = os.MkdirAll(dir, 0777)
|
||||
if err != nil {
|
||||
logs.Error(err)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 打开新文件
|
||||
fp, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
|
||||
if err != nil {
|
||||
logs.Error(err)
|
||||
return nil
|
||||
}
|
||||
this.files[path] = fp
|
||||
|
||||
return fp
|
||||
}
|
||||
@@ -1,70 +0,0 @@
|
||||
package accesslogs
|
||||
|
||||
import (
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
||||
"github.com/iwind/TeaGo/Tea"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestFileStorage_Write(t *testing.T) {
|
||||
storage := NewFileStorage(&serverconfigs.AccessLogFileStorageConfig{
|
||||
Path: Tea.Root + "/logs/access-${date}.log",
|
||||
})
|
||||
err := storage.Start()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
{
|
||||
err = storage.Write([]*pb.HTTPAccessLog{
|
||||
{
|
||||
RequestPath: "/hello",
|
||||
},
|
||||
{
|
||||
RequestPath: "/world",
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
err = storage.Write([]*pb.HTTPAccessLog{
|
||||
{
|
||||
RequestPath: "/1",
|
||||
},
|
||||
{
|
||||
RequestPath: "/2",
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
err = storage.Write([]*pb.HTTPAccessLog{
|
||||
{
|
||||
RequestMethod: "POST",
|
||||
RequestPath: "/1",
|
||||
TimeLocal: time.Now().Format("2/Jan/2006:15:04:05 -0700"),
|
||||
},
|
||||
{
|
||||
RequestMethod: "GET",
|
||||
RequestPath: "/2",
|
||||
TimeLocal: time.Now().Format("2/Jan/2006:15:04:05 -0700"),
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
err = storage.Close()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
@@ -1,33 +0,0 @@
|
||||
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||
|
||||
package accesslogs
|
||||
|
||||
import "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
|
||||
// StorageInterface 日志存储接口
|
||||
type StorageInterface interface {
|
||||
// Version 获取版本
|
||||
Version() int
|
||||
|
||||
// SetVersion 设置版本
|
||||
SetVersion(version int)
|
||||
|
||||
// SetFirewallOnly 设置是否只处理防火墙相关的访问日志
|
||||
SetFirewallOnly(firewallOnly bool)
|
||||
|
||||
IsOk() bool
|
||||
|
||||
SetOk(ok bool)
|
||||
|
||||
// Config 获取配置
|
||||
Config() interface{}
|
||||
|
||||
// Start 开启
|
||||
Start() error
|
||||
|
||||
// Write 写入日志
|
||||
Write(accessLogs []*pb.HTTPAccessLog) error
|
||||
|
||||
// Close 关闭
|
||||
Close() error
|
||||
}
|
||||
@@ -1,185 +0,0 @@
|
||||
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||
|
||||
package accesslogs
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/errors"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
||||
"github.com/iwind/TeaGo/Tea"
|
||||
"github.com/iwind/TeaGo/lists"
|
||||
"github.com/iwind/TeaGo/types"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
var SharedStorageManager = NewStorageManager()
|
||||
|
||||
type StorageManager struct {
|
||||
storageMap map[int64]StorageInterface // policyId => Storage
|
||||
|
||||
locker sync.Mutex
|
||||
}
|
||||
|
||||
func NewStorageManager() *StorageManager {
|
||||
return &StorageManager{
|
||||
storageMap: map[int64]StorageInterface{},
|
||||
}
|
||||
}
|
||||
|
||||
func (this *StorageManager) Start() {
|
||||
var ticker = time.NewTicker(1 * time.Minute)
|
||||
if Tea.IsTesting() {
|
||||
ticker = time.NewTicker(5 * time.Second)
|
||||
}
|
||||
|
||||
// 启动时执行一次
|
||||
var err = this.Loop()
|
||||
if err != nil {
|
||||
remotelogs.Error("ACCESS_LOG_STORAGE_MANAGER", "update error: "+err.Error())
|
||||
}
|
||||
|
||||
// 循环执行
|
||||
for range ticker.C {
|
||||
err := this.Loop()
|
||||
if err != nil {
|
||||
remotelogs.Error("ACCESS_LOG_STORAGE_MANAGER", "update error: "+err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Loop 更新
|
||||
func (this *StorageManager) Loop() error {
|
||||
policies, err := models.SharedHTTPAccessLogPolicyDAO.FindAllEnabledAndOnPolicies(nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var policyIds = []int64{}
|
||||
for _, policy := range policies {
|
||||
if policy.IsOn {
|
||||
policyIds = append(policyIds, int64(policy.Id))
|
||||
}
|
||||
}
|
||||
|
||||
this.locker.Lock()
|
||||
defer this.locker.Unlock()
|
||||
|
||||
// 关闭不用的
|
||||
for policyId, storage := range this.storageMap {
|
||||
if !lists.ContainsInt64(policyIds, policyId) {
|
||||
err := storage.Close()
|
||||
if err != nil {
|
||||
remotelogs.Error("ACCESS_LOG_STORAGE_MANAGER", "close '"+types.String(policyId)+"' failed: "+err.Error())
|
||||
}
|
||||
delete(this.storageMap, policyId)
|
||||
remotelogs.Error("ACCESS_LOG_STORAGE_MANAGER", "remove '"+types.String(policyId)+"'")
|
||||
}
|
||||
}
|
||||
|
||||
for _, policy := range policies {
|
||||
var policyId = int64(policy.Id)
|
||||
storage, ok := this.storageMap[policyId]
|
||||
if ok {
|
||||
// 检查配置是否有变更
|
||||
if types.Int(policy.Version) != storage.Version() {
|
||||
err = storage.Close()
|
||||
if err != nil {
|
||||
remotelogs.Error("ACCESS_LOG_STORAGE_MANAGER", "close policy '"+types.String(policyId)+"' failed: "+err.Error())
|
||||
|
||||
// 继续往下执行
|
||||
}
|
||||
|
||||
if len(policy.Options) > 0 {
|
||||
err = json.Unmarshal(policy.Options, storage.Config())
|
||||
if err != nil {
|
||||
remotelogs.Error("ACCESS_LOG_STORAGE_MANAGER", "unmarshal policy '"+types.String(policyId)+"' config failed: "+err.Error())
|
||||
storage.SetOk(false)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
storage.SetVersion(types.Int(policy.Version))
|
||||
storage.SetFirewallOnly(policy.FirewallOnly == 1)
|
||||
err := storage.Start()
|
||||
if err != nil {
|
||||
remotelogs.Error("ACCESS_LOG_STORAGE_MANAGER", "start policy '"+types.String(policyId)+"' failed: "+err.Error())
|
||||
continue
|
||||
}
|
||||
storage.SetOk(true)
|
||||
remotelogs.Println("ACCESS_LOG_STORAGE_MANAGER", "restart policy '"+types.String(policyId)+"'")
|
||||
}
|
||||
} else {
|
||||
storage, err := this.createStorage(policy.Type, policy.Options)
|
||||
if err != nil {
|
||||
remotelogs.Error("ACCESS_LOG_STORAGE_MANAGER", "create policy '"+types.String(policyId)+"' failed: "+err.Error())
|
||||
continue
|
||||
}
|
||||
storage.SetVersion(types.Int(policy.Version))
|
||||
storage.SetFirewallOnly(policy.FirewallOnly == 1)
|
||||
this.storageMap[policyId] = storage
|
||||
err = storage.Start()
|
||||
if err != nil {
|
||||
remotelogs.Error("ACCESS_LOG_STORAGE_MANAGER", "start policy '"+types.String(policyId)+"' failed: "+err.Error())
|
||||
continue
|
||||
}
|
||||
storage.SetOk(true)
|
||||
remotelogs.Println("ACCESS_LOG_STORAGE_MANAGER", "start policy '"+types.String(policyId)+"'")
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *StorageManager) createStorage(storageType string, optionsJSON []byte) (StorageInterface, error) {
|
||||
switch storageType {
|
||||
case serverconfigs.AccessLogStorageTypeFile:
|
||||
var config = &serverconfigs.AccessLogFileStorageConfig{}
|
||||
if len(optionsJSON) > 0 {
|
||||
err := json.Unmarshal(optionsJSON, config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return NewFileStorage(config), nil
|
||||
case serverconfigs.AccessLogStorageTypeES:
|
||||
var config = &serverconfigs.AccessLogESStorageConfig{}
|
||||
if len(optionsJSON) > 0 {
|
||||
err := json.Unmarshal(optionsJSON, config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return NewESStorage(config), nil
|
||||
case serverconfigs.AccessLogStorageTypeTCP:
|
||||
var config = &serverconfigs.AccessLogTCPStorageConfig{}
|
||||
if len(optionsJSON) > 0 {
|
||||
err := json.Unmarshal(optionsJSON, config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return NewTCPStorage(config), nil
|
||||
case serverconfigs.AccessLogStorageTypeSyslog:
|
||||
var config = &serverconfigs.AccessLogSyslogStorageConfig{}
|
||||
if len(optionsJSON) > 0 {
|
||||
err := json.Unmarshal(optionsJSON, config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return NewSyslogStorage(config), nil
|
||||
case serverconfigs.AccessLogStorageTypeCommand:
|
||||
var config = &serverconfigs.AccessLogCommandStorageConfig{}
|
||||
if len(optionsJSON) > 0 {
|
||||
err := json.Unmarshal(optionsJSON, config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return NewCommandStorage(config), nil
|
||||
}
|
||||
|
||||
return nil, errors.New("invalid policy type '" + storageType + "'")
|
||||
}
|
||||
@@ -1,17 +0,0 @@
|
||||
package accesslogs
|
||||
|
||||
import (
|
||||
"github.com/iwind/TeaGo/dbs"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestStorageManager_Loop(t *testing.T) {
|
||||
dbs.NotifyReady()
|
||||
|
||||
var storage = NewStorageManager()
|
||||
err := storage.Loop()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Log(storage.storageMap)
|
||||
}
|
||||
@@ -1,14 +0,0 @@
|
||||
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||
|
||||
//go:build !plus
|
||||
|
||||
package accesslogs
|
||||
|
||||
import (
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
)
|
||||
|
||||
// 写入日志
|
||||
func (this *StorageManager) Write(policyId int64, accessLogs []*pb.HTTPAccessLog) (success bool, failMessage string, err error) {
|
||||
return false, "only works in plus version", nil
|
||||
}
|
||||
@@ -1,146 +0,0 @@
|
||||
package accesslogs
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
||||
"github.com/iwind/TeaGo/logs"
|
||||
"os/exec"
|
||||
"runtime"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
type SyslogStorageProtocol = string
|
||||
|
||||
const (
|
||||
SyslogStorageProtocolTCP SyslogStorageProtocol = "tcp"
|
||||
SyslogStorageProtocolUDP SyslogStorageProtocol = "udp"
|
||||
SyslogStorageProtocolNone SyslogStorageProtocol = "none"
|
||||
SyslogStorageProtocolSocket SyslogStorageProtocol = "socket"
|
||||
)
|
||||
|
||||
type SyslogStoragePriority = int
|
||||
|
||||
// SyslogStorage syslog存储策略
|
||||
type SyslogStorage struct {
|
||||
BaseStorage
|
||||
|
||||
config *serverconfigs.AccessLogSyslogStorageConfig
|
||||
|
||||
exe string
|
||||
}
|
||||
|
||||
func NewSyslogStorage(config *serverconfigs.AccessLogSyslogStorageConfig) *SyslogStorage {
|
||||
return &SyslogStorage{config: config}
|
||||
}
|
||||
|
||||
func (this *SyslogStorage) Config() interface{} {
|
||||
return this.config
|
||||
}
|
||||
|
||||
// Start 开启
|
||||
func (this *SyslogStorage) Start() error {
|
||||
if runtime.GOOS != "linux" {
|
||||
return errors.New("'syslog' storage only works on linux")
|
||||
}
|
||||
|
||||
exe, err := exec.LookPath("logger")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
this.exe = exe
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// 写入日志
|
||||
func (this *SyslogStorage) Write(accessLogs []*pb.HTTPAccessLog) error {
|
||||
if len(accessLogs) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
args := []string{}
|
||||
if len(this.config.Tag) > 0 {
|
||||
args = append(args, "-t", this.config.Tag)
|
||||
}
|
||||
|
||||
if this.config.Priority >= 0 {
|
||||
args = append(args, "-p", strconv.Itoa(this.config.Priority))
|
||||
}
|
||||
|
||||
switch this.config.Protocol {
|
||||
case SyslogStorageProtocolTCP:
|
||||
args = append(args, "-T")
|
||||
if len(this.config.ServerAddr) > 0 {
|
||||
args = append(args, "-n", this.config.ServerAddr)
|
||||
}
|
||||
if this.config.ServerPort > 0 {
|
||||
args = append(args, "-P", strconv.Itoa(this.config.ServerPort))
|
||||
}
|
||||
case SyslogStorageProtocolUDP:
|
||||
args = append(args, "-d")
|
||||
if len(this.config.ServerAddr) > 0 {
|
||||
args = append(args, "-n", this.config.ServerAddr)
|
||||
}
|
||||
if this.config.ServerPort > 0 {
|
||||
args = append(args, "-P", strconv.Itoa(this.config.ServerPort))
|
||||
}
|
||||
case SyslogStorageProtocolSocket:
|
||||
args = append(args, "-u")
|
||||
args = append(args, this.config.Socket)
|
||||
case SyslogStorageProtocolNone:
|
||||
// do nothing
|
||||
}
|
||||
|
||||
args = append(args, "-S", "10240")
|
||||
|
||||
var cmd = exec.Command(this.exe, args...)
|
||||
var stderrBuffer = &bytes.Buffer{}
|
||||
cmd.Stderr = stderrBuffer
|
||||
|
||||
w, err := cmd.StdinPipe()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = cmd.Start()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, accessLog := range accessLogs {
|
||||
if this.firewallOnly && accessLog.FirewallPolicyId == 0 {
|
||||
continue
|
||||
}
|
||||
data, err := this.Marshal(accessLog)
|
||||
if err != nil {
|
||||
remotelogs.Error("ACCESS_LOG_POLICY_SYSLOG", "marshal accesslog failed: "+err.Error())
|
||||
continue
|
||||
}
|
||||
_, err = w.Write(data)
|
||||
if err != nil {
|
||||
logs.Error(err)
|
||||
}
|
||||
|
||||
_, err = w.Write([]byte("\n"))
|
||||
if err != nil {
|
||||
remotelogs.Error("ACCESS_LOG_POLICY_SYSLOG", "write accesslog failed: "+err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
_ = w.Close()
|
||||
|
||||
err = cmd.Wait()
|
||||
if err != nil {
|
||||
return errors.New("send syslog failed: " + err.Error() + ", stderr: " + stderrBuffer.String())
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close 关闭
|
||||
func (this *SyslogStorage) Close() error {
|
||||
return nil
|
||||
}
|
||||
@@ -1,114 +0,0 @@
|
||||
package accesslogs
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
||||
"github.com/iwind/TeaGo/logs"
|
||||
"net"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// TCPStorage TCP存储策略
|
||||
type TCPStorage struct {
|
||||
BaseStorage
|
||||
|
||||
config *serverconfigs.AccessLogTCPStorageConfig
|
||||
|
||||
writeLocker sync.Mutex
|
||||
|
||||
connLocker sync.Mutex
|
||||
conn net.Conn
|
||||
}
|
||||
|
||||
func NewTCPStorage(config *serverconfigs.AccessLogTCPStorageConfig) *TCPStorage {
|
||||
return &TCPStorage{config: config}
|
||||
}
|
||||
|
||||
func (this *TCPStorage) Config() interface{} {
|
||||
return this.config
|
||||
}
|
||||
|
||||
// Start 开启
|
||||
func (this *TCPStorage) Start() error {
|
||||
if len(this.config.Network) == 0 {
|
||||
return errors.New("'network' should not be empty")
|
||||
}
|
||||
if len(this.config.Addr) == 0 {
|
||||
return errors.New("'addr' should not be empty")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// 写入日志
|
||||
func (this *TCPStorage) Write(accessLogs []*pb.HTTPAccessLog) error {
|
||||
if len(accessLogs) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
err := this.connect()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
conn := this.conn
|
||||
if conn == nil {
|
||||
return errors.New("connection should not be nil")
|
||||
}
|
||||
|
||||
this.writeLocker.Lock()
|
||||
defer this.writeLocker.Unlock()
|
||||
|
||||
for _, accessLog := range accessLogs {
|
||||
if this.firewallOnly && accessLog.FirewallPolicyId == 0 {
|
||||
continue
|
||||
}
|
||||
data, err := this.Marshal(accessLog)
|
||||
if err != nil {
|
||||
logs.Error(err)
|
||||
continue
|
||||
}
|
||||
_, err = conn.Write(data)
|
||||
if err != nil {
|
||||
_ = this.Close()
|
||||
break
|
||||
}
|
||||
_, err = conn.Write([]byte("\n"))
|
||||
if err != nil {
|
||||
_ = this.Close()
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close 关闭
|
||||
func (this *TCPStorage) Close() error {
|
||||
this.connLocker.Lock()
|
||||
defer this.connLocker.Unlock()
|
||||
|
||||
if this.conn != nil {
|
||||
err := this.conn.Close()
|
||||
this.conn = nil
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *TCPStorage) connect() error {
|
||||
this.connLocker.Lock()
|
||||
defer this.connLocker.Unlock()
|
||||
|
||||
if this.conn != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
conn, err := net.Dial(this.config.Network, this.config.Addr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
this.conn = conn
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -1,72 +0,0 @@
|
||||
package accesslogs
|
||||
|
||||
import (
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
||||
"net"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestTCPStorage_Write(t *testing.T) {
|
||||
go func() {
|
||||
server, err := net.Listen("tcp", "127.0.0.1:9981")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
for {
|
||||
conn, err := server.Accept()
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
buf := make([]byte, 1024)
|
||||
for {
|
||||
n, err := conn.Read(buf)
|
||||
if n > 0 {
|
||||
t.Log(string(buf[:n]))
|
||||
}
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
break
|
||||
}
|
||||
_ = server.Close()
|
||||
}()
|
||||
|
||||
storage := NewTCPStorage(&serverconfigs.AccessLogTCPStorageConfig{
|
||||
Network: "tcp",
|
||||
Addr: "127.0.0.1:9981",
|
||||
})
|
||||
err := storage.Start()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
{
|
||||
err = storage.Write([]*pb.HTTPAccessLog{
|
||||
{
|
||||
RequestMethod: "POST",
|
||||
RequestPath: "/1",
|
||||
TimeLocal: time.Now().Format("2/Jan/2006:15:04:05 -0700"),
|
||||
},
|
||||
{
|
||||
RequestMethod: "GET",
|
||||
RequestPath: "/2",
|
||||
TimeLocal: time.Now().Format("2/Jan/2006:15:04:05 -0700"),
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
err = storage.Close()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
@@ -1,24 +0,0 @@
|
||||
<?php
|
||||
|
||||
// test command storage
|
||||
|
||||
// open access log file
|
||||
$fp = fopen("/tmp/goedge-command-storage.log", "a+");
|
||||
|
||||
// read access logs from stdin
|
||||
$stdin = fopen("php://stdin", "r");
|
||||
while(true) {
|
||||
if (feof($stdin)) {
|
||||
break;
|
||||
}
|
||||
$line = fgets($stdin);
|
||||
|
||||
// write to access log file
|
||||
fwrite($fp, $line);
|
||||
}
|
||||
|
||||
// close file pointers
|
||||
fclose($fp);
|
||||
fclose($stdin);
|
||||
|
||||
?>
|
||||
@@ -93,7 +93,6 @@ func init() {
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
func NewHTTPAccessLogDAO() *HTTPAccessLogDAO {
|
||||
|
||||
@@ -5,7 +5,6 @@ import (
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/accesslogs"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/configs"
|
||||
teaconst "github.com/TeaOSLab/EdgeAPI/internal/const"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
|
||||
@@ -159,9 +158,7 @@ func (this *APINode) Start() {
|
||||
})
|
||||
|
||||
// 访问日志存储管理器
|
||||
goman.New(func() {
|
||||
accesslogs.SharedStorageManager.Start()
|
||||
})
|
||||
this.startAccessLogStorages()
|
||||
|
||||
// 监听RPC服务
|
||||
remotelogs.Println("API_NODE", "starting RPC server ...")
|
||||
|
||||
8
internal/nodes/api_node_ext.go
Normal file
8
internal/nodes/api_node_ext.go
Normal file
@@ -0,0 +1,8 @@
|
||||
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn .
|
||||
//go:build !plus
|
||||
|
||||
package nodes
|
||||
|
||||
func (this *APINode) startAccessLogStorages() {
|
||||
|
||||
}
|
||||
@@ -102,11 +102,6 @@ func (this *APINode) registerServices(server *grpc.Server) {
|
||||
pb.RegisterHTTPPageServiceServer(server, instance)
|
||||
this.rest(instance)
|
||||
}
|
||||
{
|
||||
var instance = this.serviceInstance(&services.HTTPAccessLogPolicyService{}).(*services.HTTPAccessLogPolicyService)
|
||||
pb.RegisterHTTPAccessLogPolicyServiceServer(server, instance)
|
||||
this.rest(instance)
|
||||
}
|
||||
{
|
||||
var instance = this.serviceInstance(&services.HTTPCachePolicyService{}).(*services.HTTPCachePolicyService)
|
||||
pb.RegisterHTTPCachePolicyServiceServer(server, instance)
|
||||
|
||||
@@ -2,7 +2,6 @@ package services
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/accesslogs"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/errors"
|
||||
rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils"
|
||||
@@ -37,17 +36,10 @@ func (this *HTTPAccessLogService) CreateHTTPAccessLogs(ctx context.Context, req
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 发送到访问日志策略
|
||||
policyId, err := models.SharedHTTPAccessLogPolicyDAO.FindCurrentPublicPolicyId(tx)
|
||||
err = this.writeAccessLogsToPolicy(req.HttpAccessLogs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if policyId > 0 {
|
||||
_, _, err = accesslogs.SharedStorageManager.Write(policyId, req.HttpAccessLogs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return &pb.CreateHTTPAccessLogsResponse{}, nil
|
||||
}
|
||||
|
||||
10
internal/rpc/services/service_http_access_log_ext.go
Normal file
10
internal/rpc/services/service_http_access_log_ext.go
Normal file
@@ -0,0 +1,10 @@
|
||||
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn .
|
||||
//go:build !plus
|
||||
|
||||
package services
|
||||
|
||||
import "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
|
||||
func (this *HTTPAccessLogService) writeAccessLogsToPolicy(pbAccessLogs []*pb.HTTPAccessLog) error {
|
||||
return nil
|
||||
}
|
||||
@@ -1,165 +0,0 @@
|
||||
package services
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/accesslogs"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
)
|
||||
|
||||
type HTTPAccessLogPolicyService struct {
|
||||
BaseService
|
||||
}
|
||||
|
||||
// CountAllHTTPAccessLogPolicies 计算访问日志策略数量
|
||||
func (this *HTTPAccessLogPolicyService) CountAllHTTPAccessLogPolicies(ctx context.Context, req *pb.CountAllHTTPAccessLogPoliciesRequest) (*pb.RPCCountResponse, error) {
|
||||
_, err := this.ValidateAdmin(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var tx = this.NullTx()
|
||||
count, err := models.SharedHTTPAccessLogPolicyDAO.CountAllEnabledPolicies(tx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return this.SuccessCount(count)
|
||||
}
|
||||
|
||||
// ListHTTPAccessLogPolicies 列出单页访问日志策略
|
||||
func (this *HTTPAccessLogPolicyService) ListHTTPAccessLogPolicies(ctx context.Context, req *pb.ListHTTPAccessLogPoliciesRequest) (*pb.ListHTTPAccessLogPoliciesResponse, error) {
|
||||
_, err := this.ValidateAdmin(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var tx = this.NullTx()
|
||||
policies, err := models.SharedHTTPAccessLogPolicyDAO.ListEnabledPolicies(tx, req.Offset, req.Size)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var pbPolicies = []*pb.HTTPAccessLogPolicy{}
|
||||
for _, policy := range policies {
|
||||
pbPolicies = append(pbPolicies, &pb.HTTPAccessLogPolicy{
|
||||
Id: int64(policy.Id),
|
||||
Name: policy.Name,
|
||||
IsOn: policy.IsOn,
|
||||
Type: policy.Type,
|
||||
OptionsJSON: policy.Options,
|
||||
CondsJSON: policy.Conds,
|
||||
IsPublic: policy.IsPublic,
|
||||
FirewallOnly: policy.FirewallOnly == 1,
|
||||
})
|
||||
}
|
||||
return &pb.ListHTTPAccessLogPoliciesResponse{HttpAccessLogPolicies: pbPolicies}, nil
|
||||
}
|
||||
|
||||
// CreateHTTPAccessLogPolicy 创建访问日志策略
|
||||
func (this *HTTPAccessLogPolicyService) CreateHTTPAccessLogPolicy(ctx context.Context, req *pb.CreateHTTPAccessLogPolicyRequest) (*pb.CreateHTTPAccessLogPolicyResponse, error) {
|
||||
_, err := this.ValidateAdmin(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var tx = this.NullTx()
|
||||
|
||||
// 取消别的Public
|
||||
if req.IsPublic {
|
||||
err = models.SharedHTTPAccessLogPolicyDAO.CancelAllPublicPolicies(tx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// 创建
|
||||
policyId, err := models.SharedHTTPAccessLogPolicyDAO.CreatePolicy(tx, req.Name, req.Type, req.OptionsJSON, req.CondsJSON, req.IsPublic, req.FirewallOnly)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &pb.CreateHTTPAccessLogPolicyResponse{HttpAccessLogPolicyId: policyId}, nil
|
||||
}
|
||||
|
||||
// UpdateHTTPAccessLogPolicy 修改访问日志策略
|
||||
func (this *HTTPAccessLogPolicyService) UpdateHTTPAccessLogPolicy(ctx context.Context, req *pb.UpdateHTTPAccessLogPolicyRequest) (*pb.RPCSuccess, error) {
|
||||
_, err := this.ValidateAdmin(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var tx = this.NullTx()
|
||||
|
||||
// 取消别的Public
|
||||
if req.IsPublic {
|
||||
err = models.SharedHTTPAccessLogPolicyDAO.CancelAllPublicPolicies(tx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// 保存修改
|
||||
err = models.SharedHTTPAccessLogPolicyDAO.UpdatePolicy(tx, req.HttpAccessLogPolicyId, req.Name, req.OptionsJSON, req.CondsJSON, req.IsPublic, req.FirewallOnly, req.IsOn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return this.Success()
|
||||
}
|
||||
|
||||
// FindHTTPAccessLogPolicy 查找单个访问日志策略
|
||||
func (this *HTTPAccessLogPolicyService) FindHTTPAccessLogPolicy(ctx context.Context, req *pb.FindHTTPAccessLogPolicyRequest) (*pb.FindHTTPAccessLogPolicyResponse, error) {
|
||||
_, err := this.ValidateAdmin(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var tx = this.NullTx()
|
||||
policy, err := models.SharedHTTPAccessLogPolicyDAO.FindEnabledHTTPAccessLogPolicy(tx, req.HttpAccessLogPolicyId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if policy == nil {
|
||||
return &pb.FindHTTPAccessLogPolicyResponse{HttpAccessLogPolicy: nil}, nil
|
||||
}
|
||||
return &pb.FindHTTPAccessLogPolicyResponse{HttpAccessLogPolicy: &pb.HTTPAccessLogPolicy{
|
||||
Id: int64(policy.Id),
|
||||
Name: policy.Name,
|
||||
IsOn: policy.IsOn,
|
||||
Type: policy.Type,
|
||||
OptionsJSON: policy.Options,
|
||||
CondsJSON: policy.Conds,
|
||||
IsPublic: policy.IsPublic,
|
||||
FirewallOnly: policy.FirewallOnly == 1,
|
||||
}}, nil
|
||||
}
|
||||
|
||||
// DeleteHTTPAccessLogPolicy 删除访问日志策略
|
||||
func (this *HTTPAccessLogPolicyService) DeleteHTTPAccessLogPolicy(ctx context.Context, req *pb.DeleteHTTPAccessLogPolicyRequest) (*pb.RPCSuccess, error) {
|
||||
_, err := this.ValidateAdmin(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var tx = this.NullTx()
|
||||
err = models.SharedHTTPAccessLogPolicyDAO.DisableHTTPAccessLogPolicy(tx, req.HttpAccessLogPolicyId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return this.Success()
|
||||
}
|
||||
|
||||
// WriteHTTPAccessLogPolicy 测试写入某个访问日志策略
|
||||
func (this *HTTPAccessLogPolicyService) WriteHTTPAccessLogPolicy(ctx context.Context, req *pb.WriteHTTPAccessLogPolicyRequest) (*pb.RPCSuccess, error) {
|
||||
_, err := this.ValidateAdmin(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
success, failMessage, err := accesslogs.SharedStorageManager.Write(req.HttpAccessLogPolicyId, []*pb.HTTPAccessLog{req.HttpAccessLog})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !success {
|
||||
return nil, errors.New("test failed: " + failMessage)
|
||||
}
|
||||
return this.Success()
|
||||
}
|
||||
Reference in New Issue
Block a user