mirror of
https://github.com/TeaOSLab/EdgeAPI.git
synced 2025-11-10 20:40:56 +08:00
实现基本的访问日志策略
This commit is contained in:
66
internal/accesslogs/storage_base.go
Normal file
66
internal/accesslogs/storage_base.go
Normal file
@@ -0,0 +1,66 @@
|
||||
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||
|
||||
package accesslogs
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/configutils"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
type BaseStorage struct {
|
||||
isOk bool
|
||||
version int
|
||||
}
|
||||
|
||||
func (this *BaseStorage) SetVersion(version int) {
|
||||
this.version = version
|
||||
}
|
||||
|
||||
func (this *BaseStorage) Version() int {
|
||||
return this.version
|
||||
}
|
||||
|
||||
func (this *BaseStorage) IsOk() bool {
|
||||
return this.isOk
|
||||
}
|
||||
|
||||
func (this *BaseStorage) SetOk(isOk bool) {
|
||||
this.isOk = isOk
|
||||
}
|
||||
|
||||
// Marshal 对日志进行编码
|
||||
func (this *BaseStorage) Marshal(accessLog *pb.HTTPAccessLog) ([]byte, error) {
|
||||
return json.Marshal(accessLog)
|
||||
}
|
||||
|
||||
// FormatVariables 格式化字符串中的变量
|
||||
func (this *BaseStorage) FormatVariables(s string) string {
|
||||
now := time.Now()
|
||||
return configutils.ParseVariables(s, func(varName string) (value string) {
|
||||
switch varName {
|
||||
case "year":
|
||||
return strconv.Itoa(now.Year())
|
||||
case "month":
|
||||
return fmt.Sprintf("%02d", now.Month())
|
||||
case "week":
|
||||
_, week := now.ISOWeek()
|
||||
return fmt.Sprintf("%02d", week)
|
||||
case "day":
|
||||
return fmt.Sprintf("%02d", now.Day())
|
||||
case "hour":
|
||||
return fmt.Sprintf("%02d", now.Hour())
|
||||
case "minute":
|
||||
return fmt.Sprintf("%02d", now.Minute())
|
||||
case "second":
|
||||
return fmt.Sprintf("%02d", now.Second())
|
||||
case "date":
|
||||
return fmt.Sprintf("%d%02d%02d", now.Year(), now.Month(), now.Day())
|
||||
}
|
||||
|
||||
return varName
|
||||
})
|
||||
}
|
||||
95
internal/accesslogs/storage_command.go
Normal file
95
internal/accesslogs/storage_command.go
Normal file
@@ -0,0 +1,95 @@
|
||||
package accesslogs
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
||||
"github.com/iwind/TeaGo/logs"
|
||||
"os/exec"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// CommandStorage 通过命令行存储
|
||||
type CommandStorage struct {
|
||||
BaseStorage
|
||||
|
||||
config *serverconfigs.AccessLogCommandStorageConfig
|
||||
|
||||
writeLocker sync.Mutex
|
||||
}
|
||||
|
||||
func NewCommandStorage(config *serverconfigs.AccessLogCommandStorageConfig) *CommandStorage {
|
||||
return &CommandStorage{config: config}
|
||||
}
|
||||
|
||||
func (this *CommandStorage) Config() interface{} {
|
||||
return this.config
|
||||
}
|
||||
|
||||
// Start 启动
|
||||
func (this *CommandStorage) Start() error {
|
||||
if len(this.config.Command) == 0 {
|
||||
return errors.New("'command' should not be empty")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// 写入日志
|
||||
func (this *CommandStorage) Write(accessLogs []*pb.HTTPAccessLog) error {
|
||||
if len(accessLogs) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
this.writeLocker.Lock()
|
||||
defer this.writeLocker.Unlock()
|
||||
|
||||
cmd := exec.Command(this.config.Command, this.config.Args...)
|
||||
if len(this.config.Dir) > 0 {
|
||||
cmd.Dir = this.config.Dir
|
||||
}
|
||||
|
||||
stdout := bytes.NewBuffer([]byte{})
|
||||
cmd.Stdout = stdout
|
||||
|
||||
w, err := cmd.StdinPipe()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = cmd.Start()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, accessLog := range accessLogs {
|
||||
data, err := this.Marshal(accessLog)
|
||||
if err != nil {
|
||||
logs.Error(err)
|
||||
continue
|
||||
}
|
||||
_, err = w.Write(data)
|
||||
if err != nil {
|
||||
logs.Error(err)
|
||||
}
|
||||
|
||||
_, err = w.Write([]byte("\n"))
|
||||
if err != nil {
|
||||
logs.Error(err)
|
||||
}
|
||||
}
|
||||
_ = w.Close()
|
||||
err = cmd.Wait()
|
||||
if err != nil {
|
||||
logs.Error(err)
|
||||
|
||||
if stdout.Len() > 0 {
|
||||
logs.Error(errors.New(string(stdout.Bytes())))
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close 关闭
|
||||
func (this *CommandStorage) Close() error {
|
||||
return nil
|
||||
}
|
||||
63
internal/accesslogs/storage_command_test.go
Normal file
63
internal/accesslogs/storage_command_test.go
Normal file
@@ -0,0 +1,63 @@
|
||||
package accesslogs
|
||||
|
||||
import (
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
||||
"os"
|
||||
"os/exec"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestCommandStorage_Write(t *testing.T) {
|
||||
php, err := exec.LookPath("php")
|
||||
if err != nil { // not found php, so we can not test
|
||||
t.Log("php:", err)
|
||||
return
|
||||
}
|
||||
|
||||
cwd, err := os.Getwd()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
before := time.Now()
|
||||
|
||||
storage := NewCommandStorage(&serverconfigs.AccessLogCommandStorageConfig{
|
||||
Command: php,
|
||||
Args: []string{cwd + "/tests/command_storage.php"},
|
||||
})
|
||||
err = storage.Start()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = storage.Write([]*pb.HTTPAccessLog{
|
||||
{
|
||||
RequestMethod: "GET",
|
||||
RequestPath: "/hello",
|
||||
},
|
||||
{
|
||||
RequestMethod: "GET",
|
||||
RequestPath: "/world",
|
||||
},
|
||||
{
|
||||
RequestMethod: "GET",
|
||||
RequestPath: "/lu",
|
||||
},
|
||||
{
|
||||
RequestMethod: "GET",
|
||||
RequestPath: "/ping",
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = storage.Close()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
t.Log(time.Since(before).Seconds(), "seconds")
|
||||
}
|
||||
125
internal/accesslogs/storage_es.go
Normal file
125
internal/accesslogs/storage_es.go
Normal file
@@ -0,0 +1,125 @@
|
||||
package accesslogs
|
||||
|
||||
import (
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
teaconst "github.com/TeaOSLab/EdgeAPI/internal/const"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/utils"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
||||
"github.com/iwind/TeaGo/logs"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"regexp"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// ESStorage ElasticSearch存储策略
|
||||
type ESStorage struct {
|
||||
BaseStorage
|
||||
|
||||
config *serverconfigs.AccessLogESStorageConfig
|
||||
}
|
||||
|
||||
func NewESStorage(config *serverconfigs.AccessLogESStorageConfig) *ESStorage {
|
||||
return &ESStorage{config: config}
|
||||
}
|
||||
|
||||
func (this *ESStorage) Config() interface{} {
|
||||
return this.config
|
||||
}
|
||||
|
||||
// Start 开启
|
||||
func (this *ESStorage) Start() error {
|
||||
if len(this.config.Endpoint) == 0 {
|
||||
return errors.New("'endpoint' should not be nil")
|
||||
}
|
||||
if !regexp.MustCompile(`(?i)^(http|https)://`).MatchString(this.config.Endpoint) {
|
||||
this.config.Endpoint = "http://" + this.config.Endpoint
|
||||
}
|
||||
if len(this.config.Index) == 0 {
|
||||
return errors.New("'index' should not be nil")
|
||||
}
|
||||
if len(this.config.MappingType) == 0 {
|
||||
return errors.New("'mappingType' should not be nil")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// 写入日志
|
||||
func (this *ESStorage) Write(accessLogs []*pb.HTTPAccessLog) error {
|
||||
if len(accessLogs) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
bulk := &strings.Builder{}
|
||||
id := time.Now().UnixNano()
|
||||
indexName := this.FormatVariables(this.config.Index)
|
||||
typeName := this.FormatVariables(this.config.MappingType)
|
||||
for _, accessLog := range accessLogs {
|
||||
id++
|
||||
opData, err := json.Marshal(map[string]interface{}{
|
||||
"index": map[string]interface{}{
|
||||
"_index": indexName,
|
||||
"_type": typeName,
|
||||
"_id": fmt.Sprintf("%d", id),
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
logs.Error(err)
|
||||
continue
|
||||
}
|
||||
|
||||
data, err := this.Marshal(accessLog)
|
||||
if err != nil {
|
||||
logs.Error(err)
|
||||
continue
|
||||
}
|
||||
|
||||
bulk.Write(opData)
|
||||
bulk.WriteString("\n")
|
||||
bulk.Write(data)
|
||||
bulk.WriteString("\n")
|
||||
}
|
||||
|
||||
if bulk.Len() == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
req, err := http.NewRequest(http.MethodPost, this.config.Endpoint+"/_bulk", strings.NewReader(bulk.String()))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.Header.Set("User-Agent", strings.ReplaceAll(teaconst.ProductName, " ", "-")+"/"+teaconst.Version)
|
||||
if len(this.config.Username) > 0 || len(this.config.Password) > 0 {
|
||||
req.Header.Set("Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(this.config.Username+":"+this.config.Password)))
|
||||
}
|
||||
client := utils.SharedHttpClient(10 * time.Second)
|
||||
defer func() {
|
||||
_ = req.Body.Close()
|
||||
}()
|
||||
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
_ = resp.Body.Close()
|
||||
}()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
bodyData, _ := ioutil.ReadAll(resp.Body)
|
||||
return errors.New("ElasticSearch response status code: " + fmt.Sprintf("%d", resp.StatusCode) + " content: " + string(bodyData))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close 关闭
|
||||
func (this *ESStorage) Close() error {
|
||||
return nil
|
||||
}
|
||||
53
internal/accesslogs/storage_es_test.go
Normal file
53
internal/accesslogs/storage_es_test.go
Normal file
@@ -0,0 +1,53 @@
|
||||
package accesslogs
|
||||
|
||||
import (
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestESStorage_Write(t *testing.T) {
|
||||
storage := NewESStorage(&serverconfigs.AccessLogESStorageConfig{
|
||||
Endpoint: "http://127.0.0.1:9200",
|
||||
Index: "logs",
|
||||
MappingType: "accessLogs",
|
||||
Username: "hello",
|
||||
Password: "world",
|
||||
})
|
||||
err := storage.Start()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
{
|
||||
err = storage.Write([]*pb.HTTPAccessLog{
|
||||
{
|
||||
RequestMethod: "POST",
|
||||
RequestPath: "/1",
|
||||
TimeLocal: time.Now().Format("2/Jan/2006:15:04:05 -0700"),
|
||||
TimeISO8601: "2018-07-23T22:23:35+08:00",
|
||||
Header: map[string]*pb.Strings{
|
||||
"Content-Type": {Values: []string{"text/html"}},
|
||||
},
|
||||
},
|
||||
{
|
||||
RequestMethod: "GET",
|
||||
RequestPath: "/2",
|
||||
TimeLocal: time.Now().Format("2/Jan/2006:15:04:05 -0700"),
|
||||
TimeISO8601: "2018-07-23T22:23:35+08:00",
|
||||
Header: map[string]*pb.Strings{
|
||||
"Content-Type": {Values: []string{"text/css"}},
|
||||
},
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
err = storage.Close()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
127
internal/accesslogs/storage_file.go
Normal file
127
internal/accesslogs/storage_file.go
Normal file
@@ -0,0 +1,127 @@
|
||||
package accesslogs
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
||||
"github.com/iwind/TeaGo/logs"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// FileStorage 文件存储策略
|
||||
type FileStorage struct {
|
||||
BaseStorage
|
||||
|
||||
config *serverconfigs.AccessLogFileStorageConfig
|
||||
|
||||
writeLocker sync.Mutex
|
||||
|
||||
files map[string]*os.File // path => *File
|
||||
filesLocker sync.Mutex
|
||||
}
|
||||
|
||||
func NewFileStorage(config *serverconfigs.AccessLogFileStorageConfig) *FileStorage {
|
||||
return &FileStorage{
|
||||
config: config,
|
||||
}
|
||||
}
|
||||
|
||||
func (this *FileStorage) Config() interface{} {
|
||||
return this.config
|
||||
}
|
||||
|
||||
// Start 开启
|
||||
func (this *FileStorage) Start() error {
|
||||
if len(this.config.Path) == 0 {
|
||||
return errors.New("'path' should not be empty")
|
||||
}
|
||||
|
||||
this.files = map[string]*os.File{}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Write 写入日志
|
||||
func (this *FileStorage) Write(accessLogs []*pb.HTTPAccessLog) error {
|
||||
if len(accessLogs) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
fp := this.fp()
|
||||
if fp == nil {
|
||||
return errors.New("file pointer should not be nil")
|
||||
}
|
||||
this.writeLocker.Lock()
|
||||
defer this.writeLocker.Unlock()
|
||||
|
||||
for _, accessLog := range accessLogs {
|
||||
data, err := this.Marshal(accessLog)
|
||||
if err != nil {
|
||||
logs.Error(err)
|
||||
continue
|
||||
}
|
||||
_, err = fp.Write(data)
|
||||
if err != nil {
|
||||
_ = this.Close()
|
||||
break
|
||||
}
|
||||
_, _ = fp.WriteString("\n")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close 关闭
|
||||
func (this *FileStorage) Close() error {
|
||||
this.filesLocker.Lock()
|
||||
defer this.filesLocker.Unlock()
|
||||
|
||||
var resultErr error
|
||||
for _, f := range this.files {
|
||||
err := f.Close()
|
||||
if err != nil {
|
||||
resultErr = err
|
||||
}
|
||||
}
|
||||
return resultErr
|
||||
}
|
||||
|
||||
func (this *FileStorage) fp() *os.File {
|
||||
path := this.FormatVariables(this.config.Path)
|
||||
|
||||
this.filesLocker.Lock()
|
||||
defer this.filesLocker.Unlock()
|
||||
fp, ok := this.files[path]
|
||||
if ok {
|
||||
return fp
|
||||
}
|
||||
|
||||
// 关闭其他的文件
|
||||
for _, f := range this.files {
|
||||
_ = f.Close()
|
||||
}
|
||||
|
||||
// 是否创建文件目录
|
||||
if this.config.AutoCreate {
|
||||
dir := filepath.Dir(path)
|
||||
_, err := os.Stat(dir)
|
||||
if os.IsNotExist(err) {
|
||||
err = os.MkdirAll(dir, 0777)
|
||||
if err != nil {
|
||||
logs.Error(err)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 打开新文件
|
||||
fp, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
|
||||
if err != nil {
|
||||
logs.Error(err)
|
||||
return nil
|
||||
}
|
||||
this.files[path] = fp
|
||||
|
||||
return fp
|
||||
}
|
||||
70
internal/accesslogs/storage_file_test.go
Normal file
70
internal/accesslogs/storage_file_test.go
Normal file
@@ -0,0 +1,70 @@
|
||||
package accesslogs
|
||||
|
||||
import (
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
||||
"github.com/iwind/TeaGo/Tea"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestFileStorage_Write(t *testing.T) {
|
||||
storage := NewFileStorage(&serverconfigs.AccessLogFileStorageConfig{
|
||||
Path: Tea.Root + "/logs/access-${date}.log",
|
||||
})
|
||||
err := storage.Start()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
{
|
||||
err = storage.Write([]*pb.HTTPAccessLog{
|
||||
{
|
||||
RequestPath: "/hello",
|
||||
},
|
||||
{
|
||||
RequestPath: "/world",
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
err = storage.Write([]*pb.HTTPAccessLog{
|
||||
{
|
||||
RequestPath: "/1",
|
||||
},
|
||||
{
|
||||
RequestPath: "/2",
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
err = storage.Write([]*pb.HTTPAccessLog{
|
||||
{
|
||||
RequestMethod: "POST",
|
||||
RequestPath: "/1",
|
||||
TimeLocal: time.Now().Format("2/Jan/2006:15:04:05 -0700"),
|
||||
},
|
||||
{
|
||||
RequestMethod: "GET",
|
||||
RequestPath: "/2",
|
||||
TimeLocal: time.Now().Format("2/Jan/2006:15:04:05 -0700"),
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
err = storage.Close()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
30
internal/accesslogs/storage_interface.go
Normal file
30
internal/accesslogs/storage_interface.go
Normal file
@@ -0,0 +1,30 @@
|
||||
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||
|
||||
package accesslogs
|
||||
|
||||
import "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
|
||||
// StorageInterface 日志存储接口
|
||||
type StorageInterface interface {
|
||||
// Version 获取版本
|
||||
Version() int
|
||||
|
||||
// SetVersion 设置版本
|
||||
SetVersion(version int)
|
||||
|
||||
IsOk() bool
|
||||
|
||||
SetOk(ok bool)
|
||||
|
||||
// Config 获取配置
|
||||
Config() interface{}
|
||||
|
||||
// Start 开启
|
||||
Start() error
|
||||
|
||||
// Write 写入日志
|
||||
Write(accessLogs []*pb.HTTPAccessLog) error
|
||||
|
||||
// Close 关闭
|
||||
Close() error
|
||||
}
|
||||
199
internal/accesslogs/storage_manager.go
Normal file
199
internal/accesslogs/storage_manager.go
Normal file
@@ -0,0 +1,199 @@
|
||||
package accesslogs
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/errors"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
||||
"github.com/iwind/TeaGo/Tea"
|
||||
"github.com/iwind/TeaGo/lists"
|
||||
"github.com/iwind/TeaGo/types"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
var SharedStorageManager = NewStorageManager()
|
||||
|
||||
type StorageManager struct {
|
||||
storageMap map[int64]StorageInterface // policyId => Storage
|
||||
|
||||
locker sync.Mutex
|
||||
}
|
||||
|
||||
func NewStorageManager() *StorageManager {
|
||||
return &StorageManager{
|
||||
storageMap: map[int64]StorageInterface{},
|
||||
}
|
||||
}
|
||||
|
||||
func (this *StorageManager) Start() {
|
||||
var ticker = time.NewTicker(1 * time.Minute)
|
||||
if Tea.IsTesting() {
|
||||
ticker = time.NewTicker(5 * time.Second)
|
||||
}
|
||||
|
||||
// 启动时执行一次
|
||||
var err = this.Loop()
|
||||
if err != nil {
|
||||
remotelogs.Error("ACCESS_LOG_STORAGE_MANAGER", "update error: "+err.Error())
|
||||
}
|
||||
|
||||
// 循环执行
|
||||
for range ticker.C {
|
||||
err := this.Loop()
|
||||
if err != nil {
|
||||
remotelogs.Error("ACCESS_LOG_STORAGE_MANAGER", "update error: "+err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 写入日志
|
||||
func (this *StorageManager) Write(policyId int64, accessLogs []*pb.HTTPAccessLog) error {
|
||||
this.locker.Lock()
|
||||
storage, ok := this.storageMap[policyId]
|
||||
this.locker.Unlock()
|
||||
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
if !storage.IsOk() {
|
||||
return nil
|
||||
}
|
||||
|
||||
return storage.Write(accessLogs)
|
||||
}
|
||||
|
||||
// Loop 更新
|
||||
func (this *StorageManager) Loop() error {
|
||||
policies, err := models.SharedHTTPAccessLogPolicyDAO.FindAllEnabledAndOnPolicies(nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var policyIds = []int64{}
|
||||
for _, policy := range policies {
|
||||
if policy.IsOn == 1 {
|
||||
policyIds = append(policyIds, int64(policy.Id))
|
||||
}
|
||||
}
|
||||
|
||||
this.locker.Lock()
|
||||
defer this.locker.Unlock()
|
||||
|
||||
// 关闭不用的
|
||||
for policyId, storage := range this.storageMap {
|
||||
if !lists.ContainsInt64(policyIds, policyId) {
|
||||
err := storage.Close()
|
||||
if err != nil {
|
||||
remotelogs.Error("ACCESS_LOG_STORAGE_MANAGER", "close '"+types.String(policyId)+"' failed: "+err.Error())
|
||||
}
|
||||
delete(this.storageMap, policyId)
|
||||
remotelogs.Error("ACCESS_LOG_STORAGE_MANAGER", "remove '"+types.String(policyId)+"'")
|
||||
}
|
||||
}
|
||||
|
||||
for _, policy := range policies {
|
||||
var policyId = int64(policy.Id)
|
||||
storage, ok := this.storageMap[policyId]
|
||||
if ok {
|
||||
// 检查配置是否有变更
|
||||
if types.Int(policy.Version) != storage.Version() {
|
||||
err = storage.Close()
|
||||
if err != nil {
|
||||
remotelogs.Error("ACCESS_LOG_STORAGE_MANAGER", "close policy '"+types.String(policyId)+"' failed: "+err.Error())
|
||||
|
||||
// 继续往下执行
|
||||
}
|
||||
|
||||
if len(policy.Options) > 0 {
|
||||
err = json.Unmarshal([]byte(policy.Options), storage.Config())
|
||||
if err != nil {
|
||||
remotelogs.Error("ACCESS_LOG_STORAGE_MANAGER", "unmarshal policy '"+types.String(policyId)+"' config failed: "+err.Error())
|
||||
storage.SetOk(false)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
storage.SetVersion(types.Int(policy.Version))
|
||||
err := storage.Start()
|
||||
if err != nil {
|
||||
remotelogs.Error("ACCESS_LOG_STORAGE_MANAGER", "start policy '"+types.String(policyId)+"' failed: "+err.Error())
|
||||
continue
|
||||
}
|
||||
storage.SetOk(true)
|
||||
remotelogs.Println("ACCESS_LOG_STORAGE_MANAGER", "restart policy '"+types.String(policyId)+"'")
|
||||
}
|
||||
} else {
|
||||
storage, err := this.createStorage(policy.Type, []byte(policy.Options))
|
||||
if err != nil {
|
||||
remotelogs.Error("ACCESS_LOG_STORAGE_MANAGER", "create policy '"+types.String(policyId)+"' failed: "+err.Error())
|
||||
continue
|
||||
}
|
||||
storage.SetVersion(types.Int(policy.Version))
|
||||
this.storageMap[policyId] = storage
|
||||
err = storage.Start()
|
||||
if err != nil {
|
||||
remotelogs.Error("ACCESS_LOG_STORAGE_MANAGER", "start policy '"+types.String(policyId)+"' failed: "+err.Error())
|
||||
continue
|
||||
}
|
||||
storage.SetOk(true)
|
||||
remotelogs.Println("ACCESS_LOG_STORAGE_MANAGER", "start policy '"+types.String(policyId)+"'")
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *StorageManager) createStorage(storageType string, optionsJSON []byte) (StorageInterface, error) {
|
||||
switch storageType {
|
||||
case serverconfigs.AccessLogStorageTypeFile:
|
||||
var config = &serverconfigs.AccessLogFileStorageConfig{}
|
||||
if len(optionsJSON) > 0 {
|
||||
err := json.Unmarshal(optionsJSON, config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return NewFileStorage(config), nil
|
||||
case serverconfigs.AccessLogStorageTypeES:
|
||||
var config = &serverconfigs.AccessLogESStorageConfig{}
|
||||
if len(optionsJSON) > 0 {
|
||||
err := json.Unmarshal(optionsJSON, config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return NewESStorage(config), nil
|
||||
case serverconfigs.AccessLogStorageTypeTCP:
|
||||
var config = &serverconfigs.AccessLogTCPStorageConfig{}
|
||||
if len(optionsJSON) > 0 {
|
||||
err := json.Unmarshal(optionsJSON, config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return NewTCPStorage(config), nil
|
||||
case serverconfigs.AccessLogStorageTypeSyslog:
|
||||
var config = &serverconfigs.AccessLogSyslogStorageConfig{}
|
||||
if len(optionsJSON) > 0 {
|
||||
err := json.Unmarshal(optionsJSON, config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return NewSyslogStorage(config), nil
|
||||
case serverconfigs.AccessLogStorageTypeCommand:
|
||||
var config = &serverconfigs.AccessLogCommandStorageConfig{}
|
||||
if len(optionsJSON) > 0 {
|
||||
err := json.Unmarshal(optionsJSON, config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return NewCommandStorage(config), nil
|
||||
}
|
||||
|
||||
return nil, errors.New("invalid policy type '" + storageType + "'")
|
||||
}
|
||||
17
internal/accesslogs/storage_manager_test.go
Normal file
17
internal/accesslogs/storage_manager_test.go
Normal file
@@ -0,0 +1,17 @@
|
||||
package accesslogs
|
||||
|
||||
import (
|
||||
"github.com/iwind/TeaGo/dbs"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestStorageManager_Loop(t *testing.T) {
|
||||
dbs.NotifyReady()
|
||||
|
||||
var storage = NewStorageManager()
|
||||
err := storage.Loop()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Log(storage.storageMap)
|
||||
}
|
||||
137
internal/accesslogs/storage_syslog.go
Normal file
137
internal/accesslogs/storage_syslog.go
Normal file
@@ -0,0 +1,137 @@
|
||||
package accesslogs
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
||||
"github.com/iwind/TeaGo/logs"
|
||||
"os/exec"
|
||||
"runtime"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
type SyslogStorageProtocol = string
|
||||
|
||||
const (
|
||||
SyslogStorageProtocolTCP SyslogStorageProtocol = "tcp"
|
||||
SyslogStorageProtocolUDP SyslogStorageProtocol = "udp"
|
||||
SyslogStorageProtocolNone SyslogStorageProtocol = "none"
|
||||
SyslogStorageProtocolSocket SyslogStorageProtocol = "socket"
|
||||
)
|
||||
|
||||
type SyslogStoragePriority = int
|
||||
|
||||
// SyslogStorage syslog存储策略
|
||||
type SyslogStorage struct {
|
||||
BaseStorage
|
||||
|
||||
config *serverconfigs.AccessLogSyslogStorageConfig
|
||||
|
||||
exe string
|
||||
}
|
||||
|
||||
func NewSyslogStorage(config *serverconfigs.AccessLogSyslogStorageConfig) *SyslogStorage {
|
||||
return &SyslogStorage{config: config}
|
||||
}
|
||||
|
||||
func (this *SyslogStorage) Config() interface{} {
|
||||
return this.config
|
||||
}
|
||||
|
||||
// Start 开启
|
||||
func (this *SyslogStorage) Start() error {
|
||||
if runtime.GOOS != "linux" {
|
||||
return errors.New("'syslog' storage only works on linux")
|
||||
}
|
||||
|
||||
exe, err := exec.LookPath("logger")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
this.exe = exe
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// 写入日志
|
||||
func (this *SyslogStorage) Write(accessLogs []*pb.HTTPAccessLog) error {
|
||||
if len(accessLogs) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
args := []string{}
|
||||
if len(this.config.Tag) > 0 {
|
||||
args = append(args, "-t", this.config.Tag)
|
||||
}
|
||||
|
||||
if this.config.Priority >= 0 {
|
||||
args = append(args, "-p", strconv.Itoa(this.config.Priority))
|
||||
}
|
||||
|
||||
switch this.config.Protocol {
|
||||
case SyslogStorageProtocolTCP:
|
||||
args = append(args, "-T")
|
||||
if len(this.config.ServerAddr) > 0 {
|
||||
args = append(args, "-n", this.config.ServerAddr)
|
||||
}
|
||||
if this.config.ServerPort > 0 {
|
||||
args = append(args, "-P", strconv.Itoa(this.config.ServerPort))
|
||||
}
|
||||
case SyslogStorageProtocolUDP:
|
||||
args = append(args, "-d")
|
||||
if len(this.config.ServerAddr) > 0 {
|
||||
args = append(args, "-n", this.config.ServerAddr)
|
||||
}
|
||||
if this.config.ServerPort > 0 {
|
||||
args = append(args, "-P", strconv.Itoa(this.config.ServerPort))
|
||||
}
|
||||
case SyslogStorageProtocolSocket:
|
||||
args = append(args, "-u")
|
||||
args = append(args, this.config.Socket)
|
||||
case SyslogStorageProtocolNone:
|
||||
// do nothing
|
||||
}
|
||||
|
||||
args = append(args, "-S", "10240")
|
||||
|
||||
cmd := exec.Command(this.exe, args...)
|
||||
w, err := cmd.StdinPipe()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = cmd.Start()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, accessLog := range accessLogs {
|
||||
data, err := this.Marshal(accessLog)
|
||||
if err != nil {
|
||||
logs.Error(err)
|
||||
continue
|
||||
}
|
||||
_, err = w.Write(data)
|
||||
if err != nil {
|
||||
logs.Error(err)
|
||||
}
|
||||
|
||||
_, err = w.Write([]byte("\n"))
|
||||
if err != nil {
|
||||
logs.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
_ = w.Close()
|
||||
err = cmd.Wait()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close 关闭
|
||||
func (this *SyslogStorage) Close() error {
|
||||
return nil
|
||||
}
|
||||
111
internal/accesslogs/storage_tcp.go
Normal file
111
internal/accesslogs/storage_tcp.go
Normal file
@@ -0,0 +1,111 @@
|
||||
package accesslogs
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
||||
"github.com/iwind/TeaGo/logs"
|
||||
"net"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// TCPStorage TCP存储策略
|
||||
type TCPStorage struct {
|
||||
BaseStorage
|
||||
|
||||
config *serverconfigs.AccessLogTCPStorageConfig
|
||||
|
||||
writeLocker sync.Mutex
|
||||
|
||||
connLocker sync.Mutex
|
||||
conn net.Conn
|
||||
}
|
||||
|
||||
func NewTCPStorage(config *serverconfigs.AccessLogTCPStorageConfig) *TCPStorage {
|
||||
return &TCPStorage{config: config}
|
||||
}
|
||||
|
||||
func (this *TCPStorage) Config() interface{} {
|
||||
return this.config
|
||||
}
|
||||
|
||||
// Start 开启
|
||||
func (this *TCPStorage) Start() error {
|
||||
if len(this.config.Network) == 0 {
|
||||
return errors.New("'network' should not be empty")
|
||||
}
|
||||
if len(this.config.Addr) == 0 {
|
||||
return errors.New("'addr' should not be empty")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// 写入日志
|
||||
func (this *TCPStorage) Write(accessLogs []*pb.HTTPAccessLog) error {
|
||||
if len(accessLogs) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
err := this.connect()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
conn := this.conn
|
||||
if conn == nil {
|
||||
return errors.New("connection should not be nil")
|
||||
}
|
||||
|
||||
this.writeLocker.Lock()
|
||||
defer this.writeLocker.Unlock()
|
||||
|
||||
for _, accessLog := range accessLogs {
|
||||
data, err := this.Marshal(accessLog)
|
||||
if err != nil {
|
||||
logs.Error(err)
|
||||
continue
|
||||
}
|
||||
_, err = conn.Write(data)
|
||||
if err != nil {
|
||||
_ = this.Close()
|
||||
break
|
||||
}
|
||||
_, err = conn.Write([]byte("\n"))
|
||||
if err != nil {
|
||||
_ = this.Close()
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close 关闭
|
||||
func (this *TCPStorage) Close() error {
|
||||
this.connLocker.Lock()
|
||||
defer this.connLocker.Unlock()
|
||||
|
||||
if this.conn != nil {
|
||||
err := this.conn.Close()
|
||||
this.conn = nil
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *TCPStorage) connect() error {
|
||||
this.connLocker.Lock()
|
||||
defer this.connLocker.Unlock()
|
||||
|
||||
if this.conn != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
conn, err := net.Dial(this.config.Network, this.config.Addr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
this.conn = conn
|
||||
|
||||
return nil
|
||||
}
|
||||
72
internal/accesslogs/storage_tcp_test.go
Normal file
72
internal/accesslogs/storage_tcp_test.go
Normal file
@@ -0,0 +1,72 @@
|
||||
package accesslogs
|
||||
|
||||
import (
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
||||
"net"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestTCPStorage_Write(t *testing.T) {
|
||||
go func() {
|
||||
server, err := net.Listen("tcp", "127.0.0.1:9981")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
for {
|
||||
conn, err := server.Accept()
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
buf := make([]byte, 1024)
|
||||
for {
|
||||
n, err := conn.Read(buf)
|
||||
if n > 0 {
|
||||
t.Log(string(buf[:n]))
|
||||
}
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
break
|
||||
}
|
||||
_ = server.Close()
|
||||
}()
|
||||
|
||||
storage := NewTCPStorage(&serverconfigs.AccessLogTCPStorageConfig{
|
||||
Network: "tcp",
|
||||
Addr: "127.0.0.1:9981",
|
||||
})
|
||||
err := storage.Start()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
{
|
||||
err = storage.Write([]*pb.HTTPAccessLog{
|
||||
{
|
||||
RequestMethod: "POST",
|
||||
RequestPath: "/1",
|
||||
TimeLocal: time.Now().Format("2/Jan/2006:15:04:05 -0700"),
|
||||
},
|
||||
{
|
||||
RequestMethod: "GET",
|
||||
RequestPath: "/2",
|
||||
TimeLocal: time.Now().Format("2/Jan/2006:15:04:05 -0700"),
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
err = storage.Close()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
24
internal/accesslogs/tests/command_storage.php
Normal file
24
internal/accesslogs/tests/command_storage.php
Normal file
@@ -0,0 +1,24 @@
|
||||
<?php
|
||||
|
||||
// test command storage
|
||||
|
||||
// open access log file
|
||||
$fp = fopen("/tmp/goedge-command-storage.log", "a+");
|
||||
|
||||
// read access logs from stdin
|
||||
$stdin = fopen("php://stdin", "r");
|
||||
while(true) {
|
||||
if (feof($stdin)) {
|
||||
break;
|
||||
}
|
||||
$line = fgets($stdin);
|
||||
|
||||
// write to access log file
|
||||
fwrite($fp, $line);
|
||||
}
|
||||
|
||||
// close file pointers
|
||||
fclose($fp);
|
||||
fclose($stdin);
|
||||
|
||||
?>
|
||||
@@ -1,12 +1,13 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/shared"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/errors"
|
||||
_ "github.com/go-sql-driver/mysql"
|
||||
"github.com/iwind/TeaGo/Tea"
|
||||
"github.com/iwind/TeaGo/dbs"
|
||||
"github.com/iwind/TeaGo/maps"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -35,12 +36,12 @@ func init() {
|
||||
})
|
||||
}
|
||||
|
||||
// 初始化
|
||||
// Init 初始化
|
||||
func (this *HTTPAccessLogPolicyDAO) Init() {
|
||||
_ = this.DAOObject.Init()
|
||||
}
|
||||
|
||||
// 启用条目
|
||||
// EnableHTTPAccessLogPolicy 启用条目
|
||||
func (this *HTTPAccessLogPolicyDAO) EnableHTTPAccessLogPolicy(tx *dbs.Tx, id int64) error {
|
||||
_, err := this.Query(tx).
|
||||
Pk(id).
|
||||
@@ -49,7 +50,7 @@ func (this *HTTPAccessLogPolicyDAO) EnableHTTPAccessLogPolicy(tx *dbs.Tx, id int
|
||||
return err
|
||||
}
|
||||
|
||||
// 禁用条目
|
||||
// DisableHTTPAccessLogPolicy 禁用条目
|
||||
func (this *HTTPAccessLogPolicyDAO) DisableHTTPAccessLogPolicy(tx *dbs.Tx, id int64) error {
|
||||
_, err := this.Query(tx).
|
||||
Pk(id).
|
||||
@@ -58,7 +59,7 @@ func (this *HTTPAccessLogPolicyDAO) DisableHTTPAccessLogPolicy(tx *dbs.Tx, id in
|
||||
return err
|
||||
}
|
||||
|
||||
// 查找启用中的条目
|
||||
// FindEnabledHTTPAccessLogPolicy 查找启用中的条目
|
||||
func (this *HTTPAccessLogPolicyDAO) FindEnabledHTTPAccessLogPolicy(tx *dbs.Tx, id int64) (*HTTPAccessLogPolicy, error) {
|
||||
result, err := this.Query(tx).
|
||||
Pk(id).
|
||||
@@ -70,7 +71,7 @@ func (this *HTTPAccessLogPolicyDAO) FindEnabledHTTPAccessLogPolicy(tx *dbs.Tx, i
|
||||
return result.(*HTTPAccessLogPolicy), err
|
||||
}
|
||||
|
||||
// 根据主键查找名称
|
||||
// FindHTTPAccessLogPolicyName 根据主键查找名称
|
||||
func (this *HTTPAccessLogPolicyDAO) FindHTTPAccessLogPolicyName(tx *dbs.Tx, id int64) (string, error) {
|
||||
return this.Query(tx).
|
||||
Pk(id).
|
||||
@@ -78,51 +79,116 @@ func (this *HTTPAccessLogPolicyDAO) FindHTTPAccessLogPolicyName(tx *dbs.Tx, id i
|
||||
FindStringCol("")
|
||||
}
|
||||
|
||||
// 查找所有可用策略信息
|
||||
func (this *HTTPAccessLogPolicyDAO) FindAllEnabledAccessLogPolicies(tx *dbs.Tx) (result []*HTTPAccessLogPolicy, err error) {
|
||||
// CountAllEnabledPolicies 计算策略数量
|
||||
func (this *HTTPAccessLogPolicyDAO) CountAllEnabledPolicies(tx *dbs.Tx) (int64, error) {
|
||||
return this.Query(tx).
|
||||
State(HTTPAccessLogPolicyStateEnabled).
|
||||
Count()
|
||||
}
|
||||
|
||||
// ListEnabledPolicies 查找所有可用策略信息
|
||||
func (this *HTTPAccessLogPolicyDAO) ListEnabledPolicies(tx *dbs.Tx, offset int64, size int64) (result []*HTTPAccessLogPolicy, err error) {
|
||||
_, err = this.Query(tx).
|
||||
State(HTTPAccessLogPolicyStateEnabled).
|
||||
DescPk().
|
||||
Offset(offset).
|
||||
Limit(size).
|
||||
Slice(&result).
|
||||
FindAll()
|
||||
return
|
||||
}
|
||||
|
||||
// 组合配置
|
||||
func (this *HTTPAccessLogPolicyDAO) ComposeAccessLogPolicyConfig(tx *dbs.Tx, policyId int64) (*serverconfigs.HTTPAccessLogStoragePolicy, error) {
|
||||
policy, err := this.FindEnabledHTTPAccessLogPolicy(tx, policyId)
|
||||
// FindAllEnabledAndOnPolicies 获取所有的策略信息
|
||||
func (this *HTTPAccessLogPolicyDAO) FindAllEnabledAndOnPolicies(tx *dbs.Tx) (result []*HTTPAccessLogPolicy, err error) {
|
||||
_, err = this.Query(tx).
|
||||
State(HTTPAccessLogPolicyStateEnabled).
|
||||
Attr("isOn", true).
|
||||
Slice(&result).
|
||||
FindAll()
|
||||
return
|
||||
}
|
||||
|
||||
// CreatePolicy 创建策略
|
||||
func (this *HTTPAccessLogPolicyDAO) CreatePolicy(tx *dbs.Tx, name string, policyType string, optionsJSON []byte, condsJSON []byte, isPublic bool) (policyId int64, err error) {
|
||||
var op = NewHTTPAccessLogPolicyOperator()
|
||||
op.Name = name
|
||||
op.Type = policyType
|
||||
if len(optionsJSON) > 0 {
|
||||
op.Options = optionsJSON
|
||||
}
|
||||
if len(condsJSON) > 0 {
|
||||
op.Conds = condsJSON
|
||||
}
|
||||
op.IsPublic = isPublic
|
||||
op.IsOn = true
|
||||
op.State = HTTPAccessLogPolicyStateEnabled
|
||||
return this.SaveInt64(tx, op)
|
||||
}
|
||||
|
||||
// UpdatePolicy 修改策略
|
||||
func (this *HTTPAccessLogPolicyDAO) UpdatePolicy(tx *dbs.Tx, policyId int64, name string, optionsJSON []byte, condsJSON []byte, isPublic bool, isOn bool) error {
|
||||
if policyId <= 0 {
|
||||
return errors.New("invalid policyId")
|
||||
}
|
||||
|
||||
oldOne, err := this.Query(tx).
|
||||
Pk(policyId).
|
||||
Find()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
if policy == nil {
|
||||
return nil, nil
|
||||
if oldOne == nil {
|
||||
return nil
|
||||
}
|
||||
var oldPolicy = oldOne.(*HTTPAccessLogPolicy)
|
||||
|
||||
var op = NewHTTPAccessLogPolicyOperator()
|
||||
op.Id = policyId
|
||||
op.Name = name
|
||||
if len(optionsJSON) > 0 {
|
||||
op.Options = optionsJSON
|
||||
} else {
|
||||
op.Options = "{}"
|
||||
}
|
||||
if len(condsJSON) > 0 {
|
||||
op.Conds = condsJSON
|
||||
} else {
|
||||
op.Conds = "{}"
|
||||
}
|
||||
|
||||
config := &serverconfigs.HTTPAccessLogStoragePolicy{}
|
||||
config.Id = int64(policy.Id)
|
||||
config.IsOn = policy.IsOn == 1
|
||||
config.Name = policy.Name
|
||||
config.Type = policy.Type
|
||||
// 版本号
|
||||
if len(oldPolicy.Options) == 0 || len(optionsJSON) == 0 {
|
||||
op.Version = dbs.SQL("version+1")
|
||||
} else {
|
||||
var m1 = maps.Map{}
|
||||
_ = json.Unmarshal([]byte(oldPolicy.Options), &m1)
|
||||
|
||||
// 选项
|
||||
if IsNotNull(policy.Options) {
|
||||
m := map[string]interface{}{}
|
||||
err = json.Unmarshal([]byte(policy.Options), &m)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
var m2 = maps.Map{}
|
||||
_ = json.Unmarshal(optionsJSON, &m2)
|
||||
|
||||
if bytes.Compare(m1.AsJSON(), m2.AsJSON()) != 0 {
|
||||
op.Version = dbs.SQL("version+1")
|
||||
}
|
||||
config.Options = m
|
||||
}
|
||||
|
||||
// 条件
|
||||
if IsNotNull(policy.Conds) {
|
||||
condsConfig := &shared.HTTPRequestCondsConfig{}
|
||||
err = json.Unmarshal([]byte(policy.Conds), condsConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
config.Conds = condsConfig
|
||||
op.IsPublic = isPublic
|
||||
op.IsOn = isOn
|
||||
return this.Save(tx, op)
|
||||
}
|
||||
|
||||
return config, nil
|
||||
// CancelAllPublicPolicies 取消别的公用的策略
|
||||
func (this *HTTPAccessLogPolicyDAO) CancelAllPublicPolicies(tx *dbs.Tx) error {
|
||||
return this.Query(tx).
|
||||
State(HTTPAccessLogPolicyStateEnabled).
|
||||
Set("isPublic", 0).
|
||||
UpdateQuickly()
|
||||
}
|
||||
|
||||
// FindCurrentPublicPolicyId 取得当前的公用策略
|
||||
func (this *HTTPAccessLogPolicyDAO) FindCurrentPublicPolicyId(tx *dbs.Tx) (int64, error) {
|
||||
return this.Query(tx).
|
||||
State(HTTPAccessLogPolicyStateEnabled).
|
||||
Attr("isPublic", 1).
|
||||
ResultPk().
|
||||
FindInt64Col(0)
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
package models
|
||||
|
||||
// 访问日志策略
|
||||
// HTTPAccessLogPolicy 访问日志策略
|
||||
type HTTPAccessLogPolicy struct {
|
||||
Id uint32 `field:"id"` // ID
|
||||
TemplateId uint32 `field:"templateId"` // 模版ID
|
||||
@@ -13,6 +13,8 @@ type HTTPAccessLogPolicy struct {
|
||||
Type string `field:"type"` // 存储类型
|
||||
Options string `field:"options"` // 存储选项
|
||||
Conds string `field:"conds"` // 请求条件
|
||||
IsPublic uint8 `field:"isPublic"` // 是否为公用
|
||||
Version uint32 `field:"version"` // 版本号
|
||||
}
|
||||
|
||||
type HTTPAccessLogPolicyOperator struct {
|
||||
@@ -27,6 +29,8 @@ type HTTPAccessLogPolicyOperator struct {
|
||||
Type interface{} // 存储类型
|
||||
Options interface{} // 存储选项
|
||||
Conds interface{} // 请求条件
|
||||
IsPublic interface{} // 是否为公用
|
||||
Version interface{} // 版本号
|
||||
}
|
||||
|
||||
func NewHTTPAccessLogPolicyOperator() *HTTPAccessLogPolicyOperator {
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/accesslogs"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/configs"
|
||||
teaconst "github.com/TeaOSLab/EdgeAPI/internal/const"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
|
||||
@@ -110,6 +111,9 @@ func (this *APINode) Start() {
|
||||
// 状态变更计时器
|
||||
go NewNodeStatusExecutor().Listen()
|
||||
|
||||
// 访问日志存储管理器
|
||||
go accesslogs.SharedStorageManager.Start()
|
||||
|
||||
// 监听RPC服务
|
||||
remotelogs.Println("API_NODE", "starting RPC server ...")
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@ package services
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/accesslogs"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
|
||||
rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
@@ -31,6 +32,18 @@ func (this *HTTPAccessLogService) CreateHTTPAccessLogs(ctx context.Context, req
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 发送到访问日志策略
|
||||
policyId, err := models.SharedHTTPAccessLogPolicyDAO.FindCurrentPublicPolicyId(tx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if policyId > 0 {
|
||||
err = accesslogs.SharedStorageManager.Write(policyId, req.HttpAccessLogs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return &pb.CreateHTTPAccessLogsResponse{}, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@ package services
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/accesslogs"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
)
|
||||
@@ -10,32 +11,149 @@ type HTTPAccessLogPolicyService struct {
|
||||
BaseService
|
||||
}
|
||||
|
||||
// FindAllEnabledHTTPAccessLogPolicies 获取所有可用策略
|
||||
func (this *HTTPAccessLogPolicyService) FindAllEnabledHTTPAccessLogPolicies(ctx context.Context, req *pb.FindAllEnabledHTTPAccessLogPoliciesRequest) (*pb.FindAllEnabledHTTPAccessLogPoliciesResponse, error) {
|
||||
// 校验请求
|
||||
// CountAllEnabledHTTPAccessLogPolicies 计算访问日志策略数量
|
||||
func (this *HTTPAccessLogPolicyService) CountAllEnabledHTTPAccessLogPolicies(ctx context.Context, req *pb.CountAllEnabledHTTPAccessLogPoliciesRequest) (*pb.RPCCountResponse, error) {
|
||||
_, err := this.ValidateAdmin(ctx, 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
tx := this.NullTx()
|
||||
var tx = this.NullTx()
|
||||
count, err := models.SharedHTTPAccessLogPolicyDAO.CountAllEnabledPolicies(tx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return this.SuccessCount(count)
|
||||
}
|
||||
|
||||
policies, err := models.SharedHTTPAccessLogPolicyDAO.FindAllEnabledAccessLogPolicies(tx)
|
||||
// ListEnabledHTTPAccessLogPolicies 列出单页访问日志策略
|
||||
func (this *HTTPAccessLogPolicyService) ListEnabledHTTPAccessLogPolicies(ctx context.Context, req *pb.ListEnabledHTTPAccessLogPoliciesRequest) (*pb.ListEnabledHTTPAccessLogPoliciesResponse, error) {
|
||||
_, err := this.ValidateAdmin(ctx, 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
result := []*pb.HTTPAccessLogPolicy{}
|
||||
var tx = this.NullTx()
|
||||
policies, err := models.SharedHTTPAccessLogPolicyDAO.ListEnabledPolicies(tx, req.Offset, req.Size)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var pbPolicies = []*pb.HTTPAccessLogPolicy{}
|
||||
for _, policy := range policies {
|
||||
result = append(result, &pb.HTTPAccessLogPolicy{
|
||||
pbPolicies = append(pbPolicies, &pb.HTTPAccessLogPolicy{
|
||||
Id: int64(policy.Id),
|
||||
Name: policy.Name,
|
||||
IsOn: policy.IsOn == 1,
|
||||
Type: policy.Name,
|
||||
Type: policy.Type,
|
||||
OptionsJSON: []byte(policy.Options),
|
||||
CondsJSON: []byte(policy.Conds),
|
||||
IsPublic: policy.IsPublic == 1,
|
||||
})
|
||||
}
|
||||
|
||||
return &pb.FindAllEnabledHTTPAccessLogPoliciesResponse{AccessLogPolicies: result}, nil
|
||||
return &pb.ListEnabledHTTPAccessLogPoliciesResponse{HttpAccessLogPolicies: pbPolicies}, nil
|
||||
}
|
||||
|
||||
// CreateHTTPAccessLogPolicy 创建访问日志策略
|
||||
func (this *HTTPAccessLogPolicyService) CreateHTTPAccessLogPolicy(ctx context.Context, req *pb.CreateHTTPAccessLogPolicyRequest) (*pb.CreateHTTPAccessLogPolicyResponse, error) {
|
||||
_, err := this.ValidateAdmin(ctx, 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var tx = this.NullTx()
|
||||
|
||||
// 取消别的Public
|
||||
if req.IsPublic {
|
||||
err = models.SharedHTTPAccessLogPolicyDAO.CancelAllPublicPolicies(tx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// 创建
|
||||
policyId, err := models.SharedHTTPAccessLogPolicyDAO.CreatePolicy(tx, req.Name, req.Type, req.OptionsJSON, req.CondsJSON, req.IsPublic)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &pb.CreateHTTPAccessLogPolicyResponse{HttpAccessLogPolicyId: policyId}, nil
|
||||
}
|
||||
|
||||
// UpdateHTTPAccessLogPolicy 修改访问日志策略
|
||||
func (this *HTTPAccessLogPolicyService) UpdateHTTPAccessLogPolicy(ctx context.Context, req *pb.UpdateHTTPAccessLogPolicyRequest) (*pb.RPCSuccess, error) {
|
||||
_, err := this.ValidateAdmin(ctx, 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var tx = this.NullTx()
|
||||
|
||||
// 取消别的Public
|
||||
if req.IsPublic {
|
||||
err = models.SharedHTTPAccessLogPolicyDAO.CancelAllPublicPolicies(tx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// 保存修改
|
||||
err = models.SharedHTTPAccessLogPolicyDAO.UpdatePolicy(tx, req.HttpAccessLogPolicyId, req.Name, req.OptionsJSON, req.CondsJSON, req.IsPublic, req.IsOn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return this.Success()
|
||||
}
|
||||
|
||||
// FindEnabledHTTPAccessLogPolicy 查找单个访问日志策略
|
||||
func (this *HTTPAccessLogPolicyService) FindEnabledHTTPAccessLogPolicy(ctx context.Context, req *pb.FindEnabledHTTPAccessLogPolicyRequest) (*pb.FindEnabledHTTPAccessLogPolicyResponse, error) {
|
||||
_, err := this.ValidateAdmin(ctx, 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var tx = this.NullTx()
|
||||
policy, err := models.SharedHTTPAccessLogPolicyDAO.FindEnabledHTTPAccessLogPolicy(tx, req.HttpAccessLogPolicyId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if policy == nil {
|
||||
return &pb.FindEnabledHTTPAccessLogPolicyResponse{HttpAccessLogPolicy: nil}, nil
|
||||
}
|
||||
return &pb.FindEnabledHTTPAccessLogPolicyResponse{HttpAccessLogPolicy: &pb.HTTPAccessLogPolicy{
|
||||
Id: int64(policy.Id),
|
||||
Name: policy.Name,
|
||||
IsOn: policy.IsOn == 1,
|
||||
Type: policy.Type,
|
||||
OptionsJSON: []byte(policy.Options),
|
||||
CondsJSON: []byte(policy.Conds),
|
||||
IsPublic: policy.IsPublic == 1,
|
||||
}}, nil
|
||||
}
|
||||
|
||||
// DeleteHTTPAccessLogPolicy 删除访问日志策略
|
||||
func (this *HTTPAccessLogPolicyService) DeleteHTTPAccessLogPolicy(ctx context.Context, req *pb.DeleteHTTPAccessLogPolicyRequest) (*pb.RPCSuccess, error) {
|
||||
_, err := this.ValidateAdmin(ctx, 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var tx = this.NullTx()
|
||||
err = models.SharedHTTPAccessLogPolicyDAO.DisableHTTPAccessLogPolicy(tx, req.HttpAccessLogPolicyId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return this.Success()
|
||||
}
|
||||
|
||||
// WriteHTTPAccessLogPolicy 测试写入某个访问日志策略
|
||||
func (this *HTTPAccessLogPolicyService) WriteHTTPAccessLogPolicy(ctx context.Context, req *pb.WriteHTTPAccessLogPolicyRequest) (*pb.RPCSuccess, error) {
|
||||
_, err := this.ValidateAdmin(ctx, 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = accesslogs.SharedStorageManager.Write(req.HttpAccessLogPolicyId, []*pb.HTTPAccessLog{req.HttpAccessLog})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return this.Success()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user