SHARE
TWEET

FastTableScanning.java

a guest Dec 20th, 2010 816 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. /*
  2.  * Usage:
  3.  *   java -Xmx2G FastTableScanning
  4.  *  
  5.  *   This code was written by D. Lemire, see http://lemire.me/.
  6.  *   It was the basis of a blog post : "For your in-memory databases, do you really need an index?"
  7.  *   It is in the public domain. (No copyright.)
  8.  *  
  9.  *  
  10.  */
  11.  
  12. import java.io.BufferedOutputStream;
  13. import java.io.DataOutputStream;
  14. import java.io.File;
  15. import java.io.FileOutputStream;
  16. import java.io.IOException;
  17. import java.io.RandomAccessFile;
  18. import java.nio.ByteBuffer;
  19. import java.nio.IntBuffer;
  20. import java.nio.channels.FileChannel;
  21. import java.util.ArrayList;
  22. import java.util.Arrays;
  23. import java.util.Iterator;
  24. import java.util.List;
  25. import java.util.Random;
  26. import java.util.concurrent.BrokenBarrierException;
  27. import java.util.concurrent.CyclicBarrier;
  28. import java.util.concurrent.atomic.AtomicInteger;
  29.  
  30. public class FastTableScanning implements Iterable<int[]> {
  31.  
  32.         File backfile;
  33.         public int c, N;
  34.         public List<int[]> memoryversion;
  35.  
  36.         private FastTableScanning(int myN, int myc, int cardinality)
  37.                         throws IOException {
  38.                 backfile = File.createTempFile("PersistentNormalizedTable", "bin");
  39.                 backfile.deleteOnExit();// this defeats the purpose of "persistence" but
  40.                                                                 // will do for our limited purposes
  41.                 N = myN;
  42.                 c = myc;
  43.                 Random rand = new Random();
  44.                 System.out.println("Creating on disk a table with " + c
  45.                                 + " columns and " + N + " rows... (this can take some time)");
  46.                 DataOutputStream dos = new DataOutputStream(new BufferedOutputStream(
  47.                                 new FileOutputStream(backfile)));
  48.                 for (int row = 0; row < N; ++row) {
  49.                         for (int column = 0; column < c; ++column)
  50.                                 dos.writeInt(rand.nextInt(cardinality));
  51.                 }
  52.                 dos.close();
  53.                 System.out
  54.                                 .println("Done! Wrote " + N * c * 4 / (1024 * 1024.0) + " MB");
  55.         }
  56.  
  57.         public static void main(String[] args) throws IOException,
  58.                         InterruptedException, BrokenBarrierException {
  59.                 FastTableScanning fs = new FastTableScanning(10000000, 10, 100);
  60.                 System.out
  61.                                 .println("Now I'm going to repeatedly scan the data for slices using a memory-mapped file: ");
  62.                 for (int value = 0; value < 10; ++value)
  63.                         fs.computeCardinalityOfSlice(4, value);
  64.                 fs.loadInRam();
  65.                 System.out
  66.                                 .println("Now I'm going to repeatedly scan the data for slices using an in-memory array: ");
  67.                 for (int value = 0; value < 10; ++value)
  68.                         fs.computeCardinalityOfSlice(4, value);
  69.  
  70.         }
  71.  
  72.         public void loadInRam() {
  73.                 System.out
  74.                                 .println("Loading the entire table in RAM. You may need to adjust the flags on your JVM to make this work without OutOfMemoryError (hint: -Xmx2G may work).");
  75.                 memoryversion = new ArrayList<int[]>();
  76.                 for (int[] row : this) {
  77.                         memoryversion.add(Arrays.copyOf(row, row.length));
  78.                 }
  79.                 System.out.println("Done!");
  80.         }
  81.  
  82.         public int computeCardinalityOfSlice(final int filtervalue,
  83.                         final int filtercolumn) throws InterruptedException,
  84.                         BrokenBarrierException {
  85.                 long startTime = System.currentTimeMillis();
  86.                 final int numberofthreads = Runtime.getRuntime().availableProcessors();
  87.                 final CyclicBarrier cb = new CyclicBarrier(numberofthreads + 1);
  88.                 final AtomicInteger counter = new AtomicInteger(0);
  89.                 for (int t = 0; t < numberofthreads; ++t) {
  90.                         final int ft = t;
  91.                         Thread r = new Thread() {
  92.                                 @Override
  93.                                 public void run() {
  94.                                         try {
  95.                                                 if (memoryversion != null)
  96.                                                         processRAMPartition(counter, filtervalue,
  97.                                                                         filtercolumn, ft, numberofthreads);
  98.                                                 else
  99.                                                         processDiskPartition(counter, filtervalue,
  100.                                                                         filtercolumn, ft, numberofthreads);
  101.                                         } catch (IOException e) {
  102.                                                 e.printStackTrace();
  103.                                         }
  104.                                         try {
  105.                                                 cb.await();
  106.                                         } catch (InterruptedException e) {
  107.                                                 e.printStackTrace();
  108.                                         } catch (BrokenBarrierException e) {
  109.                                                 e.printStackTrace();
  110.                                         }
  111.                                 }
  112.                         };
  113.                         r.start();
  114.                 }
  115.                 cb.await();
  116.                 long endTime = System.currentTimeMillis();
  117.                 System.out.println("Completed scan in  " + (endTime - startTime)
  118.                                 / 1000.0 + " s ");
  119.                 System.out.println("** The cardinality of the slice is " + counter
  120.                                 + " rows.");
  121.                 return counter.intValue();
  122.  
  123.         }
  124.  
  125.         // this is a silly example, modify to fit your needs
  126.         // as it is, it computes the cardinality of a slice
  127.         private void processDiskPartition(AtomicInteger counter, int filtervalue,
  128.                         int filtercolumn, final int whichthread, final int howmanythreads)
  129.                         throws IOException {
  130.                 final int startindex = startIndex(whichthread, howmanythreads);
  131.                 final int endindex = endIndex(whichthread, howmanythreads);
  132.                 Iterator<int[]> j = iteratorFromTo(startindex, endindex);
  133.                 while (j.hasNext()) {
  134.                         int[] row = j.next();
  135.                         if (row[filtercolumn] == filtervalue) {
  136.                                 counter.incrementAndGet();
  137.                         }
  138.                 }
  139.  
  140.         }
  141.  
  142.         // this is a silly example, modify to fit your needs
  143.         // as it is, it computes the cardinality of a slice
  144.         private void processRAMPartition(AtomicInteger counter, int filtervalue,
  145.                         int filtercolumn, final int whichthread, final int howmanythreads)
  146.                         throws IOException {
  147.                 final int startindex = startIndex(whichthread, howmanythreads);
  148.                 final int endindex = endIndex(whichthread, howmanythreads);
  149.                 Iterator<int[]> j = memoryversion.subList(startindex, endindex)
  150.                                 .iterator();
  151.                 while (j.hasNext()) {
  152.                         int[] row = j.next();
  153.                         if (row[filtercolumn] == filtervalue) {
  154.                                 counter.incrementAndGet();
  155.                         }
  156.                 }
  157.  
  158.         }
  159.  
  160.         private int startIndex(final int whichthread, final int howmanythreads) {
  161.                 return N / howmanythreads * whichthread;
  162.         }
  163.  
  164.         private int endIndex(final int whichthread, final int howmanythreads) {
  165.                 return whichthread + 1 == howmanythreads ? N : N / howmanythreads
  166.                                 * (whichthread + 1);
  167.         }
  168.  
  169.         public Iterator<int[]> iterator() {
  170.                 try {
  171.                         FileChannel roChannel = new RandomAccessFile(backfile, "r")
  172.                                         .getChannel();
  173.  
  174.                         ByteBuffer readonlybuffer = roChannel.map(
  175.                                         FileChannel.MapMode.READ_ONLY, 0, c * N * 4);
  176.                         final IntBuffer ib = readonlybuffer.asIntBuffer();
  177.  
  178.                         final int[] array = new int[c];
  179.                         final int endrowid = N;
  180.                         return new Iterator<int[]>() {
  181.                                 int counter = 0;
  182.  
  183.                                 @Override
  184.                                 public boolean hasNext() {
  185.                                         if (counter++ < endrowid) {
  186.                                                 ib.get(array);
  187.                                                 return true;
  188.                                         }
  189.                                         return false;
  190.                                 }
  191.  
  192.                                 @Override
  193.                                 public int[] next() {
  194.                                         return array;
  195.                                 }
  196.  
  197.                                 @Override
  198.                                 public void remove() {
  199.                                         throw new RuntimeException("not implemented");
  200.                                 }
  201.  
  202.                         };
  203.                 } catch (IOException e1) {
  204.                         e1.printStackTrace();
  205.                 }
  206.                 return null;
  207.         }
  208.  
  209.         public Iterator<int[]> iteratorFromTo(final int beginrowid,
  210.                         final int endrowid) throws IOException {
  211.                 FileChannel roChannel = new RandomAccessFile(backfile, "r")
  212.                                 .getChannel();
  213.  
  214.                 ByteBuffer readonlybuffer = roChannel.map(
  215.                                 FileChannel.MapMode.READ_ONLY, c * beginrowid * 4, c
  216.                                                 * (endrowid - beginrowid) * 4);
  217.                 final IntBuffer ib = readonlybuffer.asIntBuffer();
  218.                 final int[] array = new int[c];
  219.                 return new Iterator<int[]>() {
  220.                         int counter = beginrowid;
  221.  
  222.                         @Override
  223.                         public boolean hasNext() {
  224.                                 if (counter++ < endrowid) {
  225.                                         ib.get(array);
  226.                                         return true;
  227.                                 }
  228.                                 return false;
  229.                         }
  230.  
  231.                         @Override
  232.                         public int[] next() {
  233.                                 return array;
  234.                         }
  235.  
  236.                         @Override
  237.                         public void remove() {
  238.                                 throw new RuntimeException("not implemented");
  239.                         }
  240.  
  241.                 };
  242.         }
  243.  
  244. }
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top