Extracting a list of comments/tasks from Bitbucket commits/pull requests

Posted at 2018-03-27

What does it do?

Outputs a CSV listing the comments and tasks on commits and pull requests within a specified date range.
Handy when you want to analyze review feedback.
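
For reference, the two CSVs have the following columns (these match write_title in the code below; the original column names were Japanese and are translated here):

PullRequestComments.csv: PR_ID, PR_TITLE, PR_STATE, TYPE, ID, PARENT_ID, FILE, LINE, author, text, TASK_STATE
CommitComments.csv: Commit_ID, PR_TITLE, PR_STATE, TYPE, ID, PARENT_ID, FILE, LINE, author, text, TASK_STATE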

Prerequisites

  1. Run it from Jenkins
  2. Install the Python plugin
  3. Define the following job parameters (for a local test without Jenkins, see the sketch after this list)
    • pj_id: project ID
    • repos_id: repository ID
    • start_date: start date (yyyymmdd)
    • end_date: end date (yyyymmdd)
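
If you want to try the script outside Jenkins first, the same parameters can be faked with environment variables. A minimal sketch; all values below are placeholders, not part of the original job:

python3
import os

# hypothetical local-test stub: set the variables the Jenkins job would provide
os.environ["pj_id"] = "PROJ"            # placeholder project key
os.environ["repos_id"] = "my-repo"      # placeholder repository slug
os.environ["start_date"] = "20180301"   # yyyymmdd
os.environ["end_date"] = "20180331"     # yyyymmdd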

Code

python3
#!/usr/bin/env python3
# coding: utf-8

import requests
import sys
import os
import csv
from datetime import datetime

# export_comments() recurses into nested comment replies, so raise the limit
sys.setrecursionlimit(10000)
PJ_ID = os.getenv("pj_id")
REPOS_ID = os.getenv("repos_id")
bitbucket = 'http://<base-url>/<context-path>'  # your Bitbucket Server base URL
rest_api = 'rest/api/1.0/'
REST_URL = bitbucket.rstrip('/') + '/' + rest_api
PR_FILE = open('PullRequestComments.csv', 'w', newline='')
PR_WRITER = csv.writer(PR_FILE, delimiter=',')
C_FILE = open('CommitComments.csv', 'w', newline='')
C_WRITER = csv.writer(C_FILE, delimiter=',')

s = os.getenv("start_date")
if s is None or len(s) != 8:
  print('Error: start_date must be yyyymmdd')
  sys.exit(1)
e = os.getenv("end_date")
if e is None or len(e) != 8:
  print('Error: end_date must be yyyymmdd')
  sys.exit(1)
DT_S = datetime(int(s[0:4]), int(s[4:6]), int(s[6:8]))
DT_E = datetime(int(e[0:4]), int(e[4:6]), int(e[6:8]))
print("start_date:" + str(DT_S.timestamp()*1000))
print("end_date:" + str(DT_E.timestamp()*1000))

def get_prs():
  print("----get_prs----")
  # fetch pull requests via the REST API, newest first
  url = REST_URL + 'projects/{0}/repos/{1}/pull-requests'.format(PJ_ID, REPOS_ID)
  print(url)
  p = {'state': 'all', 'limit': '100'}
  r = requests.get(url, auth=('admin', 'admin'), params=p)
  print(str(r))

  items = []
  flg = False

  # append the first page if it can overlap the date range
  dt_head = datetime.fromtimestamp(r.json()["values"][0]["createdDate"]/1000)
  dt_foot = datetime.fromtimestamp(r.json()["values"][-1]["createdDate"]/1000)
  print("dt_head:" + str(dt_head.timestamp()*1000))
  print("dt_foot:" + str(dt_foot.timestamp()*1000))
  if dt_foot <= DT_E and dt_head >= DT_S:
    items.extend(r.json()["values"])
    flg = not r.json()["isLastPage"]

  while flg:
    # newest/oldest timestamps of the page just fetched
    head_idx = r.json()["nextPageStart"] - 100
    dt_head = datetime.fromtimestamp(items[head_idx]["createdDate"]/1000)
    dt_foot = datetime.fromtimestamp(items[-1]["createdDate"]/1000)
    print("dt_head:" + str(dt_head.timestamp()*1000))
    print("dt_foot:" + str(dt_foot.timestamp()*1000))

    # fetch the next page
    p = {'state': 'all', 'limit': '100', 'start': r.json()["nextPageStart"]}
    r = requests.get(url, auth=('admin', 'admin'), params=p)
    print(str(r))

    if dt_foot <= DT_E and dt_head >= DT_S:
      # page overlaps the range: keep it, continue if more pages exist
      flg = not r.json()["isLastPage"]
      items.extend(r.json()["values"])
    elif dt_foot > DT_E:
      # still newer than the range: skip ahead if more pages exist
      flg = not r.json()["isLastPage"]
    else:
      # older than the range: stop paging
      flg = False

  print("r.json()['start']:" + str(r.json()["start"]))
  # keep only entries inside the date range
  return [i for i in items
          if DT_S <= datetime.fromtimestamp(i["createdDate"]/1000) <= DT_E]

def get_commits():
  print("----get_commits----")
  # fetch commits via the REST API, newest first
  url = REST_URL + 'projects/{0}/repos/{1}/commits'.format(PJ_ID, REPOS_ID)
  print(url)
  p = {'limit': '100'}
  r = requests.get(url, auth=('admin', 'admin'), params=p)
  print(str(r))

  items = []
  flg = False

  # append the first page if it can overlap the date range
  dt_head = datetime.fromtimestamp(r.json()["values"][0]["authorTimestamp"]/1000)
  dt_foot = datetime.fromtimestamp(r.json()["values"][-1]["authorTimestamp"]/1000)
  if dt_foot <= DT_E and dt_head >= DT_S:
    items.extend(r.json()["values"])
    flg = not r.json()["isLastPage"]

  while flg:
    # newest/oldest timestamps of the page just fetched
    head_idx = r.json()["nextPageStart"] - 100
    dt_head = datetime.fromtimestamp(items[head_idx]["authorTimestamp"]/1000)
    dt_foot = datetime.fromtimestamp(items[-1]["authorTimestamp"]/1000)

    # fetch the next page
    p = {'limit': '100', 'start': r.json()["nextPageStart"]}
    r = requests.get(url, auth=('admin', 'admin'), params=p)
    print(str(r))

    if dt_foot <= DT_E and dt_head >= DT_S:
      # page overlaps the range: keep it, continue if more pages exist
      flg = not r.json()["isLastPage"]
      items.extend(r.json()["values"])
    elif dt_foot > DT_E:
      # still newer than the range: skip ahead if more pages exist
      flg = not r.json()["isLastPage"]
    else:
      # older than the range: stop paging
      flg = False

  print("r.json()['start']:" + str(r.json()["start"]))
  # keep only entries inside the date range
  return [i for i in items
          if DT_S <= datetime.fromtimestamp(i["authorTimestamp"]/1000) <= DT_E]

def get_pr_files(pr_no):
  print("----get_pr_files----")
  # fetch the files changed by the pull request
  url = REST_URL + 'projects/{0}/repos/{1}/pull-requests/{2}/changes'.format(PJ_ID, REPOS_ID, pr_no)
  p = {'limit': '100'}
  print(url)
  r = requests.get(url, auth=('admin', 'admin'), params=p)
  print(str(r))
  items = r.json()["values"]

  while not r.json()["isLastPage"]:
    p = {'limit': '100', 'start': r.json()["nextPageStart"]}
    r = requests.get(url, auth=('admin', 'admin'), params=p)
    items.extend(r.json()["values"])

  print(r.json()["start"])
  return items

def get_c_files(c_no):
  print("----get_c_files----")
  # fetch the files changed by the commit
  url = REST_URL + 'projects/{0}/repos/{1}/commits/{2}/changes'.format(PJ_ID, REPOS_ID, c_no)
  p = {'limit': '100'}
  print(url)
  r = requests.get(url, auth=('admin', 'admin'), params=p)
  print(str(r))
  items = r.json()["values"]

  while not r.json()["isLastPage"]:
    p = {'limit': '100', 'start': r.json()["nextPageStart"]}
    r = requests.get(url, auth=('admin', 'admin'), params=p)
    items.extend(r.json()["values"])

  print(r.json()["start"])
  return items

def get_pr_comments(filename, pr_no):
  # fetch the comments anchored to the given file (params handles URL encoding)
  url = REST_URL + 'projects/{0}/repos/{1}/pull-requests/{2}/comments'.format(PJ_ID, REPOS_ID, pr_no)
  print(url)
  r = requests.get(url, auth=('admin', 'admin'), params={'path': filename})
  print(str(r))

  if not r.json()["isLastPage"]:
    print('Error: isLastPage is false')
    sys.exit(1)         # give up rather than paging through comments

  return r.json()["values"]

def get_c_comments(filename, c_no):
  # fetch the comments anchored to the given file (params handles URL encoding)
  url = REST_URL + 'projects/{0}/repos/{1}/commits/{2}/comments'.format(PJ_ID, REPOS_ID, c_no)
  print(url)
  r = requests.get(url, auth=('admin', 'admin'), params={'path': filename})
  print(str(r))

  if not r.json()["isLastPage"]:
    print('Error: isLastPage is false')
    sys.exit(1)         # give up rather than paging through comments

  return r.json()["values"]

def export_tasks(comment, line, file_name, parent_id, pr_id, pr_title, pr_state, writer):
  # write one row per task attached to the comment
  for t in comment["tasks"]:
    row = [pr_id, pr_title, pr_state, 'task', str(t["id"]), parent_id, file_name, line, t["author"]["name"], t["text"], t["state"]]
    writer.writerow(row)

def export_comments(comment, line, file_name, parent_id, pr_id, pr_title, pr_state, writer):
  # write the comment row, its tasks, then recurse into nested replies
  row = [pr_id, pr_title, pr_state, 'comment', str(comment["id"]), parent_id, file_name, line, comment["author"]["name"], comment["text"], "-"]
  writer.writerow(row)
  export_tasks(comment, line, file_name, comment["id"], pr_id, pr_title, pr_state, writer)
  for child in comment["comments"]:
    export_comments(child, line, file_name, comment["id"], pr_id, pr_title, pr_state, writer)

def write_title():
  print("----write_title----")
  row = ['PR_ID', 'PR_TITLE', 'PR_STATE', 'TYPE', 'ID', 'PARENT_ID', 'FILE', 'LINE', 'author', 'text', 'TASK_STATE']
  PR_WRITER.writerow(row)
  row = ['Commit_ID', 'PR_TITLE', 'PR_STATE', 'TYPE', 'ID', 'PARENT_ID', 'FILE', 'LINE', 'author', 'text', 'TASK_STATE']
  C_WRITER.writerow(row)

def write_pr():
  prs = get_prs()
  for p in prs:
    files = get_pr_files(p["id"])
    for f in files:
      comments = get_pr_comments(f["path"]["toString"], p["id"])
      for c in comments:
        line = str(c["anchor"]["line"]) if "line" in c["anchor"] else "-"
        fn = c["anchor"]["path"]
        export_comments(c, line, fn, "-", p["id"], p["title"], p["state"], PR_WRITER)
  PR_FILE.close()

def write_c():
  commits = get_commits()
  for c in commits:
    files = get_c_files(c["id"])
    for f in files:
      comments = get_c_comments(f["path"]["toString"], c["id"])
      # note: the inner loop must not reuse "c", or the commit id gets clobbered
      for cm in comments:
        line = str(cm["anchor"]["line"]) if "line" in cm["anchor"] else "-"
        fn = cm["anchor"]["path"]
        export_comments(cm, line, fn, "-", c["id"], "-", "-", C_WRITER)
  C_FILE.close()

def main():
  write_title()
  write_pr()
  write_c()

if __name__ == "__main__":
  main()
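
The script authenticates with a hardcoded ('admin', 'admin') pair. When running from Jenkins, a safer variant is to inject credentials into the environment (for example via the Credentials Binding plugin) and read them there. A minimal sketch; the BB_USER/BB_PASS names and the wrapper function are assumptions, not part of the original job:

python3
import os
import requests

# hypothetical: credentials injected by the Jenkins job instead of hardcoded
AUTH = (os.getenv("BB_USER", "admin"), os.getenv("BB_PASS", "admin"))

def get_json(url, params=None):
  # a wrapper the requests.get calls above could be routed through
  r = requests.get(url, auth=AUTH, params=params)
  r.raise_for_status()  # fail fast on HTTP errors instead of parsing bad JSON
  return r.json()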

Wrapping up

A tool I threw together without knowing whether it would actually be used for quality analysis.
It still hasn't been used, so I'm publishing it here as a send-off.
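
If you do get around to analyzing the feedback, the CSVs load straight into a dataframe. A minimal sketch, assuming pandas (which the tool itself does not use); column names are the ones write_title produces:

python3
import pandas as pd

# count review comments per author from the exported CSV
df = pd.read_csv('PullRequestComments.csv')
print(df[df['TYPE'] == 'comment'].groupby('author').size())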

Oh, and please keep the date range modest: every pull request and commit in the range triggers per-file API calls, so long ranges get expensive.

That's all.
