Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- // server with pooling
- use std::net::{TcpListener, TcpStream};
- use std::io::{Read, Write, Error as IoError};
- use std::thread;
- use std::sync::{Arc, Mutex};
- use std::collections::HashMap;
- use std::time::{Duration, Instant};
/// A backend connection kept in the connection pool, paired with the time it
/// was last used so callers can decide whether it is still fresh enough to reuse.
#[derive(Debug)]
struct PooledConnection {
    /// Open TCP stream to the backend server.
    stream: TcpStream,
    /// Moment the connection was created or last used.
    last_used: Instant,
}
- fn main() {
- let listener = TcpListener::bind("127.0.0.1:8080").unwrap();
- println!("Load balancer listening on port 8080");
- let servers = Arc::new(Mutex::new(vec![
- "127.0.0.1:8081".to_string(),
- "127.0.0.1:8082".to_string(),
- ]));
- let counter = Arc::new(Mutex::new(0));
- let connection_pool = Arc::new(Mutex::new(HashMap::new()));
- for stream in listener.incoming() {
- let stream = stream.unwrap();
- let servers = Arc::clone(&servers);
- let counter = Arc::clone(&counter);
- let pool = Arc::clone(&connection_pool);
- thread::spawn(move || {
- if let Err(e) = handle_connection(stream, servers, counter, pool) {
- eprintln!("Error handling connection: {:?}", e);
- }
- });
- }
- }
- fn handle_connection(
- mut client_stream: TcpStream,
- servers: Arc<Mutex<Vec<String>>>,
- counter: Arc<Mutex<usize>>,
- pool: Arc<Mutex<HashMap<String, PooledConnection>>>
- ) -> Result<(), IoError> {
- let mut buffer = [0; 1024];
- client_stream.read(&mut buffer)?;
- if let Some(server_addr) = find_available_server(&servers, &counter) {
- let mut pool = pool.lock().unwrap();
- let server_stream = match pool.get_mut(&server_addr) {
- Some(conn) if conn.last_used.elapsed() < Duration::from_secs(30) => {
- // Reuse existing connection
- &mut conn.stream
- }
- _ => {
- // Create new connection
- let stream = TcpStream::connect(&server_addr)?;
- pool.insert(server_addr.clone(), PooledConnection {
- stream,
- last_used: Instant::now(),
- });
- &mut pool.get_mut(&server_addr).unwrap().stream
- }
- };
- server_stream.write_all(&buffer)?;
- server_stream.flush()?;
- let mut response = Vec::new();
- server_stream.read_to_end(&mut response)?;
- if response.is_empty() {
- return Err(IoError::new(std::io::ErrorKind::UnexpectedEof, "Empty response from server"));
- }
- client_stream.write_all(&response)?;
- client_stream.flush()?;
- // Update last_used time
- if let Some(conn) = pool.get_mut(&server_addr) {
- conn.last_used = Instant::now();
- }
- } else {
- send_error_response(&mut client_stream, "All servers are currently unavailable")?;
- }
- Ok(())
- }
/// Picks the next reachable backend in round-robin order.
///
/// Starting at the index stored in `counter` (modulo the list length), each
/// server is probed by opening a TCP connection with a one-second timeout;
/// the probe connection is dropped immediately. The first server that accepts
/// is returned and `counter` is set to the index after it.
///
/// Returns `None` when the list is empty or no server accepts a connection.
/// Entries that do not parse as a socket address are skipped.
///
/// Neither mutex is held while probing, so slow or dead backends do not block
/// other threads; under concurrency two callers may therefore pick the same
/// backend for simultaneous requests.
fn find_available_server(
    servers: &Arc<Mutex<Vec<String>>>,
    counter: &Arc<Mutex<usize>>,
) -> Option<String> {
    // Snapshot the list so the lock is released before any network I/O.
    let candidates: Vec<String> = servers.lock().unwrap().clone();
    let total = candidates.len();
    if total == 0 {
        // Avoids a division-by-zero panic in the modulo below.
        return None;
    }

    let start_index = *counter.lock().unwrap() % total;
    for index in (0..total).map(|offset| (start_index + offset) % total) {
        let server = &candidates[index];
        let addr = match server.parse::<std::net::SocketAddr>() {
            Ok(addr) => addr,
            // A malformed entry can never be connected to; try the next one.
            Err(_) => continue,
        };
        if TcpStream::connect_timeout(&addr, Duration::from_secs(1)).is_ok() {
            *counter.lock().unwrap() = index + 1;
            return Some(server.clone());
        }
    }
    None
}
/// Writes a plain-text HTTP 503 response carrying `message` as its body to
/// `client_stream` and flushes it.
///
/// # Errors
///
/// Returns the first I/O error raised while writing or flushing.
fn send_error_response(client_stream: &mut TcpStream, message: &str) -> Result<(), IoError> {
    let payload = format!(
        "HTTP/1.1 503 Service Unavailable\r\nContent-Type: text/plain\r\n\r\n{}",
        message
    )
    .into_bytes();

    // Flush only once the whole payload has been written successfully.
    client_stream
        .write_all(&payload)
        .and_then(|()| client_stream.flush())
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement