Help us understand the problem. What is going on with this article?

NiFi HTTPプロセッサーでファイルを取得・解凍し、特定パターンのファイルを整形した後でKafkaに流す

More than 1 year has passed since last update.

目的

NiFi HTTPプロセッサーでファイルを取得・解凍し、特定パターンのファイルを整形した後でKafkaに流す

このチュートリアルでPublishKafkaRecord_0_10プロセッサーを使ってみる。
リモートのHTTPサーバーから、ZIPに圧縮されたCVSファイルを取得したあとに、
解凍。
更に特定CSVファイルだけをPublishKafkaRecord_0_10プロセッサーに渡してKafkaに流す。

  • このチュートリアルはNiFi 1.2から実行可能。

環境

  • Mac OS X
  • HDF 3.2
    • Apache NiFi 1.7.0
    • Apache Kafka 1.1.0
    • Schema Registry

PublishKafkaRecord_0_10 (CSV to JSON)

Kafka環境を用意

HDF 3.2をインストール。
インストール手順はこちら

テンプレートファイルをImport

86CC1E15-F71B-46EF-9A32-C5DF41D4B100.png

image.png

Importした後の様子

image.png

GetHTTP:

このプロセッサーで、 MovieLensサイトからZipファイルをダウンロードする。
MovieLensは映画の推奨サービス。今回ダウンロードするデータファイルは (ml-20m.zip) 27,278本の映画に対しての 20,000,263 評価データと465,564 タグデータ。
設定は下記:
image.png

注意事項:

プロセッサーのデフォルト設定では、一回実行終わったら自動的に再実行します。
今回のHTTPデータを連続的に何回ダウンロードしていたらDDOS攻撃になるので、
感覚を0秒から10分間に開けましょう。
image.png
↑「Run Schedule」の設定を10 minsに変更。

ZIPファイルの解凍:

UnpackContent
image.png
image.png

ファイル名

image.png

Kafka topicへのMessage準備

image.png

Kafka へMessageを送る

image.png

テンプレートXML

<?xml version="1.0" ?>
<template encoding-version="1.1">
  <description>This tutorial workflow gets CSV data from the GroupLens website and
uses the PublishKafkaRecord processor to:
-Convert the data into JSON
-Publish the JSON data to the Kafka topic "Movies"</description>
  <groupId>855d9cd4-015d-1000-8dc5-71f1f96a4779</groupId>
  <name>PublishKafkaRecord</name>
  <snippet>
    <connections>
      <id>a21d3e99-68e1-3542-0000-000000000000</id>
      <parentGroupId>451a635e-0803-3845-0000-000000000000</parentGroupId>
      <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
      <backPressureObjectThreshold>10000</backPressureObjectThreshold>
      <destination>
        <groupId>451a635e-0803-3845-0000-000000000000</groupId>
        <id>3f560dd9-2a0c-3090-0000-000000000000</id>
        <type>PROCESSOR</type>
      </destination>
      <flowFileExpiration>0 sec</flowFileExpiration>
      <labelIndex>1</labelIndex>
      <name></name>
      <selectedRelationships>success</selectedRelationships>
      <source>
        <groupId>451a635e-0803-3845-0000-000000000000</groupId>
        <id>feca53d8-7bfb-38bd-0000-000000000000</id>
        <type>PROCESSOR</type>
      </source>
      <zIndex>0</zIndex>
    </connections>
    <connections>
      <id>af5788a5-23eb-3d39-0000-000000000000</id>
      <parentGroupId>451a635e-0803-3845-0000-000000000000</parentGroupId>
      <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
      <backPressureObjectThreshold>10000</backPressureObjectThreshold>
      <bends>
        <x>1043.3389261227253</x>
        <y>685.9336127468443</y>
      </bends>
      <bends>
        <x>1043.3389261227253</x>
        <y>735.9336127468443</y>
      </bends>
      <destination>
        <groupId>451a635e-0803-3845-0000-000000000000</groupId>
        <id>3f560dd9-2a0c-3090-0000-000000000000</id>
        <type>PROCESSOR</type>
      </destination>
      <flowFileExpiration>0 sec</flowFileExpiration>
      <labelIndex>1</labelIndex>
      <name></name>
      <selectedRelationships>failure</selectedRelationships>
      <source>
        <groupId>451a635e-0803-3845-0000-000000000000</groupId>
        <id>3f560dd9-2a0c-3090-0000-000000000000</id>
        <type>PROCESSOR</type>
      </source>
      <zIndex>0</zIndex>
    </connections>
    <connections>
      <id>bca31e1c-69fc-3691-0000-000000000000</id>
      <parentGroupId>451a635e-0803-3845-0000-000000000000</parentGroupId>
      <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
      <backPressureObjectThreshold>10000</backPressureObjectThreshold>
      <destination>
        <groupId>451a635e-0803-3845-0000-000000000000</groupId>
        <id>f4cb8328-1431-3494-0000-000000000000</id>
        <type>PROCESSOR</type>
      </destination>
      <flowFileExpiration>0 sec</flowFileExpiration>
      <labelIndex>1</labelIndex>
      <name></name>
      <selectedRelationships>success</selectedRelationships>
      <source>
        <groupId>451a635e-0803-3845-0000-000000000000</groupId>
        <id>4004c547-2a1a-321b-0000-000000000000</id>
        <type>PROCESSOR</type>
      </source>
      <zIndex>0</zIndex>
    </connections>
    <connections>
      <id>edd5e36a-f6d3-3dfb-0000-000000000000</id>
      <parentGroupId>451a635e-0803-3845-0000-000000000000</parentGroupId>
      <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
      <backPressureObjectThreshold>10000</backPressureObjectThreshold>
      <destination>
        <groupId>451a635e-0803-3845-0000-000000000000</groupId>
        <id>c4612b5a-0fe7-367b-0000-000000000000</id>
        <type>PROCESSOR</type>
      </destination>
      <flowFileExpiration>0 sec</flowFileExpiration>
      <labelIndex>1</labelIndex>
      <name></name>
      <selectedRelationships>success</selectedRelationships>
      <source>
        <groupId>451a635e-0803-3845-0000-000000000000</groupId>
        <id>f4cb8328-1431-3494-0000-000000000000</id>
        <type>PROCESSOR</type>
      </source>
      <zIndex>0</zIndex>
    </connections>
    <connections>
      <id>f100ccb4-13c6-3e5c-0000-000000000000</id>
      <parentGroupId>451a635e-0803-3845-0000-000000000000</parentGroupId>
      <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
      <backPressureObjectThreshold>10000</backPressureObjectThreshold>
      <destination>
        <groupId>451a635e-0803-3845-0000-000000000000</groupId>
        <id>feca53d8-7bfb-38bd-0000-000000000000</id>
        <type>PROCESSOR</type>
      </destination>
      <flowFileExpiration>0 sec</flowFileExpiration>
      <labelIndex>1</labelIndex>
      <name></name>
      <selectedRelationships>movies</selectedRelationships>
      <source>
        <groupId>451a635e-0803-3845-0000-000000000000</groupId>
        <id>c4612b5a-0fe7-367b-0000-000000000000</id>
        <type>PROCESSOR</type>
      </source>
      <zIndex>0</zIndex>
    </connections>
    <controllerServices>
      <id>b8531252-2a7c-3648-0000-000000000000</id>
      <parentGroupId>451a635e-0803-3845-0000-000000000000</parentGroupId>
      <bundle>
        <artifact>nifi-record-serialization-services-nar</artifact>
        <group>org.apache.nifi</group>
        <version>1.3.0</version>
      </bundle>
      <comments></comments>
      <descriptors>
        <entry>
          <key>schema-access-strategy</key>
          <value>
            <name>schema-access-strategy</name>
          </value>
        </entry>
        <entry>
          <key>schema-registry</key>
          <value>
            <identifiesControllerService>org.apache.nifi.schemaregistry.services.SchemaRegistry</identifiesControllerService>
            <name>schema-registry</name>
          </value>
        </entry>
        <entry>
          <key>schema-name</key>
          <value>
            <name>schema-name</name>
          </value>
        </entry>
        <entry>
          <key>schema-text</key>
          <value>
            <name>schema-text</name>
          </value>
        </entry>
        <entry>
          <key>Date Format</key>
          <value>
            <name>Date Format</name>
          </value>
        </entry>
        <entry>
          <key>Time Format</key>
          <value>
            <name>Time Format</name>
          </value>
        </entry>
        <entry>
          <key>Timestamp Format</key>
          <value>
            <name>Timestamp Format</name>
          </value>
        </entry>
        <entry>
          <key>CSV Format</key>
          <value>
            <name>CSV Format</name>
          </value>
        </entry>
        <entry>
          <key>Value Separator</key>
          <value>
            <name>Value Separator</name>
          </value>
        </entry>
        <entry>
          <key>Skip Header Line</key>
          <value>
            <name>Skip Header Line</name>
          </value>
        </entry>
        <entry>
          <key>Quote Character</key>
          <value>
            <name>Quote Character</name>
          </value>
        </entry>
        <entry>
          <key>Escape Character</key>
          <value>
            <name>Escape Character</name>
          </value>
        </entry>
        <entry>
          <key>Comment Marker</key>
          <value>
            <name>Comment Marker</name>
          </value>
        </entry>
        <entry>
          <key>Null String</key>
          <value>
            <name>Null String</name>
          </value>
        </entry>
        <entry>
          <key>Trim Fields</key>
          <value>
            <name>Trim Fields</name>
          </value>
        </entry>
      </descriptors>
      <name>CSVReader</name>
      <persistsState>false</persistsState>
      <properties>
        <entry>
          <key>schema-access-strategy</key>
          <value>schema-name</value>
        </entry>
        <entry>
          <key>schema-registry</key>
          <value>bdeb04f8-7ece-385c-0000-000000000000</value>
        </entry>
        <entry>
          <key>schema-name</key>
          <value>${schema.name}</value>
        </entry>
        <entry>
          <key>schema-text</key>
          <value>${avro.schema}</value>
        </entry>
        <entry>
          <key>Date Format</key>
        </entry>
        <entry>
          <key>Time Format</key>
        </entry>
        <entry>
          <key>Timestamp Format</key>
        </entry>
        <entry>
          <key>CSV Format</key>
          <value>custom</value>
        </entry>
        <entry>
          <key>Value Separator</key>
          <value>,</value>
        </entry>
        <entry>
          <key>Skip Header Line</key>
          <value>true</value>
        </entry>
        <entry>
          <key>Quote Character</key>
          <value>"</value>
        </entry>
        <entry>
          <key>Escape Character</key>
          <value>\</value>
        </entry>
        <entry>
          <key>Comment Marker</key>
        </entry>
        <entry>
          <key>Null String</key>
        </entry>
        <entry>
          <key>Trim Fields</key>
          <value>true</value>
        </entry>
      </properties>
      <state>ENABLED</state>
      <type>org.apache.nifi.csv.CSVReader</type>
    </controllerServices>
    <controllerServices>
      <id>bdeb04f8-7ece-385c-0000-000000000000</id>
      <parentGroupId>451a635e-0803-3845-0000-000000000000</parentGroupId>
      <bundle>
        <artifact>nifi-registry-nar</artifact>
        <group>org.apache.nifi</group>
        <version>1.3.0</version>
      </bundle>
      <comments></comments>
      <descriptors>
        <entry>
          <key>movies</key>
          <value>
            <name>movies</name>
          </value>
        </entry>
      </descriptors>
      <name>AvroSchemaRegistry</name>
      <persistsState>false</persistsState>
      <properties>
        <entry>
          <key>movies</key>
          <value>{
  "type": "record",
  "name": "MoviesRecord",
  "fields" : [
    {"name": "movieId", "type": "long"},
    {"name": "title", "type": ["null", "string"]},
    {"name": "genres", "type": ["null", "string"]}
  ]
}</value>
        </entry>
      </properties>
      <state>ENABLED</state>
      <type>org.apache.nifi.schemaregistry.services.AvroSchemaRegistry</type>
    </controllerServices>
    <controllerServices>
      <id>290d56c9-1e35-3dba-0000-000000000000</id>
      <parentGroupId>451a635e-0803-3845-0000-000000000000</parentGroupId>
      <bundle>
        <artifact>nifi-record-serialization-services-nar</artifact>
        <group>org.apache.nifi</group>
        <version>1.3.0</version>
      </bundle>
      <comments></comments>
      <descriptors>
        <entry>
          <key>Schema Write Strategy</key>
          <value>
            <name>Schema Write Strategy</name>
          </value>
        </entry>
        <entry>
          <key>schema-access-strategy</key>
          <value>
            <name>schema-access-strategy</name>
          </value>
        </entry>
        <entry>
          <key>schema-registry</key>
          <value>
            <identifiesControllerService>org.apache.nifi.schemaregistry.services.SchemaRegistry</identifiesControllerService>
            <name>schema-registry</name>
          </value>
        </entry>
        <entry>
          <key>schema-name</key>
          <value>
            <name>schema-name</name>
          </value>
        </entry>
        <entry>
          <key>schema-text</key>
          <value>
            <name>schema-text</name>
          </value>
        </entry>
        <entry>
          <key>Date Format</key>
          <value>
            <name>Date Format</name>
          </value>
        </entry>
        <entry>
          <key>Time Format</key>
          <value>
            <name>Time Format</name>
          </value>
        </entry>
        <entry>
          <key>Timestamp Format</key>
          <value>
            <name>Timestamp Format</name>
          </value>
        </entry>
        <entry>
          <key>Pretty Print JSON</key>
          <value>
            <name>Pretty Print JSON</name>
          </value>
        </entry>
      </descriptors>
      <name>JsonRecordSetWriter</name>
      <persistsState>false</persistsState>
      <properties>
        <entry>
          <key>Schema Write Strategy</key>
          <value>schema-name</value>
        </entry>
        <entry>
          <key>schema-access-strategy</key>
          <value>schema-name</value>
        </entry>
        <entry>
          <key>schema-registry</key>
          <value>bdeb04f8-7ece-385c-0000-000000000000</value>
        </entry>
        <entry>
          <key>schema-name</key>
          <value>${schema.name}</value>
        </entry>
        <entry>
          <key>schema-text</key>
          <value>${avro.schema}</value>
        </entry>
        <entry>
          <key>Date Format</key>
        </entry>
        <entry>
          <key>Time Format</key>
        </entry>
        <entry>
          <key>Timestamp Format</key>
        </entry>
        <entry>
          <key>Pretty Print JSON</key>
          <value>false</value>
        </entry>
      </properties>
      <state>ENABLED</state>
      <type>org.apache.nifi.json.JsonRecordSetWriter</type>
    </controllerServices>
    <processors>
      <id>c4612b5a-0fe7-367b-0000-000000000000</id>
      <parentGroupId>451a635e-0803-3845-0000-000000000000</parentGroupId>
      <position>
        <x>0.0</x>
        <y>441.2567407123754</y>
      </position>
      <bundle>
        <artifact>nifi-standard-nar</artifact>
        <group>org.apache.nifi</group>
        <version>1.3.0</version>
      </bundle>
      <config>
        <bulletinLevel>WARN</bulletinLevel>
        <comments></comments>
        <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
        <descriptors>
          <entry>
            <key>Routing Strategy</key>
            <value>
              <name>Routing Strategy</name>
            </value>
          </entry>
          <entry>
            <key>movies</key>
            <value>
              <name>movies</name>
            </value>
          </entry>
          <entry>
            <key>ratings</key>
            <value>
              <name>ratings</name>
            </value>
          </entry>
          <entry>
            <key>tags</key>
            <value>
              <name>tags</name>
            </value>
          </entry>
        </descriptors>
        <executionNode>ALL</executionNode>
        <lossTolerant>false</lossTolerant>
        <penaltyDuration>30 sec</penaltyDuration>
        <properties>
          <entry>
            <key>Routing Strategy</key>
            <value>Route to Property name</value>
          </entry>
          <entry>
            <key>movies</key>
            <value>${filename:equals('movies.csv')}</value>
          </entry>
          <entry>
            <key>ratings</key>
            <value>${filename:equals('ratings.csv')}</value>
          </entry>
          <entry>
            <key>tags</key>
            <value>${filename:equals('tags.csv')}</value>
          </entry>
        </properties>
        <runDurationMillis>0</runDurationMillis>
        <schedulingPeriod>0 sec</schedulingPeriod>
        <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
        <yieldDuration>1 sec</yieldDuration>
      </config>
      <name>RouteOnAttribute</name>
      <relationships>
        <autoTerminate>false</autoTerminate>
        <name>movies</name>
      </relationships>
      <relationships>
        <autoTerminate>true</autoTerminate>
        <name>ratings</name>
      </relationships>
      <relationships>
        <autoTerminate>true</autoTerminate>
        <name>tags</name>
      </relationships>
      <relationships>
        <autoTerminate>true</autoTerminate>
        <name>unmatched</name>
      </relationships>
      <style></style>
      <type>org.apache.nifi.processors.standard.RouteOnAttribute</type>
    </processors>
    <processors>
      <id>f4cb8328-1431-3494-0000-000000000000</id>
      <parentGroupId>451a635e-0803-3845-0000-000000000000</parentGroupId>
      <position>
        <x>0.0</x>
        <y>224.95887663044255</y>
      </position>
      <bundle>
        <artifact>nifi-standard-nar</artifact>
        <group>org.apache.nifi</group>
        <version>1.3.0</version>
      </bundle>
      <config>
        <bulletinLevel>WARN</bulletinLevel>
        <comments></comments>
        <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
        <descriptors>
          <entry>
            <key>Packaging Format</key>
            <value>
              <name>Packaging Format</name>
            </value>
          </entry>
          <entry>
            <key>File Filter</key>
            <value>
              <name>File Filter</name>
            </value>
          </entry>
        </descriptors>
        <executionNode>ALL</executionNode>
        <lossTolerant>false</lossTolerant>
        <penaltyDuration>30 sec</penaltyDuration>
        <properties>
          <entry>
            <key>Packaging Format</key>
            <value>zip</value>
          </entry>
          <entry>
            <key>File Filter</key>
            <value>.*</value>
          </entry>
        </properties>
        <runDurationMillis>0</runDurationMillis>
        <schedulingPeriod>0 sec</schedulingPeriod>
        <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
        <yieldDuration>1 sec</yieldDuration>
      </config>
      <name>Unzip</name>
      <relationships>
        <autoTerminate>true</autoTerminate>
        <name>failure</name>
      </relationships>
      <relationships>
        <autoTerminate>true</autoTerminate>
        <name>original</name>
      </relationships>
      <relationships>
        <autoTerminate>false</autoTerminate>
        <name>success</name>
      </relationships>
      <style></style>
      <type>org.apache.nifi.processors.standard.UnpackContent</type>
    </processors>
    <processors>
      <id>feca53d8-7bfb-38bd-0000-000000000000</id>
      <parentGroupId>451a635e-0803-3845-0000-000000000000</parentGroupId>
      <position>
        <x>0.0</x>
        <y>645.9336127468443</y>
      </position>
      <bundle>
        <artifact>nifi-update-attribute-nar</artifact>
        <group>org.apache.nifi</group>
        <version>1.3.0</version>
      </bundle>
      <config>
        <bulletinLevel>WARN</bulletinLevel>
        <comments></comments>
        <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
        <descriptors>
          <entry>
            <key>Delete Attributes Expression</key>
            <value>
              <name>Delete Attributes Expression</name>
            </value>
          </entry>
          <entry>
            <key>Store State</key>
            <value>
              <name>Store State</name>
            </value>
          </entry>
          <entry>
            <key>Stateful Variables Initial Value</key>
            <value>
              <name>Stateful Variables Initial Value</name>
            </value>
          </entry>
          <entry>
            <key>schema.name</key>
            <value>
              <name>schema.name</name>
            </value>
          </entry>
        </descriptors>
        <executionNode>ALL</executionNode>
        <lossTolerant>false</lossTolerant>
        <penaltyDuration>30 sec</penaltyDuration>
        <properties>
          <entry>
            <key>Delete Attributes Expression</key>
          </entry>
          <entry>
            <key>Store State</key>
            <value>Do not store state</value>
          </entry>
          <entry>
            <key>Stateful Variables Initial Value</key>
          </entry>
          <entry>
            <key>schema.name</key>
            <value>movies</value>
          </entry>
        </properties>
        <runDurationMillis>0</runDurationMillis>
        <schedulingPeriod>0 sec</schedulingPeriod>
        <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
        <yieldDuration>1 sec</yieldDuration>
      </config>
      <name>Add Schema Name Attribute</name>
      <relationships>
        <autoTerminate>false</autoTerminate>
        <name>success</name>
      </relationships>
      <style></style>
      <type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
    </processors>
    <processors>
      <id>3f560dd9-2a0c-3090-0000-000000000000</id>
      <parentGroupId>451a635e-0803-3845-0000-000000000000</parentGroupId>
      <position>
        <x>588.3389261227253</x>
        <y>645.9336127468443</y>
      </position>
      <bundle>
        <artifact>nifi-kafka-0-10-nar</artifact>
        <group>org.apache.nifi</group>
        <version>1.3.0</version>
      </bundle>
      <config>
        <bulletinLevel>WARN</bulletinLevel>
        <comments></comments>
        <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
        <descriptors>
          <entry>
            <key>bootstrap.servers</key>
            <value>
              <name>bootstrap.servers</name>
            </value>
          </entry>
          <entry>
            <key>topic</key>
            <value>
              <name>topic</name>
            </value>
          </entry>
          <entry>
            <key>record-reader</key>
            <value>
              <identifiesControllerService>org.apache.nifi.serialization.RecordReaderFactory</identifiesControllerService>
              <name>record-reader</name>
            </value>
          </entry>
          <entry>
            <key>record-writer</key>
            <value>
              <identifiesControllerService>org.apache.nifi.serialization.RecordSetWriterFactory</identifiesControllerService>
              <name>record-writer</name>
            </value>
          </entry>
          <entry>
            <key>security.protocol</key>
            <value>
              <name>security.protocol</name>
            </value>
          </entry>
          <entry>
            <key>sasl.kerberos.service.name</key>
            <value>
              <name>sasl.kerberos.service.name</name>
            </value>
          </entry>
          <entry>
            <key>sasl.kerberos.principal</key>
            <value>
              <name>sasl.kerberos.principal</name>
            </value>
          </entry>
          <entry>
            <key>sasl.kerberos.keytab</key>
            <value>
              <name>sasl.kerberos.keytab</name>
            </value>
          </entry>
          <entry>
            <key>ssl.context.service</key>
            <value>
              <identifiesControllerService>org.apache.nifi.ssl.SSLContextService</identifiesControllerService>
              <name>ssl.context.service</name>
            </value>
          </entry>
          <entry>
            <key>acks</key>
            <value>
              <name>acks</name>
            </value>
          </entry>
          <entry>
            <key>message-key-field</key>
            <value>
              <name>message-key-field</name>
            </value>
          </entry>
          <entry>
            <key>max.request.size</key>
            <value>
              <name>max.request.size</name>
            </value>
          </entry>
          <entry>
            <key>ack.wait.time</key>
            <value>
              <name>ack.wait.time</name>
            </value>
          </entry>
          <entry>
            <key>max.block.ms</key>
            <value>
              <name>max.block.ms</name>
            </value>
          </entry>
          <entry>
            <key>partitioner.class</key>
            <value>
              <name>partitioner.class</name>
            </value>
          </entry>
          <entry>
            <key>compression.type</key>
            <value>
              <name>compression.type</name>
            </value>
          </entry>
        </descriptors>
        <executionNode>ALL</executionNode>
        <lossTolerant>false</lossTolerant>
        <penaltyDuration>30 sec</penaltyDuration>
        <properties>
          <entry>
            <key>bootstrap.servers</key>
            <value>localhost:9092</value>
          </entry>
          <entry>
            <key>topic</key>
            <value>Movies</value>
          </entry>
          <entry>
            <key>record-reader</key>
            <value>b8531252-2a7c-3648-0000-000000000000</value>
          </entry>
          <entry>
            <key>record-writer</key>
            <value>290d56c9-1e35-3dba-0000-000000000000</value>
          </entry>
          <entry>
            <key>security.protocol</key>
            <value>PLAINTEXT</value>
          </entry>
          <entry>
            <key>sasl.kerberos.service.name</key>
          </entry>
          <entry>
            <key>sasl.kerberos.principal</key>
          </entry>
          <entry>
            <key>sasl.kerberos.keytab</key>
          </entry>
          <entry>
            <key>ssl.context.service</key>
          </entry>
          <entry>
            <key>acks</key>
            <value>0</value>
          </entry>
          <entry>
            <key>message-key-field</key>
          </entry>
          <entry>
            <key>max.request.size</key>
            <value>1 MB</value>
          </entry>
          <entry>
            <key>ack.wait.time</key>
            <value>5 secs</value>
          </entry>
          <entry>
            <key>max.block.ms</key>
            <value>5 sec</value>
          </entry>
          <entry>
            <key>partitioner.class</key>
            <value>org.apache.kafka.clients.producer.internals.DefaultPartitioner</value>
          </entry>
          <entry>
            <key>compression.type</key>
            <value>none</value>
          </entry>
        </properties>
        <runDurationMillis>0</runDurationMillis>
        <schedulingPeriod>0 sec</schedulingPeriod>
        <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
        <yieldDuration>1 sec</yieldDuration>
      </config>
      <name>Publish to "Movies" Topic</name>
      <relationships>
        <autoTerminate>false</autoTerminate>
        <name>failure</name>
      </relationships>
      <relationships>
        <autoTerminate>true</autoTerminate>
        <name>success</name>
      </relationships>
      <style></style>
      <type>org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_0_10</type>
    </processors>
    <processors>
      <id>4004c547-2a1a-321b-0000-000000000000</id>
      <parentGroupId>451a635e-0803-3845-0000-000000000000</parentGroupId>
      <position>
        <x>0.0</x>
        <y>0.0</y>
      </position>
      <bundle>
        <artifact>nifi-standard-nar</artifact>
        <group>org.apache.nifi</group>
        <version>1.3.0</version>
      </bundle>
      <config>
        <bulletinLevel>WARN</bulletinLevel>
        <comments></comments>
        <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
        <descriptors>
          <entry>
            <key>URL</key>
            <value>
              <name>URL</name>
            </value>
          </entry>
          <entry>
            <key>Filename</key>
            <value>
              <name>Filename</name>
            </value>
          </entry>
          <entry>
            <key>SSL Context Service</key>
            <value>
              <identifiesControllerService>org.apache.nifi.ssl.SSLContextService</identifiesControllerService>
              <name>SSL Context Service</name>
            </value>
          </entry>
          <entry>
            <key>Username</key>
            <value>
              <name>Username</name>
            </value>
          </entry>
          <entry>
            <key>Password</key>
            <value>
              <name>Password</name>
            </value>
          </entry>
          <entry>
            <key>Connection Timeout</key>
            <value>
              <name>Connection Timeout</name>
            </value>
          </entry>
          <entry>
            <key>Data Timeout</key>
            <value>
              <name>Data Timeout</name>
            </value>
          </entry>
          <entry>
            <key>User Agent</key>
            <value>
              <name>User Agent</name>
            </value>
          </entry>
          <entry>
            <key>Accept Content-Type</key>
            <value>
              <name>Accept Content-Type</name>
            </value>
          </entry>
          <entry>
            <key>Follow Redirects</key>
            <value>
              <name>Follow Redirects</name>
            </value>
          </entry>
          <entry>
            <key>redirect-cookie-policy</key>
            <value>
              <name>redirect-cookie-policy</name>
            </value>
          </entry>
          <entry>
            <key>Proxy Host</key>
            <value>
              <name>Proxy Host</name>
            </value>
          </entry>
          <entry>
            <key>Proxy Port</key>
            <value>
              <name>Proxy Port</name>
            </value>
          </entry>
        </descriptors>
        <executionNode>ALL</executionNode>
        <lossTolerant>false</lossTolerant>
        <penaltyDuration>30 sec</penaltyDuration>
        <properties>
          <entry>
            <key>URL</key>
            <value>http://files.grouplens.org/datasets/movielens/ml-20m.zip</value>
          </entry>
          <entry>
            <key>Filename</key>
            <value>ml-20m.zip</value>
          </entry>
          <entry>
            <key>SSL Context Service</key>
          </entry>
          <entry>
            <key>Username</key>
          </entry>
          <entry>
            <key>Password</key>
          </entry>
          <entry>
            <key>Connection Timeout</key>
            <value>30 sec</value>
          </entry>
          <entry>
            <key>Data Timeout</key>
            <value>30 sec</value>
          </entry>
          <entry>
            <key>User Agent</key>
          </entry>
          <entry>
            <key>Accept Content-Type</key>
          </entry>
          <entry>
            <key>Follow Redirects</key>
            <value>false</value>
          </entry>
          <entry>
            <key>redirect-cookie-policy</key>
            <value>default</value>
          </entry>
          <entry>
            <key>Proxy Host</key>
          </entry>
          <entry>
            <key>Proxy Port</key>
          </entry>
        </properties>
        <runDurationMillis>0</runDurationMillis>
        <schedulingPeriod>10 mins</schedulingPeriod>
        <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
        <yieldDuration>1 sec</yieldDuration>
      </config>
      <name>Get Movie Data</name>
      <relationships>
        <autoTerminate>false</autoTerminate>
        <name>success</name>
      </relationships>
      <style></style>
      <type>org.apache.nifi.processors.standard.GetHTTP</type>
    </processors>
  </snippet>
  <timestamp>09/11/2017 11:33:49 EDT</timestamp>
</template>

参考資料

Why do not you register as a user and use Qiita more conveniently?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away