• 可以做一下监控,相当于 diff 里面的 json 数据,发生改动了,企微机器人或者钉钉机器人通知

  • 弱弱问一句,荔枝还招人么?哈哈哈

  • 几种思路可参考一下:

    1. mock 数据,模拟上游系统的数据,mock 数据很多种方法,直接请求下游入参接口或者直接 sql 逻辑入库,也可以写个存储过程之类的
    2. 弄个造数脚本,并且 api 化,执行用例前直接请求造数 api 后,再执行用例
    3. 自动化框架是纯 code 方式,弄个用例基类,通过用例继承,执行用例时,将造数相关的方法执行
    4. 自动化框架是数据驱动方式,维护一个前置条件,递归找所有的前置条件用例,放在一个集合中,循环执行集合中的所有前置用例后,再执行用例
  • python rsa 加密问题 at August 17, 2021
    import rsa
    import base64
    from common.FILE_PATH import PRIVATE_KEY_FILE_PATH,PUBLIC_KEY_FILE_PATH
    
    class RsaSign(object):
    
        @classmethod
        def to_encrypt(cls, msg, pub_key=None):
            """
            非对称加密
            :param msg: 待加密字符串或者字节
            :param pub_key: 公钥
            :return: 密文
            """
            if isinstance(msg, str):            # 如果msg为字符串, 则转化为字节类型
                msg = msg.encode('utf-8')
            elif isinstance(msg, bytes):        # 如果msg为字节类型, 则无需处理
                pass
            else:                               # 否则抛出异常
                raise TypeError('msg必须为字符串或者字节类型!')
    
            if not pub_key:                     # 如果pub_key为空, 则使用全局公钥
                with open(PUBLIC_KEY_FILE_PATH, 'rb') as file:   # 读取公钥
                    pub_key = file.read()
    
            elif isinstance(pub_key, str):      # 如果pub_key为字符串, 则转化为字节类型
                pub_key = pub_key.encode('utf-8')
            elif isinstance(pub_key, bytes):    # 如果msg为字节类型, 则无需处理
                pass
            else:                               # 否则抛出异常
                raise TypeError('pub_key必须为None、字符串或者字节类型!')
            #后端开发密钥格式为pkcs8格式公钥,'-----BEGIN PUBLIC KEY-----' 用load_pkcs1_openssl_pem()这个方法
            #pkcs1格式公钥用load_pkcs1()这个方法
            public_key_obj = rsa.PublicKey.load_pkcs1_openssl_pem(pub_key)  # 创建 PublicKey 对象
    
            cryto_msg = rsa.encrypt(msg, public_key_obj)  # 生成加密文本
            cipher_base64 = base64.b64encode(cryto_msg)   # 将加密文本转化为 base64 编码
    
            return cipher_base64.decode()   # 将字节类型的 base64 编码转化为字符串类型
    
    
        @classmethod
        def to_decrypt(cls, msg, pri_key=None):  # 用私钥解密
            if isinstance(msg, str):  # 如果msg为字符串, 则转化为字节类型
                msg = msg.encode('utf-8')
            elif isinstance(msg, bytes):  # 如果msg为字节类型, 则无需处理
                pass
            else:  # 否则抛出异常
                raise TypeError('msg必须为字符串或者字节类型!')
    
            if not pri_key:  # 如果pub_key为空, 则使用全局公钥
                with open(PRIVATE_KEY_FILE_PATH, 'rb') as file:  # 读取公钥
                    pri_key = file.read()
    
            elif isinstance(pri_key, str):  # 如果pub_key为字符串, 则转化为字节类型
                pri_key = pri_key.encode('utf-8')
            elif isinstance(pri_key, bytes):  # 如果msg为字节类型, 则无需处理
                pass
            else:  # 否则抛出异常
                raise TypeError('pub_key必须为None、字符串或者字节类型!')
                # python只支持pkcs1格式的私钥解密,需要使用openssl将pkcs8转为pkcs1格式
            #https://blog.csdn.net/sanpangouba/article/details/100997735
            privateKey_key_obj = rsa.PrivateKey.load_pkcs1(pri_key)  # 创建 PublicKey 对象
    
            msg=base64.b64decode(msg) #base64 解码
    
            decrypt_msg = rsa.decrypt(msg, privateKey_key_obj)  # 生成加密文本
    
            return decrypt_msg.decode('utf-8')  # 将字节类型的文本转化为字符串类型
    
        @classmethod
        def create_keys(cls):  # 生成公钥和私钥
            # 设置1024位长度
            (pubkey, privkey) = rsa.newkeys(1024)
            #密钥格式为pkcs1格式, -----BEGIN RSA PUBLIC KEY-----
            pub = pubkey.save_pkcs1()
            with open(PUBLIC_KEY_FILE_PATH, 'wb+')as f:
                f.write(pub)
            pri = privkey.save_pkcs1()
            with open(PRIVATE_KEY_FILE_PATH, 'wb+')as f:
                f.write(pri)
    
        @classmethod
        def generate_sign(cls, message):
            """
            生成sign
            :param message: message, 代加密内容, 为str类型
            :return: 加密后字符串
            """
            sign = cls.to_encrypt(message)  # 生成sign
            return sign
    
        @classmethod
        def decrypt_sign(cls,message):
            """
            解密sign
            :param message: message, 加密内容, 为str类型
            :return: 解密后字符串
            """
            sign = cls.to_decrypt(message)  # 解密sign
            return sign
    
    
    
    
    if __name__ == '__main__':
        msg='F5^t~#6H'
        aaaa=RsaSign.generate_sign(msg)
        print(aaaa)
    

    试一下将公钥保存成文件试试

    -----BEGIN PUBLIC KEY-----
    MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDIvvJN1o5dsjSx2c8Ju0W5nbuli
    ZuU/xJ00julQ2r4VJYnYRCzlLjYaWJvL60HJ9DiEeTo6LzgM7cq5PM9aqI6sLo8T1
    zuUJ7qYWYAdH1tCHXnXT9tRJ/e+TT7scv5H7mOViao28Ne+5dawVvSZtODwUT+fbS
    vpk9IUYJ+13jvRwIDABCD
    -----END PUBLIC KEY-----
    
  • 荔枝大佬厉害!

  • 无敌哥牛逼

  • 填坑 (两个解决方案)

    博主 - 测试开发坑货(人称无敌哥)刮遍了整个搜索引擎和 fastapi 的 issues,找到了两个解决方案,无敌卷魔牛逼!

    自定义中间件-Middleware

    在 FastAPI 应用中使用中间件。
    中间件实际上是一个函数,在每个 request 处理之前被调用,同时又在每个 response 返回之前被调用。

    1. 首先接收访问过来的 request。
    2. 然后针对 request 或其他功能执行自定义逻辑。
    3. 传递 request 给应用程序继续处理。
    4. 接收应用所产生的 response。
    5. 然后针对 response 或其他功能执行自定义逻辑。
    6. 返回 response。

    详细说明可看官方文档:https://fastapi.tiangolo.com/tutorial/middleware/?h=middleware

    下面是无敌哥 pity 平台代码,详细可看 git(https://github.com/wuranxu/pity

    async def set_body(request: Request, body: bytes):
        async def receive() -> Message:
            return {"type": "http.request", "body": body}
    
        request._receive = receive
    
    
    async def get_body(request: Request) -> bytes:
        body = await request.body()
        await set_body(request, body)
        return body
    
    
    @pity.middleware("http")
    async def errors_handling(request: Request, call_next):
        body = await request.body()
        try:
            await set_body(request, await request.body())
            return await call_next(request)
        except Exception as exc:
            return JSONResponse(
                status_code=status.HTTP_200_OK,
                content=jsonable_encoder({
                    "code": 110,
                    "msg": str(exc),
                    "request_data": body,
                })
            )
    

    参考 issues:https://github.com/tiangolo/fastapi/issues/394
    https://stackoverflow.com/questions/61358669/raise-exception-in-python-fastapi-middleware

    自定义路由类-APIRoute

    在某些情况下,您可能希望覆盖 Request 和 APIRoute 类使用的逻辑。特别是,这可能是中间件中逻辑的一个很好的替代方案。例如,如果您想在应用程序处理请求正文之前读取或操作请求正文。
    详细说明可看官方文档:https://fastapi.tiangolo.com/advanced/custom-request-and-route

    class ErrorRouter(APIRoute):
        def get_route_handler(self) -> Callable:
            original_route_handler = super().get_route_handler()
    
            async def custom_route_handler(request: Request) -> Union[Response]:
                try:
                    return await original_route_handler(request)
                except Exception as exc:
                    log_msg = f"造数平台捕获到系统错误:请求路径:{request.url.path}\n"
                    params = request.query_params
                    if params:
                        log_msg += f"路径参数:{params}\n"
                    boby = await request.body()
                    if boby:
                        body = json.dumps(json.loads(boby),ensure_ascii=False)
                        log_msg +=f"请求参数:{body}\n"
                    if isinstance(exc, NormalException):
                        return JSONResponse(
                            status_code=status.HTTP_200_OK,
                            content={
                                "responseCode": Status.SYSTEM_EXCEPTION.get_code(),
                                "responseMsg": exc.errorMsg
                            },
                        )
                    elif isinstance(exc, RequestValidationError):
                        message = ""
                        for error in exc.errors():
                            message += str(error.get('loc')[-1]) + ":" + str(error.get("msg")) + ","
                        return JSONResponse(
                            status_code=status.HTTP_200_OK,
                            content=jsonable_encoder({
                                "responseCode": Status.PARAM_ILLEGAL.get_code(),
                                "responseMsg": Status.PARAM_ILLEGAL.get_msg() + message[:-1]
                            })
                        )
                    log_msg +=f"错误信息:{str(exc.args[0])}"
                    mylog.error(log_msg)
                    if PlatConfig.SWITCH == 1:
                        WeGroupChatBot.send_text(log_msg)
                    return JSONResponse(
                        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                        content=jsonable_encoder({
                            "responseCode": Status.FAIL.get_code(),
                            "responseMsg": Status.FAIL.get_msg(),
                            "errorMsg":str(exc.args[0])
                        },
                        ))
            return custom_route_handler
    
    def APIRouter():
        router = AppRouter()
        router.route_class = ErrorRouter
        return router
    

    统一处理之后,再通过类型判断 Exception,返回不同的 Response~
    注意:用了自定义错误路由,就不能再用 @app.exception_handler否则会重复捕获!!!

    参考 issues:https://github.com/tiangolo/fastapi/issues/1216
    https://github.com/tiangolo/fastapi/issues/2750

  • 记一次不该发生的 bug at July 29, 2021

    总结一句话,涉及到金额的,以后端计算为准的,前端只是试算。

  • 赞同大佬的说法!

  • 自己写个回调接口,然后让开发对接起来

  • 1、问开发相关接口的逻辑,自己通过代码逻辑入库,坑比较多
    2、存储过程

  • 了解,现在直接用 fastapi 写造数服务的

  • 刚搜索了一下,发现 yapi 可以满足我的需求

  • 谢谢各位大佬的回复,小弟感激不尽,也想了一下如何处理以上的困惑

    • 小伙伴在写造数脚本时,尽量写明注释,这里实现是什么的功能,最好把调用的接口文档地址也注明一下
    • 组内推动规范性,我这边出遍指导文章,教导小伙伴如何规范,统一写法,因为自身不是领导级别的,可能也比较难推动,可能需要小组长进行规范
    • 关于平台化的话,目前也不是十分需要,个人也对前端不是很熟悉,以后平台的话,可以做类似大佬这段话的功能 可以配置接入不同组的 http 服务,并基于 swagger 提供的内容,生成对应的界面。

  • 引用博主饭佬的一句话~此贴关闭讨论~

  • Python 熟练了,那就学学 Java,看看开发代码,理解某些功能的实现逻辑,go 可以做一些压测工具

  • 学习下 MQ at June 18, 2021

    想起老带新,这不就是我以前接触过的分销业务😂 😂 订单系统生产 mq 过去,分润系统消费 mq,分润结果返回给订单系统;因为分润逻辑可能会经常改动,所以分润这一块独立出来比较好

  • 手动编排试过了,其他人说难排 (其实是懒)

  • 第 3 点,我理解的是生成 [4, 2, 4, 3, 1]

  • 谢谢大佬的建议😀

  • 不要为了 KPI 而 KPI,要想清楚,接口平台到底能带来什么收益

  • 是啊,没办法了,解决痛点就好了😂

  • 这个之前问过开发,开发说公司内部系统对接大多走的是 Dubbo 协议,这是公司的开发规范

  • 我这里捕获的是全局报错,不是那种手动抛出异常的。

    这段代码是手动捕获异常的处理

    # 通用异常处理
    class NormalException(Exception):
        def __init__(self, errorMsg: str,body:dict):
            self.errorMsg = str(errorMsg)
            self.body = str(body, encoding = "utf-8")
        # 手动捕获异常处理
        @user1r(NormalException)
        async def unicorn_exception_handler(request: Request, exc: NormalException):
            log_msg = f"手动捕获异常成功====请求路径:{request.url.path}\n请求参数:{exc.body}\n错误信息:{exc.errorMsg}"
            mylog.info(log_msg)
            return JSONResponse(
                status_code=status.HTTP_400_BAD_REQUEST,
                content={
                    "responseCode": Status.SYSTEM_EXCEPTION.get_code(),
                    "responseMsg":Status.SYSTEM_EXCEPTION.get_msg(),
                    "errorMsg" : exc.errorMsg
                },
            )