Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- // for the initial connection attempt, which will determine if possibly I would need to create the container and deployment upon failure
- // i will use rusts 'timeout' for x interval determined with CONNECTION_TIMEOUT
- async fn attempt_connection() -> Result<TcpStream, Box<dyn std::error::Error + Send + Sync>> {
- println!("[DEBUG] Attempting TCP connection to {}", TcpUrl);
- timeout(CONNECTION_TIMEOUT, TcpStream::connect(TcpUrl)).await?.map_err(Into::into)
- }
- // The websocket connection will be important for sending data to there and back
- // in this case, this is primarially used for transmitting console data to the frontend from the gameserver
- async fn handle_server_data(
- state: Arc<RwLock<AppState>>,
- data: &[u8],
- ws_tx: &broadcast::Sender<String>,
- ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
- println!("[DEBUG] Handling server data ({} bytes)", data.len());
- if let Ok(text) = String::from_utf8(data.to_vec()) {
- println!("[DEBUG] Raw message from server: {}", text);
- if let Ok(outer_msg) = serde_json::from_str::<InnerData>(&text) {
- println!("[DEBUG] Parsed outer message successfully");
- let inner_data_str = outer_msg.data.as_str();
- let mut borrowed_state = state.write().await;
- if let Some(start_keyword) = borrowed_state.gameserver["start_keyword"].as_str() {
- println!("[DEBUG] Checking for start keyword: {}", start_keyword);
- if inner_data_str.contains(start_keyword) {
- println!("[DEBUG] Start keyword found - setting status to Up");
- borrowed_state.status = Status::Up;
- } else {
- println!("[DEBUG] Start keyword not found - setting status to Down");
- borrowed_state.status = Status::Down;
- }
- } else {
- eprintln!("[DEBUG] start_keyword is not a string");
- }
- if let Ok(inner_data) = serde_json::from_str::<serde_json::Value>(inner_data_str) {
- println!("[DEBUG] Parsed inner data successfully");
- if ALLOW_NONJSON_DATA == true {
- if let Some(message_content) = inner_data["data"].as_str() {
- println!("[DEBUG] Extracted message: {}", message_content);
- let _ = ws_tx.send(message_content.to_string());
- }
- } else {
- println!("[DEBUG] Sending raw inner data: {}", inner_data_str);
- let _ = ws_tx.send(inner_data_str.to_string());
- }
- } else {
- println!("[DEBUG] Failed to parse inner data, sending raw");
- let _ = ws_tx.send(inner_data_str.to_string());
- }
- } else {
- println!("[DEBUG] Failed to parse outer message, sending raw text");
- let _ = ws_tx.send(text);
- }
- }
- Ok(())
- }
- // this handles the main tcp stream between the gameserver and the client console, more specifically the data exchanged
- // as well as notifying the gameserver of its capabilities which at some point ill make dependent on what feature flag is set
- // `tokio::select!` is used to concurrently wait for either incoming TCP data or messages from the channel to send.
- async fn handle_stream(
- state: Arc<RwLock<AppState>>,
- rx: Arc<Mutex<mpsc::Receiver<Vec<u8>>>>,
- stream: &mut TcpStream,
- ws_tx: broadcast::Sender<String>,
- ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
- println!("[DEBUG] Starting stream handling");
- let (mut reader, mut writer) = stream.split();
- let mut buf = vec![0u8; 1024];
- let mut buf_reader = BufReader::new(reader);
- let mut line = String::new();
- let capability_msg = serde_json::to_string(
- &List {
- list: ApiCalls::Capabilities(vec!["all".to_string()])
- }
- )? + "\n";
- println!("[DEBUG] Sending capability message");
- writer.write_all(capability_msg.as_bytes()).await?;
- match buf_reader.read_line(&mut line).await {
- Ok(0) => {
- println!("[DEBUG] Connection possibly closed after capability message");
- }
- Ok(_) => {
- println!("[DEBUG] Received capability response: {}", line.trim_end());
- }
- Err(e) => {
- println!("[DEBUG] Error reading capability response: {:?}", e);
- return Err(e.into());
- }
- }
- let server_data_msg = serde_json::to_string(
- &MessagePayload {
- r#type: "command".to_string(),
- message: "server_data".to_string(),
- authcode: "0".to_string(),
- }
- )? + "\n";
- println!("[DEBUG] Sending server data request");
- writer.write_all(server_data_msg.as_bytes()).await?;
- match buf_reader.read_line(&mut line).await {
- Ok(0) => {
- println!("[DEBUG] Connection possibly closed after server data request");
- }
- Ok(_) => {
- println!("[DEBUG] Received server data: {}", line.trim_end());
- state.write().await.gameserver = serde_json::Value::String(line.clone());
- }
- Err(e) => {
- println!("[DEBUG] Error reading server data: {:?}", e);
- return Err(e.into());
- }
- }
- reader = buf_reader.into_inner();
- println!("[DEBUG] Entering main stream loop");
- loop {
- let mut rx_guard = rx.lock().await;
- tokio::select! {
- result = reader.read(&mut buf) => match result {
- Ok(0) => {
- println!("[DEBUG] Stream read returned 0 bytes - connection closed");
- return Ok(());
- },
- Ok(n) => {
- println!("[DEBUG] Received {} bytes from stream", n);
- handle_server_data(state.clone(), &buf[..n], &ws_tx).await?
- },
- Err(e) => {
- println!("[DEBUG] Stream read error: {:?}", e);
- return Err(e.into());
- },
- },
- result = rx_guard.recv() => if let Some(data) = result {
- println!("[DEBUG] Sending {} bytes to server", data.len());
- writer.write_all(&data).await?;
- writer.write_all(b"\n").await?;
- writer.flush().await?;
- } else {
- println!("[DEBUG] Channel closed - ending stream handling");
- return Ok(());
- }
- }
- }
- }
- // does the connection to the tcp server, wether initial or not, on success it will pass it off to the dedicated handler for the stream
- async fn connect_to_server(
- state: &AppState,
- rx: Arc<Mutex<mpsc::Receiver<Vec<u8>>>>,
- ws_tx: broadcast::Sender<String>,
- ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
- println!("[DEBUG] Starting connect_to_server loop");
- loop {
- println!("[DEBUG] Trying to connect to {}…", TcpUrl);
- match timeout(CONNECTION_TIMEOUT, TcpStream::connect(TcpUrl)).await {
- Ok(Ok(mut stream)) => {
- println!("[DEBUG] Connection succeeded!");
- handle_stream(Arc::new(RwLock::new(state.clone())), rx.clone(), &mut stream, ws_tx.clone()).await?
- }
- Ok(Err(e)) => {
- eprintln!("[DEBUG] Connection error: {}", e);
- tokio::time::sleep(CONNECTION_RETRY_DELAY).await;
- }
- Err(_) => {
- eprintln!("[DEBUG] Connection timed out after {:?}", CONNECTION_TIMEOUT);
- tokio::time::sleep(CONNECTION_RETRY_DELAY).await;
- }
- }
- }
- }
- // This function is soley for the initial connection to the tcp server, then passes it off to the dedicated handler, for the initial conneciton
- // this is where it determines wether or not to try and create the container and deployment, as attempt_connection itself is used in various diffrent contexts (like it will constantly
- // try to connect upon failing but it should not try to create the container and deployment every time it fails)
- async fn try_initial_connection(
- state: Arc<RwLock<AppState>>,
- ws_tx: broadcast::Sender<String>,
- tcp_tx: Arc<Mutex<mpsc::Sender<Vec<u8>>>>,
- ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
- println!("[DEBUG] Starting try_initial_connection");
- match attempt_connection().await {
- Ok(mut stream) => {
- println!("[DEBUG] Initial connection succeeded!");
- let (temp_tx, temp_rx) = mpsc::channel::<Vec<u8>>(CHANNEL_BUFFER_SIZE);
- {
- let mut guard = tcp_tx.lock().await;
- *guard = temp_tx;
- }
- handle_stream(state,Arc::new(Mutex::new(temp_rx)), &mut stream, ws_tx).await
- }
- Err(e) => {
- eprintln!("[DEBUG] Initial connection failed: {}", e);
- Err(e)
- }
- }
- }
- #[tokio::main]
- async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
- println!("[DEBUG] Starting server...");
- println!("[DEBUG] Establishing database connection...");
- let conn = first_connection().await?;
- let database = database::Database::new(Some(conn));
- println!("[DEBUG] Loading configuration...");
- let verbose = std::env::var("VERBOSE").is_ok();
- let base_path = std::env::var("SITE_URL")
- .map(|s| {
- let mut s = s.trim().to_string();
- if !s.is_empty() {
- if !s.starts_with('/') { s.insert(0, '/'); }
- if s.ends_with('/') && s != "/" { s.pop(); }
- }
- s
- })
- .unwrap_or_default();
- // Overrides for testing or specific cases where how it worksin a setup may be diffrent
- const ENABLE_K8S_CLIENT: bool = true;
- const ENABLE_INITIAL_CONNECTION: bool = false;
- const FORCE_REBUILD: bool = false;
- const BUILD_DOCKER_IMAGE: bool = true;
- const BUILD_DEPLOYMENT: bool = true;
- println!("[DEBUG] Creating communication channels...");
- let (ws_tx, _) = broadcast::channel::<String>(CHANNEL_BUFFER_SIZE);
- let (tcp_tx, tcp_rx) = mpsc::channel::<Vec<u8>>(CHANNEL_BUFFER_SIZE);
- println!("[DEBUG] Initializing Kubernetes client if enabled...");
- let mut client: Option<Client> = None;
- if ENABLE_K8S_CLIENT && K8S_WORKS {
- println!("[DEBUG] Creating Kubernetes client...");
- client = Some(Client::try_default().await?);
- }
- println!("[DEBUG] Creating application state...");
- let state = AppState {
- gameserver: json!({}),
- status: Status::Down,
- tcp_tx: Arc::new(Mutex::new(tcp_tx)),
- tcp_rx: Arc::new(Mutex::new(tcp_rx)),
- ws_tx: ws_tx.clone(),
- base_path: base_path.clone(),
- peer_addr: None,
- database,
- client,
- };
- let multifaceted_state = Arc::new(RwLock::new(state));
- if ENABLE_INITIAL_CONNECTION && multifaceted_state.write().await.client.is_some() {
- println!("[DEBUG] Trying initial connection...");
- if try_initial_connection(multifaceted_state.clone(), ws_tx.clone(), multifaceted_state.write().await.tcp_tx.clone()).await.is_err() || FORCE_REBUILD {
- eprintln!("[DEBUG] Initial connection failed or force rebuild enabled");
- if BUILD_DOCKER_IMAGE {
- println!("[DEBUG] Building Docker image...");
- docker::build_docker_image().await?;
- }
- if BUILD_DEPLOYMENT {
- println!("[DEBUG] Creating Kubernetes deployment...");
- kubernetes::create_k8s_deployment(multifaceted_state.write().await.client.as_ref().unwrap()).await?;
- }
- }
- }
- println!("[DEBUG] Starting server connection task...");
- let bridge_rx = multifaceted_state.write().await.tcp_rx.clone();
- let bridge_tx = multifaceted_state.write().await.ws_tx.clone();
- let server_connection_state = multifaceted_state.clone();
- tokio::spawn(async move {
- println!("[DEBUG] Server connection task started");
- let server_connecting_state_write = server_connection_state.write().await;
- if let Err(e) = connect_to_server(&server_connecting_state_write, bridge_rx, bridge_tx).await {
- eprintln!("[DEBUG] Connection task failed: {}", e);
- }
- });
- println!("[DEBUG] Setting up CORS and routes...");
- let cors = CorsLayer::new()
- .allow_origin(CorsAny)
- .allow_methods([Method::GET, Method::POST])
- .allow_headers(CorsAny);
- let fallback_router = routes_static(multifaceted_state.write().await.clone().into());
- let inner = Router::new()
- .route("/api/message", get(get_message))
- .route("/api/nodes", get(get_nodes))
- .route("/api/servers", get(get_servers))
- .route("/api/users", get(users))
- .route("/api/awaitserverstatus", get(ongoing_server_status))
- .route("/api/addnode", post(add_node))
- .route("/api/edituser", post(edit_user))
- .route("/api/getuser", post(get_user))
- .route("/api/send", post(receive_message))
- .route("/api/general", post(process_general))
- .route("/api/signin", post(sign_in))
- .route("/api/createuser", post(create_user))
- .route("/api/deleteuser", post(delete_user))
- .merge(fallback_router)
- .with_state(multifaceted_state.write().await.clone());
- let normal_routes = Router::new()
- .merge(inner);
- let app = if base_path.is_empty() || base_path == "/" {
- println!("[DEBUG] Using root path for routes");
- normal_routes.layer(cors)
- } else {
- println!("[DEBUG] Using base path '{}' for routes", base_path);
- Router::new().nest(&base_path, normal_routes).layer(cors)
- };
- println!("[DEBUG] Starting web server...");
- let addr: SocketAddr = LocalUrl.parse().unwrap();
- println!("[DEBUG] Listening on http://{}{}", addr, base_path);
- let listener = TcpListener::bind(addr).await?;
- axum::serve(listener, app.into_make_service())
- .await?;
- Ok(())
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement