Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- @Test
- public void testOrphanBlobStore() throws Exception {
- int instances = 2;
- long cleanupInterval = 1000l;
- BlobClient blobClient = null;
- BlobStoreService blobStoreService = null;
- BlobServer[] blobServers = new BlobServer[instances];
- BlobLibraryCacheManager[] managers = new BlobLibraryCacheManager[instances];
- InetSocketAddress[] addresses = new InetSocketAddress[instances];
- Random random = new Random();
- try {
- final Configuration config = new Configuration();
- config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
- config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM");
- config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
- temporaryFolder.getRoot().getPath() + "/tmp-data");
- blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
- for (int i = 0; i < instances; i++) {
- blobServers[i] = new BlobServer(config, blobStoreService);
- addresses[i] = new InetSocketAddress("0.0.0.0", blobServers[i].getPort());
- managers[i] = new BlobLibraryCacheManager(blobServers[i], cleanupInterval);
- }
- blobClient = new BlobClient(addresses[0], config);
- byte[] blob_1 = new byte[1024];
- byte[] blob_2 = new byte[1024];
- random.nextBytes(blob_1);
- random.nextBytes(blob_2);
- BlobKey key_1 = blobClient.put(blob_1);
- BlobKey key_2 = blobClient.put(blob_2);
- List<BlobKey> orphans = new ArrayList<>();
- orphans.add(key_1);
- orphans.add(key_2);
- JobID jobOrphan = new JobID();
- managers[0].registerJob(jobOrphan, orphans, Collections.<URL>emptyList());
- assertTrue(managers[0].getFile(key_1).exists());
- assertTrue(managers[0].getFile(key_2).exists());
- assertTrue(managers[1].getFile(key_1).exists());
- assertTrue(managers[1].getFile(key_2).exists());
- // server restarted and manager recovered to another server
- blobServers[0] = new BlobServer(config, blobStoreService);
- managers[0] = new BlobLibraryCacheManager(blobServers[1], cleanupInterval);
- byte[] blob_3 = new byte[1024];
- random.nextBytes(blob_3);
- BlobKey key_3 = blobClient.put(blob_3);
- List<BlobKey> blobs = new ArrayList<>();
- blobs.add(key_3);
- JobID job = new JobID();
- managers[0].registerJob(job, blobs, Collections.<URL>emptyList());
- assertTrue(managers[0].getFile(key_1).exists());
- assertTrue(managers[0].getNumberOfReferencesByKey(key_1) == 0);
- assertTrue(managers[0].getFile(key_2).exists());
- assertTrue(managers[0].getNumberOfReferencesByKey(key_2) == 0);
- assertTrue(managers[0].getFile(key_3).exists());
- assertTrue(managers[0].getNumberOfReferencesByKey(key_3) == 1);
- managers[0].unregisterJob(jobOrphan);
- managers[0].unregisterJob(job);
- // Cleaning up
- Thread.sleep(cleanupInterval + 1);
- // Checking orphan references
- //----------------------------------------------------------------------
- assertTrue(managers[0].getNumberOfReferencesByKey(key_1) == 0);
- assertTrue(managers[0].getFile(key_1).exists());
- assertTrue(managers[0].getNumberOfReferencesByKey(key_2) == 0);
- assertTrue(managers[0].getFile(key_2).exists());
- //----------------------------------------------------------------------
- int fileNotFound = 0;
- try {
- assertTrue(managers[0].getNumberOfReferencesByKey(key_3) == 0);
- managers[0].getFile(key_3).exists();
- } catch (IOException ex) {
- fileNotFound++;
- }
- // File not found, was removed correctly by cleanup
- assertEquals(1, fileNotFound);
- // Checking files after cleanup. This files should be re-register
- assertTrue(managers[0].getFile(key_1).exists());
- assertTrue(managers[0].getFile(key_2).exists());
- } finally {
- if (blobClient != null && !blobClient.isClosed()) {
- blobClient.close();
- }
- for (BlobServer bs : blobServers) {
- if (bs != null && !bs.isShutdown()) {
- bs.close();
- }
- }
- for (BlobLibraryCacheManager blcm : managers) {
- if (blcm != null) {
- blcm.shutdown();
- }
- }
- if (blobStoreService != null) {
- blobStoreService.closeAndCleanupAllData();
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement