s243a

ArchiveHandlerImpl.java (freenet.client)

Oct 22nd, 2014
341
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 9.77 KB | None | 0 0
  1. //s243a pearltree node: http://www.pearltrees.com/s243a/archivehandlerimpl-java/id12827529
  2. package freenet.client;
  3.  
  4. import java.io.IOException;
  5.  
  6. import com.db4o.ObjectContainer;
  7. import com.db4o.ObjectSet;
  8. import com.db4o.query.Predicate;
  9.  
  10. import freenet.client.ArchiveManager.ARCHIVE_TYPE;
  11. import freenet.client.async.ClientContext;
  12. import freenet.client.async.DBJob;
  13. import freenet.client.async.DatabaseDisabledException;
  14. import freenet.keys.FreenetURI;
  15. import freenet.support.Logger;
  16. import freenet.support.api.Bucket;
  17. import freenet.support.api.BucketFactory;
  18. import freenet.support.compress.Compressor.COMPRESSOR_TYPE;
  19. import freenet.support.io.BucketTools;
  20. import freenet.support.io.NativeThread;
  21.  
  22. // WARNING: THIS CLASS IS STORED IN DB4O -- THINK TWICE BEFORE ADD/REMOVE/RENAME FIELDS
  23. class ArchiveHandlerImpl implements ArchiveHandler {
  24.  
  25.     private static volatile boolean logMINOR;
  26.  
  27.     static {
  28.         Logger.registerClass(ArchiveHandlerImpl.class);
  29.     }
  30.  
  31.     private final FreenetURI key;
  32.     private boolean forceRefetchArchive;
  33.     ARCHIVE_TYPE archiveType;
  34.     COMPRESSOR_TYPE compressorType;
  35.  
  36.     ArchiveHandlerImpl(FreenetURI key, ARCHIVE_TYPE archiveType, COMPRESSOR_TYPE ctype, boolean forceRefetchArchive) {
  37.         this.key = key;
  38.         this.archiveType = archiveType;
  39.         this.compressorType = ctype;
  40.         this.forceRefetchArchive = forceRefetchArchive;
  41.     }
  42.  
  43.     @Override
  44.     public Bucket get(String internalName, ArchiveContext archiveContext,
  45.             ArchiveManager manager, ObjectContainer container)
  46.             throws ArchiveFailureException, ArchiveRestartException,
  47.             MetadataParseException, FetchException {
  48.  
  49.         if(forceRefetchArchive) return null;
  50.  
  51.         Bucket data;
  52.  
  53.         // Fetch from cache
  54.         if(logMINOR)
  55.             Logger.minor(this, "Checking cache: "+key+ ' ' +internalName);
  56.         if((data = manager.getCached(key, internalName)) != null) {
  57.             return data;
  58.         }
  59.  
  60.         return null;
  61.     }
  62.  
  63.     @Override
  64.     public Bucket getMetadata(ArchiveContext archiveContext,
  65.             ArchiveManager manager, ObjectContainer container) throws ArchiveFailureException,
  66.             ArchiveRestartException, MetadataParseException, FetchException {
  67.         return get(".metadata", archiveContext, manager, container);
  68.     }
  69.  
  70.     @Override
  71.     public void extractToCache(Bucket bucket, ArchiveContext actx,
  72.             String element, ArchiveExtractCallback callback,
  73.             ArchiveManager manager, ObjectContainer container, ClientContext context) throws ArchiveFailureException,
  74.             ArchiveRestartException {
  75.         forceRefetchArchive = false; // now we don't need to force refetch any more
  76.         ArchiveStoreContext ctx = manager.makeContext(key, archiveType, compressorType, false);
  77.         manager.extractToCache(key, archiveType, compressorType, bucket, actx, ctx, element, callback, container, context);
  78.     }
  79.  
  80.     @Override
  81.     public ARCHIVE_TYPE getArchiveType() {
  82.         return archiveType;
  83.     }
  84.  
  85.     public COMPRESSOR_TYPE getCompressorType() {
  86.         return compressorType;
  87.     }
  88.  
  89.     @Override
  90.     public FreenetURI getKey() {
  91.         return key;
  92.     }
  93.  
  94.     /**
  95.      * Unpack a fetched archive on a separate thread for a persistent caller.
  96.      * This involves:
  97.      * - Add a tag to the database so that it will be restarted on a crash.
  98.      * - Run the actual unpack on a separate thread.
  99.      * - Copy the data to a persistent bucket.
  100.      * - Schedule a database job.
  101.      * - Call the callback.
  102.      * @param bucket
  103.      * @param actx
  104.      * @param element
  105.      * @param callback
  106.      * @param container
  107.      * @param context
  108.      */
  109.     @Override
  110.     public void extractPersistentOffThread(Bucket bucket, boolean freeBucket, ArchiveContext actx, String element, ArchiveExtractCallback callback, ObjectContainer container, final ClientContext context) {
  111.         assert(element != null); // no callback would be called...
  112.         final ArchiveManager manager = context.archiveManager;
  113.         final ArchiveExtractTag tag = new ArchiveExtractTag(this, bucket, freeBucket, actx, element, callback, context.nodeDBHandle);
  114.         container.store(tag);
  115.         runPersistentOffThread(tag, context, manager, context.persistentBucketFactory);
  116.     }
  117.  
  118.     private static void runPersistentOffThread(final ArchiveExtractTag tag, final ClientContext context, final ArchiveManager manager, final BucketFactory bf) {
  119.         final ProxyCallback proxyCallback = new ProxyCallback();
  120.  
  121.         if(logMINOR)
  122.             Logger.minor(ArchiveHandlerImpl.class, "Scheduling off-thread extraction: "+tag.data+" for "+tag.handler.key+" element "+tag.element+" for "+tag.callback, new Exception("debug"));
  123.  
  124.         context.mainExecutor.execute(new Runnable() {
  125.  
  126.             @Override
  127.             public void run() {
  128.                 try {
  129.                     if(logMINOR)
  130.                         Logger.minor(this, "Extracting off-thread: "+tag.data+" for "+tag.handler.key+" element "+tag.element+" for "+tag.callback);
  131.                     tag.handler.extractToCache(tag.data, tag.actx, tag.element, proxyCallback, manager, null, context);
  132.                     if(logMINOR)
  133.                         Logger.minor(this, "Extracted");
  134.                     final Bucket data;
  135.                     if(proxyCallback.data == null)
  136.                         data = null;
  137.                     else {
  138.                         try {
  139.                             if(logMINOR)
  140.                                 Logger.minor(this, "Copying data...");
  141.                             data = bf.makeBucket(proxyCallback.data.size());
  142.                             BucketTools.copy(proxyCallback.data, data);
  143.                             proxyCallback.data.free();
  144.                             if(logMINOR)
  145.                                 Logger.minor(this, "Copied and freed original");
  146.                         } catch (IOException e) {
  147.                             throw new ArchiveFailureException("Failure copying data to persistent storage", e);
  148.                         }
  149.                     }
  150.                     context.jobRunner.queue(new DBJob() {
  151.  
  152.                         @Override
  153.                         public boolean run(ObjectContainer container, ClientContext context) {
  154.                             if(logMINOR)
  155.                                 Logger.minor(this, "Calling callback for "+tag.data+" for "+tag.handler.key+" element "+tag.element+" for "+tag.callback);
  156.                             container.activate(tag.callback, 1);
  157.                             if(proxyCallback.data == null)
  158.                                 tag.callback.notInArchive(container, context);
  159.                             else
  160.                                 tag.callback.gotBucket(data, container, context);
  161.                             tag.callback.removeFrom(container);
  162.                             if(tag.freeBucket) {
  163.                                 tag.data.free();
  164.                                 tag.data.removeFrom(container);
  165.                             }
  166.                             container.deactivate(tag.callback, 1);
  167.                             container.delete(tag);
  168.                             return false;
  169.                         }
  170.  
  171.                     }, NativeThread.NORM_PRIORITY, false);
  172.  
  173.                 } catch (final ArchiveFailureException e) {
  174.  
  175.                     try {
  176.                         context.jobRunner.queue(new DBJob() {
  177.  
  178.                             @Override
  179.                             public boolean run(ObjectContainer container, ClientContext context) {
  180.                                 container.activate(tag.callback, 1);
  181.                                 tag.callback.onFailed(e, container, context);
  182.                                 tag.callback.removeFrom(container);
  183.                                 if(tag.freeBucket) {
  184.                                     tag.data.free();
  185.                                     tag.data.removeFrom(container);
  186.                                 }
  187.                                 container.delete(tag);
  188.                                 return false;
  189.                             }
  190.  
  191.                         }, NativeThread.NORM_PRIORITY, false);
  192.                     } catch (DatabaseDisabledException e1) {
  193.                         Logger.error(this, "Extracting off thread but persistence is disabled");
  194.                     }
  195.  
  196.                 } catch (final ArchiveRestartException e) {
  197.  
  198.                     try {
  199.                         context.jobRunner.queue(new DBJob() {
  200.  
  201.                             @Override
  202.                             public boolean run(ObjectContainer container, ClientContext context) {
  203.                                 container.activate(tag.callback, 1);
  204.                                 tag.callback.onFailed(e, container, context);
  205.                                 tag.callback.removeFrom(container);
  206.                                 if(tag.freeBucket) {
  207.                                     tag.data.free();
  208.                                     tag.data.removeFrom(container);
  209.                                 }
  210.                                 container.delete(tag);
  211.                                 return false;
  212.                             }
  213.  
  214.                         }, NativeThread.NORM_PRIORITY, false);
  215.                     } catch (DatabaseDisabledException e1) {
  216.                         Logger.error(this, "Extracting off thread but persistence is disabled");
  217.                     }
  218.  
  219.                 } catch (DatabaseDisabledException e) {
  220.                     Logger.error(this, "Extracting off thread but persistence is disabled");
  221.                 }
  222.             }
  223.  
  224.         }, "Off-thread extract");
  225.     }
  226.  
  227.     /** Called from ArchiveManager.init() */
  228.     static void init(ObjectContainer container, ClientContext context, final long nodeDBHandle) {
  229.         ObjectSet<ArchiveExtractTag> set = container.query(new Predicate<ArchiveExtractTag>() {
  230.             final private static long serialVersionUID = 5769839072558476040L;
  231.             @Override
  232.             public boolean match(ArchiveExtractTag tag) {
  233.                 return tag.nodeDBHandle == nodeDBHandle;
  234.             }
  235.         });
  236.         while(set.hasNext()) {
  237.             ArchiveExtractTag tag = set.next();
  238.             if(tag.checkBroken(container, context)) continue;
  239.             tag.activateForExecution(container);
  240.             runPersistentOffThread(tag, context, context.archiveManager, context.persistentBucketFactory);
  241.         }
  242.     }
  243.  
  244.     private static class ProxyCallback implements ArchiveExtractCallback {
  245.  
  246.         Bucket data;
  247.  
  248.         @Override
  249.         public void gotBucket(Bucket data, ObjectContainer container, ClientContext context) {
  250.             this.data = data;
  251.         }
  252.  
  253.         @Override
  254.         public void notInArchive(ObjectContainer container, ClientContext context) {
  255.             this.data = null;
  256.         }
  257.  
  258.         @Override
  259.         public void onFailed(ArchiveRestartException e, ObjectContainer container, ClientContext context) {
  260.             // Must not be called.
  261.             throw new UnsupportedOperationException();
  262.         }
  263.  
  264.         @Override
  265.         public void onFailed(ArchiveFailureException e, ObjectContainer container, ClientContext context) {
  266.             // Must not be called.
  267.             throw new UnsupportedOperationException();
  268.         }
  269.  
  270.         @Override
  271.         public void removeFrom(ObjectContainer container) {
  272.             container.delete(this);
  273.         }
  274.  
  275.     }
  276.  
  277.     @Override
  278.     public void activateForExecution(ObjectContainer container) {
  279.         container.activate(this, 1);
  280.         container.activate(key, 5);
  281.     }
  282.  
  283.     @Override
  284.     public ArchiveHandler cloneHandler() {
  285.         return new ArchiveHandlerImpl(key.clone(), archiveType, compressorType, forceRefetchArchive);
  286.     }
  287.  
  288.     @Override
  289.     public void removeFrom(ObjectContainer container) {
  290.         if(key == null) {
  291.             Logger.error(this, "removeFrom() : key = null for "+this+" I exist = "+container.ext().isStored(this)+" I am active: "+container.ext().isActive(this), new Exception("error"));
  292.         } else
  293.             key.removeFrom(container);
  294.         container.delete(this);
  295.     }
  296.  
  297. }
Add Comment
Please, Sign In to add comment