diff --git a/internal/accesslogs/storage_base.go b/internal/accesslogs/storage_base.go deleted file mode 100644 index fe7af2ba..00000000 --- a/internal/accesslogs/storage_base.go +++ /dev/null @@ -1,71 +0,0 @@ -// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. - -package accesslogs - -import ( - "encoding/json" - "fmt" - "github.com/TeaOSLab/EdgeCommon/pkg/configutils" - "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" - "strconv" - "time" -) - -type BaseStorage struct { - isOk bool - version int - firewallOnly bool -} - -func (this *BaseStorage) SetVersion(version int) { - this.version = version -} - -func (this *BaseStorage) Version() int { - return this.version -} - -func (this *BaseStorage) IsOk() bool { - return this.isOk -} - -func (this *BaseStorage) SetOk(isOk bool) { - this.isOk = isOk -} - -func (this *BaseStorage) SetFirewallOnly(firewallOnly bool) { - this.firewallOnly = firewallOnly -} - -// Marshal 对日志进行编码 -func (this *BaseStorage) Marshal(accessLog *pb.HTTPAccessLog) ([]byte, error) { - return json.Marshal(accessLog) -} - -// FormatVariables 格式化字符串中的变量 -func (this *BaseStorage) FormatVariables(s string) string { - var now = time.Now() - return configutils.ParseVariables(s, func(varName string) (value string) { - switch varName { - case "year": - return strconv.Itoa(now.Year()) - case "month": - return fmt.Sprintf("%02d", now.Month()) - case "week": - _, week := now.ISOWeek() - return fmt.Sprintf("%02d", week) - case "day": - return fmt.Sprintf("%02d", now.Day()) - case "hour": - return fmt.Sprintf("%02d", now.Hour()) - case "minute": - return fmt.Sprintf("%02d", now.Minute()) - case "second": - return fmt.Sprintf("%02d", now.Second()) - case "date": - return fmt.Sprintf("%d%02d%02d", now.Year(), now.Month(), now.Day()) - } - - return varName - }) -} diff --git a/internal/accesslogs/storage_command.go b/internal/accesslogs/storage_command.go deleted file mode 100644 index c11b17ab..00000000 --- a/internal/accesslogs/storage_command.go +++ /dev/null @@ -1,99 +0,0 @@ -package accesslogs - -import ( - "bytes" - "errors" - "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" - "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" - "github.com/iwind/TeaGo/logs" - "os/exec" - "sync" -) - -// CommandStorage 通过命令行存储 -type CommandStorage struct { - BaseStorage - - config *serverconfigs.AccessLogCommandStorageConfig - - writeLocker sync.Mutex -} - -func NewCommandStorage(config *serverconfigs.AccessLogCommandStorageConfig) *CommandStorage { - return &CommandStorage{config: config} -} - -func (this *CommandStorage) Config() interface{} { - return this.config -} - -// Start 启动 -func (this *CommandStorage) Start() error { - if len(this.config.Command) == 0 { - return errors.New("'command' should not be empty") - } - return nil -} - -// 写入日志 -func (this *CommandStorage) Write(accessLogs []*pb.HTTPAccessLog) error { - if len(accessLogs) == 0 { - return nil - } - - this.writeLocker.Lock() - defer this.writeLocker.Unlock() - - cmd := exec.Command(this.config.Command, this.config.Args...) - if len(this.config.Dir) > 0 { - cmd.Dir = this.config.Dir - } - - stdout := bytes.NewBuffer([]byte{}) - cmd.Stdout = stdout - - w, err := cmd.StdinPipe() - if err != nil { - return err - } - err = cmd.Start() - if err != nil { - return err - } - for _, accessLog := range accessLogs { - if this.firewallOnly && accessLog.FirewallPolicyId == 0 { - continue - } - - data, err := this.Marshal(accessLog) - if err != nil { - logs.Error(err) - continue - } - _, err = w.Write(data) - if err != nil { - logs.Error(err) - } - - _, err = w.Write([]byte("\n")) - if err != nil { - logs.Error(err) - } - } - _ = w.Close() - err = cmd.Wait() - if err != nil { - logs.Error(err) - - if stdout.Len() > 0 { - logs.Error(errors.New(string(stdout.Bytes()))) - } - } - - return nil -} - -// Close 关闭 -func (this *CommandStorage) Close() error { - return nil -} diff --git a/internal/accesslogs/storage_command_test.go b/internal/accesslogs/storage_command_test.go deleted file mode 100644 index 2bc6cb1f..00000000 --- a/internal/accesslogs/storage_command_test.go +++ /dev/null @@ -1,63 +0,0 @@ -package accesslogs - -import ( - "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" - "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" - "os" - "os/exec" - "testing" - "time" -) - -func TestCommandStorage_Write(t *testing.T) { - php, err := exec.LookPath("php") - if err != nil { // not found php, so we can not test - t.Log("php:", err) - return - } - - cwd, err := os.Getwd() - if err != nil { - t.Fatal(err) - } - - before := time.Now() - - storage := NewCommandStorage(&serverconfigs.AccessLogCommandStorageConfig{ - Command: php, - Args: []string{cwd + "/tests/command_storage.php"}, - }) - err = storage.Start() - if err != nil { - t.Fatal(err) - } - - err = storage.Write([]*pb.HTTPAccessLog{ - { - RequestMethod: "GET", - RequestPath: "/hello", - }, - { - RequestMethod: "GET", - RequestPath: "/world", - }, - { - RequestMethod: "GET", - RequestPath: "/lu", - }, - { - RequestMethod: "GET", - RequestPath: "/ping", - }, - }) - if err != nil { - t.Fatal(err) - } - - err = storage.Close() - if err != nil { - t.Fatal(err) - } - - t.Log(time.Since(before).Seconds(), "seconds") -} diff --git a/internal/accesslogs/storage_es.go b/internal/accesslogs/storage_es.go deleted file mode 100644 index 8cd36e32..00000000 --- a/internal/accesslogs/storage_es.go +++ /dev/null @@ -1,131 +0,0 @@ -package accesslogs - -import ( - "encoding/base64" - "encoding/json" - "errors" - "fmt" - teaconst "github.com/TeaOSLab/EdgeAPI/internal/const" - "github.com/TeaOSLab/EdgeAPI/internal/remotelogs" - "github.com/TeaOSLab/EdgeAPI/internal/utils" - "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" - "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" - "io" - "net/http" - "regexp" - "strings" - "time" -) - -// ESStorage ElasticSearch存储策略 -type ESStorage struct { - BaseStorage - - config *serverconfigs.AccessLogESStorageConfig -} - -func NewESStorage(config *serverconfigs.AccessLogESStorageConfig) *ESStorage { - return &ESStorage{config: config} -} - -func (this *ESStorage) Config() interface{} { - return this.config -} - -// Start 开启 -func (this *ESStorage) Start() error { - if len(this.config.Endpoint) == 0 { - return errors.New("'endpoint' should not be nil") - } - if !regexp.MustCompile(`(?i)^(http|https)://`).MatchString(this.config.Endpoint) { - this.config.Endpoint = "http://" + this.config.Endpoint - } - if len(this.config.Index) == 0 { - return errors.New("'index' should not be nil") - } - if len(this.config.MappingType) == 0 { - return errors.New("'mappingType' should not be nil") - } - return nil -} - -// 写入日志 -func (this *ESStorage) Write(accessLogs []*pb.HTTPAccessLog) error { - if len(accessLogs) == 0 { - return nil - } - - bulk := &strings.Builder{} - indexName := this.FormatVariables(this.config.Index) - typeName := this.FormatVariables(this.config.MappingType) - for _, accessLog := range accessLogs { - if this.firewallOnly && accessLog.FirewallPolicyId == 0 { - continue - } - - if len(accessLog.RequestId) == 0 { - continue - } - - opData, err := json.Marshal(map[string]interface{}{ - "index": map[string]interface{}{ - "_index": indexName, - "_type": typeName, - "_id": accessLog.RequestId, - }, - }) - if err != nil { - remotelogs.Error("ACCESS_LOG_ES_STORAGE", "write failed: "+err.Error()) - continue - } - - data, err := this.Marshal(accessLog) - if err != nil { - remotelogs.Error("ACCESS_LOG_ES_STORAGE", "marshal data failed: "+err.Error()) - continue - } - - bulk.Write(opData) - bulk.WriteString("\n") - bulk.Write(data) - bulk.WriteString("\n") - } - - if bulk.Len() == 0 { - return nil - } - - req, err := http.NewRequest(http.MethodPost, this.config.Endpoint+"/_bulk", strings.NewReader(bulk.String())) - if err != nil { - return err - } - req.Header.Set("Content-Type", "application/json") - req.Header.Set("User-Agent", strings.ReplaceAll(teaconst.ProductName, " ", "-")+"/"+teaconst.Version) - if len(this.config.Username) > 0 || len(this.config.Password) > 0 { - req.Header.Set("Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(this.config.Username+":"+this.config.Password))) - } - client := utils.SharedHttpClient(10 * time.Second) - defer func() { - _ = req.Body.Close() - }() - - resp, err := client.Do(req) - if err != nil { - return err - } - defer func() { - _ = resp.Body.Close() - }() - - if resp.StatusCode != http.StatusOK { - bodyData, _ := io.ReadAll(resp.Body) - return errors.New("ElasticSearch response status code: " + fmt.Sprintf("%d", resp.StatusCode) + " content: " + string(bodyData)) - } - - return nil -} - -// Close 关闭 -func (this *ESStorage) Close() error { - return nil -} diff --git a/internal/accesslogs/storage_es_test.go b/internal/accesslogs/storage_es_test.go deleted file mode 100644 index 7c60babf..00000000 --- a/internal/accesslogs/storage_es_test.go +++ /dev/null @@ -1,53 +0,0 @@ -package accesslogs - -import ( - "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" - "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" - "testing" - "time" -) - -func TestESStorage_Write(t *testing.T) { - storage := NewESStorage(&serverconfigs.AccessLogESStorageConfig{ - Endpoint: "http://127.0.0.1:9200", - Index: "logs", - MappingType: "accessLogs", - Username: "hello", - Password: "world", - }) - err := storage.Start() - if err != nil { - t.Fatal(err) - } - - { - err = storage.Write([]*pb.HTTPAccessLog{ - { - RequestMethod: "POST", - RequestPath: "/1", - TimeLocal: time.Now().Format("2/Jan/2006:15:04:05 -0700"), - TimeISO8601: "2018-07-23T22:23:35+08:00", - Header: map[string]*pb.Strings{ - "Content-Type": {Values: []string{"text/html"}}, - }, - }, - { - RequestMethod: "GET", - RequestPath: "/2", - TimeLocal: time.Now().Format("2/Jan/2006:15:04:05 -0700"), - TimeISO8601: "2018-07-23T22:23:35+08:00", - Header: map[string]*pb.Strings{ - "Content-Type": {Values: []string{"text/css"}}, - }, - }, - }) - if err != nil { - t.Fatal(err) - } - } - - err = storage.Close() - if err != nil { - t.Fatal(err) - } -} diff --git a/internal/accesslogs/storage_file.go b/internal/accesslogs/storage_file.go deleted file mode 100644 index c5345ca6..00000000 --- a/internal/accesslogs/storage_file.go +++ /dev/null @@ -1,130 +0,0 @@ -package accesslogs - -import ( - "errors" - "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" - "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" - "github.com/iwind/TeaGo/logs" - "os" - "path/filepath" - "sync" -) - -// FileStorage 文件存储策略 -type FileStorage struct { - BaseStorage - - config *serverconfigs.AccessLogFileStorageConfig - - writeLocker sync.Mutex - - files map[string]*os.File // path => *File - filesLocker sync.Mutex -} - -func NewFileStorage(config *serverconfigs.AccessLogFileStorageConfig) *FileStorage { - return &FileStorage{ - config: config, - } -} - -func (this *FileStorage) Config() interface{} { - return this.config -} - -// Start 开启 -func (this *FileStorage) Start() error { - if len(this.config.Path) == 0 { - return errors.New("'path' should not be empty") - } - - this.files = map[string]*os.File{} - - return nil -} - -// Write 写入日志 -func (this *FileStorage) Write(accessLogs []*pb.HTTPAccessLog) error { - if len(accessLogs) == 0 { - return nil - } - - fp := this.fp() - if fp == nil { - return errors.New("file pointer should not be nil") - } - this.writeLocker.Lock() - defer this.writeLocker.Unlock() - - for _, accessLog := range accessLogs { - if this.firewallOnly && accessLog.FirewallPolicyId == 0 { - continue - } - data, err := this.Marshal(accessLog) - if err != nil { - logs.Error(err) - continue - } - _, err = fp.Write(data) - if err != nil { - _ = this.Close() - break - } - _, _ = fp.WriteString("\n") - } - return nil -} - -// Close 关闭 -func (this *FileStorage) Close() error { - this.filesLocker.Lock() - defer this.filesLocker.Unlock() - - var resultErr error - for _, f := range this.files { - err := f.Close() - if err != nil { - resultErr = err - } - } - return resultErr -} - -func (this *FileStorage) fp() *os.File { - path := this.FormatVariables(this.config.Path) - - this.filesLocker.Lock() - defer this.filesLocker.Unlock() - fp, ok := this.files[path] - if ok { - return fp - } - - // 关闭其他的文件 - for _, f := range this.files { - _ = f.Close() - } - - // 是否创建文件目录 - if this.config.AutoCreate { - dir := filepath.Dir(path) - _, err := os.Stat(dir) - if os.IsNotExist(err) { - err = os.MkdirAll(dir, 0777) - if err != nil { - logs.Error(err) - return nil - } - } - } - - // 打开新文件 - fp, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) - if err != nil { - logs.Error(err) - return nil - } - this.files[path] = fp - - return fp -} diff --git a/internal/accesslogs/storage_file_test.go b/internal/accesslogs/storage_file_test.go deleted file mode 100644 index d26a4d7d..00000000 --- a/internal/accesslogs/storage_file_test.go +++ /dev/null @@ -1,70 +0,0 @@ -package accesslogs - -import ( - "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" - "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" - "github.com/iwind/TeaGo/Tea" - "testing" - "time" -) - -func TestFileStorage_Write(t *testing.T) { - storage := NewFileStorage(&serverconfigs.AccessLogFileStorageConfig{ - Path: Tea.Root + "/logs/access-${date}.log", - }) - err := storage.Start() - if err != nil { - t.Fatal(err) - } - - { - err = storage.Write([]*pb.HTTPAccessLog{ - { - RequestPath: "/hello", - }, - { - RequestPath: "/world", - }, - }) - if err != nil { - t.Fatal(err) - } - } - - { - err = storage.Write([]*pb.HTTPAccessLog{ - { - RequestPath: "/1", - }, - { - RequestPath: "/2", - }, - }) - if err != nil { - t.Fatal(err) - } - } - - { - err = storage.Write([]*pb.HTTPAccessLog{ - { - RequestMethod: "POST", - RequestPath: "/1", - TimeLocal: time.Now().Format("2/Jan/2006:15:04:05 -0700"), - }, - { - RequestMethod: "GET", - RequestPath: "/2", - TimeLocal: time.Now().Format("2/Jan/2006:15:04:05 -0700"), - }, - }) - if err != nil { - t.Fatal(err) - } - } - - err = storage.Close() - if err != nil { - t.Fatal(err) - } -} diff --git a/internal/accesslogs/storage_interface.go b/internal/accesslogs/storage_interface.go deleted file mode 100644 index 9d052b6e..00000000 --- a/internal/accesslogs/storage_interface.go +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. - -package accesslogs - -import "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" - -// StorageInterface 日志存储接口 -type StorageInterface interface { - // Version 获取版本 - Version() int - - // SetVersion 设置版本 - SetVersion(version int) - - // SetFirewallOnly 设置是否只处理防火墙相关的访问日志 - SetFirewallOnly(firewallOnly bool) - - IsOk() bool - - SetOk(ok bool) - - // Config 获取配置 - Config() interface{} - - // Start 开启 - Start() error - - // Write 写入日志 - Write(accessLogs []*pb.HTTPAccessLog) error - - // Close 关闭 - Close() error -} diff --git a/internal/accesslogs/storage_manager.go b/internal/accesslogs/storage_manager.go deleted file mode 100644 index 4c6b2b4f..00000000 --- a/internal/accesslogs/storage_manager.go +++ /dev/null @@ -1,185 +0,0 @@ -// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. - -package accesslogs - -import ( - "encoding/json" - "github.com/TeaOSLab/EdgeAPI/internal/db/models" - "github.com/TeaOSLab/EdgeAPI/internal/errors" - "github.com/TeaOSLab/EdgeAPI/internal/remotelogs" - "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" - "github.com/iwind/TeaGo/Tea" - "github.com/iwind/TeaGo/lists" - "github.com/iwind/TeaGo/types" - "sync" - "time" -) - -var SharedStorageManager = NewStorageManager() - -type StorageManager struct { - storageMap map[int64]StorageInterface // policyId => Storage - - locker sync.Mutex -} - -func NewStorageManager() *StorageManager { - return &StorageManager{ - storageMap: map[int64]StorageInterface{}, - } -} - -func (this *StorageManager) Start() { - var ticker = time.NewTicker(1 * time.Minute) - if Tea.IsTesting() { - ticker = time.NewTicker(5 * time.Second) - } - - // 启动时执行一次 - var err = this.Loop() - if err != nil { - remotelogs.Error("ACCESS_LOG_STORAGE_MANAGER", "update error: "+err.Error()) - } - - // 循环执行 - for range ticker.C { - err := this.Loop() - if err != nil { - remotelogs.Error("ACCESS_LOG_STORAGE_MANAGER", "update error: "+err.Error()) - } - } -} - -// Loop 更新 -func (this *StorageManager) Loop() error { - policies, err := models.SharedHTTPAccessLogPolicyDAO.FindAllEnabledAndOnPolicies(nil) - if err != nil { - return err - } - var policyIds = []int64{} - for _, policy := range policies { - if policy.IsOn { - policyIds = append(policyIds, int64(policy.Id)) - } - } - - this.locker.Lock() - defer this.locker.Unlock() - - // 关闭不用的 - for policyId, storage := range this.storageMap { - if !lists.ContainsInt64(policyIds, policyId) { - err := storage.Close() - if err != nil { - remotelogs.Error("ACCESS_LOG_STORAGE_MANAGER", "close '"+types.String(policyId)+"' failed: "+err.Error()) - } - delete(this.storageMap, policyId) - remotelogs.Error("ACCESS_LOG_STORAGE_MANAGER", "remove '"+types.String(policyId)+"'") - } - } - - for _, policy := range policies { - var policyId = int64(policy.Id) - storage, ok := this.storageMap[policyId] - if ok { - // 检查配置是否有变更 - if types.Int(policy.Version) != storage.Version() { - err = storage.Close() - if err != nil { - remotelogs.Error("ACCESS_LOG_STORAGE_MANAGER", "close policy '"+types.String(policyId)+"' failed: "+err.Error()) - - // 继续往下执行 - } - - if len(policy.Options) > 0 { - err = json.Unmarshal(policy.Options, storage.Config()) - if err != nil { - remotelogs.Error("ACCESS_LOG_STORAGE_MANAGER", "unmarshal policy '"+types.String(policyId)+"' config failed: "+err.Error()) - storage.SetOk(false) - continue - } - } - - storage.SetVersion(types.Int(policy.Version)) - storage.SetFirewallOnly(policy.FirewallOnly == 1) - err := storage.Start() - if err != nil { - remotelogs.Error("ACCESS_LOG_STORAGE_MANAGER", "start policy '"+types.String(policyId)+"' failed: "+err.Error()) - continue - } - storage.SetOk(true) - remotelogs.Println("ACCESS_LOG_STORAGE_MANAGER", "restart policy '"+types.String(policyId)+"'") - } - } else { - storage, err := this.createStorage(policy.Type, policy.Options) - if err != nil { - remotelogs.Error("ACCESS_LOG_STORAGE_MANAGER", "create policy '"+types.String(policyId)+"' failed: "+err.Error()) - continue - } - storage.SetVersion(types.Int(policy.Version)) - storage.SetFirewallOnly(policy.FirewallOnly == 1) - this.storageMap[policyId] = storage - err = storage.Start() - if err != nil { - remotelogs.Error("ACCESS_LOG_STORAGE_MANAGER", "start policy '"+types.String(policyId)+"' failed: "+err.Error()) - continue - } - storage.SetOk(true) - remotelogs.Println("ACCESS_LOG_STORAGE_MANAGER", "start policy '"+types.String(policyId)+"'") - } - } - - return nil -} - -func (this *StorageManager) createStorage(storageType string, optionsJSON []byte) (StorageInterface, error) { - switch storageType { - case serverconfigs.AccessLogStorageTypeFile: - var config = &serverconfigs.AccessLogFileStorageConfig{} - if len(optionsJSON) > 0 { - err := json.Unmarshal(optionsJSON, config) - if err != nil { - return nil, err - } - } - return NewFileStorage(config), nil - case serverconfigs.AccessLogStorageTypeES: - var config = &serverconfigs.AccessLogESStorageConfig{} - if len(optionsJSON) > 0 { - err := json.Unmarshal(optionsJSON, config) - if err != nil { - return nil, err - } - } - return NewESStorage(config), nil - case serverconfigs.AccessLogStorageTypeTCP: - var config = &serverconfigs.AccessLogTCPStorageConfig{} - if len(optionsJSON) > 0 { - err := json.Unmarshal(optionsJSON, config) - if err != nil { - return nil, err - } - } - return NewTCPStorage(config), nil - case serverconfigs.AccessLogStorageTypeSyslog: - var config = &serverconfigs.AccessLogSyslogStorageConfig{} - if len(optionsJSON) > 0 { - err := json.Unmarshal(optionsJSON, config) - if err != nil { - return nil, err - } - } - return NewSyslogStorage(config), nil - case serverconfigs.AccessLogStorageTypeCommand: - var config = &serverconfigs.AccessLogCommandStorageConfig{} - if len(optionsJSON) > 0 { - err := json.Unmarshal(optionsJSON, config) - if err != nil { - return nil, err - } - } - return NewCommandStorage(config), nil - } - - return nil, errors.New("invalid policy type '" + storageType + "'") -} diff --git a/internal/accesslogs/storage_manager_test.go b/internal/accesslogs/storage_manager_test.go deleted file mode 100644 index 8b157943..00000000 --- a/internal/accesslogs/storage_manager_test.go +++ /dev/null @@ -1,17 +0,0 @@ -package accesslogs - -import ( - "github.com/iwind/TeaGo/dbs" - "testing" -) - -func TestStorageManager_Loop(t *testing.T) { - dbs.NotifyReady() - - var storage = NewStorageManager() - err := storage.Loop() - if err != nil { - t.Fatal(err) - } - t.Log(storage.storageMap) -} diff --git a/internal/accesslogs/storage_manager_write.go b/internal/accesslogs/storage_manager_write.go deleted file mode 100644 index f8b212f2..00000000 --- a/internal/accesslogs/storage_manager_write.go +++ /dev/null @@ -1,14 +0,0 @@ -// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. - -//go:build !plus - -package accesslogs - -import ( - "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" -) - -// 写入日志 -func (this *StorageManager) Write(policyId int64, accessLogs []*pb.HTTPAccessLog) (success bool, failMessage string, err error) { - return false, "only works in plus version", nil -} diff --git a/internal/accesslogs/storage_syslog.go b/internal/accesslogs/storage_syslog.go deleted file mode 100644 index 2c878a3a..00000000 --- a/internal/accesslogs/storage_syslog.go +++ /dev/null @@ -1,146 +0,0 @@ -package accesslogs - -import ( - "bytes" - "errors" - "github.com/TeaOSLab/EdgeAPI/internal/remotelogs" - "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" - "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" - "github.com/iwind/TeaGo/logs" - "os/exec" - "runtime" - "strconv" -) - -type SyslogStorageProtocol = string - -const ( - SyslogStorageProtocolTCP SyslogStorageProtocol = "tcp" - SyslogStorageProtocolUDP SyslogStorageProtocol = "udp" - SyslogStorageProtocolNone SyslogStorageProtocol = "none" - SyslogStorageProtocolSocket SyslogStorageProtocol = "socket" -) - -type SyslogStoragePriority = int - -// SyslogStorage syslog存储策略 -type SyslogStorage struct { - BaseStorage - - config *serverconfigs.AccessLogSyslogStorageConfig - - exe string -} - -func NewSyslogStorage(config *serverconfigs.AccessLogSyslogStorageConfig) *SyslogStorage { - return &SyslogStorage{config: config} -} - -func (this *SyslogStorage) Config() interface{} { - return this.config -} - -// Start 开启 -func (this *SyslogStorage) Start() error { - if runtime.GOOS != "linux" { - return errors.New("'syslog' storage only works on linux") - } - - exe, err := exec.LookPath("logger") - if err != nil { - return err - } - - this.exe = exe - - return nil -} - -// 写入日志 -func (this *SyslogStorage) Write(accessLogs []*pb.HTTPAccessLog) error { - if len(accessLogs) == 0 { - return nil - } - - args := []string{} - if len(this.config.Tag) > 0 { - args = append(args, "-t", this.config.Tag) - } - - if this.config.Priority >= 0 { - args = append(args, "-p", strconv.Itoa(this.config.Priority)) - } - - switch this.config.Protocol { - case SyslogStorageProtocolTCP: - args = append(args, "-T") - if len(this.config.ServerAddr) > 0 { - args = append(args, "-n", this.config.ServerAddr) - } - if this.config.ServerPort > 0 { - args = append(args, "-P", strconv.Itoa(this.config.ServerPort)) - } - case SyslogStorageProtocolUDP: - args = append(args, "-d") - if len(this.config.ServerAddr) > 0 { - args = append(args, "-n", this.config.ServerAddr) - } - if this.config.ServerPort > 0 { - args = append(args, "-P", strconv.Itoa(this.config.ServerPort)) - } - case SyslogStorageProtocolSocket: - args = append(args, "-u") - args = append(args, this.config.Socket) - case SyslogStorageProtocolNone: - // do nothing - } - - args = append(args, "-S", "10240") - - var cmd = exec.Command(this.exe, args...) - var stderrBuffer = &bytes.Buffer{} - cmd.Stderr = stderrBuffer - - w, err := cmd.StdinPipe() - if err != nil { - return err - } - err = cmd.Start() - if err != nil { - return err - } - - for _, accessLog := range accessLogs { - if this.firewallOnly && accessLog.FirewallPolicyId == 0 { - continue - } - data, err := this.Marshal(accessLog) - if err != nil { - remotelogs.Error("ACCESS_LOG_POLICY_SYSLOG", "marshal accesslog failed: "+err.Error()) - continue - } - _, err = w.Write(data) - if err != nil { - logs.Error(err) - } - - _, err = w.Write([]byte("\n")) - if err != nil { - remotelogs.Error("ACCESS_LOG_POLICY_SYSLOG", "write accesslog failed: "+err.Error()) - } - } - - _ = w.Close() - - err = cmd.Wait() - if err != nil { - return errors.New("send syslog failed: " + err.Error() + ", stderr: " + stderrBuffer.String()) - } - - return nil -} - -// Close 关闭 -func (this *SyslogStorage) Close() error { - return nil -} diff --git a/internal/accesslogs/storage_tcp.go b/internal/accesslogs/storage_tcp.go deleted file mode 100644 index c21ab235..00000000 --- a/internal/accesslogs/storage_tcp.go +++ /dev/null @@ -1,114 +0,0 @@ -package accesslogs - -import ( - "errors" - "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" - "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" - "github.com/iwind/TeaGo/logs" - "net" - "sync" -) - -// TCPStorage TCP存储策略 -type TCPStorage struct { - BaseStorage - - config *serverconfigs.AccessLogTCPStorageConfig - - writeLocker sync.Mutex - - connLocker sync.Mutex - conn net.Conn -} - -func NewTCPStorage(config *serverconfigs.AccessLogTCPStorageConfig) *TCPStorage { - return &TCPStorage{config: config} -} - -func (this *TCPStorage) Config() interface{} { - return this.config -} - -// Start 开启 -func (this *TCPStorage) Start() error { - if len(this.config.Network) == 0 { - return errors.New("'network' should not be empty") - } - if len(this.config.Addr) == 0 { - return errors.New("'addr' should not be empty") - } - return nil -} - -// 写入日志 -func (this *TCPStorage) Write(accessLogs []*pb.HTTPAccessLog) error { - if len(accessLogs) == 0 { - return nil - } - - err := this.connect() - if err != nil { - return err - } - - conn := this.conn - if conn == nil { - return errors.New("connection should not be nil") - } - - this.writeLocker.Lock() - defer this.writeLocker.Unlock() - - for _, accessLog := range accessLogs { - if this.firewallOnly && accessLog.FirewallPolicyId == 0 { - continue - } - data, err := this.Marshal(accessLog) - if err != nil { - logs.Error(err) - continue - } - _, err = conn.Write(data) - if err != nil { - _ = this.Close() - break - } - _, err = conn.Write([]byte("\n")) - if err != nil { - _ = this.Close() - break - } - } - - return nil -} - -// Close 关闭 -func (this *TCPStorage) Close() error { - this.connLocker.Lock() - defer this.connLocker.Unlock() - - if this.conn != nil { - err := this.conn.Close() - this.conn = nil - return err - } - return nil -} - -func (this *TCPStorage) connect() error { - this.connLocker.Lock() - defer this.connLocker.Unlock() - - if this.conn != nil { - return nil - } - - conn, err := net.Dial(this.config.Network, this.config.Addr) - if err != nil { - return err - } - this.conn = conn - - return nil -} diff --git a/internal/accesslogs/storage_tcp_test.go b/internal/accesslogs/storage_tcp_test.go deleted file mode 100644 index ea1a630e..00000000 --- a/internal/accesslogs/storage_tcp_test.go +++ /dev/null @@ -1,72 +0,0 @@ -package accesslogs - -import ( - "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" - "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" - "net" - "testing" - "time" -) - -func TestTCPStorage_Write(t *testing.T) { - go func() { - server, err := net.Listen("tcp", "127.0.0.1:9981") - if err != nil { - t.Error(err) - return - } - for { - conn, err := server.Accept() - if err != nil { - break - } - - buf := make([]byte, 1024) - for { - n, err := conn.Read(buf) - if n > 0 { - t.Log(string(buf[:n])) - } - if err != nil { - break - } - } - break - } - _ = server.Close() - }() - - storage := NewTCPStorage(&serverconfigs.AccessLogTCPStorageConfig{ - Network: "tcp", - Addr: "127.0.0.1:9981", - }) - err := storage.Start() - if err != nil { - t.Fatal(err) - } - - { - err = storage.Write([]*pb.HTTPAccessLog{ - { - RequestMethod: "POST", - RequestPath: "/1", - TimeLocal: time.Now().Format("2/Jan/2006:15:04:05 -0700"), - }, - { - RequestMethod: "GET", - RequestPath: "/2", - TimeLocal: time.Now().Format("2/Jan/2006:15:04:05 -0700"), - }, - }) - if err != nil { - t.Fatal(err) - } - } - - time.Sleep(2 * time.Second) - - err = storage.Close() - if err != nil { - t.Fatal(err) - } -} diff --git a/internal/accesslogs/tests/command_storage.php b/internal/accesslogs/tests/command_storage.php deleted file mode 100644 index 15d2bfc4..00000000 --- a/internal/accesslogs/tests/command_storage.php +++ /dev/null @@ -1,24 +0,0 @@ - \ No newline at end of file diff --git a/internal/db/models/http_access_log_dao.go b/internal/db/models/http_access_log_dao.go index bcafc67e..861c3cb4 100644 --- a/internal/db/models/http_access_log_dao.go +++ b/internal/db/models/http_access_log_dao.go @@ -93,7 +93,6 @@ func init() { } }) }) - } func NewHTTPAccessLogDAO() *HTTPAccessLogDAO { diff --git a/internal/nodes/api_node.go b/internal/nodes/api_node.go index 47a2749e..83861444 100644 --- a/internal/nodes/api_node.go +++ b/internal/nodes/api_node.go @@ -5,7 +5,6 @@ import ( "crypto/tls" "encoding/json" "errors" - "github.com/TeaOSLab/EdgeAPI/internal/accesslogs" "github.com/TeaOSLab/EdgeAPI/internal/configs" teaconst "github.com/TeaOSLab/EdgeAPI/internal/const" "github.com/TeaOSLab/EdgeAPI/internal/db/models" @@ -159,9 +158,7 @@ func (this *APINode) Start() { }) // 访问日志存储管理器 - goman.New(func() { - accesslogs.SharedStorageManager.Start() - }) + this.startAccessLogStorages() // 监听RPC服务 remotelogs.Println("API_NODE", "starting RPC server ...") diff --git a/internal/nodes/api_node_ext.go b/internal/nodes/api_node_ext.go new file mode 100644 index 00000000..97b4613d --- /dev/null +++ b/internal/nodes/api_node_ext.go @@ -0,0 +1,8 @@ +// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn . +//go:build !plus + +package nodes + +func (this *APINode) startAccessLogStorages() { + +} diff --git a/internal/nodes/api_node_services.go b/internal/nodes/api_node_services.go index 58237985..a8df3a74 100644 --- a/internal/nodes/api_node_services.go +++ b/internal/nodes/api_node_services.go @@ -102,11 +102,6 @@ func (this *APINode) registerServices(server *grpc.Server) { pb.RegisterHTTPPageServiceServer(server, instance) this.rest(instance) } - { - var instance = this.serviceInstance(&services.HTTPAccessLogPolicyService{}).(*services.HTTPAccessLogPolicyService) - pb.RegisterHTTPAccessLogPolicyServiceServer(server, instance) - this.rest(instance) - } { var instance = this.serviceInstance(&services.HTTPCachePolicyService{}).(*services.HTTPCachePolicyService) pb.RegisterHTTPCachePolicyServiceServer(server, instance) diff --git a/internal/rpc/services/service_http_access_log.go b/internal/rpc/services/service_http_access_log.go index 43ed244c..acd3e320 100644 --- a/internal/rpc/services/service_http_access_log.go +++ b/internal/rpc/services/service_http_access_log.go @@ -2,7 +2,6 @@ package services import ( "context" - "github.com/TeaOSLab/EdgeAPI/internal/accesslogs" "github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/errors" rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils" @@ -37,17 +36,10 @@ func (this *HTTPAccessLogService) CreateHTTPAccessLogs(ctx context.Context, req return nil, err } - // 发送到访问日志策略 - policyId, err := models.SharedHTTPAccessLogPolicyDAO.FindCurrentPublicPolicyId(tx) + err = this.writeAccessLogsToPolicy(req.HttpAccessLogs) if err != nil { return nil, err } - if policyId > 0 { - _, _, err = accesslogs.SharedStorageManager.Write(policyId, req.HttpAccessLogs) - if err != nil { - return nil, err - } - } return &pb.CreateHTTPAccessLogsResponse{}, nil } diff --git a/internal/rpc/services/service_http_access_log_ext.go b/internal/rpc/services/service_http_access_log_ext.go new file mode 100644 index 00000000..a46c202c --- /dev/null +++ b/internal/rpc/services/service_http_access_log_ext.go @@ -0,0 +1,10 @@ +// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn . +//go:build !plus + +package services + +import "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + +func (this *HTTPAccessLogService) writeAccessLogsToPolicy(pbAccessLogs []*pb.HTTPAccessLog) error { + return nil +} diff --git a/internal/rpc/services/service_http_access_log_policy.go b/internal/rpc/services/service_http_access_log_policy.go deleted file mode 100644 index f0426d06..00000000 --- a/internal/rpc/services/service_http_access_log_policy.go +++ /dev/null @@ -1,165 +0,0 @@ -package services - -import ( - "context" - "errors" - "github.com/TeaOSLab/EdgeAPI/internal/accesslogs" - "github.com/TeaOSLab/EdgeAPI/internal/db/models" - "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" -) - -type HTTPAccessLogPolicyService struct { - BaseService -} - -// CountAllHTTPAccessLogPolicies 计算访问日志策略数量 -func (this *HTTPAccessLogPolicyService) CountAllHTTPAccessLogPolicies(ctx context.Context, req *pb.CountAllHTTPAccessLogPoliciesRequest) (*pb.RPCCountResponse, error) { - _, err := this.ValidateAdmin(ctx) - if err != nil { - return nil, err - } - - var tx = this.NullTx() - count, err := models.SharedHTTPAccessLogPolicyDAO.CountAllEnabledPolicies(tx) - if err != nil { - return nil, err - } - return this.SuccessCount(count) -} - -// ListHTTPAccessLogPolicies 列出单页访问日志策略 -func (this *HTTPAccessLogPolicyService) ListHTTPAccessLogPolicies(ctx context.Context, req *pb.ListHTTPAccessLogPoliciesRequest) (*pb.ListHTTPAccessLogPoliciesResponse, error) { - _, err := this.ValidateAdmin(ctx) - if err != nil { - return nil, err - } - - var tx = this.NullTx() - policies, err := models.SharedHTTPAccessLogPolicyDAO.ListEnabledPolicies(tx, req.Offset, req.Size) - if err != nil { - return nil, err - } - var pbPolicies = []*pb.HTTPAccessLogPolicy{} - for _, policy := range policies { - pbPolicies = append(pbPolicies, &pb.HTTPAccessLogPolicy{ - Id: int64(policy.Id), - Name: policy.Name, - IsOn: policy.IsOn, - Type: policy.Type, - OptionsJSON: policy.Options, - CondsJSON: policy.Conds, - IsPublic: policy.IsPublic, - FirewallOnly: policy.FirewallOnly == 1, - }) - } - return &pb.ListHTTPAccessLogPoliciesResponse{HttpAccessLogPolicies: pbPolicies}, nil -} - -// CreateHTTPAccessLogPolicy 创建访问日志策略 -func (this *HTTPAccessLogPolicyService) CreateHTTPAccessLogPolicy(ctx context.Context, req *pb.CreateHTTPAccessLogPolicyRequest) (*pb.CreateHTTPAccessLogPolicyResponse, error) { - _, err := this.ValidateAdmin(ctx) - if err != nil { - return nil, err - } - - var tx = this.NullTx() - - // 取消别的Public - if req.IsPublic { - err = models.SharedHTTPAccessLogPolicyDAO.CancelAllPublicPolicies(tx) - if err != nil { - return nil, err - } - } - - // 创建 - policyId, err := models.SharedHTTPAccessLogPolicyDAO.CreatePolicy(tx, req.Name, req.Type, req.OptionsJSON, req.CondsJSON, req.IsPublic, req.FirewallOnly) - if err != nil { - return nil, err - } - return &pb.CreateHTTPAccessLogPolicyResponse{HttpAccessLogPolicyId: policyId}, nil -} - -// UpdateHTTPAccessLogPolicy 修改访问日志策略 -func (this *HTTPAccessLogPolicyService) UpdateHTTPAccessLogPolicy(ctx context.Context, req *pb.UpdateHTTPAccessLogPolicyRequest) (*pb.RPCSuccess, error) { - _, err := this.ValidateAdmin(ctx) - if err != nil { - return nil, err - } - - var tx = this.NullTx() - - // 取消别的Public - if req.IsPublic { - err = models.SharedHTTPAccessLogPolicyDAO.CancelAllPublicPolicies(tx) - if err != nil { - return nil, err - } - } - - // 保存修改 - err = models.SharedHTTPAccessLogPolicyDAO.UpdatePolicy(tx, req.HttpAccessLogPolicyId, req.Name, req.OptionsJSON, req.CondsJSON, req.IsPublic, req.FirewallOnly, req.IsOn) - if err != nil { - return nil, err - } - return this.Success() -} - -// FindHTTPAccessLogPolicy 查找单个访问日志策略 -func (this *HTTPAccessLogPolicyService) FindHTTPAccessLogPolicy(ctx context.Context, req *pb.FindHTTPAccessLogPolicyRequest) (*pb.FindHTTPAccessLogPolicyResponse, error) { - _, err := this.ValidateAdmin(ctx) - if err != nil { - return nil, err - } - - var tx = this.NullTx() - policy, err := models.SharedHTTPAccessLogPolicyDAO.FindEnabledHTTPAccessLogPolicy(tx, req.HttpAccessLogPolicyId) - if err != nil { - return nil, err - } - if policy == nil { - return &pb.FindHTTPAccessLogPolicyResponse{HttpAccessLogPolicy: nil}, nil - } - return &pb.FindHTTPAccessLogPolicyResponse{HttpAccessLogPolicy: &pb.HTTPAccessLogPolicy{ - Id: int64(policy.Id), - Name: policy.Name, - IsOn: policy.IsOn, - Type: policy.Type, - OptionsJSON: policy.Options, - CondsJSON: policy.Conds, - IsPublic: policy.IsPublic, - FirewallOnly: policy.FirewallOnly == 1, - }}, nil -} - -// DeleteHTTPAccessLogPolicy 删除访问日志策略 -func (this *HTTPAccessLogPolicyService) DeleteHTTPAccessLogPolicy(ctx context.Context, req *pb.DeleteHTTPAccessLogPolicyRequest) (*pb.RPCSuccess, error) { - _, err := this.ValidateAdmin(ctx) - if err != nil { - return nil, err - } - - var tx = this.NullTx() - err = models.SharedHTTPAccessLogPolicyDAO.DisableHTTPAccessLogPolicy(tx, req.HttpAccessLogPolicyId) - if err != nil { - return nil, err - } - return this.Success() -} - -// WriteHTTPAccessLogPolicy 测试写入某个访问日志策略 -func (this *HTTPAccessLogPolicyService) WriteHTTPAccessLogPolicy(ctx context.Context, req *pb.WriteHTTPAccessLogPolicyRequest) (*pb.RPCSuccess, error) { - _, err := this.ValidateAdmin(ctx) - if err != nil { - return nil, err - } - - success, failMessage, err := accesslogs.SharedStorageManager.Write(req.HttpAccessLogPolicyId, []*pb.HTTPAccessLog{req.HttpAccessLog}) - if err != nil { - return nil, err - } - if !success { - return nil, errors.New("test failed: " + failMessage) - } - return this.Success() -}