序
あるリプレイス案件で現行システムから新システムへのデータ移行を行った。
作業効率化のために色々とツールを作ったので、なぜそれを使ったのか含めて概要とソースの一部を公開します。
あんまりすごいことは書いていませんが、
「現行のデータをファイルでもらって、Oracleにとりあえず入れてSQLでザクザク加工してなんとか新のデータを作る」
みたいなことをやる仕事は一定量あり、作業を効率化するということは一定量の需要があると思います。
大量データ(数千万件とかザラ)を扱うので、パフォーマンスの工夫もしています。
こういうライトな環境を使うには、下記のメリット・デメリットがあります。
メリット
・環境構築が簡便である。
・SQLさえ書ければ誰でも業務仕様を実装できる。
・Loaderを使えば、どんなデータ種でも扱える
デメリット
・自由度が高い分、データ管理が面倒。
・段階移行をする場合、データ調査用に複数スナップをもつ。
・そのスナップからデータ戻しをしたりする
今回はこういったデメリットを弱める意図で作ったツールも含まれます。
移行環境利用の前提
- Oracle11gを利用する。EEなので、パーティションは使える。
- 外部表は使わない
- ロジックはSQL・PL/SQLオンリー。JP1でジョブネットを作って、sql実行シェルでサーバにおいたSQLをキックする。
- データのスナップは同一スキーマ・表領域に作成。
- DataPumpも使わない
早速ツール紹介に移ります。(ソースは記事の末尾に記載します。)
今回は以下の3つの作業を効率的にするツールを紹介します。
- データスナップ
- データコピー
- パーティションテーブル圧縮
1. データスナップ
概要
テーブルのスナップを日付つきのプレフィックスでスナップをとる。
何のデータかあとから見てわかるようにコメント備考をつける。
工夫
ログテーブルにバックアップ履歴の書き出しを行う。
圧縮属性をつける
NOLOGGING属性をつける
バックアップ履歴をとる理由は、移行スケジュールがながければ長いほどどのテーブルがいつのスナップかわらなくなる。ただし、たまに参照することもあるので、ザクザク消すこともできない。
エクセルで管理するなんてバカバカしいということで、
「いつ、だれが、何の目的で」スナップをとったかわかる履歴テーブルを作成する。
使用例
売上テーブルのスナップを作成。
exec sp_backup_table('TR_SALES','売上テーブルのスナップ')
bkup COMPRESS NOLOGGING ,TR_SALES,B1218TR_SALES,
CREATE TABLE B1218TR_SALES COMPRESS NOLOGGING AS SELECT * FROM TR_SALES;
COMMENT ON TABLE B1218TR_SALES IS '売上テーブルのスナップ bkup TR_SALES → B1211TR_SALES by osuser at 2015/12/11 00:19:13'
PL/SQLが実行されました(390 msec.)
--------------------------------------------------------------------------------
売上テーブルのスナップ作成履歴を参照。
SELECT * FROM LG_BACKUP_LOAD_TABLE a where a.FROM_TABLE_NAME = 'TR_SALES'
SEQ_NUMBER KBN EXEC_ID KSN_YMD TO_TABLE_NAME FROM_TABLE_NAME BIKO KSN_USER KSN_DATE
130001 BACKUP FB999 20151211 B1218TR_SALES TR_SALES 売上テーブルのスナップ bkup TR_SALES → B1218TR_SALES by osuser at 2015/12/18 00:19:13 osuser 2015-12-11 00:19:13
2. データコピー
概要
パーティション単位でダイレクト・パス・インサートする。
本番並行稼働しているときに、すでに稼働済みのデータを移行で使用するということはよくある。
その際に、DBLINKで検証環境にデータを引っ張ってコピーする。
※ちなみに、DBLINKを使ったデータのPUSHではappend ヒントをつけてもダイレクトパスインサートは無効となる。
工夫
・排他ロックのダイレクトパスインサートでも並列実行できるようにする(1月、2月、3月、4月計上日のデータを4並列でINSERTするなど)
・予めテーブルに圧縮属性をつけておけばセグメント圧縮された状態でデータコピーができる。
・(並列度によっては)DataPumpより速くすることも可能
・移行環境と本番環境とカラム数が違う場合があるので、必要カラムを絞り込むロジックを入れる
※移行環境ではトレースキーなど、本番には無い項目をADDしておくことがある。
・カラム選択のときに本番のディクショナリを参照するが、権限管理&オペミス防止のためテーブル自体には参照専用で別スキーマからシノニム参照している場合があるので、テーブル実体のスキーマ名も引数に入れる
使用例
本番環境のSALSEスキーマの売上テーブルの20151101日付のデータを移行環境の同名のテーブルにコピーする。
--------------------------------------------------------------------------------
exec SP_LOAD_DATA_PARTITIONS('TR_SALES','TR_SALES','20151101','20151101','DBLINK_HONBAN','SALES')
--■実行
INSERT PARTITION : P_1511_01
TR_SALES: 20151101 165258 件
PL/SQLが実行されました(30810 msec.)
--------------------------------------------------------------------------------
3. パーティションテーブル圧縮
概要
大量データをジャンジャカ作るデータ移行にとっては表領域のリソース不足は死活問題。
しかも、移行は検証環境の1スキーマを間借りして利用することがあるので、慢性的に表領域は足りないことはよくある。
かつ、並列処理をしないとまず時間内に移行が終わらないために並列実行するようにすることはよくあるが、適切なパーティション設計がなされていないとダイレクト・パス・インサートは使えない(表ロックしてしまうので)
そこで、パーティション単位の圧縮ツールを作った。(JP1ジョブにも組み込めるようにPL/SQL化しておく)
#ちなみに、OLTP圧縮も使ったのだが、UNDOを著しく使って並列処理には向いてなかった。この辺の検証もいつかしたい。
工夫
・紐づく索引のリビルドを入れた(単にMOVE COMPRESSするとインデックスは無効になる)。
この処理は結構遅いので、パラレル度4でリビルド。
・パーティション単位で任意のレンジの圧縮ができるようにする(データ作成したところから圧縮する)
・複数テーブルを同タイミングで圧縮するため、管理テーブルを作成して集約コードを引数にして実行する。
使用例
TR_SALESの11/1パーティションを圧縮。集約コードはDA000(例なので、1テーブルのみ。
exec SP_COMPRESS_PARTITION_TABLES('DA000','P_1511_01','P_1511_01')
■パーティションCOMPRESS 実行
ALTER TABLE TR_SALES MOVE PARTITION P_1511_01 COMPRESS
ALTER INDEX IX_TR_SALES01 REBUILD PARTITION P_1511_01 PARALLEL 4
ALTER INDEX IX_TR_SALES02 REBUILD PARTITION P_1511_01 PARALLEL 4
ALTER INDEX IX_TR_SALES03 REBUILD PARTITION P_1511_01 PARALLEL 4
ALTER INDEX PK_TR_SALES REBUILD PARTITION P_1511_01 PARALLEL 4
PL/SQLが実行されました(11888 msec.)
まとめ
データ移行というのは、スピートと品質が求められる割には、人的リソースも環境的リソースも足りないことが多い。
短期的に使用する環境であることから一時的に人は投入されるがリソース割り当てにお金を使うという選択はされづらい。
このような中で速く正確に確実に作業ができるような仕組みづくりを心がけた。
今回は、その一部です。
移行のプログラムもたくさん設計・構築したので、通常のバッチ処理とどう違うのか等を気が向いたら書ければと思います。
最後にソースを末尾に記載します。
付録:ツールのソース(PL/SQL)
1.データスナップ
CREATE OR REPLACE PROCEDURE sp_backup_table(
p_table_name in varchar2
,p_table_comment in varchar2 default null
,p_bkup_table_prefix in varchar2 default null
,p_compress_option in char default 'Y'
)
IS
w_bk_table_prefix VARCHAR2(10);
w_bk_table_name VARCHAR2(30);
w_compress_clause VARCHAR2(30);
w_bk_comment VARCHAR2 (4000);
w_os_user VARCHAR2(30);
BEGIN
IF p_compress_option = 'Y' THEN
w_compress_clause := ' COMPRESS NOLOGGING ';
ELSIF p_compress_option = 'N' THEN
w_compress_clause := ' ';
END IF
;
IF p_bkup_table_prefix IS NULL THEN
w_bk_table_prefix := 'B' || TO_CHAR(SYSDATE,'MMDD');
ELSIF p_bkup_table_prefix IS NOT NULL THEN
w_bk_table_prefix := p_bkup_table_prefix;
END IF
;
w_bk_table_name := SUBSTRB(w_bk_table_prefix || p_table_name,1,30);
SELECT substr(sys_context('userenv','OS_USER'),1,30) INTO w_os_user FROM dual;
w_bk_comment := p_table_comment;
w_bk_comment := w_bk_comment||' bkup '||p_table_name||' → '||w_bk_table_name||' by '||w_os_user||' at '||to_char(sysdate,'YYYY/MM/DD HH24:MI:SS');
DBMS_OUTPUT.PUT_LINE('bkup'||w_compress_clause||','||p_table_name||','||w_bk_table_name||',');
EXECUTE IMMEDIATE 'CREATE TABLE ' || w_bk_table_name ||
' COMPRESS AS SELECT * FROM ' || p_table_name;
EXECUTE IMMEDIATE 'COMMENT ON TABLE '||w_bk_table_name||' IS '''||w_bk_comment||'''';
-- LOG登録
SP_GET_LOG_BAKUP_LOAD_TABLE(
P_TO_TABLE_NAME => w_bk_table_name
,P_FROM_TABLE_NAME => p_table_name
,P_KBN => 'BACKUP'
,P_BIKO => w_bk_comment
)
;
DBMS_OUTPUT.PUT_LINE('CREATE TABLE ' || w_bk_table_name ||w_compress_clause||'AS SELECT * FROM ' || p_table_name||';');
DBMS_OUTPUT.PUT_LINE('COMMENT ON TABLE '||w_bk_table_name||' IS '''||w_bk_comment||'''');
-- DBMS_OUTPUT.PUT_LINE(w_bk_table_name);
END;
/
2.データコピー
CREATE OR REPLACE PROCEDURE SP_LOAD_DATA_PARTITIONS(
p_from_table_name IN VARCHAR2
,p_to_table_name IN VARCHAR2
,p_start_ymd IN VARCHAR2
,p_end_ymd IN VARCHAR2
,p_db_link IN VARCHAR2
,p_from_table_owner IN VARCHAR2 DEFAULT 'DBLINK_TO_SCHEMA'
)
/* パーティションメンテナンスDDL出力 日付レンジパーティション未来分*/
IS
w_part_key_column USER_PART_KEY_COLUMNS.COLUMN_NAME%TYPE;
w_date CHAR(8 CHAR);
w_part VARCHAR2 (16 CHAR);
w_cnt NUMBER;
w_select VARCHAR2(512 CHAR);
w_select_2 VARCHAR2(512 CHAR);
w_sql VARCHAR2(10240 CHAR);
v_sta_time NUMBER := DBMS_UTILITY.GET_TIME;
into_column USER_TAB_COLUMNS.COLUMN_NAME%TYPE;
w_column VARCHAR2(5120 CHAR);
w_from_owner VARCHAR2(60 CHAR);
BEGIN
IF p_from_table_owner = 'DBLINK_TO_SCHEMA' THEN
w_select_2 := 'SELECT USERNAME FROM DBA_USERS@'||p_db_link ;
EXECUTE IMMEDIATE w_select_2 INTO w_from_owner;
ELSE
w_from_owner := p_from_table_owner;
END IF;
DBMS_OUTPUT.PUT_LINE('--■実行');
/* ディクショナリを参照してパーティションキーを取得 */
w_select := 'SELECT MAX(COLUMN_NAME) FROM DBA_PART_KEY_COLUMNS@'||p_db_link||' where NAME = '''||p_from_table_name||''' AND OWNER = '''||w_from_owner||''' ';
EXECUTE IMMEDIATE w_select INTO w_part_key_column;
/* ディクショナリを参照してカラム数を取得 */
w_select := 'SELECT MAX(COLUMN_ID) FROM DBA_TAB_COLUMNS@'||p_db_link||' WHERE TABLE_NAME = '''||p_from_table_name||''' AND OWNER = '''||w_from_owner||'''';
EXECUTE IMMEDIATE w_select INTO w_cnt;
FOR i IN 1 .. w_cnt LOOP
w_select := 'SELECT COLUMN_NAME FROM DBA_TAB_COLUMNS@'||p_db_link||' WHERE TABLE_NAME = '''||p_from_table_name||''' AND COLUMN_ID = '''|| i ||''' AND OWNER = '''||w_from_owner||'''';
EXECUTE IMMEDIATE w_select INTO into_column;
w_column := w_column ||','|| into_column;
END LOOP;
/*先頭のカンマを除去*/
w_column := SUBSTR(w_column,2,LENGTH(w_column)-1);
/* 開始・終了の差を取得 */
w_cnt := to_date(p_end_ymd,'YYYYMMDD') - to_date(p_start_ymd,'YYYYMMDD');
FOR i IN 0 .. w_cnt LOOP
w_date := to_char(to_date(p_start_ymd,'YYYYMMDD') + i ,'YYYYMMDD');
w_part := 'P_'||to_char(to_date(p_start_ymd,'YYYYMMDD') + i ,'YYMM_DD');
w_sql := 'INSERT /*+ APPEND */ INTO '||p_to_table_name||' PARTITION ('||w_part||') ('||w_column||') SELECT * FROM '||p_from_table_name||'@'||p_db_link||' WHERE '||w_part_key_column||' = '''||w_date||'''';
DBMS_OUTPUT.PUT_LINE('INSERT PARTITION : '||w_part);
EXECUTE IMMEDIATE w_sql;
DBMS_OUTPUT.PUT_LINE(p_to_table_name|| ': '||w_date||' '||SQL%ROWCOUNT||' 件 ');
COMMIT;
END LOOP;
EXCEPTION
WHEN NO_DATA_FOUND THEN
DBMS_OUTPUT.PUT_LINE('パーテション・カラム取得に失敗');
ROLLBACK;
RAISE;
WHEN OTHERS THEN
DBMS_OUTPUT.PUT_LINE('データコピーに失敗');
ROLLBACK;
RAISE;
END;
/
3.データ圧縮
CREATE OR REPLACE PROCEDURE sp_compress_partition_tables(
P_TRUNCATE_KBN IN VARCHAR2
,P_PARTITION_FROM IN VARCHAR2
,P_PARTITION_TO IN VARCHAR2
,P_EXECUTE_MODE IN VARCHAR2 DEFAULT 'EXECUTE'
)
IS
CURSOR c1
IS
SELECT
pt.TABLE_NAME
,t.PARTITION_NAME
FROM
KR_IKO_TRUNCATE_PT_TABLES pt
,USER_TAB_PARTITIONS t
WHERE 1=1
AND pt.TABLE_NAME = t.TABLE_NAME
AND pt.TRUNCATE_KBN = P_TRUNCATE_KBN
AND t.PARTITION_NAME BETWEEN P_PARTITION_FROM AND P_PARTITION_TO
order by t.TABLE_NAME,t.PARTITION_NAME
;
w_table_name VARCHAR2(30 CHAR);
w_partition_name VARCHAR2(30 CHAR);
w_ddl VARCHAR2(1024 CHAR);
w_ddl2 VARCHAR2(1024 CHAR);
BEGIN
IF P_EXECUTE_MODE = 'EXECUTE' THEN
DBMS_OUTPUT.PUT_LINE('■パーティションCOMPRESS 実行');
FOR c1_rec IN c1 LOOP
w_table_name := c1_rec.TABLE_NAME;
w_partition_name := c1_rec.PARTITION_NAME;
-- DDL文
w_ddl := 'ALTER TABLE '||w_table_name||' MOVE PARTITION '||w_partition_name||' COMPRESS';
DBMS_OUTPUT.PUT_LINE(w_ddl);
EXECUTE IMMEDIATE w_ddl;
FOR IND_REC IN (
SELECT
pi.INDEX_NAME
,pi.PARTITION_NAME
FROM
USER_IND_PARTITIONS pi
,USER_INDEXES i
WHERE 1=1
AND pi.INDEX_NAME = i.INDEX_NAME
AND i.TABLE_NAME = w_table_name
AND pi.PARTITION_NAME = w_partition_name
order by pi.INDEX_NAME
)
LOOP
w_ddl2 := 'ALTER INDEX '||IND_REC.INDEX_NAME||' REBUILD PARTITION '||IND_REC.PARTITION_NAME||' PARALLEL 4';
DBMS_OUTPUT.PUT_LINE(w_ddl2);
EXECUTE IMMEDIATE w_ddl2;
END LOOP
;
END LOOP;
ELSIF P_EXECUTE_MODE = 'VERIFY' THEN
DBMS_OUTPUT.PUT_LINE('■パーティションTRUNCATE ★VERIFYモード');
FOR c1_rec IN c1 LOOP
w_table_name := c1_rec.TABLE_NAME;
w_partition_name := c1_rec.PARTITION_NAME;
-- DDL文
w_ddl := 'ALTER TABLE '||w_table_name||' MOVE PARTITION '||w_partition_name||' COMPRESS';
DBMS_OUTPUT.PUT_LINE(w_ddl);
FOR IND_REC IN (
SELECT
pi.INDEX_NAME
,pi.PARTITION_NAME
FROM
USER_IND_PARTITIONS pi
,USER_INDEXES i
WHERE 1=1
AND pi.INDEX_NAME = i.INDEX_NAME
AND i.TABLE_NAME = w_table_name
AND pi.PARTITION_NAME = w_partition_name
order by pi.INDEX_NAME
)
LOOP
w_ddl2 := 'ALTER INDEX '||IND_REC.INDEX_NAME||' REBUILD PARTITION '||IND_REC.PARTITION_NAME||' PARALLEL 4';
DBMS_OUTPUT.PUT_LINE(w_ddl2);
END LOOP
;
END LOOP;
ELSE
DBMS_OUTPUT.PUT_LINE('EXECUTE_MODE が不正');
END IF;
END;
/