Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import com.orientechnologies.common.concur.ONeedRetryException;
- import com.orientechnologies.orient.core.config.OGlobalConfiguration;
- import com.orientechnologies.orient.core.id.ORID;
- import com.orientechnologies.orient.core.sql.executor.OResult;
- import org.apache.tinkerpop.gremlin.orientdb.OrientGraph;
- import org.apache.tinkerpop.gremlin.orientdb.OrientGraphFactory;
- import org.apache.tinkerpop.gremlin.orientdb.executor.OGremlinResultSet;
- import org.apache.tinkerpop.gremlin.structure.Vertex;
- import org.junit.Assert;
- import org.junit.Test;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.concurrent.atomic.AtomicLong;
- /**
- * Created by luigidellaquila on 27/07/17.
- */
- public class TestCommitRetry {
- static int N_THREADS = 8;
- static int NUM_OF_BLOCKS = 100;
- static int VERTICES_PER_BLOCK = 10;
- static int NUM_OF_RETRIES = 200;
- private ORID rootId;
- public TestCommitRetry() {
- //set this threshold high, so that you have a high chance to have ONeedRetryException
- OGlobalConfiguration.RID_BAG_EMBEDDED_TO_SBTREEBONSAI_THRESHOLD.setValue(1000000);
- }
- final AtomicLong recognizedFailures = new AtomicLong();
- class ThreadCorrect extends Thread {
- private final OrientGraphFactory factory;
- public ThreadCorrect(OrientGraphFactory factory) {
- this.factory = factory;
- }
- @Override
- public void run() {
- OrientGraph db = factory.getTx();
- for (int i = 0; i < NUM_OF_BLOCKS; i++) {
- boolean committed = false;
- db.begin();
- for (int j = 0; j < NUM_OF_RETRIES; j++) {
- Vertex root = db.vertices(rootId).next(); // reload the root at each retry!!!
- try {
- createSubgraph(db, root);
- db.commit();
- //if everything is OK, stop retrying
- committed = true;
- break;
- } catch (ONeedRetryException ex) {
- //it case of error, rollback and retry the whole operation
- db.rollback();
- db.begin();
- }
- }
- if (!committed) {
- recognizedFailures.incrementAndGet();
- }
- }
- db.close();
- }
- private void createSubgraph(OrientGraph db, Vertex root) {
- for (int j = 0; j < VERTICES_PER_BLOCK; j++) {
- Vertex vertex = db.addVertex("V");
- root.addEdge("E", vertex);
- }
- }
- }
- @Test
- public void test() throws InterruptedException {
- final OrientGraphFactory factory = new OrientGraphFactory("memory:testCorrect");
- //create the root vertex
- OrientGraph db = factory.getNoTx();
- Vertex root = db.addVertex("V");
- rootId = (ORID) root.id();
- db.close();
- //run N_THREADS threads in parallel that do massive insert
- List<Thread> list = new ArrayList<>();
- for (int i = 0; i < N_THREADS; i++) {
- ThreadCorrect thread = new ThreadCorrect(factory);
- list.add(thread);
- thread.start();
- }
- for (Thread thread : list) {
- thread.join();
- }
- //check that no thread notified a failure
- Assert.assertEquals(0, recognizedFailures.intValue());
- db = factory.getNoTx();
- //check that there are actually all the records in the DB
- OGremlinResultSet rs = db.executeSql("SELECT count(*) as count FROM V");
- OResult item = rs.getRawResultSet().next();
- Assert.assertEquals((long) N_THREADS * NUM_OF_BLOCKS * VERTICES_PER_BLOCK + 1, (long) item.getProperty("count"));
- rs.close();
- db.close();
- factory.close();
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement