zakpatterson

Slick Query in stream

Sep 12th, 2014
```scala
/*
 * Assume we have a table (TableQuery) called `bigDataTable`, correctly defined, that includes a blob column.
 * Under PostgreSQL it is implemented with the lo extension, and
 * interactions with this table are only supported inside transactions.
 *
 * Also assume we have a table called `modelDataTable` that has a blob column as well, so inserts into it
 * must also be done inside a transaction.
 */

/**
 * Returns a stream of rows from the table. The table contains far more data
 * than fits in memory. DataAccess defines a `database` method that returns the database we're using,
 * and makes the right `profile` available.
 */
trait BigDataTableQL { this: DataAccess =>
    import profile.simple._
    import profile.Implicits

    def getSomeDataQ = for {
        item <- bigDataTable
    } yield item

    lazy val getSomeData = Compiled(getSomeDataQ.to[Stream])
}
```
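One possible explanation for the out-of-memory behavior, offered as an assumption rather than a verified diagnosis: if Slick 2.x builds the `.to[Stream]` result by pushing every row into a collection builder, then `getSomeData.run` reads the whole table before `grouped(1000)` ever runs, so no batch can reach the database early. The sketch below illustrates the difference using plain Scala collections only. `CountingRows` and `MaterializeVsIterate` are hypothetical names; `CountingRows` stands in for a JDBC result set and counts how many rows have been pulled.

```scala
// Hypothetical stand-in for a result set: yields 0 until `total` and counts the rows pulled so far.
final class CountingRows(total: Int) extends Iterator[Int] {
  private var cursor = 0
  var pulled = 0
  def hasNext: Boolean = cursor < total
  def next(): Int = {
    pulled += 1
    cursor += 1
    cursor - 1
  }
}

object MaterializeVsIterate {
  // Eager: every row goes into a builder before the caller sees any result.
  // This is what we assume happens when a query result is collected into a Stream.
  def viaBuilder(rows: Iterator[Int]): List[Int] = {
    val builder = List.newBuilder[Int]
    while (rows.hasNext) builder += rows.next()
    builder.result()
  }

  // Lazy: grouping the iterator itself only pulls the rows needed for the first batch.
  def firstBatch(rows: Iterator[Int], batchSize: Int): Seq[Int] =
    rows.grouped(batchSize).next()
}
```

With a large `total`, `viaBuilder` pulls every row up front, while `firstBatch` stops after roughly one batch.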
```scala
/**
 * Now we're going to use this stream.
 */

object Main extends App with BigDataTableQL with ProductionDataAccess {

    // Placeholder: should return true for the rows worth keeping (real logic not shown in the original).
    def isInteresting(b: BigDataTableItem): Boolean = ???

    /*
     * This consumes the entire stream and builds a huge set of inserts without hitting the
     * database until the whole stream has been consumed, which ends in an OutOfMemoryError.
     * (`compileModelData` is defined elsewhere and is not shown in this paste.)
     */
    def analyzeAttempt: Unit = {
        database withTransaction { implicit s =>
            getSomeData.run.grouped(1000).foreach { toInsert =>
                val thisInsertGroup = toInsert.filter(isInteresting(_)).map(compileModelData(_))
                modelDataTable ++= thisInsertGroup.toList
            }
        } // The transaction never completes: OutOfMemoryError as the entire stream is consumed...
    }

    /**
     * For some reason, nesting another transaction per batch doesn't help either.
     */
    def analyzeAttempt2: Unit = {
        database withTransaction { implicit s =>
            getSomeData.run.grouped(1000).foreach { toInsert =>
                database withTransaction { implicit s2 =>
                    val thisInsertGroup = toInsert.filter(isInteresting(_)).map(compileModelData(_))
                    // s2 is passed explicitly because both s and s2 are implicit sessions in scope.
                    modelDataTable.++=(thisInsertGroup.toList)(s2)
                } // WANT A DATABASE HIT HERE, INSERTING ABOUT 1000 ROWS AT A TIME. It doesn't happen.
            }
        }
    }
}
```
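What `analyzeAttempt2` is reaching for is a single lazy pass that filters, converts, and writes one batch at a time, so that at most one batch is held in memory. A minimal sketch of that shape in plain Scala follows. `BatchedCopy`, `copyInBatches` and its parameters are hypothetical names: `rows` stands in for a lazily read result set, and `writeBatch` stands in for `modelDataTable ++= ...`. In Slick 2.x terms this would mean iterating the query (we believe its invokers offer an `iterator` method, but check the Slick 2.x API docs for your version) instead of collecting it with `.to[Stream]` and `run`. Note also that the PostgreSQL JDBC driver only fetches rows incrementally when autocommit is off, the result set is forward-only, and a non-zero fetch size is set; otherwise it may still load the whole result into memory.

```scala
object BatchedCopy {
  // Filters and converts rows lazily, handing them to `writeBatch` in groups of `batchSize`.
  // Only the current batch is held in memory. Returns the number of rows written.
  def copyInBatches[A, B](rows: Iterator[A], batchSize: Int)
                         (keep: A => Boolean, convert: A => B)
                         (writeBatch: Seq[B] => Unit): Int = {
    var written = 0
    rows.filter(keep).map(convert).grouped(batchSize).foreach { batch =>
      writeBatch(batch) // in the real code: one multi-row insert per batch
      written += batch.size
    }
    written
  }
}
```

Unlike `analyzeAttempt`, this version filters before grouping, so every write except the last carries a full batch; grouping first (as the original does) would also work but produces batches of varying size.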