这篇文档介绍了统一查询层的底层运行库 --
cats-effect
.
统一查询层是个综合工程, 在这里我们选取其中的一个 -- apm module
作为例子, 分析cats-effect
的原理和具体用法.
apm module
是一个web service项目, 他连接了前端请求和后端数据存储. apm module
提供的api基本遵循一个统一的模式:
前端请求q1 --> 将q1转化成即席查询请求q2 --> 从mysql补全q2的元信息 --> 发起即席查询(es/druid/prometheus), 获得结果q2 --> 将q2转化成前端需要的格式q1 --> 返回给前端
作为一个重IO任务的web service, apm module
需要处理以下几个关键问题:
- 管理http server链接池
- 管理mysql元数据库链接池
- 管理http client资源, client用于查询es/druid/promethus
- 不同IO逻辑之间不能同步依赖, 需要异步解耦
- 管理cpu资源. cpu既要处理解析请求/返回结果等cpu-bound任务, 还要处理mysql查询/http client请求等io-bound任务, 最后还要处理有各种task的调度
- 实现周期性/定时的执行某个逻辑
- 能够支撑100qps左右的用户查询和警报查询的并发流量
下面文档将先介绍一些并发的基础知识, 接着结合具体使用场景讲一下IO
, Fiber
, Resource
, ContextShift
, Timer
等cats-effect
重要组件的含义和使用方法, 最后从整体角度说明apm module
的代码组织方式.
首先要明确, 并发不是一个简单的话题. 这里只把几个最重要的概念拿出来强调一下. 这些概念跟统一查询层工程实现和cats-effect
有密切的关系.
CPU-bound任务指的是主要消耗处理器资源的任务, 例如序列化, 正则表达式匹配, 对数据加减乘除等等. IO-bound任务指任务主要在进行I/O, 并不依赖处理器资源. 例如等待磁盘操作, 等待外部服务响应请求等等. 另外还有non-terminating任务, 它指不会结束且不消耗处理器资源或者不会阻塞线程的任务.
在JVM中, 线程与操作系统的native线程是1:1的关系. 在任一时刻, 每个core只能运行1个线程. 如果线程很多, 我们会碰到context switch问题, 消耗额外的CPU资源. 因此执行CPU-bound任务最高效的方式是有多少核就跑多少线程. 对于生命周期很短的任务, 每个任务都创建一个线程是很低效的模式; 而且如果创建了过多的线程, 内存就会被过度消耗. 针对这些问题, 我们可以用线程池解决. 线程池由一个任务队列和一定数量的工作线程组成.每个任务都会先进入任务队列, 线程池消费任务队列, 分发任务给某个工作线程, 工作线程完成任务.
根据最佳实践, JVM的线程池应该被分成三类:
- CPU-bound
- Blocking IO
- Non-block IO polling
每种线程池都有不同的使用模式.
对于CPU-bound任务, 我们应该使用bounded线程池. 它是预先创建的, 并且线程数量固定, 等于CPU核数. 也就是fixedThreadPool
.
如果在fixedThreadPool
中blocking, 珍贵的CPU资源就会被白白的消耗. 我们必须想尽一切办法避免在CPU-bound线程池中blocking, 所以我们把blocking操作转移到另一个独立的线程迟中 -- 一个caching, unbounded并且不预先设定线程池大小的线程池. 使用这种unbounded线程池时, 必须前置流量控制逻辑, 不然可能会OOM.
最后一种线程池是异步IO poll线程池. 它里面的线程不断的询问kernel "有没有异步的IO通知啊, 亲?", 并且把通知传达给程序其他部分. 我们用很小的, 最高优先级的fixedThreadPool
来满足这个需求. 这种线程池的大小一般只有1. 注意, 千万不能在这类线程池中运行CPU-bound任务或者blocking逻辑 -- 即使是1纳秒的延迟, 也会大幅度拖慢整个程序.
下面的漫画图生动的展示了这三种线程池的使用模式:
绿色线程是cats-effect
平台层面的线程. 他们不是由操作系统调度的, 因此更轻量级, 可以同时创建很多绿色线程.
绿色线程的基础概念来自于Cooperative multitasking
. 指线程不是被动的被抢断(preempted), 而是主动放弃control.
绿色线程在很多语言的并发模型中有重要地位, 例如golang的goroutine. 在cats-effect
中, Fiber
和IO shift
两个重要机制设计都来源于此.
线程阻塞的概念大家应该都很熟悉了, 这里借助线程阻塞介绍异步阻塞的概念.
我们应该尽可能的避免线程阻塞. 但是很多时候不得不这样做. 上面提到, 当不得不处理阻塞线程时, 可以把I/O阻塞任务"shift"到一个专门用于阻塞的unbounded线程池里面运行. 使用unbounded线程池和IO shift
技术, 处理线程阻塞代码例子如下:
import java.util.concurrent.Executors
import cats.effect.{ContextShift, IO}
import scala.concurrent.ExecutionContext
implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
val blockingEC = ExecutionContext.fromExecutor(Executors.newCachedThreadPool())
def blockingOp: IO[Unit] = IO(/* I/O操作 */ ())
def doSth(): IO[Unit] = IO(/* 计算操作 */ ())
val prog =
for {
_ <- contextShift.evalOn(blockingEC)(blockingOp) // blockingOp被shift到blockingEC, 结束后又被shift回隐式的globalEC
_ <- doSth()
} yield ()
上面的代码中, 我们通过IO shift
实现了异步阻塞. 异步阻塞指的是阻塞任务(或者说阻塞绿色线程, 阻塞fiber), 但不阻塞线程. 其他未被阻塞的任务还是可以在当前线程中运行. 这样的阻塞方式浪费资源很少, 十分高效.
上面代码中的blockingOp
就是被异步阻塞了.
操作系统通过时间片调度native线程, 当时间片结束时, 当前运行线程被抢断(preempted), 发生上下文切换, 另一个线程运行. 这种调度机制保证了不同线程之间的fairness.
在线程池的抽象下, 调度方式有所不同 -- 线程池按顺序消费任务队列, 每个任务要执行完才有可能执行下一个任务. 也就是说线程池内部的不同任务无法互相抢断.
为了在线程池中实现多任务的fairness, 需要一个任务主动放弃控制的机制(上面提到的Cooperative multitasking
), 要么暂停当前任务把它放到队尾, 要么把当前任务转移到其他线程池. 如果把一个任务的逻辑看做一步步先后序执行语句的同步过程, 让这个任务执行到一半时主动放弃控制运行其他任务, 就叫做插入了一个异步边界(asynchronous boundary) -- 任务的前后两部分是异步执行的.
Asynchronous boundary是响应式编程的核心概念, 详细解释看这个官方解释reactive-streams/reactive-streams-jvm#46
IO shift
****就是显式的插入异步边界.
下面举一个简单例子说明异步边界的用法. 一个不断打印数字的程序:
import java.util.concurrent.Executors
import cats.effect.{ContextShift, Fiber, IO}
import cats.syntax.apply._
import scala.concurrent.ExecutionContext
// 大小只有1的线程池
val ec = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor())
val cs: ContextShift[IO] = IO.contextShift(ec)
def infiniteIO(id: Int)(cs: ContextShift[IO]): IO[Fiber[IO, Unit]] = {
// 无限循环
def repeat: IO[Unit] = IO(println(id)).flatMap(_ => repeat)
// 创建一个fiber, 在background运行
repeat.start(cs)
}
val prog =
for {
_ <- infiniteIO(1)(cs) // 第一个任务已经把线程池占满了
_ <- infiniteIO(11)(cs) // 第二个任务永远无法执行
} yield ()
prog.unsafeRunSync()
这个程序只会不断打印1, 永远不会打印11. 把repeat
函数修改一下, 通过IO shift
插入异步边界:
def infiniteIO(id: Int)(cs: ContextShift[IO]): IO[Fiber[IO, Unit]] = {
def repeat: IO[Unit] = IO(println(id)).flatMap(_ => IO.shift *> repeat)
repeat.start(cs)
}
这样修改之后, 每次打印结束, 整个任务就会重新进入线程池任务队列的队尾. 程序跑稳定之后会交替打出1和11.
Scala自带的
Future
对任务的fairness做了特殊优化 -- 在每个map和flatMap之后都插入了异步边界.
看了统一查询层代码, 你会发现到处都是IO类型. IO到底是个啥, 为啥他的应用这么广泛呢?
这里的IO是一个类型, 不是I/O操作的那个IO
IO类型是side-effect wrapper. IO杂糅了来自functional programming和asynchronous and concurrent programming两个领域的多个概念, 下面我们一层层剥开它神秘的面纱.
FP要求pure functions:
- Total — Functions always return an output for every input.
- Deterministic — Functions return the same output for the same input.
- Pure — The only effect of providing a function an input is computing the output.
关于为啥FP要这么做, 根本上都是为了程序的scalability, 为了代码能更容易的compose和decompose. Pure functions are easier to understand, easier to test, easier to refactor, and easier to abstract over.
但是程序运行必须有side-effect. 由纯函数组成的程序不直接与外界直接接触, 因为如果发生接触, 必然会产生side-effect. 为了与接触, FP程序需要构造并返回一种描述side-effect的immutable的数据结构, 这种数据结构叫做functional effects. IO类型是functional effects的一种实现.
一个IO[A]类型的值描述一个计算, 当调用特定"unsafe"方法时, 它描述的计算被执行, 执行结束后返回一个类型A的值
注意这个定义中的两个关键词 -- 描述 和 执行:
- 创建一个IO[A]类型的变量并给他赋值, 只是描述一个计算, 计算没有真正执行.
- 当调用
IO[A].unsafeRunXXXX()
时IO[A]描述的计算才被执行, 这时候会产生side-effect
我们要以monadic的风格, 使用map或者flatMap串联各个IO, 最后在"very end of your program"只调用一次unsafe方法运行计算. 这是FP处理side-effect的通用模式. 下面我们通过一个统一查询层的例子来说明IO的使用.
QueryService
中有个方法query
, query
方法的定义如下:
def query[T <: QueryPhrase](req: T): IO[ExecutionResult] =
for {
metas <- fetchMetas(req.table)
exe <- selectExecutor(metas.item.dbType)
result <- exe.queryExternalDb(req)
} yield result
fetchMetas
从mysql获得查询的元数据, selectExecutor
根据数据库类型获得对应的executor, Executor.queryExternalDb
查询db. 这三个都是IO-bound操作, 都有side-effect.
fetchMetas
返回类型是IO[Meta]
. 当程序跑到 metas <- fetchMetas(req.table)
这一行时, 创建了一个描述查询mysql元数据的, 类型为IO[Meta]
的变量metas
.这时并不会真的去查mysql.
同样道理, exe <- selectExecutor(metas.item.dbType)
也只是创建了一个类型为IO[QueryExecutor]
的变量exe
, 并没有真正生成一个executor.
三个个IO类型的变量metas
和exe
和result
, 通过flatMap串联在一起, 意思是先查myql元数据, 成功后, 再生成一个executor, 成功后, 最后查询数据库获得结果. 串联后, 整个函数只返最后result
的类型IO[ExecutionResult]
.
那么真正的计算在何时执行呢? 这个query函数, 还会继续跟其他IO[A]串联, 最终形成一个大IO[A]变量. 这个变量几乎描述了整个code path的计算逻辑, 但是只是描述, 没有执行. 统一查询层是web service, 因此他的程序边界在向外部client返回http response的地方. 因此, 在返回http response的代码中, 会调用IO[A].unsafeRunXXXX()
方法, 执行计算.
前面asynchronous boundary讲到, 异步任务是任务逻辑执行不依赖主程序control flow或者当前call stack的任务. 异步根本的需求是解耦计算逻辑. 有一些场景, 需要用到异步的机制, such as 加锁. 加锁是个危险操作, 可能引起block. 因此, 我们不能用一个线程一直同步的等待锁, 当遇到加锁时, 当前线程先切换其他任务, 等到需要锁的任务成功获得了锁, 再返回执行加锁任务. 统一查询层中, freemarker渲染模板就是这个模式. 下面结合代码描述.
def renderPromQl(dataModel: TemplateDataModel,
template: String,
fields: List[MonitorItemField],
aliases: List[FieldAlias]): IO[String] =
globalCS.evalOn(unboundEC)(IO.delay {
val templateLoader = new StringTemplateLoader
templateLoader.putTemplate("template", template)
this.synchronized {
fmConfig.setTemplateLoader(templateLoader)
Try {
val sw = new StringWriter
fmConfig.getTemplate("template").process(dataModel2Map(dataModel, fields, aliases), sw)
sw.toString
} match {
case Success(rendered) =>
logger.debug(s"rendered template string: $rendered")
rendered
case Failure(exception) =>
logger.error(exception)("freemarker render failed.")
throw exception
}
}
})
globalCS.evalOn(unboundEC)(ioInstance)
完成了异步切来切去的操作. 这里隐含了两个线程池globalEC
和unboundEC
. ioInstance
是blocking加锁逻辑. 程序的控制先是在globalEC
中, 执行到加锁逻辑时, 当前线程主动放弃手头任务, 把任务shift到unboundEC
中, 转而执行其他任务. unboundEC
完成加锁后, 再把任务shift回globalEC
. 这样一个任务经过了两次异步shift, 达到了合理分配了线程资源的目的.
首先明确一个定义, 并发必然是异步. 如果把并发和异步建模成trait(typeclass), 那么并发是异步的sub-type:
trait Concurrent[F[_]] extends Async[F]
有两个IO实例ioa和iob, 他们描述了两个不同的计算逻辑, 如何让他们两个并发执行? 答案是将IO实例转化成Fiber实例.
Fiber是一种green thread. 调用IO.start
方法, 可以把IO实例转化成Fiber实例. Fiber会在background执行. 如果要等待执行结果或者取消background执行, 可以调用join或cancel方法:
trait Fiber[F[_], A] {
def cancel: F[Unit]
def join: F[A]
}
Apm项目中, 生成报表要发起多个即席查询请求. 某些查询不互相依赖, 因此可以并发查询以降低相应时间. 并发查询的代码举例如下:
for {
countF <- (queryService.query(countTopQuery) >>= generateCountReportItem).start
p0p1CountF <- (queryService.query(p0p1CountTopQuery) >>= generateCountReportItem).start
resolveDurationF <- (queryService.query(resolveDurationTopQuery) >>= generateDurationReportItem).start
p0p1ResolveDurationF <- (queryService.query(p0p1ResolveDurationTopQuery) >>= generateDurationReportItem).start
count <- countF.join
p0p1Count <- p0p1CountF.join
resolveDuration <- resolveDurationF.join
p0p1ResolveDuration <- p0p1ResolveDurationF.join
} yield {
val r = AlertTopReport(count, p0p1Count, resolveDuration, p0p1ResolveDuration)
AlertTopReportResp(Status.success, r)
}
这里将四种查询并发执行, 4个IO实例转化成了4个fiber, 并且在main control flow里调用join等待每个fiber的返回结果.
同步, 异步, 并发等概念和他们的操作被抽象成一个个type class, IO data type 实现了所有的type class, 因此IO能够描述同步, 异步, 并发等计算逻辑. Typeclass, 是FP实现多态的方法, 详细解释请看这个文档的第一章 scala-with-cats.pdf
如果要实现周期性的执行某个IO实例, 就需要使用Timer data type. 每天计算应用稳定性报表的代码如下:
// 周期性计算top表
// eagerly, 10分钟判断一次, 每天只会计算一次
def periodicCompute(lastDateKey: Option[String] = None): IO[Unit] = {
val curDateTime = DateTime.now(defaultTimeZone)
val dateKey = curDateTime.withTimeAtStartOfDay.minusSeconds(1).toString(dateFormat)
val reportStartTime = curDateTime.minusDays(7).withTimeAtStartOfDay.getMillis
val reportEndTime = curDateTime.withTimeAtStartOfDay.minusSeconds(1).getMillis
if (lastDateKey.contains(dateKey)) {
// 今天的已经计算过, 跳过
implicitly[Timer[IO]].sleep(retryPause) >> periodicCompute(lastDateKey)
} else {
for {
_ <- IO.shift
_ <- IO(.../* 具体计算逻辑 */)
_ <- implicitly[Timer[IO]].sleep(retryPause) >> /* 循环 */ periodicCompute(Some(dateKey))
} yield ()
}
}
implicitly[Timer[IO]].sleep(retryPause) >> periodicCompute(Some(dateKey))
这段代码实现了周期循环的功能.
这里的循环是通过无限递归实现的flatMap实现的, 配合IO.shift, 不会爆栈. 详细解释见https://typelevel.org/cats-effect/datatypes/io.html#shift
开启/关闭一个InputStream或者新建/销毁一个数据库连接池, 都是side-effect操作. 这些资源一般要经历获取/使用/释放三个阶段. 我们把获取/使用/释放资源都用IO类型表描述, Resource类型抽象了整个资源使用生命周期. 统一查询层中, 要临时创建k8s client, 访问client, 访问完成后销毁client. 使用Resource类型实现代码如下:
// 定义创建和销毁client
private def buildClient(certsCaData: String,
masterUrl: String,
token: String): Resource[IO, DefaultKubernetesClient] =
Resource.make {
IO.delay {
val config = new ConfigBuilder()
.withCaCertData(certsCaData)
.withMasterUrl(masterUrl)
.withOauthToken(token)
.build()
new DefaultKubernetesClient(config)
}
} { client =>
IO.delay(client.close())
}
// 使用client
client.use { kClient =>
IO.delay {
/* 具体使用逻辑 */
}
}
每次client.use
之后, client会自动销毁. 注意make
, use
方法里面都是IO实例描述的side-effect逻辑.
有几点:
- 依赖注入都是通过构造函数注入完成, 并且都在
com.qunhe.ep.monitor.apm.Server的main
函数中 - 所有http enpoint定义, route, 序列化定义都在
com.qunhe.ep.monitor.apm.HttpEndPoints
中定义 com.qunhe.ep.monitor.apm.service
package包含了具体业务逻辑. Package按domain划分文件. 每个domain用一个class和companion object描述.