Advertisement
Guest User

Untitled

a guest
Dec 12th, 2019
162
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 4.19 KB | None | 0 0
  1. package ru.mail.go.hydra.fastbase.init;
  2.  
  3. import com.twitter.elephantbird.mapreduce.io.ProtobufWritable;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.kafka.common.serialization.StringSerializer;
  6. import org.apache.spark.SparkConf;
  7. import org.apache.spark.api.java.JavaPairRDD;
  8. import org.apache.spark.api.java.JavaSparkContext;
  9. import org.slf4j.Logger;
  10. import org.slf4j.LoggerFactory;
  11. import ru.mail.go.corefan.kafka.exporter.KafkaExporter;
  12. import ru.mail.go.corefan.kafka.exporter.impl.KafkaExporterImpl;
  13. import ru.mail.go.corefan.spark.config.SparkConfBuilder;
  14. import ru.mail.go.corefan.spark.counters.SparkCounters;
  15. import ru.mail.go.corefan.spark.streaming.context.KafkaConfig;
  16. import ru.mail.go.webbase.blobs.ExtractedUrlsBlob;
  17. import ru.mail.go.webbase.blobs.FetchList;
  18. import ru.mail.go.webbase.utils.MRUtils;
  19. import ru.mail.go.webbase.utils.nut.IUrl;
  20. import ru.mail.go.webbase.utils.nut.IUrlFactoryProvider;
  21. import ru.mail.go.webbase.utils.nut.NUTException;
  22.  
  23. import java.util.Iterator;
  24. import java.util.Map;
  25. import java.util.Properties;
  26. import java.util.concurrent.atomic.AtomicReference;
  27.  
  28. public class IfbSputnikCopy {
  29.     private static final Logger LOG = LoggerFactory.getLogger(IfbSputnikCopy.class);
  30.  
  31.     private static final MRUtils.LongProperty kafkaRateProperty = new MRUtils.LongProperty("spark.streaming.kafka.maxRatePerPartition", 20000);
  32.     private static final MRUtils.StringProperty inputFileProperty = new MRUtils.StringProperty("input.file", "");
  33.     private static final String outputTopicName = "ifb-sputnik-urls";
  34.  
  35.     public static void main(String[] args) {
  36.         int status = new IfbSputnikCopy().run(args);
  37.         System.exit(status);
  38.     }
  39.  
  40.     private int run(String[] args) {
  41.         SparkConf sparkConf = initSparkConfiguration(args);
  42.         String inputFile = sparkConf.get(inputFileProperty.name, inputFileProperty.defaultValue);
  43.  
  44.         Map<String, String> kafkaParams = new KafkaConfig(sparkConf).getKafkaParams();
  45.         Properties properties = new Properties();
  46.         properties.putAll(kafkaParams);
  47.  
  48.         AtomicReference<SparkCounters> counters = new AtomicReference<>();
  49.         JavaSparkContext sc = new JavaSparkContext(sparkConf);
  50.         JavaPairRDD<Text, ProtobufWritable> pairRDD = sc.sequenceFile(inputFile, Text.class, ProtobufWritable.class);
  51.  
  52.         pairRDD
  53.                 .filter(tuple -> {
  54.                     ProtobufWritable<ExtractedUrlsBlob.ExtractedUrl> proto = tuple._2();
  55.                     proto.setConverter(ExtractedUrlsBlob.ExtractedUrl.class);
  56.                     return proto.get().getTargetBasesList().contains(FetchList.FetchUrl.Fastbases.INFO_BASE);
  57.                 })
  58.                 .keys()
  59.                 .foreachPartition(records -> exportRecords(records, properties));
  60.  
  61.         return 0;
  62.     }
  63.  
  64.     private SparkConf initSparkConfiguration(String[] args) {
  65.         SparkConfBuilder builder = SparkConfBuilder.create().loadCommandLine(args);
  66.         SparkConf sparkConf = builder.build();
  67.         sparkConf.setAppName("[FB] ifb-urls converter");
  68.  
  69.         sparkConf.set(kafkaRateProperty.name, Long.toString(kafkaRateProperty.defaultValue));
  70.         LOG.info(sparkConf.toDebugString());
  71.         return sparkConf;
  72.     }
  73.  
  74.     private void exportRecords(Iterator<Text> records, Properties properties) {
  75.         KafkaExporter<String> exporter = createKafkaExporter(properties);
  76.         ThreadLocal<IUrl> nut = IUrlFactoryProvider.newThreadLocalUrl();
  77.  
  78.         records.forEachRemaining(record -> exportRecord(record.toString(), exporter, nut));
  79.     }
  80.  
  81.     private void exportRecord(String record, KafkaExporter<String> exporter, ThreadLocal<IUrl> nut) {
  82.         try {
  83.             nut.get().assign(record, IUrl.EscapingLevel.PARTIALLY_ESCAPED, true, true);
  84.             nut.get().normalize(true);
  85.             exporter.send(nut.toString());
  86.         } catch (NUTException e) {
  87.             LOG.error("Bad url: " + record);
  88.         } catch (Exception e) {
  89.             LOG.error("Exporter: " + e.getMessage());
  90.         }
  91.     }
  92.  
  93.     private KafkaExporter<String> createKafkaExporter(Properties kafkaParams) {
  94.         return KafkaExporterImpl.create(outputTopicName, StringSerializer.class, kafkaParams);
  95.     }
  96. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement