Hadlock

worst rust database ever, with native .csv support

Sep 12th, 2025
43
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Rust 81.19 KB | None | 0 0
  1. # Canto — Minimal CSV/Binary Table Server
  2.  
  3. **Canto** is a lightweight, embedded database engine that can run:
  4. - As a **standalone RESTful server** on port `5432` (default)
  5. - In **client mode** to send SQL queries
  6. - As a **Rust library** inside another program
  7.  
  8. It supports **two storage backends**:
  9. 1. **Binary file** (`data/canto.db`) — default, fast, append-only
  10. 2. **CSV + schema TOML**for human-readable table storage
  11.  
  12. ## 1. Quick Start
  13.  
  14. ### a. Run the server (default)
  15. ```bash
  16. cargo run --release
  17. ````
  18.  
  19. This starts Canto in **server mode** on `0.0.0.0:5432`.
  20.  
  21. ### b. Run the client
  22.  
  23. ```bash
  24. cargo run --release -- -c -f "queries.sql" -u myuser -p mypass
  25. ```
  26.  
  27. * `-c` = client mode
  28. * `-f` = SQL file to execute
  29. * `-u` / `-p` = username / password (optional; no auth if omitted)
  30.  
  31. ### c. Example with `curl`
  32.  
  33. ```bash
  34. curl -X POST http://localhost:5432/query \
  35.      -H "Content-Type: application/sql" \
  36.      --data "SELECT * FROM movies ORDER BY year DESC LIMIT 3;"
  37. ```
  38.  
  39. ## 2. Valid SQL Statements
  40.  
  41. Canto implements a **subset of SQL** with a custom parser (no external libs).
  42.  
  43. Supported:
  44.  
  45. * `CREATE TABLE table_name (col1 TYPE, col2 TYPE, ...)`
  46. * `DROP TABLE table_name`
  47. * `INSERT INTO table_name VALUES (v1, v2, ...)`
  48. * `SELECT col1, col2 FROM table_name [WHERE col = value] [ORDER BY col [ASC|DESC]] [LIMIT n]`
  49. * `UPDATE table_name SET col = value [WHERE ...]`
  50. * `DELETE FROM table_name [WHERE ...]`
  51. * `CREATE USER username PASSWORD 'pass'`
  52. * `DROP USER username`
  53.  
  54. Column types:
  55.  
  56. * `STRING`
  57. * `INT`
  58. * `FLOAT`
  59. * `BOOL`
  60. * `DATE`
  61.  
  62. ## 3. Storage Backends
  63.  
  64. ### Binary Mode (default)
  65.  
  66. * Single file: `data/canto.db`
  67. * Append-only blocks for crash safety
  68. * Periodic checkpoints store schema and indexes
  69. * Fast startup: load latest checkpoint, replay recent blocks
  70. * No need to keep CSV files
  71.  
  72. ### CSV Mode
  73.  
  74. * Each table: `data/{table}.csv`
  75. * Schema file: `data/{table}.schema.toml`
  76. * Slower than binary, but human-editable
  77.  
  78. ## 4. Example SQL Queries
  79.  
  80. Top 3 movies with director & studio sorted by year:
  81.  
  82. ```sql
  83. SELECT title, director, studio, year
  84. FROM movies
  85. ORDER BY year DESC
  86. LIMIT 3;
  87. ```
  88.  
  89. Join example (albums + bands + members):
  90.  
  91. ```sql
  92. SELECT albums.name, bands.name, members.name, albums.year
  93. FROM albums
  94. JOIN bands ON albums.band_id = bands.id
  95. JOIN members ON bands.id = members.band_id
  96. ORDER BY albums.year DESC;
  97. ```
  98.  
  99. ## 5. Using as a Library
  100.  
  101. Canto can be embedded in your own Rust applications.
  102.  
  103. ### Cargo.toml
  104.  
  105. ```toml
  106. [dependencies]
  107. canto = { path = "../canto" }
  108. ```
  109.  
  110. ### Example
  111.  
  112. ```rust
  113. use canto::engine::Engine;
  114. use canto::storage::StorageMode;
  115.  
  116. fn main() {
  117.     // Start engine in binary mode
  118.     let mut engine = Engine::new(StorageMode::Binary, "data/canto.db");
  119.     engine.execute("CREATE TABLE test (id INT, name STRING);");
  120.     engine.execute("INSERT INTO test VALUES (1, 'Alice');");
  121.     let rows = engine.execute("SELECT * FROM test;");
  122.     println!("{:?}", rows);
  123. }
  124. ```
  125.  
  126. You can initialize in `StorageMode::Csv` to use CSV+schema storage instead.
  127.  
  128. cargo.toml
  129.  
  130. [package]
  131. name = "canto"
  132. version = "0.2.0"
  133. edition = "2021"
  134.  
  135. [dependencies]
  136. reqwest = { version = "0.12", features = ["json", "blocking"] }
  137. regex = "1.10"
  138. anyhow = "1"
  139. thiserror = "1"
  140. csv = "1.3"
  141. serde = { version = "1", features = ["derive"] }
  142. serde_json = "1"
  143. toml = "0.8"
  144. bincode = "1"
  145. fs2 = "0.4"
  146. crc32fast = "1"
  147. lazy_static = "1.4"
  148. clap = { version = "4", features = ["derive"] }
  149. tokio = { version = "1", features = ["full"] }
  150. axum = "0.7"
  151. hyper = { version = "1", features = ["full"] }
  152. http = "1"
  153. base64 = "0.22"
  154.  
  155. main.rs
  156. //! Canto: tiny CSV-as-table DB with a REST server, minimal SQL, and a CLI client.
  157. //! - Default: runs REST server on 0.0.0.0:5432
  158. //! - Client mode: `canto -c -f movie_database.sql [-u user -p pass]`
  159. //! - Very small SQL: CREATE TABLE, INSERT INTO ... VALUES (...), SELECT ... JOIN ... ORDER BY ... LIMIT n
  160. //! - Users: POST /v1/users {username,password}, DELETE /v1/users/{username}
  161. //!   - Start with `--require-auth` to enforce Basic Auth (default: open)
  162.  
  163. use anyhow::Result;
  164. use clap::{ArgAction, Parser};
  165. use std::{fs, path::PathBuf};
  166.  
  167. #[derive(Parser, Debug)]
  168. #[command(name="canto", version, about="Tiny CSV-table DB with REST + minimal SQL")]
  169. struct Args {
  170.     /// Data directory (tables, indexes, schema files)
  171.     #[arg(long, default_value = "./data")]
  172.     data_dir: PathBuf,
  173.  
  174.     /// Server bind address
  175.     #[arg(long, default_value = "0.0.0.0:5432")]
  176.     bind: String,
  177.  
  178.     /// Require HTTP Basic Auth on all endpoints
  179.     #[arg(long, action=ArgAction::SetTrue, default_value_t=false)]
  180.     require_auth: bool,
  181.  
  182.     /// Client mode (send SQL file to server via REST)
  183.     #[arg(short='c', long, action=ArgAction::SetTrue, default_value_t=false)]
  184.     client: bool,
  185.  
  186.     /// SQL file to execute (client mode)
  187.     #[arg(short='f', long)]
  188.     file: Option<PathBuf>,
  189.  
  190.     /// Server URL for client mode
  191.     #[arg(long, default_value = "http://127.0.0.1:5432")]
  192.     url: String,
  193.  
  194.     /// Username for Basic Auth (optional; required if server enforces it)
  195.     #[arg(short='u', long)]
  196.     username: Option<String>,
  197.  
  198.     /// Password for Basic Auth (optional; required if server enforces it)
  199.     #[arg(short='p', long)]
  200.     password: Option<String>,
  201.  
  202.     /// Use legacy CSV storage format instead of binary canto.db
  203.     #[arg(short='l', long, action=ArgAction::SetTrue, default_value_t=false)]
  204.     legacy: bool,
  205. }
  206.  
  207. #[tokio::main]
  208. async fn main() -> Result<()> {
  209.     let args = Args::parse();
  210.     fs::create_dir_all(&args.data_dir).ok();
  211.  
  212.     if args.client {
  213.         return client::run_client(args).await;
  214.     } else {
  215.         return server::run_server(args).await;
  216.     }
  217. }
  218.  
  219. // ================================
  220. // Catalog / Schema
  221. // ================================
  222. mod schema {
  223.     use serde::{Deserialize, Serialize};
  224.     #[derive(Debug, Clone, Serialize, Deserialize)]
  225.     pub enum DataType { String, Int, Float, Bool, Date }
  226.  
  227.     #[derive(Debug, Clone, Serialize, Deserialize)]
  228.     pub struct Column {
  229.         pub name: String,
  230.         pub dtype: DataType,
  231.         pub nullable: bool,
  232.     }
  233.  
  234.     #[derive(Debug, Clone, Serialize, Deserialize)]
  235.     pub struct Schema {
  236.         pub name: String,
  237.         pub columns: Vec<Column>,
  238.         pub primary_key: Option<String>, // by column name
  239.     }
  240.  
  241.     impl Schema {
  242.         pub fn col_index(&self, name: &str) -> Option<usize> {
  243.             self.columns.iter().position(|c| c.name == name)
  244.         }
  245.         pub fn headers(&self) -> Vec<String> {
  246.             self.columns.iter().map(|c| c.name.clone()).collect()
  247.         }
  248.         pub fn table_csv_path(&self, root: &std::path::Path) -> std::path::PathBuf {
  249.             root.join(format!("{}.csv", self.name))
  250.         }
  251.         pub fn schema_toml_path(&self, root: &std::path::Path) -> std::path::PathBuf {
  252.             root.join(format!("{}.schema.toml", self.name))
  253.         }
  254.         pub fn idx_path(&self, root: &std::path::Path) -> std::path::PathBuf {
  255.             root.join(format!("{}.idx", self.name))
  256.         }
  257.         pub fn lock_path(&self, root: &std::path::Path) -> std::path::PathBuf {
  258.             root.join(format!("{}.lock", self.name))
  259.         }
  260.     }
  261. }
  262.  
  263. // ================================
  264. // Storage (bytes on disk)
  265. // ================================
  266. mod storage {
  267.     use anyhow::Result;
  268.     use csv::{ReaderBuilder, WriterBuilder};
  269.     use fs2::FileExt;
  270.     use std::{
  271.         fs::{self, File, OpenOptions},
  272.         io::{BufReader, BufWriter, Read, Write, Seek, SeekFrom},
  273.         path::{Path, PathBuf},
  274.         collections::BTreeMap,
  275.     };
  276.  
  277.     pub struct CsvStorage {
  278.     pub csv_path: PathBuf,
  279.     pub schema_path: PathBuf,
  280.     pub idx_path: PathBuf,
  281.     pub lock_path: PathBuf,
  282.     }
  283.  
  284.     impl CsvStorage {
  285.         pub fn open(
  286.             csv_path: PathBuf,
  287.             schema_path: PathBuf,
  288.             idx_path: PathBuf,
  289.             lock_path: PathBuf,
  290.             schema: &super::schema::Schema,
  291.         ) -> Result<Self> {
  292.             let s = Self { csv_path, schema_path, idx_path, lock_path };
  293.             s.init_if_missing(schema)?;
  294.             Ok(s)
  295.         }
  296.  
  297.         fn init_if_missing(&self, schema: &super::schema::Schema) -> Result<()> {
  298.             if !self.csv_path.exists() {
  299.                 let file = File::create(&self.csv_path)?;
  300.                 let mut w = WriterBuilder::new().has_headers(true).from_writer(BufWriter::new(file));
  301.                 let headers = schema.headers();
  302.                 let hdr: Vec<&str> = headers.iter().map(|s| s.as_str()).collect();
  303.                 w.write_record(hdr)?;
  304.                 w.flush()?;
  305.             }
  306.             Ok(())
  307.         }
  308.  
  309.         fn lock_exclusive(&self) -> Result<File> {
  310.             let f = OpenOptions::new().create(true).read(true).write(true).open(&self.lock_path)?;
  311.             f.lock_exclusive()?; // unlock on drop
  312.             Ok(f)
  313.         }
  314.  
  315.         pub fn write_schema_toml(&self, toml_str: &str) -> Result<()> {
  316.             let tmp = self.schema_path.with_extension("toml.tmp");
  317.             fs::write(&tmp, toml_str)?;
  318.             fs::rename(tmp, &self.schema_path)?;
  319.             Ok(())
  320.         }
  321.  
  322.         pub fn append_record(&self, record: &[String]) -> Result<()> {
  323.             let _guard = self.lock_exclusive()?;
  324.             let file = OpenOptions::new().create(true).append(true).open(&self.csv_path)?;
  325.             let mut wtr = WriterBuilder::new().has_headers(false).from_writer(file);
  326.             wtr.write_record(record)?;
  327.             wtr.flush()?;
  328.             Ok(())
  329.         }
  330.  
  331.         pub fn scan<F: FnMut(&[String]) -> Result<()>>(&self, mut f: F) -> Result<()> {
  332.             let file = File::open(&self.csv_path)?;
  333.             let rdr = BufReader::new(file);
  334.             let mut rdr = ReaderBuilder::new().has_headers(true).from_reader(rdr);
  335.             for rec in rdr.records() {
  336.                 let rec = rec?;
  337.                 let row: Vec<String> = rec.iter().map(|s| s.to_string()).collect();
  338.                 f(&row)?;
  339.             }
  340.             Ok(())
  341.         }
  342.  
  343.         pub fn rewrite(&self, rows: &[Vec<String>]) -> Result<()> {
  344.             let _guard = self.lock_exclusive()?;
  345.             let tmp = self.csv_path.with_extension("csv.tmp");
  346.             {
  347.                 let file = File::create(&tmp)?;
  348.                 let mut w = WriterBuilder::new().has_headers(true).from_writer(BufWriter::new(file));
  349.                 // Use schema headers for rewrite; assume schema is available via self.schema_path
  350.                 let schema_str = std::fs::read_to_string(&self.schema_path)?;
  351.                 let schema: super::schema::Schema = toml::from_str(&schema_str)?;
  352.                 let headers = schema.headers();
  353.                 let hdr: Vec<&str> = headers.iter().map(|s| s.as_str()).collect();
  354.                 w.write_record(hdr)?;
  355.                 for r in rows {
  356.                     w.write_record(r)?;
  357.                 }
  358.                 w.flush()?;
  359.             }
  360.             fs::rename(tmp, &self.csv_path)?;
  361.             Ok(())
  362.         }
  363.  
  364.         pub fn write_index_bytes(&self, bytes: &[u8]) -> Result<()> {
  365.             let _guard = self.lock_exclusive()?;
  366.             let tmp = self.idx_path.with_extension("idx.tmp");
  367.             fs::write(&tmp, bytes)?;
  368.             fs::rename(tmp, &self.idx_path)?;
  369.             Ok(())
  370.         }
  371.  
  372.         pub fn read_index_bytes(&self) -> Result<Option<Vec<u8>>> {
  373.             if self.idx_path.exists() {
  374.                 let b = fs::read(&self.idx_path)?;
  375.                 Ok(Some(b))
  376.             } else {
  377.                 Ok(None)
  378.             }
  379.         }
  380.     }
  381.  
  382.     pub fn schema_to_toml<T: serde::Serialize>(value: &T) -> Result<String> {
  383.         Ok(toml::to_string_pretty(value)?)
  384.     }
  385.    
  386.     // ================================
  387.     // Storage Backend Enum
  388.     // ================================
  389.    
  390.     pub enum StorageBackend {
  391.         Csv(CsvStorage),
  392.         Binary(BinaryStorage, String), // Binary storage + table name
  393.     }
  394.    
  395.     impl StorageBackend {
  396.         pub fn append_record(&self, record: &[String]) -> Result<()> {
  397.             match self {
  398.                 StorageBackend::Csv(csv) => csv.append_record(record),
  399.                 StorageBackend::Binary(binary, table_name) => {
  400.                     // For binary storage, we need to write individual records as single-row batches
  401.                     // Use a simple sequence number (this could be improved with proper tracking)
  402.                     let seq_no = std::time::SystemTime::now()
  403.                         .duration_since(std::time::UNIX_EPOCH)
  404.                         .unwrap_or_default()
  405.                         .as_millis() as u64;
  406.                     let rows = vec![record.to_vec()];
  407.                     binary.append_table_data(table_name, seq_no, &rows)?;
  408.                     Ok(())
  409.                 }
  410.             }
  411.         }
  412.        
  413.         pub fn scan<F: FnMut(&[String]) -> Result<()>>(&self, mut f: F) -> Result<()> {
  414.             match self {
  415.                 StorageBackend::Csv(csv) => csv.scan(f),
  416.                 StorageBackend::Binary(binary, table_name) => {
  417.                     // Scan only the specified table's data
  418.                     binary.scan_table_data(|scan_table_name, _seq_no, rows| {
  419.                         if scan_table_name == table_name {
  420.                             for row in rows {
  421.                                 f(row)?;
  422.                             }
  423.                         }
  424.                         Ok(())
  425.                     })
  426.                 }
  427.             }
  428.         }
  429.        
  430.         pub fn rewrite(&self, rows: &[Vec<String>]) -> Result<()> {
  431.             match self {
  432.                 StorageBackend::Csv(csv) => csv.rewrite(rows),
  433.                 StorageBackend::Binary(_, _) => {
  434.                     // Binary storage rewrite will be different
  435.                     Ok(())
  436.                 }
  437.             }
  438.         }
  439.        
  440.         pub fn write_index_bytes(&self, bytes: &[u8]) -> Result<()> {
  441.             match self {
  442.                 StorageBackend::Csv(csv) => csv.write_index_bytes(bytes),
  443.                 StorageBackend::Binary(_, _) => {
  444.                     // Binary storage handles indexes differently
  445.                     Ok(())
  446.                 }
  447.             }
  448.         }
  449.        
  450.         pub fn read_index_bytes(&self) -> Result<Option<Vec<u8>>> {
  451.             match self {
  452.                 StorageBackend::Csv(csv) => csv.read_index_bytes(),
  453.                 StorageBackend::Binary(_, _) => {
  454.                     // Binary storage handles indexes differently
  455.                     Ok(None)
  456.                 }
  457.             }
  458.         }
  459.     }
  460.  
  461.     // ================================
  462.     // Binary Storage Implementation
  463.     // ================================
  464.    
  465.     // Constants for binary file format
  466.     const MAGIC: &[u8; 8] = b"CANTO\0\0\0";
  467.     const VERSION: u16 = 1;
  468.     const HEADER_SIZE: usize = 64;
  469.    
  470.     // Block tags
  471.     const TAG_CATALOG: u8 = 0x10;
  472.     const TAG_CHECKPOINT: u8 = 0x20;
  473.     const TAG_DATA: u8 = 0x30;
  474.     const TAG_INDEX: u8 = 0x40;
  475.    
  476.     // Header offsets
  477.     const HEADER_LAST_CP_OFFSET: u64 = 24; // offset in header where last_checkpoint_offset is stored
  478.    
  479.     #[derive(Debug)]
  480.     pub struct FileHeader {
  481.         pub version: u16,
  482.         pub flags: u16,
  483.         pub creation_unix_ms: u64,
  484.         pub last_checkpoint_offset: u64,
  485.         pub file_size_at_checkpoint: u64,
  486.     }
  487.    
  488.     impl FileHeader {
  489.         fn new() -> Self {
  490.             Self {
  491.                 version: VERSION,
  492.                 flags: 0,
  493.                 creation_unix_ms: std::time::SystemTime::now()
  494.                     .duration_since(std::time::UNIX_EPOCH)
  495.                     .unwrap_or_default()
  496.                     .as_millis() as u64,
  497.                 last_checkpoint_offset: 0,
  498.                 file_size_at_checkpoint: 0,
  499.             }
  500.         }
  501.        
  502.         fn write_to<W: Write>(&self, mut writer: W) -> Result<()> {
  503.             writer.write_all(MAGIC)?;
  504.             writer.write_all(&self.version.to_le_bytes())?;
  505.             writer.write_all(&self.flags.to_le_bytes())?;
  506.             writer.write_all(&self.creation_unix_ms.to_le_bytes())?;
  507.             writer.write_all(&self.last_checkpoint_offset.to_le_bytes())?;
  508.             writer.write_all(&self.file_size_at_checkpoint.to_le_bytes())?;
  509.            
  510.             // Pad to 64 bytes
  511.             let remaining = HEADER_SIZE - 8 - 2 - 2 - 8 - 8 - 8;
  512.             writer.write_all(&vec![0u8; remaining])?;
  513.             Ok(())
  514.         }
  515.        
  516.         fn read_from<R: Read>(mut reader: R) -> Result<Self> {
  517.             let mut magic = [0u8; 8];
  518.             reader.read_exact(&mut magic)?;
  519.             if &magic != MAGIC {
  520.                 return Err(anyhow::anyhow!("Invalid magic bytes"));
  521.             }
  522.            
  523.             let mut buf = [0u8; 8];
  524.             reader.read_exact(&mut buf[0..2])?;
  525.             let version = u16::from_le_bytes([buf[0], buf[1]]);
  526.            
  527.             reader.read_exact(&mut buf[0..2])?;
  528.             let flags = u16::from_le_bytes([buf[0], buf[1]]);
  529.            
  530.             reader.read_exact(&mut buf)?;
  531.             let creation_unix_ms = u64::from_le_bytes(buf);
  532.            
  533.             reader.read_exact(&mut buf)?;
  534.             let last_checkpoint_offset = u64::from_le_bytes(buf);
  535.            
  536.             reader.read_exact(&mut buf)?;
  537.             let file_size_at_checkpoint = u64::from_le_bytes(buf);
  538.            
  539.             // Skip remaining padding
  540.             let remaining = HEADER_SIZE - 8 - 2 - 2 - 8 - 8 - 8;
  541.             let mut padding = vec![0u8; remaining];
  542.             reader.read_exact(&mut padding)?;
  543.            
  544.             Ok(Self {
  545.                 version,
  546.                 flags,
  547.                 creation_unix_ms,
  548.                 last_checkpoint_offset,
  549.                 file_size_at_checkpoint,
  550.             })
  551.         }
  552.     }
  553.    
  554.     #[derive(Debug, Clone)]
  555.     pub struct TableCheckpoint {
  556.         pub table_name: String,
  557.         pub last_seq_no: u64,
  558.         pub last_data_offset: u64,
  559.         pub pk_index_offset: u64,
  560.     }
  561.    
  562.     pub struct BinaryStorage {
  563.         pub db_path: PathBuf,
  564.         pub lock_path: PathBuf,
  565.     }
  566.    
  567.     impl BinaryStorage {
  568.         pub fn open(db_path: PathBuf, lock_path: PathBuf) -> Result<Self> {
  569.             let storage = Self { db_path, lock_path };
  570.             storage.init_if_missing()?;
  571.             Ok(storage)
  572.         }
  573.        
  574.         fn init_if_missing(&self) -> Result<()> {
  575.             if !self.db_path.exists() {
  576.                 let mut file = File::create(&self.db_path)?;
  577.                 let header = FileHeader::new();
  578.                 header.write_to(&mut file)?;
  579.                 file.flush()?;
  580.             }
  581.             Ok(())
  582.         }
  583.        
  584.         fn lock_exclusive(&self) -> Result<File> {
  585.             let f = OpenOptions::new().create(true).read(true).write(true).open(&self.lock_path)?;
  586.             f.lock_exclusive()?; // unlock on drop
  587.             Ok(f)
  588.         }
  589.        
  590.         fn crc32(bytes: &[u8]) -> u32 {
  591.             crc32fast::hash(bytes)
  592.         }
  593.        
  594.         fn append_block(&self, tag: u8, payload: &[u8]) -> Result<u64> {
  595.             let _guard = self.lock_exclusive()?;
  596.             let mut file = OpenOptions::new().read(true).write(true).open(&self.db_path)?;
  597.             let offset = file.seek(SeekFrom::End(0))?;
  598.             let length = payload.len() as u32;
  599.             let crc = Self::crc32(payload);
  600.            
  601.             file.write_all(&[tag])?;
  602.             file.write_all(&length.to_le_bytes())?;
  603.             file.write_all(&crc.to_le_bytes())?;
  604.             file.write_all(payload)?;
  605.             file.flush()?;
  606.            
  607.             Ok(offset)
  608.         }
  609.        
  610.         pub fn write_checkpoint(&self, checkpoints: &[TableCheckpoint]) -> Result<u64> {
  611.             let mut payload = Vec::new();
  612.            
  613.             // Serialize checkpoints
  614.             for cp in checkpoints {
  615.                 let name_bytes = cp.table_name.as_bytes();
  616.                 payload.extend((name_bytes.len() as u16).to_le_bytes());
  617.                 payload.extend(name_bytes);
  618.                 payload.extend(cp.last_seq_no.to_le_bytes());
  619.                 payload.extend(cp.last_data_offset.to_le_bytes());
  620.                 payload.extend(cp.pk_index_offset.to_le_bytes());
  621.             }
  622.            
  623.             let offset = self.append_block(TAG_CHECKPOINT, &payload)?;
  624.            
  625.             // Update header with checkpoint offset
  626.             let mut file = OpenOptions::new().read(true).write(true).open(&self.db_path)?;
  627.             file.seek(SeekFrom::Start(HEADER_LAST_CP_OFFSET))?;
  628.             file.write_all(&offset.to_le_bytes())?;
  629.             file.flush()?;
  630.            
  631.             Ok(offset)
  632.         }
  633.        
  634.         pub fn write_catalog(&self, catalog_toml: &str) -> Result<u64> {
  635.             self.append_block(TAG_CATALOG, catalog_toml.as_bytes())
  636.         }
  637.        
  638.         pub fn append_table_data(&self, table_name: &str, seq_no: u64, rows: &[Vec<String>]) -> Result<u64> {
  639.             let mut payload = Vec::new();
  640.            
  641.             // Table name
  642.             let name_bytes = table_name.as_bytes();
  643.             payload.extend((name_bytes.len() as u16).to_le_bytes());
  644.             payload.extend(name_bytes);
  645.            
  646.             // Sequence number and row count
  647.             payload.extend(seq_no.to_le_bytes());
  648.             payload.extend((rows.len() as u32).to_le_bytes());
  649.            
  650.             // Column count (from first row if any)
  651.             let col_count = rows.first().map(|r| r.len()).unwrap_or(0) as u16;
  652.             payload.extend(col_count.to_le_bytes());
  653.            
  654.             // Rows data
  655.             for row in rows {
  656.                 for col in row {
  657.                     let col_bytes = col.as_bytes();
  658.                     payload.extend((col_bytes.len() as u32).to_le_bytes());
  659.                     payload.extend(col_bytes);
  660.                 }
  661.             }
  662.            
  663.             self.append_block(TAG_DATA, &payload)
  664.         }
  665.        
  666.         pub fn write_pk_index(&self, table_name: &str, index: &BTreeMap<String, u64>) -> Result<u64> {
  667.             let mut payload = Vec::new();
  668.            
  669.             // Table name
  670.             let name_bytes = table_name.as_bytes();
  671.             payload.extend((name_bytes.len() as u16).to_le_bytes());
  672.             payload.extend(name_bytes);
  673.            
  674.             // PK column index (placeholder - we'll improve this later)
  675.             payload.extend(0u16.to_le_bytes());
  676.            
  677.             // Entry count
  678.             payload.extend((index.len() as u32).to_le_bytes());
  679.            
  680.             // Entries
  681.             for (key, row_pointer) in index {
  682.                 let key_bytes = key.as_bytes();
  683.                 payload.extend((key_bytes.len() as u16).to_le_bytes());
  684.                 payload.extend(key_bytes);
  685.                 payload.extend(row_pointer.to_le_bytes());
  686.             }
  687.            
  688.             self.append_block(TAG_INDEX, &payload)
  689.         }
  690.        
  691.         fn read_header(&self) -> Result<FileHeader> {
  692.             let mut file = File::open(&self.db_path)?;
  693.             FileHeader::read_from(&mut file)
  694.         }
  695.        
  696.         pub fn read_catalog(&self) -> Result<Option<String>> {
  697.             let _header = self.read_header()?;
  698.            
  699.             // Always start from after the header to look for catalog blocks
  700.             let start_offset = HEADER_SIZE as u64;
  701.            
  702.             let mut file = File::open(&self.db_path)?;
  703.             file.seek(SeekFrom::Start(start_offset))?;
  704.            
  705.             // Read blocks looking for catalog - keep the LAST one found
  706.             let mut latest_catalog: Option<String> = None;
  707.             loop {
  708.                 let mut tag_buf = [0u8; 1];
  709.                 if file.read_exact(&mut tag_buf).is_err() {
  710.                     break;
  711.                 }
  712.                 let tag = tag_buf[0];
  713.                
  714.                 let mut len_buf = [0u8; 4];
  715.                 file.read_exact(&mut len_buf)?;
  716.                 let length = u32::from_le_bytes(len_buf);
  717.                
  718.                 let mut crc_buf = [0u8; 4];
  719.                 file.read_exact(&mut crc_buf)?;
  720.                 let expected_crc = u32::from_le_bytes(crc_buf);
  721.                
  722.                 if tag == TAG_CATALOG {
  723.                     let mut payload = vec![0u8; length as usize];
  724.                     file.read_exact(&mut payload)?;
  725.                    
  726.                     // Verify CRC
  727.                     let computed_crc = Self::crc32(&payload);
  728.                     if computed_crc != expected_crc {
  729.                         return Err(anyhow::anyhow!("Catalog block CRC mismatch"));
  730.                     }
  731.                    
  732.                     // Keep this catalog and continue looking for newer ones
  733.                     latest_catalog = Some(String::from_utf8(payload)?);
  734.                 } else {
  735.                     // Skip this block
  736.                     file.seek(SeekFrom::Current(length as i64))?;
  737.                 }
  738.             }
  739.            
  740.             Ok(latest_catalog)
  741.         }
  742.        
  743.         pub fn scan_table_data<F: FnMut(&str, u64, &[Vec<String>]) -> Result<()>>(&self, mut callback: F) -> Result<()> {
  744.             // For now, scan all data blocks from the beginning to ensure we don't miss any data
  745.             // This is simpler and ensures we read all data, including data written before the latest checkpoint
  746.             let start_offset = HEADER_SIZE as u64;
  747.            
  748.             let mut file = File::open(&self.db_path)?;
  749.             file.seek(SeekFrom::Start(start_offset))?;
  750.            
  751.             loop {
  752.                 let mut tag_buf = [0u8; 1];
  753.                 if file.read_exact(&mut tag_buf).is_err() {
  754.                     break;
  755.                 }
  756.                 let tag = tag_buf[0];
  757.                
  758.                 let mut len_buf = [0u8; 4];
  759.                 file.read_exact(&mut len_buf)?;
  760.                 let length = u32::from_le_bytes(len_buf);
  761.                
  762.                 let mut crc_buf = [0u8; 4];
  763.                 file.read_exact(&mut crc_buf)?;
  764.                 let _crc = u32::from_le_bytes(crc_buf);
  765.                
  766.                 if tag == TAG_DATA {
  767.                     let mut payload = vec![0u8; length as usize];
  768.                     file.read_exact(&mut payload)?;
  769.                    
  770.                     // Verify CRC
  771.                     let computed_crc = Self::crc32(&payload);
  772.                     if computed_crc != _crc {
  773.                         return Err(anyhow::anyhow!("Data block CRC mismatch"));
  774.                     }
  775.                    
  776.                     // Parse data block
  777.                     let mut cursor = 0;
  778.                    
  779.                     // Table name
  780.                     let name_len = u16::from_le_bytes([payload[cursor], payload[cursor + 1]]) as usize;
  781.                     cursor += 2;
  782.                     let table_name = String::from_utf8(payload[cursor..cursor + name_len].to_vec())?;
  783.                     cursor += name_len;
  784.                    
  785.                     // Sequence number
  786.                     let seq_no = u64::from_le_bytes([
  787.                         payload[cursor], payload[cursor + 1], payload[cursor + 2], payload[cursor + 3],
  788.                         payload[cursor + 4], payload[cursor + 5], payload[cursor + 6], payload[cursor + 7]
  789.                     ]);
  790.                     cursor += 8;
  791.                    
  792.                     // Row count
  793.                     let row_count = u32::from_le_bytes([
  794.                         payload[cursor], payload[cursor + 1], payload[cursor + 2], payload[cursor + 3]
  795.                     ]) as usize;
  796.                     cursor += 4;
  797.                    
  798.                     // Column count
  799.                     let col_count = u16::from_le_bytes([payload[cursor], payload[cursor + 1]]) as usize;
  800.                     cursor += 2;
  801.                    
  802.                     // Parse rows
  803.                     let mut rows = Vec::with_capacity(row_count);
  804.                     for _ in 0..row_count {
  805.                         let mut row = Vec::with_capacity(col_count);
  806.                         for _ in 0..col_count {
  807.                             let col_len = u32::from_le_bytes([
  808.                                 payload[cursor], payload[cursor + 1], payload[cursor + 2], payload[cursor + 3]
  809.                             ]) as usize;
  810.                             cursor += 4;
  811.                             let col_str = String::from_utf8(payload[cursor..cursor + col_len].to_vec())?;
  812.                             cursor += col_len;
  813.                             row.push(col_str);
  814.                         }
  815.                         rows.push(row);
  816.                     }
  817.                    
  818.                     callback(&table_name, seq_no, &rows)?;
  819.                 } else {
  820.                     // Skip this block
  821.                     file.seek(SeekFrom::Current(length as i64))?;
  822.                 }
  823.             }
  824.            
  825.             Ok(())
  826.         }
  827.     }
  828. }
  829.  
  830. // ================================
  831. // Engine (ops on rows)
  832. // ================================
  833. mod engine {
  834.     use super::schema::Schema;
  835.     use super::storage::{schema_to_toml, CsvStorage, StorageBackend};
  836.     use anyhow::{anyhow, Context, Result};
  837.     use bincode;
  838.     use std::{collections::BTreeMap, path::Path};
  839.  
  840.     #[derive(Clone, Debug, Serialize, Deserialize)]
  841.     pub enum Value { S(String), I(i64), F(f64), B(bool) }
  842.     use serde::{Serialize, Deserialize};
  843.  
  844.     pub struct Table {
  845.         pub schema: Schema,
  846.         storage: StorageBackend,
  847.         pk_index: Option<BTreeMap<String, u64>>, // pk -> rowno
  848.     // headers: Vec<String>,
  849.     }
  850.  
  851.     impl Table {
  852.         pub fn open(root: &Path, schema: Schema, legacy: bool) -> Result<Self> {
  853.             let canto_db_path = root.join("canto.db");
  854.            
  855.             let storage = if legacy {
  856.                 // Use CSV storage (legacy mode explicitly requested)
  857.                 let schema_toml = schema_to_toml(&schema)?;
  858.                 let csv_storage = super::storage::CsvStorage::open(
  859.                     schema.table_csv_path(root),
  860.                     schema.schema_toml_path(root),
  861.                     schema.idx_path(root),
  862.                     schema.lock_path(root),
  863.                     &schema,
  864.                 )?;
  865.                 csv_storage.write_schema_toml(&schema_toml)?;
  866.                 StorageBackend::Csv(csv_storage)
  867.             } else {
  868.                 // Use binary storage (default)
  869.                 let lock_path = root.join("canto.lock");
  870.                 let binary_storage = super::storage::BinaryStorage::open(canto_db_path, lock_path)?;
  871.                
  872.                 // If this is a new binary storage and this table doesn't exist in catalog yet,
  873.                 // we need to add it to the catalog
  874.                 if let Ok(Some(catalog_toml)) = binary_storage.read_catalog() {
  875.                     let existing_schemas: std::collections::HashMap<String, super::schema::Schema> =
  876.                         toml::from_str(&catalog_toml).unwrap_or_default();
  877.                     if !existing_schemas.contains_key(&schema.name) {
  878.                         // Add this schema to the catalog
  879.                         let mut all_schemas = existing_schemas;
  880.                         all_schemas.insert(schema.name.clone(), schema.clone());
  881.                         let new_catalog_toml = toml::to_string_pretty(&all_schemas)?;
  882.                         binary_storage.write_catalog(&new_catalog_toml)?;
  883.                     }
  884.                 } else {
  885.                     // No catalog exists yet, create one with this schema
  886.                     let mut all_schemas = std::collections::HashMap::new();
  887.                     all_schemas.insert(schema.name.clone(), schema.clone());
  888.                     let catalog_toml = toml::to_string_pretty(&all_schemas)?;
  889.                     binary_storage.write_catalog(&catalog_toml)?;
  890.                 }
  891.                
  892.                 StorageBackend::Binary(binary_storage, schema.name.clone())
  893.             };
  894.  
  895.             let mut t = Self { schema, storage, pk_index: None };
  896.             if let Some(pk) = t.schema.primary_key.clone() {
  897.                 let pk_idx = t.schema.col_index(&pk).ok_or_else(|| anyhow!("bad PK"))?;
  898.                 if let Some(bytes) = t.storage.read_index_bytes()? {
  899.                     let map: BTreeMap<String, u64> = bincode::deserialize(&bytes)?;
  900.                     t.pk_index = Some(map);
  901.                 } else {
  902.                     let mut map = BTreeMap::new();
  903.                     let mut rowno: u64 = 0;
  904.                     t.storage.scan(|row| {
  905.                         if let Some(val) = row.get(pk_idx) { map.insert(val.clone(), rowno); }
  906.                         rowno += 1;
  907.                         Ok(())
  908.                     })?;
  909.                     t.pk_index = Some(map.clone());
  910.                     let bytes = bincode::serialize(&map)?;
  911.                     t.storage.write_index_bytes(&bytes)?;
  912.                 }
  913.             }
  914.             Ok(t)
  915.         }
  916.  
  917.         pub fn insert(&mut self, row: Vec<Value>) -> Result<()> {
  918.             if row.len() != self.schema.columns.len() { return Err(anyhow!("column count mismatch")); }
  919.             let ser: Vec<String> = row.into_iter().map(|v| match v {
  920.                 Value::S(s) => s, Value::I(i) => i.to_string(), Value::F(f)=>f.to_string(), Value::B(b)=>b.to_string()
  921.             }).collect();
  922.  
  923.             if let Some(pk) = self.schema.primary_key.clone() {
  924.                 let pk_idx = self.schema.col_index(&pk).unwrap();
  925.                 if let Some(idx) = &mut self.pk_index {
  926.                     let key = ser[pk_idx].clone();
  927.                     let new_rowno = idx.len() as u64;
  928.                     idx.insert(key, new_rowno);
  929.                     let bytes = bincode::serialize(idx)?;
  930.                     self.storage.write_index_bytes(&bytes)?;
  931.                 }
  932.             }
  933.             self.storage.append_record(&ser)
  934.         }
  935.  
  936.         pub fn select<F: Fn(&[String]) -> bool>(&self, pred: F) -> Result<Vec<Vec<String>>> {
  937.             let mut out = Vec::new();
  938.             self.storage.scan(|row| { if pred(row) { out.push(row.to_vec()); } Ok(()) })?;
  939.             Ok(out)
  940.         }
  941.  
  942.     // pub fn update<F: Fn(&[String]) -> bool, G: Fn(&mut [String])>(&mut self, pred: F, patch: G) -> Result<usize> {
  943.     //     let mut rows: Vec<Vec<String>> = Vec::new();
  944.     //     self.storage.scan(|row| { rows.push(row.to_vec()); Ok(()) })?;
  945.     //
  946.     //     let mut n = 0usize;
  947.     //     for r in rows.iter_mut() { if pred(r) { patch(r); n += 1; } }
  948.     //     self.storage.rewrite(&rows)?;
  949.     //
  950.     //     if let Some(pk) = self.schema.primary_key.clone() {
  951.     //         let pk_idx = self.schema.col_index(&pk).context("PK not found")?;
  952.     //         let mut map = BTreeMap::new();
  953.     //         for (i, r) in rows.iter().enumerate() {
  954.     //             if let Some(k) = r.get(pk_idx) { map.insert(k.clone(), i as u64); }
  955.     //         }
  956.     //         self.pk_index = Some(map.clone());
  957.     //         let bytes = bincode::serialize(&map)?;
  958.     //         self.storage.write_index_bytes(&bytes)?;
  959.     //     }
  960.     //     Ok(n)
  961.     // }
  962.  
  963.         pub fn delete<F: Fn(&[String]) -> bool>(&mut self, pred: F) -> Result<usize> {
  964.             let mut kept: Vec<Vec<String>> = Vec::new(); let mut n = 0usize;
  965.             self.storage.scan(|row| { if pred(row) { n += 1; } else { kept.push(row.to_vec()); } Ok(()) })?;
  966.             self.storage.rewrite(&kept)?;
  967.             if let Some(pk) = self.schema.primary_key.clone() {
  968.                 let pk_idx = self.schema.col_index(&pk).context("PK not found")?;
  969.                 let mut map = BTreeMap::new();
  970.                 for (i, r) in kept.iter().enumerate() {
  971.                     if let Some(k) = r.get(pk_idx) { map.insert(k.clone(), i as u64); }
  972.                 }
  973.                 self.pk_index = Some(map.clone());
  974.                 let bytes = bincode::serialize(&map)?;
  975.                 self.storage.write_index_bytes(&bytes)?;
  976.             }
  977.             Ok(n)
  978.         }
  979.     }
  980. }
  981.  
  982. // ================================
  983. // Minimal SQL (hand-rolled, tiny subset)
  984. // ================================
  985. mod sql {
  986.     use super::{engine, schema};
  987.     use anyhow::{anyhow, Result};
  988.     use regex::Regex;
  989.     use serde_json::json;
  990.     use std::fs;
  991.     use std::collections::BTreeMap;
  992.     use std::path::Path;
  993.     // use anyhow::Context;
  994.  
  995.     // Tiny helper: strip comments and compress whitespace.
  996.     pub fn clean_sql(s: &str) -> String {
  997.         let mut out = String::new();
  998.         for line in s.lines() {
  999.             let l = line.trim();
  1000.             if l.starts_with("--") { continue; }
  1001.             out.push_str(l);
  1002.             out.push(' ');
  1003.         }
  1004.         // collapse multiple spaces
  1005.         let re = Regex::new(r"\s+").unwrap();
  1006.         re.replace_all(&out, " ").trim().to_string()
  1007.     }
  1008.  
  1009.     #[derive(Debug)]
  1010.     pub enum Stmt {
  1011.         CreateTable { name: String, cols: Vec<ColDef>, primary_key: Option<String> },
  1012.         Insert { table: String, cols: Vec<String>, values: Vec<Vec<String>> },
  1013.         Select { projection: Vec<String>, from: String, joins: Vec<Join>, order_by: Option<(String, bool)>, limit: Option<usize> },
  1014.     }
  1015.  
  1016.     #[derive(Debug)]
  1017.     pub struct ColDef {
  1018.         pub name: String,
  1019.         pub dtype: schema::DataType,
  1020.         pub nullable: bool,
  1021.     }
  1022.  
  1023.     #[derive(Debug)]
  1024.     pub struct Join {
  1025.         pub table: String,
  1026.         pub left_col: String,
  1027.         pub right_col: String,
  1028.     }
  1029.  
  1030.     pub fn parse_statements(sql: &str) -> Result<Vec<Stmt>> {
  1031.         let cleaned = clean_sql(sql);
  1032.         let mut out = Vec::new();
  1033.         for chunk in cleaned.split(';') {
  1034.             let s = chunk.trim();
  1035.             if s.is_empty() { continue; }
  1036.             if s.to_uppercase().starts_with("CREATE TABLE") {
  1037.                 out.push(parse_create_table_owned(s)?);
  1038.             } else if s.to_uppercase().starts_with("INSERT INTO") {
  1039.                 out.push(parse_insert_owned(s)?);
  1040.             } else if s.to_uppercase().starts_with("SELECT") {
  1041.                 out.push(parse_select_owned(s)?);
  1042.             } else {
  1043.                 return Err(anyhow!("Unsupported SQL: {}", s));
  1044.             }
  1045.         }
  1046.         Ok(out)
  1047.     }
  1048.  
  1049.     // Helper functions to parse and return owned Stmt/ColDef/Join
  1050.     fn parse_create_table_owned(s: &str) -> Result<Stmt> {
  1051.         let stmt = parse_create_table(s)?;
  1052.         if let Stmt::CreateTable { name, cols, primary_key } = stmt {
  1053.             Ok(Stmt::CreateTable {
  1054.                 name: name.to_string(),
  1055.                 cols: cols.into_iter().map(|c| ColDef {
  1056.                     name: c.name.to_string(),
  1057.                     dtype: c.dtype,
  1058.                     nullable: c.nullable,
  1059.                 }).collect(),
  1060.                 primary_key: primary_key.map(|s| s.to_string()),
  1061.             })
  1062.         } else {
  1063.             unreachable!()
  1064.         }
  1065.     }
  1066.  
  1067.     fn parse_insert_owned(s: &str) -> Result<Stmt> {
  1068.         let stmt = parse_insert(s)?;
  1069.         if let Stmt::Insert { table, cols, values } = stmt {
  1070.             Ok(Stmt::Insert {
  1071.                 table: table.to_string(),
  1072.                 cols: cols.into_iter().map(|c| c.to_string()).collect(),
  1073.                 values: values.into_iter().map(|row| row.into_iter().map(|v| v.to_string()).collect()).collect(),
  1074.             })
  1075.         } else {
  1076.             unreachable!()
  1077.         }
  1078.     }
  1079.  
  1080.     fn parse_select_owned(s: &str) -> Result<Stmt> {
  1081.         let stmt = parse_select(s)?;
  1082.         if let Stmt::Select { projection, from, joins, order_by, limit } = stmt {
  1083.             Ok(Stmt::Select {
  1084.                 projection: projection.into_iter().map(|s| s.to_string()).collect(),
  1085.                 from: from.to_string(),
  1086.                 joins: joins.into_iter().map(|j| Join {
  1087.                     table: j.table.to_string(),
  1088.                     left_col: j.left_col.to_string(),
  1089.                     right_col: j.right_col.to_string(),
  1090.                 }).collect(),
  1091.                 order_by: order_by.map(|(s, b)| (s.to_string(), b)),
  1092.                 limit,
  1093.             })
  1094.         } else {
  1095.             unreachable!()
  1096.         }
  1097.     }
  1098.  
  1099.     fn parse_create_table(s: &str) -> Result<Stmt> {
  1100.     let mut dtype;
  1101.         // CREATE TABLE name ( col defs ... )
  1102.         let re = Regex::new(r"(?i)^CREATE TABLE\s+([A-Za-z0-9_]+)\s*\((.+)\)$").unwrap();
  1103.         let caps = re.captures(s).ok_or_else(|| anyhow!("bad CREATE TABLE"))?;
  1104.         let name = caps.get(1).unwrap().as_str().to_string();
  1105.         let body = caps.get(2).unwrap().as_str();
  1106.  
  1107.         let mut cols = Vec::new();
  1108.         let mut pk: Option<String> = None;
  1109.  
  1110.         for part in body.split(',') {
  1111.             let p = part.trim();
  1112.             if p.is_empty() { continue; }
  1113.             if p.to_uppercase().starts_with("PRIMARY KEY") {
  1114.                 continue;
  1115.             }
  1116.             let toks: Vec<&str> = p.split_whitespace().collect();
  1117.             if toks.len() < 2 { continue; }
  1118.             let col = toks[0].to_string();
  1119.             let up = toks[1].to_uppercase();
  1120.             if up.starts_with("INT") { dtype = schema::DataType::Int; }
  1121.             else if up.starts_with("FLOAT") || up.starts_with("REAL") { dtype = schema::DataType::Float; }
  1122.             else if up.starts_with("BOOL") { dtype = schema::DataType::Bool; }
  1123.             else if up.starts_with("DATE") { dtype = schema::DataType::Date; }
  1124.             else { dtype = schema::DataType::String; }
  1125.  
  1126.             let nullable = !p.to_uppercase().contains("NOT NULL");
  1127.             if p.to_uppercase().contains("PRIMARY KEY") {
  1128.                 pk = Some(col.clone());
  1129.             }
  1130.             cols.push(ColDef { name: col, dtype, nullable });
  1131.         }
  1132.  
  1133.         Ok(Stmt::CreateTable { name, cols, primary_key: pk })
  1134.     }
  1135.  
  1136.     fn parse_insert(s: &str) -> Result<Stmt> {
  1137.         // INSERT INTO table (a,b,c) VALUES (..),(..)
  1138.         let re = Regex::new(r"(?i)^INSERT INTO\s+([A-Za-z0-9_]+)\s*\(([^)]+)\)\s*VALUES\s*(.+)$").unwrap();
  1139.         let caps = re.captures(s).ok_or_else(|| anyhow!("bad INSERT"))?;
  1140.         let table = caps.get(1).unwrap().as_str().to_string();
  1141.         let cols_str = caps.get(2).unwrap().as_str();
  1142.         let values_str = caps.get(3).unwrap().as_str().trim();
  1143.  
  1144.         let cols: Vec<String> = cols_str.split(',').map(|c| c.trim().to_string()).collect();
  1145.  
  1146.         // split values by top-level parentheses
  1147.         let mut values = Vec::new();
  1148.         let mut depth = 0usize;
  1149.         let mut start = 0usize;
  1150.         for (i, ch) in values_str.char_indices() {
  1151.             match ch {
  1152.                 '(' => { if depth == 0 { start = i+1; } depth += 1; }
  1153.                 ')' => { depth -= 1; if depth == 0 {
  1154.                     let tuple_str = &values_str[start..i];
  1155.                     let tuple_vals = split_args(tuple_str).into_iter().map(|v| v.to_string()).collect();
  1156.                     values.push(tuple_vals);
  1157.                 }}
  1158.                 _ => {}
  1159.             }
  1160.         }
  1161.         Ok(Stmt::Insert { table, cols, values })
  1162.     }
  1163.  
  1164.     fn split_args<'a>(s: &'a str) -> Vec<&'a str> {
  1165.        // split by commas not inside quotes
  1166.        let mut out = Vec::new();
  1167.        let mut start = 0usize;
  1168.        let mut in_str = false;
  1169.        let bytes = s.as_bytes();
  1170.        for i in 0..bytes.len() {
  1171.            let ch = bytes[i] as char;
  1172.            if ch == '\'' { in_str = !in_str; }
  1173.             if ch == ',' && !in_str {
  1174.                 out.push(s[start..i].trim());
  1175.                 start = i+1;
  1176.             }
  1177.         }
  1178.         if start < s.len() { out.push(s[start..].trim()); }
  1179.         out
  1180.     }
  1181.  
  1182.     fn parse_select(s: &str) -> Result<Stmt> {
  1183.         // SELECT a,b FROM t [JOIN u ON t.x=u.y]* [ORDER BY c (ASC|DESC)] [LIMIT n]
  1184.         let mut rest = s.trim();
  1185.         if !rest.to_uppercase().starts_with("SELECT ") { return Err(anyhow!("bad SELECT")); }
  1186.         rest = &rest[7..];
  1187.  
  1188.         let (projection_str, rest) = split_once_upper(rest, " FROM ").ok_or_else(|| anyhow!("missing FROM"))?;
  1189.         let projection: Vec<String> = projection_str.split(',').map(|c| c.trim().to_string()).collect();
  1190.  
  1191.     let (from_str, rest) = split_once_any_upper(rest, &[" JOIN ", " ORDER BY ", " LIMIT "]).unwrap_or((rest, ""));
  1192.         let from = from_str.trim().to_string();
  1193.  
  1194.         let mut joins = Vec::new();
  1195.         let mut order_by: Option<(String, bool)> = None;
  1196.         let mut limit: Option<usize> = None;
  1197.  
  1198.         let mut tmp = rest;
  1199.         while !tmp.is_empty() {
  1200.             if let Some((after, _chunk)) = take_prefix_upper(tmp, " JOIN ") {
  1201.                 // chunk starts at table name
  1202.                 let (table, after2) = split_once_upper(after, " ON ").ok_or_else(|| anyhow!("JOIN missing ON"))?;
  1203.                 let table = table.trim().to_string();
  1204.                 let (cond, after3) = split_once_any_upper(after2, &[" JOIN ", " ORDER BY ", " LIMIT "]).unwrap_or((after2, ""));
  1205.                 // cond: a.x = b.y
  1206.                 let parts: Vec<&str> = cond.split('=').map(|x| x.trim()).collect();
  1207.                 if parts.len() != 2 { return Err(anyhow!("bad JOIN condition")); }
  1208.                 let (l_tab, l_col) = split_qual(parts[0])?;
  1209.                 let (r_tab, r_col) = split_qual(parts[1])?;
  1210.                 if l_tab.is_empty() || r_tab.is_empty() { return Err(anyhow!("JOIN needs qualified columns")); }
  1211.                 joins.push(super::sql::Join { table, left_col: l_tab + "." + &l_col, right_col: r_tab + "." + &r_col });
  1212.                 tmp = after3;
  1213.                 continue;
  1214.             }
  1215.             if let Some((after, _chunk)) = take_prefix_upper(tmp, " ORDER BY ") {
  1216.                 let (spec, after2) = split_once_any_upper(after, &[" LIMIT "]).unwrap_or((after, ""));
  1217.                 let mut toks = spec.split_whitespace();
  1218.                 let col = toks.next().ok_or_else(|| anyhow!("ORDER BY col"))?.to_string();
  1219.                 let dir = toks.next().unwrap_or("ASC").eq_ignore_ascii_case("DESC");
  1220.                 order_by = Some((col, dir)); // dir==true => DESC
  1221.                 tmp = after2;
  1222.                 continue;
  1223.             }
  1224.             if let Some((after, _)) = take_prefix_upper(tmp, " LIMIT ") {
  1225.                 let n = after.trim().split_whitespace().next().unwrap_or("0").parse::<usize>()?;
  1226.                 limit = Some(n);
  1227.                 tmp = "";
  1228.                 continue;
  1229.             }
  1230.             break;
  1231.         }
  1232.  
  1233.         Ok(Stmt::Select { projection, from, joins, order_by, limit })
  1234.     }
  1235.  
  1236.     fn split_qual(s: &str) -> Result<(String, String)> {
  1237.         let parts: Vec<&str> = s.split('.').collect();
  1238.         if parts.len() != 2 { return Err(anyhow!("need qualified name like t.col")); }
  1239.         Ok((parts[0].trim().to_string(), parts[1].trim().to_string()))
  1240.     }
  1241.  
  1242.     fn split_once_upper<'a>(s: &'a str, needle: &str) -> Option<(&'a str, &'a str)> {
  1243.         let up = s.to_uppercase();
  1244.         let nup = needle.to_uppercase();
  1245.         if let Some(i) = up.find(&nup) {
  1246.             Some((&s[..i], &s[i + needle.len()..]))
  1247.         } else { None }
  1248.     }
  1249.     fn split_once_any_upper<'a>(s: &'a str, needles: &[&str]) -> Option<(&'a str, &'a str)> {
  1250.         let up = s.to_uppercase();
  1251.         let mut best: Option<(usize, &str)> = None;
  1252.         for &n in needles {
  1253.             if let Some(i) = up.find(&n.to_uppercase()) {
  1254.                 if best.map_or(true, |(bi, _)| i < bi) { best = Some((i, n)); }
  1255.             }
  1256.         }
  1257.         if let Some((i, _n)) = best {
  1258.             Some((&s[..i], &s[i..]))
  1259.         } else {
  1260.             None
  1261.         }
  1262.     }
  1263.     fn take_prefix_upper<'a>(s: &'a str, needle: &'a str) -> Option<(&'a str, &'a str)> {
  1264.        let up = s.to_uppercase();
  1265.        let nup = needle.to_uppercase();
  1266.        if up.starts_with(&nup) {
  1267.            Some((&s[needle.len()..], needle))
  1268.        } else { None }
  1269.    }
  1270.  
  1271.    // ----- Execution: map parsed SQL to engine operations -----
  1272.  
  1273.    use crate::engine::{Table, Value};
  1274.    use std::collections::HashMap;
  1275.    use std::sync::Mutex;
  1276.    
  1277.    // Simple schema cache to avoid re-reading TOML files
  1278.    lazy_static::lazy_static! {
  1279.        static ref SCHEMA_CACHE: Mutex<HashMap<String, schema::Schema>> = Mutex::new(HashMap::new());
  1280.    }
  1281.    
  1282.    fn get_cached_schema(root: &Path, table_name: &str, legacy: bool) -> Result<schema::Schema> {
  1283.        let cache_key = format!("{}:{}:{}", root.display(), table_name, legacy);
  1284.        
  1285.        // Try cache first
  1286.        {
  1287.            let cache = SCHEMA_CACHE.lock().unwrap();
  1288.            if let Some(schema) = cache.get(&cache_key) {
  1289.                return Ok(schema.clone());
  1290.            }
  1291.        }
  1292.        
  1293.        let schema = if legacy {
  1294.            // Load from CSV schema file (legacy)
  1295.            let schema_path = root.join(format!("{table_name}.schema.toml"));
  1296.            let sch_str = fs::read_to_string(&schema_path)
  1297.                .map_err(|_| anyhow!("schema not found for table {table_name}"))?;
  1298.            toml::from_str(&sch_str)?
  1299.        } else {
  1300.            // Load from binary storage catalog
  1301.            let canto_db_path = root.join("canto.db");
  1302.            let lock_path = root.join("canto.lock");
  1303.            let binary_storage = super::storage::BinaryStorage::open(canto_db_path, lock_path)?;
  1304.            if let Some(catalog_toml) = binary_storage.read_catalog()? {
  1305.                let all_schemas: std::collections::HashMap<String, schema::Schema> = toml::from_str(&catalog_toml)?;
  1306.                all_schemas.get(table_name)
  1307.                    .ok_or_else(|| anyhow!("table {} not found in catalog", table_name))?
  1308.                    .clone()
  1309.            } else {
  1310.                return Err(anyhow!("no catalog found in binary storage"));
  1311.            }
  1312.        };
  1313.        
  1314.        // Cache it
  1315.        {
  1316.            let mut cache = SCHEMA_CACHE.lock().unwrap();
  1317.            cache.insert(cache_key, schema.clone());
  1318.        }
  1319.        
  1320.        Ok(schema)
  1321.    }
  1322.  
  1323.    pub fn exec_statements(root: &Path, stmts: Vec<Stmt>, legacy: bool) -> Result<Vec<serde_json::Value>> {
  1324.        let mut results = Vec::new();
  1325.        for st in stmts {
  1326.            match st {
  1327.                Stmt::CreateTable { name, cols, primary_key } => {
  1328.                    let schema = schema::Schema {
  1329.                        name: name.to_string(),
  1330.                        columns: cols.into_iter().map(|c| schema::Column {
  1331.                            name: c.name.to_string(),
  1332.                            dtype: c.dtype,
  1333.                            nullable: c.nullable,
  1334.                        }).collect(),
  1335.                        primary_key: primary_key.map(|s| s.to_string()),
  1336.                    };
  1337.                    let _ = Table::open(root, schema, legacy)?; // ensures files exist
  1338.                    results.push(json!({"ok": true, "op": "create_table", "table": name}));
  1339.                }
  1340.                Stmt::Insert { table, cols, values } => {
  1341.                    // Load existing schema from cache
  1342.                    let schema = get_cached_schema(root, &table, legacy)?;
  1343.                    let mut tbl = Table::open(root, schema, legacy)?;
  1344.                    let col_index: Vec<usize> = cols.iter().map(|c| tbl.schema.col_index(c).ok_or_else(|| anyhow!("unknown col {}", c))).collect::<Result<_>>()?;
  1345.  
  1346.                    let values_len = values.len();
  1347.                    for tup in values {
  1348.                        let mut row = vec![Value::S(String::new()); tbl.schema.columns.len()];
  1349.                        for (i, raw) in tup.iter().enumerate() {
  1350.                            let colpos = col_index[i];
  1351.                            let v = parse_value(raw);
  1352.                            row[colpos] = v;
  1353.                        }
  1354.                        tbl.insert(row)?;
  1355.                    }
  1356.                    results.push(json!({"ok": true, "op": "insert", "table": table, "rows": values_len}));
  1357.                }
  1358.                Stmt::Select { projection, from, joins, order_by, limit } => {
  1359.                    // Naive: materialize FROM table, then nested-loop JOINs on equality
  1360.                    let from_schema = get_cached_schema(root, &from, legacy)?;
  1361.                    let from_tbl = Table::open(root, from_schema.clone(), legacy)?;
  1362.  
  1363.                    // Load full rows
  1364.                    let mut rows = Vec::<(String, Vec<String>)>::new(); // (table prefix, row)
  1365.                    let prefix_from = from.clone();
  1366.                    from_tbl.select(|_| true)?.into_iter().for_each(|r| rows.push((prefix_from.clone(), r)));
  1367.  
  1368.                    // Build a map from table->(schema, rows)
  1369.                    let mut table_schemas: BTreeMap<String, schema::Schema> = BTreeMap::new();
  1370.                    table_schemas.insert(from.clone(), from_schema);
  1371.  
  1372.                    // Apply joins
  1373.                    for j in joins {
  1374.                        let right_schema = get_cached_schema(root, &j.table, legacy)?;
  1375.                        let right_tbl = Table::open(root, right_schema.clone(), legacy)?;
  1376.                        let right_rows = right_tbl.select(|_| true)?;
  1377.  
  1378.                        // parse qualified eq: left_tab.left_col = right_tab.right_col
  1379.                        let (l_tab, l_col) = split_qual(&j.left_col)?;
  1380.                        let (_r_tab, r_col) = split_qual(&j.right_col)?;
  1381.                        let l_idx = table_schemas.get(&l_tab).ok_or_else(|| anyhow!("unknown table {}", l_tab))?
  1382.                            .col_index(&l_col).ok_or_else(|| anyhow!("unknown col {}", j.left_col))?;
  1383.                        let r_idx = right_schema.col_index(&r_col).ok_or_else(|| anyhow!("unknown col {}", j.right_col))?;
  1384.  
  1385.                        // Hash join optimization for equi-joins
  1386.                        let mut new_rows: Vec<(String, Vec<String>)> = Vec::new();
  1387.                        
  1388.                        // Build hash map for the right side (smaller table typically)
  1389.                        let mut right_map: std::collections::HashMap<String, Vec<usize>> = std::collections::HashMap::new();
  1390.                        for (idx, rrow) in right_rows.iter().enumerate() {
  1391.                            let key = rrow[r_idx].clone();
  1392.                            right_map.entry(key).or_insert_with(Vec::new).push(idx);
  1393.                        }
  1394.                        
  1395.                        // Probe with left side
  1396.                        for (ltab, lrow) in rows.into_iter() {
  1397.                            let kval = lrow[l_idx].clone();
  1398.                            if let Some(right_indices) = right_map.get(&kval) {
  1399.                                for &right_idx in right_indices {
  1400.                                    let rrow = &right_rows[right_idx];
  1401.                                    // merged row = lrow + rrow
  1402.                                    let mut merged = lrow.clone();
  1403.                                    merged.extend(rrow.clone());
  1404.                                    new_rows.push((format!("{}+{}", ltab, j.table), merged));
  1405.                                }
  1406.                            }
  1407.                        }
  1408.                        rows = new_rows;
  1409.                        table_schemas.insert(j.table.to_string(), right_schema);
  1410.                    }
  1411.  
  1412.                    // Build projection indices (allow qualified or bare; resolve by first match)
  1413.                    let joined_headers: Vec<(String,String)> = {
  1414.                        let mut acc = Vec::new();
  1415.                        for (tname, sch) in table_schemas.iter() {
  1416.                            for c in &sch.columns {
  1417.                                acc.push((tname.clone(), c.name.clone()));
  1418.                            }
  1419.                        }
  1420.                        acc
  1421.                    };
  1422.  
  1423.                    let proj_indices: Vec<usize> = projection.iter().map(|colref| {
  1424.                        if colref == "*" {
  1425.                            // expand to all columns: handled specially later
  1426.                            Ok(usize::MAX)
  1427.                        } else if let Some((t,c)) = colref.split_once('.') {
  1428.                            // qualified
  1429.                            let mut idx = 0usize;
  1430.                            for (qt, qc) in joined_headers.iter() {
  1431.                                if qt == t && qc == c { return Ok(idx); }
  1432.                                idx += 1;
  1433.                            }
  1434.                            Err(anyhow!("unknown projection {}", colref))
  1435.                        } else {
  1436.                            // bare: first match
  1437.                            let mut idx = 0usize;
  1438.                            for (_qt, qc) in joined_headers.iter() {
  1439.                                if qc == colref { return Ok(idx); }
  1440.                                idx += 1;
  1441.                            }
  1442.                            Err(anyhow!("unknown column {}", colref))
  1443.                        }
  1444.                    }).collect::<Result<_>>()?;
  1445.  
  1446.                    // ORDER BY with numeric support
  1447.                    if let Some((ob, desc)) = order_by {
  1448.                        // resolve index of order-by column (qualified or bare)
  1449.                        let ob_idx: usize = if let Some((t,c)) = ob.split_once('.') {
  1450.                            let mut idx = 0usize;
  1451.                            for (qt, qc) in joined_headers.iter() {
  1452.                                if qt == t && qc == c { break; }
  1453.                                idx += 1;
  1454.                            }
  1455.                            idx
  1456.                        } else {
  1457.                            let mut idx = 0usize;
  1458.                            for (_qt, qc) in joined_headers.iter() {
  1459.                                if qc == &ob { break; }
  1460.                                idx += 1;
  1461.                            }
  1462.                            idx
  1463.                        };
  1464.                        
  1465.                        // Try numeric sort first, fall back to lexicographic
  1466.                        let all_i64: bool = rows.iter().all(|(_, row)| row[ob_idx].parse::<i64>().is_ok());
  1467.                        if all_i64 {
  1468.                            // Sort as integers
  1469.                            rows.sort_by(|a, b| {
  1470.                                let a_val = a.1[ob_idx].parse::<i64>().unwrap_or(0);
  1471.                                let b_val = b.1[ob_idx].parse::<i64>().unwrap_or(0);
  1472.                                a_val.cmp(&b_val)
  1473.                            });
  1474.                        } else {
  1475.                            let all_f64: bool = rows.iter().all(|(_, row)| row[ob_idx].parse::<f64>().is_ok());
  1476.                            if all_f64 {
  1477.                                // Sort as floats
  1478.                                rows.sort_by(|a, b| {
  1479.                                    let a_val = a.1[ob_idx].parse::<f64>().unwrap_or(0.0);
  1480.                                    let b_val = b.1[ob_idx].parse::<f64>().unwrap_or(0.0);
  1481.                                    a_val.partial_cmp(&b_val).unwrap_or(std::cmp::Ordering::Equal)
  1482.                                });
  1483.                            } else {
  1484.                                // Sort lexicographically
  1485.                                rows.sort_by(|a,b| a.1[ob_idx].cmp(&b.1[ob_idx]));
  1486.                            }
  1487.                        }
  1488.                        
  1489.                        if desc { rows.reverse(); }
  1490.                    }
  1491.  
  1492.                    // LIMIT
  1493.                    let rows = if let Some(n) = limit { rows.into_iter().take(n).collect::<Vec<_>>() } else { rows };
  1494.  
  1495.                    // Materialize projection (optimized: compute indices once)
  1496.                    let out_rows: Vec<Vec<String>> = if proj_indices.len()==1 && proj_indices[0]==usize::MAX {
  1497.                        // SELECT * - return all columns
  1498.                        rows.into_iter().map(|(_tname, row)| row).collect()
  1499.                    } else {
  1500.                        // Specific columns - project only needed ones
  1501.                        rows.into_iter().map(|(_tname, row)| {
  1502.                            let mut projected_row = Vec::with_capacity(proj_indices.len());
  1503.                            for &i in &proj_indices {
  1504.                                projected_row.push(row[i].clone());
  1505.                            }
  1506.                            projected_row
  1507.                        }).collect()
  1508.                    };
  1509.  
  1510.                    results.push(json!({
  1511.                        "ok": true,
  1512.                        "op": "select",
  1513.                        "columns": projection,
  1514.                        "rows": out_rows
  1515.                    }));
  1516.                }
  1517.            }
  1518.        }
  1519.        Ok(results)
  1520.    }
  1521.  
  1522.    fn parse_value(s: &str) -> engine::Value {
  1523.        let t = s.trim();
  1524.        if t.starts_with('\'') && t.ends_with('\'') {
  1525.             let inner = &t[1..t.len()-1];
  1526.             engine::Value::S(inner.to_string())
  1527.         } else if let Ok(i) = t.parse::<i64>() {
  1528.             engine::Value::I(i)
  1529.         } else if let Ok(f) = t.parse::<f64>() {
  1530.             engine::Value::F(f)
  1531.         } else if t.eq_ignore_ascii_case("true") || t.eq_ignore_ascii_case("false") {
  1532.             engine::Value::B(t.eq_ignore_ascii_case("true"))
  1533.         } else {
  1534.             engine::Value::S(t.to_string())
  1535.         }
  1536.     }
  1537. }
  1538. // use sql::*; // for json!, regex dependency
  1539. // use serde_json::json;
  1540.  
  1541. // ================================
  1542. // REST server (Axum) + Auth + Users
  1543. // ================================
  1544. mod server {
  1545.     use super::{Args};
  1546.     use super::{sql, engine, schema};
  1547.     use axum::{routing::{get, post, delete}, Router, extract::State, http::{StatusCode, HeaderMap}, Json};
  1548.     use serde::Deserialize;
  1549.     use std::{sync::Arc, path::PathBuf, fs};
  1550.     use anyhow::{Result, anyhow};
  1551.     use base64::engine::general_purpose::STANDARD;
  1552.     use base64::Engine as _;
  1553.     use axum::extract::Path;
  1554.     use serde_json::json;
  1555.  
  1556.     #[derive(Clone)]
  1557.     pub struct AppState {
  1558.         pub root: Arc<PathBuf>,
  1559.         pub require_auth: bool,
  1560.         pub legacy: bool,
  1561.     }
  1562.  
  1563.     #[derive(Deserialize)]
  1564.     pub struct QueryReq { pub sql: String }
  1565.  
  1566.     #[derive(Deserialize)]
  1567.     pub struct UserReq { pub username: String, pub password: String }
  1568.  
  1569.     pub async fn run_server(args: Args) -> Result<()> {
  1570.         let app_state = AppState { root: Arc::new(args.data_dir.clone()), require_auth: args.require_auth, legacy: args.legacy };
  1571.  
  1572.         // Migrate users table from CSV to binary storage (if needed)
  1573.         if !args.legacy {
  1574.             super::migration::migrate_users_to_binary(&args.data_dir)?;
  1575.            
  1576.             // Ensure canto.db exists for binary storage (even if no migration was needed)
  1577.             let canto_db_path = args.data_dir.join("canto.db");
  1578.             if !canto_db_path.exists() {
  1579.                 let lock_path = args.data_dir.join("canto.lock");
  1580.                 let _binary_storage = super::storage::BinaryStorage::open(canto_db_path, lock_path)?;
  1581.             }
  1582.         }
  1583.  
  1584.         // Check for migration from CSV to binary storage for other tables
  1585.         let canto_db_path = args.data_dir.join("canto.db");
  1586.         if !args.legacy && !canto_db_path.exists() {
  1587.             // Check if there are any CSV files that need migration
  1588.             if let Ok(entries) = std::fs::read_dir(&args.data_dir) {
  1589.                 let csv_files: Vec<_> = entries
  1590.                     .filter_map(|e| e.ok())
  1591.                     .filter(|e| e.path().extension().map_or(false, |ext| ext == "csv"))
  1592.                     .filter(|e| e.file_name() != "users.csv") // Skip users table - handled above
  1593.                     .collect();
  1594.                
  1595.                 if !csv_files.is_empty() {
  1596.                     println!("Found legacy CSV files, migrating to binary storage...");
  1597.                     migrate_csv_to_binary(&args.data_dir, &canto_db_path)?;
  1598.                     println!("Migration completed successfully!");
  1599.                 }
  1600.             }
  1601.         }
  1602.  
  1603.         // Ensure "users" table exists
  1604.         ensure_users_table(&app_state)?;
  1605.  
  1606.         let app = Router::new()
  1607.             .route("/healthz", get(|| async { "ok" }))
  1608.             .route("/v1/query", post(handle_query))
  1609.             .route("/v1/users", post(create_user))
  1610.             .route("/v1/users/:username", delete(delete_user))
  1611.             .with_state(app_state);
  1612.  
  1613.         let listener = tokio::net::TcpListener::bind(&args.bind).await?;
  1614.         println!("canto server listening on http://{}", &args.bind);
  1615.         axum::serve(listener, app).await?;
  1616.         Ok(())
  1617.     }
  1618.    
  1619.     fn migrate_csv_to_binary(data_dir: &std::path::Path, canto_db_path: &std::path::Path) -> Result<()> {
  1620.         use super::storage::{BinaryStorage, StorageBackend};
  1621.         use super::schema::Schema;
  1622.         use std::collections::HashMap;
  1623.        
  1624.         // Create binary storage
  1625.         let lock_path = data_dir.join("canto.lock");
  1626.         let binary_storage = BinaryStorage::open(canto_db_path.to_path_buf(), lock_path)?;
  1627.        
  1628.         // Collect all schemas
  1629.         let mut all_schemas = HashMap::new();
  1630.         for entry in std::fs::read_dir(data_dir)? {
  1631.             let entry = entry?;
  1632.             let path = entry.path();
  1633.             if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
  1634.                 if name.ends_with(".schema.toml") {
  1635.                     let table_name = name.strip_suffix(".schema.toml").unwrap();
  1636.                     if table_name != "users" { // Skip users table
  1637.                         let schema_content = std::fs::read_to_string(&path)?;
  1638.                         let schema: Schema = toml::from_str(&schema_content)?;
  1639.                         all_schemas.insert(table_name.to_string(), schema);
  1640.                     }
  1641.                 }
  1642.             }
  1643.         }
  1644.        
  1645.         // Write central catalog
  1646.         let catalog_toml = toml::to_string_pretty(&all_schemas)?;
  1647.         binary_storage.write_catalog(&catalog_toml)?;
  1648.        
  1649.         // Migrate each table's data
  1650.         let mut checkpoints = Vec::new();
  1651.         for (table_name, schema) in &all_schemas {
  1652.             let csv_path = data_dir.join(format!("{}.csv", table_name));
  1653.             if csv_path.exists() {
  1654.                 // Read CSV data
  1655.                 let mut rows = Vec::new();
  1656.                 let file = std::fs::File::open(&csv_path)?;
  1657.                 let mut rdr = csv::ReaderBuilder::new().has_headers(true).from_reader(file);
  1658.                 for record in rdr.records() {
  1659.                     let record = record?;
  1660.                     let row: Vec<String> = record.iter().map(|s| s.to_string()).collect();
  1661.                     // Only take columns that exist in schema (ignore FOREIGN KEY artifacts)
  1662.                     let filtered_row: Vec<String> = row.into_iter()
  1663.                         .take(schema.columns.len())
  1664.                         .collect();
  1665.                     rows.push(filtered_row);
  1666.                 }
  1667.                
  1668.                 // Write to binary storage
  1669.                 let seq_no = 1u64; // Start with sequence 1
  1670.                 let data_offset = binary_storage.append_table_data(table_name, seq_no, &rows)?;
  1671.                
  1672.                 // Handle primary key index if present
  1673.                 let pk_index_offset = if let Some(pk_col) = &schema.primary_key {
  1674.                     if let Some(pk_idx) = schema.col_index(pk_col) {
  1675.                         let mut pk_index = std::collections::BTreeMap::new();
  1676.                         for (row_no, row) in rows.iter().enumerate() {
  1677.                             if let Some(pk_val) = row.get(pk_idx) {
  1678.                                 pk_index.insert(pk_val.clone(), row_no as u64);
  1679.                             }
  1680.                         }
  1681.                         binary_storage.write_pk_index(table_name, &pk_index)?
  1682.                     } else { 0 }
  1683.                 } else { 0 };
  1684.                
  1685.                 checkpoints.push(super::storage::TableCheckpoint {
  1686.                     table_name: table_name.clone(),
  1687.                     last_seq_no: seq_no,
  1688.                     last_data_offset: data_offset,
  1689.                     pk_index_offset,
  1690.                 });
  1691.             }
  1692.         }
  1693.        
  1694.         // Write checkpoint
  1695.         binary_storage.write_checkpoint(&checkpoints)?;
  1696.        
  1697.         // Move old files to legacy directory
  1698.         let legacy_dir = data_dir.join("legacy");
  1699.         std::fs::create_dir_all(&legacy_dir)?;
  1700.        
  1701.         for entry in std::fs::read_dir(data_dir)? {
  1702.             let entry = entry?;
  1703.             let path = entry.path();
  1704.             if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
  1705.                 // Move CSV, schema, idx, and lock files (but not users table)
  1706.                 if (name.ends_with(".csv") || name.ends_with(".schema.toml") ||
  1707.                     name.ends_with(".idx") || name.ends_with(".lock")) &&
  1708.                    !name.starts_with("users.") && !name.starts_with("canto.") {
  1709.                     let dest = legacy_dir.join(name);
  1710.                     std::fs::rename(&path, &dest)?;
  1711.                 }
  1712.             }
  1713.         }
  1714.        
  1715.         Ok(())
  1716.     }
  1717.  
  1718.     async fn handle_query(State(st): State<AppState>, headers: HeaderMap, Json(req): Json<QueryReq>) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
  1719.         if st.require_auth {
  1720.             auth_check(&st, &headers).map_err(internal)?;
  1721.         }
  1722.         let stmts = sql::parse_statements(&req.sql).map_err(internal)?;
  1723.         // Use binary storage if canto.db exists, otherwise fall back to CSV
  1724.         let use_legacy = st.legacy || !st.root.join("canto.db").exists();
  1725.         let res = sql::exec_statements(&st.root, stmts, use_legacy).map_err(internal)?;
  1726.         Ok(Json(json!({"results": res})))
  1727.     }
  1728.  
  1729.     async fn create_user(State(st): State<AppState>, headers: HeaderMap, Json(req): Json<UserReq>) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
  1730.         // If auth is required, only an authenticated caller can manage users.
  1731.         if st.require_auth {
  1732.             auth_check(&st, &headers).map_err(internal)?;
  1733.         }
  1734.     let (mut tbl, idx_user, _idx_pass) = open_users(&st).map_err(internal)?;
  1735.         // naive: check exists
  1736.         let exists = tbl.select(|r| r[idx_user] == req.username).map_err(internal)?.len() > 0;
  1737.         if exists { return Err((StatusCode::BAD_REQUEST, "user exists".into())); }
  1738.         tbl.insert(vec![engine::Value::S(req.username), engine::Value::S(req.password)]).map_err(internal)?;
  1739.         Ok(Json(json!({"ok": true})))
  1740.     }
  1741.  
  1742.     async fn delete_user(State(st): State<AppState>, headers: HeaderMap, Path(username): Path<String>) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
  1743.         if st.require_auth {
  1744.             auth_check(&st, &headers).map_err(internal)?;
  1745.         }
  1746.         let (mut tbl, idx_user, _idx_pass) = open_users(&st).map_err(internal)?;
  1747.         let n = tbl.delete(|r| r[idx_user] == username).map_err(internal)?;
  1748.         Ok(Json(json!({"ok": true, "deleted": n})))
  1749.     }
  1750.  
  1751.     fn ensure_users_table(st: &AppState) -> Result<()> {
  1752.         let schema = schema::Schema {
  1753.             name: "users".into(),
  1754.             columns: vec![
  1755.                 schema::Column { name: "username".into(), dtype: schema::DataType::String, nullable: false },
  1756.                 schema::Column { name: "password".into(), dtype: schema::DataType::String, nullable: false },
  1757.             ],
  1758.             primary_key: Some("username".into()),
  1759.         };
  1760.        
  1761.         // Use binary storage if canto.db exists, otherwise fall back to CSV
  1762.         let use_legacy = st.legacy || !st.root.join("canto.db").exists();
  1763.         let _ = engine::Table::open(&st.root, schema, use_legacy)?;
  1764.         Ok(())
  1765.     }
  1766.  
  1767.     fn open_users(st: &AppState) -> Result<(engine::Table, usize, usize)> {
  1768.         let sch: schema::Schema = {
  1769.             // Use binary storage if canto.db exists, otherwise read from CSV schema file
  1770.             if !st.legacy && st.root.join("canto.db").exists() {
  1771.                 // Users schema should be in binary catalog
  1772.                 schema::Schema {
  1773.                     name: "users".into(),
  1774.                     columns: vec![
  1775.                         schema::Column { name: "username".into(), dtype: schema::DataType::String, nullable: false },
  1776.                         schema::Column { name: "password".into(), dtype: schema::DataType::String, nullable: false },
  1777.                     ],
  1778.                     primary_key: Some("username".into()),
  1779.                 }
  1780.             } else {
  1781.                 // Read schema from TOML file
  1782.                 let s = fs::read_to_string(st.root.join("users.schema.toml"))?;
  1783.                 toml::from_str(&s)?
  1784.             }
  1785.         };
  1786.         let idx_user = sch.col_index("username").ok_or_else(|| anyhow!("users.username missing"))?;
  1787.         let idx_pass = sch.col_index("password").ok_or_else(|| anyhow!("users.password missing"))?;
  1788.        
  1789.         // Use binary storage if canto.db exists, otherwise use CSV storage
  1790.         let use_legacy = st.legacy || !st.root.join("canto.db").exists();
  1791.         let tbl = engine::Table::open(&st.root, sch, use_legacy)?;
  1792.         Ok((tbl, idx_user, idx_pass))
  1793.     }
  1794.  
  1795.     fn internal<E: std::fmt::Display>(e: E) -> (StatusCode, String) {
  1796.         (StatusCode::BAD_REQUEST, e.to_string())
  1797.     }
  1798.  
  1799.     fn auth_check(st: &AppState, headers: &HeaderMap) -> Result<()> {
  1800.         let auth = headers.get(http::header::AUTHORIZATION).ok_or_else(|| anyhow!("missing Authorization"))?;
  1801.         let s = auth.to_str().map_err(|_| anyhow!("bad Authorization header"))?;
  1802.         if !s.starts_with("Basic ") { return Err(anyhow!("expect Basic auth")); }
  1803.         let b64 = &s[6..];
  1804.         let decoded = STANDARD.decode(b64)?;
  1805.         let creds = String::from_utf8(decoded).map_err(|_| anyhow!("utf8"))?;
  1806.         let (u, p) = creds.split_once(':').ok_or_else(|| anyhow!("bad basic pair"))?;
  1807.  
  1808.         // verify in users table
  1809.         let (tbl, idx_user, idx_pass) = open_users(st)?;
  1810.         let rows = tbl.select(|r| r[idx_user] == u && r[idx_pass] == p)?;
  1811.         if rows.is_empty() { return Err(anyhow!("invalid credentials")); }
  1812.         Ok(())
  1813.     }
  1814. }
  1815.  
  1816. // ================================
  1817. // Migration (Users CSV -> Binary)
  1818. // ================================
  1819. mod migration {
  1820.     use super::{schema, storage};
  1821.     use anyhow::Result;
  1822.     use std::path::Path;
  1823.     use std::fs;
  1824.  
  1825.     pub fn migrate_users_to_binary(root: &Path) -> Result<()> {
  1826.         // Step 1: Check if migration is needed
  1827.         if !legacy_users_exist(root) {
  1828.             // No legacy users files to migrate
  1829.             return Ok(());
  1830.         }
  1831.  
  1832.         if binary_exists(root) {
  1833.             // Binary storage exists, check if users table is already there
  1834.             if users_present_in_binary(root)? {
  1835.                 // Users already migrated, just archive legacy files if they still exist
  1836.                 println!("Users table already exists in binary storage, archiving legacy files...");
  1837.                 archive_legacy_users(root)?;
  1838.                 return Ok(());
  1839.             }
  1840.         }
  1841.  
  1842.         println!("Migrating users table from CSV to binary storage...");
  1843.  
  1844.         // Step 2: Read legacy users data
  1845.         let legacy_rows = read_legacy_users(root)?;
  1846.         println!("Found {} user(s) in legacy CSV", legacy_rows.len());
  1847.  
  1848.         // Step 3: Ensure binary storage and users schema exist
  1849.         ensure_users_in_binary(root)?;
  1850.  
  1851.         // Step 4: Import users data
  1852.         import_users_to_binary(root, &legacy_rows)?;
  1853.  
  1854.         // Step 5: Archive legacy files
  1855.         archive_legacy_users(root)?;
  1856.  
  1857.         println!("Users migration completed successfully!");
  1858.         Ok(())
  1859.     }
  1860.  
  1861.     fn legacy_users_exist(root: &Path) -> bool {
  1862.         let csv_path = root.join("users.csv");
  1863.         let schema_path = root.join("users.schema.toml");
  1864.         csv_path.exists() && schema_path.exists()
  1865.     }
  1866.  
  1867.     fn binary_exists(root: &Path) -> bool {
  1868.         root.join("canto.db").exists()
  1869.     }
  1870.  
  1871.     fn users_present_in_binary(root: &Path) -> Result<bool> {
  1872.         if !binary_exists(root) {
  1873.             return Ok(false);
  1874.         }
  1875.  
  1876.         let lock_path = root.join("canto.lock");
  1877.         let binary_storage = storage::BinaryStorage::open(root.join("canto.db"), lock_path)?;
  1878.        
  1879.         if let Ok(Some(catalog_toml)) = binary_storage.read_catalog() {
  1880.             let existing_schemas: std::collections::HashMap<String, schema::Schema> =
  1881.                 toml::from_str(&catalog_toml).unwrap_or_default();
  1882.             Ok(existing_schemas.contains_key("users"))
  1883.         } else {
  1884.             Ok(false)
  1885.         }
  1886.     }
  1887.  
  1888.     fn read_legacy_users(root: &Path) -> Result<Vec<Vec<String>>> {
  1889.         let csv_path = root.join("users.csv");
  1890.         let mut rows = Vec::new();
  1891.  
  1892.         if csv_path.exists() {
  1893.             let file = std::fs::File::open(&csv_path)?;
  1894.             let mut rdr = csv::ReaderBuilder::new().has_headers(true).from_reader(file);
  1895.             for record in rdr.records() {
  1896.                 let record = record?;
  1897.                 let row: Vec<String> = record.iter().map(|s| s.to_string()).collect();
  1898.                 rows.push(row);
  1899.             }
  1900.         }
  1901.  
  1902.         Ok(rows)
  1903.     }
  1904.  
  1905.     fn ensure_users_in_binary(root: &Path) -> Result<()> {
  1906.         let lock_path = root.join("canto.lock");
  1907.         let binary_storage = storage::BinaryStorage::open(root.join("canto.db"), lock_path)?;
  1908.  
  1909.         // Create users schema
  1910.         let users_schema = schema::Schema {
  1911.             name: "users".into(),
  1912.             columns: vec![
  1913.                 schema::Column { name: "username".into(), dtype: schema::DataType::String, nullable: false },
  1914.                 schema::Column { name: "password".into(), dtype: schema::DataType::String, nullable: false },
  1915.             ],
  1916.             primary_key: Some("username".into()),
  1917.         };
  1918.  
  1919.         // Read existing catalog or create new one
  1920.         let mut all_schemas: std::collections::HashMap<String, schema::Schema> =
  1921.             if let Ok(Some(catalog_toml)) = binary_storage.read_catalog() {
  1922.                 toml::from_str(&catalog_toml).unwrap_or_default()
  1923.             } else {
  1924.                 std::collections::HashMap::new()
  1925.             };
  1926.  
  1927.         // Add or update users schema
  1928.         all_schemas.insert("users".to_string(), users_schema);
  1929.  
  1930.         // Write updated catalog
  1931.         let catalog_toml = toml::to_string_pretty(&all_schemas)?;
  1932.         binary_storage.write_catalog(&catalog_toml)?;
  1933.  
  1934.         Ok(())
  1935.     }
  1936.  
  1937.     fn import_users_to_binary(root: &Path, rows: &[Vec<String>]) -> Result<()> {
  1938.         if rows.is_empty() {
  1939.             return Ok(());
  1940.         }
  1941.  
  1942.         let lock_path = root.join("canto.lock");
  1943.         let binary_storage = storage::BinaryStorage::open(root.join("canto.db"), lock_path)?;
  1944.  
  1945.         // Import user rows in a single batch
  1946.         let seq_no = 1u64; // Start with sequence 1 for users
  1947.         let data_offset = binary_storage.append_table_data("users", seq_no, rows)?;
  1948.  
  1949.         // Build primary key index for users (username is PK)
  1950.         let mut pk_index = std::collections::BTreeMap::new();
  1951.         for (row_no, row) in rows.iter().enumerate() {
  1952.             if let Some(username) = row.get(0) { // username is first column
  1953.                 pk_index.insert(username.clone(), row_no as u64);
  1954.             }
  1955.         }
  1956.         let pk_index_offset = binary_storage.write_pk_index("users", &pk_index)?;
  1957.  
  1958.         // Write checkpoint with users table info
  1959.         let checkpoint = storage::TableCheckpoint {
  1960.             table_name: "users".to_string(),
  1961.             last_seq_no: seq_no,
  1962.             last_data_offset: data_offset,
  1963.             pk_index_offset,
  1964.         };
  1965.         binary_storage.write_checkpoint(&[checkpoint])?;
  1966.  
  1967.         Ok(())
  1968.     }
  1969.  
  1970.     fn archive_legacy_users(root: &Path) -> Result<()> {
  1971.         let legacy_dir = root.join("legacy");
  1972.         fs::create_dir_all(&legacy_dir)?;
  1973.  
  1974.         let files_to_archive = ["users.csv", "users.schema.toml", "users.idx", "users.lock"];
  1975.        
  1976.         for filename in &files_to_archive {
  1977.             let src_path = root.join(filename);
  1978.             if src_path.exists() {
  1979.                 let dest_path = legacy_dir.join(filename);
  1980.                 fs::rename(&src_path, &dest_path)?;
  1981.                 println!("Archived {} to legacy/", filename);
  1982.             }
  1983.         }
  1984.  
  1985.         Ok(())
  1986.     }
  1987. }
  1988.  
  1989. // ================================
  1990. // CLI client
  1991. // ================================
  1992. mod client {
  1993.     use super::Args;
  1994.     use anyhow::{anyhow, Result};
  1995.     use base64::engine::general_purpose::STANDARD;
  1996.     use base64::Engine as _;
  1997.     use std::fs;
  1998.     use serde_json::json;
  1999.     use reqwest::Client;
  2000.  
  2001.     pub async fn run_client(args: Args) -> Result<()> {
  2002.         let sql = if let Some(f) = args.file {
  2003.             fs::read_to_string(&f)?
  2004.         } else {
  2005.             return Err(anyhow!("client mode requires -f <sql-file>"));
  2006.         };
  2007.  
  2008.         let url = format!("{}/v1/query", args.url.trim_end_matches('/'));
  2009.         let client = Client::new();
  2010.         let mut req = client.post(&url).header("content-type", "application/json");
  2011.         if let (Some(u), Some(p)) = (args.username.as_ref(), args.password.as_ref()) {
  2012.             let token = STANDARD.encode(format!("{}:{}", u, p));
  2013.             req = req.header("authorization", format!("Basic {}", token));
  2014.         }
  2015.         let resp = req.body(serde_json::to_vec(&json!({"sql": sql}))?).send().await?;
  2016.         let status = resp.status();
  2017.         let bytes = resp.bytes().await?;
  2018.         if !status.is_success() {
  2019.             println!("Error {}: {}", status, String::from_utf8_lossy(&bytes));
  2020.             return Ok(());
  2021.         }
  2022.         println!("{}", String::from_utf8_lossy(&bytes));
  2023.         Ok(())
  2024.     }
  2025. }
  2026.  
  2027.  
Add Comment
Please, Sign In to add comment