Advertisement
Guest User

server_with_pooling

a guest
Jul 12th, 2024
42
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.70 KB | None | 0 0
  1. // server with pooling
  2.  
  3. use std::net::{TcpListener, TcpStream};
  4. use std::io::{Read, Write, Error as IoError};
  5. use std::thread;
  6. use std::sync::{Arc, Mutex};
  7. use std::collections::HashMap;
  8. use std::time::{Duration, Instant};
  9.  
  10. struct PooledConnection {
  11. stream: TcpStream,
  12. last_used: Instant,
  13. }
  14.  
  15. fn main() {
  16. let listener = TcpListener::bind("127.0.0.1:8080").unwrap();
  17. println!("Load balancer listening on port 8080");
  18.  
  19. let servers = Arc::new(Mutex::new(vec![
  20. "127.0.0.1:8081".to_string(),
  21. "127.0.0.1:8082".to_string(),
  22. ]));
  23.  
  24. let counter = Arc::new(Mutex::new(0));
  25. let connection_pool = Arc::new(Mutex::new(HashMap::new()));
  26.  
  27. for stream in listener.incoming() {
  28. let stream = stream.unwrap();
  29. let servers = Arc::clone(&servers);
  30. let counter = Arc::clone(&counter);
  31. let pool = Arc::clone(&connection_pool);
  32.  
  33. thread::spawn(move || {
  34. if let Err(e) = handle_connection(stream, servers, counter, pool) {
  35. eprintln!("Error handling connection: {:?}", e);
  36. }
  37. });
  38. }
  39. }
  40.  
  41. fn handle_connection(
  42. mut client_stream: TcpStream,
  43. servers: Arc<Mutex<Vec<String>>>,
  44. counter: Arc<Mutex<usize>>,
  45. pool: Arc<Mutex<HashMap<String, PooledConnection>>>
  46. ) -> Result<(), IoError> {
  47. let mut buffer = [0; 1024];
  48. client_stream.read(&mut buffer)?;
  49.  
  50. if let Some(server_addr) = find_available_server(&servers, &counter) {
  51. let mut pool = pool.lock().unwrap();
  52. let server_stream = match pool.get_mut(&server_addr) {
  53. Some(conn) if conn.last_used.elapsed() < Duration::from_secs(30) => {
  54. // Reuse existing connection
  55. &mut conn.stream
  56. }
  57. _ => {
  58. // Create new connection
  59. let stream = TcpStream::connect(&server_addr)?;
  60. pool.insert(server_addr.clone(), PooledConnection {
  61. stream,
  62. last_used: Instant::now(),
  63. });
  64. &mut pool.get_mut(&server_addr).unwrap().stream
  65. }
  66. };
  67.  
  68. server_stream.write_all(&buffer)?;
  69. server_stream.flush()?;
  70.  
  71. let mut response = Vec::new();
  72. server_stream.read_to_end(&mut response)?;
  73.  
  74. if response.is_empty() {
  75. return Err(IoError::new(std::io::ErrorKind::UnexpectedEof, "Empty response from server"));
  76. }
  77.  
  78. client_stream.write_all(&response)?;
  79. client_stream.flush()?;
  80.  
  81. // Update last_used time
  82. if let Some(conn) = pool.get_mut(&server_addr) {
  83. conn.last_used = Instant::now();
  84. }
  85. } else {
  86. send_error_response(&mut client_stream, "All servers are currently unavailable")?;
  87. }
  88.  
  89. Ok(())
  90. }
  91.  
  92. fn find_available_server(servers: &Arc<Mutex<Vec<String>>>, counter: &Arc<Mutex<usize>>) -> Option<String> {
  93. let servers = servers.lock().unwrap();
  94. let mut counter = counter.lock().unwrap();
  95. let start_index = *counter % servers.len();
  96.  
  97. for i in 0..servers.len() {
  98. let index = (start_index + i) % servers.len();
  99. let server = &servers[index];
  100.  
  101. if TcpStream::connect_timeout(&server.parse().unwrap(), Duration::from_secs(1)).is_ok() {
  102. *counter = index + 1;
  103. return Some(server.clone());
  104. }
  105. }
  106.  
  107. None
  108. }
  109.  
  110. fn send_error_response(client_stream: &mut TcpStream, message: &str) -> Result<(), IoError> {
  111. let response = format!(
  112. "HTTP/1.1 503 Service Unavailable\r\nContent-Type: text/plain\r\n\r\n{}",
  113. message
  114. );
  115. client_stream.write_all(response.as_bytes())?;
  116. client_stream.flush()?;
  117. Ok(())
  118. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement