Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- let dumpToDatabase databaseName =
- //opens databse connection
- fun tweet -> inserts tweet in database
- type Agent<'T> = MailboxProcessor<'T>
- let agentDump =
- Agent.Start(fun (inbox: MailboxProcessor<string>) ->
- async{
- use w2 = new StreamWriter(@"Errors.txt")
- let dumpError =fun (error:string) -> w2.WriteLine( error )
- let dumpTweet = dumpToDatabase "stream"
- while true do
- let! msg = inbox.Receive()
- try
- let tw = decode msg
- dumpTweet tw
- with
- | :? MySql.Data.MySqlClient.MySqlException as ex ->
- dumpError (msg+ex.ToString() )
- | _ as ex -> ()
- }
- )
- let filter_url = "http://stream.twitter.com/1/statuses/filter.json"
- let parameters = "track=RT&"
- let stream_url = filter_url
- let stream = twitterStream MyCredentials stream_url parameters
- while true do
- agentDump.Post(stream.ReadLine())
- let dumpToDatabase (tweets:tweet list)=
- bulk insert of tweets in database
- let agentProcessor =
- Agent.Start(fun (inbox: MailboxProcessor<string list>) ->
- async{
- while true do
- let! msg = inbox.Receive()
- try
- msg
- |> List.map(decode)
- |> dumpToDatabase
- with
- | _ as ex -> Console.WriteLine("Processor "+ex.ToString()))
- }
- )
- let agentDump =
- Agent.Start(fun (inbox: MailboxProcessor<string>) ->
- let rec loop messageList count = async{
- try
- let! newMsg = inbox.Receive()
- let newMsgList = newMsg::messageList
- if count = 10 then
- agentProcessor.Post( newMsgList )
- return! loop [] 0
- else
- return! loop newMsgList (count+1)
- with
- | _ as ex -> Console.WriteLine("Dump "+ex.ToString())
- }
- loop [] 0)
- let filter_url = "http://stream.twitter.com/1/statuses/filter.json"
- let parameters = "track=RT&"
- let stream_url = filter_url
- let stream = twitterStream MyCredentials stream_url parameters
- while true do
- agentDump.Post(stream.ReadLine())
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement