Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package ru.mail.go.hydra.fastbase.init;
- import com.twitter.elephantbird.mapreduce.io.ProtobufWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.kafka.common.serialization.StringSerializer;
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaPairRDD;
- import org.apache.spark.api.java.JavaSparkContext;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import ru.mail.go.corefan.kafka.exporter.KafkaExporter;
- import ru.mail.go.corefan.kafka.exporter.impl.KafkaExporterImpl;
- import ru.mail.go.corefan.spark.config.SparkConfBuilder;
- import ru.mail.go.corefan.spark.counters.SparkCounters;
- import ru.mail.go.corefan.spark.streaming.context.KafkaConfig;
- import ru.mail.go.webbase.blobs.ExtractedUrlsBlob;
- import ru.mail.go.webbase.blobs.FetchList;
- import ru.mail.go.webbase.utils.MRUtils;
- import ru.mail.go.webbase.utils.nut.IUrl;
- import ru.mail.go.webbase.utils.nut.IUrlFactoryProvider;
- import ru.mail.go.webbase.utils.nut.NUTException;
- import java.util.Iterator;
- import java.util.Map;
- import java.util.Properties;
- import java.util.concurrent.atomic.AtomicReference;
- public class IfbSputnikCopy {
- private static final Logger LOG = LoggerFactory.getLogger(IfbSputnikCopy.class);
- private static final MRUtils.LongProperty kafkaRateProperty = new MRUtils.LongProperty("spark.streaming.kafka.maxRatePerPartition", 20000);
- private static final MRUtils.StringProperty inputFileProperty = new MRUtils.StringProperty("input.file", "");
- private static final String outputTopicName = "ifb-sputnik-urls";
- public static void main(String[] args) {
- int status = new IfbSputnikCopy().run(args);
- System.exit(status);
- }
- private int run(String[] args) {
- SparkConf sparkConf = initSparkConfiguration(args);
- String inputFile = sparkConf.get(inputFileProperty.name, inputFileProperty.defaultValue);
- Map<String, String> kafkaParams = new KafkaConfig(sparkConf).getKafkaParams();
- Properties properties = new Properties();
- properties.putAll(kafkaParams);
- AtomicReference<SparkCounters> counters = new AtomicReference<>();
- JavaSparkContext sc = new JavaSparkContext(sparkConf);
- JavaPairRDD<Text, ProtobufWritable> pairRDD = sc.sequenceFile(inputFile, Text.class, ProtobufWritable.class);
- pairRDD
- .filter(tuple -> {
- ProtobufWritable<ExtractedUrlsBlob.ExtractedUrl> proto = tuple._2();
- proto.setConverter(ExtractedUrlsBlob.ExtractedUrl.class);
- return proto.get().getTargetBasesList().contains(FetchList.FetchUrl.Fastbases.INFO_BASE);
- })
- .keys()
- .foreachPartition(records -> exportRecords(records, properties));
- return 0;
- }
- private SparkConf initSparkConfiguration(String[] args) {
- SparkConfBuilder builder = SparkConfBuilder.create().loadCommandLine(args);
- SparkConf sparkConf = builder.build();
- sparkConf.setAppName("[FB] ifb-urls converter");
- sparkConf.set(kafkaRateProperty.name, Long.toString(kafkaRateProperty.defaultValue));
- LOG.info(sparkConf.toDebugString());
- return sparkConf;
- }
- private void exportRecords(Iterator<Text> records, Properties properties) {
- KafkaExporter<String> exporter = createKafkaExporter(properties);
- ThreadLocal<IUrl> nut = IUrlFactoryProvider.newThreadLocalUrl();
- records.forEachRemaining(record -> exportRecord(record.toString(), exporter, nut));
- }
- private void exportRecord(String record, KafkaExporter<String> exporter, ThreadLocal<IUrl> nut) {
- try {
- nut.get().assign(record, IUrl.EscapingLevel.PARTIALLY_ESCAPED, true, true);
- nut.get().normalize(true);
- exporter.send(nut.toString());
- } catch (NUTException e) {
- LOG.error("Bad url: " + record);
- } catch (Exception e) {
- LOG.error("Exporter: " + e.getMessage());
- }
- }
- private KafkaExporter<String> createKafkaExporter(Properties kafkaParams) {
- return KafkaExporterImpl.create(outputTopicName, StringSerializer.class, kafkaParams);
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement