概要
MagentaのREADME.mdを読んでMagentaの音楽生成方法について理解を深めようというこころみ。グラフの生成を読んで、MIDIからどうやってデータ変換を行っているんだ?と疑問に思った。そこで、Pipelineについて読んでまとめてみます。
Data processing in Magenta
Piplienは入力のデータ形式を出力のデータ形式に変更するデータ処理のモジュールである。
この、Piplineどうしをつなぐことで簡単に新しいモジュールをつくることができる。
ソースコード毎の説明
- pipeline.py:Pipelineのアブストラクトクラスとユーティリティ関数が定義されている。
- pipelines_common.py:QuantizedSequenceやMelodyのような共通のデータ形式を変更するPipelineが実装されている。
- dag_pipeline.py:Pipeline同士をつなげるPipelineを定義している。これらのパイプラインは、任意の有向非循環グラフ(DAG)に接続することができる。
-
statistics.py:
Statistic
のアブストラクトクラスや実装がされている。
Pipeline
PipelineはPipelineのアブストラクトクラスを継承する必要がある。
Pipelineの入力はオブジェクトまたは入力にラベルをつけたdictionaryである必要がある。
出力はリストか{name: [Class1, ....], name2: [Class2, ...]}のようなリストに名前をつけた形である必要がある。
Pipelineは2つの関数と3つの変数を保持している。
関数
- transform(input_object):入力から出力へ変換する関数
- get_stats():
Statistic
のリストを返す
変数
- input_type:入力の型
- output_type:出力の型
- name:Pipelineでユニークな名前。
Statistic
のnamespaceとしても使われる。
class MyPipeline(Pipeline):
...
print MyPipeline.input_type
> MyType1
print MyPipeline.output_type
> MyType2
print MyPipeline.name
> "MyPipeline"
my_input = MyType1(1, 2, 3)
outputs = MyPipeline.transform(my_input)
print outputs
> [MyType2(1), MyType2(2), MyType2(3)]
for stat in MyPipeline.get_stats():
print str(stat)
> MyPipeline_how_many_ints: 3
> MyPipeline_sum_of_ints: 6
入力がdictionaryである場合は出力もdictionaryである必要があり、output_typeに記した型で返さないといけない。
Type1->Type2が実装ずみの場合、Type1->Type3を実装したい時、Type2->Type3を実装するだけで事足りる。
Pipelineはrun_pipeline_serial
かload_pipeline
を使うことで、データセット上で実行できる。run_pipeline_serial
はディスクに書き出し、load_pipeline
はメモリ上でデータを保持する。出力がprotocol buffersの場合のみrun_pipeline_serial
を使ってTFRecordとして書き出すことができる。出力がdictionaryの場合、keyがデータセットの名前として使われる。
入力データに対する反復処理のための関数も用意されている。 file_iterator
はディレクトリ内のファイルを繰り返し処理し、バイトの形で返す。 tf_record_iteratorはTFRecordsを繰り返し処理し、protocol buffersを返す。
Pipelineの実装
-
Pipeline.__init__
をコンストラクタで呼び出し、input_type
,output_type
,name
を渡す。 -
transform
を実装する。
get_stats
はオーバーライドしない。get_stats
が返す値はtransform
内で_set_stats
を呼び出し、Statistic
を渡すことで指定できる。
class FooExtractor(Pipeline):
def __init__(self):
super(FooExtractor, self).__init__(
input_type=BarType,
output_type=FooType,
name='FooExtractor')
def transform(self, bar_object):
how_many_foo = Counter('how_many_foo')
how_many_features = Counter('how_many_features')
how_many_bar = Counter('how_many_bar', 1)
features = extract_features(bar_object)
foos = []
for feature in features:
if is_foo(feature):
foos.append(make_foo(feature))
how_many_foo.increment()
how_many_features.increment()
self._set_stats([how_many_foo, how_many_features, how_many_bar])
return foos
foo_extractor = FooExtractor()
print FooExtractor.input_type
> BarType
print FooExtractor.output_type
> FooType
print FooExtractor.name
> "FooExtractor"
print foo_extractor.transform(BarType(5))
> [FooType(1), FooType(2)]
for stat in foo_extractor.get_stats():
print str(stat)
> FooExtractor_how_many_foo: 2
> FooExtractor_how_many_features: 5
> FooExtractor_how_many_bar: 1
DAGPipeline
PipelineでAからBへ変換するときに、A->Bに直接変換するよりA->C->Bのように小さなPipelineを複数作り、繋ぎあわせる方をお勧めする。 DAGPipeline
がこれを提供する。
MagentaではNoteSequence->QuantizedSequence->Melody->train,evalに分けるの変換を行っている。Quantizer(コード)でNoteSequence->QuantizedSequenceに変換し、MelodyExtractor(コード)でQuantizedSequence->Melodyに変換し、RandomPartition(コード)で訓練データと評価データに分けている。
quantizer = pipelines_common.Quantizer(steps_per_quarter=4)
melody_extractor = pipelines_common.MelodyExtractor(
min_bars=7, min_unique_pitches=5,
gap_bars=1.0, ignore_polyphonic_notes=False)
encoder_pipeline = EncoderPipeline(config)
partitioner = pipelines_common.RandomPartition(
tf.train.SequenceExample,
['eval_melodies', 'training_melodies'],
[FLAGS.eval_ratio])
(EncoderPipelineについては書いてませんでした)
上記の4つのPipelineをつなぎ合わせて複合Pipelineを作成する。
dag = {quantizer: Input(music_pb2.NoteSequence),
melody_extractor: quantizer,
encoder_pipeline: melody_extractor,
partitioner: encoder_pipeline,
Output(): partitioner}
composite_pipeline = DAGPipeline(dag)
まとめ
途中で量を確認したらかなり長かったので今回はここで終わります。