域名服务增加访问日志

This commit is contained in:
刘祥超
2021-06-02 11:53:24 +08:00
parent e9e4abff03
commit 442dd195ca
17 changed files with 829 additions and 121 deletions

View File

@@ -16,16 +16,29 @@ import (
"time"
)
var accessLogDBMapping = map[int64]*dbs.DB{} // dbNodeId => DB
var accessLogDAOMapping = map[int64]*HTTPAccessLogDAOWrapper{} // dbNodeId => DAO
var accessLogDBMapping = map[int64]*dbs.DB{} // dbNodeId => DB
var accessLogLocker = &sync.RWMutex{}
var accessLogTableMapping = map[string]bool{} // tableName_crc(dsn) => true
// HTTP服务访问
var httpAccessLogDAOMapping = map[int64]*HTTPAccessLogDAOWrapper{} // dbNodeId => DAO
var httpAccessLogTableMapping = map[string]bool{} // tableName_crc(dsn) => true
// DNS服务访问
var nsAccessLogDAOMapping = map[int64]*NSAccessLogDAOWrapper{} // dbNodeId => DAO
var nsAccessLogTableMapping = map[string]bool{} // tableName_crc(dsn) => true
// HTTPAccessLogDAOWrapper HTTP访问日志DAO
type HTTPAccessLogDAOWrapper struct {
DAO *HTTPAccessLogDAO
NodeId int64
}
// NSAccessLogDAOWrapper NS访问日志DAO
type NSAccessLogDAOWrapper struct {
DAO *NSAccessLogDAO
NodeId int64
}
func init() {
initializer := NewDBNodeInitializer()
dbs.OnReadyDone(func() {
@@ -34,12 +47,26 @@ func init() {
}
// 获取获取DAO
func randomAccessLogDAO() (dao *HTTPAccessLogDAOWrapper) {
func randomHTTPAccessLogDAO() (dao *HTTPAccessLogDAOWrapper) {
accessLogLocker.RLock()
if len(accessLogDAOMapping) == 0 {
if len(httpAccessLogDAOMapping) == 0 {
dao = nil
} else {
for _, d := range accessLogDAOMapping {
for _, d := range httpAccessLogDAOMapping {
dao = d
break
}
}
accessLogLocker.RUnlock()
return
}
func randomNSAccessLogDAO() (dao *NSAccessLogDAOWrapper) {
accessLogLocker.RLock()
if len(nsAccessLogDAOMapping) == 0 {
dao = nil
} else {
for _, d := range nsAccessLogDAOMapping {
dao = d
break
}
@@ -49,7 +76,7 @@ func randomAccessLogDAO() (dao *HTTPAccessLogDAOWrapper) {
}
// 检查表格是否存在
func findAccessLogTableName(db *dbs.DB, day string) (tableName string, ok bool, err error) {
func findHTTPAccessLogTableName(db *dbs.DB, day string) (tableName string, ok bool, err error) {
if !regexp.MustCompile(`^\d{8}$`).MatchString(day) {
err = errors.New("invalid day '" + day + "', should be YYYYMMDD")
return
@@ -64,7 +91,36 @@ func findAccessLogTableName(db *dbs.DB, day string) (tableName string, ok bool,
cacheKey := tableName + "_" + fmt.Sprintf("%d", crc32.ChecksumIEEE([]byte(config.Dsn)))
accessLogLocker.RLock()
_, ok = accessLogTableMapping[cacheKey]
_, ok = httpAccessLogTableMapping[cacheKey]
accessLogLocker.RUnlock()
if ok {
return tableName, true, nil
}
tableNames, err := db.TableNames()
if err != nil {
return tableName, false, err
}
return tableName, lists.ContainsString(tableNames, tableName), nil
}
func findNSAccessLogTableName(db *dbs.DB, day string) (tableName string, ok bool, err error) {
if !regexp.MustCompile(`^\d{8}$`).MatchString(day) {
err = errors.New("invalid day '" + day + "', should be YYYYMMDD")
return
}
config, err := db.Config()
if err != nil {
return "", false, err
}
tableName = "edgeNSAccessLogs_" + day
cacheKey := tableName + "_" + fmt.Sprintf("%d", crc32.ChecksumIEEE([]byte(config.Dsn)))
accessLogLocker.RLock()
_, ok = nsAccessLogTableMapping[cacheKey]
accessLogLocker.RUnlock()
if ok {
return tableName, true, nil
@@ -79,7 +135,7 @@ func findAccessLogTableName(db *dbs.DB, day string) (tableName string, ok bool,
}
// 根据日期获取表名
func findAccessLogTable(db *dbs.DB, day string, force bool) (string, error) {
func findHTTPAccessLogTable(db *dbs.DB, day string, force bool) (string, error) {
config, err := db.Config()
if err != nil {
return "", err
@@ -90,7 +146,7 @@ func findAccessLogTable(db *dbs.DB, day string, force bool) (string, error) {
if !force {
accessLogLocker.RLock()
_, ok := accessLogTableMapping[cacheKey]
_, ok := httpAccessLogTableMapping[cacheKey]
accessLogLocker.RUnlock()
if ok {
return tableName, nil
@@ -104,7 +160,7 @@ func findAccessLogTable(db *dbs.DB, day string, force bool) (string, error) {
if lists.ContainsString(tableNames, tableName) {
accessLogLocker.Lock()
accessLogTableMapping[cacheKey] = true
httpAccessLogTableMapping[cacheKey] = true
accessLogLocker.Unlock()
return tableName, nil
}
@@ -116,13 +172,56 @@ func findAccessLogTable(db *dbs.DB, day string, force bool) (string, error) {
}
accessLogLocker.Lock()
accessLogTableMapping[cacheKey] = true
httpAccessLogTableMapping[cacheKey] = true
accessLogLocker.Unlock()
return tableName, nil
}
// 初始化数据库连接
func findNSAccessLogTable(db *dbs.DB, day string, force bool) (string, error) {
config, err := db.Config()
if err != nil {
return "", err
}
tableName := "edgeNSAccessLogs_" + day
cacheKey := tableName + "_" + fmt.Sprintf("%d", crc32.ChecksumIEEE([]byte(config.Dsn)))
if !force {
accessLogLocker.RLock()
_, ok := nsAccessLogTableMapping[cacheKey]
accessLogLocker.RUnlock()
if ok {
return tableName, nil
}
}
tableNames, err := db.TableNames()
if err != nil {
return tableName, err
}
if lists.ContainsString(tableNames, tableName) {
accessLogLocker.Lock()
nsAccessLogTableMapping[cacheKey] = true
accessLogLocker.Unlock()
return tableName, nil
}
// 创建表格
_, err = db.Exec("CREATE TABLE `" + tableName + "` (\n `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID',\n `nodeId` int(11) unsigned DEFAULT '0' COMMENT '节点ID',\n `domainId` int(11) unsigned DEFAULT '0' COMMENT '域名ID',\n `recordId` int(11) unsigned DEFAULT '0' COMMENT '记录ID',\n `content` json DEFAULT NULL COMMENT '访问数据',\n `requestId` varchar(128) DEFAULT NULL COMMENT '请求ID',\n `createdAt` bigint(11) unsigned DEFAULT '0' COMMENT '创建时间',\n PRIMARY KEY (`id`),\n KEY `nodeId` (`nodeId`),\n KEY `domainId` (`domainId`),\n KEY `recordId` (`recordId`),\n KEY `requestId` (`requestId`)\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='域名服务访问日志';")
if err != nil {
return tableName, err
}
accessLogLocker.Lock()
nsAccessLogTableMapping[cacheKey] = true
accessLogLocker.Unlock()
return tableName, nil
}
// DBNodeInitializer 初始化数据库连接
type DBNodeInitializer struct {
}
@@ -130,7 +229,7 @@ func NewDBNodeInitializer() *DBNodeInitializer {
return &DBNodeInitializer{}
}
// 启动
// Start 启动
func (this *DBNodeInitializer) Start() {
// 初始运行
err := this.loop()
@@ -167,7 +266,8 @@ func (this *DBNodeInitializer) loop() error {
if !lists.ContainsInt64(nodeIds, nodeId) {
closingDbs = append(closingDbs, db)
delete(accessLogDBMapping, nodeId)
delete(accessLogDAOMapping, nodeId)
delete(httpAccessLogDAOMapping, nodeId)
delete(nsAccessLogDAOMapping, nodeId)
logs.Println("[DB_NODE]close db node '" + strconv.FormatInt(nodeId, 10) + "'")
}
}
@@ -213,46 +313,94 @@ func (this *DBNodeInitializer) loop() error {
}
// 检查表是否存在
tableName, err := findAccessLogTable(db, timeutil.Format("Ymd"), false)
if err != nil {
if !strings.Contains(err.Error(), "1050") { // 非表格已存在错误
logs.Println("[DB_NODE]create first table in database node failed: " + err.Error())
// httpAccessLog
{
tableName, err := findHTTPAccessLogTable(db, timeutil.Format("Ymd"), false)
if err != nil {
if !strings.Contains(err.Error(), "1050") { // 非表格已存在错误
logs.Println("[DB_NODE]create first table in database node failed: " + err.Error())
// 创建节点日志
createLogErr := SharedNodeLogDAO.CreateLog(nil, nodeconfigs.NodeRoleDatabase, nodeId, 0, "error", "ACCESS_LOG", "can not create access log table: "+err.Error(), time.Now().Unix())
if createLogErr != nil {
logs.Println("[NODE_LOG]" + createLogErr.Error())
// 创建节点日志
createLogErr := SharedNodeLogDAO.CreateLog(nil, nodeconfigs.NodeRoleDatabase, nodeId, 0, "error", "ACCESS_LOG", "can not create access log table: "+err.Error(), time.Now().Unix())
if createLogErr != nil {
logs.Println("[NODE_LOG]" + createLogErr.Error())
}
continue
} else {
err = nil
}
continue
} else {
err = nil
}
daoObject := dbs.DAOObject{
Instance: db,
DB: node.Name + "(id:" + strconv.Itoa(int(node.Id)) + ")",
Table: tableName,
PkName: "id",
Model: new(HTTPAccessLog),
}
err = daoObject.Init()
if err != nil {
logs.Println("[DB_NODE]initialize dao failed: " + err.Error())
continue
}
accessLogLocker.Lock()
accessLogDBMapping[nodeId] = db
dao := &HTTPAccessLogDAO{
DAOObject: daoObject,
}
httpAccessLogDAOMapping[nodeId] = &HTTPAccessLogDAOWrapper{
DAO: dao,
NodeId: nodeId,
}
accessLogLocker.Unlock()
}
daoObject := dbs.DAOObject{
Instance: db,
DB: node.Name + "(id:" + strconv.Itoa(int(node.Id)) + ")",
Table: tableName,
PkName: "id",
Model: new(HTTPAccessLog),
}
err = daoObject.Init()
if err != nil {
logs.Println("[DB_NODE]initialize dao failed: " + err.Error())
continue
}
accessLogLocker.Lock()
accessLogDBMapping[nodeId] = db
dao := &HTTPAccessLogDAO{
DAOObject: daoObject,
// nsAccessLog
{
tableName, err := findNSAccessLogTable(db, timeutil.Format("Ymd"), false)
if err != nil {
if !strings.Contains(err.Error(), "1050") { // 非表格已存在错误
logs.Println("[DB_NODE]create first table in database node failed: " + err.Error())
// 创建节点日志
createLogErr := SharedNodeLogDAO.CreateLog(nil, nodeconfigs.NodeRoleDatabase, nodeId, 0, "error", "ACCESS_LOG", "can not create access log table: "+err.Error(), time.Now().Unix())
if createLogErr != nil {
logs.Println("[NODE_LOG]" + createLogErr.Error())
}
continue
} else {
err = nil
}
}
daoObject := dbs.DAOObject{
Instance: db,
DB: node.Name + "(id:" + strconv.Itoa(int(node.Id)) + ")",
Table: tableName,
PkName: "id",
Model: new(NSAccessLog),
}
err = daoObject.Init()
if err != nil {
logs.Println("[DB_NODE]initialize dao failed: " + err.Error())
continue
}
accessLogLocker.Lock()
accessLogDBMapping[nodeId] = db
dao := &NSAccessLogDAO{
DAOObject: daoObject,
}
nsAccessLogDAOMapping[nodeId] = &NSAccessLogDAOWrapper{
DAO: dao,
NodeId: nodeId,
}
accessLogLocker.Unlock()
}
accessLogDAOMapping[nodeId] = &HTTPAccessLogDAOWrapper{
DAO: dao,
NodeId: nodeId,
}
accessLogLocker.Unlock()
}
}