Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- extern crate clickhouse_rs;
- extern crate futures;
- use futures::Future;
- use futures::future::Loop;
- use futures::future::loop_fn;
- use clickhouse_rs::Pool;
- fn main() {
- let hosts_script = "select host_name from system.clusters where replica_num = 1 and cluster = 'product_analytics'";
- let script = "select max(elapsed) as max_elapsed, query from remote('$hosts', system, processes) where elapsed > 60 and lower(query) like 'select%' group by query";
- let url = "tcp://localhost:9000/default";
- let pool = Pool::new(url);
- let done = pool
- .get_handle()
- .and_then(move |c| c.query(hosts_script).fetch_all())
- .and_then(move |(c, block)| {
- let mut hosts: Vec<String> = Vec::new();
- for row in block.rows() {
- let host: String = row.get("host_name")?;
- hosts.push(host);
- }
- let hosts_string = hosts.join(",");
- println!("inside: {}", hosts_string);
- Ok((hosts_string, c))
- })
- .and_then(move |(hosts, c)| {
- let execute_script = script.replace("$hosts", hosts.as_str());
- let fut = c.query(execute_script).fetch_all().and_then(|(c, block)| {
- for row in block.rows() {
- let query: String = row.get("query")?;
- println!("query: {}", query)
- }
- Ok(())
- });
- return loop_fn((), |_| Ok(Loop::Continue(fut)))
- })
- .map_err(|err| eprintln!("database error: {}", err));
- tokio::run(done);
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement