
Contact trace record (CTR) data integration via Firehose and S3 (Part 1)

Posted at 2022-11-02

(Figure: AmazonConnect_CTR改造.JPG)

Prerequisites

Salesforce API Version : v55.0
serverlessrepo-AmazonConnectSalesforceLambda : 5.19.0
This article was written around October 2022.

Amazon Connect itself can stream contact trace records to Firehose, but no Lambda is provided to relay that data onward. This article is an experiment: can we get there by repurposing the Lambdas that are provided?

License terms of the provided Lambdas

They are provided under the Apache License, Version 2.0, which means:

  • Use at your own risk
  • Commercial use is allowed
  • Modification and redistribution are allowed
  • If you modify the code, you must state that it has been modified

Most of this can be done by stitching the pieces together, so I will focus only on the key points; please assemble the finished version at your own risk.

S3 PUT trigger

sfIntervalAgent.py (full source)

"""
You must have an AWS account to use the Amazon Connect CTI Adapter.
Downloading and/or using the Amazon Connect CTI Adapter is subject to the terms of the AWS Customer Agreement,
AWS Service Terms, and AWS Privacy Notice.

© 2017, Amazon Web Services, Inc. or its affiliates. All rights reserved.

NOTE:  Other license terms may apply to certain, identified software components
contained within or distributed with the Amazon Connect CTI Adapter if such terms are
included in the LibPhoneNumber-js and Salesforce Open CTI. For such identified components,
such other license terms will then apply in lieu of the terms above.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import json, csv, os
import boto3
import urllib.parse
from salesforce import Salesforce
from sf_util import get_arg, parse_date, split_bucket_key

import logging
logger = logging.getLogger()
logger.setLevel(logging.getLevelName(os.environ["LOGGING_LEVEL"]))

s3 = boto3.client("s3")
pnamespace = os.environ['SF_ADAPTER_NAMESPACE']
if not pnamespace or pnamespace == '-':
  logger.info("SF_ADAPTER_NAMESPACE is empty")
  pnamespace = ''
else:
  pnamespace = pnamespace + "__"

def lambda_handler(event, context):

  logger.info("Logging Start sfIntervalAgent")
  logger.info("sfIntervalAgent event: %s" % json.dumps(event))

  event_record = event['Records'][0]
  bucket = event_record['s3']['bucket']['name']
  logger.info("bucket: %s" % bucket)
  key = urllib.parse.unquote(event_record['s3']['object']['key'])
  logger.info("key: %s" % key)
  data = s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode()
  logger.info("sfIntervalAgent data: %s" % data)
  sf = Salesforce()


  for record in csv.DictReader(data.split("\n")):
    logger.info("sfIntervalAgent record: %s" % record)
    #sf.create(pnamespace + "AC_AgentPerformance__c", prepare_agent_record(record, event_record['eventTime']))
    agent_record = prepare_agent_record(record, event_record['eventTime'])
    #logger.info("AC_Object_Name__c: %s" % agent_record[pnamespace + 'AC_Object_Name__c'])
    #logger.info("StartInterval__c: %s" % agent_record[pnamespace + 'StartInterval__c'])
    ac_record_id = "%s%s" % (agent_record[pnamespace + 'AC_Object_Name__c'], agent_record[pnamespace + 'StartInterval__c'])
    #logger.info("sfIntervalAgent ac_record_id: %s" % ac_record_id)
    #logger.info("sfIntervalAgent record: %s" % agent_record)
    #logger.info("sfIntervalAgent record: %s" % agent_record)
    sf.update_by_external(pnamespace + "AC_AgentPerformance__c", pnamespace + 'AC_Record_Id__c',ac_record_id, agent_record)

  logger.info("done")

def prepare_agent_record(record_raw, current_date):
  record = {label_parser(k):value_parser(v) for k, v in record_raw.items()}
  #record[pnamespace + 'Type__c'] = "Agent"
  record[pnamespace + 'Created_Date__c'] = current_date
  #record[pnamespace + 'AC_Record_Id__c'] = "%s%s" % (record[pnamespace + 'AC_Object_Name__c'], current_date)
  #record[pnamespace + 'AC_Record_Id__c'] = "%s%s" % (record[pnamespace + 'AC_Object_Name__c'], record[pnamespace + 'StartInterval__c'])
  return record

def label_parser(key):
  if key.lower() == 'average agent interaction and customer hold time':#To Long
    return pnamespace + 'Avg_agent_interaction_and_cust_hold_time__c'

  if key.lower() == "agent":
    return pnamespace + "AC_Object_Name__c"

  return pnamespace + "%s__c" % key.replace(" ", "_")

def value_parser(value):
  return value.replace("%", "") if len(value) > 0 else None

From this Lambda, the portion we need is everything from the top down to line 66, i.e. up to and including

  logger.info("sfIntervalAgent data: %s" % data)

That is the part to carry over.

In outline, the program receives the S3 PUT trigger and reads the corresponding object as string data.
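
A rough sketch of just that portion (with the Salesforce-specific setup and logging stripped out) could look like this:

import urllib.parse
import boto3

s3 = boto3.client("s3")

def lambda_handler(event, context):
  # S3 PUT events arrive with a list of Records; this handler uses the first one
  event_record = event['Records'][0]
  bucket = event_record['s3']['bucket']['name']
  # Object keys are URL-encoded in the event payload, so decode them first
  key = urllib.parse.unquote(event_record['s3']['object']['key'])
  # Read the object body back as a string
  data = s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode()
  # ...per-record processing continues from here
  return data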

Real-time integration from the Kinesis Data Stream

sfCTRTrigger.py (full source)

"""
You must have an AWS account to use the Amazon Connect CTI Adapter.
Downloading and/or using the Amazon Connect CTI Adapter is subject to the terms of the AWS Customer Agreement,
AWS Service Terms, and AWS Privacy Notice.

© 2017, Amazon Web Services, Inc. or its affiliates. All rights reserved.

NOTE:  Other license terms may apply to certain, identified software components
contained within or distributed with the Amazon Connect CTI Adapter if such terms are
included in the LibPhoneNumber-js and Salesforce Open CTI. For such identified components,
such other license terms will then apply in lieu of the terms above.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import boto3
import json
import os
import logging
logger = logging.getLogger()
logger.setLevel(logging.getLevelName(os.environ["LOGGING_LEVEL"]))


def lambda_handler(event, context):
    try:
        logger.info('Event: {}'.format(event))

        invoke_sfExec_async(event, context)
        
    except Exception as e:
        raise e

def invoke_sfExec_async(event, context):

    event_template = {'async' : True, 'record':{}}

    def send_data(data_to_send):
        event_to_send = event_template.copy()
        event_to_send['record'] = data_to_send
        
        if os.environ["POSTCALL_RECORDING_IMPORT_ENABLED"].lower() == 'true' or os.environ["POSTCALL_TRANSCRIBE_ENABLED"].lower() == 'true':
            logger.info('Invoke  EXECUTE_TRANSCRIPTION_STATE_MACHINE_LAMBDA')
            boto3.client('lambda').invoke(FunctionName=os.environ["EXECUTE_TRANSCRIPTION_STATE_MACHINE_LAMBDA"], InvocationType='Event', Payload=json.dumps(event_to_send))
        
        if os.environ["POSTCALL_CTR_IMPORT_ENABLED"].lower() == 'true':
            logger.info('Invoke  EXECUTE_CTR_IMPORT_LAMBDA')
            boto3.client('lambda').invoke(FunctionName=os.environ["EXECUTE_CTR_IMPORT_LAMBDA"], InvocationType='Event', Payload=json.dumps(event_to_send))


    #for each record in Kinesis records, invoke a new Lambda function to process it async
    for record in event['Records']:
        payload = record['kinesis']['data']
        send_data(payload)

The part below receives a single record and asynchronously invokes the Lambda that handles CTR data integration.
This logic can be reused as-is, with no changes.

    event_template = {'async' : True, 'record':{}}

    def send_data(data_to_send):
        event_to_send = event_template.copy()
        event_to_send['record'] = data_to_send
        
        if os.environ["POSTCALL_RECORDING_IMPORT_ENABLED"].lower() == 'true' or os.environ["POSTCALL_TRANSCRIBE_ENABLED"].lower() == 'true':
            logger.info('Invoke  EXECUTE_TRANSCRIPTION_STATE_MACHINE_LAMBDA')
            boto3.client('lambda').invoke(FunctionName=os.environ["EXECUTE_TRANSCRIPTION_STATE_MACHINE_LAMBDA"], InvocationType='Event', Payload=json.dumps(event_to_send))
        
        if os.environ["POSTCALL_CTR_IMPORT_ENABLED"].lower() == 'true':
            logger.info('Invoke  EXECUTE_CTR_IMPORT_LAMBDA')
            boto3.client('lambda').invoke(FunctionName=os.environ["EXECUTE_CTR_IMPORT_LAMBDA"], InvocationType='Event', Payload=json.dumps(event_to_send))

The next part is the crux: it is the piece that needs the most rework (interface alignment and fixes).

    #for each record in Kinesis records, invoke a new Lambda function to process it async
    for record in event['Records']:
        payload = record['kinesis']['data']
        send_data(payload)

In terms of processing, it loops over the multi-record payload (Records) delivered by the Kinesis Data Stream, pulls out each individual record, and calls the integration function for each one.
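
For reference, the value in record['kinesis']['data'] reaches the Lambda as a Base64-encoded string, which is why the same format can be handed to the downstream Lambda later. Decoding one of these payloads back into a CTR object would look roughly like this (an illustrative helper, not part of the provided code):

import base64
import json

def decode_kinesis_payload(payload):
    # payload is the Base64-encoded string from record['kinesis']['data']
    return json.loads(base64.b64decode(payload).decode('utf-8'))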

What new processing do we need?

  for recordStr in data.split("\n"):
    if len(recordStr) != 0:
      record = json.loads(recordStr)
      logger.info("sfIntervalCTR record: %s" % json.dumps(record))
      recordB64Str = base64.b64encode(json.dumps(record).encode('utf-8'))
      send_data(recordB64Str.decode('ascii'))

As the sfIntervalAgent.py logic shows, data holds the content read from S3 (multiple JSON records separated by newlines), so we split on the newline character and loop over the records.
The Kinesis Data Stream records arrive Base64-encoded, so to keep the interface identical we Base64-encode each record before handing it to send_data.

import base64

Don't forget to add this import as well.

Packaging and deployment

Referring to the "rebuilding the Zip" procedure mentioned earlier, create a new .py file that merges:

  • the part of sfIntervalAgent.py that receives the S3 PUT trigger and fetches the object's data
  • the newly required processing that loops over that data one record at a time and Base64-encodes each record before calling the integration step
  • the contact trace record integration part of sfCTRTrigger.py

and include it in the Zip to build the deployment package. A rough sketch of what the merged file might look like follows below.
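
For orientation only, here is a minimal, untested sketch of such a merged handler. It assumes send_data and its environment variables are carried over from sfCTRTrigger.py unchanged, and it keeps only the CTR import branch; treat it as a starting point rather than the finished file:

import base64
import json
import os
import urllib.parse
import boto3

import logging
logger = logging.getLogger()
logger.setLevel(logging.getLevelName(os.environ["LOGGING_LEVEL"]))

s3 = boto3.client("s3")

def lambda_handler(event, context):
    # From sfIntervalAgent.py: read the object named in the S3 PUT event as a string
    event_record = event['Records'][0]
    bucket = event_record['s3']['bucket']['name']
    key = urllib.parse.unquote(event_record['s3']['object']['key'])
    data = s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode()

    event_template = {'async': True, 'record': {}}

    # From sfCTRTrigger.py: invoke the CTR import Lambda asynchronously (unchanged)
    def send_data(data_to_send):
        event_to_send = event_template.copy()
        event_to_send['record'] = data_to_send
        if os.environ["POSTCALL_CTR_IMPORT_ENABLED"].lower() == 'true':
            logger.info('Invoke  EXECUTE_CTR_IMPORT_LAMBDA')
            boto3.client('lambda').invoke(
                FunctionName=os.environ["EXECUTE_CTR_IMPORT_LAMBDA"],
                InvocationType='Event',
                Payload=json.dumps(event_to_send))

    # New processing: split the newline-delimited JSON and Base64-encode each record
    # so the interface matches what the Kinesis path would have produced
    for recordStr in data.split("\n"):
        if len(recordStr) != 0:
            record = json.loads(recordStr)
            logger.info("record: %s" % json.dumps(record))
            recordB64Str = base64.b64encode(json.dumps(record).encode('utf-8'))
            send_data(recordB64Str.decode('ascii'))

    logger.info("done")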

Summary of Part 1

The CTI integration Lambdas are relatively short and simple, so they look easy to modify. After the deployment package is built, the remaining work is mostly AWS console operations: IAM settings around the Lambda, Firehose, and Amazon Connect configuration. There is a lot of it, so please allow me to split this into Part 1 and Part 2.

(Part 2 will follow at a later date.)

If you have AWS questions or problems...

We are always accepting consulting work on how to make use of AWS and on any problems you run into.
