Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
/**
 * Copyright 2013 Twitter, Inc.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 **/
//package com.twitter.hbc.example;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Constants;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.BasicClient;
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.OAuth1;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.sql.*;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
import java.util.Scanner;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.*;
import org.json.*;
- public class StreamingTweetstoMYSQL {
- public static void run(String consumerKey, String consumerSecret, String token, String secret)
- throws InterruptedException, IOException, JSONException{
- // Create an appropriately sized blocking queue
- BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10000);
- // Define our endpoint: By default, delimited=length is set (we need this for our processor)
- // and stall warnings are on.
- StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();
- //Input of user ids
- File input = new File("input.txt");
- List<String> lines1 = FileUtils.readLines(input, "utf-8");
- List<Long> lines = new ArrayList<Long>();
- for(String s : lines1) lines.add(Long.valueOf(s));
- endpoint.followings(lines); //connect to endpoint
- Authentication auth = new OAuth1(consumerKey, consumerSecret, token, secret);
- // Create a new BasicClient. By default gzip is enabled.
- BasicClient client = new ClientBuilder()
- .name("sampleExampleClient")
- .hosts(Constants.STREAM_HOST)
- .endpoint(endpoint)
- .authentication(auth)
- .processor(new StringDelimitedProcessor(queue))
- .build();
- // Establish a connection
- client.connect();
- Boolean rep; Boolean ret; Boolean pl;
- try
- {
- // create a mysql database connection
- String myDriver = "org.gjt.mm.mysql.Driver";
- String myUrl = "jdbc:mysql://localhost:3306/stream?useUnicode=true&characterEncoding=utf-8";
- //Class.forName(myDriver);
- Connection conn = DriverManager.getConnection(myUrl, "root", "g5hxzy46");
- // create a sql date object so we can use it in our INSERT statement
- //Calendar calendar = Calendar.getInstance();
- //java.sql.Date startDate = new java.sql.Date(calendar.getTime().getTime());
- // the mysql insert statement
- String query = "insert into tweets (created_at, twitter_id, text, reply, retweet, place)"
- + " values (?, ?, ?, ?, ?, ?)";
- // Do whatever needs to be done with messages
- while(!client.isDone())
- {
- String msg = queue.take();
- //create sql date
- //Calendar calendar = Calendar.getInstance();
- java.util.Date myDate = new java.util.Date();
- String ts = new SimpleDateFormat("yyyyMMddHHmmss").format(new java.util.Date());
- //System.out.println(msg);
- JSONObject obj = new JSONObject(msg);
- if(obj.isNull("delete"))
- {
- String text = obj.getString("text");
- String cr = obj.getString("created_at");
- Boolean reply = obj.isNull("in_reply_to_user_id_str"); if(reply){rep = false;}else{rep = true;}
- Boolean retweet = obj.isNull("retweeted_status"); if(retweet){ret = false;}else{ret = true;}
- Boolean place = obj.isNull("place"); if(place){pl = false;}else{pl = true;}
- JSONObject newJSON = obj.getJSONObject("user");
- newJSON = new JSONObject(newJSON.toString());
- String id= newJSON.getString("id_str");
- //If tweet from user?
- //if(checkSourceId(id,input)){
- //System.out.println(rep+" "+ret+" "+pl+" "+id+" "+text+" "+cr);
- // create the mysql insert preparedstatement
- PreparedStatement preparedStmt = conn.prepareStatement(query);
- preparedStmt.setString (1, id); preparedStmt.setString (2, ts); preparedStmt.setString (3, text);
- preparedStmt.setBoolean (4, rep); preparedStmt.setBoolean (5, ret); preparedStmt.setBoolean (6, pl);
- // execute the preparedstatement
- preparedStmt.execute();
- }}
- //}
- conn.close();
- }
- catch (SQLException ex)
- {
- System.err.println("Got an exception!");
- System.err.println("SQLException: " + ex.getMessage());
- System.err.println("State: " + ex.getSQLState());
- System.err.println("Error: " + ex.getErrorCode());
- }
- client.stop();
- System.out.printf("The client read %d messages!\n", client.getStatsTracker().getNumMessages());
- }
- public static void main(String[] args) throws IOException {
- try {
- StreamingTweetstoMYSQL.run("w3jokstVp6cMzaWq7A4GIN3XL", "19RmyUbAoNdinnFN79BJhZwFceBbF0pj9hm6GcM9BPCOUjDpwV",
- "164420419-9xnW8333SpyGTTdQUbiZzKgi0YgbzXW6ODl5Ggfu", "gLbg5p8C4tnnddXyfZyjTsqpfDGVm1lEtHdVE36JuBImn");
- } catch (InterruptedException e) {
- System.out.println(e);
- }
- }
- public static Boolean checkSourceId(String word, File file) throws FileNotFoundException {
- Boolean check = false; //Method for checking if the tweet was from user (ok) or deleting if mentioning him
- Scanner scanner = new Scanner(file);
- while (scanner.hasNextLine()) {
- String nextToken = scanner.next();
- if(nextToken.contains(word))
- check=true;
- }
- scanner.close();
- return check;
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement