LoginSignup
0
1

More than 5 years have passed since last update.

NiFi HTTPプロセッサーでファイルを取得・解凍し、特定パターンのファイルを整形した後でKafkaに流す

Last updated at Posted at 2018-11-27

目的

NiFi HTTPプロセッサーでファイルを取得・解凍し、特定パターンのファイルを整形した後でKafkaに流す

このチュートリアルでPublishKafkaRecord_0_10プロセッサーを使ってみる。
リモートのHTTPサーバーから、ZIPに圧縮されたCVSファイルを取得したあとに、
解凍。
更に特定CSVファイルだけをPublishKafkaRecord_0_10プロセッサーに渡してKafkaに流す。

  • このチュートリアルはNiFi 1.2から実行可能。

環境

  • Mac OS X
  • HDF 3.2
    • Apache NiFi 1.7.0
    • Apache Kafka 1.1.0
    • Schema Registry

PublishKafkaRecord_0_10 (CSV to JSON)

Kafka環境を用意

HDF 3.2をインストール。
インストール手順はこちら

テンプレートファイルをImport

86CC1E15-F71B-46EF-9A32-C5DF41D4B100.png

image.png

Importした後の様子

image.png

GetHTTP:

このプロセッサーで、 MovieLensサイトからZipファイルをダウンロードする。
MovieLensは映画の推奨サービス。今回ダウンロードするデータファイルは (ml-20m.zip) 27,278本の映画に対しての 20,000,263 評価データと465,564 タグデータ。
設定は下記:
image.png

注意事項:

プロセッサーのデフォルト設定では、一回実行終わったら自動的に再実行します。
今回のHTTPデータを連続的に何回ダウンロードしていたらDDOS攻撃になるので、
感覚を0秒から10分間に開けましょう。
image.png
↑「Run Schedule」の設定を10 minsに変更。

ZIPファイルの解凍:

UnpackContent
image.png
image.png

ファイル名

image.png

Kafka topicへのMessage準備

image.png

Kafka へMessageを送る

image.png

テンプレートXML

<?xml version="1.0" ?>
<template encoding-version="1.1">
  <description>This tutorial workflow gets CSV data from the GroupLens website and
uses the PublishKafkaRecord processor to:
-Convert the data into JSON
-Publish the JSON data to the Kafka topic "Movies"</description>
  <groupId>855d9cd4-015d-1000-8dc5-71f1f96a4779</groupId>
  <name>PublishKafkaRecord</name>
  <snippet>
    <connections>
      <id>a21d3e99-68e1-3542-0000-000000000000</id>
      <parentGroupId>451a635e-0803-3845-0000-000000000000</parentGroupId>
      <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
      <backPressureObjectThreshold>10000</backPressureObjectThreshold>
      <destination>
        <groupId>451a635e-0803-3845-0000-000000000000</groupId>
        <id>3f560dd9-2a0c-3090-0000-000000000000</id>
        <type>PROCESSOR</type>
      </destination>
      <flowFileExpiration>0 sec</flowFileExpiration>
      <labelIndex>1</labelIndex>
      <name></name>
      <selectedRelationships>success</selectedRelationships>
      <source>
        <groupId>451a635e-0803-3845-0000-000000000000</groupId>
        <id>feca53d8-7bfb-38bd-0000-000000000000</id>
        <type>PROCESSOR</type>
      </source>
      <zIndex>0</zIndex>
    </connections>
    <connections>
      <id>af5788a5-23eb-3d39-0000-000000000000</id>
      <parentGroupId>451a635e-0803-3845-0000-000000000000</parentGroupId>
      <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
      <backPressureObjectThreshold>10000</backPressureObjectThreshold>
      <bends>
        <x>1043.3389261227253</x>
        <y>685.9336127468443</y>
      </bends>
      <bends>
        <x>1043.3389261227253</x>
        <y>735.9336127468443</y>
      </bends>
      <destination>
        <groupId>451a635e-0803-3845-0000-000000000000</groupId>
        <id>3f560dd9-2a0c-3090-0000-000000000000</id>
        <type>PROCESSOR</type>
      </destination>
      <flowFileExpiration>0 sec</flowFileExpiration>
      <labelIndex>1</labelIndex>
      <name></name>
      <selectedRelationships>failure</selectedRelationships>
      <source>
        <groupId>451a635e-0803-3845-0000-000000000000</groupId>
        <id>3f560dd9-2a0c-3090-0000-000000000000</id>
        <type>PROCESSOR</type>
      </source>
      <zIndex>0</zIndex>
    </connections>
    <connections>
      <id>bca31e1c-69fc-3691-0000-000000000000</id>
      <parentGroupId>451a635e-0803-3845-0000-000000000000</parentGroupId>
      <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
      <backPressureObjectThreshold>10000</backPressureObjectThreshold>
      <destination>
        <groupId>451a635e-0803-3845-0000-000000000000</groupId>
        <id>f4cb8328-1431-3494-0000-000000000000</id>
        <type>PROCESSOR</type>
      </destination>
      <flowFileExpiration>0 sec</flowFileExpiration>
      <labelIndex>1</labelIndex>
      <name></name>
      <selectedRelationships>success</selectedRelationships>
      <source>
        <groupId>451a635e-0803-3845-0000-000000000000</groupId>
        <id>4004c547-2a1a-321b-0000-000000000000</id>
        <type>PROCESSOR</type>
      </source>
      <zIndex>0</zIndex>
    </connections>
    <connections>
      <id>edd5e36a-f6d3-3dfb-0000-000000000000</id>
      <parentGroupId>451a635e-0803-3845-0000-000000000000</parentGroupId>
      <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
      <backPressureObjectThreshold>10000</backPressureObjectThreshold>
      <destination>
        <groupId>451a635e-0803-3845-0000-000000000000</groupId>
        <id>c4612b5a-0fe7-367b-0000-000000000000</id>
        <type>PROCESSOR</type>
      </destination>
      <flowFileExpiration>0 sec</flowFileExpiration>
      <labelIndex>1</labelIndex>
      <name></name>
      <selectedRelationships>success</selectedRelationships>
      <source>
        <groupId>451a635e-0803-3845-0000-000000000000</groupId>
        <id>f4cb8328-1431-3494-0000-000000000000</id>
        <type>PROCESSOR</type>
      </source>
      <zIndex>0</zIndex>
    </connections>
    <connections>
      <id>f100ccb4-13c6-3e5c-0000-000000000000</id>
      <parentGroupId>451a635e-0803-3845-0000-000000000000</parentGroupId>
      <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
      <backPressureObjectThreshold>10000</backPressureObjectThreshold>
      <destination>
        <groupId>451a635e-0803-3845-0000-000000000000</groupId>
        <id>feca53d8-7bfb-38bd-0000-000000000000</id>
        <type>PROCESSOR</type>
      </destination>
      <flowFileExpiration>0 sec</flowFileExpiration>
      <labelIndex>1</labelIndex>
      <name></name>
      <selectedRelationships>movies</selectedRelationships>
      <source>
        <groupId>451a635e-0803-3845-0000-000000000000</groupId>
        <id>c4612b5a-0fe7-367b-0000-000000000000</id>
        <type>PROCESSOR</type>
      </source>
      <zIndex>0</zIndex>
    </connections>
    <controllerServices>
      <id>b8531252-2a7c-3648-0000-000000000000</id>
      <parentGroupId>451a635e-0803-3845-0000-000000000000</parentGroupId>
      <bundle>
        <artifact>nifi-record-serialization-services-nar</artifact>
        <group>org.apache.nifi</group>
        <version>1.3.0</version>
      </bundle>
      <comments></comments>
      <descriptors>
        <entry>
          <key>schema-access-strategy</key>
          <value>
            <name>schema-access-strategy</name>
          </value>
        </entry>
        <entry>
          <key>schema-registry</key>
          <value>
            <identifiesControllerService>org.apache.nifi.schemaregistry.services.SchemaRegistry</identifiesControllerService>
            <name>schema-registry</name>
          </value>
        </entry>
        <entry>
          <key>schema-name</key>
          <value>
            <name>schema-name</name>
          </value>
        </entry>
        <entry>
          <key>schema-text</key>
          <value>
            <name>schema-text</name>
          </value>
        </entry>
        <entry>
          <key>Date Format</key>
          <value>
            <name>Date Format</name>
          </value>
        </entry>
        <entry>
          <key>Time Format</key>
          <value>
            <name>Time Format</name>
          </value>
        </entry>
        <entry>
          <key>Timestamp Format</key>
          <value>
            <name>Timestamp Format</name>
          </value>
        </entry>
        <entry>
          <key>CSV Format</key>
          <value>
            <name>CSV Format</name>
          </value>
        </entry>
        <entry>
          <key>Value Separator</key>
          <value>
            <name>Value Separator</name>
          </value>
        </entry>
        <entry>
          <key>Skip Header Line</key>
          <value>
            <name>Skip Header Line</name>
          </value>
        </entry>
        <entry>
          <key>Quote Character</key>
          <value>
            <name>Quote Character</name>
          </value>
        </entry>
        <entry>
          <key>Escape Character</key>
          <value>
            <name>Escape Character</name>
          </value>
        </entry>
        <entry>
          <key>Comment Marker</key>
          <value>
            <name>Comment Marker</name>
          </value>
        </entry>
        <entry>
          <key>Null String</key>
          <value>
            <name>Null String</name>
          </value>
        </entry>
        <entry>
          <key>Trim Fields</key>
          <value>
            <name>Trim Fields</name>
          </value>
        </entry>
      </descriptors>
      <name>CSVReader</name>
      <persistsState>false</persistsState>
      <properties>
        <entry>
          <key>schema-access-strategy</key>
          <value>schema-name</value>
        </entry>
        <entry>
          <key>schema-registry</key>
          <value>bdeb04f8-7ece-385c-0000-000000000000</value>
        </entry>
        <entry>
          <key>schema-name</key>
          <value>${schema.name}</value>
        </entry>
        <entry>
          <key>schema-text</key>
          <value>${avro.schema}</value>
        </entry>
        <entry>
          <key>Date Format</key>
        </entry>
        <entry>
          <key>Time Format</key>
        </entry>
        <entry>
          <key>Timestamp Format</key>
        </entry>
        <entry>
          <key>CSV Format</key>
          <value>custom</value>
        </entry>
        <entry>
          <key>Value Separator</key>
          <value>,</value>
        </entry>
        <entry>
          <key>Skip Header Line</key>
          <value>true</value>
        </entry>
        <entry>
          <key>Quote Character</key>
          <value>"</value>
        </entry>
        <entry>
          <key>Escape Character</key>
          <value>\</value>
        </entry>
        <entry>
          <key>Comment Marker</key>
        </entry>
        <entry>
          <key>Null String</key>
        </entry>
        <entry>
          <key>Trim Fields</key>
          <value>true</value>
        </entry>
      </properties>
      <state>ENABLED</state>
      <type>org.apache.nifi.csv.CSVReader</type>
    </controllerServices>
    <controllerServices>
      <id>bdeb04f8-7ece-385c-0000-000000000000</id>
      <parentGroupId>451a635e-0803-3845-0000-000000000000</parentGroupId>
      <bundle>
        <artifact>nifi-registry-nar</artifact>
        <group>org.apache.nifi</group>
        <version>1.3.0</version>
      </bundle>
      <comments></comments>
      <descriptors>
        <entry>
          <key>movies</key>
          <value>
            <name>movies</name>
          </value>
        </entry>
      </descriptors>
      <name>AvroSchemaRegistry</name>
      <persistsState>false</persistsState>
      <properties>
        <entry>
          <key>movies</key>
          <value>{
  "type": "record",
  "name": "MoviesRecord",
  "fields" : [
    {"name": "movieId", "type": "long"},
    {"name": "title", "type": ["null", "string"]},
    {"name": "genres", "type": ["null", "string"]}
  ]
}</value>
        </entry>
      </properties>
      <state>ENABLED</state>
      <type>org.apache.nifi.schemaregistry.services.AvroSchemaRegistry</type>
    </controllerServices>
    <controllerServices>
      <id>290d56c9-1e35-3dba-0000-000000000000</id>
      <parentGroupId>451a635e-0803-3845-0000-000000000000</parentGroupId>
      <bundle>
        <artifact>nifi-record-serialization-services-nar</artifact>
        <group>org.apache.nifi</group>
        <version>1.3.0</version>
      </bundle>
      <comments></comments>
      <descriptors>
        <entry>
          <key>Schema Write Strategy</key>
          <value>
            <name>Schema Write Strategy</name>
          </value>
        </entry>
        <entry>
          <key>schema-access-strategy</key>
          <value>
            <name>schema-access-strategy</name>
          </value>
        </entry>
        <entry>
          <key>schema-registry</key>
          <value>
            <identifiesControllerService>org.apache.nifi.schemaregistry.services.SchemaRegistry</identifiesControllerService>
            <name>schema-registry</name>
          </value>
        </entry>
        <entry>
          <key>schema-name</key>
          <value>
            <name>schema-name</name>
          </value>
        </entry>
        <entry>
          <key>schema-text</key>
          <value>
            <name>schema-text</name>
          </value>
        </entry>
        <entry>
          <key>Date Format</key>
          <value>
            <name>Date Format</name>
          </value>
        </entry>
        <entry>
          <key>Time Format</key>
          <value>
            <name>Time Format</name>
          </value>
        </entry>
        <entry>
          <key>Timestamp Format</key>
          <value>
            <name>Timestamp Format</name>
          </value>
        </entry>
        <entry>
          <key>Pretty Print JSON</key>
          <value>
            <name>Pretty Print JSON</name>
          </value>
        </entry>
      </descriptors>
      <name>JsonRecordSetWriter</name>
      <persistsState>false</persistsState>
      <properties>
        <entry>
          <key>Schema Write Strategy</key>
          <value>schema-name</value>
        </entry>
        <entry>
          <key>schema-access-strategy</key>
          <value>schema-name</value>
        </entry>
        <entry>
          <key>schema-registry</key>
          <value>bdeb04f8-7ece-385c-0000-000000000000</value>
        </entry>
        <entry>
          <key>schema-name</key>
          <value>${schema.name}</value>
        </entry>
        <entry>
          <key>schema-text</key>
          <value>${avro.schema}</value>
        </entry>
        <entry>
          <key>Date Format</key>
        </entry>
        <entry>
          <key>Time Format</key>
        </entry>
        <entry>
          <key>Timestamp Format</key>
        </entry>
        <entry>
          <key>Pretty Print JSON</key>
          <value>false</value>
        </entry>
      </properties>
      <state>ENABLED</state>
      <type>org.apache.nifi.json.JsonRecordSetWriter</type>
    </controllerServices>
    <processors>
      <id>c4612b5a-0fe7-367b-0000-000000000000</id>
      <parentGroupId>451a635e-0803-3845-0000-000000000000</parentGroupId>
      <position>
        <x>0.0</x>
        <y>441.2567407123754</y>
      </position>
      <bundle>
        <artifact>nifi-standard-nar</artifact>
        <group>org.apache.nifi</group>
        <version>1.3.0</version>
      </bundle>
      <config>
        <bulletinLevel>WARN</bulletinLevel>
        <comments></comments>
        <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
        <descriptors>
          <entry>
            <key>Routing Strategy</key>
            <value>
              <name>Routing Strategy</name>
            </value>
          </entry>
          <entry>
            <key>movies</key>
            <value>
              <name>movies</name>
            </value>
          </entry>
          <entry>
            <key>ratings</key>
            <value>
              <name>ratings</name>
            </value>
          </entry>
          <entry>
            <key>tags</key>
            <value>
              <name>tags</name>
            </value>
          </entry>
        </descriptors>
        <executionNode>ALL</executionNode>
        <lossTolerant>false</lossTolerant>
        <penaltyDuration>30 sec</penaltyDuration>
        <properties>
          <entry>
            <key>Routing Strategy</key>
            <value>Route to Property name</value>
          </entry>
          <entry>
            <key>movies</key>
            <value>${filename:equals('movies.csv')}</value>
          </entry>
          <entry>
            <key>ratings</key>
            <value>${filename:equals('ratings.csv')}</value>
          </entry>
          <entry>
            <key>tags</key>
            <value>${filename:equals('tags.csv')}</value>
          </entry>
        </properties>
        <runDurationMillis>0</runDurationMillis>
        <schedulingPeriod>0 sec</schedulingPeriod>
        <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
        <yieldDuration>1 sec</yieldDuration>
      </config>
      <name>RouteOnAttribute</name>
      <relationships>
        <autoTerminate>false</autoTerminate>
        <name>movies</name>
      </relationships>
      <relationships>
        <autoTerminate>true</autoTerminate>
        <name>ratings</name>
      </relationships>
      <relationships>
        <autoTerminate>true</autoTerminate>
        <name>tags</name>
      </relationships>
      <relationships>
        <autoTerminate>true</autoTerminate>
        <name>unmatched</name>
      </relationships>
      <style></style>
      <type>org.apache.nifi.processors.standard.RouteOnAttribute</type>
    </processors>
    <processors>
      <id>f4cb8328-1431-3494-0000-000000000000</id>
      <parentGroupId>451a635e-0803-3845-0000-000000000000</parentGroupId>
      <position>
        <x>0.0</x>
        <y>224.95887663044255</y>
      </position>
      <bundle>
        <artifact>nifi-standard-nar</artifact>
        <group>org.apache.nifi</group>
        <version>1.3.0</version>
      </bundle>
      <config>
        <bulletinLevel>WARN</bulletinLevel>
        <comments></comments>
        <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
        <descriptors>
          <entry>
            <key>Packaging Format</key>
            <value>
              <name>Packaging Format</name>
            </value>
          </entry>
          <entry>
            <key>File Filter</key>
            <value>
              <name>File Filter</name>
            </value>
          </entry>
        </descriptors>
        <executionNode>ALL</executionNode>
        <lossTolerant>false</lossTolerant>
        <penaltyDuration>30 sec</penaltyDuration>
        <properties>
          <entry>
            <key>Packaging Format</key>
            <value>zip</value>
          </entry>
          <entry>
            <key>File Filter</key>
            <value>.*</value>
          </entry>
        </properties>
        <runDurationMillis>0</runDurationMillis>
        <schedulingPeriod>0 sec</schedulingPeriod>
        <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
        <yieldDuration>1 sec</yieldDuration>
      </config>
      <name>Unzip</name>
      <relationships>
        <autoTerminate>true</autoTerminate>
        <name>failure</name>
      </relationships>
      <relationships>
        <autoTerminate>true</autoTerminate>
        <name>original</name>
      </relationships>
      <relationships>
        <autoTerminate>false</autoTerminate>
        <name>success</name>
      </relationships>
      <style></style>
      <type>org.apache.nifi.processors.standard.UnpackContent</type>
    </processors>
    <processors>
      <id>feca53d8-7bfb-38bd-0000-000000000000</id>
      <parentGroupId>451a635e-0803-3845-0000-000000000000</parentGroupId>
      <position>
        <x>0.0</x>
        <y>645.9336127468443</y>
      </position>
      <bundle>
        <artifact>nifi-update-attribute-nar</artifact>
        <group>org.apache.nifi</group>
        <version>1.3.0</version>
      </bundle>
      <config>
        <bulletinLevel>WARN</bulletinLevel>
        <comments></comments>
        <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
        <descriptors>
          <entry>
            <key>Delete Attributes Expression</key>
            <value>
              <name>Delete Attributes Expression</name>
            </value>
          </entry>
          <entry>
            <key>Store State</key>
            <value>
              <name>Store State</name>
            </value>
          </entry>
          <entry>
            <key>Stateful Variables Initial Value</key>
            <value>
              <name>Stateful Variables Initial Value</name>
            </value>
          </entry>
          <entry>
            <key>schema.name</key>
            <value>
              <name>schema.name</name>
            </value>
          </entry>
        </descriptors>
        <executionNode>ALL</executionNode>
        <lossTolerant>false</lossTolerant>
        <penaltyDuration>30 sec</penaltyDuration>
        <properties>
          <entry>
            <key>Delete Attributes Expression</key>
          </entry>
          <entry>
            <key>Store State</key>
            <value>Do not store state</value>
          </entry>
          <entry>
            <key>Stateful Variables Initial Value</key>
          </entry>
          <entry>
            <key>schema.name</key>
            <value>movies</value>
          </entry>
        </properties>
        <runDurationMillis>0</runDurationMillis>
        <schedulingPeriod>0 sec</schedulingPeriod>
        <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
        <yieldDuration>1 sec</yieldDuration>
      </config>
      <name>Add Schema Name Attribute</name>
      <relationships>
        <autoTerminate>false</autoTerminate>
        <name>success</name>
      </relationships>
      <style></style>
      <type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
    </processors>
    <processors>
      <id>3f560dd9-2a0c-3090-0000-000000000000</id>
      <parentGroupId>451a635e-0803-3845-0000-000000000000</parentGroupId>
      <position>
        <x>588.3389261227253</x>
        <y>645.9336127468443</y>
      </position>
      <bundle>
        <artifact>nifi-kafka-0-10-nar</artifact>
        <group>org.apache.nifi</group>
        <version>1.3.0</version>
      </bundle>
      <config>
        <bulletinLevel>WARN</bulletinLevel>
        <comments></comments>
        <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
        <descriptors>
          <entry>
            <key>bootstrap.servers</key>
            <value>
              <name>bootstrap.servers</name>
            </value>
          </entry>
          <entry>
            <key>topic</key>
            <value>
              <name>topic</name>
            </value>
          </entry>
          <entry>
            <key>record-reader</key>
            <value>
              <identifiesControllerService>org.apache.nifi.serialization.RecordReaderFactory</identifiesControllerService>
              <name>record-reader</name>
            </value>
          </entry>
          <entry>
            <key>record-writer</key>
            <value>
              <identifiesControllerService>org.apache.nifi.serialization.RecordSetWriterFactory</identifiesControllerService>
              <name>record-writer</name>
            </value>
          </entry>
          <entry>
            <key>security.protocol</key>
            <value>
              <name>security.protocol</name>
            </value>
          </entry>
          <entry>
            <key>sasl.kerberos.service.name</key>
            <value>
              <name>sasl.kerberos.service.name</name>
            </value>
          </entry>
          <entry>
            <key>sasl.kerberos.principal</key>
            <value>
              <name>sasl.kerberos.principal</name>
            </value>
          </entry>
          <entry>
            <key>sasl.kerberos.keytab</key>
            <value>
              <name>sasl.kerberos.keytab</name>
            </value>
          </entry>
          <entry>
            <key>ssl.context.service</key>
            <value>
              <identifiesControllerService>org.apache.nifi.ssl.SSLContextService</identifiesControllerService>
              <name>ssl.context.service</name>
            </value>
          </entry>
          <entry>
            <key>acks</key>
            <value>
              <name>acks</name>
            </value>
          </entry>
          <entry>
            <key>message-key-field</key>
            <value>
              <name>message-key-field</name>
            </value>
          </entry>
          <entry>
            <key>max.request.size</key>
            <value>
              <name>max.request.size</name>
            </value>
          </entry>
          <entry>
            <key>ack.wait.time</key>
            <value>
              <name>ack.wait.time</name>
            </value>
          </entry>
          <entry>
            <key>max.block.ms</key>
            <value>
              <name>max.block.ms</name>
            </value>
          </entry>
          <entry>
            <key>partitioner.class</key>
            <value>
              <name>partitioner.class</name>
            </value>
          </entry>
          <entry>
            <key>compression.type</key>
            <value>
              <name>compression.type</name>
            </value>
          </entry>
        </descriptors>
        <executionNode>ALL</executionNode>
        <lossTolerant>false</lossTolerant>
        <penaltyDuration>30 sec</penaltyDuration>
        <properties>
          <entry>
            <key>bootstrap.servers</key>
            <value>localhost:9092</value>
          </entry>
          <entry>
            <key>topic</key>
            <value>Movies</value>
          </entry>
          <entry>
            <key>record-reader</key>
            <value>b8531252-2a7c-3648-0000-000000000000</value>
          </entry>
          <entry>
            <key>record-writer</key>
            <value>290d56c9-1e35-3dba-0000-000000000000</value>
          </entry>
          <entry>
            <key>security.protocol</key>
            <value>PLAINTEXT</value>
          </entry>
          <entry>
            <key>sasl.kerberos.service.name</key>
          </entry>
          <entry>
            <key>sasl.kerberos.principal</key>
          </entry>
          <entry>
            <key>sasl.kerberos.keytab</key>
          </entry>
          <entry>
            <key>ssl.context.service</key>
          </entry>
          <entry>
            <key>acks</key>
            <value>0</value>
          </entry>
          <entry>
            <key>message-key-field</key>
          </entry>
          <entry>
            <key>max.request.size</key>
            <value>1 MB</value>
          </entry>
          <entry>
            <key>ack.wait.time</key>
            <value>5 secs</value>
          </entry>
          <entry>
            <key>max.block.ms</key>
            <value>5 sec</value>
          </entry>
          <entry>
            <key>partitioner.class</key>
            <value>org.apache.kafka.clients.producer.internals.DefaultPartitioner</value>
          </entry>
          <entry>
            <key>compression.type</key>
            <value>none</value>
          </entry>
        </properties>
        <runDurationMillis>0</runDurationMillis>
        <schedulingPeriod>0 sec</schedulingPeriod>
        <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
        <yieldDuration>1 sec</yieldDuration>
      </config>
      <name>Publish to "Movies" Topic</name>
      <relationships>
        <autoTerminate>false</autoTerminate>
        <name>failure</name>
      </relationships>
      <relationships>
        <autoTerminate>true</autoTerminate>
        <name>success</name>
      </relationships>
      <style></style>
      <type>org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_0_10</type>
    </processors>
    <processors>
      <id>4004c547-2a1a-321b-0000-000000000000</id>
      <parentGroupId>451a635e-0803-3845-0000-000000000000</parentGroupId>
      <position>
        <x>0.0</x>
        <y>0.0</y>
      </position>
      <bundle>
        <artifact>nifi-standard-nar</artifact>
        <group>org.apache.nifi</group>
        <version>1.3.0</version>
      </bundle>
      <config>
        <bulletinLevel>WARN</bulletinLevel>
        <comments></comments>
        <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
        <descriptors>
          <entry>
            <key>URL</key>
            <value>
              <name>URL</name>
            </value>
          </entry>
          <entry>
            <key>Filename</key>
            <value>
              <name>Filename</name>
            </value>
          </entry>
          <entry>
            <key>SSL Context Service</key>
            <value>
              <identifiesControllerService>org.apache.nifi.ssl.SSLContextService</identifiesControllerService>
              <name>SSL Context Service</name>
            </value>
          </entry>
          <entry>
            <key>Username</key>
            <value>
              <name>Username</name>
            </value>
          </entry>
          <entry>
            <key>Password</key>
            <value>
              <name>Password</name>
            </value>
          </entry>
          <entry>
            <key>Connection Timeout</key>
            <value>
              <name>Connection Timeout</name>
            </value>
          </entry>
          <entry>
            <key>Data Timeout</key>
            <value>
              <name>Data Timeout</name>
            </value>
          </entry>
          <entry>
            <key>User Agent</key>
            <value>
              <name>User Agent</name>
            </value>
          </entry>
          <entry>
            <key>Accept Content-Type</key>
            <value>
              <name>Accept Content-Type</name>
            </value>
          </entry>
          <entry>
            <key>Follow Redirects</key>
            <value>
              <name>Follow Redirects</name>
            </value>
          </entry>
          <entry>
            <key>redirect-cookie-policy</key>
            <value>
              <name>redirect-cookie-policy</name>
            </value>
          </entry>
          <entry>
            <key>Proxy Host</key>
            <value>
              <name>Proxy Host</name>
            </value>
          </entry>
          <entry>
            <key>Proxy Port</key>
            <value>
              <name>Proxy Port</name>
            </value>
          </entry>
        </descriptors>
        <executionNode>ALL</executionNode>
        <lossTolerant>false</lossTolerant>
        <penaltyDuration>30 sec</penaltyDuration>
        <properties>
          <entry>
            <key>URL</key>
            <value>http://files.grouplens.org/datasets/movielens/ml-20m.zip</value>
          </entry>
          <entry>
            <key>Filename</key>
            <value>ml-20m.zip</value>
          </entry>
          <entry>
            <key>SSL Context Service</key>
          </entry>
          <entry>
            <key>Username</key>
          </entry>
          <entry>
            <key>Password</key>
          </entry>
          <entry>
            <key>Connection Timeout</key>
            <value>30 sec</value>
          </entry>
          <entry>
            <key>Data Timeout</key>
            <value>30 sec</value>
          </entry>
          <entry>
            <key>User Agent</key>
          </entry>
          <entry>
            <key>Accept Content-Type</key>
          </entry>
          <entry>
            <key>Follow Redirects</key>
            <value>false</value>
          </entry>
          <entry>
            <key>redirect-cookie-policy</key>
            <value>default</value>
          </entry>
          <entry>
            <key>Proxy Host</key>
          </entry>
          <entry>
            <key>Proxy Port</key>
          </entry>
        </properties>
        <runDurationMillis>0</runDurationMillis>
        <schedulingPeriod>10 mins</schedulingPeriod>
        <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
        <yieldDuration>1 sec</yieldDuration>
      </config>
      <name>Get Movie Data</name>
      <relationships>
        <autoTerminate>false</autoTerminate>
        <name>success</name>
      </relationships>
      <style></style>
      <type>org.apache.nifi.processors.standard.GetHTTP</type>
    </processors>
  </snippet>
  <timestamp>09/11/2017 11:33:49 EDT</timestamp>
</template>

参考資料

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1