Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
package ft;

import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;

import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Mutation;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.config.SimpleConfiguration;
import org.apache.fluo.recipes.accumulo.export.AccumuloExporter;
import org.apache.fluo.recipes.core.export.ExportQueue;
import org.apache.fluo.recipes.core.export.SequencedExport;
import org.apache.fluo.recipes.test.FluoITHelper;

- public class ExternalIndex {
- public static String EXPORT_QUEUE_ID = "eq";
- public static class CountChange {
- private final Long oldCount;
- private final Long newCount;
- public CountChange(){
- oldCount = null;
- newCount = null;
- }
- public CountChange(Optional<Long> oldCount, Optional<Long> newCount) {
- this.oldCount = oldCount.orElse(null);
- this.newCount = newCount.orElse(null);
- }
- public Long getOldCount() {
- return oldCount;
- }
- public Long getNewCount() {
- return newCount;
- }
- }
- public static class SimpleExporter extends AccumuloExporter<String, CountChange> {
- @Override
- protected void translate(SequencedExport<String, CountChange> export,
- Consumer<Mutation> consumer) {
- CountChange change = export.getValue();
- String word = export.getKey();
- //use row of <count>:<word>
- //TODO create a mutation that deletes old count (use seq for timestamp)
- //TODO create a mutation that inserts new count (use seq for timestamp)
- }
- }
- /**
- * Create the external export table and configure an export queue to write to it.
- */
- public static void setup(FluoConfiguration config) {
- String instance = config.getAccumuloInstance();
- String zookeepers = config.getAccumuloZookeepers();
- String user = config.getAccumuloUser();
- String pass = config.getAccumuloPassword();
- String table = "queryTable";
- try {
- ZooKeeperInstance zk = new ZooKeeperInstance(instance, zookeepers);
- Connector conn = zk.getConnector(user, new PasswordToken(pass));
- conn.tableOperations().create(table);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- // Create config for export table.
- AccumuloExporter.Configuration exportTableCfg =
- new AccumuloExporter.Configuration(instance, zookeepers, user, pass, table);
- // Create config for export queue.
- ExportQueue.Options eqOpts = new ExportQueue.Options(EXPORT_QUEUE_ID, SimpleExporter.class,
- String.class, CountChange.class, 3).setExporterConfiguration(exportTableCfg);
- // Configure export queue. This will modify fluoConfig.
- ExportQueue.configure(config, eqOpts);
- }
- public static void printTable(FluoConfiguration config){
- String table = "queryTable";
- try {
- ZooKeeperInstance zk = new ZooKeeperInstance(config.getAccumuloInstance(), config.getAccumuloZookeepers());
- Connector conn = zk.getConnector(config.getAccumuloUser(), new PasswordToken(config.getAccumuloPassword()));
- FluoITHelper.printAccumuloTable(conn, table);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- private ExportQueue<String, CountChange> exportQueue;
- ExternalIndex(SimpleConfiguration appConfig) {
- exportQueue = ExportQueue.getInstance(EXPORT_QUEUE_ID, appConfig);
- }
- public void exportTo(TransactionBase tx, String word, Optional<Long> oldCount,
- Optional<Long> newCount) {
- exportQueue.add(tx, word, new CountChange(oldCount, newCount));
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement