删除不必要的文件

This commit is contained in:
GoEdgeLab
2022-08-09 17:35:35 +08:00
parent 146c09f4da
commit e13b2105f6
22 changed files with 20 additions and 1406 deletions

View File

@@ -1,71 +0,0 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package accesslogs
import (
"encoding/json"
"fmt"
"github.com/TeaOSLab/EdgeCommon/pkg/configutils"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"strconv"
"time"
)
type BaseStorage struct {
isOk bool
version int
firewallOnly bool
}
func (this *BaseStorage) SetVersion(version int) {
this.version = version
}
func (this *BaseStorage) Version() int {
return this.version
}
func (this *BaseStorage) IsOk() bool {
return this.isOk
}
func (this *BaseStorage) SetOk(isOk bool) {
this.isOk = isOk
}
func (this *BaseStorage) SetFirewallOnly(firewallOnly bool) {
this.firewallOnly = firewallOnly
}
// Marshal 对日志进行编码
func (this *BaseStorage) Marshal(accessLog *pb.HTTPAccessLog) ([]byte, error) {
return json.Marshal(accessLog)
}
// FormatVariables 格式化字符串中的变量
func (this *BaseStorage) FormatVariables(s string) string {
var now = time.Now()
return configutils.ParseVariables(s, func(varName string) (value string) {
switch varName {
case "year":
return strconv.Itoa(now.Year())
case "month":
return fmt.Sprintf("%02d", now.Month())
case "week":
_, week := now.ISOWeek()
return fmt.Sprintf("%02d", week)
case "day":
return fmt.Sprintf("%02d", now.Day())
case "hour":
return fmt.Sprintf("%02d", now.Hour())
case "minute":
return fmt.Sprintf("%02d", now.Minute())
case "second":
return fmt.Sprintf("%02d", now.Second())
case "date":
return fmt.Sprintf("%d%02d%02d", now.Year(), now.Month(), now.Day())
}
return varName
})
}

View File

@@ -1,99 +0,0 @@
package accesslogs
import (
"bytes"
"errors"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/iwind/TeaGo/logs"
"os/exec"
"sync"
)
// CommandStorage 通过命令行存储
type CommandStorage struct {
BaseStorage
config *serverconfigs.AccessLogCommandStorageConfig
writeLocker sync.Mutex
}
func NewCommandStorage(config *serverconfigs.AccessLogCommandStorageConfig) *CommandStorage {
return &CommandStorage{config: config}
}
func (this *CommandStorage) Config() interface{} {
return this.config
}
// Start 启动
func (this *CommandStorage) Start() error {
if len(this.config.Command) == 0 {
return errors.New("'command' should not be empty")
}
return nil
}
// 写入日志
func (this *CommandStorage) Write(accessLogs []*pb.HTTPAccessLog) error {
if len(accessLogs) == 0 {
return nil
}
this.writeLocker.Lock()
defer this.writeLocker.Unlock()
cmd := exec.Command(this.config.Command, this.config.Args...)
if len(this.config.Dir) > 0 {
cmd.Dir = this.config.Dir
}
stdout := bytes.NewBuffer([]byte{})
cmd.Stdout = stdout
w, err := cmd.StdinPipe()
if err != nil {
return err
}
err = cmd.Start()
if err != nil {
return err
}
for _, accessLog := range accessLogs {
if this.firewallOnly && accessLog.FirewallPolicyId == 0 {
continue
}
data, err := this.Marshal(accessLog)
if err != nil {
logs.Error(err)
continue
}
_, err = w.Write(data)
if err != nil {
logs.Error(err)
}
_, err = w.Write([]byte("\n"))
if err != nil {
logs.Error(err)
}
}
_ = w.Close()
err = cmd.Wait()
if err != nil {
logs.Error(err)
if stdout.Len() > 0 {
logs.Error(errors.New(string(stdout.Bytes())))
}
}
return nil
}
// Close 关闭
func (this *CommandStorage) Close() error {
return nil
}

View File

@@ -1,63 +0,0 @@
package accesslogs
import (
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"os"
"os/exec"
"testing"
"time"
)
func TestCommandStorage_Write(t *testing.T) {
php, err := exec.LookPath("php")
if err != nil { // not found php, so we can not test
t.Log("php:", err)
return
}
cwd, err := os.Getwd()
if err != nil {
t.Fatal(err)
}
before := time.Now()
storage := NewCommandStorage(&serverconfigs.AccessLogCommandStorageConfig{
Command: php,
Args: []string{cwd + "/tests/command_storage.php"},
})
err = storage.Start()
if err != nil {
t.Fatal(err)
}
err = storage.Write([]*pb.HTTPAccessLog{
{
RequestMethod: "GET",
RequestPath: "/hello",
},
{
RequestMethod: "GET",
RequestPath: "/world",
},
{
RequestMethod: "GET",
RequestPath: "/lu",
},
{
RequestMethod: "GET",
RequestPath: "/ping",
},
})
if err != nil {
t.Fatal(err)
}
err = storage.Close()
if err != nil {
t.Fatal(err)
}
t.Log(time.Since(before).Seconds(), "seconds")
}

View File

@@ -1,131 +0,0 @@
package accesslogs
import (
"encoding/base64"
"encoding/json"
"errors"
"fmt"
teaconst "github.com/TeaOSLab/EdgeAPI/internal/const"
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
"github.com/TeaOSLab/EdgeAPI/internal/utils"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"io"
"net/http"
"regexp"
"strings"
"time"
)
// ESStorage ElasticSearch存储策略
type ESStorage struct {
BaseStorage
config *serverconfigs.AccessLogESStorageConfig
}
func NewESStorage(config *serverconfigs.AccessLogESStorageConfig) *ESStorage {
return &ESStorage{config: config}
}
func (this *ESStorage) Config() interface{} {
return this.config
}
// Start 开启
func (this *ESStorage) Start() error {
if len(this.config.Endpoint) == 0 {
return errors.New("'endpoint' should not be nil")
}
if !regexp.MustCompile(`(?i)^(http|https)://`).MatchString(this.config.Endpoint) {
this.config.Endpoint = "http://" + this.config.Endpoint
}
if len(this.config.Index) == 0 {
return errors.New("'index' should not be nil")
}
if len(this.config.MappingType) == 0 {
return errors.New("'mappingType' should not be nil")
}
return nil
}
// 写入日志
func (this *ESStorage) Write(accessLogs []*pb.HTTPAccessLog) error {
if len(accessLogs) == 0 {
return nil
}
bulk := &strings.Builder{}
indexName := this.FormatVariables(this.config.Index)
typeName := this.FormatVariables(this.config.MappingType)
for _, accessLog := range accessLogs {
if this.firewallOnly && accessLog.FirewallPolicyId == 0 {
continue
}
if len(accessLog.RequestId) == 0 {
continue
}
opData, err := json.Marshal(map[string]interface{}{
"index": map[string]interface{}{
"_index": indexName,
"_type": typeName,
"_id": accessLog.RequestId,
},
})
if err != nil {
remotelogs.Error("ACCESS_LOG_ES_STORAGE", "write failed: "+err.Error())
continue
}
data, err := this.Marshal(accessLog)
if err != nil {
remotelogs.Error("ACCESS_LOG_ES_STORAGE", "marshal data failed: "+err.Error())
continue
}
bulk.Write(opData)
bulk.WriteString("\n")
bulk.Write(data)
bulk.WriteString("\n")
}
if bulk.Len() == 0 {
return nil
}
req, err := http.NewRequest(http.MethodPost, this.config.Endpoint+"/_bulk", strings.NewReader(bulk.String()))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("User-Agent", strings.ReplaceAll(teaconst.ProductName, " ", "-")+"/"+teaconst.Version)
if len(this.config.Username) > 0 || len(this.config.Password) > 0 {
req.Header.Set("Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(this.config.Username+":"+this.config.Password)))
}
client := utils.SharedHttpClient(10 * time.Second)
defer func() {
_ = req.Body.Close()
}()
resp, err := client.Do(req)
if err != nil {
return err
}
defer func() {
_ = resp.Body.Close()
}()
if resp.StatusCode != http.StatusOK {
bodyData, _ := io.ReadAll(resp.Body)
return errors.New("ElasticSearch response status code: " + fmt.Sprintf("%d", resp.StatusCode) + " content: " + string(bodyData))
}
return nil
}
// Close 关闭
func (this *ESStorage) Close() error {
return nil
}

View File

@@ -1,53 +0,0 @@
package accesslogs
import (
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"testing"
"time"
)
func TestESStorage_Write(t *testing.T) {
storage := NewESStorage(&serverconfigs.AccessLogESStorageConfig{
Endpoint: "http://127.0.0.1:9200",
Index: "logs",
MappingType: "accessLogs",
Username: "hello",
Password: "world",
})
err := storage.Start()
if err != nil {
t.Fatal(err)
}
{
err = storage.Write([]*pb.HTTPAccessLog{
{
RequestMethod: "POST",
RequestPath: "/1",
TimeLocal: time.Now().Format("2/Jan/2006:15:04:05 -0700"),
TimeISO8601: "2018-07-23T22:23:35+08:00",
Header: map[string]*pb.Strings{
"Content-Type": {Values: []string{"text/html"}},
},
},
{
RequestMethod: "GET",
RequestPath: "/2",
TimeLocal: time.Now().Format("2/Jan/2006:15:04:05 -0700"),
TimeISO8601: "2018-07-23T22:23:35+08:00",
Header: map[string]*pb.Strings{
"Content-Type": {Values: []string{"text/css"}},
},
},
})
if err != nil {
t.Fatal(err)
}
}
err = storage.Close()
if err != nil {
t.Fatal(err)
}
}

View File

@@ -1,130 +0,0 @@
package accesslogs
import (
"errors"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/iwind/TeaGo/logs"
"os"
"path/filepath"
"sync"
)
// FileStorage 文件存储策略
type FileStorage struct {
BaseStorage
config *serverconfigs.AccessLogFileStorageConfig
writeLocker sync.Mutex
files map[string]*os.File // path => *File
filesLocker sync.Mutex
}
func NewFileStorage(config *serverconfigs.AccessLogFileStorageConfig) *FileStorage {
return &FileStorage{
config: config,
}
}
func (this *FileStorage) Config() interface{} {
return this.config
}
// Start 开启
func (this *FileStorage) Start() error {
if len(this.config.Path) == 0 {
return errors.New("'path' should not be empty")
}
this.files = map[string]*os.File{}
return nil
}
// Write 写入日志
func (this *FileStorage) Write(accessLogs []*pb.HTTPAccessLog) error {
if len(accessLogs) == 0 {
return nil
}
fp := this.fp()
if fp == nil {
return errors.New("file pointer should not be nil")
}
this.writeLocker.Lock()
defer this.writeLocker.Unlock()
for _, accessLog := range accessLogs {
if this.firewallOnly && accessLog.FirewallPolicyId == 0 {
continue
}
data, err := this.Marshal(accessLog)
if err != nil {
logs.Error(err)
continue
}
_, err = fp.Write(data)
if err != nil {
_ = this.Close()
break
}
_, _ = fp.WriteString("\n")
}
return nil
}
// Close 关闭
func (this *FileStorage) Close() error {
this.filesLocker.Lock()
defer this.filesLocker.Unlock()
var resultErr error
for _, f := range this.files {
err := f.Close()
if err != nil {
resultErr = err
}
}
return resultErr
}
func (this *FileStorage) fp() *os.File {
path := this.FormatVariables(this.config.Path)
this.filesLocker.Lock()
defer this.filesLocker.Unlock()
fp, ok := this.files[path]
if ok {
return fp
}
// 关闭其他的文件
for _, f := range this.files {
_ = f.Close()
}
// 是否创建文件目录
if this.config.AutoCreate {
dir := filepath.Dir(path)
_, err := os.Stat(dir)
if os.IsNotExist(err) {
err = os.MkdirAll(dir, 0777)
if err != nil {
logs.Error(err)
return nil
}
}
}
// 打开新文件
fp, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
if err != nil {
logs.Error(err)
return nil
}
this.files[path] = fp
return fp
}

View File

@@ -1,70 +0,0 @@
package accesslogs
import (
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/iwind/TeaGo/Tea"
"testing"
"time"
)
func TestFileStorage_Write(t *testing.T) {
storage := NewFileStorage(&serverconfigs.AccessLogFileStorageConfig{
Path: Tea.Root + "/logs/access-${date}.log",
})
err := storage.Start()
if err != nil {
t.Fatal(err)
}
{
err = storage.Write([]*pb.HTTPAccessLog{
{
RequestPath: "/hello",
},
{
RequestPath: "/world",
},
})
if err != nil {
t.Fatal(err)
}
}
{
err = storage.Write([]*pb.HTTPAccessLog{
{
RequestPath: "/1",
},
{
RequestPath: "/2",
},
})
if err != nil {
t.Fatal(err)
}
}
{
err = storage.Write([]*pb.HTTPAccessLog{
{
RequestMethod: "POST",
RequestPath: "/1",
TimeLocal: time.Now().Format("2/Jan/2006:15:04:05 -0700"),
},
{
RequestMethod: "GET",
RequestPath: "/2",
TimeLocal: time.Now().Format("2/Jan/2006:15:04:05 -0700"),
},
})
if err != nil {
t.Fatal(err)
}
}
err = storage.Close()
if err != nil {
t.Fatal(err)
}
}

View File

@@ -1,33 +0,0 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package accesslogs
import "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
// StorageInterface 日志存储接口
type StorageInterface interface {
// Version 获取版本
Version() int
// SetVersion 设置版本
SetVersion(version int)
// SetFirewallOnly 设置是否只处理防火墙相关的访问日志
SetFirewallOnly(firewallOnly bool)
IsOk() bool
SetOk(ok bool)
// Config 获取配置
Config() interface{}
// Start 开启
Start() error
// Write 写入日志
Write(accessLogs []*pb.HTTPAccessLog) error
// Close 关闭
Close() error
}

View File

@@ -1,185 +0,0 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package accesslogs
import (
"encoding/json"
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/errors"
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/lists"
"github.com/iwind/TeaGo/types"
"sync"
"time"
)
var SharedStorageManager = NewStorageManager()
type StorageManager struct {
storageMap map[int64]StorageInterface // policyId => Storage
locker sync.Mutex
}
func NewStorageManager() *StorageManager {
return &StorageManager{
storageMap: map[int64]StorageInterface{},
}
}
func (this *StorageManager) Start() {
var ticker = time.NewTicker(1 * time.Minute)
if Tea.IsTesting() {
ticker = time.NewTicker(5 * time.Second)
}
// 启动时执行一次
var err = this.Loop()
if err != nil {
remotelogs.Error("ACCESS_LOG_STORAGE_MANAGER", "update error: "+err.Error())
}
// 循环执行
for range ticker.C {
err := this.Loop()
if err != nil {
remotelogs.Error("ACCESS_LOG_STORAGE_MANAGER", "update error: "+err.Error())
}
}
}
// Loop 更新
func (this *StorageManager) Loop() error {
policies, err := models.SharedHTTPAccessLogPolicyDAO.FindAllEnabledAndOnPolicies(nil)
if err != nil {
return err
}
var policyIds = []int64{}
for _, policy := range policies {
if policy.IsOn {
policyIds = append(policyIds, int64(policy.Id))
}
}
this.locker.Lock()
defer this.locker.Unlock()
// 关闭不用的
for policyId, storage := range this.storageMap {
if !lists.ContainsInt64(policyIds, policyId) {
err := storage.Close()
if err != nil {
remotelogs.Error("ACCESS_LOG_STORAGE_MANAGER", "close '"+types.String(policyId)+"' failed: "+err.Error())
}
delete(this.storageMap, policyId)
remotelogs.Error("ACCESS_LOG_STORAGE_MANAGER", "remove '"+types.String(policyId)+"'")
}
}
for _, policy := range policies {
var policyId = int64(policy.Id)
storage, ok := this.storageMap[policyId]
if ok {
// 检查配置是否有变更
if types.Int(policy.Version) != storage.Version() {
err = storage.Close()
if err != nil {
remotelogs.Error("ACCESS_LOG_STORAGE_MANAGER", "close policy '"+types.String(policyId)+"' failed: "+err.Error())
// 继续往下执行
}
if len(policy.Options) > 0 {
err = json.Unmarshal(policy.Options, storage.Config())
if err != nil {
remotelogs.Error("ACCESS_LOG_STORAGE_MANAGER", "unmarshal policy '"+types.String(policyId)+"' config failed: "+err.Error())
storage.SetOk(false)
continue
}
}
storage.SetVersion(types.Int(policy.Version))
storage.SetFirewallOnly(policy.FirewallOnly == 1)
err := storage.Start()
if err != nil {
remotelogs.Error("ACCESS_LOG_STORAGE_MANAGER", "start policy '"+types.String(policyId)+"' failed: "+err.Error())
continue
}
storage.SetOk(true)
remotelogs.Println("ACCESS_LOG_STORAGE_MANAGER", "restart policy '"+types.String(policyId)+"'")
}
} else {
storage, err := this.createStorage(policy.Type, policy.Options)
if err != nil {
remotelogs.Error("ACCESS_LOG_STORAGE_MANAGER", "create policy '"+types.String(policyId)+"' failed: "+err.Error())
continue
}
storage.SetVersion(types.Int(policy.Version))
storage.SetFirewallOnly(policy.FirewallOnly == 1)
this.storageMap[policyId] = storage
err = storage.Start()
if err != nil {
remotelogs.Error("ACCESS_LOG_STORAGE_MANAGER", "start policy '"+types.String(policyId)+"' failed: "+err.Error())
continue
}
storage.SetOk(true)
remotelogs.Println("ACCESS_LOG_STORAGE_MANAGER", "start policy '"+types.String(policyId)+"'")
}
}
return nil
}
func (this *StorageManager) createStorage(storageType string, optionsJSON []byte) (StorageInterface, error) {
switch storageType {
case serverconfigs.AccessLogStorageTypeFile:
var config = &serverconfigs.AccessLogFileStorageConfig{}
if len(optionsJSON) > 0 {
err := json.Unmarshal(optionsJSON, config)
if err != nil {
return nil, err
}
}
return NewFileStorage(config), nil
case serverconfigs.AccessLogStorageTypeES:
var config = &serverconfigs.AccessLogESStorageConfig{}
if len(optionsJSON) > 0 {
err := json.Unmarshal(optionsJSON, config)
if err != nil {
return nil, err
}
}
return NewESStorage(config), nil
case serverconfigs.AccessLogStorageTypeTCP:
var config = &serverconfigs.AccessLogTCPStorageConfig{}
if len(optionsJSON) > 0 {
err := json.Unmarshal(optionsJSON, config)
if err != nil {
return nil, err
}
}
return NewTCPStorage(config), nil
case serverconfigs.AccessLogStorageTypeSyslog:
var config = &serverconfigs.AccessLogSyslogStorageConfig{}
if len(optionsJSON) > 0 {
err := json.Unmarshal(optionsJSON, config)
if err != nil {
return nil, err
}
}
return NewSyslogStorage(config), nil
case serverconfigs.AccessLogStorageTypeCommand:
var config = &serverconfigs.AccessLogCommandStorageConfig{}
if len(optionsJSON) > 0 {
err := json.Unmarshal(optionsJSON, config)
if err != nil {
return nil, err
}
}
return NewCommandStorage(config), nil
}
return nil, errors.New("invalid policy type '" + storageType + "'")
}

View File

@@ -1,17 +0,0 @@
package accesslogs
import (
"github.com/iwind/TeaGo/dbs"
"testing"
)
func TestStorageManager_Loop(t *testing.T) {
dbs.NotifyReady()
var storage = NewStorageManager()
err := storage.Loop()
if err != nil {
t.Fatal(err)
}
t.Log(storage.storageMap)
}

View File

@@ -1,14 +0,0 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
//go:build !plus
package accesslogs
import (
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
)
// 写入日志
func (this *StorageManager) Write(policyId int64, accessLogs []*pb.HTTPAccessLog) (success bool, failMessage string, err error) {
return false, "only works in plus version", nil
}

View File

@@ -1,146 +0,0 @@
package accesslogs
import (
"bytes"
"errors"
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/iwind/TeaGo/logs"
"os/exec"
"runtime"
"strconv"
)
type SyslogStorageProtocol = string
const (
SyslogStorageProtocolTCP SyslogStorageProtocol = "tcp"
SyslogStorageProtocolUDP SyslogStorageProtocol = "udp"
SyslogStorageProtocolNone SyslogStorageProtocol = "none"
SyslogStorageProtocolSocket SyslogStorageProtocol = "socket"
)
type SyslogStoragePriority = int
// SyslogStorage syslog存储策略
type SyslogStorage struct {
BaseStorage
config *serverconfigs.AccessLogSyslogStorageConfig
exe string
}
func NewSyslogStorage(config *serverconfigs.AccessLogSyslogStorageConfig) *SyslogStorage {
return &SyslogStorage{config: config}
}
func (this *SyslogStorage) Config() interface{} {
return this.config
}
// Start 开启
func (this *SyslogStorage) Start() error {
if runtime.GOOS != "linux" {
return errors.New("'syslog' storage only works on linux")
}
exe, err := exec.LookPath("logger")
if err != nil {
return err
}
this.exe = exe
return nil
}
// 写入日志
func (this *SyslogStorage) Write(accessLogs []*pb.HTTPAccessLog) error {
if len(accessLogs) == 0 {
return nil
}
args := []string{}
if len(this.config.Tag) > 0 {
args = append(args, "-t", this.config.Tag)
}
if this.config.Priority >= 0 {
args = append(args, "-p", strconv.Itoa(this.config.Priority))
}
switch this.config.Protocol {
case SyslogStorageProtocolTCP:
args = append(args, "-T")
if len(this.config.ServerAddr) > 0 {
args = append(args, "-n", this.config.ServerAddr)
}
if this.config.ServerPort > 0 {
args = append(args, "-P", strconv.Itoa(this.config.ServerPort))
}
case SyslogStorageProtocolUDP:
args = append(args, "-d")
if len(this.config.ServerAddr) > 0 {
args = append(args, "-n", this.config.ServerAddr)
}
if this.config.ServerPort > 0 {
args = append(args, "-P", strconv.Itoa(this.config.ServerPort))
}
case SyslogStorageProtocolSocket:
args = append(args, "-u")
args = append(args, this.config.Socket)
case SyslogStorageProtocolNone:
// do nothing
}
args = append(args, "-S", "10240")
var cmd = exec.Command(this.exe, args...)
var stderrBuffer = &bytes.Buffer{}
cmd.Stderr = stderrBuffer
w, err := cmd.StdinPipe()
if err != nil {
return err
}
err = cmd.Start()
if err != nil {
return err
}
for _, accessLog := range accessLogs {
if this.firewallOnly && accessLog.FirewallPolicyId == 0 {
continue
}
data, err := this.Marshal(accessLog)
if err != nil {
remotelogs.Error("ACCESS_LOG_POLICY_SYSLOG", "marshal accesslog failed: "+err.Error())
continue
}
_, err = w.Write(data)
if err != nil {
logs.Error(err)
}
_, err = w.Write([]byte("\n"))
if err != nil {
remotelogs.Error("ACCESS_LOG_POLICY_SYSLOG", "write accesslog failed: "+err.Error())
}
}
_ = w.Close()
err = cmd.Wait()
if err != nil {
return errors.New("send syslog failed: " + err.Error() + ", stderr: " + stderrBuffer.String())
}
return nil
}
// Close 关闭
func (this *SyslogStorage) Close() error {
return nil
}

View File

@@ -1,114 +0,0 @@
package accesslogs
import (
"errors"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/iwind/TeaGo/logs"
"net"
"sync"
)
// TCPStorage TCP存储策略
type TCPStorage struct {
BaseStorage
config *serverconfigs.AccessLogTCPStorageConfig
writeLocker sync.Mutex
connLocker sync.Mutex
conn net.Conn
}
func NewTCPStorage(config *serverconfigs.AccessLogTCPStorageConfig) *TCPStorage {
return &TCPStorage{config: config}
}
func (this *TCPStorage) Config() interface{} {
return this.config
}
// Start 开启
func (this *TCPStorage) Start() error {
if len(this.config.Network) == 0 {
return errors.New("'network' should not be empty")
}
if len(this.config.Addr) == 0 {
return errors.New("'addr' should not be empty")
}
return nil
}
// 写入日志
func (this *TCPStorage) Write(accessLogs []*pb.HTTPAccessLog) error {
if len(accessLogs) == 0 {
return nil
}
err := this.connect()
if err != nil {
return err
}
conn := this.conn
if conn == nil {
return errors.New("connection should not be nil")
}
this.writeLocker.Lock()
defer this.writeLocker.Unlock()
for _, accessLog := range accessLogs {
if this.firewallOnly && accessLog.FirewallPolicyId == 0 {
continue
}
data, err := this.Marshal(accessLog)
if err != nil {
logs.Error(err)
continue
}
_, err = conn.Write(data)
if err != nil {
_ = this.Close()
break
}
_, err = conn.Write([]byte("\n"))
if err != nil {
_ = this.Close()
break
}
}
return nil
}
// Close 关闭
func (this *TCPStorage) Close() error {
this.connLocker.Lock()
defer this.connLocker.Unlock()
if this.conn != nil {
err := this.conn.Close()
this.conn = nil
return err
}
return nil
}
func (this *TCPStorage) connect() error {
this.connLocker.Lock()
defer this.connLocker.Unlock()
if this.conn != nil {
return nil
}
conn, err := net.Dial(this.config.Network, this.config.Addr)
if err != nil {
return err
}
this.conn = conn
return nil
}

View File

@@ -1,72 +0,0 @@
package accesslogs
import (
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"net"
"testing"
"time"
)
func TestTCPStorage_Write(t *testing.T) {
go func() {
server, err := net.Listen("tcp", "127.0.0.1:9981")
if err != nil {
t.Error(err)
return
}
for {
conn, err := server.Accept()
if err != nil {
break
}
buf := make([]byte, 1024)
for {
n, err := conn.Read(buf)
if n > 0 {
t.Log(string(buf[:n]))
}
if err != nil {
break
}
}
break
}
_ = server.Close()
}()
storage := NewTCPStorage(&serverconfigs.AccessLogTCPStorageConfig{
Network: "tcp",
Addr: "127.0.0.1:9981",
})
err := storage.Start()
if err != nil {
t.Fatal(err)
}
{
err = storage.Write([]*pb.HTTPAccessLog{
{
RequestMethod: "POST",
RequestPath: "/1",
TimeLocal: time.Now().Format("2/Jan/2006:15:04:05 -0700"),
},
{
RequestMethod: "GET",
RequestPath: "/2",
TimeLocal: time.Now().Format("2/Jan/2006:15:04:05 -0700"),
},
})
if err != nil {
t.Fatal(err)
}
}
time.Sleep(2 * time.Second)
err = storage.Close()
if err != nil {
t.Fatal(err)
}
}

View File

@@ -1,24 +0,0 @@
<?php
// test command storage
// open access log file
$fp = fopen("/tmp/goedge-command-storage.log", "a+");
// read access logs from stdin
$stdin = fopen("php://stdin", "r");
while(true) {
if (feof($stdin)) {
break;
}
$line = fgets($stdin);
// write to access log file
fwrite($fp, $line);
}
// close file pointers
fclose($fp);
fclose($stdin);
?>

View File

@@ -93,7 +93,6 @@ func init() {
}
})
})
}
func NewHTTPAccessLogDAO() *HTTPAccessLogDAO {

View File

@@ -5,7 +5,6 @@ import (
"crypto/tls"
"encoding/json"
"errors"
"github.com/TeaOSLab/EdgeAPI/internal/accesslogs"
"github.com/TeaOSLab/EdgeAPI/internal/configs"
teaconst "github.com/TeaOSLab/EdgeAPI/internal/const"
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
@@ -159,9 +158,7 @@ func (this *APINode) Start() {
})
// 访问日志存储管理器
goman.New(func() {
accesslogs.SharedStorageManager.Start()
})
this.startAccessLogStorages()
// 监听RPC服务
remotelogs.Println("API_NODE", "starting RPC server ...")

View File

@@ -0,0 +1,8 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn .
//go:build !plus
package nodes
func (this *APINode) startAccessLogStorages() {
}

View File

@@ -102,11 +102,6 @@ func (this *APINode) registerServices(server *grpc.Server) {
pb.RegisterHTTPPageServiceServer(server, instance)
this.rest(instance)
}
{
var instance = this.serviceInstance(&services.HTTPAccessLogPolicyService{}).(*services.HTTPAccessLogPolicyService)
pb.RegisterHTTPAccessLogPolicyServiceServer(server, instance)
this.rest(instance)
}
{
var instance = this.serviceInstance(&services.HTTPCachePolicyService{}).(*services.HTTPCachePolicyService)
pb.RegisterHTTPCachePolicyServiceServer(server, instance)

View File

@@ -2,7 +2,6 @@ package services
import (
"context"
"github.com/TeaOSLab/EdgeAPI/internal/accesslogs"
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/errors"
rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils"
@@ -37,17 +36,10 @@ func (this *HTTPAccessLogService) CreateHTTPAccessLogs(ctx context.Context, req
return nil, err
}
// 发送到访问日志策略
policyId, err := models.SharedHTTPAccessLogPolicyDAO.FindCurrentPublicPolicyId(tx)
err = this.writeAccessLogsToPolicy(req.HttpAccessLogs)
if err != nil {
return nil, err
}
if policyId > 0 {
_, _, err = accesslogs.SharedStorageManager.Write(policyId, req.HttpAccessLogs)
if err != nil {
return nil, err
}
}
return &pb.CreateHTTPAccessLogsResponse{}, nil
}

View File

@@ -0,0 +1,10 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn .
//go:build !plus
package services
import "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
func (this *HTTPAccessLogService) writeAccessLogsToPolicy(pbAccessLogs []*pb.HTTPAccessLog) error {
return nil
}

View File

@@ -1,165 +0,0 @@
package services
import (
"context"
"errors"
"github.com/TeaOSLab/EdgeAPI/internal/accesslogs"
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
)
type HTTPAccessLogPolicyService struct {
BaseService
}
// CountAllHTTPAccessLogPolicies 计算访问日志策略数量
func (this *HTTPAccessLogPolicyService) CountAllHTTPAccessLogPolicies(ctx context.Context, req *pb.CountAllHTTPAccessLogPoliciesRequest) (*pb.RPCCountResponse, error) {
_, err := this.ValidateAdmin(ctx)
if err != nil {
return nil, err
}
var tx = this.NullTx()
count, err := models.SharedHTTPAccessLogPolicyDAO.CountAllEnabledPolicies(tx)
if err != nil {
return nil, err
}
return this.SuccessCount(count)
}
// ListHTTPAccessLogPolicies 列出单页访问日志策略
func (this *HTTPAccessLogPolicyService) ListHTTPAccessLogPolicies(ctx context.Context, req *pb.ListHTTPAccessLogPoliciesRequest) (*pb.ListHTTPAccessLogPoliciesResponse, error) {
_, err := this.ValidateAdmin(ctx)
if err != nil {
return nil, err
}
var tx = this.NullTx()
policies, err := models.SharedHTTPAccessLogPolicyDAO.ListEnabledPolicies(tx, req.Offset, req.Size)
if err != nil {
return nil, err
}
var pbPolicies = []*pb.HTTPAccessLogPolicy{}
for _, policy := range policies {
pbPolicies = append(pbPolicies, &pb.HTTPAccessLogPolicy{
Id: int64(policy.Id),
Name: policy.Name,
IsOn: policy.IsOn,
Type: policy.Type,
OptionsJSON: policy.Options,
CondsJSON: policy.Conds,
IsPublic: policy.IsPublic,
FirewallOnly: policy.FirewallOnly == 1,
})
}
return &pb.ListHTTPAccessLogPoliciesResponse{HttpAccessLogPolicies: pbPolicies}, nil
}
// CreateHTTPAccessLogPolicy 创建访问日志策略
func (this *HTTPAccessLogPolicyService) CreateHTTPAccessLogPolicy(ctx context.Context, req *pb.CreateHTTPAccessLogPolicyRequest) (*pb.CreateHTTPAccessLogPolicyResponse, error) {
_, err := this.ValidateAdmin(ctx)
if err != nil {
return nil, err
}
var tx = this.NullTx()
// 取消别的Public
if req.IsPublic {
err = models.SharedHTTPAccessLogPolicyDAO.CancelAllPublicPolicies(tx)
if err != nil {
return nil, err
}
}
// 创建
policyId, err := models.SharedHTTPAccessLogPolicyDAO.CreatePolicy(tx, req.Name, req.Type, req.OptionsJSON, req.CondsJSON, req.IsPublic, req.FirewallOnly)
if err != nil {
return nil, err
}
return &pb.CreateHTTPAccessLogPolicyResponse{HttpAccessLogPolicyId: policyId}, nil
}
// UpdateHTTPAccessLogPolicy 修改访问日志策略
func (this *HTTPAccessLogPolicyService) UpdateHTTPAccessLogPolicy(ctx context.Context, req *pb.UpdateHTTPAccessLogPolicyRequest) (*pb.RPCSuccess, error) {
_, err := this.ValidateAdmin(ctx)
if err != nil {
return nil, err
}
var tx = this.NullTx()
// 取消别的Public
if req.IsPublic {
err = models.SharedHTTPAccessLogPolicyDAO.CancelAllPublicPolicies(tx)
if err != nil {
return nil, err
}
}
// 保存修改
err = models.SharedHTTPAccessLogPolicyDAO.UpdatePolicy(tx, req.HttpAccessLogPolicyId, req.Name, req.OptionsJSON, req.CondsJSON, req.IsPublic, req.FirewallOnly, req.IsOn)
if err != nil {
return nil, err
}
return this.Success()
}
// FindHTTPAccessLogPolicy 查找单个访问日志策略
func (this *HTTPAccessLogPolicyService) FindHTTPAccessLogPolicy(ctx context.Context, req *pb.FindHTTPAccessLogPolicyRequest) (*pb.FindHTTPAccessLogPolicyResponse, error) {
_, err := this.ValidateAdmin(ctx)
if err != nil {
return nil, err
}
var tx = this.NullTx()
policy, err := models.SharedHTTPAccessLogPolicyDAO.FindEnabledHTTPAccessLogPolicy(tx, req.HttpAccessLogPolicyId)
if err != nil {
return nil, err
}
if policy == nil {
return &pb.FindHTTPAccessLogPolicyResponse{HttpAccessLogPolicy: nil}, nil
}
return &pb.FindHTTPAccessLogPolicyResponse{HttpAccessLogPolicy: &pb.HTTPAccessLogPolicy{
Id: int64(policy.Id),
Name: policy.Name,
IsOn: policy.IsOn,
Type: policy.Type,
OptionsJSON: policy.Options,
CondsJSON: policy.Conds,
IsPublic: policy.IsPublic,
FirewallOnly: policy.FirewallOnly == 1,
}}, nil
}
// DeleteHTTPAccessLogPolicy 删除访问日志策略
func (this *HTTPAccessLogPolicyService) DeleteHTTPAccessLogPolicy(ctx context.Context, req *pb.DeleteHTTPAccessLogPolicyRequest) (*pb.RPCSuccess, error) {
_, err := this.ValidateAdmin(ctx)
if err != nil {
return nil, err
}
var tx = this.NullTx()
err = models.SharedHTTPAccessLogPolicyDAO.DisableHTTPAccessLogPolicy(tx, req.HttpAccessLogPolicyId)
if err != nil {
return nil, err
}
return this.Success()
}
// WriteHTTPAccessLogPolicy 测试写入某个访问日志策略
func (this *HTTPAccessLogPolicyService) WriteHTTPAccessLogPolicy(ctx context.Context, req *pb.WriteHTTPAccessLogPolicyRequest) (*pb.RPCSuccess, error) {
_, err := this.ValidateAdmin(ctx)
if err != nil {
return nil, err
}
success, failMessage, err := accesslogs.SharedStorageManager.Write(req.HttpAccessLogPolicyId, []*pb.HTTPAccessLog{req.HttpAccessLog})
if err != nil {
return nil, err
}
if !success {
return nil, errors.New("test failed: " + failMessage)
}
return this.Success()
}