之前检测某平台的运行情况,需要定时截取平台指定页面截图通知到钉钉群,当时的实现过程,存备忘录。

1:钉钉群里配置机器人 (群设置>智能群助手>添加机器人>选择自定义机器人>添加机器人,安全设置选择加签>复制加签的密钥和 webhook 链接)
2:selenium 截图并保存在本地
3:本地图片推送到外网(腾讯云 COS)指定路径
4:获取 step3 路径下的指定图片调用钉钉开放接口 (/robot/send) 推送至钉钉群
5:jenkins 配置任务,定时执行脚本
代码实现

from selenium import webdriver
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.common.by import By
import os
import json
import hmac
import hashlib
import base64
import urllib
import requests
from qcloud_cos import CosConfig
from qcloud_cos import CosS3Client
import time
from selenium.webdriver.chrome.options import Options
filename ="monitor_"+time.strftime('%Y-%m-%d-%H_%M_%S', time.localtime(time.time()))+'.png'


class Notification () :
    def __init__(self):
        chrome_options = Options()
        chrome_options.add_argument('--headless')
        chrome_options.add_argument('window-size=1920x1080')
        chrome_options.add_argument('--start-maximized')
        self.driver = webdriver.Chrome(options=chrome_options ,executable_path='C:\\Users\\Administrator\\AppData\\Local\\Google\\Chrome\\Application\\chromedriver.exe')
        self.driver.maximize_window()

    def get_screen(self):
        try:
            self.driver.get("https://www.baidu.com/")
            # self.driver.implicitly_wait(2)
            if self.find_element('xpath', '//input[@id="su"]'):
                self.driver.save_screenshot(filename)
                time.sleep(2)
                self.send_dingding()
        except Exception as ex:
            print("获取主页异常 {}".format(ex))
        self.driver.quit()

    def find_element(self,*args):
        try:
            loc = args[0]
            value = args[1]
            if loc == 'id':
                element = By.ID
            if loc == 'xpath':
                element = By.XPATH
            if loc == 'css':
                element = By.CSS_SELECTOR
            WebDriverWait(self.driver, 30).until(
                lambda driver: True if driver.find_element(element, value) is not None else False)
            print('{}存在,断言成功'.format(value))
            return True
        except Exception as ex:
            print('{}不存在,断言失败'.format(value))
            self.driver.save_screenshot('element_not_exist_'+time.strftime('%m-%d %H:%M', time.localtime(time.time()))+'.png')

    def get_url(self):
        timestamp = str(round(time.time() * 1000))
        secret = 'SEC*********'  # 添加机器人步骤选择加签选项所生成的secret
        secret_enc = secret.encode('utf-8')
        string_to_sign = '{}\n{}'.format(timestamp, secret)
        string_to_sign_enc = string_to_sign.encode('utf-8')
        has_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
        sign = urllib.parse.quote_plus(base64.b64encode(has_code))
        return timestamp ,sign

    def send_dingding(self):
        try:
            # URL参数值为添加机器人生成的webhook链接
            url = 'https://oapi.dingtalk.com/robot/send?access_token=**********' \ 
                  '&timestamp='+self.get_url()[0]+\
                  '&sign='+self.get_url()[1]
            header = {'content-type': 'application/json','User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:22.0) Gecko/20100101 Firefox/22.0'}
            param = json.dumps(
                        {
                            "msgtype": "markdown",
                            "markdown": {
                                "title": "卡片title自定义",
                                "text": "#### 自定义文案 ("+time.strftime('%m-%d %H:%M', time.localtime(time.time()))+") \n> > ![screenshot]("+self.upload_cos(os.getcwd()+'\\'+filename)+")\n> \n"
                            }
                        }
                    )
            requests.post(url, data=param, headers=header)
        except Exception as ex:
            print('发送钉钉群异常{}'.format(ex))
            raise ex

    def upload_cos(self,path):
        """
        图片上传至腾讯云cos
        :param path:
        :return:
        """
        secret_id = 'AKID**********'  # cos secretId
        secret_key = 'EC**********'   # cos secretKey
        region = 'ap-guangzhou'  # cos region
        config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key)
        client = CosS3Client(config)
        file_name ='es_monitor_'+time.strftime('%Y-%m-%d-%H_%M', time.localtime(time.time()))+'.png'
        client.upload_file(
            Bucket='**-*****',  # cos bucket
            LocalFilePath=path,  # 本地文件的路径
            Key=file_name,  # 上传到桶之后的文件名
        )
        visit = 'https://**-*****.cos.ap-guangzhou.myqcloud.com/'  # cos visitUrl
        image_link = visit+file_name
        print('image link:{}'.format(image_link))
        return image_link


if __name__ == '__main__':
    no = Notification()
    no.get_screen()


↙↙↙阅读原文可查看相关链接,并与作者交流