- # Canto — Minimal CSV/Binary Table Server
- **Canto** is a lightweight, embedded database engine that can run:
- - As a **standalone RESTful server** on port `5432` (default)
- - In **client mode** to send SQL queries
- - As a **Rust library** inside another program
- It supports **two storage backends**:
- 1. **Binary file** (`data/canto.db`) — default, fast, append-only
- 2. **CSV + schema TOML** — for human-readable table storage
- ## 1. Quick Start
- ### a. Run the server (default)
- ```bash
- cargo run --release
- ```
- This starts Canto in **server mode** on `0.0.0.0:5432`. Useful flags: `--bind <addr>`, `--data-dir <dir>`, `--require-auth` to enforce HTTP Basic Auth, and `-l`/`--legacy` to use CSV storage instead of the binary file.
- ### b. Run the client
- ```bash
- cargo run --release -- -c -f "queries.sql" -u myuser -p mypass
- ```
- * `-c` = client mode
- * `-f` = SQL file to execute
- * `-u` / `-p` = username / password (optional; no auth if omitted)
- ### c. Example with `curl`
- ```bash
- curl -X POST http://localhost:5432/v1/query \
- -H "Content-Type: application/json" \
- --data '{"sql": "SELECT * FROM movies ORDER BY year DESC LIMIT 3;"}'
- ```
- The server replies with JSON of the form `{"results": [...]}`, one entry per statement.
- ## 2. Valid SQL Statements
- Canto implements a **small subset of SQL** with a hand-rolled parser (no external SQL parser library).
- Supported statements:
- * `CREATE TABLE table_name (col1 TYPE [PRIMARY KEY] [NOT NULL], col2 TYPE, ...)`
- * `INSERT INTO table_name (col1, col2, ...) VALUES (v1, v2, ...), (...)` (an explicit column list is required)
- * `SELECT col1, col2 FROM table_name [JOIN other ON a.x = b.y ...] [ORDER BY col [ASC|DESC]] [LIMIT n]`
- Statements are separated by `;`, and lines starting with `--` are treated as comments. `DROP TABLE`, `UPDATE`, `DELETE`, and `WHERE` are not implemented yet; user management is done over REST rather than SQL:
- * `POST /v1/users` with `{"username": "...", "password": "..."}`
- * `DELETE /v1/users/{username}`
- Column types:
- * `STRING`
- * `INT`
- * `FLOAT`
- * `BOOL`
- * `DATE`
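- A short example using these types (the `movies` table and its columns are illustrative, not part of any shipped schema):
- ```sql
- CREATE TABLE movies (id INT PRIMARY KEY, title STRING NOT NULL, rating FLOAT, released DATE);
- INSERT INTO movies (id, title, rating, released) VALUES
-   (1, 'Alien', 8.5, '1979-05-25'),
-   (2, 'Blade Runner', 8.1, '1982-06-25');
- SELECT title, rating FROM movies ORDER BY rating DESC LIMIT 1;
- ```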
- ## 3. Storage Backends
- ### Binary Mode (default)
- * Single file: `data/canto.db`
- * Append-only blocks for crash safety
- * Catalog, checkpoint, and index blocks record table schemas and primary-key indexes
- * Startup currently replays all blocks from the start of the file (checkpoints are written, but not yet used to skip ahead)
- * No need to keep CSV files
- ### CSV Mode
- * Each table: `data/{table}.csv`
- * Schema file: `data/{table}.schema.toml`
- * Slower than binary, but human-editable
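- The schema file is plain TOML; for a table like the `movies` example above it looks roughly like this (field layout follows the `Schema` struct in `main.rs`; values are illustrative):
- ```toml
- name = "movies"
- primary_key = "id"
- [[columns]]
- name = "id"
- dtype = "Int"
- nullable = false
- [[columns]]
- name = "title"
- dtype = "String"
- nullable = false
- ```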
- ## 4. Example SQL Queries
- Top 3 movies with director & studio sorted by year:
- ```sql
- SELECT title, director, studio, year
- FROM movies
- ORDER BY year DESC
- LIMIT 3;
- ```
- Join example (albums + bands + members):
- ```sql
- SELECT albums.name, bands.name, members.name, albums.year
- FROM albums
- JOIN bands ON albums.band_id = bands.id
- JOIN members ON bands.id = members.band_id
- ORDER BY albums.year DESC;
- ```
- ## 5. Using as a Library
- Canto's modules can be embedded in your own Rust applications. The example below sketches the intended library API; the `main.rs` shipped here is a binary crate, so these modules would first need to be exposed from a `lib.rs`.
- ### Cargo.toml
- ```toml
- [dependencies]
- canto = { path = "../canto" }
- ```
- ### Example
- ```rust
- use canto::engine::Engine;
- use canto::storage::StorageMode;
- fn main() {
- // Start engine in binary mode
- let mut engine = Engine::new(StorageMode::Binary, "data/canto.db");
- engine.execute("CREATE TABLE test (id INT, name STRING);");
- engine.execute("INSERT INTO test VALUES (1, 'Alice');");
- let rows = engine.execute("SELECT * FROM test;");
- println!("{:?}", rows);
- }
- ```
- You can initialize in `StorageMode::Csv` to use CSV+schema storage instead.
- ## 6. Full Source
- ### Cargo.toml
- ```toml
- [package]
- name = "canto"
- version = "0.2.0"
- edition = "2021"
- [dependencies]
- reqwest = { version = "0.12", features = ["json", "blocking"] }
- regex = "1.10"
- anyhow = "1"
- thiserror = "1"
- csv = "1.3"
- serde = { version = "1", features = ["derive"] }
- serde_json = "1"
- toml = "0.8"
- bincode = "1"
- fs2 = "0.4"
- crc32fast = "1"
- lazy_static = "1.4"
- clap = { version = "4", features = ["derive"] }
- tokio = { version = "1", features = ["full"] }
- axum = "0.7"
- hyper = { version = "1", features = ["full"] }
- http = "1"
- base64 = "0.22"
- ```
- ### main.rs
- ```rust
- //! Canto: tiny CSV-as-table DB with a REST server, minimal SQL, and a CLI client.
- //! - Default: runs REST server on 0.0.0.0:5432
- //! - Client mode: `canto -c -f movie_database.sql [-u user -p pass]`
- //! - Very small SQL: CREATE TABLE, INSERT INTO ... VALUES (...), SELECT ... JOIN ... ORDER BY ... LIMIT n
- //! - Users: POST /v1/users {username,password}, DELETE /v1/users/{username}
- //! - Start with `--require-auth` to enforce Basic Auth (default: open)
- use anyhow::Result;
- use clap::{ArgAction, Parser};
- use std::{fs, path::PathBuf};
- #[derive(Parser, Debug)]
- #[command(name="canto", version, about="Tiny CSV-table DB with REST + minimal SQL")]
- struct Args {
- /// Data directory (tables, indexes, schema files)
- #[arg(long, default_value = "./data")]
- data_dir: PathBuf,
- /// Server bind address
- #[arg(long, default_value = "0.0.0.0:5432")]
- bind: String,
- /// Require HTTP Basic Auth on all endpoints
- #[arg(long, action=ArgAction::SetTrue, default_value_t=false)]
- require_auth: bool,
- /// Client mode (send SQL file to server via REST)
- #[arg(short='c', long, action=ArgAction::SetTrue, default_value_t=false)]
- client: bool,
- /// SQL file to execute (client mode)
- #[arg(short='f', long)]
- file: Option<PathBuf>,
- /// Server URL for client mode
- #[arg(long, default_value = "http://127.0.0.1:5432")]
- url: String,
- /// Username for Basic Auth (optional; required if server enforces it)
- #[arg(short='u', long)]
- username: Option<String>,
- /// Password for Basic Auth (optional; required if server enforces it)
- #[arg(short='p', long)]
- password: Option<String>,
- /// Use legacy CSV storage format instead of binary canto.db
- #[arg(short='l', long, action=ArgAction::SetTrue, default_value_t=false)]
- legacy: bool,
- }
- #[tokio::main]
- async fn main() -> Result<()> {
- let args = Args::parse();
- fs::create_dir_all(&args.data_dir).ok();
- if args.client {
- return client::run_client(args).await;
- } else {
- return server::run_server(args).await;
- }
- }
- // ================================
- // Catalog / Schema
- // ================================
- mod schema {
- use serde::{Deserialize, Serialize};
- #[derive(Debug, Clone, Serialize, Deserialize)]
- pub enum DataType { String, Int, Float, Bool, Date }
- #[derive(Debug, Clone, Serialize, Deserialize)]
- pub struct Column {
- pub name: String,
- pub dtype: DataType,
- pub nullable: bool,
- }
- #[derive(Debug, Clone, Serialize, Deserialize)]
- pub struct Schema {
- pub name: String,
- pub columns: Vec<Column>,
- pub primary_key: Option<String>, // by column name
- }
- impl Schema {
- pub fn col_index(&self, name: &str) -> Option<usize> {
- self.columns.iter().position(|c| c.name == name)
- }
- pub fn headers(&self) -> Vec<String> {
- self.columns.iter().map(|c| c.name.clone()).collect()
- }
- pub fn table_csv_path(&self, root: &std::path::Path) -> std::path::PathBuf {
- root.join(format!("{}.csv", self.name))
- }
- pub fn schema_toml_path(&self, root: &std::path::Path) -> std::path::PathBuf {
- root.join(format!("{}.schema.toml", self.name))
- }
- pub fn idx_path(&self, root: &std::path::Path) -> std::path::PathBuf {
- root.join(format!("{}.idx", self.name))
- }
- pub fn lock_path(&self, root: &std::path::Path) -> std::path::PathBuf {
- root.join(format!("{}.lock", self.name))
- }
- }
- }
- // ================================
- // Storage (bytes on disk)
- // ================================
- mod storage {
- use anyhow::Result;
- use csv::{ReaderBuilder, WriterBuilder};
- use fs2::FileExt;
- use std::{
- fs::{self, File, OpenOptions},
- io::{BufReader, BufWriter, Read, Write, Seek, SeekFrom},
- path::{Path, PathBuf},
- collections::BTreeMap,
- };
- pub struct CsvStorage {
- pub csv_path: PathBuf,
- pub schema_path: PathBuf,
- pub idx_path: PathBuf,
- pub lock_path: PathBuf,
- }
- impl CsvStorage {
- pub fn open(
- csv_path: PathBuf,
- schema_path: PathBuf,
- idx_path: PathBuf,
- lock_path: PathBuf,
- schema: &super::schema::Schema,
- ) -> Result<Self> {
- let s = Self { csv_path, schema_path, idx_path, lock_path };
- s.init_if_missing(schema)?;
- Ok(s)
- }
- fn init_if_missing(&self, schema: &super::schema::Schema) -> Result<()> {
- if !self.csv_path.exists() {
- let file = File::create(&self.csv_path)?;
- let mut w = WriterBuilder::new().has_headers(true).from_writer(BufWriter::new(file));
- let headers = schema.headers();
- let hdr: Vec<&str> = headers.iter().map(|s| s.as_str()).collect();
- w.write_record(hdr)?;
- w.flush()?;
- }
- Ok(())
- }
- fn lock_exclusive(&self) -> Result<File> {
- let f = OpenOptions::new().create(true).read(true).write(true).open(&self.lock_path)?;
- f.lock_exclusive()?; // unlock on drop
- Ok(f)
- }
- pub fn write_schema_toml(&self, toml_str: &str) -> Result<()> {
- let tmp = self.schema_path.with_extension("toml.tmp");
- fs::write(&tmp, toml_str)?;
- fs::rename(tmp, &self.schema_path)?;
- Ok(())
- }
- pub fn append_record(&self, record: &[String]) -> Result<()> {
- let _guard = self.lock_exclusive()?;
- let file = OpenOptions::new().create(true).append(true).open(&self.csv_path)?;
- let mut wtr = WriterBuilder::new().has_headers(false).from_writer(file);
- wtr.write_record(record)?;
- wtr.flush()?;
- Ok(())
- }
- pub fn scan<F: FnMut(&[String]) -> Result<()>>(&self, mut f: F) -> Result<()> {
- let file = File::open(&self.csv_path)?;
- let rdr = BufReader::new(file);
- let mut rdr = ReaderBuilder::new().has_headers(true).from_reader(rdr);
- for rec in rdr.records() {
- let rec = rec?;
- let row: Vec<String> = rec.iter().map(|s| s.to_string()).collect();
- f(&row)?;
- }
- Ok(())
- }
- pub fn rewrite(&self, rows: &[Vec<String>]) -> Result<()> {
- let _guard = self.lock_exclusive()?;
- let tmp = self.csv_path.with_extension("csv.tmp");
- {
- let file = File::create(&tmp)?;
- let mut w = WriterBuilder::new().has_headers(true).from_writer(BufWriter::new(file));
- // Use schema headers for rewrite; assume schema is available via self.schema_path
- let schema_str = std::fs::read_to_string(&self.schema_path)?;
- let schema: super::schema::Schema = toml::from_str(&schema_str)?;
- let headers = schema.headers();
- let hdr: Vec<&str> = headers.iter().map(|s| s.as_str()).collect();
- w.write_record(hdr)?;
- for r in rows {
- w.write_record(r)?;
- }
- w.flush()?;
- }
- fs::rename(tmp, &self.csv_path)?;
- Ok(())
- }
- pub fn write_index_bytes(&self, bytes: &[u8]) -> Result<()> {
- let _guard = self.lock_exclusive()?;
- let tmp = self.idx_path.with_extension("idx.tmp");
- fs::write(&tmp, bytes)?;
- fs::rename(tmp, &self.idx_path)?;
- Ok(())
- }
- pub fn read_index_bytes(&self) -> Result<Option<Vec<u8>>> {
- if self.idx_path.exists() {
- let b = fs::read(&self.idx_path)?;
- Ok(Some(b))
- } else {
- Ok(None)
- }
- }
- }
- pub fn schema_to_toml<T: serde::Serialize>(value: &T) -> Result<String> {
- Ok(toml::to_string_pretty(value)?)
- }
- // ================================
- // Storage Backend Enum
- // ================================
- pub enum StorageBackend {
- Csv(CsvStorage),
- Binary(BinaryStorage, String), // Binary storage + table name
- }
- impl StorageBackend {
- pub fn append_record(&self, record: &[String]) -> Result<()> {
- match self {
- StorageBackend::Csv(csv) => csv.append_record(record),
- StorageBackend::Binary(binary, table_name) => {
- // For binary storage, we need to write individual records as single-row batches
- // Use a simple sequence number (this could be improved with proper tracking)
- let seq_no = std::time::SystemTime::now()
- .duration_since(std::time::UNIX_EPOCH)
- .unwrap_or_default()
- .as_millis() as u64;
- let rows = vec![record.to_vec()];
- binary.append_table_data(table_name, seq_no, &rows)?;
- Ok(())
- }
- }
- }
- pub fn scan<F: FnMut(&[String]) -> Result<()>>(&self, mut f: F) -> Result<()> {
- match self {
- StorageBackend::Csv(csv) => csv.scan(f),
- StorageBackend::Binary(binary, table_name) => {
- // Scan only the specified table's data
- binary.scan_table_data(|scan_table_name, _seq_no, rows| {
- if scan_table_name == table_name {
- for row in rows {
- f(row)?;
- }
- }
- Ok(())
- })
- }
- }
- }
- pub fn rewrite(&self, rows: &[Vec<String>]) -> Result<()> {
- match self {
- StorageBackend::Csv(csv) => csv.rewrite(rows),
- StorageBackend::Binary(_, _) => {
- // Binary storage rewrite will be different
- Ok(())
- }
- }
- }
- pub fn write_index_bytes(&self, bytes: &[u8]) -> Result<()> {
- match self {
- StorageBackend::Csv(csv) => csv.write_index_bytes(bytes),
- StorageBackend::Binary(_, _) => {
- // Binary storage handles indexes differently
- Ok(())
- }
- }
- }
- pub fn read_index_bytes(&self) -> Result<Option<Vec<u8>>> {
- match self {
- StorageBackend::Csv(csv) => csv.read_index_bytes(),
- StorageBackend::Binary(_, _) => {
- // Binary storage handles indexes differently
- Ok(None)
- }
- }
- }
- }
- // ================================
- // Binary Storage Implementation
- // ================================
- // Constants for binary file format
- const MAGIC: &[u8; 8] = b"CANTO\0\0\0";
- const VERSION: u16 = 1;
- const HEADER_SIZE: usize = 64;
- // Block tags
- const TAG_CATALOG: u8 = 0x10;
- const TAG_CHECKPOINT: u8 = 0x20;
- const TAG_DATA: u8 = 0x30;
- const TAG_INDEX: u8 = 0x40;
- // Header offsets
- const HEADER_LAST_CP_OFFSET: u64 = 20; // byte offset of last_checkpoint_offset: MAGIC(8) + version(2) + flags(2) + creation(8)
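- // On-disk layout (all integers little-endian), as written by FileHeader::write_to and append_block:
- //   header : MAGIC(8) | version(u16) | flags(u16) | creation_unix_ms(u64)
- //            | last_checkpoint_offset(u64) | file_size_at_checkpoint(u64) | zero padding to 64 bytes
- //   block  : tag(u8) | payload_len(u32) | crc32(payload)(u32) | payload bytes
- // Blocks are append-only; readers skip blocks they don't care about by seeking past payload_len.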
- #[derive(Debug)]
- pub struct FileHeader {
- pub version: u16,
- pub flags: u16,
- pub creation_unix_ms: u64,
- pub last_checkpoint_offset: u64,
- pub file_size_at_checkpoint: u64,
- }
- impl FileHeader {
- fn new() -> Self {
- Self {
- version: VERSION,
- flags: 0,
- creation_unix_ms: std::time::SystemTime::now()
- .duration_since(std::time::UNIX_EPOCH)
- .unwrap_or_default()
- .as_millis() as u64,
- last_checkpoint_offset: 0,
- file_size_at_checkpoint: 0,
- }
- }
- fn write_to<W: Write>(&self, mut writer: W) -> Result<()> {
- writer.write_all(MAGIC)?;
- writer.write_all(&self.version.to_le_bytes())?;
- writer.write_all(&self.flags.to_le_bytes())?;
- writer.write_all(&self.creation_unix_ms.to_le_bytes())?;
- writer.write_all(&self.last_checkpoint_offset.to_le_bytes())?;
- writer.write_all(&self.file_size_at_checkpoint.to_le_bytes())?;
- // Pad to 64 bytes
- let remaining = HEADER_SIZE - 8 - 2 - 2 - 8 - 8 - 8;
- writer.write_all(&vec![0u8; remaining])?;
- Ok(())
- }
- fn read_from<R: Read>(mut reader: R) -> Result<Self> {
- let mut magic = [0u8; 8];
- reader.read_exact(&mut magic)?;
- if &magic != MAGIC {
- return Err(anyhow::anyhow!("Invalid magic bytes"));
- }
- let mut buf = [0u8; 8];
- reader.read_exact(&mut buf[0..2])?;
- let version = u16::from_le_bytes([buf[0], buf[1]]);
- reader.read_exact(&mut buf[0..2])?;
- let flags = u16::from_le_bytes([buf[0], buf[1]]);
- reader.read_exact(&mut buf)?;
- let creation_unix_ms = u64::from_le_bytes(buf);
- reader.read_exact(&mut buf)?;
- let last_checkpoint_offset = u64::from_le_bytes(buf);
- reader.read_exact(&mut buf)?;
- let file_size_at_checkpoint = u64::from_le_bytes(buf);
- // Skip remaining padding
- let remaining = HEADER_SIZE - 8 - 2 - 2 - 8 - 8 - 8;
- let mut padding = vec![0u8; remaining];
- reader.read_exact(&mut padding)?;
- Ok(Self {
- version,
- flags,
- creation_unix_ms,
- last_checkpoint_offset,
- file_size_at_checkpoint,
- })
- }
- }
- #[derive(Debug, Clone)]
- pub struct TableCheckpoint {
- pub table_name: String,
- pub last_seq_no: u64,
- pub last_data_offset: u64,
- pub pk_index_offset: u64,
- }
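- // A checkpoint block's payload (see write_checkpoint below) is one record per table:
- //   name_len(u16) | name bytes | last_seq_no(u64) | last_data_offset(u64) | pk_index_offset(u64)
- // write_checkpoint also patches the file header so readers can locate the latest checkpoint.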
- pub struct BinaryStorage {
- pub db_path: PathBuf,
- pub lock_path: PathBuf,
- }
- impl BinaryStorage {
- pub fn open(db_path: PathBuf, lock_path: PathBuf) -> Result<Self> {
- let storage = Self { db_path, lock_path };
- storage.init_if_missing()?;
- Ok(storage)
- }
- fn init_if_missing(&self) -> Result<()> {
- if !self.db_path.exists() {
- let mut file = File::create(&self.db_path)?;
- let header = FileHeader::new();
- header.write_to(&mut file)?;
- file.flush()?;
- }
- Ok(())
- }
- fn lock_exclusive(&self) -> Result<File> {
- let f = OpenOptions::new().create(true).read(true).write(true).open(&self.lock_path)?;
- f.lock_exclusive()?; // unlock on drop
- Ok(f)
- }
- fn crc32(bytes: &[u8]) -> u32 {
- crc32fast::hash(bytes)
- }
- fn append_block(&self, tag: u8, payload: &[u8]) -> Result<u64> {
- let _guard = self.lock_exclusive()?;
- let mut file = OpenOptions::new().read(true).write(true).open(&self.db_path)?;
- let offset = file.seek(SeekFrom::End(0))?;
- let length = payload.len() as u32;
- let crc = Self::crc32(payload);
- file.write_all(&[tag])?;
- file.write_all(&length.to_le_bytes())?;
- file.write_all(&crc.to_le_bytes())?;
- file.write_all(payload)?;
- file.flush()?;
- Ok(offset)
- }
- pub fn write_checkpoint(&self, checkpoints: &[TableCheckpoint]) -> Result<u64> {
- let mut payload = Vec::new();
- // Serialize checkpoints
- for cp in checkpoints {
- let name_bytes = cp.table_name.as_bytes();
- payload.extend((name_bytes.len() as u16).to_le_bytes());
- payload.extend(name_bytes);
- payload.extend(cp.last_seq_no.to_le_bytes());
- payload.extend(cp.last_data_offset.to_le_bytes());
- payload.extend(cp.pk_index_offset.to_le_bytes());
- }
- let offset = self.append_block(TAG_CHECKPOINT, &payload)?;
- // Update header with checkpoint offset
- let mut file = OpenOptions::new().read(true).write(true).open(&self.db_path)?;
- file.seek(SeekFrom::Start(HEADER_LAST_CP_OFFSET))?;
- file.write_all(&offset.to_le_bytes())?;
- file.flush()?;
- Ok(offset)
- }
- pub fn write_catalog(&self, catalog_toml: &str) -> Result<u64> {
- self.append_block(TAG_CATALOG, catalog_toml.as_bytes())
- }
- pub fn append_table_data(&self, table_name: &str, seq_no: u64, rows: &[Vec<String>]) -> Result<u64> {
- let mut payload = Vec::new();
- // Table name
- let name_bytes = table_name.as_bytes();
- payload.extend((name_bytes.len() as u16).to_le_bytes());
- payload.extend(name_bytes);
- // Sequence number and row count
- payload.extend(seq_no.to_le_bytes());
- payload.extend((rows.len() as u32).to_le_bytes());
- // Column count (from first row if any)
- let col_count = rows.first().map(|r| r.len()).unwrap_or(0) as u16;
- payload.extend(col_count.to_le_bytes());
- // Rows data
- for row in rows {
- for col in row {
- let col_bytes = col.as_bytes();
- payload.extend((col_bytes.len() as u32).to_le_bytes());
- payload.extend(col_bytes);
- }
- }
- self.append_block(TAG_DATA, &payload)
- }
- pub fn write_pk_index(&self, table_name: &str, index: &BTreeMap<String, u64>) -> Result<u64> {
- let mut payload = Vec::new();
- // Table name
- let name_bytes = table_name.as_bytes();
- payload.extend((name_bytes.len() as u16).to_le_bytes());
- payload.extend(name_bytes);
- // PK column index (placeholder - we'll improve this later)
- payload.extend(0u16.to_le_bytes());
- // Entry count
- payload.extend((index.len() as u32).to_le_bytes());
- // Entries
- for (key, row_pointer) in index {
- let key_bytes = key.as_bytes();
- payload.extend((key_bytes.len() as u16).to_le_bytes());
- payload.extend(key_bytes);
- payload.extend(row_pointer.to_le_bytes());
- }
- self.append_block(TAG_INDEX, &payload)
- }
- fn read_header(&self) -> Result<FileHeader> {
- let mut file = File::open(&self.db_path)?;
- FileHeader::read_from(&mut file)
- }
- pub fn read_catalog(&self) -> Result<Option<String>> {
- let _header = self.read_header()?;
- // Always start from after the header to look for catalog blocks
- let start_offset = HEADER_SIZE as u64;
- let mut file = File::open(&self.db_path)?;
- file.seek(SeekFrom::Start(start_offset))?;
- // Read blocks looking for catalog - keep the LAST one found
- let mut latest_catalog: Option<String> = None;
- loop {
- let mut tag_buf = [0u8; 1];
- if file.read_exact(&mut tag_buf).is_err() {
- break;
- }
- let tag = tag_buf[0];
- let mut len_buf = [0u8; 4];
- file.read_exact(&mut len_buf)?;
- let length = u32::from_le_bytes(len_buf);
- let mut crc_buf = [0u8; 4];
- file.read_exact(&mut crc_buf)?;
- let expected_crc = u32::from_le_bytes(crc_buf);
- if tag == TAG_CATALOG {
- let mut payload = vec![0u8; length as usize];
- file.read_exact(&mut payload)?;
- // Verify CRC
- let computed_crc = Self::crc32(&payload);
- if computed_crc != expected_crc {
- return Err(anyhow::anyhow!("Catalog block CRC mismatch"));
- }
- // Keep this catalog and continue looking for newer ones
- latest_catalog = Some(String::from_utf8(payload)?);
- } else {
- // Skip this block
- file.seek(SeekFrom::Current(length as i64))?;
- }
- }
- Ok(latest_catalog)
- }
- pub fn scan_table_data<F: FnMut(&str, u64, &[Vec<String>]) -> Result<()>>(&self, mut callback: F) -> Result<()> {
- // For now, scan all data blocks from the beginning to ensure we don't miss any data
- // This is simpler and ensures we read all data, including data written before the latest checkpoint
- let start_offset = HEADER_SIZE as u64;
- let mut file = File::open(&self.db_path)?;
- file.seek(SeekFrom::Start(start_offset))?;
- loop {
- let mut tag_buf = [0u8; 1];
- if file.read_exact(&mut tag_buf).is_err() {
- break;
- }
- let tag = tag_buf[0];
- let mut len_buf = [0u8; 4];
- file.read_exact(&mut len_buf)?;
- let length = u32::from_le_bytes(len_buf);
- let mut crc_buf = [0u8; 4];
- file.read_exact(&mut crc_buf)?;
- let _crc = u32::from_le_bytes(crc_buf);
- if tag == TAG_DATA {
- let mut payload = vec![0u8; length as usize];
- file.read_exact(&mut payload)?;
- // Verify CRC
- let computed_crc = Self::crc32(&payload);
- if computed_crc != _crc {
- return Err(anyhow::anyhow!("Data block CRC mismatch"));
- }
- // Parse data block
- let mut cursor = 0;
- // Table name
- let name_len = u16::from_le_bytes([payload[cursor], payload[cursor + 1]]) as usize;
- cursor += 2;
- let table_name = String::from_utf8(payload[cursor..cursor + name_len].to_vec())?;
- cursor += name_len;
- // Sequence number
- let seq_no = u64::from_le_bytes([
- payload[cursor], payload[cursor + 1], payload[cursor + 2], payload[cursor + 3],
- payload[cursor + 4], payload[cursor + 5], payload[cursor + 6], payload[cursor + 7]
- ]);
- cursor += 8;
- // Row count
- let row_count = u32::from_le_bytes([
- payload[cursor], payload[cursor + 1], payload[cursor + 2], payload[cursor + 3]
- ]) as usize;
- cursor += 4;
- // Column count
- let col_count = u16::from_le_bytes([payload[cursor], payload[cursor + 1]]) as usize;
- cursor += 2;
- // Parse rows
- let mut rows = Vec::with_capacity(row_count);
- for _ in 0..row_count {
- let mut row = Vec::with_capacity(col_count);
- for _ in 0..col_count {
- let col_len = u32::from_le_bytes([
- payload[cursor], payload[cursor + 1], payload[cursor + 2], payload[cursor + 3]
- ]) as usize;
- cursor += 4;
- let col_str = String::from_utf8(payload[cursor..cursor + col_len].to_vec())?;
- cursor += col_len;
- row.push(col_str);
- }
- rows.push(row);
- }
- callback(&table_name, seq_no, &rows)?;
- } else {
- // Skip this block
- file.seek(SeekFrom::Current(length as i64))?;
- }
- }
- Ok(())
- }
- }
- }
- // ================================
- // Engine (ops on rows)
- // ================================
- mod engine {
- use super::schema::Schema;
- use super::storage::{schema_to_toml, CsvStorage, StorageBackend};
- use anyhow::{anyhow, Context, Result};
- use bincode;
- use std::{collections::BTreeMap, path::Path};
- use serde::{Serialize, Deserialize};
- #[derive(Clone, Debug, Serialize, Deserialize)]
- pub enum Value { S(String), I(i64), F(f64), B(bool) }
- pub struct Table {
- pub schema: Schema,
- storage: StorageBackend,
- pk_index: Option<BTreeMap<String, u64>>, // pk -> rowno
- // headers: Vec<String>,
- }
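- // Usage sketch (assumes `root` is the data directory and `schema` was built or loaded elsewhere):
- //   let mut t = Table::open(root, schema, /*legacy=*/false)?;
- //   t.insert(vec![Value::I(1), Value::S("Alice".into())])?;
- //   let rows = t.select(|_| true)?; // Vec<Vec<String>>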
- impl Table {
- pub fn open(root: &Path, schema: Schema, legacy: bool) -> Result<Self> {
- let canto_db_path = root.join("canto.db");
- let storage = if legacy {
- // Use CSV storage (legacy mode explicitly requested)
- let schema_toml = schema_to_toml(&schema)?;
- let csv_storage = super::storage::CsvStorage::open(
- schema.table_csv_path(root),
- schema.schema_toml_path(root),
- schema.idx_path(root),
- schema.lock_path(root),
- &schema,
- )?;
- csv_storage.write_schema_toml(&schema_toml)?;
- StorageBackend::Csv(csv_storage)
- } else {
- // Use binary storage (default)
- let lock_path = root.join("canto.lock");
- let binary_storage = super::storage::BinaryStorage::open(canto_db_path, lock_path)?;
- // If this is a new binary storage and this table doesn't exist in catalog yet,
- // we need to add it to the catalog
- if let Ok(Some(catalog_toml)) = binary_storage.read_catalog() {
- let existing_schemas: std::collections::HashMap<String, super::schema::Schema> =
- toml::from_str(&catalog_toml).unwrap_or_default();
- if !existing_schemas.contains_key(&schema.name) {
- // Add this schema to the catalog
- let mut all_schemas = existing_schemas;
- all_schemas.insert(schema.name.clone(), schema.clone());
- let new_catalog_toml = toml::to_string_pretty(&all_schemas)?;
- binary_storage.write_catalog(&new_catalog_toml)?;
- }
- } else {
- // No catalog exists yet, create one with this schema
- let mut all_schemas = std::collections::HashMap::new();
- all_schemas.insert(schema.name.clone(), schema.clone());
- let catalog_toml = toml::to_string_pretty(&all_schemas)?;
- binary_storage.write_catalog(&catalog_toml)?;
- }
- StorageBackend::Binary(binary_storage, schema.name.clone())
- };
- let mut t = Self { schema, storage, pk_index: None };
- if let Some(pk) = t.schema.primary_key.clone() {
- let pk_idx = t.schema.col_index(&pk).ok_or_else(|| anyhow!("bad PK"))?;
- if let Some(bytes) = t.storage.read_index_bytes()? {
- let map: BTreeMap<String, u64> = bincode::deserialize(&bytes)?;
- t.pk_index = Some(map);
- } else {
- let mut map = BTreeMap::new();
- let mut rowno: u64 = 0;
- t.storage.scan(|row| {
- if let Some(val) = row.get(pk_idx) { map.insert(val.clone(), rowno); }
- rowno += 1;
- Ok(())
- })?;
- t.pk_index = Some(map.clone());
- let bytes = bincode::serialize(&map)?;
- t.storage.write_index_bytes(&bytes)?;
- }
- }
- Ok(t)
- }
- pub fn insert(&mut self, row: Vec<Value>) -> Result<()> {
- if row.len() != self.schema.columns.len() { return Err(anyhow!("column count mismatch")); }
- let ser: Vec<String> = row.into_iter().map(|v| match v {
- Value::S(s) => s, Value::I(i) => i.to_string(), Value::F(f)=>f.to_string(), Value::B(b)=>b.to_string()
- }).collect();
- if let Some(pk) = self.schema.primary_key.clone() {
- let pk_idx = self.schema.col_index(&pk).unwrap();
- if let Some(idx) = &mut self.pk_index {
- let key = ser[pk_idx].clone();
- let new_rowno = idx.len() as u64;
- idx.insert(key, new_rowno);
- let bytes = bincode::serialize(idx)?;
- self.storage.write_index_bytes(&bytes)?;
- }
- }
- self.storage.append_record(&ser)
- }
- pub fn select<F: Fn(&[String]) -> bool>(&self, pred: F) -> Result<Vec<Vec<String>>> {
- let mut out = Vec::new();
- self.storage.scan(|row| { if pred(row) { out.push(row.to_vec()); } Ok(()) })?;
- Ok(out)
- }
- // pub fn update<F: Fn(&[String]) -> bool, G: Fn(&mut [String])>(&mut self, pred: F, patch: G) -> Result<usize> {
- // let mut rows: Vec<Vec<String>> = Vec::new();
- // self.storage.scan(|row| { rows.push(row.to_vec()); Ok(()) })?;
- //
- // let mut n = 0usize;
- // for r in rows.iter_mut() { if pred(r) { patch(r); n += 1; } }
- // self.storage.rewrite(&rows)?;
- //
- // if let Some(pk) = self.schema.primary_key.clone() {
- // let pk_idx = self.schema.col_index(&pk).context("PK not found")?;
- // let mut map = BTreeMap::new();
- // for (i, r) in rows.iter().enumerate() {
- // if let Some(k) = r.get(pk_idx) { map.insert(k.clone(), i as u64); }
- // }
- // self.pk_index = Some(map.clone());
- // let bytes = bincode::serialize(&map)?;
- // self.storage.write_index_bytes(&bytes)?;
- // }
- // Ok(n)
- // }
- pub fn delete<F: Fn(&[String]) -> bool>(&mut self, pred: F) -> Result<usize> {
- let mut kept: Vec<Vec<String>> = Vec::new(); let mut n = 0usize;
- self.storage.scan(|row| { if pred(row) { n += 1; } else { kept.push(row.to_vec()); } Ok(()) })?;
- self.storage.rewrite(&kept)?;
- if let Some(pk) = self.schema.primary_key.clone() {
- let pk_idx = self.schema.col_index(&pk).context("PK not found")?;
- let mut map = BTreeMap::new();
- for (i, r) in kept.iter().enumerate() {
- if let Some(k) = r.get(pk_idx) { map.insert(k.clone(), i as u64); }
- }
- self.pk_index = Some(map.clone());
- let bytes = bincode::serialize(&map)?;
- self.storage.write_index_bytes(&bytes)?;
- }
- Ok(n)
- }
- }
- }
- // ================================
- // Minimal SQL (hand-rolled, tiny subset)
- // ================================
- mod sql {
- use super::{engine, schema};
- use anyhow::{anyhow, Result};
- use regex::Regex;
- use serde_json::json;
- use std::fs;
- use std::collections::BTreeMap;
- use std::path::Path;
- // use anyhow::Context;
- // Tiny helper: strip comments and compress whitespace.
- pub fn clean_sql(s: &str) -> String {
- let mut out = String::new();
- for line in s.lines() {
- let l = line.trim();
- if l.starts_with("--") { continue; }
- out.push_str(l);
- out.push(' ');
- }
- // collapse multiple spaces
- let re = Regex::new(r"\s+").unwrap();
- re.replace_all(&out, " ").trim().to_string()
- }
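- // e.g. "SELECT *\n-- a comment\n  FROM   t;"  ->  "SELECT * FROM t;"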
- #[derive(Debug)]
- pub enum Stmt {
- CreateTable { name: String, cols: Vec<ColDef>, primary_key: Option<String> },
- Insert { table: String, cols: Vec<String>, values: Vec<Vec<String>> },
- Select { projection: Vec<String>, from: String, joins: Vec<Join>, order_by: Option<(String, bool)>, limit: Option<usize> },
- }
- #[derive(Debug)]
- pub struct ColDef {
- pub name: String,
- pub dtype: schema::DataType,
- pub nullable: bool,
- }
- #[derive(Debug)]
- pub struct Join {
- pub table: String,
- pub left_col: String,
- pub right_col: String,
- }
- pub fn parse_statements(sql: &str) -> Result<Vec<Stmt>> {
- let cleaned = clean_sql(sql);
- let mut out = Vec::new();
- for chunk in cleaned.split(';') {
- let s = chunk.trim();
- if s.is_empty() { continue; }
- if s.to_uppercase().starts_with("CREATE TABLE") {
- out.push(parse_create_table_owned(s)?);
- } else if s.to_uppercase().starts_with("INSERT INTO") {
- out.push(parse_insert_owned(s)?);
- } else if s.to_uppercase().starts_with("SELECT") {
- out.push(parse_select_owned(s)?);
- } else {
- return Err(anyhow!("Unsupported SQL: {}", s));
- }
- }
- Ok(out)
- }
- // Helper functions to parse and return owned Stmt/ColDef/Join
- fn parse_create_table_owned(s: &str) -> Result<Stmt> {
- let stmt = parse_create_table(s)?;
- if let Stmt::CreateTable { name, cols, primary_key } = stmt {
- Ok(Stmt::CreateTable {
- name: name.to_string(),
- cols: cols.into_iter().map(|c| ColDef {
- name: c.name.to_string(),
- dtype: c.dtype,
- nullable: c.nullable,
- }).collect(),
- primary_key: primary_key.map(|s| s.to_string()),
- })
- } else {
- unreachable!()
- }
- }
- fn parse_insert_owned(s: &str) -> Result<Stmt> {
- let stmt = parse_insert(s)?;
- if let Stmt::Insert { table, cols, values } = stmt {
- Ok(Stmt::Insert {
- table: table.to_string(),
- cols: cols.into_iter().map(|c| c.to_string()).collect(),
- values: values.into_iter().map(|row| row.into_iter().map(|v| v.to_string()).collect()).collect(),
- })
- } else {
- unreachable!()
- }
- }
- fn parse_select_owned(s: &str) -> Result<Stmt> {
- let stmt = parse_select(s)?;
- if let Stmt::Select { projection, from, joins, order_by, limit } = stmt {
- Ok(Stmt::Select {
- projection: projection.into_iter().map(|s| s.to_string()).collect(),
- from: from.to_string(),
- joins: joins.into_iter().map(|j| Join {
- table: j.table.to_string(),
- left_col: j.left_col.to_string(),
- right_col: j.right_col.to_string(),
- }).collect(),
- order_by: order_by.map(|(s, b)| (s.to_string(), b)),
- limit,
- })
- } else {
- unreachable!()
- }
- }
- fn parse_create_table(s: &str) -> Result<Stmt> {
- let mut dtype;
- // CREATE TABLE name ( col defs ... )
- let re = Regex::new(r"(?i)^CREATE TABLE\s+([A-Za-z0-9_]+)\s*\((.+)\)$").unwrap();
- let caps = re.captures(s).ok_or_else(|| anyhow!("bad CREATE TABLE"))?;
- let name = caps.get(1).unwrap().as_str().to_string();
- let body = caps.get(2).unwrap().as_str();
- let mut cols = Vec::new();
- let mut pk: Option<String> = None;
- for part in body.split(',') {
- let p = part.trim();
- if p.is_empty() { continue; }
- if p.to_uppercase().starts_with("PRIMARY KEY") {
- continue;
- }
- let toks: Vec<&str> = p.split_whitespace().collect();
- if toks.len() < 2 { continue; }
- let col = toks[0].to_string();
- let up = toks[1].to_uppercase();
- if up.starts_with("INT") { dtype = schema::DataType::Int; }
- else if up.starts_with("FLOAT") || up.starts_with("REAL") { dtype = schema::DataType::Float; }
- else if up.starts_with("BOOL") { dtype = schema::DataType::Bool; }
- else if up.starts_with("DATE") { dtype = schema::DataType::Date; }
- else { dtype = schema::DataType::String; }
- let nullable = !p.to_uppercase().contains("NOT NULL");
- if p.to_uppercase().contains("PRIMARY KEY") {
- pk = Some(col.clone());
- }
- cols.push(ColDef { name: col, dtype, nullable });
- }
- Ok(Stmt::CreateTable { name, cols, primary_key: pk })
- }
- fn parse_insert(s: &str) -> Result<Stmt> {
- // INSERT INTO table (a,b,c) VALUES (..),(..)
- let re = Regex::new(r"(?i)^INSERT INTO\s+([A-Za-z0-9_]+)\s*\(([^)]+)\)\s*VALUES\s*(.+)$").unwrap();
- let caps = re.captures(s).ok_or_else(|| anyhow!("bad INSERT"))?;
- let table = caps.get(1).unwrap().as_str().to_string();
- let cols_str = caps.get(2).unwrap().as_str();
- let values_str = caps.get(3).unwrap().as_str().trim();
- let cols: Vec<String> = cols_str.split(',').map(|c| c.trim().to_string()).collect();
- // split values by top-level parentheses
- let mut values = Vec::new();
- let mut depth = 0usize;
- let mut start = 0usize;
- for (i, ch) in values_str.char_indices() {
- match ch {
- '(' => { if depth == 0 { start = i+1; } depth += 1; }
- ')' => { depth -= 1; if depth == 0 {
- let tuple_str = &values_str[start..i];
- let tuple_vals = split_args(tuple_str).into_iter().map(|v| v.to_string()).collect();
- values.push(tuple_vals);
- }}
- _ => {}
- }
- }
- Ok(Stmt::Insert { table, cols, values })
- }
- fn split_args<'a>(s: &'a str) -> Vec<&'a str> {
- // split by commas not inside quotes
- let mut out = Vec::new();
- let mut start = 0usize;
- let mut in_str = false;
- let bytes = s.as_bytes();
- for i in 0..bytes.len() {
- let ch = bytes[i] as char;
- if ch == '\'' { in_str = !in_str; }
- if ch == ',' && !in_str {
- out.push(s[start..i].trim());
- start = i+1;
- }
- }
- if start < s.len() { out.push(s[start..].trim()); }
- out
- }
- fn parse_select(s: &str) -> Result<Stmt> {
- // SELECT a,b FROM t [JOIN u ON t.x=u.y]* [ORDER BY c (ASC|DESC)] [LIMIT n]
- let mut rest = s.trim();
- if !rest.to_uppercase().starts_with("SELECT ") { return Err(anyhow!("bad SELECT")); }
- rest = &rest[7..];
- let (projection_str, rest) = split_once_upper(rest, " FROM ").ok_or_else(|| anyhow!("missing FROM"))?;
- let projection: Vec<String> = projection_str.split(',').map(|c| c.trim().to_string()).collect();
- let (from_str, rest) = split_once_any_upper(rest, &[" JOIN ", " ORDER BY ", " LIMIT "]).unwrap_or((rest, ""));
- let from = from_str.trim().to_string();
- let mut joins = Vec::new();
- let mut order_by: Option<(String, bool)> = None;
- let mut limit: Option<usize> = None;
- let mut tmp = rest;
- while !tmp.is_empty() {
- if let Some((after, _chunk)) = take_prefix_upper(tmp, " JOIN ") {
- // chunk starts at table name
- let (table, after2) = split_once_upper(after, " ON ").ok_or_else(|| anyhow!("JOIN missing ON"))?;
- let table = table.trim().to_string();
- let (cond, after3) = split_once_any_upper(after2, &[" JOIN ", " ORDER BY ", " LIMIT "]).unwrap_or((after2, ""));
- // cond: a.x = b.y
- let parts: Vec<&str> = cond.split('=').map(|x| x.trim()).collect();
- if parts.len() != 2 { return Err(anyhow!("bad JOIN condition")); }
- let (l_tab, l_col) = split_qual(parts[0])?;
- let (r_tab, r_col) = split_qual(parts[1])?;
- if l_tab.is_empty() || r_tab.is_empty() { return Err(anyhow!("JOIN needs qualified columns")); }
- joins.push(super::sql::Join { table, left_col: l_tab + "." + &l_col, right_col: r_tab + "." + &r_col });
- tmp = after3;
- continue;
- }
- if let Some((after, _chunk)) = take_prefix_upper(tmp, " ORDER BY ") {
- let (spec, after2) = split_once_any_upper(after, &[" LIMIT "]).unwrap_or((after, ""));
- let mut toks = spec.split_whitespace();
- let col = toks.next().ok_or_else(|| anyhow!("ORDER BY col"))?.to_string();
- let dir = toks.next().unwrap_or("ASC").eq_ignore_ascii_case("DESC");
- order_by = Some((col, dir)); // dir==true => DESC
- tmp = after2;
- continue;
- }
- if let Some((after, _)) = take_prefix_upper(tmp, " LIMIT ") {
- let n = after.trim().split_whitespace().next().unwrap_or("0").parse::<usize>()?;
- limit = Some(n);
- tmp = "";
- continue;
- }
- break;
- }
- Ok(Stmt::Select { projection, from, joins, order_by, limit })
- }
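- // Example: "SELECT a, b FROM t JOIN u ON t.x = u.y ORDER BY a DESC LIMIT 5" parses to
- //   projection = ["a", "b"], from = "t",
- //   joins = [Join { table: "u", left_col: "t.x", right_col: "u.y" }],
- //   order_by = Some(("a", true)) /* true => DESC */, limit = Some(5)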
- fn split_qual(s: &str) -> Result<(String, String)> {
- let parts: Vec<&str> = s.split('.').collect();
- if parts.len() != 2 { return Err(anyhow!("need qualified name like t.col")); }
- Ok((parts[0].trim().to_string(), parts[1].trim().to_string()))
- }
- fn split_once_upper<'a>(s: &'a str, needle: &str) -> Option<(&'a str, &'a str)> {
- let up = s.to_uppercase();
- let nup = needle.to_uppercase();
- if let Some(i) = up.find(&nup) {
- Some((&s[..i], &s[i + needle.len()..]))
- } else { None }
- }
- fn split_once_any_upper<'a>(s: &'a str, needles: &[&str]) -> Option<(&'a str, &'a str)> {
- let up = s.to_uppercase();
- let mut best: Option<(usize, &str)> = None;
- for &n in needles {
- if let Some(i) = up.find(&n.to_uppercase()) {
- if best.map_or(true, |(bi, _)| i < bi) { best = Some((i, n)); }
- }
- }
- if let Some((i, _n)) = best {
- Some((&s[..i], &s[i..]))
- } else {
- None
- }
- }
- fn take_prefix_upper<'a>(s: &'a str, needle: &'a str) -> Option<(&'a str, &'a str)> {
- let up = s.to_uppercase();
- let nup = needle.to_uppercase();
- if up.starts_with(&nup) {
- Some((&s[needle.len()..], needle))
- } else { None }
- }
- // ----- Execution: map parsed SQL to engine operations -----
- use crate::engine::{Table, Value};
- use std::collections::HashMap;
- use std::sync::Mutex;
- // Simple schema cache to avoid re-reading TOML files
- lazy_static::lazy_static! {
- static ref SCHEMA_CACHE: Mutex<HashMap<String, schema::Schema>> = Mutex::new(HashMap::new());
- }
- fn get_cached_schema(root: &Path, table_name: &str, legacy: bool) -> Result<schema::Schema> {
- let cache_key = format!("{}:{}:{}", root.display(), table_name, legacy);
- // Try cache first
- {
- let cache = SCHEMA_CACHE.lock().unwrap();
- if let Some(schema) = cache.get(&cache_key) {
- return Ok(schema.clone());
- }
- }
- let schema = if legacy {
- // Load from CSV schema file (legacy)
- let schema_path = root.join(format!("{table_name}.schema.toml"));
- let sch_str = fs::read_to_string(&schema_path)
- .map_err(|_| anyhow!("schema not found for table {table_name}"))?;
- toml::from_str(&sch_str)?
- } else {
- // Load from binary storage catalog
- let canto_db_path = root.join("canto.db");
- let lock_path = root.join("canto.lock");
- let binary_storage = super::storage::BinaryStorage::open(canto_db_path, lock_path)?;
- if let Some(catalog_toml) = binary_storage.read_catalog()? {
- let all_schemas: std::collections::HashMap<String, schema::Schema> = toml::from_str(&catalog_toml)?;
- all_schemas.get(table_name)
- .ok_or_else(|| anyhow!("table {} not found in catalog", table_name))?
- .clone()
- } else {
- return Err(anyhow!("no catalog found in binary storage"));
- }
- };
- // Cache it
- {
- let mut cache = SCHEMA_CACHE.lock().unwrap();
- cache.insert(cache_key, schema.clone());
- }
- Ok(schema)
- }
- pub fn exec_statements(root: &Path, stmts: Vec<Stmt>, legacy: bool) -> Result<Vec<serde_json::Value>> {
- let mut results = Vec::new();
- for st in stmts {
- match st {
- Stmt::CreateTable { name, cols, primary_key } => {
- let schema = schema::Schema {
- name: name.to_string(),
- columns: cols.into_iter().map(|c| schema::Column {
- name: c.name.to_string(),
- dtype: c.dtype,
- nullable: c.nullable,
- }).collect(),
- primary_key: primary_key.map(|s| s.to_string()),
- };
- let _ = Table::open(root, schema, legacy)?; // ensures files exist
- results.push(json!({"ok": true, "op": "create_table", "table": name}));
- }
- Stmt::Insert { table, cols, values } => {
- // Load existing schema from cache
- let schema = get_cached_schema(root, &table, legacy)?;
- let mut tbl = Table::open(root, schema, legacy)?;
- let col_index: Vec<usize> = cols.iter().map(|c| tbl.schema.col_index(c).ok_or_else(|| anyhow!("unknown col {}", c))).collect::<Result<_>>()?;
- let values_len = values.len();
- for tup in values {
- let mut row = vec![Value::S(String::new()); tbl.schema.columns.len()];
- for (i, raw) in tup.iter().enumerate() {
- let colpos = col_index[i];
- let v = parse_value(raw);
- row[colpos] = v;
- }
- tbl.insert(row)?;
- }
- results.push(json!({"ok": true, "op": "insert", "table": table, "rows": values_len}));
- }
- Stmt::Select { projection, from, joins, order_by, limit } => {
- // Naive: materialize FROM table, then nested-loop JOINs on equality
- let from_schema = get_cached_schema(root, &from, legacy)?;
- let from_tbl = Table::open(root, from_schema.clone(), legacy)?;
- // Load full rows
- let mut rows = Vec::<(String, Vec<String>)>::new(); // (table prefix, row)
- let prefix_from = from.clone();
- from_tbl.select(|_| true)?.into_iter().for_each(|r| rows.push((prefix_from.clone(), r)));
- // Build a map from table->(schema, rows)
- let mut table_schemas: BTreeMap<String, schema::Schema> = BTreeMap::new();
- table_schemas.insert(from.clone(), from_schema);
- // Track tables in FROM/JOIN order so header positions match the merged-row layout below
- let mut table_order: Vec<String> = vec![from.clone()];
- // Apply joins
- for j in joins {
- let right_schema = get_cached_schema(root, &j.table, legacy)?;
- let right_tbl = Table::open(root, right_schema.clone(), legacy)?;
- let right_rows = right_tbl.select(|_| true)?;
- // parse qualified eq: left_tab.left_col = right_tab.right_col
- let (l_tab, l_col) = split_qual(&j.left_col)?;
- let (_r_tab, r_col) = split_qual(&j.right_col)?;
- let l_idx = table_schemas.get(&l_tab).ok_or_else(|| anyhow!("unknown table {}", l_tab))?
- .col_index(&l_col).ok_or_else(|| anyhow!("unknown col {}", j.left_col))?;
- let r_idx = right_schema.col_index(&r_col).ok_or_else(|| anyhow!("unknown col {}", j.right_col))?;
- // Hash join optimization for equi-joins
- let mut new_rows: Vec<(String, Vec<String>)> = Vec::new();
- // Build hash map for the right side (smaller table typically)
- let mut right_map: std::collections::HashMap<String, Vec<usize>> = std::collections::HashMap::new();
- for (idx, rrow) in right_rows.iter().enumerate() {
- let key = rrow[r_idx].clone();
- right_map.entry(key).or_insert_with(Vec::new).push(idx);
- }
- // Probe with left side
- for (ltab, lrow) in rows.into_iter() {
- let kval = lrow[l_idx].clone();
- if let Some(right_indices) = right_map.get(&kval) {
- for &right_idx in right_indices {
- let rrow = &right_rows[right_idx];
- // merged row = lrow + rrow
- let mut merged = lrow.clone();
- merged.extend(rrow.clone());
- new_rows.push((format!("{}+{}", ltab, j.table), merged));
- }
- }
- }
- rows = new_rows;
- table_schemas.insert(j.table.to_string(), right_schema);
- table_order.push(j.table.to_string());
- }
- // Build projection indices (allow qualified or bare; resolve by first match)
- let joined_headers: Vec<(String,String)> = {
- let mut acc = Vec::new();
- // Walk tables in join order (not BTreeMap key order) so indices line up with merged rows
- for tname in table_order.iter() {
- let sch = &table_schemas[tname];
- for c in &sch.columns {
- acc.push((tname.clone(), c.name.clone()));
- }
- }
- acc
- };
- let proj_indices: Vec<usize> = projection.iter().map(|colref| {
- if colref == "*" {
- // expand to all columns: handled specially later
- Ok(usize::MAX)
- } else if let Some((t,c)) = colref.split_once('.') {
- // qualified
- let mut idx = 0usize;
- for (qt, qc) in joined_headers.iter() {
- if qt == t && qc == c { return Ok(idx); }
- idx += 1;
- }
- Err(anyhow!("unknown projection {}", colref))
- } else {
- // bare: first match
- let mut idx = 0usize;
- for (_qt, qc) in joined_headers.iter() {
- if qc == colref { return Ok(idx); }
- idx += 1;
- }
- Err(anyhow!("unknown column {}", colref))
- }
- }).collect::<Result<_>>()?;
- // ORDER BY with numeric support
- if let Some((ob, desc)) = order_by {
- // resolve index of order-by column (qualified or bare)
- let ob_idx: usize = if let Some((t,c)) = ob.split_once('.') {
- let mut idx = 0usize;
- for (qt, qc) in joined_headers.iter() {
- if qt == t && qc == c { break; }
- idx += 1;
- }
- idx
- } else {
- let mut idx = 0usize;
- for (_qt, qc) in joined_headers.iter() {
- if qc == &ob { break; }
- idx += 1;
- }
- idx
- };
- // Try numeric sort first, fall back to lexicographic
- let all_i64: bool = rows.iter().all(|(_, row)| row[ob_idx].parse::<i64>().is_ok());
- if all_i64 {
- // Sort as integers
- rows.sort_by(|a, b| {
- let a_val = a.1[ob_idx].parse::<i64>().unwrap_or(0);
- let b_val = b.1[ob_idx].parse::<i64>().unwrap_or(0);
- a_val.cmp(&b_val)
- });
- } else {
- let all_f64: bool = rows.iter().all(|(_, row)| row[ob_idx].parse::<f64>().is_ok());
- if all_f64 {
- // Sort as floats
- rows.sort_by(|a, b| {
- let a_val = a.1[ob_idx].parse::<f64>().unwrap_or(0.0);
- let b_val = b.1[ob_idx].parse::<f64>().unwrap_or(0.0);
- a_val.partial_cmp(&b_val).unwrap_or(std::cmp::Ordering::Equal)
- });
- } else {
- // Sort lexicographically
- rows.sort_by(|a,b| a.1[ob_idx].cmp(&b.1[ob_idx]));
- }
- }
- if desc { rows.reverse(); }
- }
- // LIMIT
- let rows = if let Some(n) = limit { rows.into_iter().take(n).collect::<Vec<_>>() } else { rows };
- // Materialize projection (optimized: compute indices once)
- let out_rows: Vec<Vec<String>> = if proj_indices.len()==1 && proj_indices[0]==usize::MAX {
- // SELECT * - return all columns
- rows.into_iter().map(|(_tname, row)| row).collect()
- } else {
- // Specific columns - project only needed ones
- rows.into_iter().map(|(_tname, row)| {
- let mut projected_row = Vec::with_capacity(proj_indices.len());
- for &i in &proj_indices {
- projected_row.push(row[i].clone());
- }
- projected_row
- }).collect()
- };
- results.push(json!({
- "ok": true,
- "op": "select",
- "columns": projection,
- "rows": out_rows
- }));
- }
- }
- }
- Ok(results)
- }
- fn parse_value(s: &str) -> engine::Value {
- let t = s.trim();
- if t.starts_with('\'') && t.ends_with('\'') {
- let inner = &t[1..t.len()-1];
- engine::Value::S(inner.to_string())
- } else if let Ok(i) = t.parse::<i64>() {
- engine::Value::I(i)
- } else if let Ok(f) = t.parse::<f64>() {
- engine::Value::F(f)
- } else if t.eq_ignore_ascii_case("true") || t.eq_ignore_ascii_case("false") {
- engine::Value::B(t.eq_ignore_ascii_case("true"))
- } else {
- engine::Value::S(t.to_string())
- }
- }
- }
- // use sql::*; // for json!, regex dependency
- // use serde_json::json;
- // ================================
- // REST server (Axum) + Auth + Users
- // ================================
- mod server {
- use super::{Args};
- use super::{sql, engine, schema};
- use axum::{routing::{get, post, delete}, Router, extract::State, http::{StatusCode, HeaderMap}, Json};
- use serde::Deserialize;
- use std::{sync::Arc, path::PathBuf, fs};
- use anyhow::{Result, anyhow};
- use base64::engine::general_purpose::STANDARD;
- use base64::Engine as _;
- use axum::extract::Path;
- use serde_json::json;
- #[derive(Clone)]
- pub struct AppState {
- pub root: Arc<PathBuf>,
- pub require_auth: bool,
- pub legacy: bool,
- }
- #[derive(Deserialize)]
- pub struct QueryReq { pub sql: String }
- #[derive(Deserialize)]
- pub struct UserReq { pub username: String, pub password: String }
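- // Request shapes, for reference (paths match the router below; usernames/passwords are example values):
- //   POST   /v1/query             body: {"sql": "CREATE TABLE ...; INSERT ...; SELECT ...;"}
- //   POST   /v1/users             body: {"username": "alice", "password": "s3cret"}
- //   DELETE /v1/users/{username}
- //   GET    /healthz              -> "ok"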
- pub async fn run_server(args: Args) -> Result<()> {
- let app_state = AppState { root: Arc::new(args.data_dir.clone()), require_auth: args.require_auth, legacy: args.legacy };
- // Migrate users table from CSV to binary storage (if needed)
- if !args.legacy {
- super::migration::migrate_users_to_binary(&args.data_dir)?;
- // Ensure canto.db exists for binary storage (even if no migration was needed)
- let canto_db_path = args.data_dir.join("canto.db");
- if !canto_db_path.exists() {
- let lock_path = args.data_dir.join("canto.lock");
- let _binary_storage = super::storage::BinaryStorage::open(canto_db_path, lock_path)?;
- }
- }
- // Check for migration from CSV to binary storage for other tables
- let canto_db_path = args.data_dir.join("canto.db");
- if !args.legacy && !canto_db_path.exists() {
- // Check if there are any CSV files that need migration
- if let Ok(entries) = std::fs::read_dir(&args.data_dir) {
- let csv_files: Vec<_> = entries
- .filter_map(|e| e.ok())
- .filter(|e| e.path().extension().map_or(false, |ext| ext == "csv"))
- .filter(|e| e.file_name() != "users.csv") // Skip users table - handled above
- .collect();
- if !csv_files.is_empty() {
- println!("Found legacy CSV files, migrating to binary storage...");
- migrate_csv_to_binary(&args.data_dir, &canto_db_path)?;
- println!("Migration completed successfully!");
- }
- }
- }
- // Ensure "users" table exists
- ensure_users_table(&app_state)?;
- let app = Router::new()
- .route("/healthz", get(|| async { "ok" }))
- .route("/v1/query", post(handle_query))
- .route("/v1/users", post(create_user))
- .route("/v1/users/:username", delete(delete_user))
- .with_state(app_state);
- let listener = tokio::net::TcpListener::bind(&args.bind).await?;
- println!("canto server listening on http://{}", &args.bind);
- axum::serve(listener, app).await?;
- Ok(())
- }
- fn migrate_csv_to_binary(data_dir: &std::path::Path, canto_db_path: &std::path::Path) -> Result<()> {
- use super::storage::{BinaryStorage, StorageBackend};
- use super::schema::Schema;
- use std::collections::HashMap;
- // Create binary storage
- let lock_path = data_dir.join("canto.lock");
- let binary_storage = BinaryStorage::open(canto_db_path.to_path_buf(), lock_path)?;
- // Collect all schemas
- let mut all_schemas = HashMap::new();
- for entry in std::fs::read_dir(data_dir)? {
- let entry = entry?;
- let path = entry.path();
- if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
- if name.ends_with(".schema.toml") {
- let table_name = name.strip_suffix(".schema.toml").unwrap();
- if table_name != "users" { // Skip users table
- let schema_content = std::fs::read_to_string(&path)?;
- let schema: Schema = toml::from_str(&schema_content)?;
- all_schemas.insert(table_name.to_string(), schema);
- }
- }
- }
- }
- // Write central catalog
- let catalog_toml = toml::to_string_pretty(&all_schemas)?;
- binary_storage.write_catalog(&catalog_toml)?;
- // Migrate each table's data
- let mut checkpoints = Vec::new();
- for (table_name, schema) in &all_schemas {
- let csv_path = data_dir.join(format!("{}.csv", table_name));
- if csv_path.exists() {
- // Read CSV data
- let mut rows = Vec::new();
- let file = std::fs::File::open(&csv_path)?;
- let mut rdr = csv::ReaderBuilder::new().has_headers(true).from_reader(file);
- for record in rdr.records() {
- let record = record?;
- let row: Vec<String> = record.iter().map(|s| s.to_string()).collect();
- // Only take columns that exist in schema (ignore FOREIGN KEY artifacts)
- let filtered_row: Vec<String> = row.into_iter()
- .take(schema.columns.len())
- .collect();
- rows.push(filtered_row);
- }
- // Write to binary storage
- let seq_no = 1u64; // Start with sequence 1
- let data_offset = binary_storage.append_table_data(table_name, seq_no, &rows)?;
- // Handle primary key index if present
- let pk_index_offset = if let Some(pk_col) = &schema.primary_key {
- if let Some(pk_idx) = schema.col_index(pk_col) {
- let mut pk_index = std::collections::BTreeMap::new();
- for (row_no, row) in rows.iter().enumerate() {
- if let Some(pk_val) = row.get(pk_idx) {
- pk_index.insert(pk_val.clone(), row_no as u64);
- }
- }
- binary_storage.write_pk_index(table_name, &pk_index)?
- } else { 0 }
- } else { 0 };
- checkpoints.push(super::storage::TableCheckpoint {
- table_name: table_name.clone(),
- last_seq_no: seq_no,
- last_data_offset: data_offset,
- pk_index_offset,
- });
- }
- }
- // Write checkpoint
- binary_storage.write_checkpoint(&checkpoints)?;
- // Move old files to legacy directory
- let legacy_dir = data_dir.join("legacy");
- std::fs::create_dir_all(&legacy_dir)?;
- for entry in std::fs::read_dir(data_dir)? {
- let entry = entry?;
- let path = entry.path();
- if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
- // Move CSV, schema, idx, and lock files (but not users table)
- if (name.ends_with(".csv") || name.ends_with(".schema.toml") ||
- name.ends_with(".idx") || name.ends_with(".lock")) &&
- !name.starts_with("users.") && !name.starts_with("canto.") {
- let dest = legacy_dir.join(name);
- std::fs::rename(&path, &dest)?;
- }
- }
- }
- Ok(())
- }
- async fn handle_query(State(st): State<AppState>, headers: HeaderMap, Json(req): Json<QueryReq>) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
- if st.require_auth {
- auth_check(&st, &headers).map_err(internal)?;
- }
- let stmts = sql::parse_statements(&req.sql).map_err(internal)?;
- // Use binary storage if canto.db exists, otherwise fall back to CSV
- let use_legacy = st.legacy || !st.root.join("canto.db").exists();
- let res = sql::exec_statements(&st.root, stmts, use_legacy).map_err(internal)?;
- Ok(Json(json!({"results": res})))
- }
- async fn create_user(State(st): State<AppState>, headers: HeaderMap, Json(req): Json<UserReq>) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
- // If auth is required, only an authenticated caller can manage users.
- if st.require_auth {
- auth_check(&st, &headers).map_err(internal)?;
- }
- let (mut tbl, idx_user, _idx_pass) = open_users(&st).map_err(internal)?;
- // naive: check exists
- let exists = tbl.select(|r| r[idx_user] == req.username).map_err(internal)?.len() > 0;
- if exists { return Err((StatusCode::BAD_REQUEST, "user exists".into())); }
- tbl.insert(vec![engine::Value::S(req.username), engine::Value::S(req.password)]).map_err(internal)?;
- Ok(Json(json!({"ok": true})))
- }
- async fn delete_user(State(st): State<AppState>, headers: HeaderMap, Path(username): Path<String>) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
- if st.require_auth {
- auth_check(&st, &headers).map_err(internal)?;
- }
- let (mut tbl, idx_user, _idx_pass) = open_users(&st).map_err(internal)?;
- let n = tbl.delete(|r| r[idx_user] == username).map_err(internal)?;
- Ok(Json(json!({"ok": true, "deleted": n})))
- }
- fn ensure_users_table(st: &AppState) -> Result<()> {
- let schema = schema::Schema {
- name: "users".into(),
- columns: vec![
- schema::Column { name: "username".into(), dtype: schema::DataType::String, nullable: false },
- schema::Column { name: "password".into(), dtype: schema::DataType::String, nullable: false },
- ],
- primary_key: Some("username".into()),
- };
- // Use binary storage if canto.db exists, otherwise fall back to CSV
- let use_legacy = st.legacy || !st.root.join("canto.db").exists();
- let _ = engine::Table::open(&st.root, schema, use_legacy)?;
- Ok(())
- }
- fn open_users(st: &AppState) -> Result<(engine::Table, usize, usize)> {
- let sch: schema::Schema = {
- // Use binary storage if canto.db exists, otherwise read from CSV schema file
- if !st.legacy && st.root.join("canto.db").exists() {
- // Users schema should be in binary catalog
- schema::Schema {
- name: "users".into(),
- columns: vec![
- schema::Column { name: "username".into(), dtype: schema::DataType::String, nullable: false },
- schema::Column { name: "password".into(), dtype: schema::DataType::String, nullable: false },
- ],
- primary_key: Some("username".into()),
- }
- } else {
- // Read schema from TOML file
- let s = fs::read_to_string(st.root.join("users.schema.toml"))?;
- toml::from_str(&s)?
- }
- };
- let idx_user = sch.col_index("username").ok_or_else(|| anyhow!("users.username missing"))?;
- let idx_pass = sch.col_index("password").ok_or_else(|| anyhow!("users.password missing"))?;
- // Use binary storage if canto.db exists, otherwise use CSV storage
- let use_legacy = st.legacy || !st.root.join("canto.db").exists();
- let tbl = engine::Table::open(&st.root, sch, use_legacy)?;
- Ok((tbl, idx_user, idx_pass))
- }
- fn internal<E: std::fmt::Display>(e: E) -> (StatusCode, String) {
- (StatusCode::BAD_REQUEST, e.to_string())
- }
- fn auth_check(st: &AppState, headers: &HeaderMap) -> Result<()> {
- let auth = headers.get(http::header::AUTHORIZATION).ok_or_else(|| anyhow!("missing Authorization"))?;
- let s = auth.to_str().map_err(|_| anyhow!("bad Authorization header"))?;
- if !s.starts_with("Basic ") { return Err(anyhow!("expect Basic auth")); }
- let b64 = &s[6..];
- let decoded = STANDARD.decode(b64)?;
- let creds = String::from_utf8(decoded).map_err(|_| anyhow!("utf8"))?;
- let (u, p) = creds.split_once(':').ok_or_else(|| anyhow!("bad basic pair"))?;
- // verify in users table
- let (tbl, idx_user, idx_pass) = open_users(st)?;
- let rows = tbl.select(|r| r[idx_user] == u && r[idx_pass] == p)?;
- if rows.is_empty() { return Err(anyhow!("invalid credentials")); }
- Ok(())
- }
- }
- // ================================
- // Migration (Users CSV -> Binary)
- // ================================
- mod migration {
- use super::{schema, storage};
- use anyhow::Result;
- use std::path::Path;
- use std::fs;
- pub fn migrate_users_to_binary(root: &Path) -> Result<()> {
- // Step 1: Check if migration is needed
- if !legacy_users_exist(root) {
- // No legacy users files to migrate
- return Ok(());
- }
- if binary_exists(root) {
- // Binary storage exists, check if users table is already there
- if users_present_in_binary(root)? {
- // Users already migrated, just archive legacy files if they still exist
- println!("Users table already exists in binary storage, archiving legacy files...");
- archive_legacy_users(root)?;
- return Ok(());
- }
- }
- println!("Migrating users table from CSV to binary storage...");
- // Step 2: Read legacy users data
- let legacy_rows = read_legacy_users(root)?;
- println!("Found {} user(s) in legacy CSV", legacy_rows.len());
- // Step 3: Ensure binary storage and users schema exist
- ensure_users_in_binary(root)?;
- // Step 4: Import users data
- import_users_to_binary(root, &legacy_rows)?;
- // Step 5: Archive legacy files
- archive_legacy_users(root)?;
- println!("Users migration completed successfully!");
- Ok(())
- }
- fn legacy_users_exist(root: &Path) -> bool {
- let csv_path = root.join("users.csv");
- let schema_path = root.join("users.schema.toml");
- csv_path.exists() && schema_path.exists()
- }
- fn binary_exists(root: &Path) -> bool {
- root.join("canto.db").exists()
- }
- fn users_present_in_binary(root: &Path) -> Result<bool> {
- if !binary_exists(root) {
- return Ok(false);
- }
- let lock_path = root.join("canto.lock");
- let binary_storage = storage::BinaryStorage::open(root.join("canto.db"), lock_path)?;
- if let Ok(Some(catalog_toml)) = binary_storage.read_catalog() {
- let existing_schemas: std::collections::HashMap<String, schema::Schema> =
- toml::from_str(&catalog_toml).unwrap_or_default();
- Ok(existing_schemas.contains_key("users"))
- } else {
- Ok(false)
- }
- }
- fn read_legacy_users(root: &Path) -> Result<Vec<Vec<String>>> {
- let csv_path = root.join("users.csv");
- let mut rows = Vec::new();
- if csv_path.exists() {
- let file = std::fs::File::open(&csv_path)?;
- let mut rdr = csv::ReaderBuilder::new().has_headers(true).from_reader(file);
- for record in rdr.records() {
- let record = record?;
- let row: Vec<String> = record.iter().map(|s| s.to_string()).collect();
- rows.push(row);
- }
- }
- Ok(rows)
- }
- fn ensure_users_in_binary(root: &Path) -> Result<()> {
- let lock_path = root.join("canto.lock");
- let binary_storage = storage::BinaryStorage::open(root.join("canto.db"), lock_path)?;
- // Create users schema
- let users_schema = schema::Schema {
- name: "users".into(),
- columns: vec![
- schema::Column { name: "username".into(), dtype: schema::DataType::String, nullable: false },
- schema::Column { name: "password".into(), dtype: schema::DataType::String, nullable: false },
- ],
- primary_key: Some("username".into()),
- };
- // Read existing catalog or create new one
- let mut all_schemas: std::collections::HashMap<String, schema::Schema> =
- if let Ok(Some(catalog_toml)) = binary_storage.read_catalog() {
- toml::from_str(&catalog_toml).unwrap_or_default()
- } else {
- std::collections::HashMap::new()
- };
- // Add or update users schema
- all_schemas.insert("users".to_string(), users_schema);
- // Write updated catalog
- let catalog_toml = toml::to_string_pretty(&all_schemas)?;
- binary_storage.write_catalog(&catalog_toml)?;
- Ok(())
- }
- fn import_users_to_binary(root: &Path, rows: &[Vec<String>]) -> Result<()> {
- if rows.is_empty() {
- return Ok(());
- }
- let lock_path = root.join("canto.lock");
- let binary_storage = storage::BinaryStorage::open(root.join("canto.db"), lock_path)?;
- // Import user rows in a single batch
- let seq_no = 1u64; // Start with sequence 1 for users
- let data_offset = binary_storage.append_table_data("users", seq_no, rows)?;
- // Build primary key index for users (username is PK)
- let mut pk_index = std::collections::BTreeMap::new();
- for (row_no, row) in rows.iter().enumerate() {
- if let Some(username) = row.get(0) { // username is first column
- pk_index.insert(username.clone(), row_no as u64);
- }
- }
- let pk_index_offset = binary_storage.write_pk_index("users", &pk_index)?;
- // Write checkpoint with users table info
- let checkpoint = storage::TableCheckpoint {
- table_name: "users".to_string(),
- last_seq_no: seq_no,
- last_data_offset: data_offset,
- pk_index_offset,
- };
- binary_storage.write_checkpoint(&[checkpoint])?;
- Ok(())
- }
- fn archive_legacy_users(root: &Path) -> Result<()> {
- let legacy_dir = root.join("legacy");
- fs::create_dir_all(&legacy_dir)?;
- let files_to_archive = ["users.csv", "users.schema.toml", "users.idx", "users.lock"];
- for filename in &files_to_archive {
- let src_path = root.join(filename);
- if src_path.exists() {
- let dest_path = legacy_dir.join(filename);
- fs::rename(&src_path, &dest_path)?;
- println!("Archived {} to legacy/", filename);
- }
- }
- Ok(())
- }
- }
- // ================================
- // CLI client
- // ================================
- mod client {
- use super::Args;
- use anyhow::{anyhow, Result};
- use base64::engine::general_purpose::STANDARD;
- use base64::Engine as _;
- use std::fs;
- use serde_json::json;
- use reqwest::Client;
- pub async fn run_client(args: Args) -> Result<()> {
- let sql = if let Some(f) = args.file {
- fs::read_to_string(&f)?
- } else {
- return Err(anyhow!("client mode requires -f <sql-file>"));
- };
- let url = format!("{}/v1/query", args.url.trim_end_matches('/'));
- let client = Client::new();
- let mut req = client.post(&url).header("content-type", "application/json");
- if let (Some(u), Some(p)) = (args.username.as_ref(), args.password.as_ref()) {
- let token = STANDARD.encode(format!("{}:{}", u, p));
- req = req.header("authorization", format!("Basic {}", token));
- }
- let resp = req.body(serde_json::to_vec(&json!({"sql": sql}))?).send().await?;
- let status = resp.status();
- let bytes = resp.bytes().await?;
- if !status.is_success() {
- println!("Error {}: {}", status, String::from_utf8_lossy(&bytes));
- return Ok(());
- }
- println!("{}", String::from_utf8_lossy(&bytes));
- Ok(())
- }
- }
- ```