Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #[macro_use] extern crate hyper;
- extern crate rand;
- use std::io::{BufReader};
- use std::io::prelude::*;
- use std::fs::OpenOptions;
- use std::collections::BTreeMap;
- use std::iter::Iterator;
- use hyper::client::Client;
- use hyper::header::Basic;
- use hyper::header::{Headers,ContentType,Authorization};
- use hyper::mime::{Mime, TopLevel, SubLevel,Attr, Value};
- use std::io::SeekFrom;
- use rand::Rng;
- use std::process::Command;
- use std::time::SystemTime;
- use std::fs::File;
- use std::env;
- use std::sync::{Once, ONCE_INIT};
- use std::mem;
- use std::rc::Rc;
- use std::cell::RefCell;
/// Shared handle to the alert throttle table: application id -> time of the
/// last alert that `check` let through.
type Cache = Rc<RefCell<BTreeMap<String,SystemTime>>>;

/// An id may alert again only after strictly more than this many seconds.
const ALERT_INTERVAL_SECS: u64 = 300;

thread_local! {
    // `Rc`/`RefCell` are not thread-safe, so the table is kept per thread;
    // this also keeps it reachable from `check` without any `unsafe`.
    static CACHE: Cache = Rc::new(RefCell::new(BTreeMap::new()));
}

/// Decides whether an alert for `app_id` may be sent right now.
///
/// Returns `true` the first time an id is seen and afterwards at most once per
/// `ALERT_INTERVAL_SECS`; every `true` restarts the throttle window for that id.
fn check(app_id: &str) -> bool {
    CACHE.with(|cache| should_alert(&mut cache.borrow_mut(), app_id, SystemTime::now()))
}

/// Throttle decision against an explicit table and clock reading.
///
/// `last_alert` maps an id to the time its latest alert was allowed; `now` is
/// passed in so the decision does not depend on the wall clock.
fn should_alert(
    last_alert: &mut BTreeMap<String, SystemTime>,
    app_id: &str,
    now: SystemTime,
) -> bool {
    match last_alert.get_mut(app_id) {
        None => {
            // First sighting: record it and let the alert through.
            last_alert.insert(String::from(app_id), now);
            true
        }
        Some(stamp) => match now.duration_since(*stamp) {
            Ok(age) if age.as_secs() > ALERT_INTERVAL_SECS => {
                // Restart the window. Without this the stale stamp would make
                // every later call return true, i.e. no throttling at all.
                *stamp = now;
                true
            }
            Ok(_) => false,
            Err(_) => {
                // The stored stamp is in the future (clock moved backwards).
                // Stay quiet as before, but re-anchor so the throttle recovers
                // after one normal interval instead of whenever the clock
                // catches up.
                *stamp = now;
                false
            }
        },
    }
}
- fn alert(app_id:&str,data:Option<&mut InnerValue>) {
- if data.is_some() {
- if check(app_id) {
- let c = Client::new();
- let boundary = rand::thread_rng()
- .gen_ascii_chars()
- .take(20)
- .collect::<String>();
- let mut header = Headers::new();
- header.set(ContentType(Mime(
- TopLevel::Multipart,SubLevel::FormData,
- vec![(Attr::Ext(String::from("boundary")),
- Value::Ext(boundary.clone()))])));
- header.set(Authorization(
- Basic{
- username:String::from(""),
- password:Some(String::from("")),
- }
- ));
- let text:String = format!("{} 应用短信失败率过高。{:?}",app_id,data);
- let mut body = BTreeMap::new();
- body.insert("from","");
- body.insert("to","");
- body.insert("subject","");
- body.insert("text",&text[..]);
- let bs = body.iter().fold(
- String::new(),
- |acc,x| {
- format!("{}--{}\r\nContent-Disposition: form-data; name=\"{}\"\r\n\r\n{}\r\n",&acc[..],&boundary[..],x.0,x.1)
- });
- let body_str = format!("{}--{}--",&bs[..],&boundary[..]);
- let result = c.post("")
- .headers(header)
- .body(&body_str[..])
- .send();
- let _ = result.map(|mut x| {
- let mut result_body:Vec<u8> = Vec::new();
- let _ = x.read_to_end(&mut result_body);
- println!("{}",String::from_utf8(result_body).unwrap_or(String::new()));
- });
- } else {
- println!("{}:{:?}",app_id,data);
- }
- }
- }
/// One parsed log record; every string field borrows from the log line.
///
/// Fields are listed in the order `Data::new_by_take` pulls them from its
/// iterator.
#[derive(Debug)]
struct Data<'a> {
    /// First field; used as the key of the per-application counters.
    _app_id: &'a str,
    /// Second field.
    _phone: &'a str,
    /// Third field.
    _type: &'a str,
    /// Fourth field.
    _channel: &'a str,
    /// Fifth field.
    _subchannel: &'a str,
    /// Whether the sixth field was `"SUCCESS"` or `"true"`.
    _success: bool,
}
/// Counters kept for a single application id.
#[derive(Debug)]
struct InnerValue {
    /// Receipts whose record was marked successful.
    _success: u64,
    /// Receipts whose record was not marked successful.
    _fail: u64,
    /// Send records seen.
    _all: u64,
    /// Send records that were not marked successful.
    _error: u64,
}

impl InnerValue {
    /// Creates a counter set with every counter at zero.
    fn new() -> Self {
        let zero: u64 = 0;
        InnerValue {
            _success: zero,
            _fail: zero,
            _all: zero,
            _error: zero,
        }
    }
}
- #[derive(Debug)]
- struct Res {
- _data:BTreeMap<String,InnerValue>,
- }
- impl Res {
- fn new() -> Self {
- Res {
- _data:BTreeMap::new(),
- }
- }
- fn update_recv(&mut self,d:&Data) {
- let tmp = self._data.entry(String::from(d._app_id)).or_insert(InnerValue::new());
- if d._success {
- tmp._success += 1;
- } else {
- tmp._fail += 1
- }
- if ((tmp._success as f64 / tmp._all as f64) < 0.8) && (tmp._all > 200) {
- alert(d._app_id,Some(tmp));
- }
- }
- fn update_send(&mut self,d:&Data) {
- let tmp = self._data.entry(String::from(d._app_id)).or_insert(InnerValue::new());
- tmp._all += 1;
- if !d._success {
- tmp._error += 1;
- }
- }
- fn clean(&mut self) {
- {
- let mut tmp = self._data.iter().collect::<Vec<(&String,&InnerValue)>>();
- tmp.sort_by(|a,b| (a.1)._all.cmp(&(b.1)._all));
- for (k,v) in tmp {
- println!("{}:{:?}",k ,v);
- }
- }
- self._data.clear();
- }
- }
- impl<'a> Data<'a> {
- #[allow(dead_code)]
- fn new() -> Data<'a> {
- Data {
- _app_id:"",
- _phone:"",
- _type:"",
- _channel:"",
- _subchannel:"",
- _success:false,
- }
- }
- fn new_by_take(mut data:Box<Iterator<Item=&'a str> + 'a>,) -> Option<Data<'a>>{
- data.size_hint().1.and_then(|x| {
- if x == 6 {
- let data = Data {
- _app_id:data.next().unwrap(),
- _phone:data.next().unwrap(),
- _type:data.next().unwrap(),
- _channel:data.next().unwrap(),
- _subchannel:data.next().unwrap(),
- _success:{
- let tmp = data.next().unwrap();
- (tmp == "SUCCESS") || (tmp == "true")
- },
- };
- Some(data)
- } else {
- None
- }
- })
- }
- }
- fn reduce_data(buf:&String,all_data:&mut Res)
- {
- if buf.find("Sms send").map(|x| {
- match Data::new_by_take(Box::new(buf
- .split_at(x)
- .1.split(':')
- .skip(1)
- .take(6))) {
- Some(d) => all_data.update_send(&d),
- None => println!("log error {}",x),
- }
- }).is_none() {
- buf.find("Sms receipt by").map(|x| {
- match Data::new_by_take(Box::new(buf
- .split_at(x)
- .1.split(':')
- .skip(1)
- .take(6))) {
- Some(d) => all_data.update_recv(&d),
- None => println!("log error {}",x),
- }
- });
- }
- }
- fn go(fd:&mut File, seek:&mut u64,empty_read:&mut u8, all_data:&mut Res) -> () {
- let _ = fd.seek(SeekFrom::Start(*seek));
- let mut fbuf = BufReader::new(fd);
- let mut buf = String::new();
- let mut empty = true;
- while fbuf.read_line(&mut buf).unwrap() > 0 {
- *seek += buf.as_bytes().len() as u64;
- empty = false;
- reduce_data(&buf,all_data);
- buf.clear();
- }
- if empty {
- *empty_read += 1;
- }
- }
/// Returns `pp`, a `/`, and the output of running
/// `date +"uluru.log.%Y-%m-%d"` through `sh -c`.
///
/// The command output is used unmodified, so whatever trailing newline `date`
/// prints is part of the returned string. If the output is not valid UTF-8 the
/// file-name part is empty.
///
/// # Panics
/// Panics when the `sh` process cannot be run.
fn log_file(pp:&String) -> String {
    let mut date_cmd = Command::new("sh");
    date_cmd.arg("-c");
    date_cmd.arg("date +\"uluru.log.%Y-%m-%d\"");
    let finished = date_cmd.output().expect("failed to execute proces");
    let file_name = match String::from_utf8(finished.stdout) {
        Ok(name) => name,
        Err(_) => String::new(),
    };
    format!("{}/{}",pp,file_name)
}
- fn main() {
- let prefix_path = env::args().nth(1).unwrap_or(String::from("."));
- let mut seek:u64 = 0;
- let mut all_data = Res::new();
- let mut old_log = log_file(&prefix_path);
- print!("{}",old_log);
- let mut empty_read:u8 = 0;
- let mut fd = OpenOptions::new().read(true).open(old_log.trim()).unwrap();
- loop {
- let log = log_file(&prefix_path);
- if empty_read > 10 {
- alert(&format!("已经五分钟没有日志写入 {}",old_log)[..],None);
- if old_log != log {
- seek = 0;
- empty_read = 0;
- all_data.clean();
- old_log = log;
- }
- fd = OpenOptions::new().read(true).open(old_log.trim()).unwrap();
- }
- go(&mut fd,&mut seek,&mut empty_read, &mut all_data);
- println!("Sleep 30S old_log:{},seek:{},empty_read:{}",
- old_log.trim(),seek,empty_read);
- ::std::thread::sleep(::std::time::Duration::new(30,0));
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement