Don't like ads? PRO users don't see any ads ;-)
Guest

Untitled

By: a guest on Aug 5th, 2012  |  syntax: None  |  size: 6.75 KB  |  hits: 18  |  expires: Never
download  |  raw  |  embed  |  report abuse  |  print
Text below is selected. Please press Ctrl+C to copy to your clipboard. (⌘+C on Mac)
  1. package com.vast.example
  2.  
  3. import java.net.InetSocketAddress
  4. import java.util.UUID
  5. import java.util.concurrent.{Executors, TimeUnit}
  6. import com.google.common.base.Splitter
  7. import com.twitter.finagle.http.Http
  8. import com.twitter.finagle.builder.{Server, ServerBuilder}
  9. import com.twitter.finagle.service.TimeoutFilter
  10. import com.twitter.finagle.{Service, SimpleFilter, GlobalRequestTimeoutException}
  11. import com.twitter.util.{Future, FuturePool, FutureTransformer, Duration}
  12. import org.codehaus.jackson.map.ObjectMapper
  13. import org.jboss.netty.buffer.ChannelBuffers.copiedBuffer
  14. import org.jboss.netty.handler.codec.http.{HttpResponseStatus, DefaultHttpResponse, HttpRequest, HttpResponse}
  15. import org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1
  16. import org.jboss.netty.handler.codec.http.HttpResponseStatus.{NOT_FOUND, INTERNAL_SERVER_ERROR}
  17. import org.jboss.netty.util.CharsetUtil.UTF_8
  18. import org.jboss.netty.buffer.ChannelBuffer
  19.  
  20. /**
  21.  * Server entry point.
  22.  *
  23.  * @author Alex Moffat (alex.moffat@vast.com)
  24.  */
  25.  
  26. object ExampleServer {
  27.  
  28.  
  29.   def main(args: Array[String]) {
  30.  
  31.     // Requests pass down through the filters to the service. The
  32.     // response from the service is passed back up through the filters
  33.     // to be returned. The Filter class provides the "andThen" method
  34.     // for creating the filter chain.
  35.     val service =
  36.       new ExceptionFilter andThen
  37.       new HeaderFilter andThen
  38.       new HttpTimeoutFilter(Duration(1, TimeUnit.SECONDS)) andThen
  39.       new RestService()
  40.  
  41.     // Create the server and start it. The ServerBuilder is an example
  42.     // of the Type-Safe Builder pattern in scala. Options are set on
  43.     // the builder. Build is called to construct and start the server.
  44.     val server: Server = ServerBuilder()
  45.       .codec(Http())
  46.       .bindTo(new InetSocketAddress(8080))
  47.       .name("httpserver")
  48.       .build(service)
  49.   }
  50. }
  51.  
  52. // Handle any uncaught exceptions that occur in filters or services
  53. // lower down the chain. We extend SimpleFilter because we're not
  54. // going to change the type of the request or response.
  55. class ExceptionFilter extends SimpleFilter[HttpRequest, HttpResponse] {
  56.  
  57.   // Deal with successful and error responses.
  58.   val transformer =
  59.     new FutureTransformer[HttpResponse, HttpResponse] {
  60.       // A successful response is just passed through.
  61.       override def map(value: HttpResponse): HttpResponse =
  62.         value
  63.       // An error is converted into a 500 response. The
  64.       // Responses object is in this file.
  65.       override def handle(throwable: Throwable): HttpResponse =
  66.         Responses.InternalServerError(throwable.getMessage)
  67.     }
  68.  
  69.   // Apply method is where the filtering takes place.
  70.   def apply(request: HttpRequest, service: Service[HttpRequest, HttpResponse]) = {
  71.     // Use transformedBy method to deal with both the normal and error responses.
  72.     service(request).transformedBy(transformer)
  73.   }
  74. }
  75.  
  76. // An trial filter that adds a header to the request and another
  77. // to the response.
  78. class HeaderFilter extends SimpleFilter[HttpRequest, HttpResponse] {
  79.  
  80.   def apply(request: HttpRequest, service: Service[HttpRequest, HttpResponse]) = {
  81.     // Look for a X-Request-ID header and add one if missing.
  82.     val id = request.getHeader("X-Request-ID")
  83.     if (id == null) {
  84.       request.setHeader("X-Request-ID",
  85.         UUID.randomUUID().toString)
  86.     }
  87.  
  88.     // Add X-Processes to the response.
  89.     service(request).onSuccess(r => {
  90.       r.addHeader("X-Processed", "TRUE")
  91.     })
  92.   }
  93. }
  94.  
  95. // Specialize the general TimeoutFilter to deal with
  96. // HTTP requests and responses.
  97. class HttpTimeoutFilter(timeout: Duration)
  98.   extends TimeoutFilter[HttpRequest, HttpResponse](timeout,
  99.     new GlobalRequestTimeoutException(timeout)) {
  100.  
  101. }
  102.  
  103. // Invoke the correct underlying service based on the incoming url.
  104. class RestService extends Service[HttpRequest, HttpResponse] {
  105.  
  106.   // Simple service that returns some json.
  107.   val okService = new SimpleService()
  108.   // Simple service that waits for longer that the configured
  109.   // timeout for the server.
  110.   val timeoutService = new SimpleService(Some(1200))
  111.  
  112.   // This pool is used to convert HttpResponse to
  113.   // Future[HttpResponse] by executing the service on
  114.   // a separate thread.
  115.   val futurePool = FuturePool(Executors.newFixedThreadPool(4))
  116.   // Used to split URIs.
  117.   val splitter = Splitter.on('/').omitEmptyStrings()
  118.  
  119.   def apply(req: HttpRequest) = {
  120.  
  121.     val path = splitter.split(req.getUri).iterator()
  122.  
  123.     // Match the pieces of the path
  124.     if (path.hasNext) {
  125.       path.next() match {
  126.           // Path starts with /ok so use ok service.
  127.           // Using futurePool creates a Future that is evaluated on a separate thread using
  128.           // the thread pool passed to the FuturePool constructor above.
  129.         case "ok" => futurePool {
  130.           // See Responses.OK comments below.
  131.           Responses.OK(req,
  132.             (req: HttpRequest) => { okService(req.getUri) })}
  133.           // Path starts with /timeout so use timeout service.
  134.         case "timeout" => futurePool {
  135.           Responses.OK(req,
  136.             (req: HttpRequest) => { timeoutService(req.getUri) })}
  137.           // No match so return a not found response. Future.value
  138.           // creates a Future with an existing constant value.
  139.         case _ => Future.value(Responses.NotFound())
  140.       }
  141.     } else {
  142.       // No match so not found.
  143.       Future.value(Responses.NotFound())
  144.     }
  145.   }
  146. }
  147.  
  148. // A very simple service that returns a model object.
  149. class SimpleService(val waitFor: Option[Int] = None) {
  150.  
  151.   def apply(name: String) = {
  152.     // If a time to wait is supplied then wait. This
  153.     // is used to timeout the service.
  154.     waitFor.foreach(t => this.synchronized { wait(t) })
  155.     // Return the model object.
  156.     new SimpleModel(UUID.randomUUID().toString, name, "Some Street")
  157.   }
  158. }
  159.  
  160. // Objects to produce some standard http responses.
  161. object Responses {
  162.  
  163.   // Used to convert objects into json
  164.   val mapper = new ObjectMapper
  165.  
  166.   // Create an HttpResponse from a status and some content.
  167.   private def respond(status: HttpResponseStatus, content: ChannelBuffer): HttpResponse = {
  168.     val response = new DefaultHttpResponse(HTTP_1_1, status)
  169.     response.setHeader("Content-Type", "application/json")
  170.     response.setHeader("Cache-Control", "no-cache")
  171.     response.setContent(content)
  172.     response    
  173.   }
  174.  
  175.   object OK {
  176.     def apply(req: HttpRequest, service: (HttpRequest) => Object): HttpResponse =
  177.       respond(HttpResponseStatus.OK,
  178.         copiedBuffer(mapper.writeValueAsBytes(service(req))))      
  179.   }
  180.  
  181.   object NotFound {
  182.     def apply(): HttpResponse  =
  183.       respond(NOT_FOUND,
  184.         copiedBuffer("{\"status\":\"NOT_FOUND\"}", UTF_8))    
  185.   }
  186.  
  187.   object InternalServerError {
  188.     def apply(message: String): HttpResponse =
  189.       respond(INTERNAL_SERVER_ERROR,
  190.         copiedBuffer("{\"status\":\"INTERNAL_SERVER_ERROR\", " +
  191.         "\"message\":\"" + message + "\"}", UTF_8))
  192.   }
  193. }