From fde16fa283598df743f0ef53eb3ca596d69e0390 Mon Sep 17 00:00:00 2001 From: js0ny Date: Sun, 30 Nov 2025 01:30:29 +0000 Subject: [PATCH] feat: use sqlite --- .gitignore | 3 +- drone-black-box/go.mod | 2 + drone-black-box/go.sum | 2 + drone-black-box/main.go | 159 ++++++++++++++++++++++++++++++++-------- 4 files changed, 133 insertions(+), 33 deletions(-) create mode 100644 drone-black-box/go.sum diff --git a/.gitignore b/.gitignore index 3dc4ef4..614e06c 100644 --- a/.gitignore +++ b/.gitignore @@ -43,4 +43,5 @@ localjson ilp-rest-service/ilp-cw-api/results.json target -drone-black-box/drone-black-box \ No newline at end of file +drone-black-box/drone-black-box +drone-black-box/drone_black_box.db* diff --git a/drone-black-box/go.mod b/drone-black-box/go.mod index 95eb60e..92a5046 100644 --- a/drone-black-box/go.mod +++ b/drone-black-box/go.mod @@ -1,3 +1,5 @@ module drone-black-box go 1.25.3 + +require github.com/mattn/go-sqlite3 v1.14.32 diff --git a/drone-black-box/go.sum b/drone-black-box/go.sum new file mode 100644 index 0000000..66f7516 --- /dev/null +++ b/drone-black-box/go.sum @@ -0,0 +1,2 @@ +github.com/mattn/go-sqlite3 v1.14.32 h1:JD12Ag3oLy1zQA+BNn74xRgaBbdhbNIDYvQUEuuErjs= +github.com/mattn/go-sqlite3 v1.14.32/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= diff --git a/drone-black-box/main.go b/drone-black-box/main.go index be41a48..40dd7c5 100644 --- a/drone-black-box/main.go +++ b/drone-black-box/main.go @@ -1,12 +1,17 @@ package main import ( + "context" + "database/sql" "encoding/json" - "fmt" - "log" + "log/slog" "net/http" "os" - "sync" + "os/signal" + "syscall" + "time" + + _ "github.com/mattn/go-sqlite3" ) // Define the DroneEvent struct as JSON @@ -17,12 +22,8 @@ type DroneEvent struct { Timestamp string `json:"timestamp"` } -// Shared state type alias type Server struct { - // This is a shared state between handlers - // It is protected by a mutex to prevent concurrent access - mu sync.RWMutex - history []DroneEvent + db *sql.DB } // Ingest handler @@ -30,14 +31,19 @@ func (s *Server) ingestHandler(w http.ResponseWriter, r *http.Request) { var event DroneEvent if err := json.NewDecoder(r.Body).Decode(&event); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) + slog.Error("Failed to decode request body", "error", err) return } - fmt.Printf("Data ingested: %+v\n", event) + slog.Info("Ingesting event", "drone_id", event.DroneID, "timestamp", event.Timestamp) - s.mu.Lock() - s.history = append(s.history, event) - s.mu.Unlock() + _, err := s.db.Exec("INSERT INTO drone_events (drone_id, latitude, longitude, timestamp) VALUES (?, ?, ?, ?)", + event.DroneID, event.Latitude, event.Longitude, event.Timestamp) + if err != nil { + slog.Error("Failed to insert event", "error", err) + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } w.WriteHeader(http.StatusCreated) } @@ -50,46 +56,135 @@ func (s *Server) snapshotHandler(w http.ResponseWriter, r *http.Request) { return } - s.mu.RLock() - defer s.mu.RUnlock() + // Optimized query using window function (requires SQLite 3.25+) or standard group by max + // Using correlated subquery which is standard and works well with the index + query := ` + SELECT drone_id, latitude, longitude, timestamp + FROM drone_events t1 + WHERE timestamp = ( + SELECT MAX(timestamp) + FROM drone_events t2 + WHERE t2.drone_id = t1.drone_id AND t2.timestamp <= ? + ) + ` - latestStates := make(map[string]DroneEvent) - - for _, event := range s.history { - if event.Timestamp <= timeParam { - latestStates[event.DroneID] = event - } + rows, err := s.db.Query(query, timeParam) + if err != nil { + slog.Error("Failed to query snapshot", "error", err) + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return } + defer rows.Close() - result := make([]DroneEvent, 0, len(latestStates)) - for _, event := range latestStates { - result = append(result, event) + results := []DroneEvent{} + for rows.Next() { + var event DroneEvent + if err := rows.Scan(&event.DroneID, &event.Latitude, &event.Longitude, &event.Timestamp); err != nil { + slog.Error("Failed to scan row", "error", err) + continue + } + results = append(results, event) } w.Header().Set("Content-Type", "application/json") - if err := json.NewEncoder(w).Encode(result); err != nil { - log.Printf("Error encoding response: %v", err) + if err := json.NewEncoder(w).Encode(results); err != nil { + slog.Error("Failed to encode response", "error", err) } } +// Health check handler +func (s *Server) healthHandler(w http.ResponseWriter, r *http.Request) { + if err := s.db.Ping(); err != nil { + slog.Error("Health check failed", "error", err) + http.Error(w, "Database unavailable", http.StatusServiceUnavailable) + return + } + w.WriteHeader(http.StatusOK) + w.Write([]byte("OK")) +} + func main() { - // By default, listen on port 3000 + // Setup structured logging + logger := slog.New(slog.NewJSONHandler(os.Stdout, nil)) + slog.SetDefault(logger) + port := os.Getenv("BLACKBOX_PORT") if port == "" { port = "3000" } - server := &Server{ - history: make([]DroneEvent, 0), + // Open database + db, err := sql.Open("sqlite3", "./drone_black_box.db") + if err != nil { + slog.Error("Failed to open database", "error", err) + os.Exit(1) } + defer db.Close() + + // Performance optimizations for SQLite + // WAL mode allows simultaneous readers and one writer + if _, err := db.Exec("PRAGMA journal_mode=WAL;"); err != nil { + slog.Warn("Failed to enable WAL mode", "error", err) + } + // Busy timeout prevents "database is locked" errors during high concurrency + if _, err := db.Exec("PRAGMA busy_timeout=5000;"); err != nil { + slog.Warn("Failed to set busy timeout", "error", err) + } + + // Create table + createTableSQL := `CREATE TABLE IF NOT EXISTS drone_events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + drone_id TEXT NOT NULL, + latitude REAL NOT NULL, + longitude REAL NOT NULL, + timestamp TEXT NOT NULL + );` + if _, err := db.Exec(createTableSQL); err != nil { + slog.Error("Failed to create table", "error", err) + os.Exit(1) + } + + // Create index for performance + // Indexing drone_id and timestamp significantly speeds up the snapshot query + createIndexSQL := `CREATE INDEX IF NOT EXISTS idx_drone_timestamp ON drone_events(drone_id, timestamp);` + if _, err := db.Exec(createIndexSQL); err != nil { + slog.Error("Failed to create index", "error", err) + os.Exit(1) + } + + server := &Server{db: db} mux := http.NewServeMux() mux.HandleFunc("POST /ingest", server.ingestHandler) mux.HandleFunc("GET /snapshot", server.snapshotHandler) + mux.HandleFunc("GET /health", server.healthHandler) - fmt.Printf("Black Box Service is running on port %s...\n", port) - if err := http.ListenAndServe(":"+port, mux); err != nil { - log.Fatal(err) - os.Exit(1) + httpServer := &http.Server{ + Addr: ":" + port, + Handler: mux, } + + // Graceful shutdown + stop := make(chan os.Signal, 1) + signal.Notify(stop, os.Interrupt, syscall.SIGTERM) + + go func() { + slog.Info("Black Box Service is running", "port", port) + if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { + slog.Error("HTTP server error", "error", err) + os.Exit(1) + } + }() + + <-stop + slog.Info("Shutting down server...") + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if err := httpServer.Shutdown(ctx); err != nil { + slog.Error("Server forced to shutdown", "error", err) + } + + slog.Info("Server exited properly") }