Advertisement
Guest User

Untitled

a guest
Jul 25th, 2016
79
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 8.04 KB | None | 0 0
  1. #[macro_use] extern crate hyper;
  2. extern crate rand;
  3.  
  4. use std::io::{BufReader};
  5. use std::io::prelude::*;
  6. use std::fs::OpenOptions;
  7. use std::collections::BTreeMap;
  8. use std::iter::Iterator;
  9. use hyper::client::Client;
  10. use hyper::header::Basic;
  11. use hyper::header::{Headers,ContentType,Authorization};
  12. use hyper::mime::{Mime, TopLevel, SubLevel,Attr, Value};
  13. use std::io::SeekFrom;
  14. use rand::Rng;
  15. use std::process::Command;
  16. use std::time::SystemTime;
  17. use std::fs::File;
  18. use std::env;
  19. use std::sync::{Once, ONCE_INIT};
  20. use std::mem;
  21. use std::rc::Rc;
  22. use std::cell::RefCell;
  23.  
  /// Alert key (application id or free-form message) mapped to the moment the
  /// last alert for that key was let through.
  type Cache = RefCell<BTreeMap<String, SystemTime>>;

  /// Minimum spacing, in seconds, between two alerts for the same key.
  const ALERT_INTERVAL_SECS: u64 = 300;

  thread_local! {
      // `RefCell` is not `Sync`, so the throttle table is kept per thread instead
      // of behind a `static mut` raw pointer. The program only ever calls
      // `check` from the main thread, so there is exactly one table in practice.
      static CACHE: Cache = RefCell::new(BTreeMap::new());
  }

  /// Decides whether an alert for `app_id` may be sent now.
  ///
  /// Returns `true` the first time a key is seen, and afterwards only when more
  /// than `ALERT_INTERVAL_SECS` seconds have passed since the last call that
  /// returned `true` for the same key. Every `true` result records the current
  /// time, so a key cannot fire again until the interval has elapsed once more.
  /// Returns `false` if the system clock moved backwards past the recorded time.
  fn check(app_id:&str) -> bool {
      CACHE.with(|cache| should_alert(&mut cache.borrow_mut(), app_id, SystemTime::now()))
  }

  /// Throttle decision for `app_id` at time `now`; updates `cache` when it allows the alert.
  fn should_alert(cache: &mut BTreeMap<String, SystemTime>, app_id: &str, now: SystemTime) -> bool {
      let allowed = match cache.get(app_id) {
          // Never alerted for this key before.
          None => true,
          Some(last) => match now.duration_since(*last) {
              Ok(elapsed) => elapsed.as_secs() > ALERT_INTERVAL_SECS,
              // `now` is earlier than the stored time (clock adjusted): stay quiet.
              Err(_) => false,
          },
      };
      if allowed {
          // Restart the quiet period; without this every call after the first
          // interval would be allowed and throttling would stop working.
          cache.insert(String::from(app_id), now);
      }
      allowed
  }
  46.  
  47. fn alert(app_id:&str,data:Option<&mut InnerValue>) {
  48. if data.is_some() {
  49. if check(app_id) {
  50. let c = Client::new();
  51. let boundary = rand::thread_rng()
  52. .gen_ascii_chars()
  53. .take(20)
  54. .collect::<String>();
  55. let mut header = Headers::new();
  56. header.set(ContentType(Mime(
  57. TopLevel::Multipart,SubLevel::FormData,
  58. vec![(Attr::Ext(String::from("boundary")),
  59. Value::Ext(boundary.clone()))])));
  60. header.set(Authorization(
  61. Basic{
  62. username:String::from(""),
  63. password:Some(String::from("")),
  64. }
  65. ));
  66. let text:String = format!("{} 应用短信失败率过高。{:?}",app_id,data);
  67. let mut body = BTreeMap::new();
  68. body.insert("from","");
  69. body.insert("to","");
  70. body.insert("subject","");
  71. body.insert("text",&text[..]);
  72. let bs = body.iter().fold(
  73. String::new(),
  74. |acc,x| {
  75. format!("{}--{}\r\nContent-Disposition: form-data; name=\"{}\"\r\n\r\n{}\r\n",&acc[..],&boundary[..],x.0,x.1)
  76. });
  77. let body_str = format!("{}--{}--",&bs[..],&boundary[..]);
  78. let result = c.post("")
  79. .headers(header)
  80. .body(&body_str[..])
  81. .send();
  82. let _ = result.map(|mut x| {
  83. let mut result_body:Vec<u8> = Vec::new();
  84. let _ = x.read_to_end(&mut result_body);
  85. println!("{}",String::from_utf8(result_body).unwrap_or(String::new()));
  86. });
  87. } else {
  88. println!("{}:{:?}",app_id,data);
  89. }
  90. }
  91. }
  92.  
  /// One parsed log record: six colon-separated fields borrowed from a log line.
  #[derive(Debug)]
  struct Data<'a> {
      /// First field; used as the per-application key in `Res`.
      _app_id: &'a str,
      /// Second field (stored, not read elsewhere in this file).
      _phone: &'a str,
      /// Third field (stored, not read elsewhere in this file).
      _type: &'a str,
      /// Fourth field (stored, not read elsewhere in this file).
      _channel: &'a str,
      /// Fifth field (stored, not read elsewhere in this file).
      _subchannel: &'a str,
      /// True when the sixth field was `SUCCESS` or `true`.
      _success: bool,
  }
  102.  
  /// Per-application counters accumulated from the log.
  #[derive(Debug)]
  struct InnerValue {
      /// Receipts reported as successful.
      _success: u64,
      /// Receipts reported as failed.
      _fail: u64,
      /// Send records seen, successful or not.
      _all: u64,
      /// Send records that were not successful.
      _error: u64,
  }

  impl InnerValue {
      /// Creates a counter set with every counter at zero.
      fn new() -> Self {
          let zero: u64 = 0;
          InnerValue {
              _success: zero,
              _fail: zero,
              _all: zero,
              _error: zero,
          }
      }
  }
  121.  
  122. #[derive(Debug)]
  123. struct Res {
  124. _data:BTreeMap<String,InnerValue>,
  125. }
  126.  
  127. impl Res {
  128. fn new() -> Self {
  129. Res {
  130. _data:BTreeMap::new(),
  131. }
  132. }
  133.  
  134. fn update_recv(&mut self,d:&Data) {
  135. let tmp = self._data.entry(String::from(d._app_id)).or_insert(InnerValue::new());
  136. if d._success {
  137. tmp._success += 1;
  138. } else {
  139. tmp._fail += 1
  140. }
  141. if ((tmp._success as f64 / tmp._all as f64) < 0.8) && (tmp._all > 200) {
  142. alert(d._app_id,Some(tmp));
  143. }
  144. }
  145.  
  146. fn update_send(&mut self,d:&Data) {
  147. let tmp = self._data.entry(String::from(d._app_id)).or_insert(InnerValue::new());
  148. tmp._all += 1;
  149. if !d._success {
  150. tmp._error += 1;
  151. }
  152. }
  153.  
  154. fn clean(&mut self) {
  155. {
  156. let mut tmp = self._data.iter().collect::<Vec<(&String,&InnerValue)>>();
  157. tmp.sort_by(|a,b| (a.1)._all.cmp(&(b.1)._all));
  158. for (k,v) in tmp {
  159. println!("{}:{:?}",k ,v);
  160. }
  161. }
  162. self._data.clear();
  163. }
  164. }
  165.  
  166. impl<'a> Data<'a> {
  167. #[allow(dead_code)]
  168. fn new() -> Data<'a> {
  169. Data {
  170. _app_id:"",
  171. _phone:"",
  172. _type:"",
  173. _channel:"",
  174. _subchannel:"",
  175. _success:false,
  176. }
  177. }
  178.  
  179. fn new_by_take(mut data:Box<Iterator<Item=&'a str> + 'a>,) -> Option<Data<'a>>{
  180. data.size_hint().1.and_then(|x| {
  181. if x == 6 {
  182. let data = Data {
  183. _app_id:data.next().unwrap(),
  184. _phone:data.next().unwrap(),
  185. _type:data.next().unwrap(),
  186. _channel:data.next().unwrap(),
  187. _subchannel:data.next().unwrap(),
  188. _success:{
  189. let tmp = data.next().unwrap();
  190. (tmp == "SUCCESS") || (tmp == "true")
  191. },
  192. };
  193. Some(data)
  194. } else {
  195. None
  196. }
  197. })
  198. }
  199. }
  200.  
  201. fn reduce_data(buf:&String,all_data:&mut Res)
  202. {
  203. if buf.find("Sms send").map(|x| {
  204. match Data::new_by_take(Box::new(buf
  205. .split_at(x)
  206. .1.split(':')
  207. .skip(1)
  208. .take(6))) {
  209. Some(d) => all_data.update_send(&d),
  210. None => println!("log error {}",x),
  211. }
  212. }).is_none() {
  213. buf.find("Sms receipt by").map(|x| {
  214. match Data::new_by_take(Box::new(buf
  215. .split_at(x)
  216. .1.split(':')
  217. .skip(1)
  218. .take(6))) {
  219. Some(d) => all_data.update_recv(&d),
  220. None => println!("log error {}",x),
  221. }
  222. });
  223. }
  224. }
  225. fn go(fd:&mut File, seek:&mut u64,empty_read:&mut u8, all_data:&mut Res) -> () {
  226. let _ = fd.seek(SeekFrom::Start(*seek));
  227. let mut fbuf = BufReader::new(fd);
  228. let mut buf = String::new();
  229. let mut empty = true;
  230. while fbuf.read_line(&mut buf).unwrap() > 0 {
  231. *seek += buf.as_bytes().len() as u64;
  232. empty = false;
  233. reduce_data(&buf,all_data);
  234. buf.clear();
  235. }
  236. if empty {
  237. *empty_read += 1;
  238. }
  239. }
  240.  
  /// Returns `<pp>/uluru.log.<YYYY-MM-DD>` for today's date.
  ///
  /// The date is produced by running `date` through `sh -c`, and its output
  /// is used verbatim, so the returned string ends with the newline that
  /// `date` prints; callers trim it before opening the file. Panics when the
  /// shell cannot be started. Non-UTF-8 output yields an empty file name.
  fn log_file(pp:&String) -> String {
      let mut date_cmd = Command::new("sh");
      date_cmd.arg("-c").arg("date +\"uluru.log.%Y-%m-%d\"");
      let finished = date_cmd.output().expect("failed to execute proces");
      let file_name = String::from_utf8(finished.stdout).unwrap_or(String::new());
      format!("{}/{}",pp,file_name)
  }
  249.  
  250. fn main() {
  251. let prefix_path = env::args().nth(1).unwrap_or(String::from("."));
  252. let mut seek:u64 = 0;
  253. let mut all_data = Res::new();
  254. let mut old_log = log_file(&prefix_path);
  255. print!("{}",old_log);
  256. let mut empty_read:u8 = 0;
  257. let mut fd = OpenOptions::new().read(true).open(old_log.trim()).unwrap();
  258. loop {
  259. let log = log_file(&prefix_path);
  260. if empty_read > 10 {
  261. alert(&format!("已经五分钟没有日志写入 {}",old_log)[..],None);
  262. if old_log != log {
  263. seek = 0;
  264. empty_read = 0;
  265. all_data.clean();
  266. old_log = log;
  267. }
  268. fd = OpenOptions::new().read(true).open(old_log.trim()).unwrap();
  269. }
  270. go(&mut fd,&mut seek,&mut empty_read, &mut all_data);
  271. println!("Sleep 30S old_log:{},seek:{},empty_read:{}",
  272. old_log.trim(),seek,empty_read);
  273. ::std::thread::sleep(::std::time::Duration::new(30,0));
  274. }
  275. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement