Advertisement
Guest User

Untitled

a guest
Nov 12th, 2013
129
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 11.99 KB | None | 0 0
  1. package org.apache.nutch.indexer;
  2.  
  3. import java.io.ByteArrayInputStream;
  4. import java.io.IOException;
  5. import java.io.ObjectInputStream;
  6. import java.nio.ByteBuffer;
  7. import java.util.ArrayList;
  8. import java.util.Arrays;
  9. import java.util.Collection;
  10. import java.util.HashSet;
  11. import java.util.List;
  12. import java.util.Map;
  13.  
  14. import org.apache.avro.util.Utf8;
  15. import org.apache.commons.lang.StringUtils;
  16. import org.apache.commons.logging.Log;
  17. import org.apache.commons.logging.LogFactory;
  18. import org.apache.gora.mapreduce.GoraMapper;
  19. import org.apache.hadoop.conf.Configuration;
  20. import org.apache.hadoop.io.Text;
  21. import org.apache.hadoop.mapreduce.Reducer;
  22. import org.apache.hadoop.util.ToolRunner;
  23. import org.apache.http.HttpException;
  24. import org.apache.http.HttpHost;
  25. import org.apache.http.HttpRequest;
  26. import org.apache.http.HttpRequestInterceptor;
  27. import org.apache.http.auth.AuthScope;
  28. import org.apache.http.auth.AuthState;
  29. import org.apache.http.auth.Credentials;
  30. import org.apache.http.auth.UsernamePasswordCredentials;
  31. import org.apache.http.client.CredentialsProvider;
  32. import org.apache.http.client.protocol.ClientContext;
  33. import org.apache.http.impl.auth.BasicScheme;
  34. import org.apache.http.impl.client.DefaultHttpClient;
  35. import org.apache.http.impl.conn.PoolingClientConnectionManager;
  36. import org.apache.http.impl.conn.SchemeRegistryFactory;
  37. import org.apache.http.protocol.ExecutionContext;
  38. import org.apache.http.protocol.HttpContext;
  39. import org.apache.nutch.indexer.solr.SolrConstants;
  40. import org.apache.nutch.metadata.Nutch;
  41. import org.apache.nutch.storage.StorageUtils;
  42. import org.apache.nutch.storage.WebPage;
  43. import org.apache.nutch.util.NutchConfiguration;
  44. import org.apache.nutch.util.NutchJob;
  45. import org.apache.nutch.util.TableUtil;
  46. import org.apache.nutch.util.ToolUtil;
  47. import org.apache.solr.client.solrj.SolrServer;
  48. import org.apache.solr.client.solrj.SolrServerException;
  49. import org.apache.solr.common.SolrInputDocument;
  50. import org.apache.solr.client.solrj.impl.BinaryRequestWriter;
  51. import org.apache.solr.client.solrj.impl.HttpSolrServer;
  52.  
  53. public class MediaSolrIndexerJob extends IndexerJob {
  54.  
  55.     private static Log LOG = LogFactory.getLog(MediaSolrIndexerJob.class);
  56.  
  57.     private static final Collection<WebPage.Field> FIELDS = new HashSet<WebPage.Field>();
  58.  
  59.     static {
  60.         FIELDS.addAll(Arrays.asList(WebPage.Field.values()));
  61.     }
  62.  
  63.     public static class SolrMediaIndexerJobMapper extends GoraMapper<String, WebPage, Text, NutchDocument> {
  64.  
  65.         private Utf8 batchId;
  66.  
  67.         @Override
  68.         public void setup(Context ctx) throws IOException {
  69.             Configuration conf = ctx.getConfiguration();
  70.             batchId = new Utf8(conf.get(Nutch.ARG_BATCH));
  71.         }
  72.    
  73.         @Override
  74.         public void map(String key, WebPage page, Context ctx) throws IOException, InterruptedException {
  75.             if (page.getBaseUrl() != null) {
  76.                 System.out.println("FETCH TIMEEE: " + page.getFetchTime());
  77.             System.out.println(page.getBaseUrl()
  78.                     + "=========================================================================");
  79.             String url = TableUtil.unreverseUrl(key);
  80.             Map<Utf8, ByteBuffer> metadata = page.getMetadata();
  81.             for (String metamanagerKey : MCP.getAllMetadataKeys()) {
  82.                 Utf8 utf8Key = new Utf8(metamanagerKey);
  83.                 if (metadata.get(utf8Key) == null) {
  84.                     continue;
  85.                 }
  86.                 ObjectInputStream ois = null;
  87.                 try {
  88.                     ois = new ObjectInputStream(new ByteArrayInputStream(metadata.get(utf8Key).array()));
  89.                     MetaReader metaReader = new CommonMetaReader();//TODO static, constructor or setup
  90.                     List<NutchDocument> documents = metaReader.prepareNutchDocuments(ois, page);
  91.                     for (NutchDocument doc : documents) {
  92.                         writeInContext(ctx, doc);
  93.                     }
  94.                 } catch (IOException exc) {
  95.                     exc.printStackTrace();
  96.                 } finally {
  97.                     try {
  98.                         ois.close();
  99.                     } catch (IOException e) {
  100.                         e.printStackTrace();
  101.                     }
  102.                 }
  103.                 System.out.println("--------------------------------------------------------------");
  104.             }
  105.             }
  106.         }
  107.  
  108.         /**
  109.          * Writes media document to context. If there is no id, then throw away
  110.          * it. Add also neccessary fields (see addFieldKeysComplementToDoc ) and
  111.          * print info.
  112.          *
  113.          * @param ctx
  114.          * @param doc
  115.          * @throws InterruptedException
  116.          * @throws IOException
  117.          */
  118.         private void writeInContext(Context ctx, NutchDocument doc) throws IOException, InterruptedException {
  119.             if (doc.getFieldValue(MCP.ID_FIELD) == null) {
  120.                 System.out.println("WARNING: ID field is null!!!!!Should never be here");
  121.                 return;
  122.             }
  123.             System.out.println("");
  124.             System.out.println("Adding to context ----------------------------------------------------------");
  125.             addFieldKeysComplementToDoc(doc);
  126.             System.out.println("TYPE: " + doc.getFieldValue(MCP.MEDIA_TYPE_FIELD) + " ID: "
  127.                     + doc.getFieldValue(MCP.ID_FIELD));
  128.             //addFieldKeysComplementToDoc(doc);
  129.            
  130.             System.out.println("**************************************** SOLRIK *********************");
  131.             System.out.println("MCP FIELD VALUE: " + doc.getFieldValue(MCP.ID_FIELD));
  132.             System.out.println("SCORE: " + doc.getScore());
  133.             System.out.println("DOC META: " + doc.getDocumentMeta());
  134.             System.out.println("**************************************** SOLRIK *********************");
  135.             ctx.write(new Text(doc.getFieldValue(MCP.ID_FIELD)), doc);
  136.         }
  137.  
  138.         /**
  139.          * This is hack for bad indexing. Bug is in indexing to solr. If there
  140.          * is a document with more fields than document that is commited after
  141.          * it, values of forerunner override values of follower. So
  142.          * unfortunately every document has to have every fields, even when
  143.          * empty.
  144.          *
  145.          * @param doc
  146.          */
  147.         private void addFieldKeysComplementToDoc(NutchDocument doc) {
  148.             for (String key : MCP.getAllSolrSchemaFields()) {
  149.                 String fieldValue = doc.getFieldValue(key);
  150.                 if (fieldValue != null)
  151.                     continue;
  152.                 if (key.equals("parse_time_unix_timestamp") && (fieldValue == null || fieldValue.equals(""))) {
  153.                     doc.add(key, "1111111111");
  154.                 }
  155.                 if (key.equals("parse_time") && (fieldValue == null || fieldValue.equals(""))) {
  156.                     doc.add(key, "2042-09-21T18:45:19.344Z");
  157.                 }              
  158.                 doc.add(key, "");
  159.             }
  160.         }
  161.     }
  162.  
  163.     public static class SolrMediaIndexerJobReducer extends Reducer<Text, NutchDocument, Text, NutchDocument> {
  164.  
  165.         private int commitSize;
  166.         private SolrServer server;
  167.         private final List<SolrInputDocument> sdocs = new ArrayList<SolrInputDocument>();
  168.  
  169.         @Override
  170.         public void setup(Context ctx) throws IOException {
  171.             Configuration conf = ctx.getConfiguration();
  172.            
  173.             PoolingClientConnectionManager cxMgr = new PoolingClientConnectionManager(
  174.                     SchemeRegistryFactory.createDefault());
  175.             cxMgr.setMaxTotal(100);
  176.             cxMgr.setDefaultMaxPerRoute(20);
  177.  
  178.             DefaultHttpClient httpclient = new DefaultHttpClient(cxMgr);
  179.             httpclient.addRequestInterceptor(new PreemptiveAuthInterceptor(), 0);
  180.             httpclient.getCredentialsProvider().setCredentials(
  181.                     AuthScope.ANY,
  182.                     new UsernamePasswordCredentials("solr",
  183.                             "heslo"));
  184.                            
  185.             this.server = new HttpSolrServer(conf.get(Nutch.ARG_SOLR),
  186.                     httpclient);
  187.                    
  188.            
  189.             //this.server = new HttpSolrServer(conf.get(Nutch.ARG_SOLR));          
  190.            
  191.             this.commitSize = conf.getInt(SolrConstants.COMMIT_SIZE, 1000);
  192.         }
  193.  
  194.         @Override
  195.         public void reduce(Text key, Iterable<NutchDocument> values, Context ctx) throws IOException,
  196.                 InterruptedException {
  197.            
  198.             for (NutchDocument doc : values) {
  199.                 SolrInputDocument sdoc = new SolrInputDocument();
  200.                 for (String fieldname : doc.getFieldNames()) {
  201.                     sdoc.addField(fieldname, doc.getFieldValue(fieldname));
  202.                 }
  203.                 sdocs.add(sdoc);
  204.                 if (sdocs.size() >= 50) {
  205.                     System.out.println("****************************************************");
  206.                     System.out.println("****************************************************");
  207.                     System.out.println("****************************************************");
  208.                     System.out.println("***************M  A  X     S  I  Z  E***************");
  209.                     System.out.println("****************************************************");
  210.                     System.out.println("****************************************************");
  211.                     System.out.println("****************************************************");            
  212.                     try {
  213.                         server.add(sdocs);
  214.                         server.commit();
  215.                     } catch (SolrServerException e) {
  216.                         throw new IOException(e);
  217.                     }
  218.                     sdocs.clear(); 
  219.                     sdoc = null;
  220.                     doc = null;
  221.                 }
  222.             }
  223.         }
  224.  
  225.         @Override
  226.         public void cleanup(Context ctx) throws IOException {
  227.             try {
  228.                 if (sdocs.size() > 0) {
  229.                     try {
  230.                         server.add(sdocs);
  231.                     } catch (SolrServerException e) {
  232.                         throw new IOException(e);
  233.                     }
  234.                     sdocs.clear();
  235.                 }
  236.                 server.commit();
  237.             } catch (SolrServerException e) {
  238.                 throw new IOException(e);
  239.             }
  240.         }
  241.        
  242.         private class PreemptiveAuthInterceptor implements HttpRequestInterceptor {
  243.  
  244.             @Override
  245.             public void process(final HttpRequest request, final HttpContext context)
  246.                     throws HttpException, IOException {
  247.                 AuthState authState = (AuthState) context
  248.                         .getAttribute(ClientContext.TARGET_AUTH_STATE);
  249.  
  250.                 // If no auth scheme avaialble yet, try to initialize it
  251.                 // preemptively
  252.                 if (authState.getAuthScheme() == null) {
  253.                     CredentialsProvider credsProvider = (CredentialsProvider) context
  254.                             .getAttribute(ClientContext.CREDS_PROVIDER);
  255.                     HttpHost targetHost = (HttpHost) context
  256.                             .getAttribute(ExecutionContext.HTTP_TARGET_HOST);
  257.                     Credentials creds = credsProvider.getCredentials(new AuthScope(
  258.                             targetHost.getHostName(), targetHost.getPort()));
  259.                     if (creds == null)
  260.                         throw new HttpException(
  261.                                 "No credentials for preemptive authentication");
  262.                     authState.setAuthScheme(new BasicScheme());
  263.                     authState.setCredentials(creds);
  264.                 }
  265.  
  266.             }
  267.  
  268.         }      
  269.  
  270.     }
  271.  
  272.     @Override
  273.     public int run(String[] args) throws Exception {
  274.         if (args.length < 4) {
  275.             System.err.println("Usage: SolrImageIndexerJob <solr url> " + "(<batch_id> | -all)" + "(-crawlId <crawl_id>)");
  276.             return -1;
  277.         }
  278.         LOG.info("SolrImageIndexerJob: starting");
  279.         run(ToolUtil.toArgMap(Nutch.ARG_SOLR, args[0], Nutch.ARG_BATCH, args[1], Nutch.ARG_CRAWL, args[3]));
  280.         LOG.info("SolrImageIndexerJob: success");
  281.         return 0;
  282.     }
  283.  
  284.     @Override
  285.     public Map<String, Object> run(Map<String, Object> args) throws Exception {
  286.         String solrUrl = (String) args.get(Nutch.ARG_SOLR);
  287.         if (StringUtils.isNotEmpty(solrUrl)) {
  288.             getConf().set(Nutch.ARG_SOLR, solrUrl);
  289.         }
  290.         String batchId = (String) args.get(Nutch.ARG_BATCH);
  291.         if (StringUtils.isNotEmpty(batchId)) {
  292.             getConf().set(Nutch.ARG_BATCH, batchId);
  293.         }
  294.         String crawlId = (String) args.get(Nutch.ARG_CRAWL);
  295.         if (StringUtils.isNotEmpty(crawlId)) {
  296.             getConf().set(Nutch.ARG_CRAWL, crawlId);
  297.         }      
  298.         LOG.info("SolrUrl: " + solrUrl);
  299.         LOG.info("batchId: " + batchId);
  300.         LOG.info("crawlId: " + crawlId);
  301.         currentJob = new NutchJob(getConf(), "solr-image-index");
  302.         LOG.info("job start");
  303.         StorageUtils
  304.                 .initMapperJob(currentJob, FIELDS, Text.class, NutchDocument.class, SolrMediaIndexerJobMapper.class);
  305.         LOG.info("job initiated");
  306.         currentJob.setMapOutputKeyClass(Text.class);
  307.         currentJob.setMapOutputValueClass(NutchDocument.class);
  308.         currentJob.setReducerClass(SolrMediaIndexerJobReducer.class);
  309.         currentJob.setNumReduceTasks(5);
  310.         currentJob.waitForCompletion(true);
  311.         ToolUtil.recordJobStatus(null, currentJob, results);
  312.         return results;
  313.     }
  314.  
  315.     public static void main(String[] args) throws Exception {
  316.         final int res = ToolRunner.run(NutchConfiguration.create(), new MediaSolrIndexerJob(), args);
  317.         System.exit(res);
  318.     }
  319.  
  320. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement