Advertisement
Guest User

Untitled

a guest
Dec 27th, 2014
202
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 7.81 KB | None | 0 0
  1. package com.company.grid.lookup;
  2.  
  3. import parquet.column.ColumnDescriptor;
  4. import parquet.column.ColumnReader;
  5. import parquet.column.impl.ColumnReadStoreImpl;
  6. import parquet.column.page.PageReadStore;
  7. import parquet.hadoop.ParquetFileReader;
  8. import parquet.hadoop.metadata.BlockMetaData;
  9. import parquet.hadoop.metadata.ParquetMetadata;
  10. import parquet.io.api.Binary;
  11. import parquet.io.api.Converter;
  12. import parquet.io.api.GroupConverter;
  13. import parquet.io.api.PrimitiveConverter;
  14. import parquet.schema.MessageType;
  15. import org.apache.hadoop.fs.Path;
  16.  
  17. import java.io.IOException;
  18. import java.lang.reflect.Array;
  19. import java.math.BigInteger;
  20. import java.nio.charset.Charset;
  21. import java.nio.charset.CharsetDecoder;
  22. import java.util.ArrayList;
  23. import java.util.List;
  24. import java.util.Random;
  25.  
  26. import org.apache.hadoop.fs.LocalFileSystem;
  27. import org.apache.hadoop.hdfs.DistributedFileSystem;
  28. import org.apache.hadoop.conf.Configuration;
  29.  
  30. final class DimColumn {
  31. private volatile Object arrayList;
  32. private String name;
  33. private Class<?> typeOfArray;
  34. private int totalSize;
  35. private ColumnDescriptor columnDescriptor;
  36.  
  37. DimColumn(ColumnDescriptor columnDescriptor, int totalSize) {
  38. this.columnDescriptor = columnDescriptor;
  39. this.name = columnDescriptor.getPath()[0];
  40. this.typeOfArray = columnDescriptor.getType().javaType;
  41. this.totalSize = totalSize;
  42. this.arrayList = Array.newInstance(typeOfArray, totalSize);
  43. }
  44.  
  45. public ColumnDescriptor getColumnDescriptor() {
  46. return columnDescriptor;
  47. }
  48.  
  49. public Object getArrayList() {
  50. return arrayList;
  51. }
  52.  
  53. public Object getName() {
  54. return name;
  55. }
  56.  
  57. public Object getTotalSize() {
  58. return totalSize;
  59. }
  60. }
  61.  
  62. public class RfiParquetFileReader {
  63. ParquetMetadata metaData;
  64. MessageType schema;
  65. private static final Charset UTF8 = Charset.forName("UTF-8");
  66. private static final CharsetDecoder UTF8_DECODER = UTF8.newDecoder();
  67.  
  68. public RfiParquetFileReader(String fileName) throws IOException {
  69. Configuration conf = new Configuration();
  70. conf.set("fs.hdfs.impl", DistributedFileSystem.class.getName());
  71. conf.set("fs.file.impl", LocalFileSystem.class.getName());
  72. Path filePath = new Path(fileName);
  73. metaData = ParquetFileReader.readFooter(conf, filePath);
  74. schema = metaData.getFileMetaData().getSchema();
  75.  
  76. List<BlockMetaData> blocks;
  77. blocks = metaData.getBlocks();
  78. int totalSize = (int) blocks.get(0).getRowCount();
  79. List<ColumnDescriptor> columnDescriptors = schema.getColumns();
  80. List<DimColumn> dimColumns = makeDimColumns(columnDescriptors, totalSize);
  81.  
  82. ParquetFileReader fileReader = new ParquetFileReader(conf, filePath, blocks, columnDescriptors);
  83. PageReadStore pageReadStore = fileReader.readNextRowGroup();
  84. int index = 0;
  85. while (pageReadStore != null) {
  86. ColumnReadStoreImpl columnReadStoreImpl = new ColumnReadStoreImpl(pageReadStore, new DumpGroupConverter(), schema);
  87. index = load(dimColumns, columnReadStoreImpl, index);
  88. pageReadStore = fileReader.readNextRowGroup();
  89. }
  90.  
  91. Random rand = new Random();
  92. for (DimColumn dimColumn : dimColumns) {
  93. System.out.println(dimColumn.getName());
  94. for(int i = 0; i < 5; i++) {
  95. index = rand.nextInt((int) totalSize);
  96. System.out.println("Index: " + index + " Value: " + Array.get(dimColumn.getArrayList(), index));
  97. }
  98. System.out.println("--------");
  99. }
  100. }
  101.  
  102. public String getSchema() {
  103. return schema.toString();
  104. }
  105.  
  106. public static void main(String[] args) {
  107. String dirName = "/Users/pkhadloya/Downloads/AVRO/parquet_files/";
  108. // String[] files = {"1.campaigns.parquet", "13.assigned_conversion.parquet"};
  109. String[] files = {"13.assigned_conversion.parquet"};
  110.  
  111. try {
  112. long startTime = System.currentTimeMillis();
  113. for (String file : files) {
  114. new RfiParquetFileReader(dirName + file);
  115. System.out.println("========================================================================");
  116. }
  117. long endTime = System.currentTimeMillis();
  118. System.out.println("Time taken: " + (endTime - startTime) + "ms");
  119. } catch (IOException e) {
  120. e.printStackTrace();
  121. }
  122. }
  123.  
  124. public ArrayList<DimColumn> makeDimColumns(List<ColumnDescriptor> columnDescriptors, int totalSize) {
  125. ArrayList dimColumns = new ArrayList<DimColumn>();
  126. for (ColumnDescriptor columnDescriptor : columnDescriptors) {
  127. dimColumns.add(new DimColumn(columnDescriptor, totalSize));
  128. }
  129. return dimColumns;
  130. }
  131.  
  132. public int load(List<DimColumn> dimColumns, ColumnReadStoreImpl columnReadStore, int startIndex) throws IOException {
  133. int index = 1;
  134. for (DimColumn dc : dimColumns) {
  135. index = startIndex;
  136. int maxDefinitionLevel = dc.getColumnDescriptor().getMaxDefinitionLevel();
  137. ColumnReader columnReader = columnReadStore.getColumnReader(dc.getColumnDescriptor());
  138.  
  139. // System.out.println(dc.getTotalSize() + " : " + columnReader.getTotalValueCount() + " - " + dc.getName());
  140. for (long i = 0, totalValueCount = columnReader.getTotalValueCount(); i < totalValueCount; ++i) {
  141. int definitionLevel = columnReader.getCurrentDefinitionLevel();
  142.  
  143. if (definitionLevel == maxDefinitionLevel) {
  144. switch (dc.getColumnDescriptor().getType()) {
  145. case BINARY:
  146. String str = new String(columnReader.getBinary().getBytes(), "UTF-8");
  147. System.out.println(index + " : " + dc.getName() + " : " + str);
  148. Array.set(dc.getArrayList(), index, columnReader.getBinary()); break;
  149. case BOOLEAN:
  150. Array.set(dc.getArrayList(), index, columnReader.getBoolean()); break;
  151. case DOUBLE: Array.set(dc.getArrayList(), index, columnReader.getDouble()); break;
  152. case FLOAT: Array.set(dc.getArrayList(), index, columnReader.getFloat()); break;
  153. case INT32: Array.set(dc.getArrayList(), index, columnReader.getInteger()); break;
  154. case INT64: Array.set(dc.getArrayList(), index, columnReader.getLong()); break;
  155. case INT96: Array.set(dc.getArrayList(), index, binaryToBigInteger(columnReader.getBinary())); break;
  156. // case FIXED_LEN_BYTE_ARRAY: out.format("%s", binaryToString(columnReader.getBinary())); break;
  157. }
  158. }
  159. columnReader.consume();
  160. index += 1;
  161. }
  162. }
  163. return startIndex + index;
  164. }
  165.  
  166. // public static String binaryToString(Binary value) {
  167. // byte[] data = value.getBytes();
  168. // if (data == null) return null;
  169. //
  170. // try {
  171. // CharBuffer buffer = UTF8_DECODER.decode(value.toByteBuffer());
  172. // return buffer.toString();
  173. // } catch (Throwable th) {
  174. // }
  175. //
  176. // return "";
  177. // }
  178.  
  179. public static BigInteger binaryToBigInteger(Binary value) {
  180. byte[] data = value.getBytes();
  181. if (data == null) return null;
  182.  
  183. return new BigInteger(data);
  184. }
  185.  
  186. private static final class DumpGroupConverter extends GroupConverter {
  187. @Override public void start() { }
  188. @Override public void end() { }
  189. @Override public Converter getConverter(int fieldIndex) { return new DumpConverter(); }
  190. }
  191.  
  192. private static final class DumpConverter extends PrimitiveConverter {
  193. @Override public GroupConverter asGroupConverter() { return new DumpGroupConverter(); }
  194. }
  195. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement