add log position tracking
This commit is contained in:
@@ -26,3 +26,21 @@ func (r *StateHistoryRepository) GetAll(ctx context.Context, filter *model.State
|
|||||||
func (r *StateHistoryRepository) Insert(ctx context.Context, model *model.StateHistory) error {
|
func (r *StateHistoryRepository) Insert(ctx context.Context, model *model.StateHistory) error {
|
||||||
return r.BaseRepository.Insert(ctx, model)
|
return r.BaseRepository.Insert(ctx, model)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetLastSessionID gets the last session ID for a server
|
||||||
|
func (r *StateHistoryRepository) GetLastSessionID(ctx context.Context, serverID uint) (uint, error) {
|
||||||
|
var lastSession model.StateHistory
|
||||||
|
result := r.BaseRepository.db.WithContext(ctx).
|
||||||
|
Where("server_id = ?", serverID).
|
||||||
|
Order("session_id DESC").
|
||||||
|
First(&lastSession)
|
||||||
|
|
||||||
|
if result.Error != nil {
|
||||||
|
if result.Error == gorm.ErrRecordNotFound {
|
||||||
|
return 0, nil // Return 0 if no sessions found
|
||||||
|
}
|
||||||
|
return 0, result.Error
|
||||||
|
}
|
||||||
|
|
||||||
|
return lastSession.SessionID, nil
|
||||||
|
}
|
||||||
|
|||||||
@@ -118,10 +118,12 @@ func (s *ServerService) shouldInsertStateHistory(serverID uint) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *ServerService) getNextSessionID(serverID uint) uint {
|
func (s *ServerService) getNextSessionID(serverID uint) uint {
|
||||||
currentID, _ := s.sessionIDs.LoadOrStore(serverID, uint(0))
|
lastID, err := s.stateHistoryRepo.GetLastSessionID(context.Background(), serverID)
|
||||||
nextID := currentID.(uint) + 1
|
if err != nil {
|
||||||
s.sessionIDs.Store(serverID, nextID)
|
logging.Error("Failed to get last session ID for server %d: %v", serverID, err)
|
||||||
return nextID
|
return 1 // Return 1 as fallback
|
||||||
|
}
|
||||||
|
return lastID + 1
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ServerService) insertStateHistory(serverID uint, state *model.ServerState) {
|
func (s *ServerService) insertStateHistory(serverID uint, state *model.ServerState) {
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ type LogTailer struct {
|
|||||||
handleLine func(string)
|
handleLine func(string)
|
||||||
stopChan chan struct{}
|
stopChan chan struct{}
|
||||||
isRunning bool
|
isRunning bool
|
||||||
|
tracker *PositionTracker
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewLogTailer(filePath string, handleLine func(string)) *LogTailer {
|
func NewLogTailer(filePath string, handleLine func(string)) *LogTailer {
|
||||||
@@ -18,6 +19,7 @@ func NewLogTailer(filePath string, handleLine func(string)) *LogTailer {
|
|||||||
filePath: filePath,
|
filePath: filePath,
|
||||||
handleLine: handleLine,
|
handleLine: handleLine,
|
||||||
stopChan: make(chan struct{}),
|
stopChan: make(chan struct{}),
|
||||||
|
tracker: NewPositionTracker(filePath),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -28,7 +30,13 @@ func (t *LogTailer) Start() {
|
|||||||
t.isRunning = true
|
t.isRunning = true
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
var lastSize int64 = 0
|
// Load last position from tracker
|
||||||
|
pos, err := t.tracker.LoadPosition()
|
||||||
|
if err != nil {
|
||||||
|
pos = &LogPosition{} // Start from beginning if error
|
||||||
|
}
|
||||||
|
lastSize := pos.LastPosition
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-t.stopChan:
|
case <-t.stopChan:
|
||||||
@@ -56,8 +64,15 @@ func (t *LogTailer) Start() {
|
|||||||
|
|
||||||
scanner := bufio.NewScanner(file)
|
scanner := bufio.NewScanner(file)
|
||||||
for scanner.Scan() {
|
for scanner.Scan() {
|
||||||
t.handleLine(scanner.Text())
|
line := scanner.Text()
|
||||||
|
t.handleLine(line)
|
||||||
lastSize, _ = file.Seek(0, 1) // Get current position
|
lastSize, _ = file.Seek(0, 1) // Get current position
|
||||||
|
|
||||||
|
// Save position periodically
|
||||||
|
t.tracker.SavePosition(&LogPosition{
|
||||||
|
LastPosition: lastSize,
|
||||||
|
LastRead: line,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
file.Close()
|
file.Close()
|
||||||
|
|||||||
54
local/utl/tracking/position_tracker.go
Normal file
54
local/utl/tracking/position_tracker.go
Normal file
@@ -0,0 +1,54 @@
|
|||||||
|
package tracking
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
)
|
||||||
|
|
||||||
|
type LogPosition struct {
|
||||||
|
LastPosition int64 `json:"last_position"`
|
||||||
|
LastRead string `json:"last_read"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type PositionTracker struct {
|
||||||
|
positionFile string
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewPositionTracker(logPath string) *PositionTracker {
|
||||||
|
// Create position file in same directory as log file
|
||||||
|
dir := filepath.Dir(logPath)
|
||||||
|
base := filepath.Base(logPath)
|
||||||
|
positionFile := filepath.Join(dir, "."+base+".position")
|
||||||
|
|
||||||
|
return &PositionTracker{
|
||||||
|
positionFile: positionFile,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *PositionTracker) LoadPosition() (*LogPosition, error) {
|
||||||
|
data, err := os.ReadFile(t.positionFile)
|
||||||
|
if err != nil {
|
||||||
|
if os.IsNotExist(err) {
|
||||||
|
// Return empty position if file doesn't exist
|
||||||
|
return &LogPosition{}, nil
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var pos LogPosition
|
||||||
|
if err := json.Unmarshal(data, &pos); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &pos, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *PositionTracker) SavePosition(pos *LogPosition) error {
|
||||||
|
data, err := json.Marshal(pos)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return os.WriteFile(t.positionFile, data, 0644)
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user