Advertisement
Guest User

Untitled

a guest
Jun 30th, 2016
411
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 3.07 KB | None | 0 0
  1. /* SimpleApp.java */
  2. import org.apache.spark.api.java.*;
  3. import org.apache.spark.SparkConf;
  4. import org.apache.spark.api.java.function.Function;
  5. import org.apache.spark.api.java.function.VoidFunction;
  6. import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
  7. import java.util.*;
  8. import com.google.common.collect.ImmutableMap;
  9. import com.google.common.collect.ForwardingMap;
  10. import com.google.common.collect.ImmutableList;
  11.  
  12.  
  13. public class SimpleApp {
  14.  
  15.   public static int nblines;
  16.   public static JavaSparkContext sc;
  17.   public static List<Map<String, ?>> alldocs;
  18.  
  19.   public static void main(String[] args) {
  20.     String logFile = "testCase/logements_sociaux_finances_a_paris.csv"; // Should be some file on your system
  21.     SparkConf conf = new SparkConf();
  22.     conf.setAppName("Simple Application");
  23.     conf.set("es.nodes", "localhost");
  24.     conf.set("es.nodes", "127.0.0.1");
  25.     conf.set("es.index.auto.create", "true");
  26.     conf.set("es.port", "9200");
  27.     conf.set("es.resource", "index/logements");
  28.     sc = new JavaSparkContext(conf);
  29.     JavaRDD<String> logData = sc.textFile(logFile).cache();
  30.     alldocs = new ArrayList<Map<String,?>>();
  31.  
  32.     logData.foreach(new VoidFunction<String>(){ public void call(String line) {
  33.           if (line != null) {
  34.                 String[] fields = line.split(";");
  35.                 try{
  36.                         if (fields[15] != null){
  37.                                 //field 0 = type logement
  38.                                 //field 1 = annee agrément
  39.                                 //field 2 = bailleur
  40.                                 //field 3 = nombre_total_logement financés
  41.                                 //field 4 = dont nombre de logement plus
  42.                                 //field 5 = dont nombre de logement plus cd
  43.                                 //field 6 = dont nombre de logement pla i
  44.                                 //filed 7 = dont nombre de logement pls
  45.                                 //field 8 = mode de realisation
  46.                                 //field 9 = commentaires
  47.                                 //field 10 = code postal
  48.                                 //field 11 = rue
  49.                                 //field 15 = XY (geo)
  50.                                 String type = fields[0];
  51.                                 int annee_agr = Integer.parseInt(fields[1]);
  52.                                 Map<String, ?> logements = ImmutableMap.of("type", type, "annee_agrement", annee_agr);
  53.                                 SimpleApp.alldocs.add(logements);
  54.                         }
  55.                 } catch ( Exception e) {
  56.  
  57.                 }
  58.           }
  59.         }});
  60.     /*
  61.     for (Map<String, ?> i: SimpleApp.alldocs){
  62.         for (Map.Entry<String, ?> entry : i.entrySet()){
  63.                 System.out.println(entry.getKey() + "/" + entry.getValue());
  64.         }
  65.     }
  66.     */
  67.     JavaRDD<Map<String, ?>> javaRDD = sc.parallelize(alldocs);
  68.  
  69.     System.out.println(javaRDD.count());
  70.     System.out.println("END RDD");
  71.     //javaRDD.saveAsTextFile("spark-output");
  72.     JavaEsSpark.saveToEs(javaRDD, "index/logements");
  73.  
  74.         }
  75. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement