From f218a6b7cd91fdc6333f0942eead358ca6d03a26 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=20Jurmanovi=C4=87?= Date: Tue, 3 Jun 2025 20:57:09 +0200 Subject: [PATCH] add log position tracking --- local/repository/state_history.go | 18 +++++++++ local/service/server.go | 10 +++-- local/utl/tracking/log_tailer.go | 19 ++++++++- local/utl/tracking/position_tracker.go | 54 ++++++++++++++++++++++++++ 4 files changed, 95 insertions(+), 6 deletions(-) create mode 100644 local/utl/tracking/position_tracker.go diff --git a/local/repository/state_history.go b/local/repository/state_history.go index cbd5e09..13e7095 100644 --- a/local/repository/state_history.go +++ b/local/repository/state_history.go @@ -26,3 +26,21 @@ func (r *StateHistoryRepository) GetAll(ctx context.Context, filter *model.State func (r *StateHistoryRepository) Insert(ctx context.Context, model *model.StateHistory) error { return r.BaseRepository.Insert(ctx, model) } + +// GetLastSessionID gets the last session ID for a server +func (r *StateHistoryRepository) GetLastSessionID(ctx context.Context, serverID uint) (uint, error) { + var lastSession model.StateHistory + result := r.BaseRepository.db.WithContext(ctx). + Where("server_id = ?", serverID). + Order("session_id DESC"). + First(&lastSession) + + if result.Error != nil { + if result.Error == gorm.ErrRecordNotFound { + return 0, nil // Return 0 if no sessions found + } + return 0, result.Error + } + + return lastSession.SessionID, nil +} diff --git a/local/service/server.go b/local/service/server.go index 0518426..bcd19e1 100644 --- a/local/service/server.go +++ b/local/service/server.go @@ -118,10 +118,12 @@ func (s *ServerService) shouldInsertStateHistory(serverID uint) bool { } func (s *ServerService) getNextSessionID(serverID uint) uint { - currentID, _ := s.sessionIDs.LoadOrStore(serverID, uint(0)) - nextID := currentID.(uint) + 1 - s.sessionIDs.Store(serverID, nextID) - return nextID + lastID, err := s.stateHistoryRepo.GetLastSessionID(context.Background(), serverID) + if err != nil { + logging.Error("Failed to get last session ID for server %d: %v", serverID, err) + return 1 // Return 1 as fallback + } + return lastID + 1 } func (s *ServerService) insertStateHistory(serverID uint, state *model.ServerState) { diff --git a/local/utl/tracking/log_tailer.go b/local/utl/tracking/log_tailer.go index 422119c..fad0304 100644 --- a/local/utl/tracking/log_tailer.go +++ b/local/utl/tracking/log_tailer.go @@ -11,6 +11,7 @@ type LogTailer struct { handleLine func(string) stopChan chan struct{} isRunning bool + tracker *PositionTracker } func NewLogTailer(filePath string, handleLine func(string)) *LogTailer { @@ -18,6 +19,7 @@ func NewLogTailer(filePath string, handleLine func(string)) *LogTailer { filePath: filePath, handleLine: handleLine, stopChan: make(chan struct{}), + tracker: NewPositionTracker(filePath), } } @@ -28,7 +30,13 @@ func (t *LogTailer) Start() { t.isRunning = true go func() { - var lastSize int64 = 0 + // Load last position from tracker + pos, err := t.tracker.LoadPosition() + if err != nil { + pos = &LogPosition{} // Start from beginning if error + } + lastSize := pos.LastPosition + for { select { case <-t.stopChan: @@ -56,8 +64,15 @@ func (t *LogTailer) Start() { scanner := bufio.NewScanner(file) for scanner.Scan() { - t.handleLine(scanner.Text()) + line := scanner.Text() + t.handleLine(line) lastSize, _ = file.Seek(0, 1) // Get current position + + // Save position periodically + t.tracker.SavePosition(&LogPosition{ + LastPosition: lastSize, + LastRead: line, + }) } file.Close() diff --git a/local/utl/tracking/position_tracker.go b/local/utl/tracking/position_tracker.go new file mode 100644 index 0000000..3dc1075 --- /dev/null +++ b/local/utl/tracking/position_tracker.go @@ -0,0 +1,54 @@ +package tracking + +import ( + "encoding/json" + "os" + "path/filepath" +) + +type LogPosition struct { + LastPosition int64 `json:"last_position"` + LastRead string `json:"last_read"` +} + +type PositionTracker struct { + positionFile string +} + +func NewPositionTracker(logPath string) *PositionTracker { + // Create position file in same directory as log file + dir := filepath.Dir(logPath) + base := filepath.Base(logPath) + positionFile := filepath.Join(dir, "."+base+".position") + + return &PositionTracker{ + positionFile: positionFile, + } +} + +func (t *PositionTracker) LoadPosition() (*LogPosition, error) { + data, err := os.ReadFile(t.positionFile) + if err != nil { + if os.IsNotExist(err) { + // Return empty position if file doesn't exist + return &LogPosition{}, nil + } + return nil, err + } + + var pos LogPosition + if err := json.Unmarshal(data, &pos); err != nil { + return nil, err + } + + return &pos, nil +} + +func (t *PositionTracker) SavePosition(pos *LogPosition) error { + data, err := json.Marshal(pos) + if err != nil { + return err + } + + return os.WriteFile(t.positionFile, data, 0644) +} \ No newline at end of file