ilpcw/drone-black-box/main.go
2025-12-06 11:40:29 +00:00

222 lines
6 KiB
Go

package main
import (
"context"
"database/sql"
"encoding/json"
"log/slog"
"net/http"
"os"
"os/signal"
"syscall"
"time"
_ "modernc.org/sqlite"
)
// DroneEvent defines the DroneEvent struct as JSON
type DroneEvent struct {
DroneID string `json:"droneId"`
Latitude float64 `json:"latitude"`
Longitude float64 `json:"longitude"`
Timestamp string `json:"timestamp"`
}
type Server struct {
db *sql.DB
}
var allowedOrigins = map[string]struct{}{
"http://localhost:4173": {},
"http://127.0.0.1:4173": {},
"http://localhost:5173": {},
"http://127.0.0.1:5173": {},
}
// corsMiddleware adds the headers needed for cross-origin requests from the frontend.
func corsMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
origin := r.Header.Get("Origin")
if _, ok := allowedOrigins[origin]; ok {
w.Header().Set("Access-Control-Allow-Origin", origin)
w.Header().Set("Vary", "Origin")
}
w.Header().Set("Access-Control-Allow-Methods", "GET,POST,OPTIONS")
w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
w.Header().Set("Access-Control-Max-Age", "86400")
// Handle preflight without hitting the underlying handlers.
if r.Method == http.MethodOptions {
w.WriteHeader(http.StatusNoContent)
return
}
next.ServeHTTP(w, r)
})
}
// Ingest handler
func (s *Server) ingestHandler(w http.ResponseWriter, r *http.Request) {
var event DroneEvent
if err := json.NewDecoder(r.Body).Decode(&event); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
slog.Error("Failed to decode request body", "error", err)
return
}
slog.Info("Ingesting event", "drone_id", event.DroneID, "timestamp", event.Timestamp)
_, err := s.db.Exec("INSERT INTO drone_events (drone_id, latitude, longitude, timestamp) VALUES (?, ?, ?, ?)",
event.DroneID, event.Latitude, event.Longitude, event.Timestamp)
if err != nil {
slog.Error("Failed to insert event", "error", err)
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusCreated)
}
// Snapshot handler
func (s *Server) snapshotHandler(w http.ResponseWriter, r *http.Request) {
timeParam := r.URL.Query().Get("time")
if timeParam == "" {
http.Error(w, "Missing 'time' query parameter", http.StatusBadRequest)
return
}
query := `
SELECT drone_id, latitude, longitude, timestamp
FROM drone_events t1
WHERE timestamp = (
SELECT MAX(timestamp)
FROM drone_events t2
WHERE t2.drone_id = t1.drone_id AND t2.timestamp <= ?
)
`
rows, err := s.db.Query(query, timeParam)
if err != nil {
slog.Error("Failed to query snapshot", "error", err)
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
defer rows.Close()
results := []DroneEvent{}
for rows.Next() {
var event DroneEvent
if err := rows.Scan(&event.DroneID, &event.Latitude, &event.Longitude, &event.Timestamp); err != nil {
slog.Error("Failed to scan row", "error", err)
continue
}
results = append(results, event)
}
slog.Info("Snapshot retrieved", "time", timeParam, "count", len(results))
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(results); err != nil {
slog.Error("Failed to encode response", "error", err)
}
}
// Health check handler
func (s *Server) healthHandler(w http.ResponseWriter, r *http.Request) {
if err := s.db.Ping(); err != nil {
slog.Error("Health check failed", "error", err)
http.Error(w, "Database unavailable", http.StatusServiceUnavailable)
return
}
w.WriteHeader(http.StatusOK)
w.Write([]byte("OK"))
}
func main() {
// Setup structured logging
logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
slog.SetDefault(logger)
port := os.Getenv("BLACKBOX_PORT")
if port == "" {
port = "3000"
}
// Open database
db, err := sql.Open("sqlite", "./drone_black_box.db")
if err != nil {
slog.Error("Failed to open database", "error", err)
os.Exit(1)
}
defer db.Close()
// Performance optimizations for SQLite
// WAL mode allows simultaneous readers and one writer
if _, err := db.Exec("PRAGMA journal_mode=WAL;"); err != nil {
slog.Warn("Failed to enable WAL mode", "error", err)
}
// Busy timeout prevents "database is locked" errors during high concurrency
if _, err := db.Exec("PRAGMA busy_timeout=5000;"); err != nil {
slog.Warn("Failed to set busy timeout", "error", err)
}
db.SetMaxOpenConns(1)
// Create table
createTableSQL := `CREATE TABLE IF NOT EXISTS drone_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
drone_id TEXT NOT NULL,
latitude REAL NOT NULL,
longitude REAL NOT NULL,
timestamp TEXT NOT NULL
);`
if _, err := db.Exec(createTableSQL); err != nil {
slog.Error("Failed to create table", "error", err)
os.Exit(1)
}
// Create index for performance
// Indexing drone_id and timestamp significantly speeds up the snapshot query
createIndexSQL := `CREATE INDEX IF NOT EXISTS idx_drone_timestamp ON drone_events(drone_id, timestamp);`
if _, err := db.Exec(createIndexSQL); err != nil {
slog.Error("Failed to create index", "error", err)
os.Exit(1)
}
server := &Server{db: db}
mux := http.NewServeMux()
mux.HandleFunc("POST /ingest", server.ingestHandler)
mux.HandleFunc("GET /snapshot", server.snapshotHandler)
mux.HandleFunc("GET /health", server.healthHandler)
httpServer := &http.Server{
Addr: ":" + port,
Handler: corsMiddleware(mux),
}
// Graceful shutdown
stop := make(chan os.Signal, 1)
signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
go func() {
slog.Info("Black Box Service is running", "port", port)
if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
slog.Error("HTTP server error", "error", err)
os.Exit(1)
}
}()
<-stop
slog.Info("Shutting down server...")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := httpServer.Shutdown(ctx); err != nil {
slog.Error("Server forced to shutdown", "error", err)
}
slog.Info("Server exited properly")
}