Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package org.apache.nutch.indexer;
- import java.io.ByteArrayInputStream;
- import java.io.IOException;
- import java.io.ObjectInputStream;
- import java.nio.ByteBuffer;
- import java.util.ArrayList;
- import java.util.Arrays;
- import java.util.Collection;
- import java.util.HashSet;
- import java.util.List;
- import java.util.Map;
- import org.apache.avro.util.Utf8;
- import org.apache.commons.lang.StringUtils;
- import org.apache.commons.logging.Log;
- import org.apache.commons.logging.LogFactory;
- import org.apache.gora.mapreduce.GoraMapper;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.util.ToolRunner;
- import org.apache.http.HttpException;
- import org.apache.http.HttpHost;
- import org.apache.http.HttpRequest;
- import org.apache.http.HttpRequestInterceptor;
- import org.apache.http.auth.AuthScope;
- import org.apache.http.auth.AuthState;
- import org.apache.http.auth.Credentials;
- import org.apache.http.auth.UsernamePasswordCredentials;
- import org.apache.http.client.CredentialsProvider;
- import org.apache.http.client.protocol.ClientContext;
- import org.apache.http.impl.auth.BasicScheme;
- import org.apache.http.impl.client.DefaultHttpClient;
- import org.apache.http.impl.conn.PoolingClientConnectionManager;
- import org.apache.http.impl.conn.SchemeRegistryFactory;
- import org.apache.http.protocol.ExecutionContext;
- import org.apache.http.protocol.HttpContext;
- import org.apache.nutch.indexer.solr.SolrConstants;
- import org.apache.nutch.metadata.Nutch;
- import org.apache.nutch.storage.StorageUtils;
- import org.apache.nutch.storage.WebPage;
- import org.apache.nutch.util.NutchConfiguration;
- import org.apache.nutch.util.NutchJob;
- import org.apache.nutch.util.TableUtil;
- import org.apache.nutch.util.ToolUtil;
- import org.apache.solr.client.solrj.SolrServer;
- import org.apache.solr.client.solrj.SolrServerException;
- import org.apache.solr.common.SolrInputDocument;
- import org.apache.solr.client.solrj.impl.BinaryRequestWriter;
- import org.apache.solr.client.solrj.impl.HttpSolrServer;
- public class MediaSolrIndexerJob extends IndexerJob {
- private static Log LOG = LogFactory.getLog(MediaSolrIndexerJob.class);
- private static final Collection<WebPage.Field> FIELDS = new HashSet<WebPage.Field>();
- static {
- FIELDS.addAll(Arrays.asList(WebPage.Field.values()));
- }
- public static class SolrMediaIndexerJobMapper extends GoraMapper<String, WebPage, Text, NutchDocument> {
- private Utf8 batchId;
- @Override
- public void setup(Context ctx) throws IOException {
- Configuration conf = ctx.getConfiguration();
- batchId = new Utf8(conf.get(Nutch.ARG_BATCH));
- }
- @Override
- public void map(String key, WebPage page, Context ctx) throws IOException, InterruptedException {
- if (page.getBaseUrl() != null) {
- System.out.println("FETCH TIMEEE: " + page.getFetchTime());
- System.out.println(page.getBaseUrl()
- + "=========================================================================");
- String url = TableUtil.unreverseUrl(key);
- Map<Utf8, ByteBuffer> metadata = page.getMetadata();
- for (String metamanagerKey : MCP.getAllMetadataKeys()) {
- Utf8 utf8Key = new Utf8(metamanagerKey);
- if (metadata.get(utf8Key) == null) {
- continue;
- }
- ObjectInputStream ois = null;
- try {
- ois = new ObjectInputStream(new ByteArrayInputStream(metadata.get(utf8Key).array()));
- MetaReader metaReader = new CommonMetaReader();//TODO static, constructor or setup
- List<NutchDocument> documents = metaReader.prepareNutchDocuments(ois, page);
- for (NutchDocument doc : documents) {
- writeInContext(ctx, doc);
- }
- } catch (IOException exc) {
- exc.printStackTrace();
- } finally {
- try {
- ois.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- System.out.println("--------------------------------------------------------------");
- }
- }
- }
- /**
- * Writes media document to context. If there is no id, then throw away
- * it. Add also neccessary fields (see addFieldKeysComplementToDoc ) and
- * print info.
- *
- * @param ctx
- * @param doc
- * @throws InterruptedException
- * @throws IOException
- */
- private void writeInContext(Context ctx, NutchDocument doc) throws IOException, InterruptedException {
- if (doc.getFieldValue(MCP.ID_FIELD) == null) {
- System.out.println("WARNING: ID field is null!!!!!Should never be here");
- return;
- }
- System.out.println("");
- System.out.println("Adding to context ----------------------------------------------------------");
- addFieldKeysComplementToDoc(doc);
- System.out.println("TYPE: " + doc.getFieldValue(MCP.MEDIA_TYPE_FIELD) + " ID: "
- + doc.getFieldValue(MCP.ID_FIELD));
- //addFieldKeysComplementToDoc(doc);
- System.out.println("**************************************** SOLRIK *********************");
- System.out.println("MCP FIELD VALUE: " + doc.getFieldValue(MCP.ID_FIELD));
- System.out.println("SCORE: " + doc.getScore());
- System.out.println("DOC META: " + doc.getDocumentMeta());
- System.out.println("**************************************** SOLRIK *********************");
- ctx.write(new Text(doc.getFieldValue(MCP.ID_FIELD)), doc);
- }
- /**
- * This is hack for bad indexing. Bug is in indexing to solr. If there
- * is a document with more fields than document that is commited after
- * it, values of forerunner override values of follower. So
- * unfortunately every document has to have every fields, even when
- * empty.
- *
- * @param doc
- */
- private void addFieldKeysComplementToDoc(NutchDocument doc) {
- for (String key : MCP.getAllSolrSchemaFields()) {
- String fieldValue = doc.getFieldValue(key);
- if (fieldValue != null)
- continue;
- if (key.equals("parse_time_unix_timestamp") && (fieldValue == null || fieldValue.equals(""))) {
- doc.add(key, "1111111111");
- }
- if (key.equals("parse_time") && (fieldValue == null || fieldValue.equals(""))) {
- doc.add(key, "2042-09-21T18:45:19.344Z");
- }
- doc.add(key, "");
- }
- }
- }
- public static class SolrMediaIndexerJobReducer extends Reducer<Text, NutchDocument, Text, NutchDocument> {
- private int commitSize;
- private SolrServer server;
- private final List<SolrInputDocument> sdocs = new ArrayList<SolrInputDocument>();
- @Override
- public void setup(Context ctx) throws IOException {
- Configuration conf = ctx.getConfiguration();
- PoolingClientConnectionManager cxMgr = new PoolingClientConnectionManager(
- SchemeRegistryFactory.createDefault());
- cxMgr.setMaxTotal(100);
- cxMgr.setDefaultMaxPerRoute(20);
- DefaultHttpClient httpclient = new DefaultHttpClient(cxMgr);
- httpclient.addRequestInterceptor(new PreemptiveAuthInterceptor(), 0);
- httpclient.getCredentialsProvider().setCredentials(
- AuthScope.ANY,
- new UsernamePasswordCredentials("solr",
- "heslo"));
- this.server = new HttpSolrServer(conf.get(Nutch.ARG_SOLR),
- httpclient);
- //this.server = new HttpSolrServer(conf.get(Nutch.ARG_SOLR));
- this.commitSize = conf.getInt(SolrConstants.COMMIT_SIZE, 1000);
- }
- @Override
- public void reduce(Text key, Iterable<NutchDocument> values, Context ctx) throws IOException,
- InterruptedException {
- for (NutchDocument doc : values) {
- SolrInputDocument sdoc = new SolrInputDocument();
- for (String fieldname : doc.getFieldNames()) {
- sdoc.addField(fieldname, doc.getFieldValue(fieldname));
- }
- sdocs.add(sdoc);
- if (sdocs.size() >= 50) {
- System.out.println("****************************************************");
- System.out.println("****************************************************");
- System.out.println("****************************************************");
- System.out.println("***************M A X S I Z E***************");
- System.out.println("****************************************************");
- System.out.println("****************************************************");
- System.out.println("****************************************************");
- try {
- server.add(sdocs);
- server.commit();
- } catch (SolrServerException e) {
- throw new IOException(e);
- }
- sdocs.clear();
- sdoc = null;
- doc = null;
- }
- }
- }
- @Override
- public void cleanup(Context ctx) throws IOException {
- try {
- if (sdocs.size() > 0) {
- try {
- server.add(sdocs);
- } catch (SolrServerException e) {
- throw new IOException(e);
- }
- sdocs.clear();
- }
- server.commit();
- } catch (SolrServerException e) {
- throw new IOException(e);
- }
- }
- private class PreemptiveAuthInterceptor implements HttpRequestInterceptor {
- @Override
- public void process(final HttpRequest request, final HttpContext context)
- throws HttpException, IOException {
- AuthState authState = (AuthState) context
- .getAttribute(ClientContext.TARGET_AUTH_STATE);
- // If no auth scheme avaialble yet, try to initialize it
- // preemptively
- if (authState.getAuthScheme() == null) {
- CredentialsProvider credsProvider = (CredentialsProvider) context
- .getAttribute(ClientContext.CREDS_PROVIDER);
- HttpHost targetHost = (HttpHost) context
- .getAttribute(ExecutionContext.HTTP_TARGET_HOST);
- Credentials creds = credsProvider.getCredentials(new AuthScope(
- targetHost.getHostName(), targetHost.getPort()));
- if (creds == null)
- throw new HttpException(
- "No credentials for preemptive authentication");
- authState.setAuthScheme(new BasicScheme());
- authState.setCredentials(creds);
- }
- }
- }
- }
- @Override
- public int run(String[] args) throws Exception {
- if (args.length < 4) {
- System.err.println("Usage: SolrImageIndexerJob <solr url> " + "(<batch_id> | -all)" + "(-crawlId <crawl_id>)");
- return -1;
- }
- LOG.info("SolrImageIndexerJob: starting");
- run(ToolUtil.toArgMap(Nutch.ARG_SOLR, args[0], Nutch.ARG_BATCH, args[1], Nutch.ARG_CRAWL, args[3]));
- LOG.info("SolrImageIndexerJob: success");
- return 0;
- }
- @Override
- public Map<String, Object> run(Map<String, Object> args) throws Exception {
- String solrUrl = (String) args.get(Nutch.ARG_SOLR);
- if (StringUtils.isNotEmpty(solrUrl)) {
- getConf().set(Nutch.ARG_SOLR, solrUrl);
- }
- String batchId = (String) args.get(Nutch.ARG_BATCH);
- if (StringUtils.isNotEmpty(batchId)) {
- getConf().set(Nutch.ARG_BATCH, batchId);
- }
- String crawlId = (String) args.get(Nutch.ARG_CRAWL);
- if (StringUtils.isNotEmpty(crawlId)) {
- getConf().set(Nutch.ARG_CRAWL, crawlId);
- }
- LOG.info("SolrUrl: " + solrUrl);
- LOG.info("batchId: " + batchId);
- LOG.info("crawlId: " + crawlId);
- currentJob = new NutchJob(getConf(), "solr-image-index");
- LOG.info("job start");
- StorageUtils
- .initMapperJob(currentJob, FIELDS, Text.class, NutchDocument.class, SolrMediaIndexerJobMapper.class);
- LOG.info("job initiated");
- currentJob.setMapOutputKeyClass(Text.class);
- currentJob.setMapOutputValueClass(NutchDocument.class);
- currentJob.setReducerClass(SolrMediaIndexerJobReducer.class);
- currentJob.setNumReduceTasks(5);
- currentJob.waitForCompletion(true);
- ToolUtil.recordJobStatus(null, currentJob, results);
- return results;
- }
- public static void main(String[] args) throws Exception {
- final int res = ToolRunner.run(NutchConfiguration.create(), new MediaSolrIndexerJob(), args);
- System.exit(res);
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement