Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- /* SimpleApp.java */
- import org.apache.spark.api.java.*;
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.function.Function;
- import org.apache.spark.api.java.function.VoidFunction;
- import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
- import java.util.*;
- import com.google.common.collect.ImmutableMap;
- import com.google.common.collect.ForwardingMap;
- import com.google.common.collect.ImmutableList;
- public class SimpleApp {
- public static int nblines;
- public static JavaSparkContext sc;
- public static List<Map<String, ?>> alldocs;
- public static void main(String[] args) {
- String logFile = "testCase/logements_sociaux_finances_a_paris.csv"; // Should be some file on your system
- SparkConf conf = new SparkConf();
- conf.setAppName("Simple Application");
- conf.set("es.nodes", "localhost");
- conf.set("es.nodes", "127.0.0.1");
- conf.set("es.index.auto.create", "true");
- conf.set("es.port", "9200");
- conf.set("es.resource", "index/logements");
- sc = new JavaSparkContext(conf);
- JavaRDD<String> logData = sc.textFile(logFile).cache();
- alldocs = new ArrayList<Map<String,?>>();
- logData.foreach(new VoidFunction<String>(){ public void call(String line) {
- if (line != null) {
- String[] fields = line.split(";");
- try{
- if (fields[15] != null){
- //field 0 = type logement
- //field 1 = annee agrément
- //field 2 = bailleur
- //field 3 = nombre_total_logement financés
- //field 4 = dont nombre de logement plus
- //field 5 = dont nombre de logement plus cd
- //field 6 = dont nombre de logement pla i
- //filed 7 = dont nombre de logement pls
- //field 8 = mode de realisation
- //field 9 = commentaires
- //field 10 = code postal
- //field 11 = rue
- //field 15 = XY (geo)
- String type = fields[0];
- int annee_agr = Integer.parseInt(fields[1]);
- Map<String, ?> logements = ImmutableMap.of("type", type, "annee_agrement", annee_agr);
- SimpleApp.alldocs.add(logements);
- }
- } catch ( Exception e) {
- }
- }
- }});
- /*
- for (Map<String, ?> i: SimpleApp.alldocs){
- for (Map.Entry<String, ?> entry : i.entrySet()){
- System.out.println(entry.getKey() + "/" + entry.getValue());
- }
- }
- */
- JavaRDD<Map<String, ?>> javaRDD = sc.parallelize(alldocs);
- System.out.println(javaRDD.count());
- System.out.println("END RDD");
- //javaRDD.saveAsTextFile("spark-output");
- JavaEsSpark.saveToEs(javaRDD, "index/logements");
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement