Advertisement
Guest User

Untitled

a guest
Sep 23rd, 2016
88
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 6.30 KB | None | 0 0
  1. /**
  2. * Copyright 2013 Twitter, Inc.
  3. * Licensed under the Apache License, Version 2.0 (the "License");
  4. * you may not use this file except in compliance with the License.
  5. * You may obtain a copy of the License at
  6. * http://www.apache.org/licenses/LICENSE-2.0
  7. * Unless required by applicable law or agreed to in writing, software
  8. * distributed under the License is distributed on an "AS IS" BASIS,
  9. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. * See the License for the specific language governing permissions and
  11. * limitations under the License.
  12. **/
  13.  
  14. //package com.twitter.hbc.example;
  15.  
  16. import com.twitter.hbc.ClientBuilder;
  17. import com.twitter.hbc.core.Constants;
  18. import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
  19. import com.twitter.hbc.core.processor.StringDelimitedProcessor;
  20. import com.twitter.hbc.httpclient.BasicClient;
  21. import com.twitter.hbc.httpclient.auth.Authentication;
  22. import com.twitter.hbc.httpclient.auth.OAuth1;
  23.  
  24. import org.json.*;
  25. import java.sql.*;
  26. import java.text.SimpleDateFormat;
  27. import java.io.File;
  28. import java.io.FileNotFoundException;
  29. import java.io.IOException;
  30. import java.util.ArrayList;
  31. import java.util.Calendar;
  32. import java.util.List;
  33. import java.util.Scanner;
  34. import java.util.concurrent.BlockingQueue;
  35. import java.util.concurrent.LinkedBlockingQueue;
  36. import org.apache.commons.io.*;
  37.  
  38. public class StreamingTweetstoMYSQL {
  39.  
  40. public static void run(String consumerKey, String consumerSecret, String token, String secret)
  41. throws InterruptedException, IOException, JSONException{
  42. // Create an appropriately sized blocking queue
  43. BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10000);
  44.  
  45. // Define our endpoint: By default, delimited=length is set (we need this for our processor)
  46. // and stall warnings are on.
  47. StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();
  48.  
  49. //Input of user ids
  50. File input = new File("input.txt");
  51. List<String> lines1 = FileUtils.readLines(input, "utf-8");
  52. List<Long> lines = new ArrayList<Long>();
  53. for(String s : lines1) lines.add(Long.valueOf(s));
  54. endpoint.followings(lines); //connect to endpoint
  55.  
  56. Authentication auth = new OAuth1(consumerKey, consumerSecret, token, secret);
  57.  
  58. // Create a new BasicClient. By default gzip is enabled.
  59. BasicClient client = new ClientBuilder()
  60. .name("sampleExampleClient")
  61. .hosts(Constants.STREAM_HOST)
  62. .endpoint(endpoint)
  63. .authentication(auth)
  64. .processor(new StringDelimitedProcessor(queue))
  65. .build();
  66.  
  67. // Establish a connection
  68. client.connect();
  69. Boolean rep; Boolean ret; Boolean pl;
  70.  
  71. try
  72. {
  73. // create a mysql database connection
  74. String myDriver = "org.gjt.mm.mysql.Driver";
  75. String myUrl = "jdbc:mysql://localhost:3306/stream?useUnicode=true&characterEncoding=utf-8";
  76. //Class.forName(myDriver);
  77. Connection conn = DriverManager.getConnection(myUrl, "root", "g5hxzy46");
  78.  
  79. // create a sql date object so we can use it in our INSERT statement
  80. //Calendar calendar = Calendar.getInstance();
  81. //java.sql.Date startDate = new java.sql.Date(calendar.getTime().getTime());
  82.  
  83. // the mysql insert statement
  84. String query = "insert into tweets (created_at, twitter_id, text, reply, retweet, place)"
  85. + " values (?, ?, ?, ?, ?, ?)";
  86.  
  87. // Do whatever needs to be done with messages
  88. while(!client.isDone())
  89. {
  90. String msg = queue.take();
  91. //create sql date
  92. //Calendar calendar = Calendar.getInstance();
  93. java.util.Date myDate = new java.util.Date();
  94. String ts = new SimpleDateFormat("yyyyMMddHHmmss").format(new java.util.Date());
  95. //System.out.println(msg);
  96. JSONObject obj = new JSONObject(msg);
  97. if(obj.isNull("delete"))
  98. {
  99. String text = obj.getString("text");
  100. String cr = obj.getString("created_at");
  101. Boolean reply = obj.isNull("in_reply_to_user_id_str"); if(reply){rep = false;}else{rep = true;}
  102. Boolean retweet = obj.isNull("retweeted_status"); if(retweet){ret = false;}else{ret = true;}
  103. Boolean place = obj.isNull("place"); if(place){pl = false;}else{pl = true;}
  104. JSONObject newJSON = obj.getJSONObject("user");
  105. newJSON = new JSONObject(newJSON.toString());
  106. String id= newJSON.getString("id_str");
  107. //If tweet from user?
  108. //if(checkSourceId(id,input)){
  109. //System.out.println(rep+" "+ret+" "+pl+" "+id+" "+text+" "+cr);
  110. // create the mysql insert preparedstatement
  111. PreparedStatement preparedStmt = conn.prepareStatement(query);
  112. preparedStmt.setString (1, id); preparedStmt.setString (2, ts); preparedStmt.setString (3, text);
  113. preparedStmt.setBoolean (4, rep); preparedStmt.setBoolean (5, ret); preparedStmt.setBoolean (6, pl);
  114. // execute the preparedstatement
  115. preparedStmt.execute();
  116.  
  117. }}
  118. //}
  119.  
  120. conn.close();
  121.  
  122. }
  123. catch (SQLException ex)
  124. {
  125. System.err.println("Got an exception!");
  126. System.err.println("SQLException: " + ex.getMessage());
  127. System.err.println("State: " + ex.getSQLState());
  128. System.err.println("Error: " + ex.getErrorCode());
  129.  
  130. }
  131. client.stop();
  132. System.out.printf("The client read %d messages!\n", client.getStatsTracker().getNumMessages());
  133. }
  134.  
  135. public static void main(String[] args) throws IOException {
  136. try {
  137. StreamingTweetstoMYSQL.run("w3jokstVp6cMzaWq7A4GIN3XL", "19RmyUbAoNdinnFN79BJhZwFceBbF0pj9hm6GcM9BPCOUjDpwV",
  138. "164420419-9xnW8333SpyGTTdQUbiZzKgi0YgbzXW6ODl5Ggfu", "gLbg5p8C4tnnddXyfZyjTsqpfDGVm1lEtHdVE36JuBImn");
  139. } catch (InterruptedException e) {
  140. System.out.println(e);
  141. }
  142. }
  143.  
  144. public static Boolean checkSourceId(String word, File file) throws FileNotFoundException {
  145. Boolean check = false; //Method for checking if the tweet was from user (ok) or deleting if mentioning him
  146. Scanner scanner = new Scanner(file);
  147. while (scanner.hasNextLine()) {
  148. String nextToken = scanner.next();
  149. if(nextToken.contains(word))
  150. check=true;
  151. }
  152. scanner.close();
  153. return check;
  154. }
  155.  
  156. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement