Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- class AtomicReferenceBlockingPool<O> implements BlockingPool<O> {
- private final Supplier<O> supplier;
- private final Semaphore semaphore;
- private final List<O> objects;
- private final List<Entry<O>> pool;
- AtomicReferenceBlockingPool(int capacity, Supplier<O> supplier) {
- checkArgument(capacity > 0, "illegal capacity: %s; must be greater than zero", capacity);
- checkNotNull(supplier, "supplier cannot be null");
- this.supplier = supplier;
- semaphore = new Semaphore(capacity, true);
- objects = Collections.synchronizedList(Lists.<O>newArrayList());
- pool = initPool(capacity);
- }
- private List<Entry<O>> initPool(int capacity) {
- ImmutableList.Builder<Entry<O>> builder = ImmutableList.builder();
- for (int i = 0; i < capacity; i++)
- builder.add(new AtomicReferenceEntry<O>());
- return builder.build();
- }
- @Override
- public O poll() {
- if (semaphore.tryAcquire())
- return takeFromNextAvailable();
- return null;
- }
- @Override
- public O take() throws InterruptedException {
- semaphore.acquire();
- return takeFromNextAvailable();
- }
- @Override
- public O take(long timeout, TimeUnit unit) throws InterruptedException {
- if (semaphore.tryAcquire(timeout, unit))
- return takeFromNextAvailable();
- return null;
- }
- private O takeFromNextAvailable() {
- O object = null;
- for (Iterator<Entry<O>> i = pool.iterator(); object == null;) {
- Entry<O> entry = i.next();
- if (entry.available())
- object = entry.take();
- else
- object = makeAvailableAndTakeFrom(entry);
- }
- return object;
- }
- private O makeAvailableAndTakeFrom(Entry<O> entry) {
- O object = supplier.get();
- objects.add(object);
- entry.put(object);
- return entry.take();
- }
- @Override
- public void put(O object) {
- checkTakenFromHere(object);
- putIntoNextUnavailable(object);
- semaphore.release();
- }
- private void putIntoNextUnavailable(O object) {
- for (Entry<O> entry : pool)
- if (!entry.available()) {
- entry.put(object);
- return;
- }
- }
- @Override
- public void discard(O object) {
- checkTakenFromHere(object);
- objects.remove(object);
- semaphore.release();
- }
- private void checkTakenFromHere(O object) {
- checkState(objects.contains(object), "%s wasn't taken from this pool", object);
- }
- @Override
- public Iterator<O> iterator() {
- return Iterators.unmodifiableIterator(objects.iterator());
- }
- }
- class AtomicReferenceEntry<O> implements Entry<O> {
- private final AtomicReference<O> reference;
- AtomicReferenceEntry() {
- reference = new AtomicReference<O>(null);
- }
- @Override
- public O take() {
- return reference.getAndSet(null);
- }
- @Override
- public void put(O object) {
- reference.compareAndSet(null, object);
- }
- @Override
- public boolean available() {
- return reference.get() != null;
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement