Guest User

Untitled

a guest
Jul 17th, 2018
76
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 16.54 KB | None | 0 0
  1. #![feature(nll)]
  2.  
  3. #[macro_use] extern crate serde;
  4. #[macro_use] extern crate serde_derive;
  5.  
  6. extern crate serde_json;
  7.  
  8. //extern crate integer_encoding;
  9. extern crate byteorder;
  10.  
  11. use serde::{Deserialize, Deserializer};
  12. use serde::de::{ Visitor, EnumAccess };
  13.  
  // Copied out of the integer_encoding library, with a length cap added so that
  // over-long input cannot drive the shift amount past 63.

  /// Mask keeping the 7 payload bits of a varint byte.
  const DROP_MSB: u8 = 0b01111111;
  /// Continuation bit: set on every varint byte except the last one.
  pub const MSB: u8 = 0b10000000;
  /// Longest varint that can encode a `u64` (ceil(64 / 7) bytes).
  const MAX_VARINT_LEN: usize = 10;

  /// Decodes an unsigned little-endian base-128 varint from the start of `src`.
  ///
  /// Returns `(value, bytes_consumed)`. Decoding stops at the first byte whose
  /// continuation bit is clear, at the end of `src`, or after `MAX_VARINT_LEN`
  /// bytes, whichever comes first. An empty slice yields `(0, 0)`.
  ///
  /// No zig-zag decoding is applied here; the raw unsigned value is returned.
  fn decode_var(src: &[u8]) -> (u64, usize) {
      let mut result: u64 = 0;
      let mut consumed = 0;

      for (i, &b) in src.iter().take(MAX_VARINT_LEN).enumerate() {
          // i <= 9, so the shift is at most 63 and can never overflow.
          result |= u64::from(b & DROP_MSB) << (7 * i);
          consumed = i + 1;

          if b & MSB == 0 {
              break;
          }
      }

      (result, consumed)
  }
  33.  
  /// Top-level Avro record schema in its JSON form (see `SCHEMA_STR` for an example).
  #[derive(Serialize, Deserialize, Debug)]
  struct Schema {
      /// The JSON `type` key (`"record"` in `SCHEMA_STR`).
      #[serde(rename = "type")]
      schema_type: String,
      name: String,
      namespace: String,
      /// The record's fields, in declaration order.
      fields: Vec<SchemaField>,
  }
  42.  
  43. impl Schema {
  44. fn from_str(schema: &str) -> serde_json::Result<Self> {
  45. serde_json::from_str(schema)
  46. }
  47. }
  48.  
  /// One entry of a record schema's `fields` array.
  #[derive(Serialize, Deserialize, Debug)]
  struct SchemaField {
      name: String,
      /// The JSON `type` key, which may be a single type or an array of types;
      /// `one_or_many` normalizes both forms into a `Vec`.
      #[serde(rename = "type", deserialize_with = "one_or_many")]
      types: Vec<SchemaFieldType>,
  }
  55.  
  /// A field type: either a bare type-name string or an object-form type.
  /// Because the enum is `untagged`, serde tries `Primitive` before `Complex`.
  #[derive(Serialize, Deserialize, Debug)]
  #[serde(untagged)]
  enum SchemaFieldType {
      Primitive(Primitive),
      Complex(Complex),
  }
  62.  
  /// Type names accepted as plain strings in a field's `type`.
  ///
  /// Besides the Avro primitive names, this accepts `"uint64_t"` and `"int64_t"`.
  /// `SCHEMA_STR` uses these two strings to refer back to `fixed` types declared
  /// earlier under those names.
  #[derive(Serialize, Deserialize, Debug)]
  #[serde(rename_all = "lowercase")]
  enum Primitive {
      Null,
      Int,
      Long,
      Float,
      Double,
      Boolean,
      Bytes,
      String,
      #[serde(rename = "uint64_t")]
      Uint64T,
      #[serde(rename = "int64_t")]
      Int64T,
  }
  79.  
  /// Object-form types, discriminated by their internal `"type"` key.
  #[derive(Serialize, Deserialize, Debug)]
  #[serde(tag = "type", rename_all = "lowercase")]
  enum Complex {
      /// `{"type": "fixed", "name": ..., "size": ...}`.
      Fixed {
          name: String,
          size: usize,
      },
      /// `{"type": "map", "values": ...}`; `values` is kept only as a type-name string.
      Map {
          values: String,
      },
  }
  91.  
  92. fn one_or_many<'de, D>(deserializer: D) -> Result<Vec<SchemaFieldType>, D::Error>
  93. where
  94. D: Deserializer<'de>,
  95. {
  96. #[derive(Deserialize)]
  97. #[serde(untagged)]
  98. enum OneOrMany {
  99. One(SchemaFieldType),
  100. Many(Vec<SchemaFieldType>),
  101. }
  102.  
  103. match OneOrMany::deserialize(deserializer)? {
  104. OneOrMany::One(field) => Ok(vec![field]),
  105. OneOrMany::Many(fields) => Ok(fields),
  106. }
  107. }
  108.  
  /// JSON Avro schema for the `vnoportal.ut` record.
  ///
  /// NOTE(review): in the code visible here nothing parses this at runtime;
  /// `schema()` builds a hand-written subset (only `timestamp`) instead.
  const SCHEMA_STR: &'static str = r###"{
"type": "record",
"name": "ut",
"namespace": "vnoportal",
"fields": [
{
"name": "timestamp",
"type": [
"long",
"int",
"float",
"double",
{
"type": "fixed",
"name": "uint64_t",
"size": 8
},
{
"type": "fixed",
"name": "int64_t",
"size": 8
}
]
},
{
"name": "metric",
"type": "string"
},
{
"name": "value",
"type": [
"long",
"int",
"float",
"double",
{
"type": "fixed",
"name": "uint8_t",
"size": 1
},
{
"type": "fixed",
"name": "uint16_t",
"size": 2
},
{
"type": "fixed",
"name": "uint32_t",
"size": 4
},
"uint64_t",
{
"type": "fixed",
"name": "int8_t",
"size": 1
},
{
"type": "fixed",
"name": "int16_t",
"size": 2
},
{
"type": "fixed",
"name": "int32_t",
"size": 4
},
"int64_t"
]
},
{
"name": "tags",
"type": [
"null",
{
"type": "map",
"values": "string"
}
]
},
{
"name": "metadata",
"type": [
"null",
{
"type": "map",
"values": "string"
}
]
}
]
}"###;
  200.  
  /// Hand-built schema the deserializer walks: the record's fields, in order.
  ///
  /// Despite its name this is plain schema data, not a serde `Visitor`.
  struct AvroVisitor {
      fields: Vec<AvroField>
  }
  204.  
  /// One schema field: its name plus the Avro type(s) it may hold.
  #[derive(Debug,Clone)]
  struct AvroField {
      name: String,
      types: AvroTypeOneOrMany
  }

  /// A field's type: a single type, or a union (`Many`). For a union, the
  /// deserializer reads a varint from the input and uses it to index this list.
  #[derive(Debug,Clone)]
  enum AvroTypeOneOrMany {
      One(AvroType),
      Many(Vec<AvroType>)
  }

  /// A single Avro type in the hand-built schema.
  #[derive(Debug,Clone)]
  enum AvroType {
      Primitive(AvroPrimitiveFields),
      // NOTE(review): presumably models `{"type": "map", "values": "string"}`
      // from SCHEMA_STR; not constructed anywhere in the visible code.
      StringMap,
      /// A named `fixed` type; `size` is copied from the schema.
      Fixed{name: String, size: usize}
  }

  /// Avro primitive type names.
  #[derive(Debug,Clone)]
  enum AvroPrimitiveFields {
      Null,
      Int,
      Long,
      Float,
      Double,
      Boolean,
      Bytes,
      String
  }
  235.  
  236. fn schema() -> AvroVisitor {
  237. AvroVisitor {
  238. fields: vec![
  239. AvroField {
  240. name: "timestamp".into(),
  241. types: AvroTypeOneOrMany::Many(
  242. vec![
  243. AvroType::Primitive(AvroPrimitiveFields::Long),
  244. AvroType::Primitive(AvroPrimitiveFields::Int),
  245. AvroType::Primitive(AvroPrimitiveFields::Float),
  246. AvroType::Primitive(AvroPrimitiveFields::Double),
  247. AvroType::Fixed{name: "uint64_t".into(), size: 8},
  248. AvroType::Fixed{name: "int64_t".into(), size: 8}
  249. ]
  250. )
  251. }
  252. ]
  253. }
  254. }
  255.  
  /// Schema-driven reader over a borrowed, Avro-encoded byte buffer.
  struct AvroDeserializer<'de> {
      /// Unread remainder of the input; the read helpers advance it by re-slicing.
      buf: &'de [u8],
      /// The hand-built schema being followed.
      visitor: AvroVisitor,
      /// Index into `visitor.fields` of the field currently being decoded.
      current_field_index: usize
  }
  261.  
  /// Error produced by `AvroDeserializer`; `reason` is a human-readable message.
  #[derive(Debug)]
  struct AvroError {
      reason: String
  }

  impl std::fmt::Display for AvroError {
      /// Shows the stored `reason`, so a printed error says what actually went wrong.
      fn fmt(&self, fmt: &mut std::fmt::Formatter) -> Result<(),std::fmt::Error> {
          write!(fmt, "avro error: {}", self.reason)
      }
  }
  272.  
  273. impl serde::de::Error for AvroError {
  274. fn custom<T: std::fmt::Display>(input: T) -> Self {
  275. AvroError{
  276. reason: format!("serde sez {}", input)
  277. }
  278. }
  279. }
  280.  
  281. impl std::error::Error for AvroError {
  282.  
  283. }
  284.  
  /// `MapAccess` adapter used for records: keys and values are both read from
  /// the wrapped deserializer.
  struct AvroMapVisitor<'a, 'de: 'a> {
      de: &'a mut AvroDeserializer<'de>,
      // NOTE(review): set to 0 on construction and not read by the MapAccess
      // impl visible here.
      index: usize
  }
  289.  
  290. impl<'de, 'a> serde::de::MapAccess<'de> for AvroMapVisitor<'a, 'de> {
  291. type Error = AvroError;
  292.  
  293. fn next_key_seed<K>(&mut self, seed: K) -> Result<Option<K::Value>, Self::Error>
  294. where
  295. K: serde::de::DeserializeSeed<'de> {
  296. println!("next_key_seed");
  297. seed.deserialize(&mut *self.de).map(Some)
  298.  
  299. }
  300.  
  301. fn next_value_seed<V>(&mut self, seed: V) -> Result<V::Value, Self::Error>
  302. where
  303. V: serde::de::DeserializeSeed<'de> {
  304. println!("next_value_seed");
  305. seed.deserialize(&mut *self.de)
  306. }
  307. }
  308.  
  /// `EnumAccess`/`VariantAccess` adapter for a field decoded as a Rust enum.
  struct AvroEnumVisitor<'a, 'de: 'a> {
      de: &'a mut AvroDeserializer<'de>,
      /// Enum name and variant list as passed to `deserialize_enum`.
      enum_name: &'static str,
      enum_variants: &'a [&'static str],
      // NOTE(review): always initialised to `false` by `new` and not read in the
      // visible code.
      is_inside_enum: bool,
  }
  315.  
  316. impl<'a, 'de> AvroEnumVisitor<'a, 'de> {
  317. fn new(de: &'a mut AvroDeserializer<'de>, enum_name: &'static str, enum_variants: &'a[&'static str]) -> Self {
  318. Self { de, enum_name, enum_variants, is_inside_enum: false }
  319. }
  320. }
  321.  
  322. // `EnumAccess` is provided to the `Visitor` to give it the ability to determine
  323. // which variant of the enum is supposed to be deserialized.
  324. //
  325. // Note that all enum deserialization methods in Serde refer exclusively to the
  326. // "externally tagged" enum representation.
  327. impl<'de, 'a> EnumAccess<'de> for AvroEnumVisitor<'a, 'de> {
  328. type Error = AvroError;
  329. type Variant = Self;
  330.  
  331. fn variant_seed<V>(self, seed: V) -> Result<(V::Value, Self::Variant), Self::Error>
  332. where
  333. V: serde::de::DeserializeSeed<'de>,
  334. {
  335. println!("EnumAccess::variant_seed");
  336.  
  337. // This is the index in to the timestamp enum
  338. let variant = self.de.visit_varint();
  339.  
  340. let val = match seed.deserialize(&mut *self.de) {
  341. Ok(t) => t,
  342. Err(e) => {
  343. println!("error! {:#?}", e);
  344. panic!("not sure how to direct deserialize");
  345. }
  346. };
  347.  
  348. Ok((val,self))
  349. }
  350. }
  351.  
  352. // `VariantAccess` is provided to the `Visitor` to give it the ability to see
  353. // the content of the single variant that it decided to deserialize.
  354. impl<'de, 'a> serde::de::VariantAccess<'de> for AvroEnumVisitor<'a, 'de> {
  355. type Error = AvroError;
  356.  
  357. // If the `Visitor` expected this variant to be a unit variant, the input
  358. // should have been the plain string case handled in `deserialize_enum`.
  359. fn unit_variant(self) -> Result<(),Self::Error> {
  360. // Err(Error::ExpectedString)
  361. panic!("unit_variant never called")
  362. }
  363.  
  364. fn newtype_variant_seed<T>(self, _seed: T) -> Result<T::Value, AvroError>
  365. where
  366. T: serde::de::DeserializeSeed<'de>,
  367. {
  368. panic!("newtype_variant_seed never called")
  369. }
  370.  
  371. fn tuple_variant<V>(self, _len: usize, _visitor: V) -> Result<V::Value, AvroError>
  372. where
  373. V: Visitor<'de>,
  374. {
  375. panic!("tuple_variant never called")
  376. }
  377.  
  378. fn struct_variant<V>(
  379. self,
  380. _fields: &'static [&'static str],
  381. _visitor: V,
  382. ) -> Result<V::Value, AvroError>
  383. where
  384. V: Visitor<'de>,
  385. {
  386. panic!("struct_variant never called")
  387. // de::Deserializer::deserialize_map(self.de, visitor)
  388. }
  389. }
  390.  
  391.  
  392. impl<'de, 'a> Deserializer<'de> for &'a mut AvroDeserializer<'de> {
  393. type Error = AvroError;
  394.  
  395. fn deserialize_any<V>(mut self, visitor: V) -> Result<V::Value,Self::Error>
  396. where V: Visitor<'de> {
  397. println!("deserialize_any");
  398.  
  399. self.skip(3);
  400.  
  401. let fields = self.visitor.fields.clone();
  402. let mut fields_iter = fields.iter();
  403. if let Some(f) = fields_iter.next() {
  404. let vis = match &f.types {
  405. AvroTypeOneOrMany::Many(ref list) => {
  406. let varint = AvroDeserializer::visit_varint(&mut self);
  407. println!("varint: {}", varint);
  408. let variant = &list[varint as usize];
  409. match variant {
  410. AvroType::Primitive(AvroPrimitiveFields::Int)=> {
  411. let value = AvroDeserializer::visit_varint(&mut self);
  412. visitor.visit_i64::<AvroError>(value)
  413. },
  414. _ => {
  415. panic!("huh")
  416. }
  417. }
  418. },
  419. _ => {
  420. panic!("other!!")
  421. }
  422. };
  423.  
  424. self.current_field_index += 1;
  425. vis
  426. } else {
  427. panic!("else")
  428. }
  429. }
  430.  
  431. fn deserialize_struct<V>(mut self, _id: &'static str, _fields: &'static[&'static str], visitor: V) -> Result<V::Value,Self::Error>
  432. where V: Visitor<'de> {
  433. println!("deserialize_struct");
  434.  
  435. self.deserialize_map(visitor)
  436. }
  437.  
  438. fn deserialize_map<V>(mut self, visitor: V) -> Result<V::Value,Self::Error>
  439. where V: Visitor<'de> {
  440. println!("deserialize_map");
  441.  
  442. visitor.visit_map(AvroMapVisitor {de: &mut self, index: 0})
  443. }
  444.  
  445. fn deserialize_identifier<V>(mut self, visitor: V) -> Result<V::Value, Self::Error>
  446. where V: Visitor<'de> {
  447. let current_field = self.current_field();
  448. println!("deserialize_identifier {}", current_field.name);
  449.  
  450. visitor.visit_string(current_field.name.clone())
  451. }
  452.  
  453. fn deserialize_enum<V>(mut self, enum_name: &'static str, enum_variants: &[&'static str], visitor: V) -> Result<V::Value, Self::Error>
  454. where V: Visitor<'de> {
  455. let field = self.current_field();
  456. println!("deserialize_enum");
  457.  
  458. match field.types {
  459. AvroTypeOneOrMany::One(_) => {
  460. unimplemented!()
  461. },
  462. AvroTypeOneOrMany::Many(ref _types) => {
  463. println!("visiting enum");
  464. let value = visitor.visit_enum(AvroEnumVisitor::new(self, enum_name, enum_variants) )?;
  465. Ok(value)
  466. }
  467. }
  468. }
  469.  
  470. forward_to_deserialize_any!{
  471. <V: Visitor<'de>>
  472. bool i8 i16 i32 i64 u8 u16 u32 u64 f32 f64 char str string bytes byte_buf option unit unit_struct newtype_struct seq tuple tuple_struct ignored_any
  473. }
  474. }
  475.  
  476. impl<'de> AvroDeserializer<'de> {
  477. fn dump(&self) {
  478. println!("{:#?}", self.buf);
  479. }
  480.  
  481. fn skip(&mut self, bytes: usize) {
  482. self.buf = &self.buf[bytes..];
  483. }
  484.  
  485. fn visit_u64(&mut self) -> u64 {
  486. use byteorder::{ ByteOrder, LittleEndian };
  487. let val = LittleEndian::read_u64(&self.buf[..8]);
  488. self.buf = &self.buf[7..];
  489. val
  490. }
  491.  
  492. fn visit_varint(&mut self) -> i64 {
  493. let (int,varsize) : (u64, usize) = decode_var(self.buf);
  494. self.buf = &self.buf[varsize..];
  495. int as i64
  496. }
  497.  
  498. fn visit_long(&mut self) -> i64 {
  499. self.visit_varint()
  500. }
  501.  
  502. fn visit_str(&mut self) -> &'de [u8] {
  503. let (strlen,strstart) : (u64, usize) = decode_var(&self.buf[..]);
  504.  
  505. let rstr = &self.buf[strstart..strstart+strlen as usize];
  506. self.buf = &self.buf[strstart+strlen as usize..];
  507.  
  508. rstr
  509. }
  510.  
  511. fn visit_strmap(&mut self) -> Vec<(&[u8],&[u8])> {
  512. let num_blocks = self.visit_varint();
  513. println!("num_blocks: {}", num_blocks);
  514.  
  515. let mut vec : Vec<(&[u8], &[u8])> = Vec::with_capacity(num_blocks as usize);
  516.  
  517. for _i in 0..num_blocks {
  518. let key = self.visit_str();
  519. let val = self.visit_str();
  520.  
  521. vec.push((key,val));
  522. }
  523.  
  524. vec
  525. }
  526.  
  527. }
  528.  
  529. impl<'de> AvroDeserializer<'de> {
  530. fn from_slice(visitor: AvroVisitor, buf: &'de [u8]) -> Self {
  531. let current_field_index = 0;
  532. AvroDeserializer {
  533. buf,
  534. visitor,
  535. current_field_index,
  536. }
  537. }
  538.  
  539. fn current_field(&self) -> &AvroField {
  540. &self.visitor.fields[self.current_field_index]
  541. }
  542.  
  543. fn done_with_filed(&mut self) {
  544. self.current_field_index += 1;
  545. }
  546. }
  547.  
  /// Target type for the test record. Derived `Deserialize` matches its fields
  /// by name against the identifiers the deserializer produces.
  #[derive(Deserialize,Debug)]
  struct UT {
      timestamp: Timestamp,
      metric: String,
      // NOTE(review): disabled; `Value` does not derive `Deserialize`, so this
      // line would not compile if re-enabled as-is.
      // value: Value,
  }
  554.  
  /// Decoded `timestamp` union.
  ///
  /// The variant order matches the first four branches of the schema's union
  /// (`long`, `int`, `float`, `double`). The two `fixed` branches have no
  /// variant yet.
  ///
  /// NOTE(review): Avro `long` is signed 64-bit and `int` is signed 32-bit, but
  /// these are stored as `u64`/`i64`. Confirm the intended types.
  #[derive(Deserialize,Debug)]
  enum Timestamp {
      Long(u64),
      Int(i64),
      Float(f32),
      Double(f64)
  }
  562.  
  /// Candidate Rust shape for the `value` union (not used yet).
  ///
  /// NOTE(review): it has no `Deserialize` derive. Its variant order also
  /// differs from the schema's union order (long, int, float, double,
  /// uint8_t, ...). The `LongN` variants presumably stand for the `uintN_t`
  /// fixed types; confirm.
  enum Value {
      Long8(u8),
      Long16(u16),
      Long32(u32),
      Long64(u64),
      I8(i8),
      I16(i16),
      I32(i32),
      Int64(i64),
      Float(f32),
      Double(f64)
  }
  575.  
  576. fn visit_u64(buf: &[u8]) -> (u64, &[u8]) {
  577. use byteorder::{ ByteOrder, LittleEndian };
  578. let val = LittleEndian::read_u64(&buf[..8]);
  579. (val,&buf[7..]) // TODO: wtf is going on here?
  580. }
  581.  
  582. fn visit_varint(buf: &[u8]) -> (i64, &[u8]) {
  583. let (int,varsize) : (u64, usize) = decode_var(buf);
  584. (int as i64, &buf[varsize..])
  585. }
  586.  
  587. fn visit_long(buf: &[u8]) -> (i64, &[u8]) {
  588. visit_varint(buf)
  589. }
  590.  
  591. fn visit_str(buf: &[u8]) -> (&[u8], &[u8]) {
  592. let (strlen,strstart) : (u64, usize) = decode_var(&buf[..]);
  593.  
  594. let rstr = &buf[strstart..strstart+strlen as usize];
  595. let rest = &buf[strstart+strlen as usize..];
  596.  
  597. return (rstr, rest)
  598. }
  599.  
  600. fn visit_strmap(buf: &[u8]) -> (Vec<(&[u8],&[u8])>, &[u8]) {
  601. let (num_blocks,mut buf) = visit_varint(buf);
  602. println!("num_blocks: {}", num_blocks);
  603.  
  604. let mut vec : Vec<(&[u8], &[u8])> = Vec::with_capacity(num_blocks as usize);
  605.  
  606. for _i in 0..num_blocks {
  607. let (key, val) : (&[u8], &[u8]);
  608.  
  609. let visit = visit_str(buf);
  610. key = visit.0; buf = visit.1;
  611.  
  612. let visit = visit_str(buf);
  613. val = visit.0; buf = visit.1;
  614.  
  615. vec.push((key,val));
  616. }
  617.  
  618. (vec,buf)
  619. }
  620.  
  /// Decodes a captured sample record end to end and prints the result.
  ///
  /// Fails with the deserializer's error if decoding does not succeed.
  #[test]
  fn avro_deserializer() {
      let record : [u8; 257] = [0, 0, 0, 2, 106, 0, 186, 149, 235, 179, 11, 86, 118, 105, 97, 115, 97, 116, 45, 97, 98, 45, 118, 110, 111, 45, 112, 109, 46, 117, 116, 46, 112, 100, 102, 46, 102, 108, 45, 115, 100, 117, 45, 109, 97, 114, 107, 101, 100, 45, 99, 111, 117, 110, 116, 0, 0, 2, 22, 10, 97, 110, 45, 105, 100, 2, 49, 10, 112, 100, 102, 105, 100, 8, 49, 48, 53, 50, 16, 115, 109, 97, 99, 100, 45, 105, 100, 6, 49, 52, 55, 24, 115, 97, 116, 101, 108, 108, 105, 116, 101, 45, 105, 100, 2, 52, 34, 115, 109, 97, 99, 45, 115, 101, 114, 118, 105, 99, 101, 45, 110, 97, 109, 101, 26, 115, 109, 97, 99, 45, 99, 104, 105, 48, 55, 45, 115, 50, 16, 109, 97, 99, 45, 97, 100, 100, 114, 24, 48, 48, 97, 48, 98, 99, 56, 99, 55, 57, 55, 102, 10, 115, 116, 97, 116, 101, 14, 111, 110, 95, 108, 105, 110, 101, 14, 98, 101, 97, 109, 45, 105, 100, 10, 49, 49, 48, 52, 53, 22, 99, 97, 114, 114, 105, 101, 114, 100, 45, 105, 100, 2, 55, 12, 118, 110, 111, 45, 105, 100, 6, 120, 99, 105, 44, 115, 101, 114, 118, 105, 110, 103, 45, 115, 109, 97, 99, 45, 104, 111, 115, 116, 45, 110, 97, 109, 101, 36, 115, 109, 97, 99, 45, 99, 104, 105, 48, 55, 45, 110, 50, 45, 98, 101, 116, 97, 0, 0];

      let visitor = schema();
      let mut deserializer = AvroDeserializer::from_slice(visitor, &record[..]);
      // NOTE(review): confirm how many framing/header bytes precede the record body.
      deserializer.skip(3);

      // Previously this always panicked to dump `t`. Now it fails only when
      // decoding actually fails.
      let t = UT::deserialize(&mut deserializer).expect("sample record should deserialize");
      println!("t: {:#?}", t);
  }
Add Comment
Please, Sign In to add comment