Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package net.coderodde.util;
- import java.util.ArrayList;
- import java.util.Collections;
- import java.util.Comparator;
- import java.util.List;
- import java.util.Objects;
- public class CoderoddeArrays {
- private static final int BITS_PER_BUCKET = 8;
- private static final int BUCKETS = 1 << BITS_PER_BUCKET;
- private static final int BUCKET_MASK = BUCKETS - 1;
- private static final long SIGN_MASK = 1L << 63;
- private static final int THREAD_THRESHOLD = 65536;
- private static final int MERGESORT_THRESHOLD = 4096;
- public static <E> void parallelSort(final Entry<E>[] array) {
- parallelSort(array, 0, array.length);
- }
- public static <E> void parallelSort(final Entry<E>[] array,
- final int fromIndex,
- final int toIndex) {
- final int RANGE_LENGTH = toIndex - fromIndex;
- if (RANGE_LENGTH < 2) {
- return;
- }
- final Entry<E>[] buffer = array.clone();
- final int threads = Math.min(RANGE_LENGTH / THREAD_THRESHOLD,
- Runtime.getRuntime()
- .availableProcessors());
- parallelSortImpl(array, buffer, threads, 0, fromIndex, toIndex);
- }
- public static final <E> boolean areEqual(final Entry<E>[]... arrays) {
- for (int i = 0; i < arrays.length - 1; ++i) {
- if (arrays[i].length != arrays[i + 1].length) {
- return false;
- }
- }
- for (int i = 0; i < arrays[0].length; ++i) {
- for (int j = 0; j < arrays.length - 1; ++j) {
- if (!Objects.equals(arrays[j][i], arrays[j + 1][i])) {
- return false;
- }
- }
- }
- return true;
- }
- public static final <E extends Comparable<? super E>>
- boolean isSorted(final E[] array,
- final int fromIndex,
- final int toIndex) {
- for (int i = fromIndex; i < toIndex - 1; ++i) {
- if (array[i].compareTo(array[i + 1]) > 0) {
- return false;
- }
- }
- return true;
- }
- public static final <E extends Comparable<? super E>>
- boolean isSorted(final E[] array) {
- return isSorted(array, 0, array.length);
- }
- private static final <E> void sortImpl(final Entry<E>[] source,
- final Entry<E>[] target,
- final int recursionDepth,
- final int fromIndex,
- final int toIndex) {
- // Try merge sort.
- if (toIndex - fromIndex <= MERGESORT_THRESHOLD) {
- mergesortAndCleanUp(source,
- target,
- recursionDepth,
- fromIndex,
- toIndex);
- return;
- }
- final int[] bucketSizeMap = new int[BUCKETS];
- final int[] startIndexMap = new int[BUCKETS];
- final int[] processedMap = new int[BUCKETS];
- // Compute the size of each bucket.
- for (int i = fromIndex; i < toIndex; ++i) {
- bucketSizeMap[getBucket(source[i].key(), recursionDepth)]++;
- }
- // Initialize the start index map.
- startIndexMap[0] = fromIndex;
- // Compute the start index map in its entirety.
- for (int i = 1; i != BUCKETS; ++i) {
- startIndexMap[i] = startIndexMap[i - 1] +
- bucketSizeMap[i - 1];
- }
- // Insert the entries from 'source' into their respective 'target'.
- for (int i = fromIndex; i < toIndex; ++i) {
- final Entry<E> e = source[i];
- final int index = getBucket(source[i].key(), recursionDepth);
- target[startIndexMap[index] + processedMap[index]++] = e;
- }
- if (recursionDepth == 7) {
- // There is nowhere to recur, return.
- return;
- }
- // Recur to sort each bucket.
- for (int i = 0; i != BUCKETS; ++i) {
- if (bucketSizeMap[i] != 0) {
- sortImpl(target,
- source,
- recursionDepth + 1,
- startIndexMap[i],
- startIndexMap[i] + bucketSizeMap[i]);
- }
- }
- }
- private static final <E> boolean mergesort(final Entry<E>[] source,
- final Entry<E>[] target,
- final int fromIndex,
- final int toIndex) {
- final int RANGE_LENGTH = toIndex - fromIndex;
- Entry<E>[] s = source;
- Entry<E>[] t = target;
- int passes = 0;
- for (int width = 1; width < RANGE_LENGTH; width <<= 1) {
- ++passes;
- int c = 0;
- for (; c < RANGE_LENGTH / width; c += 2) {
- int left = fromIndex + c * width;
- int right = left + width;
- int i = left;
- final int leftBound = right;
- final int rightBound = Math.min(toIndex, right + width);
- while (left < leftBound && right < rightBound) {
- t[i++] = s[right].key() < s[left].key() ?
- s[right++] :
- s[left++];
- }
- while (left < leftBound) { t[i++] = s[left++]; }
- while (right < rightBound) { t[i++] = s[right++]; }
- }
- if (c * width < RANGE_LENGTH) {
- for (int i = fromIndex + c * width; i < toIndex; ++i) {
- t[i] = s[i];
- }
- }
- final Entry<E>[] tmp = s;
- s = t;
- t = tmp;
- }
- return (passes & 1) == 0;
- }
- private static final <E>
- void mergesortAndCleanUp(final Entry<E>[] source,
- final Entry<E>[] target,
- final int recursionDepth,
- final int fromIndex,
- final int toIndex) {
- final boolean even = mergesort(source, target, fromIndex, toIndex);
- if (even) {
- // source contains the sorted range.
- if ((recursionDepth & 1) == 1) {
- // source is buffer, copy to target.
- System.arraycopy(source,
- fromIndex,
- target,
- fromIndex,
- toIndex - fromIndex);
- }
- } else {
- // target contains the sorted range.
- if ((recursionDepth & 1) == 0) {
- // target is buffer, copy to source.
- System.arraycopy(target,
- fromIndex,
- source,
- fromIndex,
- toIndex - fromIndex);
- }
- }
- }
- private static final class BucketSizeCounter<E> extends Thread {
- int[] localBucketSizeMap;
- private final Entry<E>[] source;
- private final int recursionDepth;
- private final int fromIndex;
- private final int toIndex;
- BucketSizeCounter(final Entry<E>[] source,
- final int recursionDepth,
- final int fromIndex,
- final int toIndex) {
- this.source = source;
- this.recursionDepth = recursionDepth;
- this.fromIndex = fromIndex;
- this.toIndex = toIndex;
- }
- @Override
- public void run() {
- this.localBucketSizeMap = new int[BUCKETS];
- for (int i = fromIndex; i < toIndex; ++i) {
- localBucketSizeMap[getBucket(source[i].key(),
- recursionDepth)]++;
- }
- }
- }
- private static final class BucketInserter<E> extends Thread {
- private final int[] startIndexMap;
- private final int[] processedMap;
- private final Entry<E>[] source;
- private final Entry<E>[] target;
- private final int recursionDepth;
- private final int fromIndex;
- private final int toIndex;
- BucketInserter(final int[] startIndexMap,
- final int[] processedMap,
- final Entry<E>[] source,
- final Entry<E>[] target,
- final int recursionDepth,
- final int fromIndex,
- final int toIndex) {
- this.startIndexMap = startIndexMap;
- this.processedMap = processedMap;
- this.source = source;
- this.target = target;
- this.recursionDepth = recursionDepth;
- this.fromIndex = fromIndex;
- this.toIndex = toIndex;
- }
- @Override
- public void run() {
- for (int i = fromIndex; i < toIndex; ++i) {
- final Entry<E> e = source[i];
- final int index = getBucket(e.key(), recursionDepth);
- target[startIndexMap[index] + processedMap[index]++] = e;
- }
- }
- }
- private static final class Sorter<E> extends Thread {
- private final List<Task<E>> taskList;
- Sorter(final List<Task<E>> taskList) {
- this.taskList = taskList;
- }
- @Override
- public void run() {
- for (final Task task : taskList) {
- // Choose parallel or sequential.
- if (task.threads > 1) {
- parallelSortImpl(task.source,
- task.target,
- task.threads,
- task.recursionDepth,
- task.fromIndex,
- task.toIndex);
- } else {
- sortImpl(task.source,
- task.target,
- task.recursionDepth,
- task.fromIndex,
- task.toIndex);
- }
- }
- }
- }
- private static final class Task<E> {
- private final Entry<E>[] source;
- private final Entry<E>[] target;
- private final int threads;
- private final int recursionDepth;
- private final int fromIndex;
- private final int toIndex;
- Task(final Entry<E>[] source,
- final Entry<E>[] target,
- final int threads,
- final int recursionDepth,
- final int fromIndex,
- final int toIndex) {
- this.source = source;
- this.target = target;
- this.threads = threads;
- this.recursionDepth = recursionDepth;
- this.fromIndex = fromIndex;
- this.toIndex = toIndex;
- }
- }
- private static final <E> void parallelSortImpl(final Entry<E>[] source,
- final Entry<E>[] target,
- final int threads,
- final int recursionDepth,
- final int fromIndex,
- final int toIndex) {
- final int RANGE_LENGTH = toIndex - fromIndex;
- if (RANGE_LENGTH <= MERGESORT_THRESHOLD) {
- mergesortAndCleanUp(source,
- target,
- recursionDepth,
- fromIndex,
- toIndex);
- return;
- }
- if (threads < 2) {
- sortImpl(source, target, recursionDepth, fromIndex, toIndex);
- return;
- }
- // Create the bucket size counter threads.
- final BucketSizeCounter[] counters = new BucketSizeCounter[threads];
- final int SUB_RANGE_LENGTH = RANGE_LENGTH / threads;
- int start = fromIndex;
- for (int i = 0; i != threads - 1; ++i, start += SUB_RANGE_LENGTH) {
- counters[i] = new BucketSizeCounter<>(source,
- recursionDepth,
- start,
- start + SUB_RANGE_LENGTH);
- counters[i].start();
- }
- counters[threads - 1] =
- new BucketSizeCounter<>(source,
- recursionDepth,
- start,
- toIndex);
- // Run the last counter in this thread while other are already on their
- // way.
- counters[threads - 1].run();
- try {
- for (int i = 0; i != threads - 1; ++i) {
- counters[i].join();
- }
- } catch (final InterruptedException ie) {
- ie.printStackTrace();
- return;
- }
- final int[] bucketSizeMap = new int[BUCKETS];
- final int[] startIndexMap = new int[BUCKETS];
- // Count the size of each processed bucket.
- for (int i = 0; i != threads; ++i) {
- for (int j = 0; j != BUCKETS; ++j) {
- bucketSizeMap[j] += counters[i].localBucketSizeMap[j];
- }
- }
- // Prepare the starting indices of each bucket.
- startIndexMap[0] = fromIndex;
- for (int i = 1; i != BUCKETS; ++i) {
- startIndexMap[i] = startIndexMap[i - 1] +
- bucketSizeMap[i - 1];
- }
- // Create the inserter threads.
- final BucketInserter<E>[] inserters = new BucketInserter[threads - 1];
- final int[][] processedMaps = new int[threads][BUCKETS];
- // Make processedMaps of each thread independent of the other.
- for (int i = 1; i != threads; ++i) {
- int[] partialBucketSizeMap = counters[i - 1].localBucketSizeMap;
- for (int j = 0; j != BUCKETS; ++j) {
- processedMaps[i][j] =
- processedMaps[i - 1][j] + partialBucketSizeMap[j];
- }
- }
- int startIndex = fromIndex;
- for (int i = 0; i != threads - 1; ++i, startIndex += SUB_RANGE_LENGTH) {
- inserters[i] =
- new BucketInserter<>(startIndexMap,
- processedMaps[i],
- source,
- target,
- recursionDepth,
- startIndex,
- startIndex + SUB_RANGE_LENGTH);
- inserters[i].start();
- }
- // Run the last inserter in this thread while other are on their ways.
- new BucketInserter<>(startIndexMap,
- processedMaps[threads - 1],
- source,
- target,
- recursionDepth,
- startIndex,
- toIndex).run();
- try {
- for (int i = 0; i != threads - 1; ++i) {
- inserters[i].join();
- }
- } catch (final InterruptedException ie) {
- ie.printStackTrace();
- return;
- }
- if (recursionDepth == 7) {
- // Nowhere to recur.
- return;
- }
- int nonEmptyBucketAmount = 0;
- for (int i : bucketSizeMap) {
- if (i != 0) {
- ++nonEmptyBucketAmount;
- }
- }
- final int SPAWN_DEGREE = Math.min(nonEmptyBucketAmount, threads);
- final List<Integer>[] bucketIndexListArray = new List[SPAWN_DEGREE];
- for (int i = 0; i != SPAWN_DEGREE; ++i) {
- bucketIndexListArray[i] = new ArrayList<>(nonEmptyBucketAmount);
- }
- final int[] threadCountMap = new int[SPAWN_DEGREE];
- for (int i = 0; i != SPAWN_DEGREE; ++i) {
- threadCountMap[i] = threads / SPAWN_DEGREE;
- }
- for (int i = 0; i != threads % SPAWN_DEGREE; ++i) {
- ++threadCountMap[i];
- }
- final List<Integer> nonEmptyBucketIndices =
- new ArrayList<>(nonEmptyBucketAmount);
- for (int i = 0; i != BUCKETS; ++i) {
- if (bucketSizeMap[i] != 0) {
- nonEmptyBucketIndices.add(i);
- }
- }
- Collections.sort(nonEmptyBucketIndices,
- new BucketSizeComparator(bucketSizeMap));
- final int OPTIMAL_SUBRANGE_LENGTH = RANGE_LENGTH / SPAWN_DEGREE;
- int listIndex = 0;
- int packed = 0;
- int f = 0;
- int j = 0;
- while (j < nonEmptyBucketIndices.size()) {
- int tmp = bucketSizeMap[nonEmptyBucketIndices.get(j++)];
- packed += tmp;
- if (packed >= OPTIMAL_SUBRANGE_LENGTH
- || j == nonEmptyBucketIndices.size()) {
- packed = 0;
- for (int i = f; i < j; ++i) {
- bucketIndexListArray[listIndex]
- .add(nonEmptyBucketIndices.get(i));
- }
- ++listIndex;
- f = j;
- }
- }
- final Sorter[] sorters = new Sorter[SPAWN_DEGREE];
- final List<List<Task<E>>> llt = new ArrayList<>(SPAWN_DEGREE);
- for (int i = 0; i != SPAWN_DEGREE; ++i) {
- final List<Task<E>> lt = new ArrayList<>();
- for (int idx : bucketIndexListArray[i]) {
- lt.add(new Task<>(target,
- source,
- threadCountMap[i],
- recursionDepth + 1,
- startIndexMap[idx],
- startIndexMap[idx] + bucketSizeMap[idx]));
- }
- llt.add(lt);
- }
- for (int i = 0; i != SPAWN_DEGREE - 1; ++i) {
- sorters[i] = new Sorter<>(llt.get(i));
- sorters[i].start();
- }
- new Sorter<>(llt.get(SPAWN_DEGREE - 1)).run();
- try {
- for (int i = 0; i != SPAWN_DEGREE - 1; ++i) {
- sorters[i].join();
- }
- } catch (final InterruptedException ie) {
- ie.printStackTrace();
- return;
- }
- }
- private static final class BucketSizeComparator
- implements Comparator<Integer> {
- private final int[] bucketSizeMap;
- BucketSizeComparator(final int[] bucketSizeMap) {
- this.bucketSizeMap = bucketSizeMap;
- }
- @Override
- public int compare(final Integer i1, final Integer i2) {
- final int sz1 = bucketSizeMap[i1];
- final int sz2 = bucketSizeMap[i2];
- return sz2 - sz1;
- }
- }
- private static final int getBucket(final long key,
- final int recursionDepth) {
- final int bitShift = 64 - (recursionDepth + 1) * BITS_PER_BUCKET;
- return (int)((key ^ SIGN_MASK) >>> bitShift) & BUCKET_MASK;
- }
- }
- package net.coderodde.util;
- public final class Entry<E> implements Comparable<Entry<E>> {
- private final long key;
- private final E satelliteData;
- public Entry(final long key, final E satelliteData) {
- this.key = key;
- this.satelliteData = satelliteData;
- }
- public long key() {
- return key;
- }
- public E satelliteData() {
- return satelliteData;
- }
- @Override
- public int compareTo(Entry<E> o) {
- return Long.compare(key, o.key);
- }
- }
- package net.coderodde.util;
- import java.util.Arrays;
- import java.util.Random;
- public class Demo {
- private static final int N = 10000000;
- public static void main(final String... args) {
- final long seed = System.currentTimeMillis();
- final Random rnd = new Random(seed);
- final Entry<Integer>[] array1 = getRandomEntryArray(N, rnd);
- final Entry<Integer>[] array2 = array1.clone();
- final Entry<Integer>[] array3 = array1.clone();
- System.out.println("Seed: " + seed);
- long ta = System.currentTimeMillis();
- net.coderodde.util.CoderoddeArrays.parallelSort(array1);
- long tb = System.currentTimeMillis();
- System.out.println("net.coderodde.util.CoderoddeArrays.parallelSort " +
- "in " + (tb - ta) + " ms.");
- ta = System.currentTimeMillis();
- Arrays.parallelSort(array2);
- tb = System.currentTimeMillis();
- System.out.println("java.util.Arrays.parallelSort in " +
- (tb - ta) + " ms.");
- ta = System.currentTimeMillis();
- Arrays.sort(array3);
- tb = System.currentTimeMillis();
- System.out.println("java.util.Arrays.sort in " + (tb - ta) + " ms.");
- System.out.println("Arrays are equal: " +
- CoderoddeArrays.areEqual(array1, array2, array3));
- System.out.println("Sorted: " + CoderoddeArrays.isSorted(array1));
- }
- private static Entry<Integer>[] getRandomEntryArray(final int size,
- final Random rnd) {
- final Entry<Integer>[] array = new Entry[size];
- for (int i = 0; i < size; ++i) {
- array[i] = new Entry<>(rnd.nextLong(), null);
- }
- return array;
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement