之前是通过读取 jenkins 的缓存来获取的,后面发现了个更好的方式,官方有 http 请求提供出来。无开始时间,只获取前 10 条数据,有开始时间,获取到最新的数据。不需要把代码拉下来,直接请求远端 git。
get_git_all_url = 'http://git.xxxxxx.com/api/v4/projects/'
get_project_branch_url = get_git_all_url + '{}/repository/branches'
get_branch_commit_url = get_git_all_url + '{}/repository/commits'
get_commit_msg_url = get_git_all_url + '{}/repository/commits/{}'
base_token = {'private_token': 'xxxxx'}
def get_git_project_id(private_token, project_repo_url):
"""
:param private_token: gitlab个人token,请在gitlab上生产
:param project_repo_url: http://xxxx.git,需要带上.git
:return:
"""
if all([private_token, project_repo_url]):
params_dict = {'private_token': private_token, 'per_page': 100}
for i in range(1, 101):
params_dict['page'] = i
response = requests.get(url=get_git_all_url, params=params_dict)
print(response.text)
if response.status_code == 200:
msg_dict = json.loads(response.text)
for j in range(len(msg_dict)):
msg = msg_dict[j]
if 'http_url_to_repo' in msg.keys() and msg['http_url_to_repo'] == project_repo_url:
if 'id' in msg.keys():
project_id = str(msg['id'])
print('查询到该token{}名下对git地址{}的项目id为{}'.format(private_token, project_repo_url, project_id))
return project_id
print('未查询到该token{}名下git地址{}的项目id'.format(private_token, project_repo_url))
return
def get_git_project_branchs(private_token, git_project_id):
"""
:param private_token: gitlab个人token,请在gitlab上生产
:param git_project_id: 字符串类型,项目id
:return:
"""
branch_list = []
if all([private_token, git_project_id]):
param_dict = {'private_token': private_token, 'per_page': 100}
for i in range(1, 101):
param_dict['page'] = i
response = requests.get(url=get_project_branch_url.format(git_project_id), params=param_dict)
print(response.text)
if len(response.text)<=2:
break
elif response.status_code == 200:
msg_dict = json.loads(response.text)
for j in range(len(msg_dict)):
if 'name' in msg_dict[j].keys():
if not msg_dict[j]['name']:
break
branch_list.append(msg_dict[j]['name'])
print('该项目id:{}对应的所有分支:{}'.format(git_project_id,str(branch_list)))
return branch_list
def get_branch_commit(private_token, git_project_id, branch_name, begin_time=None):
"""
:param private_token: gitlab的token
:param git_project_id: git项目id
:param branch_name: 分支名称
:param begin_time: 从这开始之后的commit时间
:return:
"""
commit_list = []
if all([private_token, git_project_id, branch_name]):
para_dict = {'private_token': private_token, 'ref_name': branch_name}
if not begin_time:
para_dict['per_page'] = 10
response = requests.get(url=get_branch_commit_url.format(git_project_id), params=para_dict)
print(response.text)
if response.status_code == 200:
msg_dict = json.loads(response.text)
for i in range(len(msg_dict)):
if 'id' in msg_dict[i].keys():
commit_id = msg_dict[i]['id']
commit_list.append(commit_id)
else:
para_dict['per_page'] = 100
for i in range(1, 101):
para_dict['page'] = i
response = requests.get(url=get_branch_commit_url.format(git_project_id), params=para_dict)
if response.status_code == 200:
msg_dict = json.loads(response.text)
for j in range(len(msg_dict)):
# 这里有三个时间,不确定是哪个,先全部拿下来,暂时用committed_data_time
id = msg_dict[j]['id']
authored_data_time = msg_dict[j]['authored_date']
committed_data_time = msg_dict[j]['committed_date']
created_at_time = msg_dict[j]['created_at']
if compare_time(committed_data_time, begin_time):
commit_list.append(id)
print('获取到分支{}对应的commit内容{}'.format(branch_name,str(commit_list)))
return commit_list
def get_commit_msg(private_token, git_project_id, commit_id):
commit_msg_dict = {}
if all([private_token, git_project_id, commit_id]):
para_dict = {'private_token': private_token}
response = requests.get(url=get_commit_msg_url.format(git_project_id, commit_id), params=para_dict)
print(response.text)
if response.status_code == 200:
msg_dict = json.loads(response.text)
commit_msg_dict['id'] = msg_dict['id']
commit_msg_dict['title'] = msg_dict['title']
if 'feat' in msg_dict['title'].lower():
commit_msg_dict['git_type']='feat'
elif 'fix' in msg_dict['title'].lower():
commit_msg_dict['git_type'] = 'fix'
elif 'merge' in msg_dict['title'].lower():
commit_msg_dict['git_type'] = 'merge'
else:
commit_msg_dict['git_type'] = 'update'
commit_msg_dict['message'] = msg_dict['message']
commit_msg_dict['author_name'] = msg_dict['author_name']
commit_msg_dict['committed_date'] = msg_dict['committed_date']
commit_msg_dict['additions'] = msg_dict['stats']['additions']
commit_msg_dict['deletions'] = msg_dict['stats']['deletions']
commit_msg_dict['total'] = msg_dict['stats']['total']
return commit_msg_dict
def get_gitProject_branch_commit_list(private_token, project_repo_url, branch, begin_time=None):
commit_list = []
try:
git_project_id = get_git_project_id(private_token=private_token,
project_repo_url=project_repo_url)
if git_project_id:
git_project_branch = get_git_project_branchs(private_token=private_token,
git_project_id=git_project_id)
if git_project_branch and branch in git_project_branch:
branch_commit_list = get_branch_commit(private_token=private_token,
git_project_id=git_project_id,
branch_name=branch,
begin_time=begin_time)
if branch_commit_list:
for i in range(len(branch_commit_list)):
commit_msg_dict = get_commit_msg(private_token=private_token,
git_project_id=git_project_id,
commit_id=branch_commit_list[i])
if commit_msg_dict:
commit_list.append(commit_msg_dict)
else:
print('该分支{}不在远端分支列表中{},或获取到的远端分支为空'.format(branch, git_project_branch))
print('获取到该项目地址{},分支{}的commit内容为{}'.format(project_repo_url,branch,commit_list))
except Exception as e:
print('获取该项目地址{},分支{}的commit内容过程中发生异常'.format(project_repo_url,branch))
print('异常信息为:{}'.format(e))
return commit_list
def compare_time(time1, time2_compare):
if any(key in time1 for key in ['T','+08:']):
time1=time1.replace('T',' ').replace('+08:','')
d1 = datetime.datetime.strptime(time1, '%Y-%m-%d %H:%M:%S.%f')
d2 = datetime.datetime.strptime(time2_compare, '%Y-%m-%d %H:%M:%S.%f')
delta = d1 - d2
print(delta.total_seconds())
if delta.total_seconds()>=0:
return True
else:
return False
# get_gitProject_branch_commit_list(private_token='xxxxx',
# project_repo_url='http://xxxxx.git',
# branch='dev',
# begin_time='2020-10-30 14:56:07.999999')