Advertisement
Guest User

Untitled

a guest
Sep 28th, 2016
98
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.64 KB | None | 0 0
  1. let dumpToDatabase databaseName =
  2. // opens the database connection (placeholder: the actual connection code is not shown)
  3. fun tweet -> inserts tweet in database // pseudo-code: the returned function stands for "insert one tweet"
  4.  
  5. type Agent<'T> = MailboxProcessor<'T> // short alias so agents can be created with Agent.Start
  6.  
  7.  
  8.  
  /// Agent that receives raw tweet messages one at a time, decodes each one with
  /// `decode` and stores it through the inserter returned by
  /// `dumpToDatabase "stream"`. Failures are written to "Errors.txt".
  let agentDump =
      Agent.Start(fun (inbox: MailboxProcessor<string>) ->
          async {
              // The receive loop never ends, so `use` never reaches the point where
              // it disposes (and flushes) the writer. AutoFlush writes every error
              // line through immediately instead of leaving it in the buffer.
              use errorLog = new StreamWriter(@"Errors.txt", AutoFlush = true)
              let dumpError (error: string) = errorLog.WriteLine(error)
              let dumpTweet = dumpToDatabase "stream"
              while true do
                  let! msg = inbox.Receive()
                  try
                      msg |> decode |> dumpTweet
                  with
                  // Every failure is logged together with the offending message;
                  // non-MySQL exceptions (for example from `decode`) used to be
                  // discarded without a trace.
                  | ex -> dumpError (msg + ex.ToString())
          })
  29.  
  // Endpoint and query parameters of the Twitter streaming filter API.
  let filter_url = "http://stream.twitter.com/1/statuses/filter.json"
  let parameters = "track=RT&"
  let stream_url = filter_url

  // Opens the tweet stream through the project helper `twitterStream`.
  let stream = twitterStream MyCredentials stream_url parameters

  // Forwards every line read from the stream to the dump agent.
  // NOTE(review): assumes `stream` behaves like a TextReader, whose ReadLine
  // returns null once the stream has ended - confirm against twitterStream.
  // Stopping on null prevents an endless loop that would keep posting null
  // messages (and growing the agent's mailbox) after a disconnect.
  do
      let mutable line = stream.ReadLine()
      while line <> null do
          agentDump.Post line
          line <- stream.ReadLine()
  39.  
  40. let dumpToDatabase (tweets:tweet list)=
  41. bulk insert of tweets in database // pseudo-code placeholder: the body is described, not implemented
  42.  
  /// Agent that receives batches of raw tweet messages, decodes every message of
  /// a batch with `decode` and passes the decoded list to `dumpToDatabase`.
  let agentProcessor =
      Agent.Start(fun (inbox: MailboxProcessor<string list>) ->
          async {
              while true do
                  let! msg = inbox.Receive()
                  try
                      msg
                      |> List.map decode
                      |> dumpToDatabase
                  with
                  // A failing batch is reported on the console and dropped so the
                  // agent keeps serving later batches. (The original line carried
                  // one closing parenthesis too many, which ended Agent.Start early.)
                  | ex -> Console.WriteLine("Processor " + ex.ToString())
          })
  56.  
  57.  
  58.  
  /// Agent that buffers incoming raw tweet messages and forwards them to
  /// `agentProcessor` in batches, so the database receives bulk inserts instead
  /// of one insert per tweet.
  let agentDump =
      Agent.Start(fun (inbox: MailboxProcessor<string>) ->
          // Number of messages collected before a batch is handed over. The
          // previous `count = 10` test ran after the new message had been added,
          // which produced batches of 11.
          let batchSize = 10
          let rec loop messageList count = async {
              // Only the receive step is guarded. Keeping `return! loop ...`
              // outside of try/with keeps the recursion a real tail call; inside a
              // handler every iteration would stack another handler and the
              // endless loop would leak memory.
              let! received =
                  async {
                      try
                          let! newMsg = inbox.Receive()
                          return Some newMsg
                      with
                      | ex ->
                          Console.WriteLine("Dump " + ex.ToString())
                          return None
                  }
              match received with
              | None ->
                  // Keep the agent alive after a failure instead of silently
                  // ending the loop while messages continue to queue up.
                  return! loop messageList count
              | Some newMsg ->
                  let newMsgList = newMsg :: messageList
                  let newCount = count + 1
                  if newCount >= batchSize then
                      // Messages were prepended, so reverse to restore arrival order.
                      agentProcessor.Post(List.rev newMsgList)
                      return! loop [] 0
                  else
                      return! loop newMsgList newCount
          }
          loop [] 0)
  75.  
  // Endpoint and query parameters of the Twitter streaming filter API.
  let filter_url = "http://stream.twitter.com/1/statuses/filter.json"
  let parameters = "track=RT&"
  let stream_url = filter_url

  // Opens the tweet stream through the project helper `twitterStream`.
  let stream = twitterStream MyCredentials stream_url parameters

  // Feeds every line read from the stream to the batching dump agent.
  // NOTE(review): assumes `stream` behaves like a TextReader, whose ReadLine
  // returns null once the stream has ended - confirm against twitterStream.
  // Stopping on null prevents an endless loop that would keep posting null
  // messages (and growing the agent's mailbox) after a disconnect.
  do
      let mutable line = stream.ReadLine()
      while line <> null do
          agentDump.Post line
          line <- stream.ReadLine()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement