Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
/**
 * An Akka Streams source stage that runs a single SQL query and emits one element per row.
 *
 * On start the stage obtains a connection from `dataSource`, prepares `sql`, lets `prepare`
 * configure the statement and executes the query. On every downstream pull the current row is
 * handed to `extractor` and the result is pushed; the stage completes once the cursor is
 * exhausted. The result set, statement and connection are closed when downstream cancels and
 * when the stage stops.
 *
 * NOTE(review): every JDBC call here runs synchronously inside the stage callbacks, so the
 * stage presumably should be run on a dispatcher meant for blocking work - confirm at call sites.
 *
 * @param dataSource supplies the JDBC connection used for the query
 * @param sql        the SQL text passed to `Connection.prepareStatement`
 * @param prepare    receives the freshly prepared statement and returns the statement to execute
 *                   (typically the same instance with its parameters bound)
 * @param extractor  converts the row the `ResultSet` cursor is currently positioned on into a `T`
 * @tparam T the element type emitted by the stage
 */
class ResultSource[T](dataSource: DataSource, sql: String, prepare: PreparedStatement => PreparedStatement,
                      extractor: ResultSet => T) extends GraphStage[SourceShape[T]] {

  private val out: Outlet[T] = Outlet("EnvisiaResultSource.out")

  override val shape: SourceShape[T] = SourceShape.of(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler {

    // JDBC handles are plain nullable fields: they are created in preStart and reset to null by
    // release(), which makes release() safe to call more than once.
    private var connection: Connection = null
    private var preparedStatement: PreparedStatement = null
    private var resultSet: ResultSet = null

    /** Opens the connection, executes the query and positions the cursor on the first row. */
    override def preStart(): Unit = {
      try {
        connection = dataSource.getConnection
        preparedStatement = prepare(connection.prepareStatement(sql))
        resultSet = preparedStatement.executeQuery()
        // An empty result set means there is nothing to emit, so complete right away.
        if (!resultSet.next()) {
          completeStage()
        }
      } catch {
        // NonFatal (as in onPull) lets fatal errors and InterruptedException propagate.
        case NonFatal(e) => failStage(e)
      }
    }

    /**
     * Emits the row the cursor currently points at, then advances the cursor and completes the
     * stage when no further row exists.
     */
    override def onPull(): Unit = {
      if (resultSet == null) {
        failStage(new Exception("ResultSet can't be null at this point"))
      } else {
        try {
          push(out, extractor(resultSet))
          // Advance inside the same try: a failed extraction must not move the cursor or
          // complete the stage, and a failure while advancing is reported like any other error.
          if (!resultSet.next()) {
            completeStage()
          }
        } catch {
          case NonFatal(e) => failStage(e)
        }
      }
    }

    /**
     * Closes the result set, the statement and the connection, in that order.
     *
     * The nested `finally` blocks guarantee that every resource gets its close attempt (and its
     * field is cleared) even when closing an earlier one throws; the exception still propagates.
     */
    private def release(): Unit = {
      try {
        if (resultSet != null) {
          resultSet.close()
        }
      } finally {
        resultSet = null
        try {
          if (preparedStatement != null) {
            preparedStatement.close()
          }
        } finally {
          preparedStatement = null
          try {
            if (connection != null) {
              connection.close()
            }
          } finally {
            connection = null
          }
        }
      }
    }

    /** Releases the JDBC resources as soon as downstream cancels. */
    override def onDownstreamFinish() = {
      release()
    }

    /** Releases whatever is still open when the stage stops, for any reason. */
    override def postStop() = {
      release()
    }

    setHandler(out, this)
  }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement