Skip to content

Instantly share code, notes, and snippets.

@SabaPing
Last active July 29, 2024 08:05
Show Gist options
  • Save SabaPing/8413e933724fc7413bbbf1746aba41ba to your computer and use it in GitHub Desktop.
Save SabaPing/8413e933724fc7413bbbf1746aba41ba to your computer and use it in GitHub Desktop.
Cats-Effect简介

Cats-Effect

Introduction

image.png 这篇文档介绍了统一查询层的底层运行库 -- cats-effect.  统一查询层是个综合工程, 在这里我们选取其中的一个 -- apm module作为例子, 分析cats-effect的原理和具体用法. 

apm module是一个web service项目, 他连接了前端请求和后端数据存储. apm module提供的api基本遵循一个统一的模式:

前端请求q1 --> 将q1转化成即席查询请求q2 --> 从mysql补全q2的元信息 --> 发起即席查询(es/druid/prometheus), 获得结果q2 --> 将q2转化成前端需要的格式q1 --> 返回给前端

作为一个重IO任务的web service, apm module 需要处理以下几个关键问题:

  • 管理http server链接池
  • 管理mysql元数据库链接池
  • 管理http client资源, client用于查询es/druid/promethus
  • 不同IO逻辑之间不能同步依赖, 需要异步解耦
  • 管理cpu资源. cpu既要处理解析请求/返回结果等cpu-bound任务, 还要处理mysql查询/http client请求等io-bound任务, 最后还要处理有各种task的调度
  • 实现周期性/定时的执行某个逻辑
  • 能够支撑100qps左右的用户查询和警报查询的并发流量

下面文档将先介绍一些并发的基础知识, 接着结合具体使用场景讲一下IOFiberResourceContextShiftTimercats-effect重要组件的含义和使用方法, 最后从整体角度说明apm module的代码组织方式.

并发基础

首先要明确, 并发不是一个简单的话题. 这里只把几个最重要的概念拿出来强调一下. 这些概念跟统一查询层工程实现和cats-effect有密切的关系.

CPU-bound任务 和 IO-bound任务

CPU-bound任务指的是主要消耗处理器资源的任务, 例如序列化, 正则表达式匹配, 对数据加减乘除等等. IO-bound任务指任务主要在进行I/O, 并不依赖处理器资源. 例如等待磁盘操作, 等待外部服务响应请求等等. 另外还有non-terminating任务, 它指不会结束且不消耗处理器资源或者不会阻塞线程的任务.

线程和线程池

在JVM中, 线程与操作系统的native线程是1:1的关系. 在任一时刻, 每个core只能运行1个线程. 如果线程很多, 我们会碰到context switch问题, 消耗额外的CPU资源. 因此执行CPU-bound任务最高效的方式是有多少核就跑多少线程. 对于生命周期很短的任务, 每个任务都创建一个线程是很低效的模式; 而且如果创建了过多的线程, 内存就会被过度消耗. 针对这些问题, 我们可以用线程池解决. 线程池由一个任务队列和一定数量的工作线程组成.每个任务都会先进入任务队列, 线程池消费任务队列, 分发任务给某个工作线程, 工作线程完成任务.

线程池分类

根据最佳实践, JVM的线程池应该被分成三类:

  • CPU-bound
  • Blocking IO
  • Non-block IO polling

每种线程池都有不同的使用模式. 对于CPU-bound任务, 我们应该使用bounded线程池. 它是预先创建的, 并且线程数量固定, 等于CPU核数. 也就是fixedThreadPool. 如果在fixedThreadPool中blocking, 珍贵的CPU资源就会被白白的消耗. 我们必须想尽一切办法避免在CPU-bound线程池中blocking, 所以我们把blocking操作转移到另一个独立的线程迟中 -- 一个caching, unbounded并且不预先设定线程池大小的线程池. 使用这种unbounded线程池时, 必须前置流量控制逻辑, 不然可能会OOM. 最后一种线程池是异步IO poll线程池. 它里面的线程不断的询问kernel "有没有异步的IO通知啊, 亲?", 并且把通知传达给程序其他部分. 我们用很小的, 最高优先级的fixedThreadPool来满足这个需求. 这种线程池的大小一般只有1. 注意, 千万不能在这类线程池中运行CPU-bound任务或者blocking逻辑 -- 即使是1纳秒的延迟, 也会大幅度拖慢整个程序. 下面的漫画图生动的展示了这三种线程池的使用模式: image.png

绿色线程(Green thread)

绿色线程是cats-effect平台层面的线程. 他们不是由操作系统调度的, 因此更轻量级, 可以同时创建很多绿色线程. 绿色线程的基础概念来自于Cooperative multitasking. 指线程不是被动的被抢断(preempted), 而是主动放弃control. 绿色线程在很多语言的并发模型中有重要地位, 例如golang的goroutine. 在cats-effect中, FiberIO shift两个重要机制设计都来源于此.

阻塞线程与异步阻塞

线程阻塞的概念大家应该都很熟悉了, 这里借助线程阻塞介绍异步阻塞的概念. 我们应该尽可能的避免线程阻塞. 但是很多时候不得不这样做. 上面提到, 当不得不处理阻塞线程时, 可以把I/O阻塞任务"shift"到一个专门用于阻塞的unbounded线程池里面运行. 使用unbounded线程池和IO shift技术, 处理线程阻塞代码例子如下:

import java.util.concurrent.Executors
import cats.effect.{ContextShift, IO}
import scala.concurrent.ExecutionContext

implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
val blockingEC = ExecutionContext.fromExecutor(Executors.newCachedThreadPool())

def blockingOp: IO[Unit] = IO(/* I/O操作 */ ())
def doSth(): IO[Unit] = IO(/* 计算操作 */ ())

val prog =
  for {
    _ <- contextShift.evalOn(blockingEC)(blockingOp) // blockingOp被shift到blockingEC, 结束后又被shift回隐式的globalEC
    _ <- doSth()                                     
  } yield ()

上面的代码中, 我们通过IO shift实现了异步阻塞. 异步阻塞指的是阻塞任务(或者说阻塞绿色线程, 阻塞fiber), 但不阻塞线程. 其他未被阻塞的任务还是可以在当前线程中运行. 这样的阻塞方式浪费资源很少, 十分高效. 上面代码中的blockingOp就是被异步阻塞了.

线程调度与异步边界

操作系统通过时间片调度native线程, 当时间片结束时, 当前运行线程被抢断(preempted), 发生上下文切换, 另一个线程运行. 这种调度机制保证了不同线程之间的fairness. 在线程池的抽象下, 调度方式有所不同 -- 线程池按顺序消费任务队列, 每个任务要执行完才有可能执行下一个任务. 也就是说线程池内部的不同任务无法互相抢断.  为了在线程池中实现多任务的fairness, 需要一个任务主动放弃控制的机制(上面提到的Cooperative multitasking), 要么暂停当前任务把它放到队尾, 要么把当前任务转移到其他线程池. 如果把一个任务的逻辑看做一步步先后序执行语句的同步过程, 让这个任务执行到一半时主动放弃控制运行其他任务, 就叫做插入了一个异步边界(asynchronous boundary) -- 任务的前后两部分是异步执行的.

Asynchronous boundary是响应式编程的核心概念, 详细解释看这个官方解释reactive-streams/reactive-streams-jvm#46

IO shift****就是显式的插入异步边界.

下面举一个简单例子说明异步边界的用法. 一个不断打印数字的程序:

import java.util.concurrent.Executors
import cats.effect.{ContextShift, Fiber, IO}
import cats.syntax.apply._
import scala.concurrent.ExecutionContext

// 大小只有1的线程池
val ec = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor())

val cs: ContextShift[IO] = IO.contextShift(ec)

def infiniteIO(id: Int)(cs: ContextShift[IO]): IO[Fiber[IO, Unit]] = {
  // 无限循环
  def repeat: IO[Unit] = IO(println(id)).flatMap(_ => repeat)
	// 创建一个fiber, 在background运行
  repeat.start(cs)
}

val prog =
  for {
    _ <- infiniteIO(1)(cs) // 第一个任务已经把线程池占满了
    _ <- infiniteIO(11)(cs) // 第二个任务永远无法执行
  } yield ()

prog.unsafeRunSync()

这个程序只会不断打印1, 永远不会打印11. 把repeat函数修改一下, 通过IO shift插入异步边界:

def infiniteIO(id: Int)(cs: ContextShift[IO]): IO[Fiber[IO, Unit]] = {

  def repeat: IO[Unit] = IO(println(id)).flatMap(_ => IO.shift *> repeat)
  
  repeat.start(cs)
}

这样修改之后, 每次打印结束, 整个任务就会重新进入线程池任务队列的队尾. 程序跑稳定之后会交替打出1和11.

Scala自带的Future对任务的fairness做了特殊优化 -- 在每个map和flatMap之后都插入了异步边界.

重要Data types与使用场景

IO[A] -- side-effect wrapper

看了统一查询层代码, 你会发现到处都是IO类型. IO到底是个啥, 为啥他的应用这么广泛呢?

这里的IO是一个类型, 不是I/O操作的那个IO

IO类型是side-effect wrapper. IO杂糅了来自functional programming和asynchronous and concurrent programming两个领域的多个概念, 下面我们一层层剥开它神秘的面纱.

从函数式编程角度

FP要求pure functions:

  • Total — Functions always return an output for every input.
  • Deterministic — Functions return the same output for the same input.
  • Pure — The only effect of providing a function an input is computing the output.

关于为啥FP要这么做, 根本上都是为了程序的scalability, 为了代码能更容易的compose和decompose.  Pure functions are easier to understand, easier to test, easier to refactor, and easier to abstract over.

但是程序运行必须有side-effect. 由纯函数组成的程序不直接与外界直接接触, 因为如果发生接触, 必然会产生side-effect. 为了与接触, FP程序需要构造并返回一种描述side-effect的immutable的数据结构, 这种数据结构叫做functional effects. IO类型是functional effects的一种实现.

一个IO[A]类型的值描述一个计算, 当调用特定"unsafe"方法时, 它描述的计算被执行, 执行结束后返回一个类型A的值

注意这个定义中的两个关键词 -- 描述 和 执行:

  • 创建一个IO[A]类型的变量并给他赋值, 只是描述一个计算, 计算没有真正执行.
  • 当调用IO[A].unsafeRunXXXX()时IO[A]描述的计算才被执行, 这时候会产生side-effect

我们要以monadic的风格, 使用map或者flatMap串联各个IO, 最后在"very end of your program"只调用一次unsafe方法运行计算. 这是FP处理side-effect的通用模式. 下面我们通过一个统一查询层的例子来说明IO的使用.

QueryService中有个方法query, query方法的定义如下:

  def query[T <: QueryPhrase](req: T): IO[ExecutionResult] =
    for {
      metas <- fetchMetas(req.table)
      exe   <- selectExecutor(metas.item.dbType)
      result <- exe.queryExternalDb(req)
    } yield result

fetchMetas从mysql获得查询的元数据, selectExecutor根据数据库类型获得对应的executor, Executor.queryExternalDb查询db. 这三个都是IO-bound操作, 都有side-effect. fetchMetas返回类型是IO[Meta]. 当程序跑到 metas <- fetchMetas(req.table) 这一行时, 创建了一个描述查询mysql元数据的, 类型为IO[Meta]的变量metas.这时并不会真的去查mysql. 同样道理, exe   <- selectExecutor(metas.item.dbType)也只是创建了一个类型为IO[QueryExecutor]的变量exe, 并没有真正生成一个executor. 三个个IO类型的变量metasexeresult, 通过flatMap串联在一起, 意思是先查myql元数据, 成功后, 再生成一个executor, 成功后, 最后查询数据库获得结果. 串联后, 整个函数只返最后result的类型IO[ExecutionResult]. 那么真正的计算在何时执行呢? 这个query函数, 还会继续跟其他IO[A]串联, 最终形成一个大IO[A]变量. 这个变量几乎描述了整个code path的计算逻辑, 但是只是描述, 没有执行. 统一查询层是web service, 因此他的程序边界在向外部client返回http response的地方. 因此, 在返回http response的代码中, 会调用IO[A].unsafeRunXXXX()方法, 执行计算.

从异步的角度

前面asynchronous boundary讲到, 异步任务是任务逻辑执行不依赖主程序control flow或者当前call stack的任务. 异步根本的需求是解耦计算逻辑. 有一些场景, 需要用到异步的机制, such as 加锁. 加锁是个危险操作, 可能引起block. 因此, 我们不能用一个线程一直同步的等待锁, 当遇到加锁时, 当前线程先切换其他任务, 等到需要锁的任务成功获得了锁, 再返回执行加锁任务. 统一查询层中, freemarker渲染模板就是这个模式. 下面结合代码描述.

  def renderPromQl(dataModel: TemplateDataModel,
                   template: String,
                   fields: List[MonitorItemField],
                   aliases: List[FieldAlias]): IO[String] =
    globalCS.evalOn(unboundEC)(IO.delay {
      val templateLoader = new StringTemplateLoader
      templateLoader.putTemplate("template", template)
      this.synchronized {
        fmConfig.setTemplateLoader(templateLoader)
        Try {
          val sw = new StringWriter
          fmConfig.getTemplate("template").process(dataModel2Map(dataModel, fields, aliases), sw)
          sw.toString
        } match {
          case Success(rendered) =>
            logger.debug(s"rendered template string: $rendered")
            rendered
          case Failure(exception) =>
            logger.error(exception)("freemarker render failed.")
            throw exception
        }
      }
    })

globalCS.evalOn(unboundEC)(ioInstance)完成了异步切来切去的操作. 这里隐含了两个线程池globalECunboundECioInstance是blocking加锁逻辑. 程序的控制先是在globalEC中, 执行到加锁逻辑时, 当前线程主动放弃手头任务, 把任务shift到unboundEC中, 转而执行其他任务. unboundEC完成加锁后, 再把任务shift回globalEC. 这样一个任务经过了两次异步shift, 达到了合理分配了线程资源的目的.

从并发的角度

首先明确一个定义, 并发必然是异步. 如果把并发和异步建模成trait(typeclass), 那么并发是异步的sub-type:

trait Concurrent[F[_]] extends Async[F]

image.png

有两个IO实例ioa和iob, 他们描述了两个不同的计算逻辑, 如何让他们两个并发执行? 答案是将IO实例转化成Fiber实例. Fiber是一种green thread. 调用IO.start方法, 可以把IO实例转化成Fiber实例. Fiber会在background执行. 如果要等待执行结果或者取消background执行, 可以调用join或cancel方法:

trait Fiber[F[_], A] {
  def cancel: F[Unit]
  def join: F[A]
}

Apm项目中, 生成报表要发起多个即席查询请求. 某些查询不互相依赖, 因此可以并发查询以降低相应时间. 并发查询的代码举例如下:

for {
      countF               <- (queryService.query(countTopQuery) >>= generateCountReportItem).start
      p0p1CountF           <- (queryService.query(p0p1CountTopQuery) >>= generateCountReportItem).start
      resolveDurationF     <- (queryService.query(resolveDurationTopQuery) >>= generateDurationReportItem).start
      p0p1ResolveDurationF <- (queryService.query(p0p1ResolveDurationTopQuery) >>= generateDurationReportItem).start
      count                <- countF.join
      p0p1Count            <- p0p1CountF.join
      resolveDuration      <- resolveDurationF.join
      p0p1ResolveDuration  <- p0p1ResolveDurationF.join
    } yield {
      val r = AlertTopReport(count, p0p1Count, resolveDuration, p0p1ResolveDuration)
      AlertTopReportResp(Status.success, r)
    }

这里将四种查询并发执行, 4个IO实例转化成了4个fiber, 并且在main control flow里调用join等待每个fiber的返回结果.

同步, 异步, 并发等概念和他们的操作被抽象成一个个type class, IO data type 实现了所有的type class, 因此IO能够描述同步, 异步, 并发等计算逻辑. Typeclass, 是FP实现多态的方法, 详细解释请看这个文档的第一章 scala-with-cats.pdf

Timer[F[_]] -- task scheduler

如果要实现周期性的执行某个IO实例, 就需要使用Timer data type. 每天计算应用稳定性报表的代码如下:

  // 周期性计算top表
  // eagerly, 10分钟判断一次, 每天只会计算一次
  def periodicCompute(lastDateKey: Option[String] = None): IO[Unit] = {

    val curDateTime     = DateTime.now(defaultTimeZone)
    val dateKey         = curDateTime.withTimeAtStartOfDay.minusSeconds(1).toString(dateFormat)
    val reportStartTime = curDateTime.minusDays(7).withTimeAtStartOfDay.getMillis
    val reportEndTime   = curDateTime.withTimeAtStartOfDay.minusSeconds(1).getMillis

    if (lastDateKey.contains(dateKey)) {
      // 今天的已经计算过, 跳过
      implicitly[Timer[IO]].sleep(retryPause) >> periodicCompute(lastDateKey)
    } else {
      for {
        _        <- IO.shift
        _        <- IO(.../* 具体计算逻辑 */)
        _        <- implicitly[Timer[IO]].sleep(retryPause) >> /* 循环 */ periodicCompute(Some(dateKey))
      } yield ()
    }
  }

implicitly[Timer[IO]].sleep(retryPause) >> periodicCompute(Some(dateKey)) 这段代码实现了周期循环的功能. 

这里的循环是通过无限递归实现的flatMap实现的, 配合IO.shift, 不会爆栈. 详细解释见https://typelevel.org/cats-effect/datatypes/io.html#shift

Resource[F[_], A] -- resource acquisition, usage, and release

开启/关闭一个InputStream或者新建/销毁一个数据库连接池, 都是side-effect操作. 这些资源一般要经历获取/使用/释放三个阶段. 我们把获取/使用/释放资源都用IO类型表描述, Resource类型抽象了整个资源使用生命周期. 统一查询层中, 要临时创建k8s client, 访问client, 访问完成后销毁client. 使用Resource类型实现代码如下:

// 定义创建和销毁client  
private def buildClient(certsCaData: String,
                          masterUrl: String,
                          token: String): Resource[IO, DefaultKubernetesClient] =
    Resource.make {
      IO.delay {
        val config = new ConfigBuilder()
          .withCaCertData(certsCaData)
          .withMasterUrl(masterUrl)
          .withOauthToken(token)
          .build()
        new DefaultKubernetesClient(config)
      }
    } { client =>
      IO.delay(client.close())
    }

// 使用client
client.use { kClient =>
  IO.delay {
    /* 具体使用逻辑 */
  }
}

每次client.use之后, client会自动销毁. 注意make, use方法里面都是IO实例描述的side-effect逻辑.

Apm module代码结构

有几点:

  • 依赖注入都是通过构造函数注入完成, 并且都在com.qunhe.ep.monitor.apm.Server的main函数中
  • 所有http enpoint定义, route, 序列化定义都在com.qunhe.ep.monitor.apm.HttpEndPoints中定义
  • com.qunhe.ep.monitor.apm.service package包含了具体业务逻辑. Package按domain划分文件. 每个domain用一个class和companion object描述.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment