Advertisement
Guest User

FastTableScanning.java

a guest
Dec 20th, 2010
1,450
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 5 7.20 KB | None | 0 0
  1. /*
  2.  * Usage:
  3.  *   java -Xmx2G FastTableScanning
  4.  *  
  5.  *   This code was written by D. Lemire, see http://lemire.me/.
  6.  *   It was the basis of a blog post : "For your in-memory databases, do you really need an index?"
  7.  *   It is in the public domain. (No copyright.)
  8.  *  
  9.  *  
  10.  */
  11.  
  12. import java.io.BufferedOutputStream;
  13. import java.io.DataOutputStream;
  14. import java.io.File;
  15. import java.io.FileOutputStream;
  16. import java.io.IOException;
  17. import java.io.RandomAccessFile;
  18. import java.nio.ByteBuffer;
  19. import java.nio.IntBuffer;
  20. import java.nio.channels.FileChannel;
  21. import java.util.ArrayList;
  22. import java.util.Arrays;
  23. import java.util.Iterator;
  24. import java.util.List;
  25. import java.util.Random;
  26. import java.util.concurrent.BrokenBarrierException;
  27. import java.util.concurrent.CyclicBarrier;
  28. import java.util.concurrent.atomic.AtomicInteger;
  29.  
  30. public class FastTableScanning implements Iterable<int[]> {
  31.  
  32.     File backfile;
  33.     public int c, N;
  34.     public List<int[]> memoryversion;
  35.  
  36.     private FastTableScanning(int myN, int myc, int cardinality)
  37.             throws IOException {
  38.         backfile = File.createTempFile("PersistentNormalizedTable", "bin");
  39.         backfile.deleteOnExit();// this defeats the purpose of "persistence" but
  40.                                 // will do for our limited purposes
  41.         N = myN;
  42.         c = myc;
  43.         Random rand = new Random();
  44.         System.out.println("Creating on disk a table with " + c
  45.                 + " columns and " + N + " rows... (this can take some time)");
  46.         DataOutputStream dos = new DataOutputStream(new BufferedOutputStream(
  47.                 new FileOutputStream(backfile)));
  48.         for (int row = 0; row < N; ++row) {
  49.             for (int column = 0; column < c; ++column)
  50.                 dos.writeInt(rand.nextInt(cardinality));
  51.         }
  52.         dos.close();
  53.         System.out
  54.                 .println("Done! Wrote " + N * c * 4 / (1024 * 1024.0) + " MB");
  55.     }
  56.  
  57.     public static void main(String[] args) throws IOException,
  58.             InterruptedException, BrokenBarrierException {
  59.         FastTableScanning fs = new FastTableScanning(10000000, 10, 100);
  60.         System.out
  61.                 .println("Now I'm going to repeatedly scan the data for slices using a memory-mapped file: ");
  62.         for (int value = 0; value < 10; ++value)
  63.             fs.computeCardinalityOfSlice(4, value);
  64.         fs.loadInRam();
  65.         System.out
  66.                 .println("Now I'm going to repeatedly scan the data for slices using an in-memory array: ");
  67.         for (int value = 0; value < 10; ++value)
  68.             fs.computeCardinalityOfSlice(4, value);
  69.  
  70.     }
  71.  
  72.     public void loadInRam() {
  73.         System.out
  74.                 .println("Loading the entire table in RAM. You may need to adjust the flags on your JVM to make this work without OutOfMemoryError (hint: -Xmx2G may work).");
  75.         memoryversion = new ArrayList<int[]>();
  76.         for (int[] row : this) {
  77.             memoryversion.add(Arrays.copyOf(row, row.length));
  78.         }
  79.         System.out.println("Done!");
  80.     }
  81.  
  82.     public int computeCardinalityOfSlice(final int filtervalue,
  83.             final int filtercolumn) throws InterruptedException,
  84.             BrokenBarrierException {
  85.         long startTime = System.currentTimeMillis();
  86.         final int numberofthreads = Runtime.getRuntime().availableProcessors();
  87.         final CyclicBarrier cb = new CyclicBarrier(numberofthreads + 1);
  88.         final AtomicInteger counter = new AtomicInteger(0);
  89.         for (int t = 0; t < numberofthreads; ++t) {
  90.             final int ft = t;
  91.             Thread r = new Thread() {
  92.                 @Override
  93.                 public void run() {
  94.                     try {
  95.                         if (memoryversion != null)
  96.                             processRAMPartition(counter, filtervalue,
  97.                                     filtercolumn, ft, numberofthreads);
  98.                         else
  99.                             processDiskPartition(counter, filtervalue,
  100.                                     filtercolumn, ft, numberofthreads);
  101.                     } catch (IOException e) {
  102.                         e.printStackTrace();
  103.                     }
  104.                     try {
  105.                         cb.await();
  106.                     } catch (InterruptedException e) {
  107.                         e.printStackTrace();
  108.                     } catch (BrokenBarrierException e) {
  109.                         e.printStackTrace();
  110.                     }
  111.                 }
  112.             };
  113.             r.start();
  114.         }
  115.         cb.await();
  116.         long endTime = System.currentTimeMillis();
  117.         System.out.println("Completed scan in  " + (endTime - startTime)
  118.                 / 1000.0 + " s ");
  119.         System.out.println("** The cardinality of the slice is " + counter
  120.                 + " rows.");
  121.         return counter.intValue();
  122.  
  123.     }
  124.  
  125.     // this is a silly example, modify to fit your needs
  126.     // as it is, it computes the cardinality of a slice
  127.     private void processDiskPartition(AtomicInteger counter, int filtervalue,
  128.             int filtercolumn, final int whichthread, final int howmanythreads)
  129.             throws IOException {
  130.         final int startindex = startIndex(whichthread, howmanythreads);
  131.         final int endindex = endIndex(whichthread, howmanythreads);
  132.         Iterator<int[]> j = iteratorFromTo(startindex, endindex);
  133.         while (j.hasNext()) {
  134.             int[] row = j.next();
  135.             if (row[filtercolumn] == filtervalue) {
  136.                 counter.incrementAndGet();
  137.             }
  138.         }
  139.  
  140.     }
  141.  
  142.     // this is a silly example, modify to fit your needs
  143.     // as it is, it computes the cardinality of a slice
  144.     private void processRAMPartition(AtomicInteger counter, int filtervalue,
  145.             int filtercolumn, final int whichthread, final int howmanythreads)
  146.             throws IOException {
  147.         final int startindex = startIndex(whichthread, howmanythreads);
  148.         final int endindex = endIndex(whichthread, howmanythreads);
  149.         Iterator<int[]> j = memoryversion.subList(startindex, endindex)
  150.                 .iterator();
  151.         while (j.hasNext()) {
  152.             int[] row = j.next();
  153.             if (row[filtercolumn] == filtervalue) {
  154.                 counter.incrementAndGet();
  155.             }
  156.         }
  157.  
  158.     }
  159.  
  160.     private int startIndex(final int whichthread, final int howmanythreads) {
  161.         return N / howmanythreads * whichthread;
  162.     }
  163.  
  164.     private int endIndex(final int whichthread, final int howmanythreads) {
  165.         return whichthread + 1 == howmanythreads ? N : N / howmanythreads
  166.                 * (whichthread + 1);
  167.     }
  168.  
  169.     public Iterator<int[]> iterator() {
  170.         try {
  171.             FileChannel roChannel = new RandomAccessFile(backfile, "r")
  172.                     .getChannel();
  173.  
  174.             ByteBuffer readonlybuffer = roChannel.map(
  175.                     FileChannel.MapMode.READ_ONLY, 0, c * N * 4);
  176.             final IntBuffer ib = readonlybuffer.asIntBuffer();
  177.  
  178.             final int[] array = new int[c];
  179.             final int endrowid = N;
  180.             return new Iterator<int[]>() {
  181.                 int counter = 0;
  182.  
  183.                 @Override
  184.                 public boolean hasNext() {
  185.                     if (counter++ < endrowid) {
  186.                         ib.get(array);
  187.                         return true;
  188.                     }
  189.                     return false;
  190.                 }
  191.  
  192.                 @Override
  193.                 public int[] next() {
  194.                     return array;
  195.                 }
  196.  
  197.                 @Override
  198.                 public void remove() {
  199.                     throw new RuntimeException("not implemented");
  200.                 }
  201.  
  202.             };
  203.         } catch (IOException e1) {
  204.             e1.printStackTrace();
  205.         }
  206.         return null;
  207.     }
  208.  
  209.     public Iterator<int[]> iteratorFromTo(final int beginrowid,
  210.             final int endrowid) throws IOException {
  211.         FileChannel roChannel = new RandomAccessFile(backfile, "r")
  212.                 .getChannel();
  213.  
  214.         ByteBuffer readonlybuffer = roChannel.map(
  215.                 FileChannel.MapMode.READ_ONLY, c * beginrowid * 4, c
  216.                         * (endrowid - beginrowid) * 4);
  217.         final IntBuffer ib = readonlybuffer.asIntBuffer();
  218.         final int[] array = new int[c];
  219.         return new Iterator<int[]>() {
  220.             int counter = beginrowid;
  221.  
  222.             @Override
  223.             public boolean hasNext() {
  224.                 if (counter++ < endrowid) {
  225.                     ib.get(array);
  226.                     return true;
  227.                 }
  228.                 return false;
  229.             }
  230.  
  231.             @Override
  232.             public int[] next() {
  233.                 return array;
  234.             }
  235.  
  236.             @Override
  237.             public void remove() {
  238.                 throw new RuntimeException("not implemented");
  239.             }
  240.  
  241.         };
  242.     }
  243.  
  244. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement