Advertisement
Guest User

Untitled

a guest
Nov 29th, 2016
79
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.66 KB | None | 0 0
  1. package ft;
  2.  
  3. import java.util.Optional;
  4. import java.util.function.Consumer;
  5.  
  6. import org.apache.accumulo.core.client.Connector;
  7. import org.apache.accumulo.core.client.ZooKeeperInstance;
  8. import org.apache.accumulo.core.client.security.tokens.PasswordToken;
  9. import org.apache.accumulo.core.data.Mutation;
  10. import org.apache.fluo.api.client.TransactionBase;
  11. import org.apache.fluo.api.config.FluoConfiguration;
  12. import org.apache.fluo.api.config.SimpleConfiguration;
  13. import org.apache.fluo.recipes.accumulo.export.AccumuloExporter;
  14. import org.apache.fluo.recipes.core.export.ExportQueue;
  15. import org.apache.fluo.recipes.core.export.SequencedExport;
  16. import org.apache.fluo.recipes.test.FluoITHelper;
  17.  
  18. public class ExternalIndex {
  19. public static String EXPORT_QUEUE_ID = "eq";
  20.  
  21. public static class CountChange {
  22. private final Long oldCount;
  23. private final Long newCount;
  24.  
  25. public CountChange(){
  26. oldCount = null;
  27. newCount = null;
  28. }
  29.  
  30. public CountChange(Optional<Long> oldCount, Optional<Long> newCount) {
  31. this.oldCount = oldCount.orElse(null);
  32. this.newCount = newCount.orElse(null);
  33. }
  34.  
  35. public Long getOldCount() {
  36. return oldCount;
  37. }
  38.  
  39. public Long getNewCount() {
  40. return newCount;
  41. }
  42. }
  43.  
  44. public static class SimpleExporter extends AccumuloExporter<String, CountChange> {
  45.  
  46. @Override
  47. protected void translate(SequencedExport<String, CountChange> export,
  48. Consumer<Mutation> consumer) {
  49. CountChange change = export.getValue();
  50. String word = export.getKey();
  51.  
  52. //use row of <count>:<word>
  53. //TODO create a mutation that deletes old count (use seq for timestamp)
  54. //TODO create a mutation that inserts new count (use seq for timestamp)
  55. }
  56. }
  57.  
  58. /**
  59. * Create the external export table and configure an export queue to write to it.
  60. */
  61. public static void setup(FluoConfiguration config) {
  62.  
  63. String instance = config.getAccumuloInstance();
  64. String zookeepers = config.getAccumuloZookeepers();
  65. String user = config.getAccumuloUser();
  66. String pass = config.getAccumuloPassword();
  67. String table = "queryTable";
  68.  
  69. try {
  70. ZooKeeperInstance zk = new ZooKeeperInstance(instance, zookeepers);
  71. Connector conn = zk.getConnector(user, new PasswordToken(pass));
  72. conn.tableOperations().create(table);
  73. } catch (Exception e) {
  74. throw new RuntimeException(e);
  75. }
  76. // Create config for export table.
  77. AccumuloExporter.Configuration exportTableCfg =
  78. new AccumuloExporter.Configuration(instance, zookeepers, user, pass, table);
  79.  
  80. // Create config for export queue.
  81. ExportQueue.Options eqOpts = new ExportQueue.Options(EXPORT_QUEUE_ID, SimpleExporter.class,
  82. String.class, CountChange.class, 3).setExporterConfiguration(exportTableCfg);
  83.  
  84. // Configure export queue. This will modify fluoConfig.
  85. ExportQueue.configure(config, eqOpts);
  86. }
  87.  
  88. public static void printTable(FluoConfiguration config){
  89. String table = "queryTable";
  90.  
  91. try {
  92. ZooKeeperInstance zk = new ZooKeeperInstance(config.getAccumuloInstance(), config.getAccumuloZookeepers());
  93. Connector conn = zk.getConnector(config.getAccumuloUser(), new PasswordToken(config.getAccumuloPassword()));
  94. FluoITHelper.printAccumuloTable(conn, table);
  95. } catch (Exception e) {
  96. throw new RuntimeException(e);
  97. }
  98. }
  99.  
  100. private ExportQueue<String, CountChange> exportQueue;
  101.  
  102. ExternalIndex(SimpleConfiguration appConfig) {
  103. exportQueue = ExportQueue.getInstance(EXPORT_QUEUE_ID, appConfig);
  104. }
  105.  
  106. public void exportTo(TransactionBase tx, String word, Optional<Long> oldCount,
  107. Optional<Long> newCount) {
  108. exportQueue.add(tx, word, new CountChange(oldCount, newCount));
  109. }
  110. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement