Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package org.css.java.gnipStreaming;
- import java.io.BufferedReader;
- import java.io.IOException;
- import java.io.InputStream;
- import java.io.InputStreamReader;
- import java.net.HttpURLConnection;
- import java.net.URL;
- import java.net.URLConnection;
- import java.util.zip.GZIPInputStream;
- import org.apache.spark.storage.StorageLevel;
- import org.apache.spark.streaming.receiver.Receiver;
- import Decoder.BASE64Encoder;
- public class GnipReceiver extends Receiver<String> {
- private static final long serialVersionUID = 1L;
- private String url, username, password;
- private URLConnection gnipConnection;
- public GnipReceiver(String url, String username, String password) {
- super(StorageLevel.MEMORY_AND_DISK_2());
- this.username = username;
- this.password = password;
- this.url = url;
- }
- @Override
- public void onStart() {
- try {
- HttpURLConnection newGnipConnection = getConnection(url, username,
- password);
- InputStream inputStream = newGnipConnection.getInputStream();
- int responseCode = newGnipConnection.getResponseCode();
- if (responseCode >= 200 && responseCode <= 299) {
- setGnipConnection(newGnipConnection);
- BufferedReader reader = new BufferedReader(
- new InputStreamReader(new GZIPInputStream(inputStream),
- "UTF-8"));
- storeData(reader);
- }
- } catch (Exception e) {
- e.printStackTrace();
- System.exit(0);
- restart("Error starting Gnip stream", e);
- }
- }
- private void storeData(BufferedReader reader) {
- try {
- String line;
- while ((line = reader.readLine()) != null) {
- store(line);
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- @Override
- public void onStop() {
- setGnipConnection(null);
- }
- private HttpURLConnection getConnection(String urlString, String username,
- String password) {
- try {
- URL url = new URL(urlString);
- HttpURLConnection connection;
- connection = (HttpURLConnection) url.openConnection();
- connection.setReadTimeout(1000 * 60 * 60);
- connection.setConnectTimeout(1000 * 20);
- connection.setRequestProperty("Authorization",
- createAuthHeader(username, password));
- connection.setRequestProperty("Accept-Encoding", "gzip");
- return connection;
- } catch (Exception e) {
- e.printStackTrace();
- }
- return null;
- }
- private String createAuthHeader(String username, String password) {
- String authToken = username + ":" + password;
- BASE64Encoder encoder = new BASE64Encoder();
- return "Basic " + encoder.encode(authToken.getBytes());
- }
- private synchronized void setGnipConnection(
- HttpURLConnection newGnipConnection) {
- if (gnipConnection != null) {
- try {
- gnipConnection.getInputStream().close();
- gnipConnection = null;
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- gnipConnection = newGnipConnection;
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement