Advertisement
Guest User

Untitled

a guest
Jun 16th, 2019
93
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.00 KB | None | 0 0
  1. import com.google.common.util.concurrent.MoreExecutors;
  2. import com.lucidworks.apollo.component.ExecutorComponent;
  3. import com.lucidworks.apollo.pipeline.*;
  4. import com.lucidworks.apollo.pipeline.async.AbstractAsyncStageConfig;
  5. import com.lucidworks.apollo.pipeline.async.AsyncStage;
  6. import com.lucidworks.apollo.pipeline.async.AsyncStageConfig;
  7.  
  8. import java.util.concurrent.ExecutorService;
  9.  
  10. /**
  11. * Convenience base class for stages which need asynchronous processing.
  12. *
  13. * @param <C> The config class, subclass of {@link AbstractAsyncStageConfig}
  14. * @param <R> The intermediate result type, passed between {@link #doProcessing} and {@link #mergeResults}.
  15. * @see AsyncStage
  16. */
  17. public abstract class AbstractAsyncQueryStage<C extends AbstractAsyncStageConfig, R> extends QueryStage<C>
  18. implements AsyncStage<R, QueryRequestAndResponse> {
  19.  
  20. private final ExecutorService executorService;
  21.  
  22. protected AbstractAsyncQueryStage(StageAssistFactoryParams params, ExecutorComponent executorComponent) {
  23. super(params);
  24. if(getConfiguration().isAsyncEnabled()) {
  25. int threadPoolSize = Math.max(0, params.getConfigurationComponent().getInt(
  26. "com.lucidworks.apollo.pipeline.query.async.threadPoolSize", 1000));
  27. if(threadPoolSize == 0) {
  28. executorService = executorComponent.getOrCreateCachedThreadPool("system-async-query-pipelines");
  29. } else {
  30. executorService = executorComponent.getOrCreateFixedThreadPool(threadPoolSize, "system-async-query-pipelines");
  31. }
  32. } else {
  33. executorService = MoreExecutors.newDirectExecutorService();
  34. }
  35. }
  36.  
  37. @Override
  38. public final QueryRequestAndResponse process(QueryRequestAndResponse message, Context context) throws Exception {
  39.  
  40. if(getConfiguration().isAsyncEnabled()) {
  41. AsyncStageConfig asyncConfig = getConfiguration().getAsyncConfig();
  42. AsyncStage.runInExecutor(this, message, context, asyncConfig.getAsyncId(), executorService);
  43. } else {
  44. AsyncStage.runInForeground(this, message, context);
  45. }
  46. return message;
  47. }
  48. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement