diff --git a/internal/accesslogs/storage_base.go b/internal/accesslogs/storage_base.go new file mode 100644 index 00000000..deb6abb9 --- /dev/null +++ b/internal/accesslogs/storage_base.go @@ -0,0 +1,66 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package accesslogs + +import ( + "encoding/json" + "fmt" + "github.com/TeaOSLab/EdgeCommon/pkg/configutils" + "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "strconv" + "time" +) + +type BaseStorage struct { + isOk bool + version int +} + +func (this *BaseStorage) SetVersion(version int) { + this.version = version +} + +func (this *BaseStorage) Version() int { + return this.version +} + +func (this *BaseStorage) IsOk() bool { + return this.isOk +} + +func (this *BaseStorage) SetOk(isOk bool) { + this.isOk = isOk +} + +// Marshal 对日志进行编码 +func (this *BaseStorage) Marshal(accessLog *pb.HTTPAccessLog) ([]byte, error) { + return json.Marshal(accessLog) +} + +// FormatVariables 格式化字符串中的变量 +func (this *BaseStorage) FormatVariables(s string) string { + now := time.Now() + return configutils.ParseVariables(s, func(varName string) (value string) { + switch varName { + case "year": + return strconv.Itoa(now.Year()) + case "month": + return fmt.Sprintf("%02d", now.Month()) + case "week": + _, week := now.ISOWeek() + return fmt.Sprintf("%02d", week) + case "day": + return fmt.Sprintf("%02d", now.Day()) + case "hour": + return fmt.Sprintf("%02d", now.Hour()) + case "minute": + return fmt.Sprintf("%02d", now.Minute()) + case "second": + return fmt.Sprintf("%02d", now.Second()) + case "date": + return fmt.Sprintf("%d%02d%02d", now.Year(), now.Month(), now.Day()) + } + + return varName + }) +} diff --git a/internal/accesslogs/storage_command.go b/internal/accesslogs/storage_command.go new file mode 100644 index 00000000..44f6f1f1 --- /dev/null +++ b/internal/accesslogs/storage_command.go @@ -0,0 +1,95 @@ +package accesslogs + +import ( + "bytes" + "errors" + "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + "github.com/iwind/TeaGo/logs" + "os/exec" + "sync" +) + +// CommandStorage 通过命令行存储 +type CommandStorage struct { + BaseStorage + + config *serverconfigs.AccessLogCommandStorageConfig + + writeLocker sync.Mutex +} + +func NewCommandStorage(config *serverconfigs.AccessLogCommandStorageConfig) *CommandStorage { + return &CommandStorage{config: config} +} + +func (this *CommandStorage) Config() interface{} { + return this.config +} + +// Start 启动 +func (this *CommandStorage) Start() error { + if len(this.config.Command) == 0 { + return errors.New("'command' should not be empty") + } + return nil +} + +// 写入日志 +func (this *CommandStorage) Write(accessLogs []*pb.HTTPAccessLog) error { + if len(accessLogs) == 0 { + return nil + } + + this.writeLocker.Lock() + defer this.writeLocker.Unlock() + + cmd := exec.Command(this.config.Command, this.config.Args...) + if len(this.config.Dir) > 0 { + cmd.Dir = this.config.Dir + } + + stdout := bytes.NewBuffer([]byte{}) + cmd.Stdout = stdout + + w, err := cmd.StdinPipe() + if err != nil { + return err + } + err = cmd.Start() + if err != nil { + return err + } + for _, accessLog := range accessLogs { + data, err := this.Marshal(accessLog) + if err != nil { + logs.Error(err) + continue + } + _, err = w.Write(data) + if err != nil { + logs.Error(err) + } + + _, err = w.Write([]byte("\n")) + if err != nil { + logs.Error(err) + } + } + _ = w.Close() + err = cmd.Wait() + if err != nil { + logs.Error(err) + + if stdout.Len() > 0 { + logs.Error(errors.New(string(stdout.Bytes()))) + } + } + + return nil +} + +// Close 关闭 +func (this *CommandStorage) Close() error { + return nil +} diff --git a/internal/accesslogs/storage_command_test.go b/internal/accesslogs/storage_command_test.go new file mode 100644 index 00000000..2bc6cb1f --- /dev/null +++ b/internal/accesslogs/storage_command_test.go @@ -0,0 +1,63 @@ +package accesslogs + +import ( + "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + "os" + "os/exec" + "testing" + "time" +) + +func TestCommandStorage_Write(t *testing.T) { + php, err := exec.LookPath("php") + if err != nil { // not found php, so we can not test + t.Log("php:", err) + return + } + + cwd, err := os.Getwd() + if err != nil { + t.Fatal(err) + } + + before := time.Now() + + storage := NewCommandStorage(&serverconfigs.AccessLogCommandStorageConfig{ + Command: php, + Args: []string{cwd + "/tests/command_storage.php"}, + }) + err = storage.Start() + if err != nil { + t.Fatal(err) + } + + err = storage.Write([]*pb.HTTPAccessLog{ + { + RequestMethod: "GET", + RequestPath: "/hello", + }, + { + RequestMethod: "GET", + RequestPath: "/world", + }, + { + RequestMethod: "GET", + RequestPath: "/lu", + }, + { + RequestMethod: "GET", + RequestPath: "/ping", + }, + }) + if err != nil { + t.Fatal(err) + } + + err = storage.Close() + if err != nil { + t.Fatal(err) + } + + t.Log(time.Since(before).Seconds(), "seconds") +} diff --git a/internal/accesslogs/storage_es.go b/internal/accesslogs/storage_es.go new file mode 100644 index 00000000..0d1bbc99 --- /dev/null +++ b/internal/accesslogs/storage_es.go @@ -0,0 +1,125 @@ +package accesslogs + +import ( + "encoding/base64" + "encoding/json" + "errors" + "fmt" + teaconst "github.com/TeaOSLab/EdgeAPI/internal/const" + "github.com/TeaOSLab/EdgeAPI/internal/utils" + "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + "github.com/iwind/TeaGo/logs" + "io/ioutil" + "net/http" + "regexp" + "strings" + "time" +) + +// ESStorage ElasticSearch存储策略 +type ESStorage struct { + BaseStorage + + config *serverconfigs.AccessLogESStorageConfig +} + +func NewESStorage(config *serverconfigs.AccessLogESStorageConfig) *ESStorage { + return &ESStorage{config: config} +} + +func (this *ESStorage) Config() interface{} { + return this.config +} + +// Start 开启 +func (this *ESStorage) Start() error { + if len(this.config.Endpoint) == 0 { + return errors.New("'endpoint' should not be nil") + } + if !regexp.MustCompile(`(?i)^(http|https)://`).MatchString(this.config.Endpoint) { + this.config.Endpoint = "http://" + this.config.Endpoint + } + if len(this.config.Index) == 0 { + return errors.New("'index' should not be nil") + } + if len(this.config.MappingType) == 0 { + return errors.New("'mappingType' should not be nil") + } + return nil +} + +// 写入日志 +func (this *ESStorage) Write(accessLogs []*pb.HTTPAccessLog) error { + if len(accessLogs) == 0 { + return nil + } + + bulk := &strings.Builder{} + id := time.Now().UnixNano() + indexName := this.FormatVariables(this.config.Index) + typeName := this.FormatVariables(this.config.MappingType) + for _, accessLog := range accessLogs { + id++ + opData, err := json.Marshal(map[string]interface{}{ + "index": map[string]interface{}{ + "_index": indexName, + "_type": typeName, + "_id": fmt.Sprintf("%d", id), + }, + }) + if err != nil { + logs.Error(err) + continue + } + + data, err := this.Marshal(accessLog) + if err != nil { + logs.Error(err) + continue + } + + bulk.Write(opData) + bulk.WriteString("\n") + bulk.Write(data) + bulk.WriteString("\n") + } + + if bulk.Len() == 0 { + return nil + } + + req, err := http.NewRequest(http.MethodPost, this.config.Endpoint+"/_bulk", strings.NewReader(bulk.String())) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("User-Agent", strings.ReplaceAll(teaconst.ProductName, " ", "-")+"/"+teaconst.Version) + if len(this.config.Username) > 0 || len(this.config.Password) > 0 { + req.Header.Set("Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(this.config.Username+":"+this.config.Password))) + } + client := utils.SharedHttpClient(10 * time.Second) + defer func() { + _ = req.Body.Close() + }() + + resp, err := client.Do(req) + if err != nil { + return err + } + defer func() { + _ = resp.Body.Close() + }() + + if resp.StatusCode != http.StatusOK { + bodyData, _ := ioutil.ReadAll(resp.Body) + return errors.New("ElasticSearch response status code: " + fmt.Sprintf("%d", resp.StatusCode) + " content: " + string(bodyData)) + } + + return nil +} + +// Close 关闭 +func (this *ESStorage) Close() error { + return nil +} diff --git a/internal/accesslogs/storage_es_test.go b/internal/accesslogs/storage_es_test.go new file mode 100644 index 00000000..7c60babf --- /dev/null +++ b/internal/accesslogs/storage_es_test.go @@ -0,0 +1,53 @@ +package accesslogs + +import ( + "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + "testing" + "time" +) + +func TestESStorage_Write(t *testing.T) { + storage := NewESStorage(&serverconfigs.AccessLogESStorageConfig{ + Endpoint: "http://127.0.0.1:9200", + Index: "logs", + MappingType: "accessLogs", + Username: "hello", + Password: "world", + }) + err := storage.Start() + if err != nil { + t.Fatal(err) + } + + { + err = storage.Write([]*pb.HTTPAccessLog{ + { + RequestMethod: "POST", + RequestPath: "/1", + TimeLocal: time.Now().Format("2/Jan/2006:15:04:05 -0700"), + TimeISO8601: "2018-07-23T22:23:35+08:00", + Header: map[string]*pb.Strings{ + "Content-Type": {Values: []string{"text/html"}}, + }, + }, + { + RequestMethod: "GET", + RequestPath: "/2", + TimeLocal: time.Now().Format("2/Jan/2006:15:04:05 -0700"), + TimeISO8601: "2018-07-23T22:23:35+08:00", + Header: map[string]*pb.Strings{ + "Content-Type": {Values: []string{"text/css"}}, + }, + }, + }) + if err != nil { + t.Fatal(err) + } + } + + err = storage.Close() + if err != nil { + t.Fatal(err) + } +} diff --git a/internal/accesslogs/storage_file.go b/internal/accesslogs/storage_file.go new file mode 100644 index 00000000..e3f3d90b --- /dev/null +++ b/internal/accesslogs/storage_file.go @@ -0,0 +1,127 @@ +package accesslogs + +import ( + "errors" + "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + "github.com/iwind/TeaGo/logs" + "os" + "path/filepath" + "sync" +) + +// FileStorage 文件存储策略 +type FileStorage struct { + BaseStorage + + config *serverconfigs.AccessLogFileStorageConfig + + writeLocker sync.Mutex + + files map[string]*os.File // path => *File + filesLocker sync.Mutex +} + +func NewFileStorage(config *serverconfigs.AccessLogFileStorageConfig) *FileStorage { + return &FileStorage{ + config: config, + } +} + +func (this *FileStorage) Config() interface{} { + return this.config +} + +// Start 开启 +func (this *FileStorage) Start() error { + if len(this.config.Path) == 0 { + return errors.New("'path' should not be empty") + } + + this.files = map[string]*os.File{} + + return nil +} + +// Write 写入日志 +func (this *FileStorage) Write(accessLogs []*pb.HTTPAccessLog) error { + if len(accessLogs) == 0 { + return nil + } + + fp := this.fp() + if fp == nil { + return errors.New("file pointer should not be nil") + } + this.writeLocker.Lock() + defer this.writeLocker.Unlock() + + for _, accessLog := range accessLogs { + data, err := this.Marshal(accessLog) + if err != nil { + logs.Error(err) + continue + } + _, err = fp.Write(data) + if err != nil { + _ = this.Close() + break + } + _, _ = fp.WriteString("\n") + } + return nil +} + +// Close 关闭 +func (this *FileStorage) Close() error { + this.filesLocker.Lock() + defer this.filesLocker.Unlock() + + var resultErr error + for _, f := range this.files { + err := f.Close() + if err != nil { + resultErr = err + } + } + return resultErr +} + +func (this *FileStorage) fp() *os.File { + path := this.FormatVariables(this.config.Path) + + this.filesLocker.Lock() + defer this.filesLocker.Unlock() + fp, ok := this.files[path] + if ok { + return fp + } + + // 关闭其他的文件 + for _, f := range this.files { + _ = f.Close() + } + + // 是否创建文件目录 + if this.config.AutoCreate { + dir := filepath.Dir(path) + _, err := os.Stat(dir) + if os.IsNotExist(err) { + err = os.MkdirAll(dir, 0777) + if err != nil { + logs.Error(err) + return nil + } + } + } + + // 打开新文件 + fp, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) + if err != nil { + logs.Error(err) + return nil + } + this.files[path] = fp + + return fp +} diff --git a/internal/accesslogs/storage_file_test.go b/internal/accesslogs/storage_file_test.go new file mode 100644 index 00000000..d26a4d7d --- /dev/null +++ b/internal/accesslogs/storage_file_test.go @@ -0,0 +1,70 @@ +package accesslogs + +import ( + "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + "github.com/iwind/TeaGo/Tea" + "testing" + "time" +) + +func TestFileStorage_Write(t *testing.T) { + storage := NewFileStorage(&serverconfigs.AccessLogFileStorageConfig{ + Path: Tea.Root + "/logs/access-${date}.log", + }) + err := storage.Start() + if err != nil { + t.Fatal(err) + } + + { + err = storage.Write([]*pb.HTTPAccessLog{ + { + RequestPath: "/hello", + }, + { + RequestPath: "/world", + }, + }) + if err != nil { + t.Fatal(err) + } + } + + { + err = storage.Write([]*pb.HTTPAccessLog{ + { + RequestPath: "/1", + }, + { + RequestPath: "/2", + }, + }) + if err != nil { + t.Fatal(err) + } + } + + { + err = storage.Write([]*pb.HTTPAccessLog{ + { + RequestMethod: "POST", + RequestPath: "/1", + TimeLocal: time.Now().Format("2/Jan/2006:15:04:05 -0700"), + }, + { + RequestMethod: "GET", + RequestPath: "/2", + TimeLocal: time.Now().Format("2/Jan/2006:15:04:05 -0700"), + }, + }) + if err != nil { + t.Fatal(err) + } + } + + err = storage.Close() + if err != nil { + t.Fatal(err) + } +} diff --git a/internal/accesslogs/storage_interface.go b/internal/accesslogs/storage_interface.go new file mode 100644 index 00000000..810058db --- /dev/null +++ b/internal/accesslogs/storage_interface.go @@ -0,0 +1,30 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package accesslogs + +import "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + +// StorageInterface 日志存储接口 +type StorageInterface interface { + // Version 获取版本 + Version() int + + // SetVersion 设置版本 + SetVersion(version int) + + IsOk() bool + + SetOk(ok bool) + + // Config 获取配置 + Config() interface{} + + // Start 开启 + Start() error + + // Write 写入日志 + Write(accessLogs []*pb.HTTPAccessLog) error + + // Close 关闭 + Close() error +} diff --git a/internal/accesslogs/storage_manager.go b/internal/accesslogs/storage_manager.go new file mode 100644 index 00000000..b5ee49cb --- /dev/null +++ b/internal/accesslogs/storage_manager.go @@ -0,0 +1,199 @@ +package accesslogs + +import ( + "encoding/json" + "github.com/TeaOSLab/EdgeAPI/internal/db/models" + "github.com/TeaOSLab/EdgeAPI/internal/errors" + "github.com/TeaOSLab/EdgeAPI/internal/remotelogs" + "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + "github.com/iwind/TeaGo/Tea" + "github.com/iwind/TeaGo/lists" + "github.com/iwind/TeaGo/types" + "sync" + "time" +) + +var SharedStorageManager = NewStorageManager() + +type StorageManager struct { + storageMap map[int64]StorageInterface // policyId => Storage + + locker sync.Mutex +} + +func NewStorageManager() *StorageManager { + return &StorageManager{ + storageMap: map[int64]StorageInterface{}, + } +} + +func (this *StorageManager) Start() { + var ticker = time.NewTicker(1 * time.Minute) + if Tea.IsTesting() { + ticker = time.NewTicker(5 * time.Second) + } + + // 启动时执行一次 + var err = this.Loop() + if err != nil { + remotelogs.Error("ACCESS_LOG_STORAGE_MANAGER", "update error: "+err.Error()) + } + + // 循环执行 + for range ticker.C { + err := this.Loop() + if err != nil { + remotelogs.Error("ACCESS_LOG_STORAGE_MANAGER", "update error: "+err.Error()) + } + } +} + +// 写入日志 +func (this *StorageManager) Write(policyId int64, accessLogs []*pb.HTTPAccessLog) error { + this.locker.Lock() + storage, ok := this.storageMap[policyId] + this.locker.Unlock() + + if !ok { + return nil + } + + if !storage.IsOk() { + return nil + } + + return storage.Write(accessLogs) +} + +// Loop 更新 +func (this *StorageManager) Loop() error { + policies, err := models.SharedHTTPAccessLogPolicyDAO.FindAllEnabledAndOnPolicies(nil) + if err != nil { + return err + } + var policyIds = []int64{} + for _, policy := range policies { + if policy.IsOn == 1 { + policyIds = append(policyIds, int64(policy.Id)) + } + } + + this.locker.Lock() + defer this.locker.Unlock() + + // 关闭不用的 + for policyId, storage := range this.storageMap { + if !lists.ContainsInt64(policyIds, policyId) { + err := storage.Close() + if err != nil { + remotelogs.Error("ACCESS_LOG_STORAGE_MANAGER", "close '"+types.String(policyId)+"' failed: "+err.Error()) + } + delete(this.storageMap, policyId) + remotelogs.Error("ACCESS_LOG_STORAGE_MANAGER", "remove '"+types.String(policyId)+"'") + } + } + + for _, policy := range policies { + var policyId = int64(policy.Id) + storage, ok := this.storageMap[policyId] + if ok { + // 检查配置是否有变更 + if types.Int(policy.Version) != storage.Version() { + err = storage.Close() + if err != nil { + remotelogs.Error("ACCESS_LOG_STORAGE_MANAGER", "close policy '"+types.String(policyId)+"' failed: "+err.Error()) + + // 继续往下执行 + } + + if len(policy.Options) > 0 { + err = json.Unmarshal([]byte(policy.Options), storage.Config()) + if err != nil { + remotelogs.Error("ACCESS_LOG_STORAGE_MANAGER", "unmarshal policy '"+types.String(policyId)+"' config failed: "+err.Error()) + storage.SetOk(false) + continue + } + } + + storage.SetVersion(types.Int(policy.Version)) + err := storage.Start() + if err != nil { + remotelogs.Error("ACCESS_LOG_STORAGE_MANAGER", "start policy '"+types.String(policyId)+"' failed: "+err.Error()) + continue + } + storage.SetOk(true) + remotelogs.Println("ACCESS_LOG_STORAGE_MANAGER", "restart policy '"+types.String(policyId)+"'") + } + } else { + storage, err := this.createStorage(policy.Type, []byte(policy.Options)) + if err != nil { + remotelogs.Error("ACCESS_LOG_STORAGE_MANAGER", "create policy '"+types.String(policyId)+"' failed: "+err.Error()) + continue + } + storage.SetVersion(types.Int(policy.Version)) + this.storageMap[policyId] = storage + err = storage.Start() + if err != nil { + remotelogs.Error("ACCESS_LOG_STORAGE_MANAGER", "start policy '"+types.String(policyId)+"' failed: "+err.Error()) + continue + } + storage.SetOk(true) + remotelogs.Println("ACCESS_LOG_STORAGE_MANAGER", "start policy '"+types.String(policyId)+"'") + } + } + + return nil +} + +func (this *StorageManager) createStorage(storageType string, optionsJSON []byte) (StorageInterface, error) { + switch storageType { + case serverconfigs.AccessLogStorageTypeFile: + var config = &serverconfigs.AccessLogFileStorageConfig{} + if len(optionsJSON) > 0 { + err := json.Unmarshal(optionsJSON, config) + if err != nil { + return nil, err + } + } + return NewFileStorage(config), nil + case serverconfigs.AccessLogStorageTypeES: + var config = &serverconfigs.AccessLogESStorageConfig{} + if len(optionsJSON) > 0 { + err := json.Unmarshal(optionsJSON, config) + if err != nil { + return nil, err + } + } + return NewESStorage(config), nil + case serverconfigs.AccessLogStorageTypeTCP: + var config = &serverconfigs.AccessLogTCPStorageConfig{} + if len(optionsJSON) > 0 { + err := json.Unmarshal(optionsJSON, config) + if err != nil { + return nil, err + } + } + return NewTCPStorage(config), nil + case serverconfigs.AccessLogStorageTypeSyslog: + var config = &serverconfigs.AccessLogSyslogStorageConfig{} + if len(optionsJSON) > 0 { + err := json.Unmarshal(optionsJSON, config) + if err != nil { + return nil, err + } + } + return NewSyslogStorage(config), nil + case serverconfigs.AccessLogStorageTypeCommand: + var config = &serverconfigs.AccessLogCommandStorageConfig{} + if len(optionsJSON) > 0 { + err := json.Unmarshal(optionsJSON, config) + if err != nil { + return nil, err + } + } + return NewCommandStorage(config), nil + } + + return nil, errors.New("invalid policy type '" + storageType + "'") +} diff --git a/internal/accesslogs/storage_manager_test.go b/internal/accesslogs/storage_manager_test.go new file mode 100644 index 00000000..8b157943 --- /dev/null +++ b/internal/accesslogs/storage_manager_test.go @@ -0,0 +1,17 @@ +package accesslogs + +import ( + "github.com/iwind/TeaGo/dbs" + "testing" +) + +func TestStorageManager_Loop(t *testing.T) { + dbs.NotifyReady() + + var storage = NewStorageManager() + err := storage.Loop() + if err != nil { + t.Fatal(err) + } + t.Log(storage.storageMap) +} diff --git a/internal/accesslogs/storage_syslog.go b/internal/accesslogs/storage_syslog.go new file mode 100644 index 00000000..18b9591f --- /dev/null +++ b/internal/accesslogs/storage_syslog.go @@ -0,0 +1,137 @@ +package accesslogs + +import ( + "errors" + "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + "github.com/iwind/TeaGo/logs" + "os/exec" + "runtime" + "strconv" +) + +type SyslogStorageProtocol = string + +const ( + SyslogStorageProtocolTCP SyslogStorageProtocol = "tcp" + SyslogStorageProtocolUDP SyslogStorageProtocol = "udp" + SyslogStorageProtocolNone SyslogStorageProtocol = "none" + SyslogStorageProtocolSocket SyslogStorageProtocol = "socket" +) + +type SyslogStoragePriority = int + +// SyslogStorage syslog存储策略 +type SyslogStorage struct { + BaseStorage + + config *serverconfigs.AccessLogSyslogStorageConfig + + exe string +} + +func NewSyslogStorage(config *serverconfigs.AccessLogSyslogStorageConfig) *SyslogStorage { + return &SyslogStorage{config: config} +} + +func (this *SyslogStorage) Config() interface{} { + return this.config +} + +// Start 开启 +func (this *SyslogStorage) Start() error { + if runtime.GOOS != "linux" { + return errors.New("'syslog' storage only works on linux") + } + + exe, err := exec.LookPath("logger") + if err != nil { + return err + } + + this.exe = exe + + return nil +} + +// 写入日志 +func (this *SyslogStorage) Write(accessLogs []*pb.HTTPAccessLog) error { + if len(accessLogs) == 0 { + return nil + } + + args := []string{} + if len(this.config.Tag) > 0 { + args = append(args, "-t", this.config.Tag) + } + + if this.config.Priority >= 0 { + args = append(args, "-p", strconv.Itoa(this.config.Priority)) + } + + switch this.config.Protocol { + case SyslogStorageProtocolTCP: + args = append(args, "-T") + if len(this.config.ServerAddr) > 0 { + args = append(args, "-n", this.config.ServerAddr) + } + if this.config.ServerPort > 0 { + args = append(args, "-P", strconv.Itoa(this.config.ServerPort)) + } + case SyslogStorageProtocolUDP: + args = append(args, "-d") + if len(this.config.ServerAddr) > 0 { + args = append(args, "-n", this.config.ServerAddr) + } + if this.config.ServerPort > 0 { + args = append(args, "-P", strconv.Itoa(this.config.ServerPort)) + } + case SyslogStorageProtocolSocket: + args = append(args, "-u") + args = append(args, this.config.Socket) + case SyslogStorageProtocolNone: + // do nothing + } + + args = append(args, "-S", "10240") + + cmd := exec.Command(this.exe, args...) + w, err := cmd.StdinPipe() + if err != nil { + return err + } + err = cmd.Start() + if err != nil { + return err + } + + for _, accessLog := range accessLogs { + data, err := this.Marshal(accessLog) + if err != nil { + logs.Error(err) + continue + } + _, err = w.Write(data) + if err != nil { + logs.Error(err) + } + + _, err = w.Write([]byte("\n")) + if err != nil { + logs.Error(err) + } + } + + _ = w.Close() + err = cmd.Wait() + if err != nil { + return err + } + + return nil +} + +// Close 关闭 +func (this *SyslogStorage) Close() error { + return nil +} diff --git a/internal/accesslogs/storage_tcp.go b/internal/accesslogs/storage_tcp.go new file mode 100644 index 00000000..77a85eed --- /dev/null +++ b/internal/accesslogs/storage_tcp.go @@ -0,0 +1,111 @@ +package accesslogs + +import ( + "errors" + "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + "github.com/iwind/TeaGo/logs" + "net" + "sync" +) + +// TCPStorage TCP存储策略 +type TCPStorage struct { + BaseStorage + + config *serverconfigs.AccessLogTCPStorageConfig + + writeLocker sync.Mutex + + connLocker sync.Mutex + conn net.Conn +} + +func NewTCPStorage(config *serverconfigs.AccessLogTCPStorageConfig) *TCPStorage { + return &TCPStorage{config: config} +} + +func (this *TCPStorage) Config() interface{} { + return this.config +} + +// Start 开启 +func (this *TCPStorage) Start() error { + if len(this.config.Network) == 0 { + return errors.New("'network' should not be empty") + } + if len(this.config.Addr) == 0 { + return errors.New("'addr' should not be empty") + } + return nil +} + +// 写入日志 +func (this *TCPStorage) Write(accessLogs []*pb.HTTPAccessLog) error { + if len(accessLogs) == 0 { + return nil + } + + err := this.connect() + if err != nil { + return err + } + + conn := this.conn + if conn == nil { + return errors.New("connection should not be nil") + } + + this.writeLocker.Lock() + defer this.writeLocker.Unlock() + + for _, accessLog := range accessLogs { + data, err := this.Marshal(accessLog) + if err != nil { + logs.Error(err) + continue + } + _, err = conn.Write(data) + if err != nil { + _ = this.Close() + break + } + _, err = conn.Write([]byte("\n")) + if err != nil { + _ = this.Close() + break + } + } + + return nil +} + +// Close 关闭 +func (this *TCPStorage) Close() error { + this.connLocker.Lock() + defer this.connLocker.Unlock() + + if this.conn != nil { + err := this.conn.Close() + this.conn = nil + return err + } + return nil +} + +func (this *TCPStorage) connect() error { + this.connLocker.Lock() + defer this.connLocker.Unlock() + + if this.conn != nil { + return nil + } + + conn, err := net.Dial(this.config.Network, this.config.Addr) + if err != nil { + return err + } + this.conn = conn + + return nil +} diff --git a/internal/accesslogs/storage_tcp_test.go b/internal/accesslogs/storage_tcp_test.go new file mode 100644 index 00000000..ea1a630e --- /dev/null +++ b/internal/accesslogs/storage_tcp_test.go @@ -0,0 +1,72 @@ +package accesslogs + +import ( + "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + "net" + "testing" + "time" +) + +func TestTCPStorage_Write(t *testing.T) { + go func() { + server, err := net.Listen("tcp", "127.0.0.1:9981") + if err != nil { + t.Error(err) + return + } + for { + conn, err := server.Accept() + if err != nil { + break + } + + buf := make([]byte, 1024) + for { + n, err := conn.Read(buf) + if n > 0 { + t.Log(string(buf[:n])) + } + if err != nil { + break + } + } + break + } + _ = server.Close() + }() + + storage := NewTCPStorage(&serverconfigs.AccessLogTCPStorageConfig{ + Network: "tcp", + Addr: "127.0.0.1:9981", + }) + err := storage.Start() + if err != nil { + t.Fatal(err) + } + + { + err = storage.Write([]*pb.HTTPAccessLog{ + { + RequestMethod: "POST", + RequestPath: "/1", + TimeLocal: time.Now().Format("2/Jan/2006:15:04:05 -0700"), + }, + { + RequestMethod: "GET", + RequestPath: "/2", + TimeLocal: time.Now().Format("2/Jan/2006:15:04:05 -0700"), + }, + }) + if err != nil { + t.Fatal(err) + } + } + + time.Sleep(2 * time.Second) + + err = storage.Close() + if err != nil { + t.Fatal(err) + } +} diff --git a/internal/accesslogs/tests/command_storage.php b/internal/accesslogs/tests/command_storage.php new file mode 100644 index 00000000..15d2bfc4 --- /dev/null +++ b/internal/accesslogs/tests/command_storage.php @@ -0,0 +1,24 @@ + \ No newline at end of file diff --git a/internal/db/models/http_access_log_policy_dao.go b/internal/db/models/http_access_log_policy_dao.go index f056c79e..acc5c176 100644 --- a/internal/db/models/http_access_log_policy_dao.go +++ b/internal/db/models/http_access_log_policy_dao.go @@ -1,12 +1,13 @@ package models import ( + "bytes" "encoding/json" - "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" - "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/shared" + "github.com/TeaOSLab/EdgeAPI/internal/errors" _ "github.com/go-sql-driver/mysql" "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/dbs" + "github.com/iwind/TeaGo/maps" ) const ( @@ -35,12 +36,12 @@ func init() { }) } -// 初始化 +// Init 初始化 func (this *HTTPAccessLogPolicyDAO) Init() { _ = this.DAOObject.Init() } -// 启用条目 +// EnableHTTPAccessLogPolicy 启用条目 func (this *HTTPAccessLogPolicyDAO) EnableHTTPAccessLogPolicy(tx *dbs.Tx, id int64) error { _, err := this.Query(tx). Pk(id). @@ -49,7 +50,7 @@ func (this *HTTPAccessLogPolicyDAO) EnableHTTPAccessLogPolicy(tx *dbs.Tx, id int return err } -// 禁用条目 +// DisableHTTPAccessLogPolicy 禁用条目 func (this *HTTPAccessLogPolicyDAO) DisableHTTPAccessLogPolicy(tx *dbs.Tx, id int64) error { _, err := this.Query(tx). Pk(id). @@ -58,7 +59,7 @@ func (this *HTTPAccessLogPolicyDAO) DisableHTTPAccessLogPolicy(tx *dbs.Tx, id in return err } -// 查找启用中的条目 +// FindEnabledHTTPAccessLogPolicy 查找启用中的条目 func (this *HTTPAccessLogPolicyDAO) FindEnabledHTTPAccessLogPolicy(tx *dbs.Tx, id int64) (*HTTPAccessLogPolicy, error) { result, err := this.Query(tx). Pk(id). @@ -70,7 +71,7 @@ func (this *HTTPAccessLogPolicyDAO) FindEnabledHTTPAccessLogPolicy(tx *dbs.Tx, i return result.(*HTTPAccessLogPolicy), err } -// 根据主键查找名称 +// FindHTTPAccessLogPolicyName 根据主键查找名称 func (this *HTTPAccessLogPolicyDAO) FindHTTPAccessLogPolicyName(tx *dbs.Tx, id int64) (string, error) { return this.Query(tx). Pk(id). @@ -78,51 +79,116 @@ func (this *HTTPAccessLogPolicyDAO) FindHTTPAccessLogPolicyName(tx *dbs.Tx, id i FindStringCol("") } -// 查找所有可用策略信息 -func (this *HTTPAccessLogPolicyDAO) FindAllEnabledAccessLogPolicies(tx *dbs.Tx) (result []*HTTPAccessLogPolicy, err error) { +// CountAllEnabledPolicies 计算策略数量 +func (this *HTTPAccessLogPolicyDAO) CountAllEnabledPolicies(tx *dbs.Tx) (int64, error) { + return this.Query(tx). + State(HTTPAccessLogPolicyStateEnabled). + Count() +} + +// ListEnabledPolicies 查找所有可用策略信息 +func (this *HTTPAccessLogPolicyDAO) ListEnabledPolicies(tx *dbs.Tx, offset int64, size int64) (result []*HTTPAccessLogPolicy, err error) { _, err = this.Query(tx). State(HTTPAccessLogPolicyStateEnabled). DescPk(). + Offset(offset). + Limit(size). Slice(&result). FindAll() return } -// 组合配置 -func (this *HTTPAccessLogPolicyDAO) ComposeAccessLogPolicyConfig(tx *dbs.Tx, policyId int64) (*serverconfigs.HTTPAccessLogStoragePolicy, error) { - policy, err := this.FindEnabledHTTPAccessLogPolicy(tx, policyId) - if err != nil { - return nil, err - } - if policy == nil { - return nil, nil - } - - config := &serverconfigs.HTTPAccessLogStoragePolicy{} - config.Id = int64(policy.Id) - config.IsOn = policy.IsOn == 1 - config.Name = policy.Name - config.Type = policy.Type - - // 选项 - if IsNotNull(policy.Options) { - m := map[string]interface{}{} - err = json.Unmarshal([]byte(policy.Options), &m) - if err != nil { - return nil, err - } - config.Options = m - } - - // 条件 - if IsNotNull(policy.Conds) { - condsConfig := &shared.HTTPRequestCondsConfig{} - err = json.Unmarshal([]byte(policy.Conds), condsConfig) - if err != nil { - return nil, err - } - config.Conds = condsConfig - } - - return config, nil +// FindAllEnabledAndOnPolicies 获取所有的策略信息 +func (this *HTTPAccessLogPolicyDAO) FindAllEnabledAndOnPolicies(tx *dbs.Tx) (result []*HTTPAccessLogPolicy, err error) { + _, err = this.Query(tx). + State(HTTPAccessLogPolicyStateEnabled). + Attr("isOn", true). + Slice(&result). + FindAll() + return +} + +// CreatePolicy 创建策略 +func (this *HTTPAccessLogPolicyDAO) CreatePolicy(tx *dbs.Tx, name string, policyType string, optionsJSON []byte, condsJSON []byte, isPublic bool) (policyId int64, err error) { + var op = NewHTTPAccessLogPolicyOperator() + op.Name = name + op.Type = policyType + if len(optionsJSON) > 0 { + op.Options = optionsJSON + } + if len(condsJSON) > 0 { + op.Conds = condsJSON + } + op.IsPublic = isPublic + op.IsOn = true + op.State = HTTPAccessLogPolicyStateEnabled + return this.SaveInt64(tx, op) +} + +// UpdatePolicy 修改策略 +func (this *HTTPAccessLogPolicyDAO) UpdatePolicy(tx *dbs.Tx, policyId int64, name string, optionsJSON []byte, condsJSON []byte, isPublic bool, isOn bool) error { + if policyId <= 0 { + return errors.New("invalid policyId") + } + + oldOne, err := this.Query(tx). + Pk(policyId). + Find() + if err != nil { + return err + } + if oldOne == nil { + return nil + } + var oldPolicy = oldOne.(*HTTPAccessLogPolicy) + + var op = NewHTTPAccessLogPolicyOperator() + op.Id = policyId + op.Name = name + if len(optionsJSON) > 0 { + op.Options = optionsJSON + } else { + op.Options = "{}" + } + if len(condsJSON) > 0 { + op.Conds = condsJSON + } else { + op.Conds = "{}" + } + + // 版本号 + if len(oldPolicy.Options) == 0 || len(optionsJSON) == 0 { + op.Version = dbs.SQL("version+1") + } else { + var m1 = maps.Map{} + _ = json.Unmarshal([]byte(oldPolicy.Options), &m1) + + var m2 = maps.Map{} + _ = json.Unmarshal(optionsJSON, &m2) + + if bytes.Compare(m1.AsJSON(), m2.AsJSON()) != 0 { + op.Version = dbs.SQL("version+1") + } + } + + op.IsPublic = isPublic + op.IsOn = isOn + return this.Save(tx, op) +} + +// CancelAllPublicPolicies 取消别的公用的策略 +func (this *HTTPAccessLogPolicyDAO) CancelAllPublicPolicies(tx *dbs.Tx) error { + return this.Query(tx). + State(HTTPAccessLogPolicyStateEnabled). + Set("isPublic", 0). + UpdateQuickly() +} + +// FindCurrentPublicPolicyId 取得当前的公用策略 +func (this *HTTPAccessLogPolicyDAO) FindCurrentPublicPolicyId(tx *dbs.Tx) (int64, error) { + return this.Query(tx). + State(HTTPAccessLogPolicyStateEnabled). + Attr("isPublic", 1). + ResultPk(). + FindInt64Col(0) } diff --git a/internal/db/models/http_access_log_policy_model.go b/internal/db/models/http_access_log_policy_model.go index b4fef975..0de6361c 100644 --- a/internal/db/models/http_access_log_policy_model.go +++ b/internal/db/models/http_access_log_policy_model.go @@ -1,6 +1,6 @@ package models -// 访问日志策略 +// HTTPAccessLogPolicy 访问日志策略 type HTTPAccessLogPolicy struct { Id uint32 `field:"id"` // ID TemplateId uint32 `field:"templateId"` // 模版ID @@ -13,6 +13,8 @@ type HTTPAccessLogPolicy struct { Type string `field:"type"` // 存储类型 Options string `field:"options"` // 存储选项 Conds string `field:"conds"` // 请求条件 + IsPublic uint8 `field:"isPublic"` // 是否为公用 + Version uint32 `field:"version"` // 版本号 } type HTTPAccessLogPolicyOperator struct { @@ -27,6 +29,8 @@ type HTTPAccessLogPolicyOperator struct { Type interface{} // 存储类型 Options interface{} // 存储选项 Conds interface{} // 请求条件 + IsPublic interface{} // 是否为公用 + Version interface{} // 版本号 } func NewHTTPAccessLogPolicyOperator() *HTTPAccessLogPolicyOperator { diff --git a/internal/nodes/api_node.go b/internal/nodes/api_node.go index e018b12f..aa20b9ed 100644 --- a/internal/nodes/api_node.go +++ b/internal/nodes/api_node.go @@ -4,6 +4,7 @@ import ( "crypto/tls" "errors" "fmt" + "github.com/TeaOSLab/EdgeAPI/internal/accesslogs" "github.com/TeaOSLab/EdgeAPI/internal/configs" teaconst "github.com/TeaOSLab/EdgeAPI/internal/const" "github.com/TeaOSLab/EdgeAPI/internal/db/models" @@ -110,6 +111,9 @@ func (this *APINode) Start() { // 状态变更计时器 go NewNodeStatusExecutor().Listen() + // 访问日志存储管理器 + go accesslogs.SharedStorageManager.Start() + // 监听RPC服务 remotelogs.Println("API_NODE", "starting RPC server ...") diff --git a/internal/rpc/services/service_http_access_log.go b/internal/rpc/services/service_http_access_log.go index ced371e7..aae97cc4 100644 --- a/internal/rpc/services/service_http_access_log.go +++ b/internal/rpc/services/service_http_access_log.go @@ -2,6 +2,7 @@ package services import ( "context" + "github.com/TeaOSLab/EdgeAPI/internal/accesslogs" "github.com/TeaOSLab/EdgeAPI/internal/db/models" rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" @@ -31,6 +32,18 @@ func (this *HTTPAccessLogService) CreateHTTPAccessLogs(ctx context.Context, req return nil, err } + // 发送到访问日志策略 + policyId, err := models.SharedHTTPAccessLogPolicyDAO.FindCurrentPublicPolicyId(tx) + if err != nil { + return nil, err + } + if policyId > 0 { + err = accesslogs.SharedStorageManager.Write(policyId, req.HttpAccessLogs) + if err != nil { + return nil, err + } + } + return &pb.CreateHTTPAccessLogsResponse{}, nil } diff --git a/internal/rpc/services/service_http_access_log_policy.go b/internal/rpc/services/service_http_access_log_policy.go index 4e2a2970..0073f2c1 100644 --- a/internal/rpc/services/service_http_access_log_policy.go +++ b/internal/rpc/services/service_http_access_log_policy.go @@ -2,6 +2,7 @@ package services import ( "context" + "github.com/TeaOSLab/EdgeAPI/internal/accesslogs" "github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" ) @@ -10,32 +11,149 @@ type HTTPAccessLogPolicyService struct { BaseService } -// FindAllEnabledHTTPAccessLogPolicies 获取所有可用策略 -func (this *HTTPAccessLogPolicyService) FindAllEnabledHTTPAccessLogPolicies(ctx context.Context, req *pb.FindAllEnabledHTTPAccessLogPoliciesRequest) (*pb.FindAllEnabledHTTPAccessLogPoliciesResponse, error) { - // 校验请求 +// CountAllEnabledHTTPAccessLogPolicies 计算访问日志策略数量 +func (this *HTTPAccessLogPolicyService) CountAllEnabledHTTPAccessLogPolicies(ctx context.Context, req *pb.CountAllEnabledHTTPAccessLogPoliciesRequest) (*pb.RPCCountResponse, error) { _, err := this.ValidateAdmin(ctx, 0) if err != nil { return nil, err } - tx := this.NullTx() + var tx = this.NullTx() + count, err := models.SharedHTTPAccessLogPolicyDAO.CountAllEnabledPolicies(tx) + if err != nil { + return nil, err + } + return this.SuccessCount(count) +} - policies, err := models.SharedHTTPAccessLogPolicyDAO.FindAllEnabledAccessLogPolicies(tx) +// ListEnabledHTTPAccessLogPolicies 列出单页访问日志策略 +func (this *HTTPAccessLogPolicyService) ListEnabledHTTPAccessLogPolicies(ctx context.Context, req *pb.ListEnabledHTTPAccessLogPoliciesRequest) (*pb.ListEnabledHTTPAccessLogPoliciesResponse, error) { + _, err := this.ValidateAdmin(ctx, 0) if err != nil { return nil, err } - result := []*pb.HTTPAccessLogPolicy{} + var tx = this.NullTx() + policies, err := models.SharedHTTPAccessLogPolicyDAO.ListEnabledPolicies(tx, req.Offset, req.Size) + if err != nil { + return nil, err + } + var pbPolicies = []*pb.HTTPAccessLogPolicy{} for _, policy := range policies { - result = append(result, &pb.HTTPAccessLogPolicy{ + pbPolicies = append(pbPolicies, &pb.HTTPAccessLogPolicy{ Id: int64(policy.Id), Name: policy.Name, IsOn: policy.IsOn == 1, - Type: policy.Name, + Type: policy.Type, OptionsJSON: []byte(policy.Options), CondsJSON: []byte(policy.Conds), + IsPublic: policy.IsPublic == 1, }) } - - return &pb.FindAllEnabledHTTPAccessLogPoliciesResponse{AccessLogPolicies: result}, nil + return &pb.ListEnabledHTTPAccessLogPoliciesResponse{HttpAccessLogPolicies: pbPolicies}, nil +} + +// CreateHTTPAccessLogPolicy 创建访问日志策略 +func (this *HTTPAccessLogPolicyService) CreateHTTPAccessLogPolicy(ctx context.Context, req *pb.CreateHTTPAccessLogPolicyRequest) (*pb.CreateHTTPAccessLogPolicyResponse, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + var tx = this.NullTx() + + // 取消别的Public + if req.IsPublic { + err = models.SharedHTTPAccessLogPolicyDAO.CancelAllPublicPolicies(tx) + if err != nil { + return nil, err + } + } + + // 创建 + policyId, err := models.SharedHTTPAccessLogPolicyDAO.CreatePolicy(tx, req.Name, req.Type, req.OptionsJSON, req.CondsJSON, req.IsPublic) + if err != nil { + return nil, err + } + return &pb.CreateHTTPAccessLogPolicyResponse{HttpAccessLogPolicyId: policyId}, nil +} + +// UpdateHTTPAccessLogPolicy 修改访问日志策略 +func (this *HTTPAccessLogPolicyService) UpdateHTTPAccessLogPolicy(ctx context.Context, req *pb.UpdateHTTPAccessLogPolicyRequest) (*pb.RPCSuccess, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + var tx = this.NullTx() + + // 取消别的Public + if req.IsPublic { + err = models.SharedHTTPAccessLogPolicyDAO.CancelAllPublicPolicies(tx) + if err != nil { + return nil, err + } + } + + // 保存修改 + err = models.SharedHTTPAccessLogPolicyDAO.UpdatePolicy(tx, req.HttpAccessLogPolicyId, req.Name, req.OptionsJSON, req.CondsJSON, req.IsPublic, req.IsOn) + if err != nil { + return nil, err + } + return this.Success() +} + +// FindEnabledHTTPAccessLogPolicy 查找单个访问日志策略 +func (this *HTTPAccessLogPolicyService) FindEnabledHTTPAccessLogPolicy(ctx context.Context, req *pb.FindEnabledHTTPAccessLogPolicyRequest) (*pb.FindEnabledHTTPAccessLogPolicyResponse, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + var tx = this.NullTx() + policy, err := models.SharedHTTPAccessLogPolicyDAO.FindEnabledHTTPAccessLogPolicy(tx, req.HttpAccessLogPolicyId) + if err != nil { + return nil, err + } + if policy == nil { + return &pb.FindEnabledHTTPAccessLogPolicyResponse{HttpAccessLogPolicy: nil}, nil + } + return &pb.FindEnabledHTTPAccessLogPolicyResponse{HttpAccessLogPolicy: &pb.HTTPAccessLogPolicy{ + Id: int64(policy.Id), + Name: policy.Name, + IsOn: policy.IsOn == 1, + Type: policy.Type, + OptionsJSON: []byte(policy.Options), + CondsJSON: []byte(policy.Conds), + IsPublic: policy.IsPublic == 1, + }}, nil +} + +// DeleteHTTPAccessLogPolicy 删除访问日志策略 +func (this *HTTPAccessLogPolicyService) DeleteHTTPAccessLogPolicy(ctx context.Context, req *pb.DeleteHTTPAccessLogPolicyRequest) (*pb.RPCSuccess, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + var tx = this.NullTx() + err = models.SharedHTTPAccessLogPolicyDAO.DisableHTTPAccessLogPolicy(tx, req.HttpAccessLogPolicyId) + if err != nil { + return nil, err + } + return this.Success() +} + +// WriteHTTPAccessLogPolicy 测试写入某个访问日志策略 +func (this *HTTPAccessLogPolicyService) WriteHTTPAccessLogPolicy(ctx context.Context, req *pb.WriteHTTPAccessLogPolicyRequest) (*pb.RPCSuccess, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + err = accesslogs.SharedStorageManager.Write(req.HttpAccessLogPolicyId, []*pb.HTTPAccessLog{req.HttpAccessLog}) + if err != nil { + return nil, err + } + return this.Success() }