何ができるの?
期間指定で、コミット/プルリクのコメント/タスクの一覧をCSVで出力します。
指摘を分析したい場合とかにご活用下さい。
前提
- Jenkinsから実行
- pythonプラグインを入れておく
- 以下をジョブのパラメータに用意する
- pj_id:プロジェクトID
- repos_id:リポジトリID
- start_date:開始日(yyyymmdd)
- end_date:終了日(yyyymmdd)
コード
python3
#!/usr/bin/python
# coding: utf-8
import requests
import json
import sys
import os
import csv
from datetime import datetime
sys.setrecursionlimit(10000)
PJ_ID = os.getenv("pj_id")
REPOS_ID = os.getenv("repos_id")
bitbucket = 'http://<base-url>/<context-path>'
rest_api = 'rest/api/1.0/'
REST_URL = bitbucket + rest_api
PR_FILE = open('PullRequestComments.csv', 'w', newline='')
PR_WRITER = csv.writer(PR_FILE, delimiter=',')
C_FILE = open('CommitComments.csv', 'w', newline='')
C_WRITER = csv.writer(C_FILE, delimiter=',')
s = os.getenv("start_date")
if len(s) != 8:
sys.exit(1)
e = os.getenv("end_date")
if len(e) != 8:
sys.exit(1)
DT_S = datetime(int(s[0:4]), int(s[4:6]), int(s[6:8]))
DT_E = datetime(int(e[0:4]), int(e[4:6]), int(e[6:8]))
print("start_date:" + str(DT_S.timestamp()*1000))
print("end_date:" + str(DT_E.timestamp()*1000))
def get_prs():
print("----get_prs----")
# apiで取得
url = REST_URL + 'projects/{0}/repos/{1}/pull-requests'.format(PJ_ID, REPOS_ID)
print(url)
p = {'state': 'all', 'limit': '100'}
r = requests.get(url, auth=('admin', 'admin'), params=p)
print(str(r))
# print(str(r.json()))
list = []
flg = (1 is 2)
# 期間条件に合致していればリストに追加
dt_head = datetime.fromtimestamp(r.json()["values"][0]["createdDate"]/1000)
dt_foot = datetime.fromtimestamp(r.json()["values"][-1]["createdDate"]/1000)
print("dt_head:" + str(dt_head.timestamp()*1000))
print("dt_foot:" + str(dt_foot.timestamp()*1000))
if dt_foot <= DT_E and dt_head >= DT_S:
list.extend(r.json()["values"])
flg = not r.json()["isLastPage"]
while flg:
# 判定条件
head_idx = r.json()["nextPageStart"] - 100
dt_head = datetime.fromtimestamp(list[head_idx]["createdDate"]/1000)
dt_foot = datetime.fromtimestamp(list[-1]["createdDate"]/1000)
print("dt_head:" + str(dt_head.timestamp()*1000))
print("dt_foot:" + str(dt_foot.timestamp()*1000))
# request
p = {'state': 'all', 'limit': '100', 'start': r.json()["nextPageStart"]}
r = requests.get(url, auth=('admin', 'admin'), params=p)
print(str(r))
if dt_foot <= DT_E and dt_head >= DT_S:
# 期間条件に合致していればリストに追加:次のリストがあれば取得
flg = not r.json()["isLastPage"]
list.extend(r.json()["values"])
elif dt_foot > DT_E:
# この先期間条件に合致する場合:次のリストがあれば取得
flg = not r.json()["isLastPage"]
else:
# いずれでもない場合はbreak
flg = (1 is 2)
print("r.json()['start']:" + str(r.json()["start"]))
#条件に合致するものだけを抽出
r = [i for i in list if DT_S <= datetime.fromtimestamp(i["createdDate"]/1000) and datetime.fromtimestamp(i["createdDate"]/1000) <= DT_E ]
return r
def get_commits():
print("----get_commits----")
# apiで取得
url = REST_URL + 'projects/{0}/repos/{1}/commits'.format(PJ_ID, REPOS_ID)
print(url)
p = {'limit': '100'}
r = requests.get(url, auth=('admin', 'admin'), params=p)
print(str(r))
# print(str(r.json()))
list = []
flg = (1 is 2)
# 期間条件に合致していればリストに追加
dt_head = datetime.fromtimestamp(r.json()["values"][0]["authorTimestamp"]/1000)
dt_foot = datetime.fromtimestamp(r.json()["values"][-1]["authorTimestamp"]/1000)
if dt_foot <= DT_E and dt_head >= DT_S:
list.extend(r.json()["values"])
flg = not r.json()["isLastPage"]
while flg:
# 判定条件
head_idx = r.json()["nextPageStart"] - 100
dt_head = datetime.fromtimestamp(list[head_idx]["authorTimestamp"]/1000)
dt_foot = datetime.fromtimestamp(list[-1]["authorTimestamp"]/1000)
# request
p = {'limit': '100', 'start': r.json()["nextPageStart"]}
r = requests.get(url, auth=('admin', 'admin'), params=p)
print(str(r))
if dt_foot <= DT_E and dt_head >= DT_S:
# 期間条件に合致していればリストに追加:次のリストがあれば取得
flg = not r.json()["isLastPage"]
list.extend(r.json()["values"])
elif dt_foot > DT_E:
# この先期間条件に合致する場合:次のリストがあれば取得
flg = not r.json()["isLastPage"]
else:
# いずれでもない場合はbreak
flg = (1 is 2)
print("r.json()['start']:" + str(r.json()["start"]))
#条件に合致するものだけを抽出
r = [i for i in list if DT_S <= datetime.fromtimestamp(i["authorTimestamp"]/1000) and datetime.fromtimestamp(i["authorTimestamp"]/1000) <= DT_E ]
return r
def get_pr_files(pr_no):
print("----get_pr_files----")
# apiで取得
url = REST_URL + 'projects/{0}/repos/{1}/pull-requests/{2}/changes'.format(PJ_ID, REPOS_ID, pr_no)
p = {'limit': '100'}
print(url)
r = requests.get(url, auth=('admin', 'admin'), params=p)
print(str(r))
list = r.json()["values"]
while (not r.json()["isLastPage"]):
p = {'limit': '100', 'start': r.json()["nextPageStart"]}
r = requests.get(url, auth=('admin', 'admin'), params=p)
list.extend(r.json()["values"])
print(r.json()["start"])
return list
def get_c_files(c_no):
print("----get_c_files----")
# apiで取得
url = REST_URL + 'projects/{0}/repos/{1}/commits/{2}/changes'.format(PJ_ID, REPOS_ID, c_no)
p = {'limit': '100'}
print(url)
r = requests.get(url, auth=('admin', 'admin'), params=p)
print(str(r))
# print(str(r.json()))
list = r.json()["values"]
while (not r.json()["isLastPage"]):
p = {'limit': '100', 'start': r.json()["nextPageStart"]}
r = requests.get(url, auth=('admin', 'admin'), params=p)
list.extend(r.json()["values"])
print(r.json()["start"])
return list
def get_pr_comments(filename, pr_no):
# apiで取得
url = REST_URL + 'projects/{0}/repos/{1}/pull-requests/{2}/comments?path={3}'.format(PJ_ID, REPOS_ID, pr_no, filename)
print(url)
r = requests.get(url, auth=('admin', 'admin'))
print(str(r))
# print(str(r.json()))
if (not r.json()["isLastPage"]):
print('Error: isLastPage is false')
sys.exit(1) # プログラムの終了
return r.json()["values"]
def get_c_comments(filename, c_no):
# apiで取得
url = REST_URL + 'projects/{0}/repos/{1}/commits/{2}/comments?path={3}'.format(PJ_ID, REPOS_ID, c_no, filename)
print(url)
r = requests.get(url, auth=('admin', 'admin'))
print(str(r))
# print(str(r.json()))
if (not r.json()["isLastPage"]):
print('Error: isLastPage is false')
sys.exit(1) # プログラムの終了
return r.json()["values"]
def export_tasks(this, line, file_name, parent_id, pr_id, pr_title, pr_state, writer):
# print("----export_tasks----")
tasks = this["tasks"]
for t in tasks:
list = [pr_id, pr_title, pr_state, 'タスク', str(t["id"]), parent_id, file_name, line, t["author"]["name"], t["text"], t["state"]]
writer.writerow(list)
def export_comments(this, line, file_name, parent_id, pr_id, pr_title, pr_state, writer):
# print("----export_comments----")
list = [pr_id, pr_title, pr_state, 'コメント', str(this["id"]), parent_id, file_name, line, this["author"]["name"], this["text"], "-"]
writer.writerow(list)
export_tasks(this, line, file_name, this["id"], pr_id, pr_title, pr_state, writer)
for child in this["comments"]:
export_comments(child, line, file_name, this["id"], pr_id, pr_title, pr_state, writer)
def write_title():
print("----write_title----")
list = ['PR_ID', 'PR_TITLE', 'PR_STATE', 'ID', '親ID', '対象ファイル', '対象行', 'author', 'text', 'タスク状態']
PR_WRITER.writerow(list)
list = ['Commit_ID', 'PR_TITLE', 'PR_STATE', '種別', 'ID', '親ID', '対象ファイル', '対象行', 'author', 'text', 'タスク状態']
C_WRITER.writerow(list)
def write_pr():
prs = get_prs()
for p in prs:
files = get_pr_files(p["id"])
for f in files:
comments = get_pr_comments(f["path"]["toString"], p["id"])
for c in comments:
# print(str(c))
l = str(c["anchor"]["line"]) if "line" in c["anchor"] else ("-")
fn = c["anchor"]["path"]
export_comments(c, l, fn, "-", p["id"], p["title"], p["state"], PR_WRITER)
PR_FILE.close()
def write_c():
cs = get_commits()
for c in cs:
files = get_c_files(c["id"])
for f in files:
comments = get_c_comments(f["path"]["toString"], c["id"])
for c in comments:
# print(str(c))
l = str(c["anchor"]["line"]) if "line" in c["anchor"] else ("-")
fn = c["anchor"]["path"]
export_comments(c, l, fn,"-", c["id"], "-", "-", C_WRITER)
C_FILE.close()
def main():
write_title()
write_pr()
write_c()
if __name__ == "__main__":
main()
さいごに
品質分析に使うかどうかわからんけど、とりあえず作ってみたツール。
結局まだ使ってないので、供養のために記事化。
あ、期間の長さはほどほどにしておいて下さいね。
以上です。