show process when importing a lot of transactions
This commit is contained in:
+69
-6
@@ -2,6 +2,7 @@ package api
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"sort"
|
||||
"strings"
|
||||
@@ -1344,11 +1345,21 @@ func (a *TransactionsApi) TransactionImportHandler(c *core.WebContext) (any, *er
|
||||
found, remark := a.GetSubmissionRemark(duplicatechecker.DUPLICATE_CHECKER_TYPE_IMPORT_TRANSACTIONS, uid, transactionImportReq.ClientSessionId)
|
||||
|
||||
if found {
|
||||
log.Infof(c, "[transactions.TransactionImportHandler] another \"%s\" transactions has been imported for user \"uid:%d\"", remark, uid)
|
||||
count, err := utils.StringToInt(remark)
|
||||
items := strings.Split(remark, ":")
|
||||
|
||||
if err == nil {
|
||||
return count, nil
|
||||
if len(items) >= 2 {
|
||||
if items[0] == "finished" {
|
||||
log.Infof(c, "[transactions.TransactionImportHandler] another \"%s\" transactions has been imported for user \"uid:%d\"", items[1], uid)
|
||||
count, err := utils.StringToInt(items[1])
|
||||
|
||||
if err == nil {
|
||||
return count, nil
|
||||
}
|
||||
} else if items[0] == "processing" {
|
||||
return nil, errs.ErrRepeatedRequest
|
||||
}
|
||||
} else {
|
||||
log.Warnf(c, "[transactions.TransactionImportHandler] another transaction import task may be executing, but remark \"%s\" is invalid", remark)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1422,7 +1433,9 @@ func (a *TransactionsApi) TransactionImportHandler(c *core.WebContext) (any, *er
|
||||
newTransactions[i] = transaction
|
||||
}
|
||||
|
||||
err = a.transactions.BatchCreateTransactions(c, user.Uid, newTransactions, newTransactionTagIdsMap)
|
||||
err = a.transactions.BatchCreateTransactions(c, user.Uid, newTransactions, newTransactionTagIdsMap, func(currentProcess float64) {
|
||||
a.SetSubmissionRemarkIfEnable(duplicatechecker.DUPLICATE_CHECKER_TYPE_IMPORT_TRANSACTIONS, uid, transactionImportReq.ClientSessionId, fmt.Sprintf("processing:%.2f", currentProcess))
|
||||
})
|
||||
count := len(newTransactions)
|
||||
|
||||
if err != nil {
|
||||
@@ -1432,11 +1445,61 @@ func (a *TransactionsApi) TransactionImportHandler(c *core.WebContext) (any, *er
|
||||
|
||||
log.Infof(c, "[transactions.TransactionImportHandler] user \"uid:%d\" has imported %d transactions successfully", uid, count)
|
||||
|
||||
a.SetSubmissionRemarkIfEnable(duplicatechecker.DUPLICATE_CHECKER_TYPE_IMPORT_TRANSACTIONS, uid, transactionImportReq.ClientSessionId, utils.IntToString(count))
|
||||
a.SetSubmissionRemarkIfEnable(duplicatechecker.DUPLICATE_CHECKER_TYPE_IMPORT_TRANSACTIONS, uid, transactionImportReq.ClientSessionId, fmt.Sprintf("finished:%d", count))
|
||||
|
||||
return count, nil
|
||||
}
|
||||
|
||||
// TransactionImportProcessHandler returns the process of specified transaction import task by request parameters for current user
|
||||
func (a *TransactionsApi) TransactionImportProcessHandler(c *core.WebContext) (any, *errs.Error) {
|
||||
var transactionImportProcessReq models.TransactionImportProcessRequest
|
||||
err := c.ShouldBindQuery(&transactionImportProcessReq)
|
||||
|
||||
if err != nil {
|
||||
log.Warnf(c, "[transactions.TransactionImportProcessHandler] parse request failed, because %s", err.Error())
|
||||
return nil, errs.NewIncompleteOrIncorrectSubmissionError(err)
|
||||
}
|
||||
|
||||
uid := c.GetCurrentUid()
|
||||
|
||||
if !a.CurrentConfig().EnableDuplicateSubmissionsCheck {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
found, remark := a.GetSubmissionRemark(duplicatechecker.DUPLICATE_CHECKER_TYPE_IMPORT_TRANSACTIONS, uid, transactionImportProcessReq.ClientSessionId)
|
||||
|
||||
if !found {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
items := strings.Split(remark, ":")
|
||||
|
||||
if len(items) < 2 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
if items[0] == "finished" {
|
||||
return 100, nil
|
||||
} else if items[0] != "processing" {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
process, err := utils.StringToFloat64(items[1])
|
||||
|
||||
if err != nil {
|
||||
log.Warnf(c, "[transactions.TransactionImportProcessHandler] parse process failed, because %s", err.Error())
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
if process < 0 {
|
||||
return nil, nil
|
||||
} else if process >= 100 {
|
||||
process = 100
|
||||
}
|
||||
|
||||
return process, nil
|
||||
}
|
||||
|
||||
func (a *TransactionsApi) filterTransactions(c *core.WebContext, uid int64, transactions []*models.Transaction, accountMap map[int64]*models.Account) []*models.Transaction {
|
||||
finalTransactions := make([]*models.Transaction, 0, len(transactions))
|
||||
|
||||
|
||||
@@ -810,7 +810,7 @@ func (l *UserDataCli) ImportTransaction(c *core.CliContext, username string, fil
|
||||
return errs.ErrOperationFailed
|
||||
}
|
||||
|
||||
err = l.transactions.BatchCreateTransactions(c, user.Uid, newTransactions, newTransactionTagIdsMap)
|
||||
err = l.transactions.BatchCreateTransactions(c, user.Uid, newTransactions, newTransactionTagIdsMap, nil)
|
||||
|
||||
if err != nil {
|
||||
log.CliErrorf(c, "[user_data.ImportTransaction] failed to create transaction, because %s", err.Error())
|
||||
|
||||
@@ -0,0 +1,4 @@
|
||||
package core
|
||||
|
||||
// TaskProcessUpdateHandler represents the task process update handler
|
||||
type TaskProcessUpdateHandler func(currentProcess float64)
|
||||
@@ -26,6 +26,7 @@ var (
|
||||
ErrUploadedFileEmpty = NewNormalError(NormalSubcategoryGlobal, 16, http.StatusBadRequest, "uploaded file is empty")
|
||||
ErrExceedMaxUploadFileSize = NewNormalError(NormalSubcategoryGlobal, 17, http.StatusBadRequest, "uploaded file size exceeds the maximum allowed size")
|
||||
ErrFailureCountLimitReached = NewNormalError(NormalSubcategoryGlobal, 18, http.StatusBadRequest, "failure count exceeded maximum limit")
|
||||
ErrRepeatedRequest = NewNormalError(NormalSubcategoryGlobal, 19, http.StatusBadRequest, "repeated request")
|
||||
)
|
||||
|
||||
// GetParameterInvalidMessage returns specific error message for invalid parameter error
|
||||
|
||||
@@ -149,6 +149,11 @@ type TransactionImportRequest struct {
|
||||
ClientSessionId string `json:"clientSessionId"`
|
||||
}
|
||||
|
||||
// TransactionImportProcessRequest represents all parameters of transaction import process request
|
||||
type TransactionImportProcessRequest struct {
|
||||
ClientSessionId string `form:"client_session_id"`
|
||||
}
|
||||
|
||||
// TransactionCountRequest represents transaction count request
|
||||
type TransactionCountRequest struct {
|
||||
Type TransactionDbType `form:"type" binding:"min=0,max=4"`
|
||||
|
||||
@@ -2,6 +2,7 @@ package services
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@@ -260,8 +261,11 @@ func (s *TransactionService) CreateTransaction(c core.Context, transaction *mode
|
||||
}
|
||||
|
||||
// BatchCreateTransactions saves new transactions to database
|
||||
func (s *TransactionService) BatchCreateTransactions(c core.Context, uid int64, transactions []*models.Transaction, allTagIds map[int][]int64) error {
|
||||
func (s *TransactionService) BatchCreateTransactions(c core.Context, uid int64, transactions []*models.Transaction, allTagIds map[int][]int64, processHandler core.TaskProcessUpdateHandler) error {
|
||||
now := time.Now().Unix()
|
||||
currentProcess := float64(0)
|
||||
processUpdateStep := int(math.Max(100.0, float64(len(transactions)/100.0)))
|
||||
|
||||
needTransactionUuidCount := uint16(0)
|
||||
needTagIndexUuidCount := uint16(0)
|
||||
|
||||
@@ -366,6 +370,12 @@ func (s *TransactionService) BatchCreateTransactions(c core.Context, uid int64,
|
||||
transactionTagIds := allTransactionTagIds[transaction.TransactionId]
|
||||
err := s.doCreateTransaction(c, userDataDb, sess, transaction, transactionTagIndexes, transactionTagIds, nil, nil)
|
||||
|
||||
currentProcess = float64(i) / float64(len(transactions)) * 100
|
||||
|
||||
if processHandler != nil && i%processUpdateStep == 0 {
|
||||
processHandler(currentProcess)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
transactionUnixTime := utils.GetUnixTimeFromTransactionTime(transaction.TransactionTime)
|
||||
transactionTimeZone := time.FixedZone("Transaction Timezone", int(transaction.TimezoneUtcOffset)*60)
|
||||
|
||||
Reference in New Issue
Block a user