LoginSignup
0
1

More than 5 years have passed since last update.

Kinesis Streamsのストリームを異なるリージョン間で振り分ける (with Swift3)

Last updated at Posted at 2017-02-28

東京リージョンにKinesisが誕生する前からバージニアでログ収集等に使用していたけど、新たな種類のログが必要となったときにデータ転送などのコスト面の悪さなどから、こちらは東京リージョンのKinesisを使用したいとなったときがありました。
その時は以前から運用しているKinesisはそのままで新たなKinesisは東京に設けてリージョン間でストリームを振り分けることができます。

結論から述べると、以下のようにリージョンを設定したServiceConfigurationを定義しキーの値を設定します。
ここではSwift3による実装例を取り上げます。

let usEast1ServiceConfiguration = AWSServiceConfiguration(region: .usEast1, credentialsProvider: credentialsProvider)
let apNortheast1ServiceConfiguration = AWSServiceConfiguration(region: .apNortheast1, credentialsProvider: credentialsProvider)
AWSKinesisRecorder.register(with: usEast1ServiceConfiguration, forKey: "usEast1")
AWSKinesisRecorder.register(with: apNortheast1ServiceConfiguration, forKey: "apNortheast1")

そして、設定したキーを使用してAWSKinesisRecorderを生成します。
そのレコーダを用いてKinesisにストリームを流します。

let kinesisRecorder: AWSKinesisRecorder =  = AWSKinesisRecorder(forKey: "apNortheast1")
let streamName = "sample-stream1"
kinesisRecorder.saveRecord(data, streamName: streamName)

全体

単一リージョンに流す場合

元々の単一リージョンに流す実装について、最初にCognitoによる認証を行います。

let credentialsProvider:AWSCognitoCredentialsProvider = AWSCognitoCredentialsProvider(regionType: .usEast1, identityPoolId: "us-east-1:XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX")

let configuration = AWSServiceConfiguration(region: AWSRegionType.usEast1, credentialsProvider: credentialsProvider)
AWSServiceManager.default().defaultServiceConfiguration =
configuration

credentialsProvider.getIdentityId().continue ({ (task) -> Any? in
    if let error = task.error {
        print("Cognito error : \(error)")
    } else {
        print("Cognito id : \(credentialsProvider.identityId)")
    }
    return nil
})

送信するデータをJSONで用意します。

let dic:[String:Any] = ["key1": "name1", "key2": "name"]
let dataDic: [String:[String:Any]] = ["sample_record": dic]
var data:Data = Data()

do {
    try data = JSONSerialization.data(withJSONObject: dataDic, options: JSONSerialization.WritingOptions(rawValue: 0))
} catch {
    print("error = \(error)")
}

用意したJSONデータをKinesisに送信します。

let kinesisRecoder:AWSKinesisRecorder = AWSKinesisRecorder.default()
let streamName1 = "sample-stream"

kinesisRecoder.saveRecord(data, streamName: streamName1).continue ({ (task) -> AnyObject? in
    util.dispatch_async_main ({
        kinesisRecoder.submitAllRecords()
    })
    return nil
})

異なるリージョンに振り分ける場合

単一リージョンの場合に比べて、リージョンを定義したServiceConfigurationの設定を追加し、キーを設定します。

let credentialsProvider:AWSCognitoCredentialsProvider = AWSCognitoCredentialsProvider(regionType: .usEast1, identityPoolId: "us-east-1:XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX")

let configuration = AWSServiceConfiguration(region: AWSRegionType.usEast1, credentialsProvider: credentialsProvider)
configuration?.timeoutIntervalForResource = 1000
AWSServiceManager.default().defaultServiceConfiguration =
configuration

// Mark: 追加部分
let usEast1ServiceConfiguration = AWSServiceConfiguration(region: .usEast1, credentialsProvider: credentialsProvider)
let apNortheast1ServiceConfiguration = AWSServiceConfiguration(region: .apNortheast1, credentialsProvider: credentialsProvider)
AWSKinesisRecorder.register(with: usEast1ServiceConfiguration, forKey: "usEast1")
AWSKinesisRecorder.register(with: apNortheast1ServiceConfiguration, forKey: "apNortheast1")


credentialsProvider.getIdentityId().continue ({ (task) -> Any? in
    if let error = task.error {
        print("Cognito error : \(error)")
    } else {
        print("Cognito id : \(credentialsProvider.identityId)")
    }
    return nil
})

データの生成は先程と同様で、ストリームの振り分けは先程設定したキーからAWSKinesisRecorderのオブジェクトを生成し、Kinesisのストリームにデータを送信します。

var kinesisRecorder: AWSKinesisRecorder
var streamName: String = ""
let isApNortheast1 = false

// ストリームによる振り分け処理の設定
if isApNortheast1 {
    streamName = "sample-stream1"
    kinesisRecorder = AWSKinesisRecorder(forKey: "apNortheast1")
} else {
    streamName = "sample-stream2"
    kinesisRecorder = AWSKinesisRecorder(forKey: "usEast1")
}

kinesisRecorder.saveRecord(data, streamName: streamName).continue ({ (task) -> AnyObject? in
    util.dispatch_async_main ({
        kinesisRecorder.submitAllRecords()
    })
    return nil
})

参考

http://docs.aws.amazon.com/ja_jp/mobile/sdkforios/developerguide/kinesis-data-stream-processing-for-ios.html
http://docs.aws.amazon.com/AWSiOSSDK/latest/Classes/AWSKinesisRecorder.html

0
1
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1