SHARE
TWEET

Untitled

a guest Jun 16th, 2019 65 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. import com.google.common.util.concurrent.MoreExecutors;
  2. import com.lucidworks.apollo.component.ExecutorComponent;
  3. import com.lucidworks.apollo.pipeline.*;
  4. import com.lucidworks.apollo.pipeline.async.AbstractAsyncStageConfig;
  5. import com.lucidworks.apollo.pipeline.async.AsyncStage;
  6. import com.lucidworks.apollo.pipeline.async.AsyncStageConfig;
  7.  
  8. import java.util.concurrent.ExecutorService;
  9.  
  10. /**
  11.  * Convenience base class for stages which need asynchronous processing.
  12.  *
  13.  * @param <C> The config class, subclass of {@link AbstractAsyncStageConfig}
  14.  * @param <R> The intermediate result type, passed between {@link #doProcessing} and {@link #mergeResults}.
  15.  * @see AsyncStage
  16.  */
  17. public abstract class AbstractAsyncQueryStage<C extends AbstractAsyncStageConfig, R> extends QueryStage<C>
  18.     implements AsyncStage<R, QueryRequestAndResponse> {
  19.  
  20.   private final ExecutorService executorService;
  21.  
  22.   protected AbstractAsyncQueryStage(StageAssistFactoryParams params, ExecutorComponent executorComponent) {
  23.     super(params);
  24.     if(getConfiguration().isAsyncEnabled()) {
  25.       int threadPoolSize = Math.max(0, params.getConfigurationComponent().getInt(
  26.           "com.lucidworks.apollo.pipeline.query.async.threadPoolSize", 1000));
  27.       if(threadPoolSize == 0) {
  28.         executorService = executorComponent.getOrCreateCachedThreadPool("system-async-query-pipelines");
  29.       } else {
  30.         executorService = executorComponent.getOrCreateFixedThreadPool(threadPoolSize, "system-async-query-pipelines");
  31.       }
  32.     } else {
  33.       executorService = MoreExecutors.newDirectExecutorService();
  34.     }
  35.   }
  36.  
  37.   @Override
  38.   public final QueryRequestAndResponse process(QueryRequestAndResponse message, Context context) throws Exception {
  39.  
  40.     if(getConfiguration().isAsyncEnabled()) {
  41.       AsyncStageConfig asyncConfig = getConfiguration().getAsyncConfig();
  42.       AsyncStage.runInExecutor(this, message, context, asyncConfig.getAsyncId(), executorService);
  43.     } else {
  44.       AsyncStage.runInForeground(this, message, context);
  45.     }
  46.     return message;
  47.   }
  48. }
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top