概要
この記事は、JiraのREST APIを使ってバーンダウンチャートの画像を自前で作成し、
SlackerによりSlackチャンネル上にその画像を日常的に投稿するためのBotを作成する手順をまとめた記事です。
なぜバーンダウンチャートを日常的に投稿する必要があるか?
近年のプロダクト開発の現場では、スクラムを導入したチームが増えつつあります。
スクラムチームの質を高めるためには、スクラムガイドで提唱されている、透明性・検査・適応の3つの原則に基づくプロセスの質を高めていくことが重要です。
中でも「透明性」の質を上げることは、その後の検査・適応を行いやすくするという意味では特に改善を行う価値が高いとも言えます。
スクラムの開発現場では、Jiraのバーンダウンチャートを確認することで、開発状況を把握することも多いと思います。
しかし、JiraのサイトはSlackのように常にチームメンバーに目に付きやすい場所というわけでもなく、毎日欠かさずに全員が見に行くのには少々負荷があります。
また、昨今のコロナ禍で在宅勤務へとシフトしたり、個々がフレックスな時間で出勤している状況では、メンバーが集まって一緒にチャートを見るといったことは更に難しくなってきています。
そのような状況もあって、チームメンバーがいつでも目に付きやすいSlackチャンネル上に日常的にバーンダウンチャートを投稿し、毎日意識せずとも情報を閲覧できる状況にしておくほうが、透明性は高まるはずです。
既存のツール
Standuply
先に既存のツールを紹介しておきます。
本記事で実装するbotを使わずとも、Standuplyを使えば簡易的にバーンダウンチャートをSlack上に投稿することができます。
しかしStaduplyは以下のデメリットがあります
- 30日間のトライアルの後に月額課金に移行する必要がある
- 取得できるバーンダウンチャートはスクショのような決まった形式の画像のみ
- 使用感的に画像サイズがやや小さいため、長い期間やチケットの多いスプリントだと、ラベルやPlotが重なって見づらい場合がある
- バーンダウンチャート以外に付与したい情報を出せない
- チケットのステータスごとのポイント数や、誰がどのチケットをどれだけ持っているのかなど、etc...
前述したようにスクラムではあらゆることを可視化し、チームの透明性を高めることが重要であり、Standuplyでは実現したい機能を自由に持たせるには不十分である。
jira-python
Pythonを使ってJiraの機能を利用しようと検索された方は、ネットでjira-pythonを見つけたかもしれません。
Jiraの簡単な機能であれば、このライブラリをimportすれば手軽にJiraの機能を扱えるかもしれません。
しかしこのライブラリは、本記事を執筆時点ではAPIの使用で不十分な点が多く、利用できるJiraの機能もあまり多くありません。
バーンダウンチャートを作成するために使いたい機能の実装は、greenhopper REST APIを利用するなどされており、こちらは既にDepricatedになっています。(OSSなので今後改善される可能性はあります)
Jira REST API
今回利用するJiraの REST APIについて説明します。
JiraのREST APIは複数種類存在しており使っている環境や、目的に応じて使い分けが必要です。
今回は以下2つのREST APIを利用することで、本記事の目的を実現します。
- Jira Cloud platform REST API
- https://developer.atlassian.com/cloud/jira/platform/rest/v3/
- JiraのIssueなどの情報を取得できます
- Jira Software Cloud REST API
- https://developer.atlassian.com/cloud/jira/software/rest
- Sprintの情報など、agile開発に特化した情報を取得できます
requestsライブラリの導入
PythonでREST APIを利用するにはHTTPを利用するためのライブラリが必要です。
PythonにはHTTPの機能を扱うためのライブラリがいくつかありますが、requestsを使うのがおそらく最も手軽で良いでしょう。
以下のようにpipのコマンドを使って導入することができます。
$ pip install requests
使い方に関してはこちらのクイックスタートを読むのがわかりやすいです。
Slackerの導入
Slackerは、PythonからSlack REST APIを手軽に呼び出すことができるライブラリです。
前述したrequestsを使って直接SlackのAPIを呼び出しても良いのですが、Slackerを使ったほうがが扱いやすいため、こちらを使う方をおすすめします。
以下のようにpipのコマンドを使って導入することができます。
$ pip3 install slackbot
使い方に関してはこちらの記事などがわかりやすいです。
また、Slackerと似たようなものに、slackbotというものもあります。
こちらを使えば、チャンネル内の投稿やbotへのメンションに反応するようなものを作れるので、用途に合わせてこちらを使うのも良いと思います。
機能要件
今回は以下の要件を満たすbotを作成します。
- 最新のスプリントのバーンダウンチャートのimageを作成できる。
- 任意のSlackチャネルに投稿できる。
ファイル構成
.
├── config.ini
├── main.py
├── libs
│ ├── model.py
│ ├── platform_api.py
│ └── software_api.py
└── output
└── burndownchart.png
以下より実装の説明をしていきます。
JiraのAPIクラス
JIRAのREST APIを呼び出すためのメソッドを定義します。
前述したように2種類のAPIを使うため今回は分けて作成しています。
class PlatformApi:
def __init__(self, auth, board_id, url=None):
self.url = url
self.headers = {"Accept": "application/json"}
self.auth = auth
self.board_id = board_id
# 以下APIメソッドを定義
class SoftwareApi:
def __init__(self, auth, board_id, url=None):
self.url = url
self.headers = {"Accept": "application/json"}
self.auth = auth
self.board_id = board_id
# 以下APIメソッドを定義
API使用のためのconfig設定
Jira、SlackのそれぞれのREST APIを扱うために必要な情報をconfig.iniに記述し、main.pyからそれらの値を使いAPI操作のためのオブジェクトを作成します。
[jira]
platform_url = https://<your-domain>.atlassian.net/rest/api/3
software_url= https://<your-domain>.atlassian.net/rest/agile/latest
user = <your-userid>
token = <your-jira-token>
board_id = <your-board>
[slack]
token = <your-slack-token>
channel_name = <your-channelname>
platform_url = config['jira']['platform_url']
software_url = config['jira']['software_url']
jira_user = config['jira']['user']
jira_token = config['jira']['token']
board_id = config['jira']['board_id']
slack_token = config['slack']['token']
channel_name = config['slack']['channel_name']
# REST APIを操作するオブジェクト
platform_api = PlatformApi(url=platform_url, auth=(jira_user, jira_token), board_id=board_id)
software_api = SoftwareApi(url=software_url, auth=(jira_user, jira_token), board_id=board_id)
slacker = Slacker(slack_token)
ボードの列の設定情報の取得
バーンダウンチャートでは、チケットのステータスが、ボードの列の設定のどの列にマッピングされているかでバーンダウンするかを判断します。そのため、各列のステータス名のリストを取得して保持しておきます。
@dataclass
class BoardColumn():
name: str
status_ids: List[int] = field(default_factory=list)
ボードの情報は、agile関連のメソッドが定義されているsoftware_apiを使用して取得します。
def get_configuration(self):
url = "{}/board/{}/configuration".format(self.url, self.board_id)
response = requests.request(
method="GET",
url=url,
headers=self.headers,
auth=self.auth
)
return response.json()
board_columns = []
for raw_column in software_api.get_configuration()["columnConfig"]["columns"]:
column = BoardColumn(name=raw_column["name"], status_ids=[status["id"] for status in raw_column["statuses"]])
board_columns.append(column)
最新のSprintの取得
取得したSprintの情報を格納するため、dataclassを作成します。
@dataclass
class Sprint:
_id: str
name: str
originBoardId: str
start_date: datetime
end_date: datetime
goal: str
Sprintの情報は、agile関連のメソッドが定義されているsoftware_apiを使用して取得します。
def get_sprint(self):
url = "{}/board/{}/sprint".format(self.url, self.board_id)
response = requests.request(
method="GET",
url=url,
headers=self.headers,
auth=self.auth
)
return response.json()["values"]
最新のスプリントを取得するため、indexに-1を指定しておき、取得した値を使ってSprintオブジェクトに格納します。
date_format = '%Y-%m-%dT%H:%M:%S.%f%z'
raw_latest_sprint = [
sprint
for sprint in software_api.get_sprint()
if sprint["originBoardId"] == int(board_id)
][-1]
latest_sprint = Sprint(
_id=raw_latest_sprint["id"],
name=raw_latest_sprint["name"],
originBoardId=raw_latest_sprint["originBoardId"],
start_date=datetime.strptime(
raw_latest_sprint["startDate"].replace(
'Z',
'+00:00'
),
date_format
).astimezone(timezone(timedelta(hours=+9))),
end_date=datetime.strptime(
raw_latest_sprint["endDate"].replace(
'Z',
'+00:00'
),
date_format
).astimezone(timezone(timedelta(hours=+9))),
goal=raw_latest_sprint["goal"],
)
Issueの取得
Issueの情報は、スプリントバックログとバックログ用のそれぞれのAPIメソッドを使って取得します。
一見スプリントバックログのIssueのみでも要件は満たせそうですが、Issueを途中からスプリントから取り除いたり、追加したりする場合はよくあるので両方呼び出しておくのが良いです。
またバーンダウンチャートでは時系列に変更履歴を載せていくため、
Issue取得時にそれらの情報を取得できるよう、paramsにchangelogを指定しておきます。
def get_issues_for_sprint(self, sprint_id):
url = "{}/sprint/{}/issue".format(self.url, sprint_id)
payload = {'maxResults': 200, 'expand': 'changelog'}
response = requests.request(
method="GET",
url=url,
headers=self.headers,
auth=self.auth,
params=payload
)
return response.json()["issues"]
def get_issues_for_backlog(self):
url = "{}/board/{}/backlog".format(self.url, self.board_id)
payload = {'maxResults': 200, 'expand': 'changelog'}
response = requests.request(
method="GET",
url=url,
headers=self.headers,
auth=self.auth,
params=payload
)
return response.json()["issues"]
取得したIssueの情報を格納するため、dataclassを作成します。
@dataclass
class Issue:
key: str
summary: str
status_id: int
initial_point: float
initial_status: str
latest_point: float
initial_joined_sprint_id: str
sprint_id: str
change_logs: List[ChangeLog] = field(default_factory=list)
取得したIssueの情報をオブジェクトに格納し、Issueのリストを作成します。
格納する際の、
- get_changelog
- get_initial_point
- get_initial_status
- get_joined_initial_sprint_id
に関しては後述します。
issues = []
change_logs = []
for raw_issue in (
software_api.get_issues_for_sprint(latest_sprint._id) +
software_api.get_issues_for_backlog()
):
tmp_change_logs = []
fields = raw_issue["fields"]
sprint_id = ""
if fields["sprint"] is not None:
sprint_id = fields["sprint"]["id"]
latest_point = 0.0
if fields["customfield_10026"] is not None:
latest_point = fields["customfield_10026"]
raw_change_logs = raw_issue["changelog"]["histories"]
latest_status = fields["status"]["id"]
for raw_change_log in raw_change_logs:
change_log = get_changelog(raw_changelog=raw_change_log, issue_key=raw_issue["key"])
if change_log is not None:
tmp_change_logs.append(change_log)
if fields["customfield_10026"] is None:
fields["customfield_10026"] = 0.0
issue = Issue(
key=raw_issue["key"],
summary=fields["summary"],
initial_status=get_initial_status(latest_status, tmp_change_logs),
status_id=latest_status,
initial_point=get_initial_point(latest_point, tmp_change_logs),
latest_point=latest_point,
initial_joined_sprint_id=get_joined_initial_sprint_id(sprint_id, tmp_change_logs),
sprint_id=sprint_id,
change_logs=tmp_change_logs
)
issues.append(issue)
change_logs.extend(tmp_change_logs)
ChangeLogの取得
ChangeLogを格納するためのdataclassを作成します。
バーンダウンチャートを実現するにおいて、ChangeLogの状態遷移をそのまま用いると扱いが難しいため、
独自でEventTypeを定義しておき、それらに状態を集約します。
class EventType(Enum):
ADD_TO_SPRINT = 1
REMOVE_FROM_SPRINT = 2
CHANGE_TO_TODO = 3
CHANGE_TO_DONE = 4
CHANGE_POINT = 5
@dataclass
class ChangeLog:
date: datetime
_from: str
_to: str
fromString: str
toString: str
event_type: EventType
issue_key: str
バーンダウンチャートのポイントが上下するイベント条件は以下のようになっています。
- バーンアップするイベント条件
- 対象のスプリントにpointが付与されたIssueが追加される
- pointが変更された結果増える
- statusがボード設定で定義した完了状態のstatusに変わる
- バーンダウンするイベント条件
- 対象のスプリントからにpointが付与されたIssueが除去される
- pointが変更された結果減る
- statusがボード設定で定義した完了状態以外のstatusに変わる
これらの条件に沿って、EventTypeにイベント条件を格納していきます。
def get_changelog(raw_changelog, issue_key):
date = datetime.strptime(
raw_change_log["created"],
date_format
)
items = raw_change_log["items"]
for item in items:
if item["field"] == "status":
# 完了になったとき
if item["from"] not in board_columns[-1].status_ids and item["to"] in board_columns[-1].status_ids:
return ChangeLog(
date=date,
_from=item["from"],
_to=item["to"],
fromString=item["fromString"],
toString=item["toString"],
event_type=EventType.CHANGE_TO_DONE,
issue_key=issue_key
)
# 完了以外になったとき
elif item["from"] in board_columns[-1].status_ids and item["to"] not in board_columns[-1].status_ids:
return ChangeLog(
date=date,
_from=item["from"],
_to=item["to"],
fromString=item["fromString"],
toString=item["toString"],
event_type=EventType.CHANGE_TO_TODO,
issue_key=issue_key
)
# Story Pointを変更したとき
elif item["field"] == "Story Points":
before_point = item["fromString"]
after_point = item["toString"]
if before_point is None or before_point == '':
before_point = "0"
if after_point is None or after_point == '':
after_point = "0"
return ChangeLog(
date=date,
_from=item["from"],
_to=item["to"],
fromString=before_point,
toString=after_point,
event_type=EventType.CHANGE_POINT,
issue_key=issue_key
)
elif item["field"] == "Sprint":
# 現在のSprintに追加したとき
if str(latest_sprint._id) not in str(item["from"]):
if str(latest_sprint._id) in str(item["to"]):
return ChangeLog(
date=date,
_from=item["from"],
_to=item["to"],
fromString=item["fromString"],
toString=item["toString"],
event_type=EventType.ADD_TO_SPRINT,
issue_key=issue_key
)
# 現在のSprintから除去したとき
elif str(latest_sprint._id) in str(item["from"]):
if str(latest_sprint._id) not in str(item["to"]):
return ChangeLog(
date=date,
_from=item["from"],
_to=item["to"],
fromString=item["fromString"],
toString=item["toString"],
event_type=EventType.REMOVE_FROM_SPRINT,
issue_key=issue_key
)
スプリント開始時点の状態の取得
バーンダウンチャートは、スプリント開始時点から時系列にグラフを描画していく必要があります。
しかし、APIから取得したIssueに含まれるpointには、最新の単一のポイントやステータス、スプリントしか含まれていません。
そこで、前述したChangeLogを活用することで、最新の状態から過去の方向にそれぞれの値を遡って算出し、スプリント開始時点の状態を取得します。
def get_initial_point(latest_point, change_logs):
initial_point = latest_point
for change_log in sorted(
change_logs,
key=attrgetter('date'),
reverse=True
):
if change_log.date >= latest_sprint.start_date:
if change_log.event_type == EventType.CHANGE_POINT:
initial_point = float(change_log.fromString)
return initial_point
def get_initial_status(latest_status, change_logs):
initial_status = latest_status
for change_log in sorted(
change_logs,
key=attrgetter('date'),
reverse=True
):
if change_log.date >= latest_sprint.start_date:
initial_status = change_log._from
return initial_status
def get_joined_initial_sprint_id(latest_joined_sprint_id, change_logs):
initial_joined_sprint_id = latest_joined_sprint_id
for change_log in sorted(
change_logs,
key=attrgetter('date'),
reverse=True
):
if change_log.date >= latest_sprint.start_date:
if change_log.event_type == EventType.ADD_TO_SPRINT:
initial_joined_sprint_id = change_log._from
elif change_log.event_type == EventType.REMOVE_FROM_SPRINT:
initial_joined_sprint_id = change_log._from
return initial_joined_sprint_id
スプリント開始時点から最新までの状態の算出
今度は先程とは逆で、スプリント開始時点から最新までの状態を算出していきます。
算出途中の状態は逐次dictに入れて管理します。
issue_point_dict = {}
issue_status_dict = {}
issue_sprint_id_dict = {}
for issue in issues:
issue_point_dict[issue.key] = issue.initial_point
issue_status_dict[issue.key] = issue.initial_status
issue_sprint_id_dict[issue.key] = issue.initial_joined_sprint_id
先程ChangeLogに格納したイベント条件に沿って、story pointを加算・減算していき、同時に実線用のリストに値を格納していきます。
x_list = []
y_list = []
# Start Sprint時点でのTotalポイントを格納
x_list.append(latest_sprint.start_date)
y_list.append(goal_sprint_point)
tmp_total_point = goal_sprint_point
highest_timing_point = 0.0
# スプリント開始後のポイントの変化を格納する
for change_log in sorted(change_logs, key=attrgetter('date')):
if change_log.date < latest_sprint.start_date:
pass
else:
if change_log.event_type == EventType.CHANGE_TO_DONE:
issue_status_dict[change_log.issue_key] = change_log.toString
if str(issue_sprint_id_dict[change_log.issue_key]) == str(latest_sprint._id):
tmp_total_point -= issue_point_dict[change_log.issue_key]
print_event_log(change_log)
elif change_log.event_type == EventType.CHANGE_TO_TODO:
issue_status_dict[change_log.issue_key] = change_log.toString
if str(issue_sprint_id_dict[change_log.issue_key]) == str(
latest_sprint._id):
tmp_total_point += issue_point_dict[change_log.issue_key]
print_event_log(change_log)
elif change_log.event_type == EventType.REMOVE_FROM_SPRINT:
if str(issue_sprint_id_dict[change_log.issue_key]) == str(
latest_sprint._id):
issue_sprint_id_dict[change_log.issue_key] = change_log._to
if issue_status_dict[change_log.issue_key] not in board_columns[-1].status_ids:
tmp_total_point -= issue_point_dict[change_log.issue_key]
print_event_log(change_log)
elif change_log.event_type == EventType.ADD_TO_SPRINT:
if issue_status_dict[change_log.issue_key] not in board_columns[-1].status_ids:
issue_sprint_id_dict[change_log.issue_key] = change_log._to
tmp_total_point += issue_point_dict[change_log.issue_key]
print_event_log(change_log)
# Story Pointの前後計算して加算
elif change_log.event_type in {
EventType.CHANGE_POINT
}:
issue_point_dict[change_log.issue_key] = float(change_log.toString)
if str(issue_sprint_id_dict[change_log.issue_key]) == str(
latest_sprint._id):
if issue_status_dict[change_log.issue_key] not in board_columns[-1].status_ids:
tmp_total_point += (float(change_log.toString) -
float(change_log.fromString))
print_event_log(change_log)
else:
continue
else:
continue
x_list.append(change_log.date)
y_list.append(tmp_total_point)
highest_timing_point = max(highest_timing_point, tmp_total_point)
# 最新のポイントを格納する
x_list.append(datetime.now())
y_list.append(tmp_total_point)
Plotの設定と出力
バーンダウンチャートのラベルを付与
plt.title('Burn down chart for ' + latest_sprint.name)
plt.xlabel('TIME')
plt.ylabel('STORY POINTS')
先程の実線用のリストをplotします。
actual_df = pd.DataFrame(y_list, index=x_list)
ax.plot(actual_df.index, actual_df.values, color="r", marker=".")
実線以外にも、日々の消費すべきポイントの指標となる理想曲線を入れておきましょう。
date_range = pd.date_range(
datetime.strftime(latest_sprint.start_date, "%Y-%m-%d 00:00:00"),
datetime.strftime(latest_sprint.end_date + timedelta(days=1), "%Y-%m-%d 00:00:00"),
freq='12H'
)
number_of_days = date_range.size
ideal_points = {'values': np.linspace(goal_sprint_point, 0, number_of_days)}
ideal_df = pd.DataFrame(ideal_points, index=date_range)
plt.ylim(0, highest_timing_point + 1)
ax.grid(True)
ax.plot(ideal_df.index, ideal_df.values, color="b")
ax.set_xticks(ideal_df.index)
ax.xaxis.set_major_formatter(mdates.DateFormatter("%m/%d(%a) %H"))
ax.xaxis.set_minor_formatter(mdates.DateFormatter("%m/%d(%a) %H"))
_ = plt.xticks(rotation=90)
出力するImageとコメントを作成します。
# バーンダウンチャートのimageを作成
output_dir_name = 'output'
output_file_name = '/burndownchart.png'
if not os.path.exists(output_dir_name):
os.mkdir(output_dir_name)
image_file = output_dir_name + output_file_name
plt.savefig(image_file, dpi=300, bbox_inches="tight", pad_inches=0.1)
# コメントを作成
comment = "【Burn down chart for {}】\n".format(latest_sprint.name) +\
"*:goals: Sprint Goal:*\n" +\
"\n" +\
"```" + latest_sprint.goal + "```" + "\n"
comment += "\n"
comment += "*▪ Remaining tasks:*\n"
comment += "*[Todo]*\n"
for issue in sorted(issues, key=attrgetter('key')):
if str(latest_sprint._id) in str(issue.sprint_id):
if issue.status_id in board_columns[0].status_ids:
comment += ('https://<your-domain>.atlassian.net/browse/' + issue.key + ' :' + issue.summary + '\n')
comment += '\n'
comment += "*[Doing]*\n"
for issue in sorted(issues, key=attrgetter('key')):
if str(latest_sprint._id) in str(issue.sprint_id):
if issue.status_id in board_columns[1].status_ids:
comment += ('https://<your-domain>.atlassian.net/browse/' + issue.key + ' :' + issue.summary + '\n')
comment += '\n'
作成したimageとコメントを、slackerを使って指定のチャンネルにpostします。
slacker.files.upload(
image_file,
channels=channel_name,
initial_comment=comment
)
以上で実装部分は完成です。
あとはcronなどでpythonコマンドを定時実行するなど行えば定期的にSlackにpostすることができます。
Source Code (GitHub)
そのうちリポジトリのコード載せる(予定)