Advertisement
Guest User

Untitled

a guest
May 3rd, 2016
56
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 1.28 KB | None | 0 0
  1. package com.beeva.poc.cassandra.massive_insertion
  2.  
  3. import java.net.InetSocketAddress
  4. import com.datastax.driver.core.Session
  5. import com.typesafe.config.Config
  6. import akka.actor.Actor
  7. import com.datastax.driver.core.Cluster
  8. import akka.actor.ActorLogging
  9.  
  10.  
  11. //conexion a base de datos cassandra
  12. class CassandraConnection(config: Config) extends Actor with ActorLogging {
  13.   val nodes = scala.collection.JavaConversions.asScalaBuffer(config.getStringList("nodes")).toList
  14.   val port = config.getInt("port")
  15.   val createKeyspace = config.getString("create")
  16.   val session: Session = {
  17.     var list = new java.util.ArrayList[InetSocketAddress]()
  18.     for (n <- nodes) {
  19.       list.add(new java.net.InetSocketAddress(n, port))
  20.     }
  21.     val cluster = Cluster.builder().addContactPointsWithPorts(list).build()
  22.     val session = cluster.connect();
  23.     session.execute(createKeyspace)
  24.     session
  25.   }
  26.  
  27.   // ejecuta sentencias insert de modo asincrono no bloqueante
  28.   def insert(insert: String) {
  29.     this.session.executeAsync(insert)
  30.   }
  31.  
  32.   def receive ={
  33.     case Messages.Insert(statement) =>
  34.       insert(statement)
  35.       sender ! Messages.InsertDone
  36.     case _ =>
  37.       log.debug("Message not valid")
  38.   }
  39.  
  40.   override def postStop():Unit = {
  41.     session.close()
  42.   }
  43. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement