- package com.vast.example
- import java.net.InetSocketAddress
- import java.util.UUID
- import java.util.concurrent.{Executors, TimeUnit}
- import com.google.common.base.Splitter
- import com.twitter.finagle.http.Http
- import com.twitter.finagle.builder.{Server, ServerBuilder}
- import com.twitter.finagle.service.TimeoutFilter
- import com.twitter.finagle.{Service, SimpleFilter, GlobalRequestTimeoutException}
- import com.twitter.util.{Future, FuturePool, FutureTransformer, Duration}
- import org.codehaus.jackson.map.ObjectMapper
- import org.jboss.netty.buffer.ChannelBuffers.copiedBuffer
- import org.jboss.netty.handler.codec.http.{HttpResponseStatus, DefaultHttpResponse, HttpRequest, HttpResponse}
- import org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1
- import org.jboss.netty.handler.codec.http.HttpResponseStatus.{NOT_FOUND, INTERNAL_SERVER_ERROR}
- import org.jboss.netty.util.CharsetUtil.UTF_8
- import org.jboss.netty.buffer.ChannelBuffer
- /**
- * Server entry point.
- *
- * @author Alex Moffat (alex.moffat@vast.com)
- */
- object ExampleServer {
- def main(args: Array[String]) {
- // Requests pass down through the filters to the service. The
- // response from the service is passed back up through the filters
- // to be returned. The Filter class provides the "andThen" method
- // for creating the filter chain.
- val service =
- new ExceptionFilter andThen
- new HeaderFilter andThen
- new HttpTimeoutFilter(Duration(1, TimeUnit.SECONDS)) andThen
- new RestService()
- // Create the server and start it. The ServerBuilder is an example
- // of the Type-Safe Builder pattern in scala. Options are set on
- // the builder. Build is called to construct and start the server.
- val server: Server = ServerBuilder()
- .codec(Http())
- .bindTo(new InetSocketAddress(8080))
- .name("httpserver")
- .build(service)
- }
- }
- // Handle any uncaught exceptions that occur in filters or services
- // lower down the chain. We extend SimpleFilter because we're not
- // going to change the type of the request or response.
- class ExceptionFilter extends SimpleFilter[HttpRequest, HttpResponse] {
- // Deal with successful and error responses.
- val transformer =
- new FutureTransformer[HttpResponse, HttpResponse] {
- // A successful response is just passed through.
- override def map(value: HttpResponse): HttpResponse =
- value
- // An error is converted into a 500 response. The
- // Responses object is in this file.
- override def handle(throwable: Throwable): HttpResponse =
- Responses.InternalServerError(throwable.getMessage)
- }
- // Apply method is where the filtering takes place.
- def apply(request: HttpRequest, service: Service[HttpRequest, HttpResponse]) = {
- // Use transformedBy method to deal with both the normal and error responses.
- service(request).transformedBy(transformer)
- }
- }
- // An trial filter that adds a header to the request and another
- // to the response.
- class HeaderFilter extends SimpleFilter[HttpRequest, HttpResponse] {
- def apply(request: HttpRequest, service: Service[HttpRequest, HttpResponse]) = {
- // Look for a X-Request-ID header and add one if missing.
- val id = request.getHeader("X-Request-ID")
- if (id == null) {
- request.setHeader("X-Request-ID",
- UUID.randomUUID().toString)
- }
- // Add X-Processes to the response.
- service(request).onSuccess(r => {
- r.addHeader("X-Processed", "TRUE")
- })
- }
- }
- // Specialize the general TimeoutFilter to deal with
- // HTTP requests and responses.
- class HttpTimeoutFilter(timeout: Duration)
- extends TimeoutFilter[HttpRequest, HttpResponse](timeout,
- new GlobalRequestTimeoutException(timeout)) {
- }
- // Invoke the correct underlying service based on the incoming url.
- class RestService extends Service[HttpRequest, HttpResponse] {
- // Simple service that returns some json.
- val okService = new SimpleService()
- // Simple service that waits for longer that the configured
- // timeout for the server.
- val timeoutService = new SimpleService(Some(1200))
- // This pool is used to convert HttpResponse to
- // Future[HttpResponse] by executing the service on
- // a separate thread.
- val futurePool = FuturePool(Executors.newFixedThreadPool(4))
- // Used to split URIs.
- val splitter = Splitter.on('/').omitEmptyStrings()
- def apply(req: HttpRequest) = {
- val path = splitter.split(req.getUri).iterator()
- // Match the pieces of the path
- if (path.hasNext) {
- path.next() match {
- // Path starts with /ok so use ok service.
- // Using futurePool creates a Future that is evaluated on a separate thread using
- // the thread pool passed to the FuturePool constructor above.
- case "ok" => futurePool {
- // See Responses.OK comments below.
- Responses.OK(req,
- (req: HttpRequest) => { okService(req.getUri) })}
- // Path starts with /timeout so use timeout service.
- case "timeout" => futurePool {
- Responses.OK(req,
- (req: HttpRequest) => { timeoutService(req.getUri) })}
- // No match so return a not found response. Future.value
- // creates a Future with an existing constant value.
- case _ => Future.value(Responses.NotFound())
- }
- } else {
- // No match so not found.
- Future.value(Responses.NotFound())
- }
- }
- }
- // A very simple service that returns a model object.
- class SimpleService(val waitFor: Option[Int] = None) {
- def apply(name: String) = {
- // If a time to wait is supplied then wait. This
- // is used to timeout the service.
- waitFor.foreach(t => this.synchronized { wait(t) })
- // Return the model object.
- new SimpleModel(UUID.randomUUID().toString, name, "Some Street")
- }
- }
- // Objects to produce some standard http responses.
- object Responses {
- // Used to convert objects into json
- val mapper = new ObjectMapper
- // Create an HttpResponse from a status and some content.
- private def respond(status: HttpResponseStatus, content: ChannelBuffer): HttpResponse = {
- val response = new DefaultHttpResponse(HTTP_1_1, status)
- response.setHeader("Content-Type", "application/json")
- response.setHeader("Cache-Control", "no-cache")
- response.setContent(content)
- response
- }
- object OK {
- def apply(req: HttpRequest, service: (HttpRequest) => Object): HttpResponse =
- respond(HttpResponseStatus.OK,
- copiedBuffer(mapper.writeValueAsBytes(service(req))))
- }
- object NotFound {
- def apply(): HttpResponse =
- respond(NOT_FOUND,
- copiedBuffer("{\"status\":\"NOT_FOUND\"}", UTF_8))
- }
- object InternalServerError {
- def apply(message: String): HttpResponse =
- respond(INTERNAL_SERVER_ERROR,
- copiedBuffer("{\"status\":\"INTERNAL_SERVER_ERROR\", " +
- "\"message\":\"" + message + "\"}", UTF_8))
- }
- }