4
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

PostgreSQLAdvent Calendar 2023

Day 23

オレオレPostgreSQL!

Posted at

この記事はPostgreSQL Advent Calendar 2023の23日目の記事です。

はじめに

早いもので、恒例のこの季節になりました。
毎年wktkしながら書いているのですが、かみさんから「あんたのだけ、毎年くだらないな」的なことを言われたため、今年は真面目なテーマにしようと悩みました。

そんなある日、在宅ワークしていると自治体のアナウンスが流れてきました。

自治体のアナウンス:「近頃、警察を名乗って詐欺の電話がかかってきます。ご注意ください〜」

「特殊詐欺」っていうやつ?通称、オレオレ詐欺。

オレオレ詐欺とは
警察庁・特殊詐欺対策ページより:

親族、警察官、弁護士等を装い、親族が起こした事件・事故に対する示談金等を名目に金銭等をだまし取る(脅し取る)手口です。

まだ流行ってるんだなぁと思いながら、「せやっ!これをテーマにしよう!!」と閃めきました。

ちなみに、私(?)は、母に2回、父・姉にそれぞれ1回の前科4犯です。
なんたる悪党、成敗してくれようぞ!

オレオレ詐欺とPostgreSQL

「オレオレ詐欺とPostgreSQLに何の関係があるのさ?」とツッこむ方もいらっしゃると思いますので、はじめに簡単に説明します。

オレオレ詐欺の流れ

次の図はオレオレ詐欺の一般的なフローです。
オレオレ詐欺.png

とてもシンプルなやりとりで大金を手に入れていることがわかります。
全く同じ流れがPostgreSQLの"ある処理"で見ることができます。

PostgreSQLのある処理の流れ

懸命な読者はもうお分かりですね。
StreamReplication.png

そうです。StreamReplicationの処理はオレオレ詐欺だったのです。
ということで、詐欺師(Standby)を懲らしめてやりましょう。

詐欺師(Standby)の手口

前述のフローは簡略化されているため、より具体的な手口をマニュアルに記載されている「詐欺師の近づき方」や「詐欺師の常套句」等で詳細を確認します。

コマンド 挙動 期待してる返却値 備考
connect 接続&認証 AuthenticationOk
[Byte1('R'),
int32x2]
detail
IDENTIFY_SYSTEM 相手の確認 systemid[text],
timeline[int8],
xlogpos[text],
dbname[text]
detail
START_REPLICATION 情報の要求 XLogData
[Byte1('w'),
int64x3,
Byten]
detail

上記やりとりした後は、情報(WAL)が振り込まれてくるので、それを受け取っているようです。

真打登場

一通りの手口を確認したところで、助っ人Gマン「みつお」を召喚します。
コードネームは「ぴーじーわるせんだーみつお」。

彼は、あたかもマスターサーバ(pg_walsender)のように振る舞い、情報の要求が会ったタイミングで情報(WAL)ではなく正義の鉄槌を喰らわせます。
要するに、「おとり捜査」が得意です。

みつお.png

ーースペシャル袋とじ:みつおの全容(無駄に長いけど興味のある人は開いてね★)ーー

ーー8<ーー
pg_walsender_mitsuo.h
#include <arpa/inet.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>
#include <string.h>

#define MYSOCK_FMT "/tmp/.s.PGSQL.%d"
#define INT8OID 20
#define INT4OID 23
#define TEXTOID 25

// Global parameters
char *systemid;
char *timeline;
char *xlogpos;

#define AUTH_REQ_OK 0
typedef struct {
  char type;
  int  mes_len;
  int  ret_code;
} auth_ok_struct;

typedef struct {
  char type;
  int  mes_len;
  char mode;
} ready_for_query_struct;

typedef struct {
  char type;
  int  mes_len;
  char *command_tag;
} command_complete_struct;

typedef struct {
  char  *field_name;
  int   rel_oid;
  short rel_field_oid;
  int   field_oid;
  short typlen;
  int   atttypmod;
  short field_format_code;
} row_description_field_struct;
typedef struct {
  char  type;
  int   mes_len;
  short field_len;
  row_description_field_struct *rdfs;
} row_description_struct;

typedef struct {
  int  systemid_len;
  char *systemid;
  int  timeline_len;
  char  *timeline;
  int  xlogpos_len;
  char *xlogpos;
  int  dbname_len;
  char *dbname;
} response_identify_system_struct;
typedef struct {
  char  type;
  int   mes_len;
  short field_len;
  response_identify_system_struct *riss;
} data_row_struct;

typedef struct {
  char   type;
  int    mes_len;
  int8_t copy_format;
  short  field_len;
  short  *field_format;
} copy_both_response_struct;

char buf[16777216];

void error_message(int line);
void pg_walsender_mitsuo(int sock);
int  setRDFS(row_description_field_struct *rdfs, int field_number, const char *fieldname, int fieldoid, short typlen);
void setDRSandRISS(data_row_struct *drs, response_identify_system_struct *riss);
pg_walsender_mitsuo.c
#include "pg_walsender_mitsuo.h"

/*
  main function.
*/
int main(int argc, char *argv[]) {

//  struct sockaddr_in client;
//  struct sockaddr_in server;
  struct sockaddr_un client;
  struct sockaddr_un server;
  int server_sock;
  int client_sock;

  int port = atoi(argv[1]);
  systemid = argv[2];
  timeline = argv[3];
  xlogpos  = "0/0";

  char MYSOCK[100];
  sprintf(MYSOCK, MYSOCK_FMT, port);
  remove(MYSOCK);
//  if ((server_sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)
  if ((server_sock = socket(AF_LOCAL, SOCK_STREAM, 0)) < 0)
    error_message(__LINE__);

//  server.sin_family = AF_INET;
//  server.sin_addr.s_addr = htonl(INADDR_ANY);
//  server.sin_port = htons(port);
  server.sun_family = AF_LOCAL;
  strcpy(server.sun_path, MYSOCK);

  if (bind(server_sock, (struct sockaddr *)&server, sizeof(server)) < 0)
    error_message(__LINE__);

  if (listen(server_sock, 5) < 0) error_message(__LINE__);

  while (1) {
    int size = sizeof(client);
    if ((client_sock = accept(server_sock, (struct sockaddr *)&client, (socklen_t *)&size)) < 0)
      error_message(__LINE__);

    pg_walsender_mitsuo(client_sock);
  }
  return 1;
}

void error_message(int line) {
  printf("ERROR: LINE %d", line);
  exit(1);
}

/*
  buffer set functions.
*/
int setBuffer2(char *buf, int idx, char *wrk2){
  buf[idx++] = wrk2[0]; buf[idx++] = wrk2[1];
  return idx;
}
int setBuffer4(char *buf, int idx, char *wrk4){
  buf[idx++] = wrk4[0]; buf[idx++] = wrk4[1];
  buf[idx++] = wrk4[2]; buf[idx++] = wrk4[3];
  return idx;
}
int setBuffer8(char *buf, int idx, char *wrk8){
  buf[idx++] = wrk8[0]; buf[idx++] = wrk8[1];
  buf[idx++] = wrk8[2]; buf[idx++] = wrk8[3];
  buf[idx++] = wrk8[4]; buf[idx++] = wrk8[5];
  buf[idx++] = wrk8[6]; buf[idx++] = wrk8[7];
  return idx;
}

/*
  mitsuo main.
*/
void pg_walsender_mitsuo(int sock) {
  int mes_size;
  int idx;

  /* RECEIVE FROM WALRECEIVER */
  if ((mes_size = recv(sock, buf, 300, 0)) < 0) error_message(__LINE__);

  printf("電話「トゥルゥルル・・・」\n");
  getchar();

  /* SEND ATUH_OK TO WALRECEIVER */
  auth_ok_struct aos;
  memcpy(&aos.type, "R", 1);
  aos.mes_len = 8;
  aos.ret_code = AUTH_REQ_OK;
  mes_size = 9;
  memcpy(buf, (const void *)&aos, mes_size);
  char wrk4[4];
  idx = 1;
  *(int *)wrk4 = htonl(aos.mes_len);
  idx = setBuffer4(buf, idx, wrk4);
  *(int *)wrk4 = htonl(aos.ret_code);
  idx = setBuffer4(buf, idx, wrk4);
  if (send(sock, buf, mes_size, 0) != mes_size) error_message(__LINE__);

  /* SEND READY_FOR_QUERY TO WALRECEIVER */
  ready_for_query_struct rfqs;
  memcpy(&rfqs.type, "Z", 1);
  rfqs.mes_len = 5;
  memcpy(&rfqs.mode, "I", 1);
  mes_size = 6;
  memcpy(buf, (const void *)&rfqs, mes_size);
  *(int *)wrk4 = htonl(rfqs.mes_len);
  idx = setBuffer4(buf, 1, wrk4);
  if (send(sock, buf, mes_size, 0) != mes_size) error_message(__LINE__);
  
  printf("みつお「もしもし」\n");

  /* RECEIVE FROM WALRECEIVER */
  memset(buf, 0, sizeof(buf));
  if ((mes_size = recv(sock, buf, 300, 0)) < 0) error_message(__LINE__);

  printf("詐欺師「オレだよオレオレ」\n");
  getchar();

  /* SEND RESULT OF IDENTIFY_SYSTEM TO WALRECEIVER */
  // 1. send tuple descriptor
  row_description_struct rds;
  row_description_field_struct rdfs[4];
  memcpy(&rds.type, "T", 1);
  rds.mes_len = 78; // rds(fixed 6) + rdfs(fixed 18 x field 4)
  rds.mes_len += setRDFS(rdfs, 1, "systemid", TEXTOID, -1);
  rds.mes_len += setRDFS(rdfs, 2, "timeline", INT4OID, 4);
  rds.mes_len += setRDFS(rdfs, 3, "xlogpos", TEXTOID, -1);
  rds.mes_len += setRDFS(rdfs, 4, "dbname", TEXTOID, -1);
  rds.field_len = 4;
  rds.rdfs = (row_description_field_struct *)malloc(rds.mes_len);
  memcpy(rds.rdfs, (const void *)&rdfs, rds.mes_len);
  mes_size = rds.mes_len + 1;
  memcpy(&buf[0], (const void *)&rds.type, 1);
  *(int *)wrk4 = htonl(rds.mes_len);
  idx = setBuffer4(buf, 1, wrk4);
  char wrk2[2];
  *(short *)wrk2 = htons(rds.field_len);
  idx = setBuffer2(buf, idx, wrk2);
  for(int i = 0; i < 4; i++){
    memcpy(&buf[idx], rdfs[i].field_name, strlen(rdfs[i].field_name)+1);
    idx += strlen(rdfs[i].field_name)+1;
    *(int *)wrk4 = htonl(rdfs[i].rel_oid);
    idx = setBuffer4(buf, idx, wrk4);
    *(short *)wrk2 = htons(rdfs[i].rel_field_oid);
    idx = setBuffer2(buf, idx, wrk2);
    *(int *)wrk4 = htonl(rdfs[i].field_oid);
    idx = setBuffer4(buf, idx, wrk4);
    *(short *)wrk2 = htons(rdfs[i].typlen);
    idx = setBuffer2(buf, idx, wrk2);
    *(int *)wrk4 = htonl(rdfs[i].atttypmod);
    idx = setBuffer4(buf, idx, wrk4);
    *(short *)wrk2 = htons(rdfs[i].field_format_code);
    idx = setBuffer2(buf, idx, wrk2);
  }

  // 2. send data
  data_row_struct drs;
  response_identify_system_struct riss;
  setDRSandRISS(&drs, &riss);
  mes_size += drs.mes_len + 1;
  memcpy(&buf[idx++], (const void *)&drs.type, 1);
  *(int *)wrk4 = htonl(drs.mes_len);
  idx = setBuffer4(buf, idx, wrk4);
  *(short *)wrk2 = htons(drs.field_len);
  idx = setBuffer2(buf, idx, wrk2);

  // systemid
  *(int *)wrk4 = htonl(riss.systemid_len);
  idx = setBuffer4(buf, idx, wrk4);
  memcpy(&buf[idx], riss.systemid, strlen(riss.systemid));
  idx += strlen(riss.systemid);

  // timeline
  *(int *)wrk4 = htonl(1);
  idx = setBuffer4(buf, idx, wrk4);
  *(int *)wrk4 = htonl(atoi(riss.timeline));
  buf[idx++] = 0x31;

  // xlogpos
  *(int *)wrk4 = htonl(riss.xlogpos_len);
  idx = setBuffer4(buf, idx, wrk4);
  memcpy(&buf[idx], riss.xlogpos, strlen(riss.xlogpos));
  idx += strlen(riss.xlogpos);

  // dbname
  *(int *)wrk4 = htonl(riss.dbname_len);
  //*(int *)wrk4 = htonl(-1);
  idx = setBuffer4(buf, idx, wrk4);
  memcpy(&buf[idx], riss.dbname, strlen(riss.dbname));
  idx += strlen(riss.dbname);

  if (send(sock, buf, mes_size, 0) != mes_size) error_message(__LINE__);

  // 3. send command_complete
  command_complete_struct ccs;
  memcpy(&ccs.type, "C", 1);
  ccs.mes_len = 20;
  ccs.command_tag = (char *)malloc(strlen("IDENTIFY_SYSTEM")+1);
  memset(ccs.command_tag, '\0', strlen("IDENTIFY_SYSTEM")+1);
  memcpy(ccs.command_tag, "IDENTIFY_SYSTEM", strlen("IDENTIFY_SYSTEM"));
  mes_size = 21;
  idx = 0;
  memcpy(&buf[idx++], (const void *)&ccs.type, 1);
  *(int *)wrk4 = htonl(ccs.mes_len);
  idx = setBuffer4(buf, idx, wrk4);
  memcpy(&buf[idx], ccs.command_tag, strlen(ccs.command_tag)+1);
  idx += strlen(ccs.command_tag);
  if (send(sock, buf, mes_size, 0) != mes_size) error_message(__LINE__);

  // 4. send ready_for_query
  memcpy(&rfqs.mode, "I", 1);
  mes_size = 6;
  memcpy(buf, (const void *)&rfqs, mes_size);
  //memcpy(&buf[idx++], (const void *)&rfqs.type, 1);
  idx = 1;
  *(int *)wrk4 = htonl(rfqs.mes_len);
  idx = setBuffer4(buf, idx, wrk4);
  memcpy(&buf[idx++], (const void *)&rfqs.mode, 1);
  if (send(sock, buf, mes_size, 0) != mes_size) error_message(__LINE__);

  printf("みつお「kingtomo、どうした?」\n");

  /* RECEIVE FROM WALRECEIVER */
  if ((mes_size = recv(sock, buf, 300, 0)) < 0) error_message(__LINE__);

  printf("詐欺師「事業に失敗して金が必要なんだ・・・」\n");
  char *char_h = (char *)malloc(mes_size+1);
  char *char_l = (char *)malloc(mes_size+1);
  memset(char_h, '\0', mes_size+1);
  memset(char_l, '\0', mes_size+1);
  memcpy(char_h, buf+5, mes_size-5);
  memcpy(char_l, buf+5, mes_size-5);
  char_h = strchr(char_h, ' ') + 1;
  char_l = strchr(char_l, '/') + 1;
  char *eos = (char *)malloc(1);
  eos = strchr(char_h, '/');
  memset(eos, '\0', 1);
  eos = strchr(char_l, ' ');
  memset(eos, '\0', 1);

  /* SEND WAL DATA */
  // 1. response for START_STREAMING
  copy_both_response_struct cbrs;
  memcpy(&cbrs.type, "W", 1);
  cbrs.mes_len = 7;
  cbrs.copy_format = 0;
  cbrs.field_len = 0;
  mes_size = 8;
  memcpy(buf, (const void *)&cbrs, mes_size);
  *(int *)wrk4 = htonl(cbrs.mes_len);
  idx = 1;
  idx = setBuffer4(buf, idx, wrk4);
  buf[idx++] = 0x30;
  *(short *)wrk2 = htons(cbrs.field_len);
  idx = setBuffer2(buf, idx, wrk2);
  if (send(sock, buf, mes_size, 0) != mes_size) error_message(__LINE__);

  printf("みつお「そうかい・・・」\n");
  getchar();

  // 2. copy data
  // type 'd', leng(int32), bytes(type 'w', startLSN(int64), endLSN(int64), time(int64), bytes(wal records))
  int dummy_record_size = 16777216 - 30;
  // SEND Dummy_Record
  memcpy(buf, "d", 1);
  idx = 1;
  *(int *)wrk4 = htonl(dummy_record_size);
  idx = setBuffer4(buf, idx, wrk4);
  memcpy(&buf[idx++], "w", 1);
  char wrk8[8];
  // startLSN
  int lsn_h = strtol(char_h, &eos, 16);
  int lsn_l = strtol(char_l, &eos, 16);
  *(int *)wrk4 = htonl(lsn_h);
  idx = setBuffer4(buf, idx, wrk4);
  *(int *)wrk4 = htonl(lsn_l);
  idx = setBuffer4(buf, idx, wrk4);
  // endLSN
  lsn_h = 0xffffffff;
  lsn_l = 0xffffffff;
  *(int *)wrk4 = htonl(lsn_h);
  idx = setBuffer4(buf, idx, wrk4);
  *(int *)wrk4 = htonl(lsn_l);
  idx = setBuffer4(buf, idx, wrk4);
  // TIME
  *(long *)wrk8 = htonl(0);
  idx = setBuffer8(buf, idx, wrk8);
  // WAL RECORD
  memcpy(&buf[idx++], "0", dummy_record_size);
  //memcpy(&buf[idx++], (char *)((void *)255), dummy_record_size);
  mes_size = 16777216;
  if (send(sock, buf, mes_size, 0) != mes_size) error_message(__LINE__);

  close(sock);
  printf("またつまらぬものを斬ってしまった・・・\n");
}

/*
  row description field set function.
*/
int setRDFS(row_description_field_struct *rdfs, int field_number, const char *fieldname, int fieldoid, short typlen){
  field_number--;
  rdfs[field_number].field_name = (char *)malloc(strlen(fieldname)+1);
  memset(rdfs[field_number].field_name, '\0', strlen(fieldname)+1);
  memcpy(rdfs[field_number].field_name, fieldname, strlen(fieldname));
  rdfs[field_number].rel_oid = 0;
  rdfs[field_number].rel_field_oid = 0;
  rdfs[field_number].field_oid = fieldoid;
  rdfs[field_number].typlen = typlen;
  rdfs[field_number].atttypmod = -1;
  rdfs[field_number].field_format_code = 1;

  return strlen(fieldname)+1;
}

/*
  data row and response_identify_system set function.
*/
void setDRSandRISS(data_row_struct *drs, response_identify_system_struct *riss){
  memcpy(&((*drs).type), "D", 1);
  (*drs).mes_len = 22; // drs.mes_len(4) + drs.field_len(2) + riss.xxx_len(16)

  // about systemid
  (*riss).systemid_len = 19;
  (*riss).systemid = (char *)malloc((*riss).systemid_len+1);
  memset((*riss).systemid, '\0', (*riss).systemid_len+1);
  memcpy((*riss).systemid, systemid, (*riss).systemid_len);

  // about timeline
  (*riss).timeline_len = strlen(timeline);
  (*riss).timeline = (char *)malloc((*riss).timeline_len+1);
  memset((*riss).timeline, '\0', (*riss).timeline_len+1);
  memcpy((*riss).timeline, timeline, (*riss).timeline_len);

  // about xlogpos
  (*riss).xlogpos_len  = strlen(xlogpos);
  (*riss).xlogpos = (char *)malloc((*riss).xlogpos_len+1);
  memset((*riss).xlogpos, '\0', (*riss).xlogpos_len+1);
  memcpy((*riss).xlogpos, xlogpos, (*riss).xlogpos_len);

  // about dbname
  (*riss).dbname_len  = 6;
  (*riss).dbname = (char *)malloc((*riss).dbname_len+1);
  memset((*riss).dbname, '\0', (*riss).dbname_len+1);
  memcpy((*riss).dbname, "mitsuo", (*riss).dbname_len);
  (*drs).mes_len += 19 + (*riss).timeline_len + (*riss).xlogpos_len + (*riss).dbname_len;
  (*drs).field_len = 4;
  (*drs).riss = (response_identify_system_struct *)malloc((*drs).mes_len);
  memcpy((*drs).riss, (const void *)&riss, (*drs).mes_len);
}

密着!みつお24時!!

これは、ある日のみつおに密着して彼の日常を観察したドキュメンタリーである。

事件発生

事件発生の連絡を受け、みつおは急いで現場に向かった。

pic1.png

pic2.png

みつお「こりゃ、完全にカモられちょるわ。」

そう呟くと、手際よくMasterを停止した。

pic3.png

事情聴取

みつお曰く、一度カモにしたところには再度魔の手が忍びよるとのこと。
再犯に及んだところを叩くのがみつお流捜査の基本らしい。

犯人の手がかりを聞くべく、みつおは疲労困憊している被害者に優しく囁いた。
みつお「pg_controldata

pic4.png

みつお「ありがとう。これで犯人を迎え撃てます」

Punishment on behalf of the moon!

pic5.png

聞き出した情報を元にスタンバイし、待つこと数秒。

pic6.png

みつお「キタ━(゚∀゚)━!」

みつお「落ち着いてよく聞くんだ。...あの言葉を教えて。...ぼくも一緒に言う。」

準備はよろしいでしょうか?ご唱和ください!

みつお「バルス!!」(空のWALレコード!!)

pic7.png

見事撃退した!と思いきや、浮かない顔のみつお。状況を聞いてみると、、、

みつお「よく見てください。今やっつけたのは、受け子の一人だけなんです。黒幕は今ものうのうと悪事を働いてる。」

pic8.png

みつお「今後は黒幕に一矢報いるためにも、より強力なバルスを身につけるよう精進するよ・・・」

そう言い残すと、次の現場を求めて夜の街に溶けていった。
ーFinー

現場からは以上です。

まとめ

今年は、真面目なテーマである「特殊詐欺」について、PostgreSQLで憂さ晴らししました。
学びとしては、

  1. 社会問題をテーマに取り上げ、家族対話のきっかけを創造できました。皆様も年末年始に家族で作戦会議しておくことオススメします!
  2. IDENTIFY_SYSTEMの些細なバグ修正でコミュニティ貢献もできました(CommitLog)。
  3. 「バルス(空のWALレコード)」では、中途半端な撃退しかできませんでしたが、イカれたWALレコードを構築・送出すればよさそうです。改めてWALの奥深さを知ることができました。

これからも楽しくPostgreSQLライフを送りたいと思います!

4
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?