实现请求日志写入

This commit is contained in:
刘祥超
2020-10-10 11:49:21 +08:00
parent 8a5fa81c5a
commit 8fce802185
13 changed files with 479 additions and 2 deletions

2
go.mod
View File

@@ -9,7 +9,7 @@ require (
github.com/go-sql-driver/mysql v1.5.0 github.com/go-sql-driver/mysql v1.5.0
github.com/go-yaml/yaml v2.1.0+incompatible github.com/go-yaml/yaml v2.1.0+incompatible
github.com/golang/protobuf v1.4.2 github.com/golang/protobuf v1.4.2
github.com/iwind/TeaGo v0.0.0-20200924024009-d088df3778a6 github.com/iwind/TeaGo v0.0.0-20201010005321-430e836dee8a
github.com/pkg/sftp v1.12.0 github.com/pkg/sftp v1.12.0
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a
google.golang.org/grpc v1.32.0 google.golang.org/grpc v1.32.0

10
go.sum
View File

@@ -54,6 +54,16 @@ github.com/iwind/TeaGo v0.0.0-20200923021120-f5d76441fe9e h1:/xn7wUvlwaoA5IkdBUc
github.com/iwind/TeaGo v0.0.0-20200923021120-f5d76441fe9e/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc= github.com/iwind/TeaGo v0.0.0-20200923021120-f5d76441fe9e/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc=
github.com/iwind/TeaGo v0.0.0-20200924024009-d088df3778a6 h1:7OZC/Qy7Z/hK9vG6YQOwHNOUPunSImYYJMiIfvuDQZ0= github.com/iwind/TeaGo v0.0.0-20200924024009-d088df3778a6 h1:7OZC/Qy7Z/hK9vG6YQOwHNOUPunSImYYJMiIfvuDQZ0=
github.com/iwind/TeaGo v0.0.0-20200924024009-d088df3778a6/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc= github.com/iwind/TeaGo v0.0.0-20200924024009-d088df3778a6/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc=
github.com/iwind/TeaGo v0.0.0-20201009105423-b2d996734307 h1:Vzrn/Q1cvSPVaieSq1PKJ234GBNOQjaCnU/VdiSfNVs=
github.com/iwind/TeaGo v0.0.0-20201009105423-b2d996734307/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc=
github.com/iwind/TeaGo v0.0.0-20201009112016-344e63215665 h1:qqsdkRWRd8/1uTbbdTx3l9CFkUdUmjhn3tJc/Dc7uEE=
github.com/iwind/TeaGo v0.0.0-20201009112016-344e63215665/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc=
github.com/iwind/TeaGo v0.0.0-20201009115948-1a609db84e26 h1:IvpeMnLs3RWyPsSjFBhg1O6DF+w6G9o1/wPBqhmGE8w=
github.com/iwind/TeaGo v0.0.0-20201009115948-1a609db84e26/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc=
github.com/iwind/TeaGo v0.0.0-20201009121920-5ab994526a80 h1:GFObMYO7mqeE+2n/4AACqCLlPg+GNNZCAb+pPIF70NE=
github.com/iwind/TeaGo v0.0.0-20201009121920-5ab994526a80/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc=
github.com/iwind/TeaGo v0.0.0-20201010005321-430e836dee8a h1:sO6uDbQOEe6/tIB3o31vn6eD/JmkKGErKgcOA/Cpb+Q=
github.com/iwind/TeaGo v0.0.0-20201010005321-430e836dee8a/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc=
github.com/json-iterator/go v1.1.10 h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr68= github.com/json-iterator/go v1.1.10 h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr68=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/kr/fs v0.1.0 h1:Jskdu9ieNAYnjxsi0LbQp1ulIKZV1LAFgK1tWhpZgl8= github.com/kr/fs v0.1.0 h1:Jskdu9ieNAYnjxsi0LbQp1ulIKZV1LAFgK1tWhpZgl8=

View File

@@ -124,3 +124,14 @@ func (this *DBNodeDAO) UpdateNode(nodeId int64, isOn bool, name string, descript
_, err := this.Save(op) _, err := this.Save(op)
return err return err
} }
// 查找所有可用的数据库节点
func (this *DBNodeDAO) FindAllEnabledAndOnDBNodes() (result []*DBNode, err error) {
_, err = this.Query().
State(DBNodeStateEnabled).
Attr("isOn", true).
Slice(&result).
DescPk().
FindAll()
return
}

View File

@@ -0,0 +1,227 @@
package models
import (
"fmt"
"github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/lists"
"github.com/iwind/TeaGo/logs"
timeutil "github.com/iwind/TeaGo/utils/time"
"hash/crc32"
"strconv"
"strings"
"sync"
"time"
)
var accessLogDBMapping = map[int64]*dbs.DB{} // dbNodeId => DB
var accessLogDAOMapping = map[int64]*HTTPAccessLogDAO{} // dbNodeId => DAO
var accessLogLocker = &sync.RWMutex{}
var accessLogTableMapping = map[string]bool{} // tableName_crc(dsn) => true
func init() {
initializer := NewDBNodeInitializer()
go initializer.Start()
}
// 获取获取DAO
func randomAccessLogDAO() (dao *HTTPAccessLogDAO) {
accessLogLocker.RLock()
if len(accessLogDAOMapping) == 0 {
dao = nil
} else {
for _, d := range accessLogDAOMapping {
dao = d
break
}
}
accessLogLocker.RUnlock()
return
}
// 根据日期获取表名
func findAccessLogTable(db *dbs.DB, day string, force bool) (string, error) {
config, err := db.Config()
if err != nil {
return "", err
}
tableName := "edgeHTTPAccessLogs_" + day
cacheKey := tableName + "_" + fmt.Sprintf("%d", crc32.ChecksumIEEE([]byte(config.Dsn)))
if !force {
accessLogLocker.RLock()
_, ok := accessLogTableMapping[cacheKey]
accessLogLocker.RUnlock()
if ok {
return tableName, nil
}
}
tableNames, err := db.TableNames()
if err != nil {
return tableName, err
}
if lists.ContainsString(tableNames, tableName) {
accessLogLocker.Lock()
accessLogTableMapping[cacheKey] = true
accessLogLocker.Unlock()
return tableName, nil
}
// 创建表格
_, err = db.Exec("CREATE TABLE `" + tableName + "` (\n `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID',\n `serverId` int(11) unsigned DEFAULT '0' COMMENT '服务ID',\n `nodeId` int(11) unsigned DEFAULT '0' COMMENT '节点ID',\n `status` int(3) unsigned DEFAULT '0' COMMENT '状态码',\n `createdAt` bigint(11) unsigned DEFAULT '0' COMMENT '创建时间',\n `content` json DEFAULT NULL COMMENT '日志内容',\n `day` varchar(8) DEFAULT NULL COMMENT '日期Ymd',\n PRIMARY KEY (`id`),\n KEY `serverId` (`serverId`),\n KEY `nodeId` (`nodeId`),\n KEY `serverId_status` (`serverId`,`status`)\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;")
if err != nil {
return tableName, err
}
accessLogLocker.Lock()
accessLogTableMapping[cacheKey] = true
accessLogLocker.Unlock()
return tableName, nil
}
// 初始化数据库连接
type DBNodeInitializer struct {
}
func NewDBNodeInitializer() *DBNodeInitializer {
return &DBNodeInitializer{}
}
// 启动
func (this *DBNodeInitializer) Start() {
// 初始运行
err := this.loop()
if err != nil {
logs.Println("[DB_NODE]" + err.Error())
}
// 定时运行
ticker := time.NewTicker(60 * time.Second)
for range ticker.C {
err := this.loop()
if err != nil {
logs.Println("[DB_NODE]" + err.Error())
}
}
}
// 单次运行
func (this *DBNodeInitializer) loop() error {
dbNodes, err := SharedDBNodeDAO.FindAllEnabledAndOnDBNodes()
if err != nil {
return err
}
nodeIds := []int64{}
for _, node := range dbNodes {
nodeIds = append(nodeIds, int64(node.Id))
}
// 关掉老的
accessLogLocker.Lock()
closingDbs := []*dbs.DB{}
for nodeId, db := range accessLogDBMapping {
if !this.containsInt64(nodeIds, nodeId) {
closingDbs = append(closingDbs, db)
delete(accessLogDBMapping, nodeId)
delete(accessLogDAOMapping, nodeId)
logs.Println("[DB_NODE]close db node '" + strconv.FormatInt(nodeId, 10) + "'")
}
}
accessLogLocker.Unlock()
for _, db := range closingDbs {
_ = db.Close()
}
// 启动新的
for _, node := range dbNodes {
nodeId := int64(node.Id)
accessLogLocker.Lock()
db, ok := accessLogDBMapping[nodeId]
accessLogLocker.Unlock()
dsn := node.Username + ":" + node.Password + "@tcp(" + node.Host + ":" + fmt.Sprintf("%d", node.Port) + ")/" + node.Database + "?charset=utf8mb4&timeout=10s"
if ok {
// 检查配置是否有变化
oldConfig, err := db.Config()
if err != nil {
logs.Println("[DB_NODE]read database old config failed: " + err.Error())
continue
}
// 如果有变化则关闭
if oldConfig.Dsn != dsn {
_ = db.Close()
db = nil
}
}
if db == nil {
config := &dbs.DBConfig{
Driver: "mysql",
Dsn: dsn,
Prefix: "edge",
}
db, err := dbs.NewInstanceFromConfig(config)
if err != nil {
logs.Println("[DB_NODE]initialize database config failed: " + err.Error())
continue
}
// 检查表是否存在
tableName, err := findAccessLogTable(db, timeutil.Format("Ymd"), false)
if err != nil {
if !strings.Contains(err.Error(), "1050") { // 非表格已存在错误
logs.Println("[DB_NODE]create first table in database node failed: " + err.Error())
// 创建节点日志
createLogErr := SharedNodeLogDAO.CreateLog(NodeRoleDatabase, nodeId, "error", "ACCESS_LOG", "can not create access log table: "+err.Error(), time.Now().Unix())
if createLogErr != nil {
logs.Println("[NODE_LOG]" + createLogErr.Error())
}
continue
} else {
err = nil
}
}
daoObject := dbs.DAOObject{
Instance: db,
DB: node.Name + "(id:" + strconv.Itoa(int(node.Id)) + ")",
Table: tableName,
PkName: "id",
Model: new(HTTPAccessLog),
}
err = daoObject.Init()
if err != nil {
logs.Println("[DB_NODE]initialize dao failed: " + err.Error())
continue
}
accessLogLocker.Lock()
accessLogDBMapping[nodeId] = db
dao := &HTTPAccessLogDAO{
DAOObject: daoObject,
}
accessLogDAOMapping[nodeId] = dao
accessLogLocker.Unlock()
}
}
return nil
}
// 判断是否包含某数字
func (this *DBNodeInitializer) containsInt64(values []int64, value int64) bool {
for _, v := range values {
if v == value {
return true
}
}
return false
}

View File

@@ -0,0 +1,45 @@
package models
import (
"runtime"
"testing"
"time"
)
func TestDBNodeInitializer_loop(t *testing.T) {
initializer := NewDBNodeInitializer()
err := initializer.loop()
if err != nil {
t.Fatal(err)
}
t.Log(len(accessLogDBMapping), len(accessLogDAOMapping))
}
func TestFindAccessLogTable(t *testing.T) {
before := time.Now()
db := SharedHTTPAccessLogDAO.Instance
tableName, err := findAccessLogTable(db, "20201010", false)
if err != nil {
t.Fatal(err)
}
t.Log(tableName)
t.Log(time.Since(before).Seconds()*1000, "ms")
before = time.Now()
tableName, err = findAccessLogTable(db, "20201010", false)
if err != nil {
t.Fatal(err)
}
t.Log(tableName)
t.Log(time.Since(before).Seconds()*1000, "ms")
}
func BenchmarkFindAccessLogTable(b *testing.B) {
db := SharedHTTPAccessLogDAO.Instance
runtime.GOMAXPROCS(1)
for i := 0; i < b.N; i++ {
_, _ = findAccessLogTable(db, "20201010", false)
}
}

View File

@@ -0,0 +1,93 @@
package models
import (
"encoding/json"
"github.com/TeaOSLab/EdgeAPI/internal/errors"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
_ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs"
timeutil "github.com/iwind/TeaGo/utils/time"
"strings"
"time"
)
type HTTPAccessLogDAO dbs.DAO
var SharedHTTPAccessLogDAO = NewHTTPAccessLogDAO()
func NewHTTPAccessLogDAO() *HTTPAccessLogDAO {
return dbs.NewDAO(&HTTPAccessLogDAO{
DAOObject: dbs.DAOObject{
DB: Tea.Env,
Table: "edgeHTTPAccessLogs",
Model: new(HTTPAccessLog),
PkName: "id",
},
}).(*HTTPAccessLogDAO)
}
// 创建访问日志
func CreateHTTPAccessLogs(accessLogs []*pb.HTTPAccessLog) error {
dao := randomAccessLogDAO()
if dao == nil {
dao = SharedHTTPAccessLogDAO
}
return CreateHTTPAccessLogsWithDAO(dao, accessLogs)
}
// 使用特定的DAO创建访问日志
func CreateHTTPAccessLogsWithDAO(dao *HTTPAccessLogDAO, accessLogs []*pb.HTTPAccessLog) error {
if dao == nil {
return errors.New("dao should not be nil")
}
if len(accessLogs) == 0 {
return nil
}
// TODO 改成事务批量提交,以加快速度
for _, accessLog := range accessLogs {
day := timeutil.Format("Ymd", time.Unix(accessLog.Timestamp, 0))
table, err := findAccessLogTable(dao.Instance, day, false)
if err != nil {
return err
}
fields := map[string]interface{}{}
fields["serverId"] = accessLog.ServerId
fields["nodeId"] = accessLog.NodeId
fields["status"] = accessLog.Status
fields["createdAt"] = accessLog.Timestamp
fields["day"] = day
content, err := json.Marshal(accessLog)
if err != nil {
return err
}
fields["content"] = content
_, err = dao.Query().
Table(table).
Sets(fields).
Insert()
if err != nil {
// 是否为 Error 1146: Table 'xxx.xxx' doesn't exist 如果是,则创建表之后重试
if strings.Contains(err.Error(), "1146") {
table, err = findAccessLogTable(dao.Instance, day, true)
if err != nil {
return err
}
_, err = dao.Query().
Table(table).
Sets(fields).
Insert()
if err != nil {
return err
}
}
}
}
return nil
}

View File

@@ -0,0 +1,30 @@
package models
import (
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
_ "github.com/go-sql-driver/mysql"
_ "github.com/iwind/TeaGo/bootstrap"
"testing"
"time"
)
func TestCreateHTTPAccessLogs(t *testing.T) {
err := NewDBNodeInitializer().loop()
if err != nil {
t.Fatal(err)
}
accessLog := &pb.HTTPAccessLog{
ServerId: 1,
NodeId: 4,
Status: 200,
Timestamp: time.Now().Unix(),
}
dao := randomAccessLogDAO()
t.Log("dao:", dao)
err = CreateHTTPAccessLogsWithDAO(dao, []*pb.HTTPAccessLog{accessLog})
if err != nil {
t.Fatal(err)
}
t.Log("ok")
}

View File

@@ -0,0 +1,26 @@
package models
//
type HTTPAccessLog struct {
Id uint64 `field:"id"` // ID
ServerId uint32 `field:"serverId"` // 服务ID
NodeId uint32 `field:"nodeId"` // 节点ID
Status uint32 `field:"status"` // 状态码
CreatedAt uint64 `field:"createdAt"` // 创建时间
Content string `field:"content"` // 日志内容
Day string `field:"day"` // 日期Ymd
}
type HTTPAccessLogOperator struct {
Id interface{} // ID
ServerId interface{} // 服务ID
NodeId interface{} // 节点ID
Status interface{} // 状态码
CreatedAt interface{} // 创建时间
Content interface{} // 日志内容
Day interface{} // 日期Ymd
}
func NewHTTPAccessLogOperator() *HTTPAccessLogOperator {
return &HTTPAccessLogOperator{}
}

View File

@@ -0,0 +1 @@
package models

View File

@@ -22,6 +22,7 @@ type Node struct {
InstallStatus string `field:"installStatus"` // 安装状态 InstallStatus string `field:"installStatus"` // 安装状态
State uint8 `field:"state"` // 状态 State uint8 `field:"state"` // 状态
ConnectedAPINodes string `field:"connectedAPINodes"` // 当前连接的API节点 ConnectedAPINodes string `field:"connectedAPINodes"` // 当前连接的API节点
MaxCPU uint32 `field:"maxCPU"` // 可以使用的最多CPU
} }
type NodeOperator struct { type NodeOperator struct {
@@ -45,6 +46,7 @@ type NodeOperator struct {
InstallStatus interface{} // 安装状态 InstallStatus interface{} // 安装状态
State interface{} // 状态 State interface{} // 状态
ConnectedAPINodes interface{} // 当前连接的API节点 ConnectedAPINodes interface{} // 当前连接的API节点
MaxCPU interface{} // 可以使用的最多CPU
} }
func NewNodeOperator() *NodeOperator { func NewNodeOperator() *NodeOperator {

View File

@@ -26,7 +26,7 @@ func TestServerDAO_UpdateServerConfig(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
_, err = SharedServerDAO.UpdateServerConfig(1, configJSON) _, err = SharedServerDAO.UpdateServerConfig(1, configJSON, false)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@@ -165,6 +165,7 @@ func (this *APINode) listenRPC(listener net.Listener, tlsConfig *tls.Config) err
pb.RegisterHTTPFirewallRuleSetServiceServer(rpcServer, &services.HTTPFirewallRuleSetService{}) pb.RegisterHTTPFirewallRuleSetServiceServer(rpcServer, &services.HTTPFirewallRuleSetService{})
pb.RegisterDBNodeServiceServer(rpcServer, &services.DBNodeService{}) pb.RegisterDBNodeServiceServer(rpcServer, &services.DBNodeService{})
pb.RegisterNodeLogServiceServer(rpcServer, &services.NodeLogService{}) pb.RegisterNodeLogServiceServer(rpcServer, &services.NodeLogService{})
pb.RegisterHTTPAccessLogServiceServer(rpcServer, &services.HTTPAccessLogService{})
err := rpcServer.Serve(listener) err := rpcServer.Serve(listener)
if err != nil { if err != nil {
return errors.New("[API]start rpc failed: " + err.Error()) return errors.New("[API]start rpc failed: " + err.Error())

View File

@@ -0,0 +1,31 @@
package services
import (
"context"
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
)
// 访问日志相关服务
type HTTPAccessLogService struct {
}
// 创建访问日志
func (this *HTTPAccessLogService) CreateHTTPAccessLogs(ctx context.Context, req *pb.CreateHTTPAccessLogsRequest) (*pb.CreateHTTPAccessLogsResponse, error) {
_, _, err := rpcutils.ValidateRequest(ctx, rpcutils.UserTypeNode)
if err != nil {
return nil, err
}
if len(req.AccessLogs) == 0 {
return &pb.CreateHTTPAccessLogsResponse{}, nil
}
err = models.CreateHTTPAccessLogs(req.AccessLogs)
if err != nil {
return nil, err
}
return &pb.CreateHTTPAccessLogsResponse{}, nil
}