2022/10/19 CloudFormation CodePipeline memo

sanplefordocdbtos3.py
---
#!/usr/bin/env python

import json
import logging
import os
import boto3
import datetime
from bson import json_util
from pymongo import MongoClient
from pymongo.errors import OperationFailure
import urllib.request                                                    

"""
Read data from a DocumentDB collection's change stream and replicate that data to a target endpoint (Amazon S3 in this sample).

Required environment variables:
DOCUMENTDB_URI: The URI of the DocumentDB cluster to stream from.
DOCUMENTDB_SECRET: Secret Name of the credentials for the DocumentDB cluster in Secrets Manager
STATE_COLLECTION: The name of the collection in which to store sync state.
STATE_DB: The name of the database in which to store sync state.
WATCHED_COLLECTION_NAME: The name of the collection to watch for changes.
WATCHED_DB_NAME: The name of the database to watch for changes.
Iterations_per_sync: How many events to process before syncing state.
Documents_per_run: The maximum number of change events to read in a single run.
SNS_TOPIC_ARN_ALERT: The SNS topic to which exception alerts are published.

Kafka target environment variables:
MSK_BOOTSTRAP_SRV: The URIs of the MSK cluster to publish messages. 

SNS target environment variables:
SNS_TOPIC_ARN_EVENT: The topic to send docdb events.    

S3 target environment variables:
BUCKET_NAME: The name of the S3 bucket in which streamed data is saved.
BUCKET_PATH (optional): The key prefix under which streamed data is saved.

ElasticSearch target environment variables:
ELASTICSEARCH_URI: The URI of the Elasticsearch domain where data should be streamed.

Kinesis target environment variables:
KINESIS_STREAM : The Kinesis Stream name to publish DocumentDB events.

SQS target environment variables:
SQS_QUERY_URL: The URL of the Amazon SQS queue to which a message is sent.

"""

db_client = None                        # DocumentDB client - used as source 
s3_client = None                        # S3 client - used as target        
sns_client = boto3.client('sns')        # SNS client - for exception alerting purposes
clientS3 = boto3.resource('s3')         # S3 resource - used to get the DocumentDB CA certificate bundle
                                  
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

# The error code returned when data for the requested resume token has been deleted
TOKEN_DATA_DELETED_CODE = 136


def get_credentials():
    """Retrieve credentials from the Secrets Manager service."""
    boto_session = boto3.session.Session()

    try:
        secret_name = os.environ['DOCUMENTDB_SECRET']

        logger.debug('Retrieving secret {} from Secrets Manager.'.format(secret_name))

        secrets_client = boto_session.client(service_name='secretsmanager',
                                             region_name=boto_session.region_name)
        secret_value = secrets_client.get_secret_value(SecretId=secret_name)

        secret = secret_value['SecretString']
        secret_json = json.loads(secret)
        username = secret_json['username']
        password = secret_json['password']

        logger.debug('Secret {} retrieved from Secrets Manager.'.format(secret_name))

        return (username, password)

    except Exception as ex:
        logger.error('Failed to retrieve secret {}'.format(secret_name))
        raise


def get_db_client():
    """Return an authenticated connection to DocumentDB"""
    # Use a global variable so Lambda can reuse the persisted client on future invocations
    global db_client

    if db_client is None:
        logger.debug('Creating new DocumentDB client.')

        try:
            cluster_uri = os.environ['DOCUMENTDB_URI']
            (username, password) = get_credentials()                   
            db_client = MongoClient(cluster_uri, ssl=True, retryWrites=False, ssl_ca_certs='rds-combined-ca-bundle.pem')
            # force the client to connect
            db_client.admin.command('ismaster')
            db_client["admin"].authenticate(name=username, password=password)

            logger.debug('Successfully created new DocumentDB client.')
        except Exception as ex:
            logger.error('Failed to create new DocumentDB client: {}'.format(ex))
            send_sns_alert(str(ex))
            raise

    return db_client


def get_state_collection_client():
    """Return a DocumentDB client for the collection in which we store processing state."""

    logger.debug('Creating state_collection_client.')
    try:
        db_client = get_db_client()
        state_db_name = os.environ['STATE_DB']
        state_collection_name = os.environ['STATE_COLLECTION']
        state_collection = db_client[state_db_name][state_collection_name]
    except Exception as ex:
        logger.error('Failed to create new state collection client: {}'.format(ex))
        send_sns_alert(str(ex))
        raise

    return state_collection


def get_last_processed_id():
    """Return the resume token corresponding to the last successfully processed change event."""
    last_processed_id = None
    logger.debug('Returning last processed id.')
    try:
        state_collection = get_state_collection_client()
        if "WATCHED_COLLECTION_NAME" in os.environ:
            state_doc = state_collection.find_one({'currentState': True, 'dbWatched': str(os.environ['WATCHED_DB_NAME']), 
                'collectionWatched': str(os.environ['WATCHED_COLLECTION_NAME']), 'db_level': False})
        else:
            state_doc = state_collection.find_one({'currentState': True, 'db_level': True, 
                'dbWatched': str(os.environ['WATCHED_DB_NAME'])})
           
        if state_doc is not None:
            if 'lastProcessed' in state_doc: 
                last_processed_id = state_doc['lastProcessed']
        else:
            if "WATCHED_COLLECTION_NAME" in os.environ:
                state_collection.insert_one({'dbWatched': str(os.environ['WATCHED_DB_NAME']),
                    'collectionWatched': str(os.environ['WATCHED_COLLECTION_NAME']), 'currentState': True, 'db_level': False})
            else:
                state_collection.insert_one({'dbWatched': str(os.environ['WATCHED_DB_NAME']), 'currentState': True, 
                    'db_level': True})

    except Exception as ex:
        logger.error('Failed to return last processed id: {}'.format(ex))
        send_sns_alert(str(ex))
        raise

    return last_processed_id


def store_last_processed_id(resume_token):
    """Store the resume token corresponding to the last successfully processed change event."""

    logger.debug('Storing last processed id.')
    try:
        state_collection = get_state_collection_client()
        if "WATCHED_COLLECTION_NAME" in os.environ:
            state_collection.update_one({'dbWatched': str(os.environ['WATCHED_DB_NAME']), 
                'collectionWatched': str(os.environ['WATCHED_COLLECTION_NAME'])},{'$set': {'lastProcessed': resume_token}})
        else:
            state_collection.update_one({'dbWatched': str(os.environ['WATCHED_DB_NAME']), 'db_level': True, },
                {'$set': {'lastProcessed': resume_token}})

    except Exception as ex:
        logger.error('Failed to store last processed id: {}'.format(ex))
        send_sns_alert(str(ex))
        raise


def send_sns_alert(message):
    """send an SNS alert"""
    try:
        logger.debug('Sending SNS alert.')
        response = sns_client.publish(
            TopicArn=os.environ['SNS_TOPIC_ARN_ALERT'],
            Message=message,
            Subject='Document DB Replication Alarm',
            MessageStructure='default'
        )
    except Exception as ex:
        logger.error('Exception in publishing alert to SNS: {}'.format(ex))
        raise


def getDocDbCertificate():
    """download the current docdb certificate"""
    try:
        logger.debug('Getting DocumentDB certificate from S3.')
        clientS3.Bucket('rds-downloads').download_file('rds-combined-ca-bundle.pem', '/tmp/rds-combined-ca-bundle.pem')
    except Exception as ex:
        logger.error('Exception in downloading the DocumentDB CA bundle from S3: {}'.format(ex))
        send_sns_alert(str(ex))
        raise


def insertCanary():
    """Inserts a canary event for change stream activation"""
    
    canary_record = None

    try:
        logger.debug('Inserting canary.')
        db_client = get_db_client()
        watched_db = os.environ['WATCHED_DB_NAME']

        if "WATCHED_COLLECTION_NAME" in os.environ:
            watched_collection = os.environ['WATCHED_COLLECTION_NAME']
        else:
            watched_collection = 'canary-collection'

        collection_client = db_client[watched_db][watched_collection]

        canary_record = collection_client.insert_one({ "op_canary": "canary" })
        logger.debug('Canary inserted.')

    except Exception as ex:
        logger.error('Exception in inserting canary: {}'.format(ex))
        send_sns_alert(str(ex))
        raise

    return canary_record


def deleteCanary():
    """Deletes a canary event for change stream activation"""
    
    try:
        logger.debug('Deleting canary.')
        db_client = get_db_client()
        watched_db = os.environ['WATCHED_DB_NAME']

        if "WATCHED_COLLECTION_NAME" in os.environ:
            watched_collection = os.environ['WATCHED_COLLECTION_NAME']
        else:
            watched_collection = 'canary-collection'

        collection_client = db_client[watched_db][watched_collection]
        collection_client.delete_one({ "op_canary": "canary" })
        logger.debug('Canary deleted.')
    
    except Exception as ex:
        logger.error('Exception in deleting canary: {}'.format(ex))
        send_sns_alert(str(ex))
        raise


def put_s3_event(event, database, collection, doc_id):
    """send event to S3"""
    # Use a global variable so Lambda can reuse the persisted client on future invocations
    global s3_client

    if s3_client is None:
        logger.debug('Creating new S3 client.')
        s3_client = boto3.resource('s3')  

    try:
        # Objects are keyed as [BUCKET_PATH/]<database>/<collection>/<YYYYMMDD>/<doc_id>
        logger.debug('Publishing message to S3.')
        if "BUCKET_PATH" in os.environ:
            s3_client.Object(os.environ['BUCKET_NAME'], str(os.environ['BUCKET_PATH']) + '/' + database + '/' +
                collection + '/' + datetime.datetime.now().strftime('%Y%m%d') + '/' + doc_id).put(Body=event)
        else: 
            s3_client.Object(os.environ['BUCKET_NAME'], database + '/' + collection + '/' + 
                datetime.datetime.now().strftime('%Y%m%d') + '/' + doc_id).put(Body=event)

    except Exception as ex:
        logger.error('Exception in publishing message to S3: {}'.format(ex))
        send_sns_alert(str(ex))
        raise

def lambda_handler(event, context):
    """Read any new events from DocumentDB and apply them to an streaming/datastore endpoint."""
    
    events_processed = 0
    canary_record = None
    watcher = None
    folder = None
    filename = None
    kafka_client = None
    # getDocDbCertificate()

    try:
        
        # DocumentDB watched collection set up
        db_client = get_db_client()
        watched_db = os.environ['WATCHED_DB_NAME']
        if "WATCHED_COLLECTION_NAME" in os.environ:
            watched_collection = os.environ['WATCHED_COLLECTION_NAME']
            watcher = db_client[watched_db][watched_collection]
        else: 
            watcher = db_client[watched_db]
        logger.debug('Watching collection {}'.format(watcher))

        # DocumentDB sync set up
        state_sync_count = int(os.environ['Iterations_per_sync'])
        last_processed_id = get_last_processed_id()
        logger.debug("last_processed_id: {}".format(last_processed_id))

        with watcher.watch(full_document='updateLookup', resume_after=last_processed_id) as change_stream:
            i = 0

            if last_processed_id is None:
                canary_record = insertCanary()
                deleteCanary()

            while change_stream.alive and i < int(os.environ['Documents_per_run']):
            
                i += 1
                change_event = change_stream.try_next()
                logger.debug('Event: {}'.format(change_event))
                
                # On the first run there is no stored resume token yet: skip change events
                # up to and including the canary delete, then record its resume token.
                if last_processed_id is None:
                    if change_event is None:
                        break
                    if change_event['operationType'] == 'delete':
                        store_last_processed_id(change_stream.resume_token)
                        last_processed_id = change_event['_id']['_data']
                    continue
                
                if change_event is None:
                    break
                else:
                    op_type = change_event['operationType']
                    op_id = change_event['_id']['_data']

                    if op_type in ['insert', 'update']:             
                        doc_body = change_event['fullDocument']
                        doc_id = str(doc_body.pop("_id", None))
                        readable = datetime.datetime.fromtimestamp(change_event['clusterTime'].time).isoformat()
                        # Add operation metadata fields (action, timestamp) to the document event; remove these if not needed.
                        doc_body.update({'action':op_type,'timestamp':str(change_event['clusterTime'].time),'timestampReadable':str(readable)})
                        # Add database and collection metadata fields to the document event; remove these if not needed.
                        doc_body.update({'db':str(change_event['ns']['db']),'coll':str(change_event['ns']['coll'])})
                        payload = {'_id':doc_id}
                        payload.update(doc_body)

                        # Append event for S3 
                        if "BUCKET_NAME" in os.environ:
                            put_s3_event(json_util.dumps(payload), str(change_event['ns']['db']), str(change_event['ns']['coll']),op_id)
                        
                        logger.debug('Processed event ID {}'.format(op_id))

                    if op_type == 'delete':
                        doc_id = str(change_event['documentKey']['_id'])
                        readable = datetime.datetime.fromtimestamp(change_event['clusterTime'].time).isoformat()
                        payload = {'_id':doc_id}
                        # Add operation metadata fields (action, timestamp) to the document event; remove these if not needed.
                        payload.update({'action':op_type,'timestamp':str(change_event['clusterTime'].time),'timestampReadable':str(readable)})
                        # Add database and collection metadata fields to the document event; remove these if not needed.
                        payload.update({'db':str(change_event['ns']['db']),'coll':str(change_event['ns']['coll'])})

                        # Append event for S3
                        if "BUCKET_NAME" in os.environ:
                            put_s3_event(json_util.dumps(payload), str(change_event['ns']['db']), str(change_event['ns']['coll']),op_id)

                        logger.debug('Processed event ID {}'.format(op_id))

                    events_processed += 1

                    if events_processed >= state_sync_count and "BUCKET_NAME" not in os.environ:
                        # To reduce DocumentDB IO, only persist the stream state every N events
                        store_last_processed_id(change_stream.resume_token)
                        logger.debug('Synced token {} to state collection'.format(change_stream.resume_token))

    except OperationFailure as of:
        send_sns_alert(str(of))
        if of.code == TOKEN_DATA_DELETED_CODE:
            # Data for the last processed ID has been deleted in the change stream;
            # store the last known good state so our next invocation
            # starts from the most recently available data.
            store_last_processed_id(None)
        raise

    except Exception as ex:
        logger.error('Exception in executing replication: {}'.format(ex))
        send_sns_alert(str(ex))
        raise

    else:
        
        if events_processed > 0:

            store_last_processed_id(change_stream.resume_token)
            logger.debug('Synced token {} to state collection'.format(change_stream.resume_token))
            return {
                'statusCode': 200,
                'description': 'Success',
                'detail': json.dumps(str(events_processed)+ ' records processed successfully.')
            }
        else:
            if canary_record is not None:
                return {
                    'statusCode': 202,
                    'description': 'Success',
                    'detail': json.dumps('Canary applied. No records to process.')
                }
            else:
                return {
                    'statusCode': 201,
                    'description': 'Success',
                    'detail': json.dumps('No records to process.')
                }

    finally:

        # Close the Kafka client if one was created (this trimmed sample never opens one)
        if "MSK_BOOTSTRAP_SRV" in os.environ and kafka_client is not None:
            kafka_client.close()
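
For a quick smoke test outside Lambda, the handler above can also be invoked directly once the required environment variables are set. The following is a minimal sketch, not part of the original sample: it assumes the listing above is saved as sanplefordocdbtos3.py in the working directory together with rds-combined-ca-bundle.pem, that AWS credentials and a default region are configured, and that the referenced DocumentDB cluster, Secrets Manager secret, SNS topic, and S3 bucket actually exist. Every value below is a placeholder.

invoke_locally_sample.py
---
#!/usr/bin/env python
"""Minimal local smoke test for the change-stream replicator (illustrative sketch)."""

import os

# Placeholder configuration -- replace every value with your own resources.
os.environ.setdefault('DOCUMENTDB_URI', 'mongodb://docdb-cluster.cluster-xxxx.ap-northeast-1.docdb.amazonaws.com:27017/?replicaSet=rs0')
os.environ.setdefault('DOCUMENTDB_SECRET', 'docdb/replication-user')
os.environ.setdefault('STATE_DB', 'streamstate')
os.environ.setdefault('STATE_COLLECTION', 'state')
os.environ.setdefault('WATCHED_DB_NAME', 'mydb')
os.environ.setdefault('WATCHED_COLLECTION_NAME', 'mycollection')
os.environ.setdefault('Iterations_per_sync', '15')
os.environ.setdefault('Documents_per_run', '100')
os.environ.setdefault('SNS_TOPIC_ARN_ALERT', 'arn:aws:sns:ap-northeast-1:123456789012:docdb-replication-alerts')
os.environ.setdefault('BUCKET_NAME', 'my-docdb-stream-bucket')

# Import after the environment is prepared, then call the handler once with an
# empty event, the way a scheduled trigger would.
from sanplefordocdbtos3 import lambda_handler

print(lambda_handler({}, None))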

