Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package com.company.grid.lookup;
- import parquet.column.ColumnDescriptor;
- import parquet.column.ColumnReader;
- import parquet.column.impl.ColumnReadStoreImpl;
- import parquet.column.page.PageReadStore;
- import parquet.hadoop.ParquetFileReader;
- import parquet.hadoop.metadata.BlockMetaData;
- import parquet.hadoop.metadata.ParquetMetadata;
- import parquet.io.api.Binary;
- import parquet.io.api.Converter;
- import parquet.io.api.GroupConverter;
- import parquet.io.api.PrimitiveConverter;
- import parquet.schema.MessageType;
- import org.apache.hadoop.fs.Path;
- import java.io.IOException;
- import java.lang.reflect.Array;
- import java.math.BigInteger;
- import java.nio.charset.Charset;
- import java.nio.charset.CharsetDecoder;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Random;
- import org.apache.hadoop.fs.LocalFileSystem;
- import org.apache.hadoop.hdfs.DistributedFileSystem;
- import org.apache.hadoop.conf.Configuration;
- final class DimColumn {
- private volatile Object arrayList;
- private String name;
- private Class<?> typeOfArray;
- private int totalSize;
- private ColumnDescriptor columnDescriptor;
- DimColumn(ColumnDescriptor columnDescriptor, int totalSize) {
- this.columnDescriptor = columnDescriptor;
- this.name = columnDescriptor.getPath()[0];
- this.typeOfArray = columnDescriptor.getType().javaType;
- this.totalSize = totalSize;
- this.arrayList = Array.newInstance(typeOfArray, totalSize);
- }
- public ColumnDescriptor getColumnDescriptor() {
- return columnDescriptor;
- }
- public Object getArrayList() {
- return arrayList;
- }
- public Object getName() {
- return name;
- }
- public Object getTotalSize() {
- return totalSize;
- }
- }
- public class RfiParquetFileReader {
- ParquetMetadata metaData;
- MessageType schema;
- private static final Charset UTF8 = Charset.forName("UTF-8");
- private static final CharsetDecoder UTF8_DECODER = UTF8.newDecoder();
- public RfiParquetFileReader(String fileName) throws IOException {
- Configuration conf = new Configuration();
- conf.set("fs.hdfs.impl", DistributedFileSystem.class.getName());
- conf.set("fs.file.impl", LocalFileSystem.class.getName());
- Path filePath = new Path(fileName);
- metaData = ParquetFileReader.readFooter(conf, filePath);
- schema = metaData.getFileMetaData().getSchema();
- List<BlockMetaData> blocks;
- blocks = metaData.getBlocks();
- int totalSize = (int) blocks.get(0).getRowCount();
- List<ColumnDescriptor> columnDescriptors = schema.getColumns();
- List<DimColumn> dimColumns = makeDimColumns(columnDescriptors, totalSize);
- ParquetFileReader fileReader = new ParquetFileReader(conf, filePath, blocks, columnDescriptors);
- PageReadStore pageReadStore = fileReader.readNextRowGroup();
- int index = 0;
- while (pageReadStore != null) {
- ColumnReadStoreImpl columnReadStoreImpl = new ColumnReadStoreImpl(pageReadStore, new DumpGroupConverter(), schema);
- index = load(dimColumns, columnReadStoreImpl, index);
- pageReadStore = fileReader.readNextRowGroup();
- }
- Random rand = new Random();
- for (DimColumn dimColumn : dimColumns) {
- System.out.println(dimColumn.getName());
- for(int i = 0; i < 5; i++) {
- index = rand.nextInt((int) totalSize);
- System.out.println("Index: " + index + " Value: " + Array.get(dimColumn.getArrayList(), index));
- }
- System.out.println("--------");
- }
- }
- public String getSchema() {
- return schema.toString();
- }
- public static void main(String[] args) {
- String dirName = "/Users/pkhadloya/Downloads/AVRO/parquet_files/";
- // String[] files = {"1.campaigns.parquet", "13.assigned_conversion.parquet"};
- String[] files = {"13.assigned_conversion.parquet"};
- try {
- long startTime = System.currentTimeMillis();
- for (String file : files) {
- new RfiParquetFileReader(dirName + file);
- System.out.println("========================================================================");
- }
- long endTime = System.currentTimeMillis();
- System.out.println("Time taken: " + (endTime - startTime) + "ms");
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- public ArrayList<DimColumn> makeDimColumns(List<ColumnDescriptor> columnDescriptors, int totalSize) {
- ArrayList dimColumns = new ArrayList<DimColumn>();
- for (ColumnDescriptor columnDescriptor : columnDescriptors) {
- dimColumns.add(new DimColumn(columnDescriptor, totalSize));
- }
- return dimColumns;
- }
- public int load(List<DimColumn> dimColumns, ColumnReadStoreImpl columnReadStore, int startIndex) throws IOException {
- int index = 1;
- for (DimColumn dc : dimColumns) {
- index = startIndex;
- int maxDefinitionLevel = dc.getColumnDescriptor().getMaxDefinitionLevel();
- ColumnReader columnReader = columnReadStore.getColumnReader(dc.getColumnDescriptor());
- // System.out.println(dc.getTotalSize() + " : " + columnReader.getTotalValueCount() + " - " + dc.getName());
- for (long i = 0, totalValueCount = columnReader.getTotalValueCount(); i < totalValueCount; ++i) {
- int definitionLevel = columnReader.getCurrentDefinitionLevel();
- if (definitionLevel == maxDefinitionLevel) {
- switch (dc.getColumnDescriptor().getType()) {
- case BINARY:
- String str = new String(columnReader.getBinary().getBytes(), "UTF-8");
- System.out.println(index + " : " + dc.getName() + " : " + str);
- Array.set(dc.getArrayList(), index, columnReader.getBinary()); break;
- case BOOLEAN:
- Array.set(dc.getArrayList(), index, columnReader.getBoolean()); break;
- case DOUBLE: Array.set(dc.getArrayList(), index, columnReader.getDouble()); break;
- case FLOAT: Array.set(dc.getArrayList(), index, columnReader.getFloat()); break;
- case INT32: Array.set(dc.getArrayList(), index, columnReader.getInteger()); break;
- case INT64: Array.set(dc.getArrayList(), index, columnReader.getLong()); break;
- case INT96: Array.set(dc.getArrayList(), index, binaryToBigInteger(columnReader.getBinary())); break;
- // case FIXED_LEN_BYTE_ARRAY: out.format("%s", binaryToString(columnReader.getBinary())); break;
- }
- }
- columnReader.consume();
- index += 1;
- }
- }
- return startIndex + index;
- }
- // public static String binaryToString(Binary value) {
- // byte[] data = value.getBytes();
- // if (data == null) return null;
- //
- // try {
- // CharBuffer buffer = UTF8_DECODER.decode(value.toByteBuffer());
- // return buffer.toString();
- // } catch (Throwable th) {
- // }
- //
- // return "";
- // }
- public static BigInteger binaryToBigInteger(Binary value) {
- byte[] data = value.getBytes();
- if (data == null) return null;
- return new BigInteger(data);
- }
- private static final class DumpGroupConverter extends GroupConverter {
- @Override public void start() { }
- @Override public void end() { }
- @Override public Converter getConverter(int fieldIndex) { return new DumpConverter(); }
- }
- private static final class DumpConverter extends PrimitiveConverter {
- @Override public GroupConverter asGroupConverter() { return new DumpGroupConverter(); }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement