この記事はPostgreSQL Advent Calendar 2023の23日目の記事です。
はじめに
早いもので、恒例のこの季節になりました。
毎年wktkしながら書いているのですが、かみさんから「あんたのだけ、毎年くだらないな」的なことを言われたため、今年は真面目なテーマにしようと悩みました。
そんなある日、在宅ワークしていると自治体のアナウンスが流れてきました。
自治体のアナウンス:「近頃、警察を名乗って詐欺の電話がかかってきます。ご注意ください〜」
「特殊詐欺」っていうやつ?通称、オレオレ詐欺。
オレオレ詐欺とは
警察庁・特殊詐欺対策ページより:
親族、警察官、弁護士等を装い、親族が起こした事件・事故に対する示談金等を名目に金銭等をだまし取る(脅し取る)手口です。
まだ流行ってるんだなぁと思いながら、「せやっ!これをテーマにしよう!!」と閃めきました。
ちなみに、私(?)は、母に2回、父・姉にそれぞれ1回の前科4犯です。
なんたる悪党、成敗してくれようぞ!
オレオレ詐欺とPostgreSQL
「オレオレ詐欺とPostgreSQLに何の関係があるのさ?」とツッこむ方もいらっしゃると思いますので、はじめに簡単に説明します。
オレオレ詐欺の流れ
とてもシンプルなやりとりで大金を手に入れていることがわかります。
全く同じ流れがPostgreSQLの"ある処理"で見ることができます。
PostgreSQLのある処理の流れ
そうです。StreamReplicationの処理はオレオレ詐欺だったのです。
ということで、詐欺師(Standby)を懲らしめてやりましょう。
詐欺師(Standby)の手口
前述のフローは簡略化されているため、より具体的な手口をマニュアルに記載されている「詐欺師の近づき方」や「詐欺師の常套句」等で詳細を確認します。
コマンド | 挙動 | 期待してる返却値 | 備考 |
---|---|---|---|
connect | 接続&認証 | AuthenticationOk [Byte1('R'), int32x2] |
detail |
IDENTIFY_SYSTEM | 相手の確認 | systemid[text], timeline[int8], xlogpos[text], dbname[text] |
detail |
START_REPLICATION | 情報の要求 | XLogData [Byte1('w'), int64x3, Byten] |
detail |
上記やりとりした後は、情報(WAL)が振り込まれてくるので、それを受け取っているようです。
真打登場
一通りの手口を確認したところで、助っ人Gマン「みつお」を召喚します。
コードネームは「ぴーじーわるせんだーみつお」。
彼は、あたかもマスターサーバ(pg_walsender)のように振る舞い、情報の要求が会ったタイミングで情報(WAL)ではなく正義の鉄槌を喰らわせます。
要するに、「おとり捜査」が得意です。
ーースペシャル袋とじ:みつおの全容(無駄に長いけど興味のある人は開いてね★)ーー
ーー8<ーー
#include <arpa/inet.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>
#include <string.h>
#define MYSOCK_FMT "/tmp/.s.PGSQL.%d"
#define INT8OID 20
#define INT4OID 23
#define TEXTOID 25
// Global parameters
char *systemid;
char *timeline;
char *xlogpos;
#define AUTH_REQ_OK 0
typedef struct {
char type;
int mes_len;
int ret_code;
} auth_ok_struct;
typedef struct {
char type;
int mes_len;
char mode;
} ready_for_query_struct;
typedef struct {
char type;
int mes_len;
char *command_tag;
} command_complete_struct;
typedef struct {
char *field_name;
int rel_oid;
short rel_field_oid;
int field_oid;
short typlen;
int atttypmod;
short field_format_code;
} row_description_field_struct;
typedef struct {
char type;
int mes_len;
short field_len;
row_description_field_struct *rdfs;
} row_description_struct;
typedef struct {
int systemid_len;
char *systemid;
int timeline_len;
char *timeline;
int xlogpos_len;
char *xlogpos;
int dbname_len;
char *dbname;
} response_identify_system_struct;
typedef struct {
char type;
int mes_len;
short field_len;
response_identify_system_struct *riss;
} data_row_struct;
typedef struct {
char type;
int mes_len;
int8_t copy_format;
short field_len;
short *field_format;
} copy_both_response_struct;
char buf[16777216];
void error_message(int line);
void pg_walsender_mitsuo(int sock);
int setRDFS(row_description_field_struct *rdfs, int field_number, const char *fieldname, int fieldoid, short typlen);
void setDRSandRISS(data_row_struct *drs, response_identify_system_struct *riss);
#include "pg_walsender_mitsuo.h"
/*
main function.
*/
int main(int argc, char *argv[]) {
// struct sockaddr_in client;
// struct sockaddr_in server;
struct sockaddr_un client;
struct sockaddr_un server;
int server_sock;
int client_sock;
int port = atoi(argv[1]);
systemid = argv[2];
timeline = argv[3];
xlogpos = "0/0";
char MYSOCK[100];
sprintf(MYSOCK, MYSOCK_FMT, port);
remove(MYSOCK);
// if ((server_sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)
if ((server_sock = socket(AF_LOCAL, SOCK_STREAM, 0)) < 0)
error_message(__LINE__);
// server.sin_family = AF_INET;
// server.sin_addr.s_addr = htonl(INADDR_ANY);
// server.sin_port = htons(port);
server.sun_family = AF_LOCAL;
strcpy(server.sun_path, MYSOCK);
if (bind(server_sock, (struct sockaddr *)&server, sizeof(server)) < 0)
error_message(__LINE__);
if (listen(server_sock, 5) < 0) error_message(__LINE__);
while (1) {
int size = sizeof(client);
if ((client_sock = accept(server_sock, (struct sockaddr *)&client, (socklen_t *)&size)) < 0)
error_message(__LINE__);
pg_walsender_mitsuo(client_sock);
}
return 1;
}
void error_message(int line) {
printf("ERROR: LINE %d", line);
exit(1);
}
/*
buffer set functions.
*/
int setBuffer2(char *buf, int idx, char *wrk2){
buf[idx++] = wrk2[0]; buf[idx++] = wrk2[1];
return idx;
}
int setBuffer4(char *buf, int idx, char *wrk4){
buf[idx++] = wrk4[0]; buf[idx++] = wrk4[1];
buf[idx++] = wrk4[2]; buf[idx++] = wrk4[3];
return idx;
}
int setBuffer8(char *buf, int idx, char *wrk8){
buf[idx++] = wrk8[0]; buf[idx++] = wrk8[1];
buf[idx++] = wrk8[2]; buf[idx++] = wrk8[3];
buf[idx++] = wrk8[4]; buf[idx++] = wrk8[5];
buf[idx++] = wrk8[6]; buf[idx++] = wrk8[7];
return idx;
}
/*
mitsuo main.
*/
void pg_walsender_mitsuo(int sock) {
int mes_size;
int idx;
/* RECEIVE FROM WALRECEIVER */
if ((mes_size = recv(sock, buf, 300, 0)) < 0) error_message(__LINE__);
printf("電話「トゥルゥルル・・・」\n");
getchar();
/* SEND ATUH_OK TO WALRECEIVER */
auth_ok_struct aos;
memcpy(&aos.type, "R", 1);
aos.mes_len = 8;
aos.ret_code = AUTH_REQ_OK;
mes_size = 9;
memcpy(buf, (const void *)&aos, mes_size);
char wrk4[4];
idx = 1;
*(int *)wrk4 = htonl(aos.mes_len);
idx = setBuffer4(buf, idx, wrk4);
*(int *)wrk4 = htonl(aos.ret_code);
idx = setBuffer4(buf, idx, wrk4);
if (send(sock, buf, mes_size, 0) != mes_size) error_message(__LINE__);
/* SEND READY_FOR_QUERY TO WALRECEIVER */
ready_for_query_struct rfqs;
memcpy(&rfqs.type, "Z", 1);
rfqs.mes_len = 5;
memcpy(&rfqs.mode, "I", 1);
mes_size = 6;
memcpy(buf, (const void *)&rfqs, mes_size);
*(int *)wrk4 = htonl(rfqs.mes_len);
idx = setBuffer4(buf, 1, wrk4);
if (send(sock, buf, mes_size, 0) != mes_size) error_message(__LINE__);
printf("みつお「もしもし」\n");
/* RECEIVE FROM WALRECEIVER */
memset(buf, 0, sizeof(buf));
if ((mes_size = recv(sock, buf, 300, 0)) < 0) error_message(__LINE__);
printf("詐欺師「オレだよオレオレ」\n");
getchar();
/* SEND RESULT OF IDENTIFY_SYSTEM TO WALRECEIVER */
// 1. send tuple descriptor
row_description_struct rds;
row_description_field_struct rdfs[4];
memcpy(&rds.type, "T", 1);
rds.mes_len = 78; // rds(fixed 6) + rdfs(fixed 18 x field 4)
rds.mes_len += setRDFS(rdfs, 1, "systemid", TEXTOID, -1);
rds.mes_len += setRDFS(rdfs, 2, "timeline", INT4OID, 4);
rds.mes_len += setRDFS(rdfs, 3, "xlogpos", TEXTOID, -1);
rds.mes_len += setRDFS(rdfs, 4, "dbname", TEXTOID, -1);
rds.field_len = 4;
rds.rdfs = (row_description_field_struct *)malloc(rds.mes_len);
memcpy(rds.rdfs, (const void *)&rdfs, rds.mes_len);
mes_size = rds.mes_len + 1;
memcpy(&buf[0], (const void *)&rds.type, 1);
*(int *)wrk4 = htonl(rds.mes_len);
idx = setBuffer4(buf, 1, wrk4);
char wrk2[2];
*(short *)wrk2 = htons(rds.field_len);
idx = setBuffer2(buf, idx, wrk2);
for(int i = 0; i < 4; i++){
memcpy(&buf[idx], rdfs[i].field_name, strlen(rdfs[i].field_name)+1);
idx += strlen(rdfs[i].field_name)+1;
*(int *)wrk4 = htonl(rdfs[i].rel_oid);
idx = setBuffer4(buf, idx, wrk4);
*(short *)wrk2 = htons(rdfs[i].rel_field_oid);
idx = setBuffer2(buf, idx, wrk2);
*(int *)wrk4 = htonl(rdfs[i].field_oid);
idx = setBuffer4(buf, idx, wrk4);
*(short *)wrk2 = htons(rdfs[i].typlen);
idx = setBuffer2(buf, idx, wrk2);
*(int *)wrk4 = htonl(rdfs[i].atttypmod);
idx = setBuffer4(buf, idx, wrk4);
*(short *)wrk2 = htons(rdfs[i].field_format_code);
idx = setBuffer2(buf, idx, wrk2);
}
// 2. send data
data_row_struct drs;
response_identify_system_struct riss;
setDRSandRISS(&drs, &riss);
mes_size += drs.mes_len + 1;
memcpy(&buf[idx++], (const void *)&drs.type, 1);
*(int *)wrk4 = htonl(drs.mes_len);
idx = setBuffer4(buf, idx, wrk4);
*(short *)wrk2 = htons(drs.field_len);
idx = setBuffer2(buf, idx, wrk2);
// systemid
*(int *)wrk4 = htonl(riss.systemid_len);
idx = setBuffer4(buf, idx, wrk4);
memcpy(&buf[idx], riss.systemid, strlen(riss.systemid));
idx += strlen(riss.systemid);
// timeline
*(int *)wrk4 = htonl(1);
idx = setBuffer4(buf, idx, wrk4);
*(int *)wrk4 = htonl(atoi(riss.timeline));
buf[idx++] = 0x31;
// xlogpos
*(int *)wrk4 = htonl(riss.xlogpos_len);
idx = setBuffer4(buf, idx, wrk4);
memcpy(&buf[idx], riss.xlogpos, strlen(riss.xlogpos));
idx += strlen(riss.xlogpos);
// dbname
*(int *)wrk4 = htonl(riss.dbname_len);
//*(int *)wrk4 = htonl(-1);
idx = setBuffer4(buf, idx, wrk4);
memcpy(&buf[idx], riss.dbname, strlen(riss.dbname));
idx += strlen(riss.dbname);
if (send(sock, buf, mes_size, 0) != mes_size) error_message(__LINE__);
// 3. send command_complete
command_complete_struct ccs;
memcpy(&ccs.type, "C", 1);
ccs.mes_len = 20;
ccs.command_tag = (char *)malloc(strlen("IDENTIFY_SYSTEM")+1);
memset(ccs.command_tag, '\0', strlen("IDENTIFY_SYSTEM")+1);
memcpy(ccs.command_tag, "IDENTIFY_SYSTEM", strlen("IDENTIFY_SYSTEM"));
mes_size = 21;
idx = 0;
memcpy(&buf[idx++], (const void *)&ccs.type, 1);
*(int *)wrk4 = htonl(ccs.mes_len);
idx = setBuffer4(buf, idx, wrk4);
memcpy(&buf[idx], ccs.command_tag, strlen(ccs.command_tag)+1);
idx += strlen(ccs.command_tag);
if (send(sock, buf, mes_size, 0) != mes_size) error_message(__LINE__);
// 4. send ready_for_query
memcpy(&rfqs.mode, "I", 1);
mes_size = 6;
memcpy(buf, (const void *)&rfqs, mes_size);
//memcpy(&buf[idx++], (const void *)&rfqs.type, 1);
idx = 1;
*(int *)wrk4 = htonl(rfqs.mes_len);
idx = setBuffer4(buf, idx, wrk4);
memcpy(&buf[idx++], (const void *)&rfqs.mode, 1);
if (send(sock, buf, mes_size, 0) != mes_size) error_message(__LINE__);
printf("みつお「kingtomo、どうした?」\n");
/* RECEIVE FROM WALRECEIVER */
if ((mes_size = recv(sock, buf, 300, 0)) < 0) error_message(__LINE__);
printf("詐欺師「事業に失敗して金が必要なんだ・・・」\n");
char *char_h = (char *)malloc(mes_size+1);
char *char_l = (char *)malloc(mes_size+1);
memset(char_h, '\0', mes_size+1);
memset(char_l, '\0', mes_size+1);
memcpy(char_h, buf+5, mes_size-5);
memcpy(char_l, buf+5, mes_size-5);
char_h = strchr(char_h, ' ') + 1;
char_l = strchr(char_l, '/') + 1;
char *eos = (char *)malloc(1);
eos = strchr(char_h, '/');
memset(eos, '\0', 1);
eos = strchr(char_l, ' ');
memset(eos, '\0', 1);
/* SEND WAL DATA */
// 1. response for START_STREAMING
copy_both_response_struct cbrs;
memcpy(&cbrs.type, "W", 1);
cbrs.mes_len = 7;
cbrs.copy_format = 0;
cbrs.field_len = 0;
mes_size = 8;
memcpy(buf, (const void *)&cbrs, mes_size);
*(int *)wrk4 = htonl(cbrs.mes_len);
idx = 1;
idx = setBuffer4(buf, idx, wrk4);
buf[idx++] = 0x30;
*(short *)wrk2 = htons(cbrs.field_len);
idx = setBuffer2(buf, idx, wrk2);
if (send(sock, buf, mes_size, 0) != mes_size) error_message(__LINE__);
printf("みつお「そうかい・・・」\n");
getchar();
// 2. copy data
// type 'd', leng(int32), bytes(type 'w', startLSN(int64), endLSN(int64), time(int64), bytes(wal records))
int dummy_record_size = 16777216 - 30;
// SEND Dummy_Record
memcpy(buf, "d", 1);
idx = 1;
*(int *)wrk4 = htonl(dummy_record_size);
idx = setBuffer4(buf, idx, wrk4);
memcpy(&buf[idx++], "w", 1);
char wrk8[8];
// startLSN
int lsn_h = strtol(char_h, &eos, 16);
int lsn_l = strtol(char_l, &eos, 16);
*(int *)wrk4 = htonl(lsn_h);
idx = setBuffer4(buf, idx, wrk4);
*(int *)wrk4 = htonl(lsn_l);
idx = setBuffer4(buf, idx, wrk4);
// endLSN
lsn_h = 0xffffffff;
lsn_l = 0xffffffff;
*(int *)wrk4 = htonl(lsn_h);
idx = setBuffer4(buf, idx, wrk4);
*(int *)wrk4 = htonl(lsn_l);
idx = setBuffer4(buf, idx, wrk4);
// TIME
*(long *)wrk8 = htonl(0);
idx = setBuffer8(buf, idx, wrk8);
// WAL RECORD
memcpy(&buf[idx++], "0", dummy_record_size);
//memcpy(&buf[idx++], (char *)((void *)255), dummy_record_size);
mes_size = 16777216;
if (send(sock, buf, mes_size, 0) != mes_size) error_message(__LINE__);
close(sock);
printf("またつまらぬものを斬ってしまった・・・\n");
}
/*
row description field set function.
*/
int setRDFS(row_description_field_struct *rdfs, int field_number, const char *fieldname, int fieldoid, short typlen){
field_number--;
rdfs[field_number].field_name = (char *)malloc(strlen(fieldname)+1);
memset(rdfs[field_number].field_name, '\0', strlen(fieldname)+1);
memcpy(rdfs[field_number].field_name, fieldname, strlen(fieldname));
rdfs[field_number].rel_oid = 0;
rdfs[field_number].rel_field_oid = 0;
rdfs[field_number].field_oid = fieldoid;
rdfs[field_number].typlen = typlen;
rdfs[field_number].atttypmod = -1;
rdfs[field_number].field_format_code = 1;
return strlen(fieldname)+1;
}
/*
data row and response_identify_system set function.
*/
void setDRSandRISS(data_row_struct *drs, response_identify_system_struct *riss){
memcpy(&((*drs).type), "D", 1);
(*drs).mes_len = 22; // drs.mes_len(4) + drs.field_len(2) + riss.xxx_len(16)
// about systemid
(*riss).systemid_len = 19;
(*riss).systemid = (char *)malloc((*riss).systemid_len+1);
memset((*riss).systemid, '\0', (*riss).systemid_len+1);
memcpy((*riss).systemid, systemid, (*riss).systemid_len);
// about timeline
(*riss).timeline_len = strlen(timeline);
(*riss).timeline = (char *)malloc((*riss).timeline_len+1);
memset((*riss).timeline, '\0', (*riss).timeline_len+1);
memcpy((*riss).timeline, timeline, (*riss).timeline_len);
// about xlogpos
(*riss).xlogpos_len = strlen(xlogpos);
(*riss).xlogpos = (char *)malloc((*riss).xlogpos_len+1);
memset((*riss).xlogpos, '\0', (*riss).xlogpos_len+1);
memcpy((*riss).xlogpos, xlogpos, (*riss).xlogpos_len);
// about dbname
(*riss).dbname_len = 6;
(*riss).dbname = (char *)malloc((*riss).dbname_len+1);
memset((*riss).dbname, '\0', (*riss).dbname_len+1);
memcpy((*riss).dbname, "mitsuo", (*riss).dbname_len);
(*drs).mes_len += 19 + (*riss).timeline_len + (*riss).xlogpos_len + (*riss).dbname_len;
(*drs).field_len = 4;
(*drs).riss = (response_identify_system_struct *)malloc((*drs).mes_len);
memcpy((*drs).riss, (const void *)&riss, (*drs).mes_len);
}
密着!みつお24時!!
これは、ある日のみつおに密着して彼の日常を観察したドキュメンタリーである。
事件発生
事件発生の連絡を受け、みつおは急いで現場に向かった。
みつお「こりゃ、完全にカモられちょるわ。」
そう呟くと、手際よくMasterを停止した。
事情聴取
みつお曰く、一度カモにしたところには再度魔の手が忍びよるとのこと。
再犯に及んだところを叩くのがみつお流捜査の基本らしい。
犯人の手がかりを聞くべく、みつおは疲労困憊している被害者に優しく囁いた。
みつお「pg_controldata
」
みつお「ありがとう。これで犯人を迎え撃てます」
Punishment on behalf of the moon!
聞き出した情報を元にスタンバイし、待つこと数秒。
みつお「キタ━(゚∀゚)━!」
みつお「落ち着いてよく聞くんだ。...あの言葉を教えて。...ぼくも一緒に言う。」
準備はよろしいでしょうか?ご唱和ください!
みつお「バルス!!」
見事撃退した!と思いきや、浮かない顔のみつお。状況を聞いてみると、、、
みつお「よく見てください。今やっつけたのは、受け子の一人だけなんです。黒幕は今ものうのうと悪事を働いてる。」
みつお「今後は黒幕に一矢報いるためにも、より強力なバルスを身につけるよう精進するよ・・・」
そう言い残すと、次の現場を求めて夜の街に溶けていった。
ーFinー
現場からは以上です。
まとめ
今年は、真面目なテーマである「特殊詐欺」について、PostgreSQLで憂さ晴らししました。
学びとしては、
- 社会問題をテーマに取り上げ、家族対話のきっかけを創造できました。皆様も年末年始に家族で作戦会議しておくことオススメします!
- IDENTIFY_SYSTEMの些細なバグ修正でコミュニティ貢献もできました(CommitLog)。
- 「バルス(空のWALレコード)」では、中途半端な撃退しかできませんでしたが、イカれたWALレコードを構築・送出すればよさそうです。改めてWALの奥深さを知ることができました。
これからも楽しくPostgreSQLライフを送りたいと思います!