// Package main implements the Black Box service: an HTTP API that stores drone
// position events in a local SQLite database and serves point-in-time
// snapshots of the most recent event per drone.
package main

import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"log/slog"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"

	_ "github.com/mattn/go-sqlite3"
)

const (
	// maxIngestBodyBytes caps the size of an /ingest request body so a single
	// client cannot make the server buffer an arbitrarily large payload.
	maxIngestBodyBytes = 1 << 20

	// HTTP server timeouts. Without them a slow or stalled client can hold a
	// connection (and its goroutine) open indefinitely.
	readHeaderTimeout = 5 * time.Second
	readTimeout       = 15 * time.Second
	writeTimeout      = 30 * time.Second
	idleTimeout       = 60 * time.Second

	// shutdownTimeout bounds how long in-flight requests may run after a
	// termination signal before the server gives up waiting for them.
	shutdownTimeout = 5 * time.Second
)

// DroneEvent is a single position report for one drone. It is the JSON payload
// accepted by /ingest and the element type of the array returned by /snapshot.
//
// Timestamp is stored and compared as text, so snapshot results are only
// meaningful when all clients use one lexicographically sortable format.
type DroneEvent struct {
	DroneID   string  `json:"droneId"`
	Latitude  float64 `json:"latitude"`
	Longitude float64 `json:"longitude"`
	Timestamp string  `json:"timestamp"`
}

// Server holds the dependencies shared by the HTTP handlers.
type Server struct {
	db *sql.DB
}

// ingestHandler decodes a DroneEvent from the JSON request body and inserts it
// into the drone_events table.
//
// Responses:
//   - 201 Created on success (empty body).
//   - 400 Bad Request when the body is not valid JSON for a DroneEvent.
//   - 413 Request Entity Too Large when the body exceeds maxIngestBodyBytes.
//   - 500 Internal Server Error when the insert fails.
func (s *Server) ingestHandler(w http.ResponseWriter, r *http.Request) {
	// The body comes from an untrusted client; bound how much of it we read.
	r.Body = http.MaxBytesReader(w, r.Body, maxIngestBodyBytes)

	var event DroneEvent
	if err := json.NewDecoder(r.Body).Decode(&event); err != nil {
		slog.Error("Failed to decode request body", "error", err)

		var tooLarge *http.MaxBytesError
		if errors.As(err, &tooLarge) {
			http.Error(w, "Request body too large", http.StatusRequestEntityTooLarge)
			return
		}
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	slog.Info("Ingesting event", "drone_id", event.DroneID, "timestamp", event.Timestamp)

	// Tie the insert to the request so it is abandoned if the client goes away.
	_, err := s.db.ExecContext(
		r.Context(),
		"INSERT INTO drone_events (drone_id, latitude, longitude, timestamp) VALUES (?, ?, ?, ?)",
		event.DroneID, event.Latitude, event.Longitude, event.Timestamp,
	)
	if err != nil {
		slog.Error("Failed to insert event", "error", err)
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}

	w.WriteHeader(http.StatusCreated)
}

// snapshotHandler returns, as a JSON array, the latest stored event of every
// drone whose timestamp is less than or equal to the required "time" query
// parameter. Drones with no event at or before that time are omitted; an empty
// result is encoded as [] rather than null.
//
// If a drone has several rows sharing its latest timestamp, all of them are
// returned, because rows are matched on timestamp equality.
//
// Responses:
//   - 200 OK with the JSON array.
//   - 400 Bad Request when "time" is missing or empty.
//   - 500 Internal Server Error when the query or row iteration fails.
func (s *Server) snapshotHandler(w http.ResponseWriter, r *http.Request) {
	timeParam := r.URL.Query().Get("time")
	if timeParam == "" {
		http.Error(w, "Missing 'time' query parameter", http.StatusBadRequest)
		return
	}

	query := `
		SELECT drone_id, latitude, longitude, timestamp
		FROM drone_events t1
		WHERE timestamp = (
			SELECT MAX(timestamp)
			FROM drone_events t2
			WHERE t2.drone_id = t1.drone_id AND t2.timestamp <= ?
		)
	`

	rows, err := s.db.QueryContext(r.Context(), query, timeParam)
	if err != nil {
		slog.Error("Failed to query snapshot", "error", err)
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}
	defer rows.Close()

	// Non-nil empty slice so that "no drones" encodes as [] instead of null.
	results := []DroneEvent{}
	for rows.Next() {
		var event DroneEvent
		if err := rows.Scan(&event.DroneID, &event.Latitude, &event.Longitude, &event.Timestamp); err != nil {
			// Best effort: a single unreadable row is skipped, not fatal.
			slog.Error("Failed to scan row", "error", err)
			continue
		}
		results = append(results, event)
	}

	// rows.Next returns false both at end-of-data and on failure. Without this
	// check a mid-iteration error would be served as a truncated 200 response.
	if err := rows.Err(); err != nil {
		slog.Error("Failed to iterate snapshot rows", "error", err)
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(results); err != nil {
		slog.Error("Failed to encode response", "error", err)
	}
}

// healthHandler reports whether the database is reachable: 200 with body "OK"
// when a ping succeeds, 503 Service Unavailable otherwise.
func (s *Server) healthHandler(w http.ResponseWriter, r *http.Request) {
	if err := s.db.PingContext(r.Context()); err != nil {
		slog.Error("Health check failed", "error", err)
		http.Error(w, "Database unavailable", http.StatusServiceUnavailable)
		return
	}

	w.WriteHeader(http.StatusOK)
	if _, err := w.Write([]byte("OK")); err != nil {
		slog.Error("Failed to write health response", "error", err)
	}
}

// main configures JSON structured logging and runs the service, exiting with
// a non-zero status if start-up, serving, or shutdown fails.
func main() {
	logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
	slog.SetDefault(logger)

	// All work happens in runService so that its deferred cleanup (closing the
	// database) runs before the process exits; os.Exit skips deferred calls.
	if code := runService(); code != 0 {
		os.Exit(code)
	}
}

// runService opens the database, ensures the schema exists, serves HTTP on the
// port from BLACKBOX_PORT (default "3000") until SIGINT/SIGTERM or a fatal
// server error, and then shuts down gracefully. It returns the process exit
// code: 0 after a clean shutdown, 1 on any failure.
func runService() int {
	port := os.Getenv("BLACKBOX_PORT")
	if port == "" {
		port = "3000"
	}

	db, err := sql.Open("sqlite3", "./drone_black_box.db")
	if err != nil {
		slog.Error("Failed to open database", "error", err)
		return 1
	}
	defer func() {
		if err := db.Close(); err != nil {
			slog.Error("Failed to close database", "error", err)
		}
	}()

	// WAL mode allows simultaneous readers and one writer. Failure is only a
	// warning: the service still works in the default journal mode.
	if _, err := db.Exec("PRAGMA journal_mode=WAL;"); err != nil {
		slog.Warn("Failed to enable WAL mode", "error", err)
	}

	// Busy timeout prevents "database is locked" errors during high concurrency.
	if _, err := db.Exec("PRAGMA busy_timeout=5000;"); err != nil {
		slog.Warn("Failed to set busy timeout", "error", err)
	}

	createTableSQL := `CREATE TABLE IF NOT EXISTS drone_events (
		id INTEGER PRIMARY KEY AUTOINCREMENT,
		drone_id TEXT NOT NULL,
		latitude REAL NOT NULL,
		longitude REAL NOT NULL,
		timestamp TEXT NOT NULL
	);`
	if _, err := db.Exec(createTableSQL); err != nil {
		slog.Error("Failed to create table", "error", err)
		return 1
	}

	// The snapshot query filters by drone_id and timestamp in its correlated
	// subquery; this composite index covers that lookup.
	createIndexSQL := `CREATE INDEX IF NOT EXISTS idx_drone_timestamp ON drone_events(drone_id, timestamp);`
	if _, err := db.Exec(createIndexSQL); err != nil {
		slog.Error("Failed to create index", "error", err)
		return 1
	}

	server := &Server{db: db}

	mux := http.NewServeMux()
	mux.HandleFunc("POST /ingest", server.ingestHandler)
	mux.HandleFunc("GET /snapshot", server.snapshotHandler)
	mux.HandleFunc("GET /health", server.healthHandler)

	httpServer := &http.Server{
		Addr:              ":" + port,
		Handler:           mux,
		ReadHeaderTimeout: readHeaderTimeout,
		ReadTimeout:       readTimeout,
		WriteTimeout:      writeTimeout,
		IdleTimeout:       idleTimeout,
	}

	// Register for signals before serving so an early signal is not missed.
	stop := make(chan os.Signal, 1)
	signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
	defer signal.Stop(stop)

	// Buffered so the serving goroutine can report a failure and exit even if
	// this function is already past the select below.
	serverErr := make(chan error, 1)
	go func() {
		slog.Info("Black Box Service is running", "port", port)
		if err := httpServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
			serverErr <- err
		}
	}()

	select {
	case err := <-serverErr:
		slog.Error("HTTP server error", "error", err)
		return 1
	case <-stop:
	}

	slog.Info("Shutting down server...")

	ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
	defer cancel()

	if err := httpServer.Shutdown(ctx); err != nil {
		slog.Error("Server forced to shutdown", "error", err)
		return 1
	}

	slog.Info("Server exited properly")
	return 0
}