Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
/// Error returned when a child process cannot be started or when one of its
/// output streams cannot be captured.
#[derive(Debug)]
pub struct ProcessSpawningError {
    /// Human-readable description of what went wrong.
    pub message: String,
}

impl ProcessSpawningError {
    /// Builds the error used when the child's stdout handle is unavailable.
    pub fn no_stdout() -> Self {
        Self {
            message: String::from("Failed to capture stdout"),
        }
    }

    /// Builds the error used when the child's stderr handle is unavailable.
    pub fn no_stderr() -> Self {
        Self {
            message: String::from("Failed to capture stderr"),
        }
    }
}

impl std::fmt::Display for ProcessSpawningError {
    /// Renders the error as `ProcessSpawningError: <message>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { message } = self;
        write!(f, "ProcessSpawningError: {}", message)
    }
}

impl std::error::Error for ProcessSpawningError {}

impl From<std::io::Error> for ProcessSpawningError {
    /// Converts an I/O error, keeping only its display text as the message.
    fn from(error: std::io::Error) -> Self {
        let message = error.to_string();
        Self { message }
    }
}
- struct LogFile {
- file: Option<tokio::fs::File>,
- }
- impl LogFile {
- async fn new(log_file_path: Option<String>) -> std::io::Result<Self> {
- let file = if let Some(path) = log_file_path {
- // Ensure directory exists
- if let Some(parent) = Path::new(&path).parent() {
- fs::create_dir_all(parent).await?;
- }
- // Open file with timestamp
- let timestamp = chrono::prelude::Utc::now();
- let file = OpenOptions::new()
- .create(true)
- .append(true)
- .open(format!("{}.{}", path, timestamp))
- .await?;
- Some(file)
- } else {
- None
- };
- Ok(LogFile { file })
- }
- async fn write_line(&mut self, line: &str) -> std::io::Result<()> {
- if let Some(f) = &mut self.file {
- f.write_all(format!("{}\n", line).as_bytes()).await?;
- }
- Ok(())
- }
- }
/// One line of child-process output, tagged with the stream it came from.
///
/// Used internally to pass lines from the stream reader tasks to the
/// collector.
enum LogMessage {
    /// A line read from the child's standard output.
    Stdout(String),
    /// A line read from the child's standard error.
    Stderr(String),
}

impl std::fmt::Display for LogMessage {
    /// Prefixes the line with `[OUT] ` or `[ERR] ` depending on its origin.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        use LogMessage::{Stderr, Stdout};

        match self {
            Stdout(text) => f.write_fmt(format_args!("[OUT] {}", text)),
            Stderr(text) => f.write_fmt(format_args!("[ERR] {}", text)),
        }
    }
}
/// Everything needed to launch a child process and decide where its output
/// is logged.
#[derive(Debug, Clone)]
pub struct ProcessDescription {
    /// Program to execute.
    pub command: String,
    /// Arguments passed to the program, in order.
    pub args: Vec<String>,
    /// Working directory for the child; `None` leaves the choice to the launcher.
    pub work_dir: Option<String>,
    /// Base path of the log file for standard output, if any.
    pub log_file_out: Option<String>,
    /// Base path of the log file for standard error, if any.
    pub log_file_err: Option<String>,
}
/// Final state of a child process.
#[derive(Debug, Clone)]
pub enum ProcessStatus {
    /// The process has finished; carries its exit code when one is available.
    Exited(Option<i32>),
}

impl From<Option<i32>> for ProcessStatus {
    /// Wraps an optional exit code in [`ProcessStatus::Exited`].
    fn from(code: Option<i32>) -> Self {
        Self::Exited(code)
    }
}
- pub struct Process {
- pub log_lines: Vec<String>,
- pub status: ProcessStatus,
- }
- impl Process {
- /// Spawns a task to read from a stream and send messages through a channel
- /// Returns a LogFile for writing received messages to disk
- async fn spawn_stream_reader<F>(
- stream: impl tokio::io::AsyncRead + Unpin + Send + 'static,
- log_file_path: Option<String>,
- tx: mpsc::Sender<LogMessage>,
- msg_constructor: F,
- ) -> std::io::Result<LogFile>
- where
- F: Fn(String) -> LogMessage + Send + 'static,
- {
- // Create log file (with directory creation)
- let log_file = LogFile::new(log_file_path).await?;
- tokio::spawn(async move {
- let mut reader = BufReader::new(stream).lines();
- while let Ok(Some(line)) = reader.next_line().await {
- let _ = tx.send(msg_constructor(line)).await;
- }
- });
- Ok(log_file)
- }
- pub async fn spawn(
- process_description: ProcessDescription,
- env: &HashMap<String, String>,
- ) -> Result<Self, ProcessSpawningError> {
- let mut child = Command::new(process_description.command)
- .args(process_description.args)
- .envs(env)
- .current_dir(
- process_description
- .work_dir
- .unwrap_or(".".to_string()),
- )
- .stdout(std::process::Stdio::piped())
- .stderr(std::process::Stdio::piped())
- .spawn()?;
- let stdout = child.stdout.take().ok_or_else(ProcessSpawningError::no_stdout)?;
- let stderr = child.stderr.take().ok_or_else(ProcessSpawningError::no_stderr)?;
- let mut log_lines = Vec::new();
- // Create channel for log messages
- let (log_tx, mut log_rx) = mpsc::channel::<LogMessage>(100);
- // Spawn stream readers and get log files
- let mut out_log_file = Self::spawn_stream_reader(stdout, process_description.log_file_out, log_tx.clone(), LogMessage::Stdout).await?;
- let mut err_log_file = Self::spawn_stream_reader(stderr, process_description.log_file_err, log_tx, LogMessage::Stderr).await?;
- while let Some(msg) = log_rx.recv().await {
- // Use Display implementation for formatting
- log_lines.push(msg.to_string());
- // Write to appropriate log file
- let result = match &msg {
- LogMessage::Stdout(line) => out_log_file.write_line(line).await,
- LogMessage::Stderr(line) => err_log_file.write_line(line).await,
- };
- if let Err(e) = result {
- log_lines.push(format!("[ERR] Failed to write to log: {}", e));
- }
- }
- // Channel closed - both reader tasks finished
- // Now wait for the child process to exit and capture exit code
- let exit_status = child.wait().await;
- let status = exit_status.ok().and_then(|s| s.code()).into();
- Ok(Self {
- log_lines,
- status,
- })
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment