Akka Http请求和响应模式

2019年5月6日 32点热度 0条评论

我有一个要求客户调用通过akka http创建的post REST端点。一旦请求进入post方法,我就需要将post对象传递给流(由源,多个流和接收器组成),并从接收器获取响应,以便可以将响应返回给客户端。

我浏览了一些文章,并看了下面的代码,但担心我不想为每个请求实现流。我只想实现一个流,并继续将元素传递到该流。

以下是我所看到的高层次:

val route: Route =
  path("dummy path") { p =>
    get {
      (extract(_.request) & extractMaterializer) { (req, mat) ⇒
        **Source.single(req).runWith(sink)(mat)**

        complete {
          s"<h1>Say hello to akka-http. p=$p</h1>"
        }
      }
    }
  }

我当时正在考虑创建一个演员,并将对象传递给那个演员。我可以从Source.actorRef创建一个源,并将多个流与此源连接。但是我不确定如何从接收器中获取响应。就像是:

    val actor: ActorRef = some actor

    Source.actorRef(actor).via(flows).to(Sink).run() --> materialized stream

    val route: akka.http.scaladsl.server.Route =
      path("post" / Segment) { p =>
        post {

          (extract(_.request) & extractMaterializer) { (req, mat) ⇒
           response = actor.ask(message) --> get back the response

            complete {
              response
            }
          }
        }
      }

或者,我可以在用例中添加其他任何内容。

解决方案如下:

我想您想要的是使请求处理通过流[仅实现一次],然后将响应从流发送回用户。可能是队列源,并且介于两者之间的Actor可以胜任

import java.util.concurrent.TimeUnit

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives.{
  get,
  onSuccess,
  pathEnd,
  pathPrefix
}
import akka.pattern.ask
import akka.stream.scaladsl.{Keep, Sink, Source, SourceQueueWithComplete}
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
import akka.util.Timeout
import akka.http.scaladsl.server.directives.RouteDirectives.complete
import com.typesafe.config.ConfigFactory

import scala.concurrent.ExecutionContext

object TestApp2 extends App {

  implicit val actorSystem = ActorSystem("test-system")
  implicit val mat = ActorMaterializer()
  implicit val ec = mat.executionContext

  val streamSource = Source
    .queue[(Message, ActorRef)](100, OverflowStrategy.dropNew)
    .map { p =>
      //do anything here
      println("I am processing request")
      ("It works", p._2)
    }
    .toMat(Sink.foreach { resp =>
      resp._2 ! resp._1
    })(Keep.left)
    .run()
  implicit val timeout = Timeout(
    10000,
    TimeUnit.MILLISECONDS
  )

  val internalActor =
    actorSystem.actorOf(Props(new InternalActor(streamSource)))
  Http(actorSystem)
    .bindAndHandle(
      getRoutes(internalActor),
      "0.0.0.0",
      8080
    )

  def getRoutes(
      internalActor: ActorRef
  )(implicit mat: ActorMaterializer, ec: ExecutionContext, timeout: Timeout) = {
    pathPrefix("healthcheck") {
      get {
        pathEnd {
          val responseReturned = internalActor ? Message()
          onSuccess(responseReturned) {
            case response: String =>
              complete(response)
            case _ => complete("error")
          }
        }
      }
    }
  }
}

case class Message()

class InternalActor(streamSource: SourceQueueWithComplete[(Message, ActorRef)])(
    implicit ec: ExecutionContext
) extends Actor {

  override def receive: Receive = {
    case m: Message =>
      val senderRef = sender()
      streamSource.offer((m, senderRef)).map {
        case QueueOfferResult.Enqueued => // do nothing for success
        case QueueOfferResult.Dropped => senderRef ! "error" // return error in case of backpressure 
        case QueueOfferResult.Failure(ex) => senderRef ! "error" // return error
        case QueueOfferResult.QueueClosed => senderRef ! "error" // return error
      }
  }
}

卷曲'
http://localhost:8080/healthcheck'

有用