Advertisement
SpiderLordCoder1st

Untitled

Jul 15th, 2025
15
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 14.13 KB | None | 0 0
  1. // for the initial connection attempt, which will determine if possibly I would need to create the container and deployment upon failure
  2. // i will use rusts 'timeout' for x interval determined with CONNECTION_TIMEOUT
  3. async fn attempt_connection() -> Result<TcpStream, Box<dyn std::error::Error + Send + Sync>> {
  4. println!("[DEBUG] Attempting TCP connection to {}", TcpUrl);
  5. timeout(CONNECTION_TIMEOUT, TcpStream::connect(TcpUrl)).await?.map_err(Into::into)
  6. }
  7.  
  8. // The websocket connection will be important for sending data to there and back
  9. // in this case, this is primarially used for transmitting console data to the frontend from the gameserver
  10. async fn handle_server_data(
  11. state: Arc<RwLock<AppState>>,
  12. data: &[u8],
  13. ws_tx: &broadcast::Sender<String>,
  14. ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
  15. println!("[DEBUG] Handling server data ({} bytes)", data.len());
  16.  
  17. if let Ok(text) = String::from_utf8(data.to_vec()) {
  18. println!("[DEBUG] Raw message from server: {}", text);
  19.  
  20. if let Ok(outer_msg) = serde_json::from_str::<InnerData>(&text) {
  21. println!("[DEBUG] Parsed outer message successfully");
  22. let inner_data_str = outer_msg.data.as_str();
  23. let mut borrowed_state = state.write().await;
  24.  
  25. if let Some(start_keyword) = borrowed_state.gameserver["start_keyword"].as_str() {
  26. println!("[DEBUG] Checking for start keyword: {}", start_keyword);
  27. if inner_data_str.contains(start_keyword) {
  28. println!("[DEBUG] Start keyword found - setting status to Up");
  29. borrowed_state.status = Status::Up;
  30. } else {
  31. println!("[DEBUG] Start keyword not found - setting status to Down");
  32. borrowed_state.status = Status::Down;
  33. }
  34. } else {
  35. eprintln!("[DEBUG] start_keyword is not a string");
  36. }
  37.  
  38. if let Ok(inner_data) = serde_json::from_str::<serde_json::Value>(inner_data_str) {
  39. println!("[DEBUG] Parsed inner data successfully");
  40. if ALLOW_NONJSON_DATA == true {
  41. if let Some(message_content) = inner_data["data"].as_str() {
  42. println!("[DEBUG] Extracted message: {}", message_content);
  43. let _ = ws_tx.send(message_content.to_string());
  44. }
  45. } else {
  46. println!("[DEBUG] Sending raw inner data: {}", inner_data_str);
  47. let _ = ws_tx.send(inner_data_str.to_string());
  48. }
  49. } else {
  50. println!("[DEBUG] Failed to parse inner data, sending raw");
  51. let _ = ws_tx.send(inner_data_str.to_string());
  52. }
  53. } else {
  54. println!("[DEBUG] Failed to parse outer message, sending raw text");
  55. let _ = ws_tx.send(text);
  56. }
  57. }
  58. Ok(())
  59. }
  60.  
  61. // this handles the main tcp stream between the gameserver and the client console, more specifically the data exchanged
  62. // as well as notifying the gameserver of its capabilities which at some point ill make dependent on what feature flag is set
  63. // `tokio::select!` is used to concurrently wait for either incoming TCP data or messages from the channel to send.
  64. async fn handle_stream(
  65. state: Arc<RwLock<AppState>>,
  66. rx: Arc<Mutex<mpsc::Receiver<Vec<u8>>>>,
  67. stream: &mut TcpStream,
  68. ws_tx: broadcast::Sender<String>,
  69. ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
  70. println!("[DEBUG] Starting stream handling");
  71.  
  72. let (mut reader, mut writer) = stream.split();
  73. let mut buf = vec![0u8; 1024];
  74. let mut buf_reader = BufReader::new(reader);
  75. let mut line = String::new();
  76.  
  77. let capability_msg = serde_json::to_string(
  78. &List {
  79. list: ApiCalls::Capabilities(vec!["all".to_string()])
  80. }
  81. )? + "\n";
  82.  
  83. println!("[DEBUG] Sending capability message");
  84. writer.write_all(capability_msg.as_bytes()).await?;
  85.  
  86. match buf_reader.read_line(&mut line).await {
  87. Ok(0) => {
  88. println!("[DEBUG] Connection possibly closed after capability message");
  89. }
  90. Ok(_) => {
  91. println!("[DEBUG] Received capability response: {}", line.trim_end());
  92. }
  93. Err(e) => {
  94. println!("[DEBUG] Error reading capability response: {:?}", e);
  95. return Err(e.into());
  96. }
  97. }
  98.  
  99. let server_data_msg = serde_json::to_string(
  100. &MessagePayload {
  101. r#type: "command".to_string(),
  102. message: "server_data".to_string(),
  103. authcode: "0".to_string(),
  104. }
  105. )? + "\n";
  106.  
  107. println!("[DEBUG] Sending server data request");
  108. writer.write_all(server_data_msg.as_bytes()).await?;
  109.  
  110. match buf_reader.read_line(&mut line).await {
  111. Ok(0) => {
  112. println!("[DEBUG] Connection possibly closed after server data request");
  113. }
  114. Ok(_) => {
  115. println!("[DEBUG] Received server data: {}", line.trim_end());
  116. state.write().await.gameserver = serde_json::Value::String(line.clone());
  117. }
  118. Err(e) => {
  119. println!("[DEBUG] Error reading server data: {:?}", e);
  120. return Err(e.into());
  121. }
  122. }
  123.  
  124. reader = buf_reader.into_inner();
  125. println!("[DEBUG] Entering main stream loop");
  126.  
  127. loop {
  128. let mut rx_guard = rx.lock().await;
  129. tokio::select! {
  130. result = reader.read(&mut buf) => match result {
  131. Ok(0) => {
  132. println!("[DEBUG] Stream read returned 0 bytes - connection closed");
  133. return Ok(());
  134. },
  135. Ok(n) => {
  136. println!("[DEBUG] Received {} bytes from stream", n);
  137. handle_server_data(state.clone(), &buf[..n], &ws_tx).await?
  138. },
  139. Err(e) => {
  140. println!("[DEBUG] Stream read error: {:?}", e);
  141. return Err(e.into());
  142. },
  143. },
  144. result = rx_guard.recv() => if let Some(data) = result {
  145. println!("[DEBUG] Sending {} bytes to server", data.len());
  146. writer.write_all(&data).await?;
  147. writer.write_all(b"\n").await?;
  148. writer.flush().await?;
  149. } else {
  150. println!("[DEBUG] Channel closed - ending stream handling");
  151. return Ok(());
  152. }
  153. }
  154. }
  155. }
  156.  
  157. // does the connection to the tcp server, wether initial or not, on success it will pass it off to the dedicated handler for the stream
  158. async fn connect_to_server(
  159. state: &AppState,
  160. rx: Arc<Mutex<mpsc::Receiver<Vec<u8>>>>,
  161. ws_tx: broadcast::Sender<String>,
  162. ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
  163. println!("[DEBUG] Starting connect_to_server loop");
  164.  
  165. loop {
  166. println!("[DEBUG] Trying to connect to {}…", TcpUrl);
  167. match timeout(CONNECTION_TIMEOUT, TcpStream::connect(TcpUrl)).await {
  168. Ok(Ok(mut stream)) => {
  169. println!("[DEBUG] Connection succeeded!");
  170. handle_stream(Arc::new(RwLock::new(state.clone())), rx.clone(), &mut stream, ws_tx.clone()).await?
  171. }
  172. Ok(Err(e)) => {
  173. eprintln!("[DEBUG] Connection error: {}", e);
  174. tokio::time::sleep(CONNECTION_RETRY_DELAY).await;
  175. }
  176. Err(_) => {
  177. eprintln!("[DEBUG] Connection timed out after {:?}", CONNECTION_TIMEOUT);
  178. tokio::time::sleep(CONNECTION_RETRY_DELAY).await;
  179. }
  180. }
  181. }
  182. }
  183.  
  184. // This function is soley for the initial connection to the tcp server, then passes it off to the dedicated handler, for the initial conneciton
  185. // this is where it determines wether or not to try and create the container and deployment, as attempt_connection itself is used in various diffrent contexts (like it will constantly
  186. // try to connect upon failing but it should not try to create the container and deployment every time it fails)
  187. async fn try_initial_connection(
  188. state: Arc<RwLock<AppState>>,
  189. ws_tx: broadcast::Sender<String>,
  190. tcp_tx: Arc<Mutex<mpsc::Sender<Vec<u8>>>>,
  191. ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
  192. println!("[DEBUG] Starting try_initial_connection");
  193.  
  194. match attempt_connection().await {
  195. Ok(mut stream) => {
  196. println!("[DEBUG] Initial connection succeeded!");
  197.  
  198. let (temp_tx, temp_rx) = mpsc::channel::<Vec<u8>>(CHANNEL_BUFFER_SIZE);
  199.  
  200. {
  201. let mut guard = tcp_tx.lock().await;
  202. *guard = temp_tx;
  203. }
  204. handle_stream(state,Arc::new(Mutex::new(temp_rx)), &mut stream, ws_tx).await
  205. }
  206. Err(e) => {
  207. eprintln!("[DEBUG] Initial connection failed: {}", e);
  208. Err(e)
  209. }
  210. }
  211. }
  212.  
  213. #[tokio::main]
  214. async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
  215. println!("[DEBUG] Starting server...");
  216.  
  217. println!("[DEBUG] Establishing database connection...");
  218. let conn = first_connection().await?;
  219. let database = database::Database::new(Some(conn));
  220.  
  221. println!("[DEBUG] Loading configuration...");
  222. let verbose = std::env::var("VERBOSE").is_ok();
  223. let base_path = std::env::var("SITE_URL")
  224. .map(|s| {
  225. let mut s = s.trim().to_string();
  226. if !s.is_empty() {
  227. if !s.starts_with('/') { s.insert(0, '/'); }
  228. if s.ends_with('/') && s != "/" { s.pop(); }
  229. }
  230. s
  231. })
  232. .unwrap_or_default();
  233.  
  234. // Overrides for testing or specific cases where how it worksin a setup may be diffrent
  235. const ENABLE_K8S_CLIENT: bool = true;
  236. const ENABLE_INITIAL_CONNECTION: bool = false;
  237. const FORCE_REBUILD: bool = false;
  238. const BUILD_DOCKER_IMAGE: bool = true;
  239. const BUILD_DEPLOYMENT: bool = true;
  240.  
  241. println!("[DEBUG] Creating communication channels...");
  242. let (ws_tx, _) = broadcast::channel::<String>(CHANNEL_BUFFER_SIZE);
  243. let (tcp_tx, tcp_rx) = mpsc::channel::<Vec<u8>>(CHANNEL_BUFFER_SIZE);
  244.  
  245. println!("[DEBUG] Initializing Kubernetes client if enabled...");
  246. let mut client: Option<Client> = None;
  247. if ENABLE_K8S_CLIENT && K8S_WORKS {
  248. println!("[DEBUG] Creating Kubernetes client...");
  249. client = Some(Client::try_default().await?);
  250. }
  251.  
  252. println!("[DEBUG] Creating application state...");
  253. let state = AppState {
  254. gameserver: json!({}),
  255. status: Status::Down,
  256. tcp_tx: Arc::new(Mutex::new(tcp_tx)),
  257. tcp_rx: Arc::new(Mutex::new(tcp_rx)),
  258. ws_tx: ws_tx.clone(),
  259. base_path: base_path.clone(),
  260. peer_addr: None,
  261. database,
  262. client,
  263. };
  264.  
  265. let multifaceted_state = Arc::new(RwLock::new(state));
  266.  
  267. if ENABLE_INITIAL_CONNECTION && multifaceted_state.write().await.client.is_some() {
  268. println!("[DEBUG] Trying initial connection...");
  269. if try_initial_connection(multifaceted_state.clone(), ws_tx.clone(), multifaceted_state.write().await.tcp_tx.clone()).await.is_err() || FORCE_REBUILD {
  270. eprintln!("[DEBUG] Initial connection failed or force rebuild enabled");
  271. if BUILD_DOCKER_IMAGE {
  272. println!("[DEBUG] Building Docker image...");
  273. docker::build_docker_image().await?;
  274. }
  275. if BUILD_DEPLOYMENT {
  276. println!("[DEBUG] Creating Kubernetes deployment...");
  277. kubernetes::create_k8s_deployment(multifaceted_state.write().await.client.as_ref().unwrap()).await?;
  278. }
  279. }
  280. }
  281.  
  282. println!("[DEBUG] Starting server connection task...");
  283. let bridge_rx = multifaceted_state.write().await.tcp_rx.clone();
  284. let bridge_tx = multifaceted_state.write().await.ws_tx.clone();
  285.  
  286. let server_connection_state = multifaceted_state.clone();
  287. tokio::spawn(async move {
  288. println!("[DEBUG] Server connection task started");
  289. let server_connecting_state_write = server_connection_state.write().await;
  290. if let Err(e) = connect_to_server(&server_connecting_state_write, bridge_rx, bridge_tx).await {
  291. eprintln!("[DEBUG] Connection task failed: {}", e);
  292. }
  293. });
  294.  
  295. println!("[DEBUG] Setting up CORS and routes...");
  296. let cors = CorsLayer::new()
  297. .allow_origin(CorsAny)
  298. .allow_methods([Method::GET, Method::POST])
  299. .allow_headers(CorsAny);
  300.  
  301. let fallback_router = routes_static(multifaceted_state.write().await.clone().into());
  302.  
  303. let inner = Router::new()
  304. .route("/api/message", get(get_message))
  305. .route("/api/nodes", get(get_nodes))
  306. .route("/api/servers", get(get_servers))
  307. .route("/api/users", get(users))
  308. .route("/api/awaitserverstatus", get(ongoing_server_status))
  309. .route("/api/addnode", post(add_node))
  310. .route("/api/edituser", post(edit_user))
  311. .route("/api/getuser", post(get_user))
  312. .route("/api/send", post(receive_message))
  313. .route("/api/general", post(process_general))
  314. .route("/api/signin", post(sign_in))
  315. .route("/api/createuser", post(create_user))
  316. .route("/api/deleteuser", post(delete_user))
  317. .merge(fallback_router)
  318. .with_state(multifaceted_state.write().await.clone());
  319.  
  320. let normal_routes = Router::new()
  321. .merge(inner);
  322.  
  323. let app = if base_path.is_empty() || base_path == "/" {
  324. println!("[DEBUG] Using root path for routes");
  325. normal_routes.layer(cors)
  326. } else {
  327. println!("[DEBUG] Using base path '{}' for routes", base_path);
  328. Router::new().nest(&base_path, normal_routes).layer(cors)
  329. };
  330.  
  331. println!("[DEBUG] Starting web server...");
  332. let addr: SocketAddr = LocalUrl.parse().unwrap();
  333. println!("[DEBUG] Listening on http://{}{}", addr, base_path);
  334.  
  335. let listener = TcpListener::bind(addr).await?;
  336. axum::serve(listener, app.into_make_service())
  337. .await?;
  338.  
  339. Ok(())
  340. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement