samplefordocdbtos3.py
---
#!/usr/bin/env python
import json
import logging
import os
import boto3
import datetime
from bson import json_util
from pymongo import MongoClient
from pymongo.errors import OperationFailure
"""
Read data from a DocumentDB collection's change stream and replicate that data to MSK.
Required environment variables:
DOCUMENTDB_URI: The URI of the DocumentDB cluster to stream from.
DOCUMENTDB_SECRET: Secret Name of the credentials for the DocumentDB cluster in Secrets Manager
STATE_COLLECTION: The name of the collection in which to store sync state.
STATE_DB: The name of the database in which to store sync state.
WATCHED_COLLECTION_NAME: The name of the collection to watch for changes.
WATCHED_DB_NAME: The name of the database to watch for changes.
Iterations_per_sync: How many events to process before syncing state.
Documents_per_run: The max for the iterator loop.
SNS_TOPIC_ARN_ALERT: The topic to send exceptions.
Kafka target environment variables:
MSK_BOOTSTRAP_SRV: The URIs of the MSK cluster to publish messages.
SNS target environment variables:
SNS_TOPIC_ARN_EVENT: The topic to send docdb events.
S3 target environment variables:
BUCKET_NAME: The name of the bucket that will save streamed data.
BUCKET_PATH (optional): The path of the bucket that will save streamed data.
ElasticSearch target environment variables:
ELASTICSEARCH_URI: The URI of the Elasticsearch domain where data should be streamed.
Kinesis target environment variables:
KINESIS_STREAM : The Kinesis Stream name to publish DocumentDB events.
SQS target environment variables:
SQS_QUERY_URL: The URL of the Amazon SQS queue to which a message is sent.
"""
db_client = None # DocumentDB client - used as source
s3_client = None # S3 client - used as target
sns_client = boto3.client('sns') # SNS client - for exception alerting purposes
clientS3 = boto3.resource('s3') # S3 client - used to get the DocumentDB certificates
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
# The error code returned when data for the requested resume token has been deleted
TOKEN_DATA_DELETED_CODE = 136
def get_credentials():
"""Retrieve credentials from the Secrets Manager service."""
boto_session = boto3.session.Session()
    secret_name = os.environ['DOCUMENTDB_SECRET']
    try:
        logger.debug('Retrieving secret {} from Secrets Manager.'.format(secret_name))
secrets_client = boto_session.client(service_name='secretsmanager',
region_name=boto_session.region_name)
secret_value = secrets_client.get_secret_value(SecretId=secret_name)
secret = secret_value['SecretString']
secret_json = json.loads(secret)
username = secret_json['username']
password = secret_json['password']
        logger.debug('Secret {} retrieved from Secrets Manager.'.format(secret_name))
return (username, password)
except Exception as ex:
logger.error('Failed to retrieve secret {}'.format(secret_name))
raise
def get_db_client():
"""Return an authenticated connection to DocumentDB"""
# Use a global variable so Lambda can reuse the persisted client on future invocations
global db_client
if db_client is None:
logger.debug('Creating new DocumentDB client.')
try:
cluster_uri = os.environ['DOCUMENTDB_URI']
(username, password) = get_credentials()
            db_client = MongoClient(cluster_uri,
                                    tls=True,
                                    tlsCAFile='rds-combined-ca-bundle.pem',
                                    retryWrites=False,
                                    username=username,
                                    password=password)
            # Force the client to connect and authenticate
            db_client.admin.command('ismaster')
logger.debug('Successfully created new DocumentDB client.')
except Exception as ex:
logger.error('Failed to create new DocumentDB client: {}'.format(ex))
send_sns_alert(str(ex))
raise
return db_client
def get_state_collection_client():
"""Return a DocumentDB client for the collection in which we store processing state."""
logger.debug('Creating state_collection_client.')
try:
db_client = get_db_client()
state_db_name = os.environ['STATE_DB']
state_collection_name = os.environ['STATE_COLLECTION']
state_collection = db_client[state_db_name][state_collection_name]
except Exception as ex:
logger.error('Failed to create new state collection client: {}'.format(ex))
send_sns_alert(str(ex))
raise
return state_collection
def get_last_processed_id():
"""Return the resume token corresponding to the last successfully processed change event."""
last_processed_id = None
logger.debug('Returning last processed id.')
try:
state_collection = get_state_collection_client()
if "WATCHED_COLLECTION_NAME" in os.environ:
state_doc = state_collection.find_one({'currentState': True, 'dbWatched': str(os.environ['WATCHED_DB_NAME']),
'collectionWatched': str(os.environ['WATCHED_COLLECTION_NAME']), 'db_level': False})
else:
state_doc = state_collection.find_one({'currentState': True, 'db_level': True,
'dbWatched': str(os.environ['WATCHED_DB_NAME'])})
if state_doc is not None:
if 'lastProcessed' in state_doc:
last_processed_id = state_doc['lastProcessed']
else:
if "WATCHED_COLLECTION_NAME" in os.environ:
state_collection.insert_one({'dbWatched': str(os.environ['WATCHED_DB_NAME']),
'collectionWatched': str(os.environ['WATCHED_COLLECTION_NAME']), 'currentState': True, 'db_level': False})
else:
state_collection.insert_one({'dbWatched': str(os.environ['WATCHED_DB_NAME']), 'currentState': True,
'db_level': True})
except Exception as ex:
logger.error('Failed to return last processed id: {}'.format(ex))
send_sns_alert(str(ex))
raise
return last_processed_id
def store_last_processed_id(resume_token):
"""Store the resume token corresponding to the last successfully processed change event."""
logger.debug('Storing last processed id.')
try:
state_collection = get_state_collection_client()
if "WATCHED_COLLECTION_NAME" in os.environ:
state_collection.update_one({'dbWatched': str(os.environ['WATCHED_DB_NAME']),
'collectionWatched': str(os.environ['WATCHED_COLLECTION_NAME'])},{'$set': {'lastProcessed': resume_token}})
else:
state_collection.update_one({'dbWatched': str(os.environ['WATCHED_DB_NAME']), 'db_level': True, },
{'$set': {'lastProcessed': resume_token}})
except Exception as ex:
logger.error('Failed to store last processed id: {}'.format(ex))
send_sns_alert(str(ex))
raise
def send_sns_alert(message):
"""send an SNS alert"""
try:
logger.debug('Sending SNS alert.')
response = sns_client.publish(
TopicArn=os.environ['SNS_TOPIC_ARN_ALERT'],
Message=message,
Subject='Document DB Replication Alarm',
MessageStructure='default'
)
except Exception as ex:
logger.error('Exception in publishing alert to SNS: {}'.format(ex))
raise
def getDocDbCertificate():
"""download the current docdb certificate"""
try:
logger.debug('Getting DocumentDB certificate from S3.')
clientS3.Bucket('rds-downloads').download_file('rds-combined-ca-bundle.pem', '/tmp/rds-combined-ca-bundle.pem')
except Exception as ex:
        logger.error('Exception in downloading the DocumentDB certificate: {}'.format(ex))
send_sns_alert(str(ex))
raise
def insertCanary():
"""Inserts a canary event for change stream activation"""
canary_record = None
try:
logger.debug('Inserting canary.')
db_client = get_db_client()
watched_db = os.environ['WATCHED_DB_NAME']
if "WATCHED_COLLECTION_NAME" in os.environ:
watched_collection = os.environ['WATCHED_COLLECTION_NAME']
else:
watched_collection = 'canary-collection'
collection_client = db_client[watched_db][watched_collection]
canary_record = collection_client.insert_one({ "op_canary": "canary" })
logger.debug('Canary inserted.')
except Exception as ex:
logger.error('Exception in inserting canary: {}'.format(ex))
send_sns_alert(str(ex))
raise
return canary_record
def deleteCanary():
"""Deletes a canary event for change stream activation"""
try:
logger.debug('Deleting canary.')
db_client = get_db_client()
watched_db = os.environ['WATCHED_DB_NAME']
if "WATCHED_COLLECTION_NAME" in os.environ:
watched_collection = os.environ['WATCHED_COLLECTION_NAME']
else:
watched_collection = 'canary-collection'
collection_client = db_client[watched_db][watched_collection]
collection_client.delete_one({ "op_canary": "canary" })
logger.debug('Canary deleted.')
except Exception as ex:
logger.error('Exception in deleting canary: {}'.format(ex))
send_sns_alert(str(ex))
raise
def put_s3_event(event, database, collection, doc_id):
"""send event to S3"""
# Use a global variable so Lambda can reuse the persisted client on future invocations
global s3_client
if s3_client is None:
logger.debug('Creating new S3 client.')
s3_client = boto3.resource('s3')
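    # Objects are written under: [<BUCKET_PATH>/]<database>/<collection>/<YYYYMMDD>/<change event id>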
try:
        logger.debug('Publishing message to S3.')
if "BUCKET_PATH" in os.environ:
s3_client.Object(os.environ['BUCKET_NAME'], str(os.environ['BUCKET_PATH']) + '/' + database + '/' +
collection + '/' + datetime.datetime.now().strftime('%Y%m%d') + '/' + doc_id).put(Body=event)
else:
s3_client.Object(os.environ['BUCKET_NAME'], database + '/' + collection + '/' +
datetime.datetime.now().strftime('%Y%m%d') + '/' + doc_id).put(Body=event)
except Exception as ex:
logger.error('Exception in publishing message to S3: {}'.format(ex))
send_sns_alert(str(ex))
raise
def lambda_handler(event, context):
"""Read any new events from DocumentDB and apply them to an streaming/datastore endpoint."""
events_processed = 0
canary_record = None
watcher = None
folder = None
filename = None
kafka_client = None
# getDocDbCertificate()
try:
# DocumentDB watched collection set up
db_client = get_db_client()
watched_db = os.environ['WATCHED_DB_NAME']
if "WATCHED_COLLECTION_NAME" in os.environ:
watched_collection = os.environ['WATCHED_COLLECTION_NAME']
watcher = db_client[watched_db][watched_collection]
else:
watcher = db_client[watched_db]
logger.debug('Watching collection {}'.format(watcher))
# DocumentDB sync set up
state_sync_count = int(os.environ['Iterations_per_sync'])
last_processed_id = get_last_processed_id()
logger.debug("last_processed_id: {}".format(last_processed_id))
with watcher.watch(full_document='updateLookup', resume_after=last_processed_id) as change_stream:
i = 0
if last_processed_id is None:
canary_record = insertCanary()
deleteCanary()
while change_stream.alive and i < int(os.environ['Documents_per_run']):
i += 1
change_event = change_stream.try_next()
logger.debug('Event: {}'.format(change_event))
                if change_event is None:
                    break
                if last_processed_id is None:
                    # Skip events until the canary delete provides a usable resume point
                    if change_event['operationType'] == 'delete':
                        store_last_processed_id(change_stream.resume_token)
                        last_processed_id = change_event['_id']['_data']
                    continue
                else:
op_type = change_event['operationType']
op_id = change_event['_id']['_data']
if op_type in ['insert', 'update']:
doc_body = change_event['fullDocument']
doc_id = str(doc_body.pop("_id", None))
readable = datetime.datetime.fromtimestamp(change_event['clusterTime'].time).isoformat()
                        ######## The following line adds operation metadata fields to the document event; remove it if they are not needed.
doc_body.update({'action':op_type,'timestamp':str(change_event['clusterTime'].time),'timestampReadable':str(readable)})
                        ######## The following line adds db and coll metadata fields to the document event; remove it if they are not needed.
doc_body.update({'db':str(change_event['ns']['db']),'coll':str(change_event['ns']['coll'])})
payload = {'_id':doc_id}
payload.update(doc_body)
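                        # Example payload (with the optional metadata fields kept):
                        #   {'_id': '<doc id>', <document fields>, 'action': 'insert',
                        #    'timestamp': '<cluster time>', 'timestampReadable': '<ISO timestamp>',
                        #    'db': '<database>', 'coll': '<collection>'}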
# Append event for S3
if "BUCKET_NAME" in os.environ:
put_s3_event(json_util.dumps(payload), str(change_event['ns']['db']), str(change_event['ns']['coll']),op_id)
logger.debug('Processed event ID {}'.format(op_id))
if op_type == 'delete':
doc_id = str(change_event['documentKey']['_id'])
readable = datetime.datetime.fromtimestamp(change_event['clusterTime'].time).isoformat()
payload = {'_id':doc_id}
                        ######## The following line adds operation metadata fields to the document event; remove it if they are not needed.
payload.update({'action':op_type,'timestamp':str(change_event['clusterTime'].time),'timestampReadable':str(readable)})
                        ######## The following line adds db and coll metadata fields to the document event; remove it if they are not needed.
payload.update({'db':str(change_event['ns']['db']),'coll':str(change_event['ns']['coll'])})
# Append event for S3
if "BUCKET_NAME" in os.environ:
put_s3_event(json_util.dumps(payload), str(change_event['ns']['db']), str(change_event['ns']['coll']),op_id)
logger.debug('Processed event ID {}'.format(op_id))
events_processed += 1
if events_processed >= state_sync_count and "BUCKET_NAME" not in os.environ:
# To reduce DocumentDB IO, only persist the stream state every N events
store_last_processed_id(change_stream.resume_token)
logger.debug('Synced token {} to state collection'.format(change_stream.resume_token))
except OperationFailure as of:
send_sns_alert(str(of))
if of.code == TOKEN_DATA_DELETED_CODE:
# Data for the last processed ID has been deleted in the change stream,
# Store the last known good state so our next invocation
# starts from the most recently available data
store_last_processed_id(None)
raise
except Exception as ex:
logger.error('Exception in executing replication: {}'.format(ex))
send_sns_alert(str(ex))
raise
else:
if events_processed > 0:
store_last_processed_id(change_stream.resume_token)
logger.debug('Synced token {} to state collection'.format(change_stream.resume_token))
return{
'statusCode': 200,
'description': 'Success',
'detail': json.dumps(str(events_processed)+ ' records processed successfully.')
}
else:
if canary_record is not None:
return{
'statusCode': 202,
'description': 'Success',
'detail': json.dumps('Canary applied. No records to process.')
}
else:
return{
'statusCode': 201,
'description': 'Success',
'detail': json.dumps('No records to process.')
}
finally:
        # Close the Kafka client if one was created
        if "MSK_BOOTSTRAP_SRV" in os.environ and kafka_client is not None:
            kafka_client.close()
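

# ---------------------------------------------------------------------------
# Minimal local-invocation sketch (optional, not part of the Lambda package).
# It shows which environment variables the handler expects; every value below
# is a placeholder assumption, not a real endpoint, and valid AWS credentials
# plus a reachable DocumentDB cluster are required for the handler to succeed.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    os.environ.setdefault('DOCUMENTDB_URI', 'mongodb://sample-cluster.cluster-xxxxxxxx.us-east-1.docdb.amazonaws.com:27017/?replicaSet=rs0&readPreference=secondaryPreferred')  # placeholder URI
    os.environ.setdefault('DOCUMENTDB_SECRET', 'sample-docdb-secret')  # placeholder secret name
    os.environ.setdefault('STATE_DB', 'statedb')
    os.environ.setdefault('STATE_COLLECTION', 'statecollection')
    os.environ.setdefault('WATCHED_DB_NAME', 'sampledb')
    os.environ.setdefault('WATCHED_COLLECTION_NAME', 'samplecollection')
    os.environ.setdefault('Iterations_per_sync', '15')
    os.environ.setdefault('Documents_per_run', '1000')
    os.environ.setdefault('SNS_TOPIC_ARN_ALERT', 'arn:aws:sns:us-east-1:123456789012:sample-alert-topic')  # placeholder ARN
    os.environ.setdefault('BUCKET_NAME', 'sample-target-bucket')  # placeholder bucket
    print(lambda_handler({}, None))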