Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- using Microsoft.Spark.Sql;
- using Microsoft.Spark.Sql.Streaming;
- using System;
- using System.Collections.Generic;
- using System.Data.SqlClient;
- using System.IO;
- using System.Threading;
- namespace SparkStructuredStreaming
- {
- class Program
- {
- static void SQLWrite(string s)
- {
- string connectionString = "Server=NB-PF1XX6BC\\SQLEXPRESS;Database=SparkDB;Integrated Security=true;";
- using (SqlConnection sqlConnection = new SqlConnection(connectionString))
- {
- sqlConnection.Open();
- string commandString = $"insert into ControlTable values('{s}')";
- using (SqlCommand sqlCommand = new SqlCommand(commandString, sqlConnection))
- {
- sqlCommand.ExecuteNonQuery();
- }
- }
- }
- static void Main()
- {
- // Reading Thread
- new Thread(() =>
- {
- using(FileStream fileStream = new FileStream(@"test.txt", FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.ReadWrite))
- using (StreamReader streamReader = new StreamReader(fileStream))
- {
- while (true)
- {
- Thread.CurrentThread.IsBackground = true;
- string? s = streamReader.ReadLine();
- if(s != null)
- {
- SQLWrite(s);
- }
- }
- }
- }).Start();
- // Create a Spark session
- SparkSession sparkSession = SparkSession
- .Builder()
- .AppName("Spark Structured Streaming")
- .GetOrCreate();
- // Data Frame Definition
- DataFrame dataFrame = sparkSession
- .ReadStream()
- .Format("kafka")
- .Options(new Dictionary<string, string>
- {
- { "kafka.bootstrap.servers", "10.24.16.19:9093,10.24.16.19:9094,10.24.16.19:9095" },
- { "subscribe", "spark_structured_streaming_test" }
- })
- .Load();
- // Initial Values Selection
- dataFrame = dataFrame.SelectExpr(new string[]
- {
- "CAST(key as STRING)",
- "CAST(value as STRING)"
- });
- // UDF Definition
- sparkSession.Udf().Register("MYFUNC", new Func<string, string>((s) =>
- {
- using (FileStream fileStream = new FileStream(@"test.txt", FileMode.Append, FileAccess.Write, FileShare.ReadWrite))
- using (StreamWriter streamWriter = new StreamWriter(fileStream))
- {
- streamWriter.WriteLine(s + " Modified");
- }
- return s + " Modified";
- }));
- // Fake Transformation
- dataFrame = dataFrame.SelectExpr(new string[]
- {
- "key",
- "value",
- "MYFUNC(value) as ModifiedValue"
- });
- // Streaming Query
- StreamingQuery streamingQuery = dataFrame
- .WriteStream()
- .Format("console")
- .Start();
- // Await Termination
- streamingQuery.AwaitTermination();
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement