Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- ListenableFuture<Long> nextFsyncTask =
- new ListenableFutureTask<Long>(
- new Callable<Long>() {
- @Override
- public Long call() throws Exception {
- nextFsyncTask = //Create next fsync task for new writes
- fsync();
- return System.currentTimeMillis();
- }
- }
- ListenableFuture<PutResult> put(final byte key[], final byte value[]) {
- final long start = System.currentTimeMillis();
- final ListenableFuture<byte[]> compressionTask = //submit to compression thread pool
- final SettableFuture<PutResult> retval = new SettableFuture<PutResult>();
- compressionTask.addListener(
- @Override
- new Runnable() {
- public void run() {
- m_writeThread.execute( new Runnable() {
- @Override
- public void run() {
- byte compressedValue[];
- try {
- compressedValue = compressionTask.get();
- } catch (ExcecutionException e) {
- retval.setException(e);
- return;
- }
- final int compressedLength = compressedValue.length;
- try {
- write(value, compressedValue);
- } catch (Exception e) {
- retval.setException(e);
- return;
- }
- final ListenableFuture fsyncTask = nextFsyncTask;
- fsyncTask.addListener(
- new Runnable() {
- @Override
- public void run() {
- try {
- final long latency = start - fsyncTask.get();
- retval.set(new PutResult(latency, value.length, compressedLength);
- } catch (Exception e) {
- retval.setException(e);
- return;
- }
- }
- });
- }
- });
- }
- },
- MoreExecutors.sameThreadExecutor());
- return retval;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement