Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
/**
 * Minimal Structured Streaming example: reads text from a TCP socket and
 * writes each batch of received rows to the console sink in append mode.
 *
 * The socket source opens a client connection to `localhost:50050` when the
 * query starts, so a server must already be listening on that port (for
 * example `nc -lk 50050`). If nothing is listening, the query fails with a
 * `StreamingQueryException` caused by
 * `java.net.ConnectException: Connection refused`.
 *
 * `SparkSession` (org.apache.spark.sql) and `OutputMode`
 * (org.apache.spark.sql.streaming) must be imported by the enclosing file.
 */
object SocketReadExample {

  /**
   * Entry point: builds a local session, starts the socket-to-console query
   * and blocks until the query terminates.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder
      .master("local")
      .appName("example")
      .config("spark.driver.bindAddress", "127.0.0.1")
      .getOrCreate()

    try {
      // Create the stream from the socket; the connection itself is only
      // attempted once the query is started below.
      val socketStreamDf = sparkSession.readStream
        .format("socket")
        .option("host", "localhost")
        .option("port", 50050)
        .load()

      val consoleDataFrameWriter = socketStreamDf.writeStream
        .format("console")
        .outputMode(OutputMode.Append())

      val query = consoleDataFrameWriter.start()
      // Blocks until the query stops; rethrows the query's failure, if any.
      query.awaitTermination()
    } finally {
      // Release the local Spark context even when the query fails.
      sparkSession.stop()
    }
  }
}
- Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: Connection refused
- Current State: INITIALIZING
- Thread State: RUNNABLE
- at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:343)
- at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:206)
- Caused by: java.net.ConnectException: Connection refused
- at java.net.PlainSocketImpl.socketConnect(Native Method)
- at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
- at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
- at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
- at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
- at java.net.Socket.connect(Socket.java:589)
- at java.net.Socket.connect(Socket.java:538)
- at java.net.Socket.<init>(Socket.java:434)
- at java.net.Socket.<init>(Socket.java:211)
Add Comment
Please, Sign In to add comment