Advertisement
Guest User

Untitled

a guest
Nov 24th, 2017
120
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 17.28 KB | None | 0 0
  1. use std::io::{Read, Result, Error, ErrorKind, Write};
  2. use std::mem;
  3. use std::net::TcpStream;
  4. use protobuf::{RepeatedField, parse_from_bytes, Message};
  5. use haret::api::messages::*;
  6.  
  7.  
  8. pub struct HaretClient {
  9. pub client_id: String,
  10. pub api_addr: Option<String>,
  11. pub namespace_id: Option<String>,
  12. pub primary: Option<ApiPid>,
  13. sock: Option<TcpStream>,
  14. request_num: u64
  15. }
  16.  
  17. impl HaretClient {
  18. pub fn new(client_id: String) -> HaretClient {
  19. HaretClient {
  20. client_id: client_id,
  21. api_addr: None,
  22. namespace_id: None,
  23. primary: None,
  24. sock: None,
  25. request_num: 0
  26. }
  27. }
  28.  
  29.  
  30. pub fn connect(&mut self, api_addr: Option<String>) -> Result<()> {
  31. if api_addr.is_none() && self.api_addr.is_none() {
  32. return Err(Error::new(ErrorKind::InvalidInput,
  33. "API Address unknown. Please call connect with an api_addr."));
  34. }
  35. if api_addr.is_some() {
  36. self.api_addr = api_addr;
  37. }
  38. self.sock = Some(TcpStream::connect(&self.api_addr.as_ref().unwrap()[..])?);
  39. Ok(())
  40. }
  41.  
  42. pub fn register(&mut self, primary: Option<ApiPid>) -> Result<String> {
  43. let request = self.prepare_register(primary)?;
  44. self.exec(request)
  45. }
  46.  
  47. /// Register the client id on this node for the given namespace.
  48. ///
  49. /// This function returns the registration message to be written or an error if the primary is
  50. /// unknown.
  51. fn prepare_register(&mut self, primary: Option<ApiPid>) -> Result<ApiRequest> {
  52. if primary.is_none() && self.primary.is_none() {
  53. return Err(Error::new(ErrorKind::InvalidInput, "Primary unknown"));
  54. }
  55.  
  56. if primary.is_some() {
  57. self.primary = primary;
  58. self.namespace_id = Some(self.primary.as_ref().unwrap()
  59. .get_group().to_string());
  60. }
  61. let namespace_id = self.namespace_id.clone();
  62. let mut msg = RegisterClient::new();
  63. msg.set_client_id(self.client_id.clone());
  64. msg.set_namespace_id(namespace_id.as_ref().unwrap().clone());
  65. let mut request = ApiRequest::new();
  66. request.set_register_client(msg);
  67. Ok(request)
  68. }
  69.  
  70. pub fn list_namespaces(&mut self) -> Result<String> {
  71. let mut request = ApiRequest::new();
  72. request.set_get_namespaces(true);
  73. self.exec(request)
  74. }
  75.  
  76. pub fn enter_namespace<T>(&mut self, namespace_id: T) -> Result<String>
  77. where T: Into<String>,
  78. {
  79. self.reset_primary();
  80. let mut msg = RegisterClient::new();
  81. msg.set_client_id(self.client_id.clone());
  82. msg.set_namespace_id(namespace_id.into());
  83. let mut request = ApiRequest::new();
  84. request.set_register_client(msg);
  85. self.exec(request)
  86. }
  87.  
  88. pub fn ls(&mut self) -> Result<String> {
  89. let mut list_keys = ListKeys::new();
  90. list_keys.set_path("/".to_string());
  91.  
  92. let mut tree_op = TreeOp::new();
  93. tree_op.set_list_keys(list_keys);
  94.  
  95. let mut consensus_req = ConsensusRequest::new();
  96. consensus_req.set_to(self.primary.as_ref().unwrap().clone());
  97. consensus_req.set_client_id(self.client_id.clone());
  98. consensus_req.set_client_request_num(self.request_num);
  99. consensus_req.set_tree_op(tree_op);
  100.  
  101. let mut request = ApiRequest::new();
  102. request.set_consensus_request(consensus_req);
  103. self.exec(request)
  104. }
  105.  
  106. pub fn create(&mut self, path: &str, str_type: &str) -> Result<String> {
  107. let node_type = match str_type {
  108. "blob" => NodeType::BLOB,
  109. "queue" => NodeType::QUEUE,
  110. "set" => NodeType::SET,
  111. _ => unreachable!()
  112. };
  113. let mut create_node = CreateNode::new();
  114. create_node.set_path(path.to_string());
  115. create_node.set_node_type(node_type);
  116. let mut tree_op = TreeOp::new();
  117. tree_op.set_create_node(create_node);
  118. let request = self.consensus_request(tree_op);
  119. self.exec(request)
  120. }
  121.  
  122. pub fn blob_put(&mut self, blob: &str, path: &str) -> Result<String> {
  123. let mut blob_put = BlobPut::new();
  124. blob_put.set_path(path.to_string());
  125. blob_put.set_val(blob.as_bytes().to_vec());
  126. let mut tree_op = TreeOp::new();
  127. tree_op.set_blob_put(blob_put);
  128. let request = self.consensus_request(tree_op);
  129. self.exec(request)
  130. }
  131.  
  132. pub fn blob_get(&mut self, path: &str) -> Result<String> {
  133. let mut blob_get = BlobGet::new();
  134. blob_get.set_path(path.to_string());
  135. let mut tree_op = TreeOp::new();
  136. tree_op.set_blob_get(blob_get);
  137. let request = self.consensus_request(tree_op);
  138. self.exec(request)
  139. }
  140.  
  141. pub fn blob_size(&mut self, path: &str) -> Result<String> {
  142. let mut blob_size = BlobSize::new();
  143. blob_size.set_path(path.to_string());
  144. let mut tree_op = TreeOp::new();
  145. tree_op.set_blob_size(blob_size);
  146. let request = self.consensus_request(tree_op);
  147. self.exec(request)
  148. }
  149.  
  150. pub fn queue_push(&mut self, blob: &str, path: &str) -> Result<String> {
  151. let mut queue_push = QueuePush::new();
  152. queue_push.set_path(path.to_string());
  153. queue_push.set_val(blob.as_bytes().to_vec());
  154. let mut tree_op = TreeOp::new();
  155. tree_op.set_queue_push(queue_push);
  156. let request = self.consensus_request(tree_op);
  157. self.exec(request)
  158. }
  159.  
  160. pub fn queue_pop(&mut self, path: &str) -> Result<String> {
  161. let mut queue_pop = QueuePop::new();
  162. queue_pop.set_path(path.to_string());
  163. let mut tree_op = TreeOp::new();
  164. tree_op.set_queue_pop(queue_pop);
  165. let request = self.consensus_request(tree_op);
  166. self.exec(request)
  167. }
  168.  
  169. pub fn queue_front(&mut self, path: &str) -> Result<String> {
  170. let mut queue_front = QueueFront::new();
  171. queue_front.set_path(path.to_string());
  172. let mut tree_op = TreeOp::new();
  173. tree_op.set_queue_front(queue_front);
  174. let request = self.consensus_request(tree_op);
  175. self.exec(request)
  176. }
  177.  
  178. pub fn queue_back(&mut self, path: &str) -> Result<String> {
  179. let mut queue_back = QueueBack::new();
  180. queue_back.set_path(path.to_string());
  181. let mut tree_op = TreeOp::new();
  182. tree_op.set_queue_back(queue_back);
  183. let request = self.consensus_request(tree_op);
  184. self.exec(request)
  185. }
  186.  
  187. pub fn queue_len(&mut self, path: &str) -> Result<String> {
  188. let mut queue_len = QueueLen::new();
  189. queue_len.set_path(path.to_string());
  190. let mut tree_op = TreeOp::new();
  191. tree_op.set_queue_len(queue_len);
  192. let request = self.consensus_request(tree_op);
  193. self.exec(request)
  194. }
  195.  
  196. pub fn set_insert(&mut self, blob: &str, path: &str) -> Result<String> {
  197. let mut set_insert = SetInsert::new();
  198. set_insert.set_path(path.to_string());
  199. set_insert.set_val(blob.as_bytes().to_vec());
  200. let mut tree_op = TreeOp::new();
  201. tree_op.set_set_insert(set_insert);
  202. let request = self.consensus_request(tree_op);
  203. self.exec(request)
  204. }
  205.  
  206. pub fn set_remove(&mut self, blob: &str, path: &str) -> Result<String> {
  207. let mut set_remove = SetRemove::new();
  208. set_remove.set_path(path.to_string());
  209. set_remove.set_val(blob.as_bytes().to_vec());
  210. let mut tree_op = TreeOp::new();
  211. tree_op.set_set_remove(set_remove);
  212. let request = self.consensus_request(tree_op);
  213. self.exec(request)
  214. }
  215.  
  216. pub fn set_contains(&mut self, blob: &str, path: &str) -> Result<String> {
  217. let mut set_contains = SetContains::new();
  218. set_contains.set_path(path.to_string());
  219. set_contains.set_val(blob.as_bytes().to_vec());
  220. let mut tree_op = TreeOp::new();
  221. tree_op.set_set_contains(set_contains);
  222. let request = self.consensus_request(tree_op);
  223. self.exec(request)
  224. }
  225.  
  226. pub fn set_union<I, T>(&mut self, paths: I) -> Result<String>
  227. where I: Iterator<Item=T>,
  228. T: Into<String>,
  229. {
  230. let paths = paths.map(|s| s.into()).collect();
  231. let mut set_union = SetUnion::new();
  232. set_union.set_paths(RepeatedField::from_vec(paths));
  233. let mut tree_op = TreeOp::new();
  234. tree_op.set_set_union(set_union);
  235. let request = self.consensus_request(tree_op);
  236. self.exec(request)
  237. }
  238.  
  239. pub fn set_intersection(&mut self, path1: &str, path2: &str) -> Result<String> {
  240. let mut set_intersection = SetIntersection::new();
  241. set_intersection.set_path1(path1.to_string());
  242. set_intersection.set_path2(path2.to_string());
  243. let mut tree_op = TreeOp::new();
  244. tree_op.set_set_intersection(set_intersection);
  245. let request = self.consensus_request(tree_op);
  246. self.exec(request)
  247. }
  248.  
  249. fn consensus_request(&mut self, tree_op: TreeOp) -> ApiRequest {
  250. let mut consensus_req = ConsensusRequest::new();
  251. consensus_req.set_to(self.primary.as_ref().unwrap().clone());
  252. consensus_req.set_client_id(self.client_id.clone());
  253. consensus_req.set_client_request_num(self.request_num);
  254. consensus_req.set_tree_op(tree_op);
  255. let mut api_request = ApiRequest::new();
  256. api_request.set_consensus_request(consensus_req);
  257. api_request
  258. }
  259.  
  260. fn exec(&mut self, req: ApiRequest) -> Result<String> {
  261. self.write_msg(req).map_err(|_| {
  262. Error::new(ErrorKind::NotConnected,
  263. "Failed to write to socket. Please restart client and try again".to_string())
  264. })?;
  265. let mut api_response = self.read_msg().map_err(|_| {
  266. Error::new(ErrorKind::NotConnected,
  267. "Failed to read from socket. Please restart client and try again".to_string())
  268. })?;
  269.  
  270. if api_response.has_consensus_reply() {
  271. let mut consensus_reply = api_response.take_consensus_reply();
  272.  
  273. let mut s = String::new();
  274.  
  275. if consensus_reply.has_ok() {
  276. s.push_str("Ok\n");
  277. }
  278.  
  279. if consensus_reply.has_tree_op_result() {
  280. s.push_str(&tree_op_result_to_string(consensus_reply.take_tree_op_result()));
  281. }
  282.  
  283. if consensus_reply.has_tree_cas_result() {
  284. for result in consensus_reply.take_tree_cas_result().take_results().into_iter() {
  285. s.push_str(&tree_op_result_to_string(result));
  286. }
  287. }
  288.  
  289. if consensus_reply.has_path() {
  290. s.push_str(&consensus_reply.take_path());
  291. }
  292.  
  293. if consensus_reply.has_error() {
  294. s.push_str("Error: ");
  295. s.push_str(&api_error_to_string(consensus_reply.take_error()));
  296. s.push('\n');
  297. }
  298.  
  299. s.push_str(&format!("Epoch = {}, View = {}, Client Request Num = {}",
  300. consensus_reply.get_epoch(),
  301. consensus_reply.get_view(),
  302. consensus_reply.get_request_num()));
  303. return Ok(s);
  304. }
  305.  
  306. if api_response.has_namespaces() {
  307. let namespaces = api_response.take_namespaces().take_ids().to_vec();
  308. return Ok(namespaces.iter().fold(String::new(), |mut acc, namespace_id | {
  309. acc.push_str(namespace_id);
  310. acc.push_str("\n");
  311. acc
  312. }));
  313. }
  314.  
  315. if api_response.has_client_registration() {
  316. self.primary = Some(api_response.take_client_registration().take_primary());
  317. return Ok(format!("Client registered. Primary = {:?}", self.primary.as_ref().unwrap()));
  318. }
  319.  
  320. if api_response.has_redirect() {
  321. let mut redirect = api_response.take_redirect();
  322. let primary = redirect.take_primary();
  323. let api_addr = redirect.take_api_addr();
  324. self.connect(Some(api_addr))?;
  325. let req = self.prepare_register(Some(primary.clone()))?;
  326. /// Todo: Remove this recursion to prevent potential stack overflow
  327. self.exec(req)?;
  328. return Ok(format!("Finished Redirecting. Primary = {:?}, API Address = {}",
  329. self.primary.as_ref().unwrap(),
  330. self.api_addr.as_ref().unwrap()))
  331. }
  332.  
  333. if api_response.has_retry() {
  334. let duration = api_response.take_retry().get_milliseconds();
  335. return Ok(format!("Primary not found. Please retry in {} seconds.", duration / 1000));
  336. }
  337.  
  338. if api_response.has_unknown_namespace() {
  339. return Ok("Unknown namespace".to_string());
  340. }
  341.  
  342. if api_response.has_timeout() {
  343. return Ok("Timeout".to_string());
  344. }
  345.  
  346. Ok(format!("unknown message {:?}", api_response))
  347. }
  348.  
  349. fn reset_primary(&mut self) {
  350. self.primary = None;
  351. self.namespace_id = None;
  352. }
  353.  
  354. fn write_msg(&mut self, req: ApiRequest) -> Result<()> {
  355. let mut msg = ApiMsg::new();
  356. msg.set_request(req);
  357. let encoded = msg.write_to_bytes().map_err(|_| {
  358. Error::new(ErrorKind::InvalidInput, "Failed to encode msgpack data")
  359. })?;
  360. let len: u32 = encoded.len() as u32;
  361. // 4 byte len header
  362. let header: [u8; 4] = unsafe { mem::transmute(len.to_be()) };
  363. self.sock.as_ref().unwrap().write_all(&header)?;
  364. self.sock.as_ref().unwrap().write_all(&encoded)?;
  365. self.request_num += 1;
  366. Ok(())
  367. }
  368.  
  369. fn read_msg(&mut self) -> Result<ApiResponse> {
  370. let mut header = [0; 4];
  371. self.sock.as_mut().unwrap().read_exact(&mut header)?;
  372. let len = unsafe { u32::from_be(mem::transmute(header)) };
  373. let mut buf = vec![0; len as usize];
  374. self.sock.as_mut().unwrap().read_exact(&mut buf)?;
  375. let mut msg: ApiMsg = parse_from_bytes(&buf[..]).map_err(|e| {
  376. Error::new(ErrorKind::InvalidData, e.to_string())
  377. })?;
  378. Ok(msg.take_response())
  379. }
  380. }
  381.  
  382. fn tree_op_result_to_string(mut result: TreeOpResult) -> String {
  383. let mut s = String::new();
  384.  
  385. if result.has_ok() {
  386. s.push_str("Ok\n");
  387. } else if result.has_empty() {
  388. s.push_str("\n");
  389. } else if result.has_bool() {
  390. s.push_str(&format!("{}\n", result.get_bool()));
  391. } else if result.has_blob() {
  392. s.push_str(&format_blob(result.take_blob()));
  393. } else if result.has_int() {
  394. s.push_str(&format!("{:?}\n", result.get_int()));
  395. } else if result.has_set() {
  396. s.push_str(&format_set(result.take_set()));
  397. } else if result.has_keys() {
  398. for mut key in result.take_keys().take_keys().into_vec() {
  399. s.push_str(&format!("{}\n", key.take_name()));
  400. }
  401. }
  402.  
  403. if result.has_optional_version() {
  404. s.push_str(&format!("Version = {} ", result.get_optional_version()));
  405. }
  406. s
  407. }
  408.  
  409. fn format_blob(blob: Vec<u8>) -> String {
  410. match String::from_utf8(blob) {
  411. Ok(s) => format!("{}\n", s),
  412. Err(e) => format!("{:?}\n", e.into_bytes())
  413. }
  414. }
  415.  
  416. fn format_set(mut set: Set) -> String {
  417. set.take_val().into_vec().into_iter().fold(String::new(), |mut acc, blob| {
  418. acc.push_str(&format_blob(blob));
  419. acc
  420. })
  421. }
  422.  
  423. fn api_error_to_string(mut error: ApiError) -> String {
  424. if error.has_not_found() {
  425. format!("Path Not found: {}", error.take_not_found().take_path())
  426. } else if error.has_already_exists() {
  427. format!("Path Already exists: {}", error.take_already_exists().take_path())
  428. } else if error.has_does_not_exist() {
  429. format!("Path Does not exist: {}", error.take_does_not_exist().take_path())
  430. } else if error.has_wrong_type() {
  431. "Wrong type".to_string()
  432. } else if error.has_path_must_end_in_directory() {
  433. format!("Path must end in directory: {}",
  434. error.take_path_must_end_in_directory().take_path())
  435. } else if error.has_path_must_be_absolute() {
  436. "Paths must be absolute".to_string()
  437. } else if error.has_cas_failed() {
  438. "Cas Failed".to_string()
  439. } else if error.has_bad_format() {
  440. format!("Path is malformatted {}", error.take_bad_format().take_msg())
  441. } else if error.has_io() {
  442. format!("IO error: {}", error.take_io().take_msg())
  443. } else if error.has_encoding() {
  444. format!("Encoding error: {}", error.take_encoding().take_msg())
  445. } else if error.has_invalid_cas() {
  446. format!("Invalid CAS: {}", error.take_invalid_cas().take_msg())
  447. } else if error.has_msg() {
  448. error.take_msg()
  449. } else if error.has_cannot_delete_root() {
  450. "Cannot delete root".to_string()
  451. } else if error.has_invalid_msg() {
  452. "Invalid Message".to_string()
  453. } else if error.has_timeout() {
  454. "Timeout".to_string()
  455. } else if error.has_not_enough_replicas() {
  456. "Not enough replicas".to_string()
  457. } else if error.has_bad_epoch() {
  458. "Bad epoch".to_string()
  459. } else {
  460. "Unknown Error".to_string()
  461. }
  462. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement