Guest User

Untitled

a guest
Nov 19th, 2025
27
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.82 KB | None | 0 0
  1. #[derive(Debug)]
  2. pub struct ProcessSpawningError {
  3. pub message: String,
  4. }
  5.  
  6. impl ProcessSpawningError {
  7. pub fn no_stdout() -> Self {
  8. ProcessSpawningError {
  9. message: "Failed to capture stdout".to_string(),
  10. }
  11. }
  12.  
  13. pub fn no_stderr() -> Self {
  14. ProcessSpawningError {
  15. message: "Failed to capture stderr".to_string(),
  16. }
  17. }
  18. }
  19.  
  20. impl std::fmt::Display for ProcessSpawningError {
  21. fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  22. write!(f, "ProcessSpawningError: {}", self.message)
  23. }
  24. }
  25.  
  26. impl std::error::Error for ProcessSpawningError {}
  27.  
  28. impl From<std::io::Error> for ProcessSpawningError {
  29. fn from(error: std::io::Error) -> Self {
  30. ProcessSpawningError {
  31. message: error.to_string(),
  32. }
  33. }
  34. }
  35.  
  36. struct LogFile {
  37. file: Option<tokio::fs::File>,
  38. }
  39.  
  40. impl LogFile {
  41. async fn new(log_file_path: Option<String>) -> std::io::Result<Self> {
  42. let file = if let Some(path) = log_file_path {
  43. // Ensure directory exists
  44. if let Some(parent) = Path::new(&path).parent() {
  45. fs::create_dir_all(parent).await?;
  46. }
  47.  
  48. // Open file with timestamp
  49. let timestamp = chrono::prelude::Utc::now();
  50. let file = OpenOptions::new()
  51. .create(true)
  52. .append(true)
  53. .open(format!("{}.{}", path, timestamp))
  54. .await?;
  55. Some(file)
  56. } else {
  57. None
  58. };
  59.  
  60. Ok(LogFile { file })
  61. }
  62.  
  63. async fn write_line(&mut self, line: &str) -> std::io::Result<()> {
  64. if let Some(f) = &mut self.file {
  65. f.write_all(format!("{}\n", line).as_bytes()).await?;
  66. }
  67. Ok(())
  68. }
  69. }
  70.  
  71. /// Internal message type for communicating between stream readers and collector
  72. enum LogMessage {
  73. Stdout(String),
  74. Stderr(String),
  75. }
  76.  
  77. impl std::fmt::Display for LogMessage {
  78. fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  79. match self {
  80. LogMessage::Stdout(line) => write!(f, "[OUT] {}", line),
  81. LogMessage::Stderr(line) => write!(f, "[ERR] {}", line),
  82. }
  83. }
  84. }
  85.  
  86. #[derive(Clone, Debug)]
  87. pub struct ProcessDescription {
  88. pub command: String,
  89. pub args: Vec<String>,
  90. pub work_dir: Option<String>,
  91. pub log_file_out: Option<String>,
  92. pub log_file_err: Option<String>,
  93. }
  94.  
  95. #[derive(Clone, Debug)]
  96. pub enum ProcessStatus {
  97. Exited(Option<i32>),
  98. }
  99.  
  100. impl From<Option<i32>> for ProcessStatus {
  101. fn from(code: Option<i32>) -> Self {
  102. ProcessStatus::Exited(code)
  103. }
  104. }
  105.  
  106. pub struct Process {
  107. pub log_lines: Vec<String>,
  108. pub status: ProcessStatus,
  109. }
  110.  
  111. impl Process {
  112. /// Spawns a task to read from a stream and send messages through a channel
  113. /// Returns a LogFile for writing received messages to disk
  114. async fn spawn_stream_reader<F>(
  115. stream: impl tokio::io::AsyncRead + Unpin + Send + 'static,
  116. log_file_path: Option<String>,
  117. tx: mpsc::Sender<LogMessage>,
  118. msg_constructor: F,
  119. ) -> std::io::Result<LogFile>
  120. where
  121. F: Fn(String) -> LogMessage + Send + 'static,
  122. {
  123. // Create log file (with directory creation)
  124. let log_file = LogFile::new(log_file_path).await?;
  125.  
  126. tokio::spawn(async move {
  127. let mut reader = BufReader::new(stream).lines();
  128. while let Ok(Some(line)) = reader.next_line().await {
  129. let _ = tx.send(msg_constructor(line)).await;
  130. }
  131. });
  132.  
  133. Ok(log_file)
  134. }
  135.  
  136. pub async fn spawn(
  137. process_description: ProcessDescription,
  138. env: &HashMap<String, String>,
  139. ) -> Result<Self, ProcessSpawningError> {
  140. let mut child = Command::new(process_description.command)
  141. .args(process_description.args)
  142. .envs(env)
  143. .current_dir(
  144. process_description
  145. .work_dir
  146. .unwrap_or(".".to_string()),
  147. )
  148. .stdout(std::process::Stdio::piped())
  149. .stderr(std::process::Stdio::piped())
  150. .spawn()?;
  151.  
  152. let stdout = child.stdout.take().ok_or_else(ProcessSpawningError::no_stdout)?;
  153. let stderr = child.stderr.take().ok_or_else(ProcessSpawningError::no_stderr)?;
  154.  
  155. let mut log_lines = Vec::new();
  156.  
  157. // Create channel for log messages
  158. let (log_tx, mut log_rx) = mpsc::channel::<LogMessage>(100);
  159.  
  160. // Spawn stream readers and get log files
  161. let mut out_log_file = Self::spawn_stream_reader(stdout, process_description.log_file_out, log_tx.clone(), LogMessage::Stdout).await?;
  162. let mut err_log_file = Self::spawn_stream_reader(stderr, process_description.log_file_err, log_tx, LogMessage::Stderr).await?;
  163.  
  164. while let Some(msg) = log_rx.recv().await {
  165. // Use Display implementation for formatting
  166. log_lines.push(msg.to_string());
  167.  
  168. // Write to appropriate log file
  169. let result = match &msg {
  170. LogMessage::Stdout(line) => out_log_file.write_line(line).await,
  171. LogMessage::Stderr(line) => err_log_file.write_line(line).await,
  172. };
  173.  
  174. if let Err(e) = result {
  175. log_lines.push(format!("[ERR] Failed to write to log: {}", e));
  176. }
  177. }
  178.  
  179. // Channel closed - both reader tasks finished
  180. // Now wait for the child process to exit and capture exit code
  181. let exit_status = child.wait().await;
  182. let status = exit_status.ok().and_then(|s| s.code()).into();
  183.  
  184. Ok(Self {
  185. log_lines,
  186. status,
  187. })
  188. }
  189. }
Advertisement
Add Comment
Please, Sign In to add comment