support periodically cleaning up expired tokens

This commit is contained in:
MaysWind
2024-08-12 00:49:07 +08:00
parent 80b8b9afdd
commit 52dfee9ca6
22 changed files with 506 additions and 7 deletions
+101
View File
@@ -0,0 +1,101 @@
package cron
import (
"time"
"github.com/go-co-op/gocron/v2"
"github.com/mayswind/ezbookkeeping/pkg/errs"
"github.com/mayswind/ezbookkeeping/pkg/log"
"github.com/mayswind/ezbookkeeping/pkg/settings"
)
// CronJobSchedulerContainer contains the current cron job scheduler
type CronJobSchedulerContainer struct {
scheduler gocron.Scheduler
allJobs []*CronJob
allJobsMap map[string]*CronJob
allGocronJobsMap map[string]gocron.Job
}
// Initialize a cron job scheduler container singleton instance
var (
Container = &CronJobSchedulerContainer{
allJobsMap: make(map[string]*CronJob),
allGocronJobsMap: make(map[string]gocron.Job),
}
)
// InitializeCronJobSchedulerContainer initializes the cron job scheduler according to the config
func InitializeCronJobSchedulerContainer(config *settings.Config, startScheduler bool) error {
var err error
Container.scheduler, err = gocron.NewScheduler(
gocron.WithLocation(time.Local),
gocron.WithLogger(NewGocronLoggerAdapter()),
)
if err != nil {
return err
}
Container.registerAllJobs(config)
if startScheduler {
Container.scheduler.Start()
}
return nil
}
// GetAllJobs returns all the cron jobs
func (c *CronJobSchedulerContainer) GetAllJobs() []*CronJob {
return c.allJobs
}
// SyncRunJobNow runs the specified cron job synchronously now
func (c *CronJobSchedulerContainer) SyncRunJobNow(jobName string) error {
if jobName == "" {
return errs.ErrCronJobNameIsEmpty
}
job := c.allJobsMap[jobName]
if job == nil {
return errs.ErrCronJobNotExistsOrNotEnabled
}
gocronJob := c.allGocronJobsMap[jobName]
if gocronJob == nil {
return errs.ErrCronJobNotExistsOrNotEnabled
}
job.doRun()
return nil
}
func (c *CronJobSchedulerContainer) registerAllJobs(config *settings.Config) {
if config.EnableRemoveExpiredTokens {
Container.registerIntervalJob(RemoveExpiredTokensJob)
}
}
func (c *CronJobSchedulerContainer) registerIntervalJob(job *CronJob) {
gocronJob, err := c.scheduler.NewJob(
gocron.DurationJob(job.Interval),
gocron.NewTask(job.doRun),
gocron.WithName(job.Name),
gocron.WithSingletonMode(gocron.LimitModeReschedule),
)
if err == nil {
c.allJobs = append(c.allJobs, job)
c.allJobsMap[job.Name] = job
c.allGocronJobsMap[job.Name] = gocronJob
log.Infof("[cron_container.registerJob] job \"%s\" has been registered", job.Name)
log.Debugf("[cron_container.registerJob] job \"%s\" gocron id is %s", job.Name, gocronJob.ID())
} else {
log.Errorf("[cron_container.registerJob] job \"%s\" cannot be been registered, because %s", job.Name, err.Error())
}
}
+49
View File
@@ -0,0 +1,49 @@
package cron
import (
"fmt"
"time"
"github.com/mayswind/ezbookkeeping/pkg/duplicatechecker"
"github.com/mayswind/ezbookkeeping/pkg/log"
"github.com/mayswind/ezbookkeeping/pkg/utils"
)
type CronJob struct {
Name string
Description string
Interval time.Duration
Run func() error
}
func (c *CronJob) doRun() {
start := time.Now()
localAddr, err := utils.GetLocalIPAddressesString()
if err != nil {
log.Warnf("[cron_job.doRun] job \"%s\" cannot get local ipv4 address, because %s", c.Name, err.Error())
return
}
currentInfo := fmt.Sprintf("ip: %s, startTime: %d", localAddr, time.Now().Unix())
found, runningInfo := duplicatechecker.Container.GetOrSetCronJobRunningInfo(c.Name, currentInfo, c.Interval)
if found {
log.Warnf("[cron_job.doRun] job \"%s\" is already running (%s)", c.Name, runningInfo)
return
}
err = c.Run()
duplicatechecker.Container.Current.RemoveCronJobRunningInfo(c.Name)
now := time.Now()
if err != nil {
log.Errorf("[cron_job.doRun] failed to run job \"%s\", because %s", c.Name, err.Error())
return
}
cost := now.Sub(start).Nanoseconds() / 1e6
log.Infof("[cron_job.doRun] run job \"%s\" successfully, cost %dms", c.Name, cost)
}
+16
View File
@@ -0,0 +1,16 @@
package cron
import (
"time"
"github.com/mayswind/ezbookkeeping/pkg/services"
)
var RemoveExpiredTokensJob = &CronJob{
Name: "RemoveExpiredTokens",
Description: "Periodically remove expired user tokens from the database.",
Interval: 24 * time.Hour,
Run: func() error {
return services.Tokens.DeleteAllExpiredTokens(nil)
},
}
+36
View File
@@ -0,0 +1,36 @@
package cron
import (
"github.com/go-co-op/gocron/v2"
"github.com/mayswind/ezbookkeeping/pkg/log"
)
// GocronLoggerAdapter represents the logger adapter for gocron
type GocronLoggerAdapter struct {
}
// Debug logs debug log
func (logger GocronLoggerAdapter) Debug(msg string, args ...any) {
log.Debugf(msg, args...)
}
// Info logs info log
func (logger GocronLoggerAdapter) Info(msg string, args ...any) {
log.Infof(msg, args...)
}
// Warn logs warn log
func (logger GocronLoggerAdapter) Warn(msg string, args ...any) {
log.Warnf(msg, args...)
}
// Error logs error log
func (logger GocronLoggerAdapter) Error(msg string, args ...any) {
log.Errorf(msg, args...)
}
// NewGocronLoggerAdapter returns a new GocronLoggerAdapter instance
func NewGocronLoggerAdapter() gocron.Logger {
return GocronLoggerAdapter{}
}
+10
View File
@@ -12,6 +12,16 @@ type DataStore struct {
databases []*Database
}
// Count returns total count of database instances
func (s *DataStore) Count() int {
return len(s.databases)
}
// Get returns a database instance by index
func (s *DataStore) Get(index int) *Database {
return s.databases[index]
}
// Choose returns a database instance by sharding key
func (s *DataStore) Choose(key int64) *Database {
return s.databases[0]
@@ -1,7 +1,11 @@
package duplicatechecker
import "time"
// DuplicateChecker is common duplicate checker interface
type DuplicateChecker interface {
GetSubmissionRemark(checkerType DuplicateCheckerType, uid int64, identification string) (bool, string)
SetSubmissionRemark(checkerType DuplicateCheckerType, uid int64, identification string, remark string)
GetOrSetCronJobRunningInfo(jobName string, runningInfo string, runningInterval time.Duration) (bool, string)
RemoveCronJobRunningInfo(jobName string)
}
@@ -1,6 +1,8 @@
package duplicatechecker
import (
"time"
"github.com/mayswind/ezbookkeeping/pkg/errs"
"github.com/mayswind/ezbookkeeping/pkg/settings"
)
@@ -36,3 +38,13 @@ func (c *DuplicateCheckerContainer) GetSubmissionRemark(checkerType DuplicateChe
func (c *DuplicateCheckerContainer) SetSubmissionRemark(checkerType DuplicateCheckerType, uid int64, identification string, remark string) {
c.Current.SetSubmissionRemark(checkerType, uid, identification, remark)
}
// GetOrSetCronJobRunningInfo returns the running info when the cron job is running or saves the running info by the current duplicate checker
func (c *DuplicateCheckerContainer) GetOrSetCronJobRunningInfo(jobName string, runningInfo string, runningInterval time.Duration) (bool, string) {
return c.Current.GetOrSetCronJobRunningInfo(jobName, runningInfo, runningInterval)
}
// RemoveCronJobRunningInfo removes the running info of the cron job by the current duplicate checker
func (c *DuplicateCheckerContainer) RemoveCronJobRunningInfo(jobName string) {
c.Current.RemoveCronJobRunningInfo(jobName)
}
@@ -5,9 +5,9 @@ type DuplicateCheckerType uint8
// Types of uuid
const (
DUPLICATE_CHECKER_TYPE_DEFAULT DuplicateCheckerType = 0
DUPLICATE_CHECKER_TYPE_NEW_ACCOUNT DuplicateCheckerType = 1
DUPLICATE_CHECKER_TYPE_NEW_CATEGORY DuplicateCheckerType = 2
DUPLICATE_CHECKER_TYPE_NEW_TRANSACTION DuplicateCheckerType = 3
DUPLICATE_CHECKER_TYPE_NEW_TEMPLATE DuplicateCheckerType = 4
DUPLICATE_CHECKER_TYPE_BACKGROUND_CRON_JOB DuplicateCheckerType = 0
DUPLICATE_CHECKER_TYPE_NEW_ACCOUNT DuplicateCheckerType = 1
DUPLICATE_CHECKER_TYPE_NEW_CATEGORY DuplicateCheckerType = 2
DUPLICATE_CHECKER_TYPE_NEW_TRANSACTION DuplicateCheckerType = 3
DUPLICATE_CHECKER_TYPE_NEW_TEMPLATE DuplicateCheckerType = 4
)
@@ -2,6 +2,8 @@ package duplicatechecker
import (
"fmt"
"sync"
"time"
"github.com/patrickmn/go-cache"
@@ -11,6 +13,8 @@ import (
// InMemoryDuplicateChecker represents in-memory duplicate checker
type InMemoryDuplicateChecker struct {
cache *cache.Cache
mutex sync.Mutex
}
// NewInMemoryDuplicateChecker returns a new in-memory duplicate checker
@@ -38,6 +42,33 @@ func (c *InMemoryDuplicateChecker) SetSubmissionRemark(checkerType DuplicateChec
c.cache.Set(c.getCacheKey(checkerType, uid, identification), remark, cache.DefaultExpiration)
}
// GetOrSetCronJobRunningInfo returns the running info when the cron job is running or saves the running info by the current duplicate checker
func (c *InMemoryDuplicateChecker) GetOrSetCronJobRunningInfo(jobName string, runningInfo string, runningInterval time.Duration) (bool, string) {
c.mutex.Lock()
defer c.mutex.Unlock()
existedRunningInfo, found := c.cache.Get(c.getCacheKey(DUPLICATE_CHECKER_TYPE_BACKGROUND_CRON_JOB, 0, jobName))
if found {
return true, existedRunningInfo.(string)
}
expiration := runningInterval
if expiration > 1*time.Second {
expiration = expiration - 1*time.Second
}
c.cache.Set(c.getCacheKey(DUPLICATE_CHECKER_TYPE_BACKGROUND_CRON_JOB, 0, jobName), runningInfo, expiration)
return false, ""
}
// RemoveCronJobRunningInfo removes the running info of the cron job by the current duplicate checker
func (c *InMemoryDuplicateChecker) RemoveCronJobRunningInfo(jobName string) {
c.cache.Delete(c.getCacheKey(DUPLICATE_CHECKER_TYPE_BACKGROUND_CRON_JOB, 0, jobName))
}
func (c *InMemoryDuplicateChecker) getCacheKey(checkerType DuplicateCheckerType, uid int64, identification string) string {
return fmt.Sprintf("%d|%d|%s", checkerType, uid, identification)
}
+9
View File
@@ -0,0 +1,9 @@
package errs
import "net/http"
// Error codes related to cron jobs
var (
ErrCronJobNameIsEmpty = NewSystemError(SystemSubcategoryCron, 0, http.StatusInternalServerError, "cron job name is empty")
ErrCronJobNotExistsOrNotEnabled = NewSystemError(SystemSubcategoryCron, 1, http.StatusInternalServerError, "cron job not exists or not enabled")
)
+52
View File
@@ -1,5 +1,9 @@
package errs
import (
"strings"
)
// ErrorCategory represents error category
type ErrorCategory int32
@@ -16,6 +20,7 @@ const (
SystemSubcategoryDatabase = 2
SystemSubcategoryMail = 3
SystemSubcategoryLogging = 4
SystemSubcategoryCron = 5
)
// Sub categories of normal error
@@ -44,6 +49,10 @@ type Error struct {
Context any
}
type MultiErrors struct {
errors []error
}
// Error returns the error message
func (err *Error) Error() string {
return err.Message
@@ -66,6 +75,34 @@ func New(category ErrorCategory, subCategory int32, index int32, httpStatusCode
}
}
// Error returns the error message
func (err *MultiErrors) Error() string {
if len(err.errors) == 1 {
return err.errors[0].Error()
}
var ret strings.Builder
var lastErrorChar byte
ret.WriteString("multi errors: ")
for i := 0; i < len(err.errors); i++ {
if i > 0 {
if lastErrorChar == '.' {
ret.WriteString(" ")
} else {
ret.WriteString(", ")
}
}
errorContent := err.errors[i].Error()
lastErrorChar = errorContent[len(errorContent)-1]
ret.WriteString(errorContent)
}
return ret.String()
}
// NewSystemError returns a new system error instance
func NewSystemError(subCategory int32, index int32, httpStatusCode int, message string) *Error {
return New(CATEGORY_SYSTEM, subCategory, index, httpStatusCode, message)
@@ -107,6 +144,21 @@ func NewErrorWithContext(baseError *Error, context any) *Error {
}
}
// NewMultiErrorOrNil returns a new multi error instance
func NewMultiErrorOrNil(errors ...error) error {
count := len(errors)
if count < 1 {
return nil
} else if count == 1 {
return errors[0]
}
return &MultiErrors{
errors: errors,
}
}
// Or would return the error from err parameter if the this error is defined in this project,
// or return the default error
func Or(err error, defaultErr *Error) *Error {
+2 -2
View File
@@ -7,13 +7,13 @@ const TokenMaxUserAgentLength = 255
// TokenRecord represents token data stored in database
type TokenRecord struct {
Uid int64 `xorm:"PK INDEX(IDX_token_record_uid_type_expired_time)"`
Uid int64 `xorm:"PK INDEX(IDX_token_record_uid_type_expired_time) INDEX(IDX_token_record_expired_time)"`
UserTokenId int64 `xorm:"PK"`
TokenType core.TokenType `xorm:"INDEX(IDX_token_record_uid_type_expired_time) TINYINT NOT NULL"`
Secret string `xorm:"VARCHAR(10) NOT NULL"`
UserAgent string `xorm:"VARCHAR(255)"`
CreatedUnixTime int64 `xorm:"PK"`
ExpiredUnixTime int64 `xorm:"INDEX(IDX_token_record_uid_type_expired_time)"`
ExpiredUnixTime int64 `xorm:"INDEX(IDX_token_record_uid_type_expired_time) INDEX(IDX_token_record_expired_time)"`
LastSeenUnixTime int64
}
+10
View File
@@ -23,6 +23,16 @@ func (s *ServiceUsingDB) TokenDB(uid int64) *datastore.Database {
return s.container.TokenStore.Choose(uid)
}
// TokenDBByIndex returns the datastore by index
func (s *ServiceUsingDB) TokenDBByIndex(index int) *datastore.Database {
return s.container.TokenStore.Get(index)
}
// TokenDBCount returns the count of datastores which contains user token
func (s *ServiceUsingDB) TokenDBCount() int {
return s.container.TokenStore.Count()
}
// UserDataDB returns the datastore which contains user data
func (s *ServiceUsingDB) UserDataDB(uid int64) *datastore.Database {
return s.container.UserDataStore.Choose(uid)
+26
View File
@@ -207,6 +207,32 @@ func (s *TokenService) DeleteTokensByType(c *core.Context, uid int64, tokenType
})
}
// DeleteAllExpiredTokens deletes all expired tokens
func (s *TokenService) DeleteAllExpiredTokens(c *core.Context) error {
var errors []error
totalCount := int64(0)
for i := 0; i < s.TokenDBCount(); i++ {
err := s.TokenDBByIndex(i).DoTransaction(c, func(sess *xorm.Session) error {
count, err := sess.Where("expired_unix_time<=?", time.Now().Unix()).Delete(&models.TokenRecord{})
totalCount += count
return err
})
if err != nil {
errors = append(errors, err)
}
}
if totalCount > 0 {
log.Infof("[tokens.DeleteAllExpiredTokens] %d expired tokens have been deleted", totalCount)
} else if len(errors) == 0 {
log.Infof("[tokens.DeleteAllExpiredTokens] no expired tokens have been deleted")
}
return errs.NewMultiErrorOrNil(errors...)
}
// ExistsValidTokenByType returns whether the given token type exists
func (s *TokenService) ExistsValidTokenByType(c *core.Context, uid int64, tokenType core.TokenType) (bool, error) {
if uid <= 0 {
+15
View File
@@ -252,6 +252,9 @@ type Config struct {
DuplicateSubmissionsInterval uint32
DuplicateSubmissionsIntervalDuration time.Duration
// Cron
EnableRemoveExpiredTokens bool
// Secret
SecretKeyNoSet bool
SecretKey string
@@ -373,6 +376,12 @@ func LoadConfiguration(configFilePath string) (*Config, error) {
return nil, err
}
err = loadCronConfiguration(config, cfgFile, "cron")
if err != nil {
return nil, err
}
err = loadSecurityConfiguration(config, cfgFile, "security")
if err != nil {
@@ -674,6 +683,12 @@ func loadDuplicateCheckerConfiguration(config *Config, configFile *ini.File, sec
return nil
}
func loadCronConfiguration(config *Config, configFile *ini.File, sectionName string) error {
config.EnableRemoveExpiredTokens = getConfigItemBoolValue(configFile, sectionName, "enable_remove_expired_tokens", false)
return nil
}
func loadSecurityConfiguration(config *Config, configFile *ini.File, sectionName string) error {
config.SecretKeyNoSet = !getConfigItemIsSet(configFile, sectionName, "secret_key")
config.SecretKey = getConfigItemStringValue(configFile, sectionName, "secret_key", defaultSecretKey)