Advertisement
Guest User

Untitled

a guest
Aug 26th, 2016
79
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 4.64 KB | None | 0 0
  1. import java.util.concurrent.Executors
  2.  
  3. import akka.NotUsed
  4. import akka.actor.ActorSystem
  5. import akka.http.scaladsl.Http
  6. import akka.http.scaladsl.marshalling.Marshal
  7. import akka.http.scaladsl.model.ContentTypes._
  8. import akka.http.scaladsl.model.headers.{Authorization, BasicHttpCredentials, OAuth2BearerToken}
  9. import akka.http.scaladsl.model._
  10. import akka.http.scaladsl.unmarshalling.Unmarshal
  11. import akka.stream.ActorMaterializer
  12. import akka.util.ByteString
  13. import de.choffmeister.auth.common.OAuth2AccessTokenResponse
  14. import model.{CreateUserRequest, JsonProtocols, ReceiptEntity, UserInfo}
  15.  
  16. import scala.concurrent.{ExecutionContext, Future}
  17. import scala.io.BufferedSource
  18. import scala.util.{Failure, Success, Try}
  19. import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
  20. import akka.stream.scaladsl.{Flow, Sink, Source}
  21. import akka.stream.scaladsl.Tcp.OutgoingConnection
  22.  
  23. import scala.collection.immutable.Seq
  24.  
  25. class Uploader extends JsonProtocols {
  26.   implicit val system = ActorSystem()
  27.   implicit val materializer = ActorMaterializer()
  28.   implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(200))
  29.  
  30.   def createUser(createUserRequest: CreateUserRequest): Future[UserInfo] = {
  31.     for {
  32.       request <- Marshal(createUserRequest).to[RequestEntity]
  33.       response <- Http().singleRequest(HttpRequest(method = HttpMethods.POST, uri = s"http://localhost:9000/user/create", entity = request))
  34.       userInfo <- Unmarshal(response.entity).to[UserInfo]
  35.     } yield userInfo
  36.   }
  37.  
  38.   def authenticateUser(userInfo: UserInfo): Future[OAuth2AccessTokenResponse] = {
  39.     for {
  40.       response <- Http().singleRequest(HttpRequest(uri = s"http://localhost:9000/token/create",
  41.         headers = List(Authorization(BasicHttpCredentials(userInfo.userName, "password")))))
  42.       accessToken <- Unmarshal(response.entity).to[OAuth2AccessTokenResponse]
  43.     } yield accessToken
  44.   }
  45.  
  46.   def utf8TextEntity(content: String) = {
  47.     val bytes = ByteString(content)
  48.     HttpEntity.Strict(ContentTypes.`text/plain(UTF-8)`, bytes)
  49.   }
  50.  
  51.   def createImageFileContent(): Future[RequestEntity] = {
  52.     val receiptImage: BufferedSource = scala.io.Source.fromFile("/home/leonti/Downloads/receipt.jpg", "ISO-8859-1")
  53.     val content = receiptImage.map(_.toByte).toArray
  54.  
  55.     val multipartForm =
  56.       Multipart.FormData(Multipart.FormData.BodyPart.Strict(
  57.         "receipt",
  58.         HttpEntity(`application/octet-stream`, content),
  59.         Map("filename" -> "receipt.png")),
  60.         Multipart.FormData.BodyPart.Strict("total", utf8TextEntity("12.38")),
  61.         Multipart.FormData.BodyPart.Strict("description", utf8TextEntity("some description"))
  62.       )
  63.     Marshal(multipartForm).to[RequestEntity]
  64.   }
  65.  
  66.   def upload() = {
  67.     val username = "ci_user_" + java.util.UUID.randomUUID()
  68.     val createUserRequest = CreateUserRequest(username, "password")
  69.  
  70.     val toReceiptRequest: (MessageEntity, String, String) => HttpRequest = (requestEntity, userId, accessToken) => {
  71.       HttpRequest(method = HttpMethods.POST,
  72.         uri = s"http://localhost:9000/user/${userId}/receipt",
  73.         entity = requestEntity,
  74.         headers = List(Authorization(OAuth2BearerToken(accessToken))))
  75.     }
  76.  
  77.     val uploadReceipt: (HttpRequest) => Future[StatusCode] = request => {
  78.       val start = System.currentTimeMillis()
  79.       println("Starting to upload receipt")
  80.  
  81.       Http().singleRequest(request).map(response => {
  82.         println(response.status)
  83.         val end = System.currentTimeMillis()
  84.         println(s"Receipt uploaded in ${(end - start)}ms")
  85.         response.status
  86.       })
  87.     }
  88.  
  89.     val requests: Future[Seq[HttpRequest]] = for {
  90.       userInfo: UserInfo <- createUser(createUserRequest)
  91.       accessToken: OAuth2AccessTokenResponse <- authenticateUser(userInfo)
  92.       requestEntity: MessageEntity <- createImageFileContent()
  93.     } yield Seq.fill(10)(toReceiptRequest(requestEntity, userInfo.id, accessToken.accessToken))
  94.  
  95.     val result: Future[Seq[StatusCode]] = requests.flatMap(requests => Future.sequence(requests.map(request => uploadReceipt(request))))
  96.     result
  97.   }
  98. }
  99.  
  100. object LoadTest {
  101.   implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(200))
  102.  
  103.   def main(args: Array[String]): Unit = {
  104.    
  105.     val start = System.currentTimeMillis()
  106.  
  107.     new Uploader().upload().onComplete({
  108.         case Success(receipt) => {
  109.           val end = System.currentTimeMillis()
  110.           val toUpload = end - start
  111.           println(s"All receipts uploaded in ${toUpload/1000}s")
  112.         }
  113.         case Failure(e) => println(s"Exception happened! ${e} ${e.getStackTrace.foreach(e => println(e))}")
  114.     })
  115.  
  116.   }
  117. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement