内容
- Pacemaker2.1.7(RHEL8.10/RHEL9.4以降)で盛り込まれたcib更新のtransaction処理について記載します。
- なお、RHEL10.0に含まれるPacemaker 3.0系にも同様に盛り込まれています。
- transaction更新によって、連続した複数のcib更新の完了前にcib更新による通知が行われず、最終的に更新されたcib情報のみがschedullredなどで処理可能となります。
- 最終的な更新が完了するまでは、更新完了前のcibが処理されます。
- 現状、transaction処理を行っているのは、以下の3点のみとなっています。
- controldのノード情報の削除、更新
- attrd(Writerノード)の属性書き込み
- crm_nodeコマンドツール
- また、cibを取得・判定するような場合でも、更新完了までcib情報(node_stateなど)が中途半端な状態で処理されることが無くなります。
- 例えば、transaction導入前のcontroldのノード情報の削除、更新処理の場合、削除のタイミングでcib更新の通知から処理が行われる為、情報が削除された状態で、状態遷移の計算が行われる為、おかしなリソース移動の計算が行われる可能性があります。
- また、attrdのWriterノードによる属性書き込みも同様で、属性更新のタイミングでcib更新の通知処理が行われると、状態遷移計算などに影響が出る場合があります。
- 例えば、transaction導入前のcontroldのノード情報の削除、更新処理の場合、削除のタイミングでcib更新の通知から処理が行われる為、情報が削除された状態で、状態遷移の計算が行われる為、おかしなリソース移動の計算が行われる可能性があります。
- transaction処理は、以下のように処理されています。
- 該当するcib更新処理を行うcib(based)のクライアント(controld/attrd)が、トランザクションを開始する。
- cibへの接続情報のtransaction領域(cib->transaction)を確保する。
- クライアントが、cib削除、更新などの処理を行う。
- このcibへの処理は、basedへは送信されない。
- 処理はすべて、cibへの接続情報のtransaction領域へ積み上げされる。
- トランザクション導入前の場合は、cibの更新がbasedへその都度送信されるため、cibの内容が更新処理の都度、変更されていた。この為、#5815のcrm_resourceでの取得したcibの状態が発生する。
- クライアントがすべての処理の完了時に、トランザクションを終了する。
- このタイミングで、cibへの接続情報のtransaction領域のすべての処理データをbasedへ送信する。
- basedが要求メッセージを受信する。
- basedは、メッセージのすべてのcib更新処理を実行する。
- 更新を完了するまで、cibの更新・差分通知を行わない。
- これにより、途中の更新での更新通知はクライアントへは行われない。
- なお、cib(basedの処理はメッセージイベント内で完結している為、他の処理イベントの影響は受けない為、トランザクションの更新処理中に他の更新が実行されるようなことはありません。
- main_loopのイベント処理は単一のイベント完了まで、次のイベントの影響は受け付けない為。
- 上記の通りで、複数の更新を束ねてbasedへ依頼して、一括処理させる仕組みで、データベースのtransactionとは、ちょっと考えが違います。
- rollbackなども持っていない。
- トランザクションは1クライアントで1トランザクションまで。。。となっている。
- 該当するcib更新処理を行うcib(based)のクライアント(controld/attrd)が、トランザクションを開始する。
- また、cib(based)側のcibに対してcib->transactionを持たせてトランザクションを積み上げるのではなく、クライアント側で積み上げて、送信しているのが特徴です。
ソース箇所(Pacemaker3.0.x)
daemons/controld/controld_join_dc.c(attrdとcrm_nodeは割愛)
(snip)
/* A_DC_JOIN_PROCESS_ACK */
void
do_dc_join_ack(long long action,
enum crmd_fsa_cause cause,
enum crmd_fsa_state cur_state,
enum crmd_fsa_input current_input, fsa_data_t * msg_data)
{
int join_id = -1;
(snip)
/* Update CIB with node's current executor state. A new transition will be
* triggered later, when the CIB manager notifies us of the change.
*
* The delete and modify requests are part of an atomic transaction.
*/
★トランザクション開始(cib->transaction準備)
rc = cib->cmds->init_transaction(cib);
if (rc != pcmk_ok) {
goto done;
}
// Delete relevant parts of node's current executor state from CIB
if (pcmk_is_set(controld_globals.flags, controld_shutdown_lock_enabled)) {
section = controld_section_lrm_unlocked;
}
controld_node_state_deletion_strings(join_from, section, &xpath, NULL);
★cib更新(cib->transactionへ処理を追加)
rc = cib->cmds->remove(cib, xpath, NULL,
cib_xpath|cib_multiple|cib_transaction);
if (rc != pcmk_ok) {
goto done;
}
(snip)
★cib更新(cib->transactionへ、さらに処理を追加)
rc = cib->cmds->modify(cib, PCMK_XE_STATUS, state,
cib_can_create|cib_transaction);
pcmk__xml_free(execd_state);
if (rc != pcmk_ok) {
goto done;
}
★トランザクション終了(ここでcibへ送信。cib(based)で積み上げられた処理を一括処理する。)
// Commit the transaction
rc = cib->cmds->end_transaction(cib, true, cib_none);
fsa_register_cib_callback(rc, join_from, join_node_state_commit_callback);
(snip)
daemon/based/based_transaction.c
(snip)
static int
process_transaction_requests(xmlNodePtr transaction,
const pcmk__client_t *client, const char *source)
{
★すべてのtransactionを処理する。
for (xmlNode *request = pcmk__xe_first_child(transaction,
PCMK__XE_CIB_COMMAND, NULL,
NULL);
request != NULL;
request = pcmk__xe_next(request, PCMK__XE_CIB_COMMAND)) {
const char *op = crm_element_value(request, PCMK__XA_CIB_OP);
const char *host = crm_element_value(request, PCMK__XA_CIB_HOST);
const cib__operation_t *operation = NULL;
int rc = cib__get_operation(op, &operation);
if (rc == pcmk_rc_ok) {
if (!pcmk_is_set(operation->flags, cib__op_attr_transaction)
|| (host != NULL)) {
rc = EOPNOTSUPP;
} else {
/* Commit-transaction is a privileged operation. If we reached
* this point, the request came from a privileged connection.
*/
rc = cib_process_request(request, TRUE, client);
rc = pcmk_legacy2rc(rc);
}
}
if (rc != pcmk_rc_ok) {
crm_err("Aborting CIB transaction for %s due to failed %s request: "
"%s",
source, op, pcmk_rc_str(rc));
crm_log_xml_info(request, "Failed request");
return rc;
}
crm_trace("Applied %s request to transaction working CIB for %s",
op, source);
crm_log_xml_trace(request, "Successful request");
}
return pcmk_rc_ok;
}
(snip)
int
based_commit_transaction(xmlNodePtr transaction, const pcmk__client_t *client,
const char *origin, xmlNodePtr *result_cib)
{
★一旦、現在のcibポインタを保持
xmlNodePtr saved_cib = the_cib;
int rc = pcmk_rc_ok;
char *source = NULL;
pcmk__assert(result_cib != NULL);
CRM_CHECK(pcmk__xe_is(transaction, PCMK__XE_CIB_TRANSACTION),
return pcmk_rc_no_transaction);
/* *result_cib should be a copy of the_cib (created by cib_perform_op()). If
* not, make a copy now. Change tracking isn't strictly required here
* because:
* * Each request in the transaction will have changes tracked and ACLs
* checked if appropriate.
* * cib_perform_op() will infer changes for the commit request at the end.
*/
CRM_CHECK((*result_cib != NULL) && (*result_cib != the_cib),
*result_cib = pcmk__xml_copy(NULL, the_cib));
source = based_transaction_source_str(client, origin);
crm_trace("Committing transaction for %s to working CIB", source);
// Apply all changes to a working copy of the CIB
★変更対象のcibをセット
the_cib = *result_cib;
★トランザクションを処理
rc = process_transaction_requests(transaction, client, origin);
crm_trace("Transaction commit %s for %s",
((rc == pcmk_rc_ok)? "succeeded" : "failed"), source);
/* Some request types (for example, erase) may have freed the_cib (the
* working copy) and pointed it at a new XML object. In that case, it
* follows that *result_cib (the working copy) was freed.
*
* Point *result_cib at the updated working copy stored in the_cib.
*/
★トランザクション処理されたcibを結果にセット
*result_cib = the_cib;
// Point the_cib back to the unchanged original copy
★変更前のcibポインタに戻す
the_cib = saved_cib;
free(source);
return rc;
}
daemon/based/based_operation.c
(snip)
static const cib__op_fn_t cib_op_functions[] = {
[cib__op_abs_delete] = cib_process_delete_absolute,
[cib__op_apply_patch] = cib_server_process_diff,
[cib__op_bump] = cib_process_bump,
[cib__op_commit_transact] = cib_process_commit_transaction,
(snip)
}}}
* daemon/based/based_messages.c
{{{
(snip)
int
cib_process_commit_transaction(const char *op, int options, const char *section,
xmlNode *req, xmlNode *input,
xmlNode *existing_cib, xmlNode **result_cib,
xmlNode **answer)
{
/* On success, our caller will activate *result_cib locally, trigger a
* replace notification if appropriate, and sync *result_cib to all nodes.
* On failure, our caller will free *result_cib.
*/
int rc = pcmk_rc_ok;
const char *client_id = crm_element_value(req, PCMK__XA_CIB_CLIENTID);
const char *origin = crm_element_value(req, PCMK__XA_SRC);
pcmk__client_t *client = pcmk__find_client_by_id(client_id);
rc = based_commit_transaction(input, client, origin, result_cib);
if (rc != pcmk_rc_ok) {
char *source = based_transaction_source_str(client, origin);
crm_err("Could not commit transaction for %s: %s",
source, pcmk_rc_str(rc));
free(source);
}
return pcmk_rc2legacy(rc);
}
lib/cib/cib_utils.c
(snip)
int
cib__extend_transaction(cib_t *cib, xmlNode *request)
{
int rc = pcmk_rc_ok;
pcmk__assert((cib != NULL) && (request != NULL));
rc = validate_transaction_request(request);
if ((rc == pcmk_rc_ok) && (cib->transaction == NULL)) {
rc = pcmk_rc_no_transaction;
}
if (rc == pcmk_rc_ok) {
★cib->trasactionへ積み上げ
pcmk__xml_copy(cib->transaction, request);
} else {
const char *op = crm_element_value(request, PCMK__XA_CIB_OP);
const char *client_id = NULL;
cib->cmds->client_id(cib, NULL, &client_id);
crm_err("Failed to add '%s' operation to transaction for client %s: %s",
op, pcmk__s(client_id, "(unidentified)"), pcmk_rc_str(rc));
crm_log_xml_info(request, "failed");
}
return pcmk_rc2legacy(rc);
}
(sniip)
lib/cib/cib_native.c
(snip)
static int
cib_native_perform_op_delegate(cib_t *cib, const char *op, const char *host,
const char *section, xmlNode *data,
xmlNode **output_data, int call_options,
const char *user_name)
{
int rc = pcmk_ok;
(snip)
if (pcmk_is_set(call_options, cib_transaction)) {
★cib_transactionの場合は、cib(based)へは送信せずに積み上げて終了
rc = cib__extend_transaction(cib, op_msg);
goto done;
}
(snip)
★トランザクションではないcibへの処理は、以降で、cibへ送信して処理される。
lib/cib/cib_client.c
(snip)
cib_client_init_transaction(cib_t *cib)
{
int rc = pcmk_rc_ok;
if (cib == NULL) {
return -EINVAL;
}
if (cib->transaction != NULL) {
// A client can have at most one transaction at a time
rc = pcmk_rc_already;
}
if (rc == pcmk_rc_ok) {
★cib->transactionの生成
cib->transaction = pcmk__xe_create(NULL, PCMK__XE_CIB_TRANSACTION);
}
if (rc != pcmk_rc_ok) {
const char *client_id = NULL;
cib->cmds->client_id(cib, NULL, &client_id);
crm_err("Failed to initialize CIB transaction for client %s: %s",
client_id, pcmk_rc_str(rc));
}
return pcmk_rc2legacy(rc);
}
static intcib_client_end_transaction(cib_t *cib, bool commit, int call_options)
{
const char *client_id = NULL;
int rc = pcmk_ok;
if (cib == NULL) {
return -EINVAL;
}
cib->cmds->client_id(cib, NULL, &client_id);
client_id = pcmk__s(client_id, "(unidentified)");
if (commit) {
if (cib->transaction == NULL) {
rc = pcmk_rc_no_transaction;
crm_err("Failed to commit transaction for CIB client %s: %s",
client_id, pcmk_rc_str(rc));
return pcmk_rc2legacy(rc);
}
★全cib->transactionをcib(based)へ送信
rc = cib_internal_op(cib, PCMK__CIB_REQUEST_COMMIT_TRANSACT, NULL, NULL,
cib->transaction, NULL, call_options, cib->user);
} else {
// Discard always succeeds
if (cib->transaction != NULL) {
crm_trace("Discarded transaction for CIB client %s", client_id);
} else {
crm_trace("No transaction found for CIB client %s", client_id);
}
}
pcmk__xml_free(cib->transaction);
cib->transaction = NULL;
return rc;
}
(snip)
cib_t
*cib_new_variant(void){
cib_t *new_cib = NULL;
(snip)
new_cib->cmds->init_transaction = cib_client_init_transaction;
new_cib->cmds->end_transaction = cib_client_end_transaction;
(snip)
以上です。