Advertisement
Guest User

Untitled

a guest
Sep 16th, 2019
124
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.54 KB | None | 0 0
  1. extern crate clickhouse_rs;
  2. extern crate futures;
  3.  
  4. use futures::Future;
  5. use futures::future::Loop;
  6. use futures::future::loop_fn;
  7. use clickhouse_rs::Pool;
  8.  
  9. fn main() {
  10. let hosts_script = "select host_name from system.clusters where replica_num = 1 and cluster = 'product_analytics'";
  11. let script = "select max(elapsed) as max_elapsed, query from remote('$hosts', system, processes) where elapsed > 60 and lower(query) like 'select%' group by query";
  12.  
  13. let url = "tcp://localhost:9000/default";
  14.  
  15. let pool = Pool::new(url);
  16.  
  17.  
  18. let done = pool
  19. .get_handle()
  20. .and_then(move |c| c.query(hosts_script).fetch_all())
  21. .and_then(move |(c, block)| {
  22. let mut hosts: Vec<String> = Vec::new();
  23. for row in block.rows() {
  24. let host: String = row.get("host_name")?;
  25. hosts.push(host);
  26. }
  27. let hosts_string = hosts.join(",");
  28.  
  29. println!("inside: {}", hosts_string);
  30. Ok((hosts_string, c))
  31. })
  32. .and_then(move |(hosts, c)| {
  33. let execute_script = script.replace("$hosts", hosts.as_str());
  34. let fut = c.query(execute_script).fetch_all().and_then(|(c, block)| {
  35. for row in block.rows() {
  36. let query: String = row.get("query")?;
  37. println!("query: {}", query)
  38. }
  39. Ok(())
  40. });
  41. return loop_fn((), |_| Ok(Loop::Continue(fut)))
  42. })
  43. .map_err(|err| eprintln!("database error: {}", err));
  44.  
  45. tokio::run(done);
  46. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement