Advertisement
Guest User

Untitled

a guest
Dec 5th, 2019
123
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.56 KB | None | 0 0
  1. using Microsoft.Spark.Sql;
  2. using Microsoft.Spark.Sql.Streaming;
  3. using System;
  4. using System.Collections.Generic;
  5. using System.Data.SqlClient;
  6. using System.IO;
  7. using System.Threading;
  8.  
  namespace SparkStructuredStreaming
  {
      /// <summary>
      /// Demo program: reads a Kafka topic through Spark Structured Streaming, passes each
      /// value through a UDF that appends " Modified" and also appends the result to a local
      /// text file, while a background thread reads that file line by line and inserts every
      /// line into the SQL Server table <c>ControlTable</c>.
      /// </summary>
      class Program
      {
          /// <summary>Connection string of the SQL Server database that receives the lines.</summary>
          private const string ConnectionString = "Server=NB-PF1XX6BC\\SQLEXPRESS;Database=SparkDB;Integrated Security=true;";

          /// <summary>File the UDF appends to and the reader thread consumes.</summary>
          private const string ExchangeFilePath = @"test.txt";

          /// <summary>How long the reader thread waits before re-checking the file once it has reached its end.</summary>
          private const int PollIntervalMilliseconds = 100;

          /// <summary>
          /// Inserts <paramref name="s"/> as a single-value row into <c>ControlTable</c>.
          /// </summary>
          /// <param name="s">Text to store; <c>null</c> is stored as an empty string.</param>
          /// <remarks>
          /// The value is sent as a SQL parameter rather than spliced into the statement text,
          /// so apostrophes or SQL fragments inside the text cannot break or alter the insert.
          /// </remarks>
          static void SQLWrite(string s)
          {
              using (SqlConnection sqlConnection = new SqlConnection(ConnectionString))
              using (SqlCommand sqlCommand = new SqlCommand("insert into ControlTable values(@value)", sqlConnection))
              {
                  // A null parameter value would be treated as "parameter not supplied";
                  // an empty string matches what string interpolation produced for null.
                  sqlCommand.Parameters.AddWithValue("@value", s ?? string.Empty);

                  sqlConnection.Open();
                  sqlCommand.ExecuteNonQuery();
              }
          }

          /// <summary>
          /// Body of the reader thread: follows the exchange file and forwards every line it
          /// reads to <see cref="SQLWrite"/>. Never returns.
          /// </summary>
          private static void ForwardFileLinesToSql()
          {
              // FileShare.ReadWrite lets the UDF keep appending while this handle stays open.
              using (FileStream fileStream = new FileStream(ExchangeFilePath, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.ReadWrite))
              using (StreamReader streamReader = new StreamReader(fileStream))
              {
                  while (true)
                  {
                      string? line = streamReader.ReadLine();
                      if (line != null)
                      {
                          SQLWrite(line);
                      }
                      else
                      {
                          // End of file for now: wait instead of spinning a CPU core.
                          Thread.Sleep(PollIntervalMilliseconds);
                      }
                  }
              }
          }

          /// <summary>
          /// Entry point: starts the file-to-SQL reader thread, builds the Kafka streaming
          /// query with the <c>MYFUNC</c> UDF, writes it to the console sink and blocks until
          /// the query terminates.
          /// </summary>
          static void Main()
          {
              // Reading Thread
              // Marked as background before it starts so it never keeps the process alive.
              Thread readerThread = new Thread(ForwardFileLinesToSql)
              {
                  IsBackground = true
              };
              readerThread.Start();

              // Create a Spark session
              SparkSession sparkSession = SparkSession
                  .Builder()
                  .AppName("Spark Structured Streaming")
                  .GetOrCreate();

              // Data Frame Definition
              DataFrame dataFrame = sparkSession
                  .ReadStream()
                  .Format("kafka")
                  .Options(new Dictionary<string, string>
                  {
                      { "kafka.bootstrap.servers", "10.24.16.19:9093,10.24.16.19:9094,10.24.16.19:9095" },
                      { "subscribe", "spark_structured_streaming_test" }
                  })
                  .Load();

              // Initial Values Selection
              dataFrame = dataFrame.SelectExpr(new string[]
              {
                  "CAST(key as STRING)",
                  "CAST(value as STRING)"
              });

              // UDF Definition
              // Appends the modified value to the exchange file as a side effect and returns it.
              sparkSession.Udf().Register("MYFUNC", new Func<string, string>((s) =>
              {
                  string modified = s + " Modified";

                  using (FileStream fileStream = new FileStream(ExchangeFilePath, FileMode.Append, FileAccess.Write, FileShare.ReadWrite))
                  using (StreamWriter streamWriter = new StreamWriter(fileStream))
                  {
                      streamWriter.WriteLine(modified);
                  }

                  return modified;
              }));

              // Fake Transformation
              dataFrame = dataFrame.SelectExpr(new string[]
              {
                  "key",
                  "value",
                  "MYFUNC(value) as ModifiedValue"
              });

              // Streaming Query
              StreamingQuery streamingQuery = dataFrame
                  .WriteStream()
                  .Format("console")
                  .Start();

              // Await Termination
              streamingQuery.AwaitTermination();
          }
      }
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement