孙军


**Dubbo 压测插件已开源,本文涉及代码详见gatling-dubbo

插件已开源,详见 gatling-dubbo【1】

上一篇《Dubbo 压测插件的实现——基于 Gatling》【2】中,我们介绍了基于 Dubbo 泛化调用实现的 Gatling Dubbo 压测插件,使用泛化调用发起 Dubbo 压测请求,consumer 端不需要拿到 provider 端的 API 包,使用上很便利,但是众所周知,Dubbo 泛化调用的性能不如普通 API 调用,虽然可以优化并使之达到与普通 API 调用相近的性能,但仍存在一些局限性。生产中除了网关等特殊应用外,一般很少使用泛化调用,如果以泛化调用的性能来表征生产中普通 API 调用的性能,其压测结论很难令人信服。做压测的时候,一般要求各种条件如环境等都尽可能保持一致。所以,我们又开发了基于普通 API 调用的 Gatling Dubbo 压测插件,即 gatling-dubbo 2.0。此外,依托于 Gatling 强大的基础能力,gatling-dubbo 2.0 相比于 Jmeter 还存在以下几方面的优势:

class DubboAction[A]( requestName:      Expression[String],  
                      f:                (Session) => A,
                      val executor:     ExecutorService,
                      val objectMapper: ObjectMapper,
                      checks:           List[DubboCheck],
                      coreComponents:   CoreComponents,
                      throttled:        Boolean,
                      val next:         Action
                    ) extends ExitableAction with NameGen {
  ......
  override def execute(session: Session): Unit = recover(session) {
    requestName(session) map { reqName =>
      val startTime = System.currentTimeMillis()
      val fu = Future {
        try {
          f(session)
        } finally {
        }
      }
      fu.onComplete {
        case Success(result) =>
          val endTime = System.currentTimeMillis()
          val resultJson = objectMapper.writeValueAsString(result)
          val (newSession, error) = Check.check(resultJson, session, checks)
          error match {
            case None =>
              statsEngine.logResponse(session, reqName, ResponseTimings(startTime, endTime), Status("OK"), None, None)
              throttle(newSession(session))
            case Some(Failure(errorMessage)) =>
              statsEngine.logResponse(session, reqName, ResponseTimings(startTime, endTime), Status("KO"), None, Some(errorMessage))
              throttle(newSession(session).markAsFailed)
          }
        case UFailure(e) =>
          val endTime = System.currentTimeMillis()
          statsEngine.logResponse(session, reqName, ResponseTimings(startTime, endTime), Status("KO"), None, Some(e.getMessage))
          throttle(session.markAsFailed)
      }
    }
  }
  private def throttle(s: Session): Unit = {
    if (throttled) {
      coreComponents.throttler.throttle(s.scenario, () => next ! s)
    } else {
      next ! s
    }
  }
}

DubboActionBuilder 负责创建线程池并实例化 DubboAction:

case class DubboActionBuilder[A](requestName: Expression[String], f: (Session) => A, checks: List[DubboCheck], threadPoolSize: Int) extends ActionBuilder {  
  override def build(ctx: ScenarioContext, next: Action): Action = {
    import ctx._
    val executor = Executors.newFixedThreadPool(threadPoolSize)
    val objectMapper: ObjectMapper = new ObjectMapper()
    new DubboAction[A](requestName, f, executor, objectMapper, checks, coreComponents, throttled, next)
  }
}

LambdaProcessBuilder 提供了设置 check 条件的 DSL 和 设置线程池大小的 DSL:

有赞的施压机是 4 核 8Gb 内存的,我们为其设置的默认线程池大小为 200,与 Dubbo 应用部署环境一致。你可以使用 DSL threadPoolSize(threadPoolSize: Int) 按照你的机器配置设置一个合适的线程池大小。如果施压机成了性能瓶颈,你可以考虑将其改造成集群来施压,具体可参考《有赞全链路压测引擎的设计与实现》【3】

case class DubboProcessBuilder[A](requestName: Expression[String], f: (Session) => A, checks: List[DubboCheck] = Nil, threadPoolSize: Int = 200) extends DubboCheckSupport {  
  def check(dubboChecks: DubboCheck*): DubboProcessBuilder[A] = copy[A](checks = checks ::: dubboChecks.toList)
  def threadPoolSize(threadPoolSize: Int): DubboProcessBuilder[A] = copy[A](threadPoolSize = threadPoolSize)
  def build(): ActionBuilder = DubboActionBuilder[A](requestName, f, checks, threadPoolSize)
}

2、Check

全链路压测中,我们使用 json path 校验 HTTP 请求结果,Dubbo 压测插件中,我们也实现了基于 json path 的校验方法:

package object dubbo {  
  type DubboCheck = Check[String]
  val DubboStringExtender: Extender[DubboCheck, String] =
    (check: DubboCheck) => check
  val DubboStringPreparer: Preparer[String, String] =
    (result: String) => Success(result)
}
trait DubboJsonPathOfType {  
  self: DubboJsonPathCheckBuilder[String] =>
  def ofType[X: JsonFilter](implicit extractorFactory: JsonPathExtractorFactory) = new DubboJsonPathCheckBuilder[X](path, jsonParsers)
}
object DubboJsonPathCheckBuilder {  
  val CharsParsingThreshold = 200 * 1000
  def preparer(jsonParsers: JsonParsers): Preparer[String, Any] =
    response => {
      if (response.length() > CharsParsingThreshold || jsonParsers.preferJackson)
        jsonParsers.safeParseJackson(response)
      else
        jsonParsers.safeParseBoon(response)
    }
  def jsonPath(path: Expression[String])(implicit extractorFactory: JsonPathExtractorFactory, jsonParsers: JsonParsers) =
    new DubboJsonPathCheckBuilder[String](path, jsonParsers) with DubboJsonPathOfType
}
class DubboJsonPathCheckBuilder[X: JsonFilter](  
    private[check] val path:        Expression[String],
    private[check] val jsonParsers: JsonParsers
)(implicit extractorFactory: JsonPathExtractorFactory)
  extends DefaultMultipleFindCheckBuilder[DubboCheck, String, Any, X](
    DubboStringExtender,
    DubboJsonPathCheckBuilder.preparer(jsonParsers)
  ) {
  import extractorFactory._
  def findExtractor(occurrence: Int) = path.map(newSingleExtractor[X](_, occurrence))
  def findAllExtractor = path.map(newMultipleExtractor[X])
  def countExtractor = path.map(newCountExtractor)
}

但有时候存在一些不规范的情况,dubbo 接口的返回结果并不能直接转化为 json,如返回了基本数据类型,所以我们还提供了自定义校验方法,可以将这样的返回结果转化为 String 类型,并使用字符串比较、正则表达式匹配等方法校验返回结果:

case class DubboCustomCheck(func: String => Boolean, failureMessage: String = "Dubbo check failed") extends DubboCheck {  
  override def check(response: String, session: Session)(implicit cache: mutable.Map[Any, Any]): Validation[CheckResult] = {
    func(response) match {
      case true => CheckResult.NoopCheckResultSuccess
      case _    => Failure(failureMessage)
    }
  }
}

DubboCheckSupport 则提供了 json pathcustom 两种检验方式的 DSL

trait DubboCheckSupport {  
  def jsonPath(path: Expression[String])(implicit extractorFactory: JsonPathExtractorFactory, jsonParsers: JsonParsers) =
    DubboJsonPathCheckBuilder.jsonPath(path)
  def custom = DubboCustomCheck
}

Dubbo 压测脚本中可以设置一个或多个 check 来校验请求结果

3、DSL

DubboDsl 提供顶层 DSL,隐式方法 dubboProcessBuilder2ActionBuilderScala 用于自动从 DubboProcessBuilder 构造 ActionBuilder

trait DubboDsl extends DubboCheckSupport {  
  def dubbo[A](requestName: Expression[String], f: (Session) => A) = DubboProcessBuilder[A](requestName, f)
  implicit def dubboProcessBuilder2ActionBuilder[A](builder: DubboProcessBuilder[A]): ActionBuilder = builder.build()
}

示例

压测脚本示例

class Mix extends Simulation {  
  val application = new ApplicationConfig()
  application.setName("gatling-dubbo")
  // 初始化 AService
  val referenceAService = new ReferenceConfig[AService]
  referenceAService.setApplication(application)
  referenceAService.setUrl("dubbo://IP:PORT/com.xxx.service.AService")
  referenceAService.setInterface(classOf[AService])
  val aService = referenceAService.get()
  // 初始化 BService
  val referenceBService = new ReferenceConfig[BService]
  referenceBService.setApplication(application)
  referenceBService.setUrl("dubbo://IP:PORT/com.yyy.service.BService")
  referenceBService.setInterface(classOf[BService])
  val bService = referenceBService.get()
  // 设置数据源
  val jsonFileFeeder = jsonFile("data.json").shuffle.circular
  val mixScenario = scenario("scenario of mix")
      .forever("tripsCount") {
      feed(jsonFileFeeder)
        .randomSwitch(11d -> exec(
          dubbo("com.xxx.service.AService.aMethod", fAMethod)
            .check(jsonPath("$.success").is("true"))
        )
        )
        .randomSwitch(4d -> exec(
          dubbo("com.yyy.service.BService.bMethod", fBMethod)
            .check(jsonPath("$.success").is("true"))
        )
        )
        .randomSwitch(5d -> exec(
          ......
        )
        ......
        )
    }
  setUp(mixScenario.inject(constantUsersPerSec(100) during (10 seconds)).throttle(reachRps(1000) in (1 seconds), holdFor(120 seconds)))
  // 设置 aMethod 的请求参数并调用
  def fAMethod(session: Session): Object = {
    val aParam = new AParam()
    aParam.setName("A Name");
    // 从 session 中获取动态参数并设置
    aParam.setAId(session.attributes("aId").asInstanceOf[Integer].toLong);
    aService.aMethod(aParam);
  }
  // 设置 bMethod 的请求参数并调用
  def fBMethod(session: Session): Object = {
    val bParam = new BParam()
    bParam.setAge(26)
    // 从 session 中获取动态参数并设置
    bParam.setBId(session.attributes("bId").asInstanceOf[Integer].toLong)
    bService.bMethod(bParam);
  }
  def fXxx(session: Session): Object = {
    ......
  }
}

randomSwitch 的作用:
以上示例其实是 gatling-dubbo 在有赞的一个典型使用场景,即评估一个应用的单实例性能。按生产环境真实的接口调用比例请求各个接口(该比例由场景执行各个请求的概率分布模拟),这样的压测结果就可以真实反映生产环境应用的单实例性能,并为容量报警、生产扩容等提供参考依据。

压测数据示例

[
  {
    "aId": 160,
    "bId": 859296
  },
  {
    "aId": 160,
    "bId": 1019040
  },
  {
    "aId": 160,
    "bId": 1221792
  },
  ......
]

压测数据使用 Json 数组保存,其中每一个 Json 对象都包含了一次压测请求所需的所有动态参数,且为了方便通过 session 设置动态参数,Json 对象中不再嵌套其他 Json 对象。

压测报告示例


我的系列博客
混沌工程 - 软件系统高可用、弹性化的必由之路
异步系统的两种测试方法
我的其他测试相关开源项目
捉虫记:方便产品、开发、测试三方协同自测的管理工具
招聘
有赞测试组在持续招人中,大量岗位空缺,只要你来,就能帮你点亮全栈开发技能树,有意向换工作的同学可以发简历到 sunjun【@】youzan.com
欢迎关注我们的公众号


↙↙↙阅读原文可查看相关链接,并与作者交流