Advertisement
Guest User

Custom Receiver

a guest
Apr 24th, 2016
189
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 2.83 KB | None | 0 0
  1. package org.css.java.gnipStreaming;
  2.  
  3. import java.io.BufferedReader;
  4. import java.io.IOException;
  5. import java.io.InputStream;
  6. import java.io.InputStreamReader;
  7. import java.net.HttpURLConnection;
  8. import java.net.URL;
  9. import java.net.URLConnection;
  10. import java.util.zip.GZIPInputStream;
  11.  
  12. import org.apache.spark.storage.StorageLevel;
  13. import org.apache.spark.streaming.receiver.Receiver;
  14.  
  15. import Decoder.BASE64Encoder;
  16.  
  17. public class GnipReceiver extends Receiver<String> {
  18.  
  19.     private static final long serialVersionUID = 1L;
  20.    
  21.     private String url, username, password;
  22.    
  23.     private URLConnection gnipConnection;
  24.  
  25.     public GnipReceiver(String url, String username, String password) {
  26.         super(StorageLevel.MEMORY_AND_DISK_2());
  27.         this.username = username;
  28.         this.password = password;
  29.         this.url = url;
  30.     }
  31.  
  32.    
  33.     @Override
  34.     public void onStart() {
  35.         try {
  36.             HttpURLConnection newGnipConnection = getConnection(url, username,
  37.                     password);
  38.             InputStream inputStream = newGnipConnection.getInputStream();
  39.             int responseCode = newGnipConnection.getResponseCode();
  40.             if (responseCode >= 200 && responseCode <= 299) {
  41.                 setGnipConnection(newGnipConnection);
  42.                 BufferedReader reader = new BufferedReader(
  43.                         new InputStreamReader(new GZIPInputStream(inputStream),
  44.                                 "UTF-8"));
  45.                 storeData(reader);
  46.             }
  47.         } catch (Exception e) {
  48.             e.printStackTrace();
  49.             System.exit(0);
  50.             restart("Error starting Gnip stream", e);
  51.         }
  52.     }
  53.  
  54.     private void storeData(BufferedReader reader) {
  55.         try {
  56.             String line;
  57.             while ((line = reader.readLine()) != null) {
  58.                 store(line);
  59.             }
  60.         } catch (IOException e) {
  61.             e.printStackTrace();
  62.         }
  63.     }
  64.  
  65.     @Override
  66.     public void onStop() {
  67.         setGnipConnection(null);
  68.     }
  69.  
  70.     private HttpURLConnection getConnection(String urlString, String username,
  71.             String password) {
  72.         try {
  73.             URL url = new URL(urlString);
  74.             HttpURLConnection connection;
  75.             connection = (HttpURLConnection) url.openConnection();
  76.             connection.setReadTimeout(1000 * 60 * 60);
  77.             connection.setConnectTimeout(1000 * 20);
  78.             connection.setRequestProperty("Authorization",
  79.                     createAuthHeader(username, password));
  80.             connection.setRequestProperty("Accept-Encoding", "gzip");
  81.             return connection;
  82.         } catch (Exception e) {
  83.             e.printStackTrace();
  84.         }
  85.         return null;
  86.     }
  87.  
  88.     private String createAuthHeader(String username, String password) {
  89.         String authToken = username + ":" + password;
  90.         BASE64Encoder encoder = new BASE64Encoder();
  91.         return "Basic " + encoder.encode(authToken.getBytes());
  92.     }
  93.  
  94.     private synchronized void setGnipConnection(
  95.             HttpURLConnection newGnipConnection) {
  96.         if (gnipConnection != null) {
  97.             try {
  98.                 gnipConnection.getInputStream().close();
  99.                 gnipConnection = null;
  100.             } catch (IOException e) {
  101.                 e.printStackTrace();
  102.             }
  103.         }
  104.         gnipConnection = newGnipConnection;
  105.     }
  106. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement