上面即使是渲染最简单的网页也需要十多个说着不同协议的网络服务的合作。比如,为了渲染首页,应用程序需要向社交网络图(Social
Graph)服务、Memcached、数据库、以及许多其它网络服务发出请求。他们每个都使用不同的协议:Thrift、Memcached、
MySQL等等。此外,这些服务之间还相互交谈——他们既是服务器又是客户端。比如,社交网络图服务就提供了一个 Thrift
接口,但是它也从一个 MySQL 集群里面获取信息。
在这样一个系统里面,服务中断最常见的原因就是这些部件之间在发生故障的时候糟糕的交互;常见的故障包括崩溃的主机和极高的时延差异。这些故障可以通过让工作队列任务堆积、TCP
连接搅动(churn)、耗光内存和文件描述符等方式在系统里面叠加起来。在最糟的情况下,用户就会看到
复杂的网络服务器和客户端有很多活动部件:故障检测器、负载平衡器、失效备援策略(failover
strategy)等等。这些部件之间需要达到一种精致的平衡,以便对大型产品系统里面的故障有足够的弹性。
故障检测器、负载平衡器等部件的不同协议的很多不同实现使得这个任务变得尤其困难。比如,Thrift 的背压(back
pressure)策略就和 HTTP 的不同。在事故的时候确保在这种异构系统上的覆盖率非常具有挑战性。
是一个协议不可知的、异步的、用于
Java 虚拟机的远程过程调用(RPC)系统,它可能让在 Java、Scala或任何基于 JVM
的语言上构建鲁棒的客户端和服务器变得很容易。Finagle 支持广泛的基于请求/答复的 RPC 协议和很多类型的流协议。
今天,Finagle 已经部署到了 Twitter 多个前端和后端的运行产品中,包括我们的 URL 爬虫(crawler)和
HTTP 代理。我们计划更广泛地部署 Finagle。
在 Finagle 中,Future 对象是对于所有异步计算的统一抽象。一个 Future
表示了一个尚未完成的计算,其可能成功也可能失败。使用 Future 两个最基本的方法是:
组合
Future
Future 可以以有趣的方式组合或者转换,从而做到一些常常在函数式程序设计里面看到的组合行为。比如,你可以通过 map 把一个
Future[String] 转换成 Future[Int]:
val stringFuture: Future[String] =
Future("1")
val intFuture: Future[Int] =
stringFuture map
{ string =>
string.toInt
}
类似地,你还可以用 flatMap 把一系列 Future 串成一个流水线:
val authenticatedUser: Future[User] =
User.authenticate(email, password)
val lookupTweets: Future[Seq[Tweet]] =
authenticatedUser flatMap
{ user =>
Tweet.findAllByUser(user)
}
在这个例子里面,User.authenticate() 是异步执行的;Tweet.findAllByUser()
在最终结果上被调用。在 Scala 里面这可以用另一种方式表达,用 for 语句:
for {
user
<- User.authenticate(email, password)
tweets
<- Tweet.findAllByUser(user)
} yield tweets
当用 flatMap 或者 for 语句串联 Future
的时候,处理错误和异常也非常简单。在上面的例子中,ifUser.authenticate() 异步地抛出了一个异常,接下来对于
Tweet.findAllByUser() 的调用永远也不会发生。取而代之,流水线的结果表达式仍然是
Future[Seq[Tweet]] 类型,但是它含有异常值而不是推文。你可以使用 onFailure
回调函数或者其他组合计数来处理异常。
和其它异步编程技术(比如 CPS:
Continuation-Passing
Style)相比,Future
有一个很好的性质,就是你可以更容易的编写出清楚且鲁棒的异步代码,即使是带有复杂的散布/收集(scatter/gather)操作:
val severalFutures = Seq[Future[Int]] =
Seq(Tweet.find(1), Tweet.find(2), ...)
val combinedFuture: Future[Seq[Int]] =
Future.collect(severalFutures)
Service
对象
Service 是一个函数,其接受一个请求,返回一个 Future 对象作为答复。注意客户端和服务器都是用 Service
对象表示的。
要创建一个 Service 对象,你需要继承抽象的 Service 类并监听一个端口。下面是一个简单的 HTTP 服务器,监听端口
10000:
val service = new Service[HttpRequest, HttpResponse] {
def
apply(request: HttpRequest) =
Future(new
DefaultHttpResponse(HTTP_1_1, OK))
}
val address = new InetSocketAddress(10000)
val server: Server[HttpRequest, HttpResponse] = ServerBuilder()
.name("MyWebServer")
.codec(Http())
.bindTo(address)
.build(service)
建立一个 HTTP 客户端就更简单了:
val client: Service[HttpRequest, HttpResponse] = ClientBuilder()
.codec(Http())
.hosts(address)
.build()
// Issue a request, get
a response:
val request: HttpRequest =
new
DefaultHttpRequest(HTTP_1_1, GET,
"/")
client(request) onSuccess { response =>
println("Received
response: " + response)
}
Filter
对象
Filter 是一种把你的应用程序中不同的阶段的孤立出来组成一个流水线的有用的方式。比如,你可能需要在你的 Service
开始接受请求前处理异常、授权等问题。
一个 Filter 包裹了一个 Service,且潜在地,把 Service 的输入和输出类型转换成其它类型。换一句话说,Filter
是一个转换器。下面是一个用来确保一个 HTTP 请求有合法的 OAuth 证书的、且使用一个异步的认证服务的
filter。
下面是一个修饰了 Service 的Filter:
class
RequireAuthentication(a:
Authenticator)
extends Filter[...] {
def
apply(
request:
Request,
continue:
Service[AuthenticatedRequest, HttpResponse]
)
= {
a.authenticate(request)
flatMap {
case
AuthResult(OK,
passport)
=>
continue(AuthenticatedRequest(request,
passport))
case
AuthResult(Error(code))
=>
Future.exception(new
RequestUnauthenticated(code))
}
}
}
Finagle 是一个开源项目,使用 Apache License, Version 2.0。源代码和文档都可以在
GitHub 上找到。
鸣谢
Finagle 最早构思来自 Marius Eriksen 和 Nick Kallen。其它主要贡献人员有 Arya
Asemanfar, David Helder, Evan Meagher, Gary McCue, Glen Sanford,
Grant Monroe, Ian Ownbey, Jake Donham, James Waldrop, Jeremy Cloud,
Johan Oskarsson, Justin Zhu, Raghavendra Prabhu, Robey Pointer,
Ryan King, Sam Whitlock, Steve Jenson, Wanli Yang, Wilhelm
Bierbaum, William Morgan, Abhi Khune, and Srini Rajagopal。
原文链接:
http://engineering.twitter.com/2011/08/finagle-protocol-agnostic-rpc-system.html
原文发表日期:2011 年 8 月 19 日