はじめに
Qiita2回目の投稿です。
私は、25新卒で4月からIT企業に就職し、ほとんど知識がない状態から現在勉強に励んでいます。
2ヶ月前に初めてAWSを触り始めました。2ヶ月前はLambda何それ?Python触ったことない💦という状態でした。Lambdaむじい、今回は、Lambdaで自分が学習した内容を記事にしていこうかなと思います。
目次
- Lambdaとは?
- 学習の方法
- 感想
1. Lambdaとは?
この記事に書いてあるとは思うのですが、初心者の自分には難しすぎました。
なので、自分が思うLambdaは
関数をLambdaで定義することにより、なんかいい感じのAPI的なものが作れるもの!
って感じです。あくまで、今の自分の理解度なので間違っていると思います。すみません
2. 学習方法
作成すること(やりたいこと)
データ(辞書型)をこのLambdaの中で定義して、
その情報を取得、更新、削除ができるようなLambdaを構築してきたい
1. Lambdaの関数を作成していきましょう
AWSにログインをして、上の検索でLambdaと入力してLambdaを開きます。
右上のリージョンはどこでもいいのですが、東京リージョンにします。
Lambdaを開いて、ダッシュボードを選択するとこのようなページが出てくると思います。
右上の部分に関数作成ボタンがあるのでそれをクリックします。
するとこのような関数作成画面に遷移して、関数を作成していきます。
- 一から作成を選択する
- 関数名を記載する(なんでもいいですが、今回はtest_lambdaにしました)
- ランタイム 関数を書く言語を選択する(今回はPythonを選択しました)
あとは、そのままにしてスクロールして下の関数作成をクリックしてください
このような画面になると思います。
コードソースと書いてあるエディタにPythonで記載してきます。
実際にLambdaにコードを書いていく
必要なライブラリをインポート
まず最初に必要なライブラリをインポートしていきます。
・jsonを返すようにしたいので、jsonを扱えるライブラリ
・変更などのアクションが起きた時間を記録するためのタイブラリ
import json
from datetime import datetime
Lambdaの開始をするような定義とサンプルデータを定義
def lambda_handler(event, context):
data = {
"users": [
{
"id": 1,
"name": "田中太郎",
"age": 25,
"email": "tanaka@example.com",
"skills": ["Python", "JavaScript"],
"active": True
},
{
"id": 2,
"name": "鈴木次郎",
"age": 30,
"email": "suzuki@example.com",
"skills": ["Java", "C++"],
"active": False
},
{
"id": 3,
"name": "佐藤花子",
"age": 22,
"email": "sato@example.com",
"skills": ["Ruby", "PHP"],
"active": True
},
],
"company": {
"name": "サンプル株式会社",
"founded": 2020,
"employees": 50
},
"metadata": {
"version": 1.0,
"last_updated": "2023-05-01"
}
}
◯引数としてeventとcontextを定義します。
・eventは、このLambda関数に渡された情報が入ります。
後に、Lambda内でテストをするのですが、そのテスト内でこのLambdaに投げるJSONを作成します。
これがeventに入ります。
・contextは、関数の名前やメモリ制限、タイムアウトなどの設定をする時に使います。
◯データを定義
初期値の設定とデータ保持の設定
※下記の記載の位置は上のコードのすぐ下(わからなければ、下に全体のコードをのしているのでそちらで確認してください)
operation = event.get('operation', 'read_all')
result = {}
・Lambdaに渡される情報の中にoperationというキーが存在しており、
その初期値がread_allとしています。(キーが存在しない時もread_all)
・resultという変数の初期値は何も入っていないようにします。
サンプルデータの全てを取得
ここから先は、実際にデータを返してそのLambdaが動くかテストしながら、実装していきます。
try:
if operation == 'read_all':
result = {
"status": "success",
"operation": "read_all",
"data": data,
"message": "All data retrieved successfully"
}
except Exception as e:
result = {
"status": "error",
"operation": operation,
"data": None,
"message": f"エラーが発生しました: {str(e)}"
}
return {
'statusCode': 200,
'headers': {
'Content-Type': 'application/json; charset=utf-8'
},
'body': json.dumps(result, ensure_ascii=False, indent=2)
}
tryの中に、エラーが発生しそうな処理を記載して、エラーが発生したらexceptでエラーを受け取るようにしています。(例外処理)
もし、Lambdaに送られてきたoperationの値がread_allだったら、
resultの中に、辞書型でsratus, operation, data, messageのキーにそれぞれ値が入ります。
dataは、上で定義したdataです。
そのresultが下記の最後の行のbodyのキーの中にあるresultという値に入ります。
ensure_ascii=Falseは、日本語の可読性を上げるために記載しています。
indent=2については、インデントを指定しています。
ここまでかけたら、デプロイを行います。(ここを読み飛ばしたら、大変です!!)
スクロールして、下に行くとイベントJSONというところがあります。
そこに
{
"operation": "read_all"
}
このように記載して、テストを実行します。
特定のユーザーの読み取り
try:
if operation == 'read_all':
result = {
"status": "success",
"operation": "read_all",
"data": data,
"message": "All data retrieved successfully"
}
#↓ここからが新しく追加するコード
elif operation == 'read_user':
# 特定のユーザーの読み取り
user_id = event.get('user_id', 1)
user = next((u for u in data['users'] if u['id'] == user_id), None)
if user:
result = {
"status": "success",
"operation": "read_user",
"data": user,
"message": f"ユーザーID{user_id}のデータを取得しました"
}
else:
result = {
"status": "error",
"operation": "read_user",
"data": None,
"message": f"ユーザーID{user_id}が見つかりませんでした"
}
#↑ここまでが新しく追加するコード
operationの値がread_userだったら、下記のような処理を実行するように設定しています。
・user_idという変数は、Lambdaに渡すJSONの中に、user_id(キー)が入っていいて、その初期値は1としているコードを記載します。
・userという変数は、data['users']の中の、idが変数のuser_idの数字と一致しているものを探して、あったら、その情報を返して、なかったらNoneを返すようにしています。
ここで、デプロイを行います。(何度も書きますが、ここのデプロイを忘れると、変更が反映されません)
ここで、テストに移動して、
先ほどのJSONコードを下記に書き換えます。
{
"operation": "read_user",
"user_id": 1
}
書き換えてテストを実行します。
このような返答が返ってきたら、成功です。
新しいユーザーの追加
下記コードの場所がわからない場合は、この記事の一番したに全体コードが記載してあるため、そこを参考にして貼り付けてください。
elif operation == 'add_user':
# 新しいユーザーを追加
new_user = event.get('new_user', {})
max_id = max([u['id'] for u in data['users']])
new_user['id'] = max_id + 1
if 'active' not in new_user:
new_user['acitve'] =True
if 'skills' not in new_user:
new_user['skills'] = []
data['users'].append(new_user)
data['company']['employees'] += 1
data['metadata']['last_updated'] = datetime.now().strftime('%Y-%m-%d')
result = {
"status": "success",
"operation": "add_user",
"data": data,
"new_user": new_user,
"message": f"新しいユーザー(ID: {new_user['id']})を追加しました"
}
operationの値がadd_userだったら、下記のような処理を実行するように設定しています。
max_idは、data['users']の中のidの一番大きな値です。
そして、Lambdaで送られてきたデータactiveというキーがなかったら、Trueにする、
Lambdaで送られてきたデータskillsというキーがなかったら、空の配列が返ってくるようにします。
data['users']の中に送られてきた情報を入れて、新しいuserの情報を追記しています。
そして、companyの中のemployees(従業員の数)を1足します。
さらに、今日のデータもkast_updataというところに入れます。ただし、UTC (協定世界時)になっているため、実際の時間とは、異なります。
ここまで、デプロイを行い、テストのところで、下記のJSONを入力しましょう!
{
"operation": "add_user",
"new_user": {
"name": "山田次郎",
"age": 35,
"email": "yamada@example.com",
"skills": ["React", "Node.js"]
}
}
ユーザー情報の更新
elif operation == 'update_user':
# ==== ユーザー情報を更新 ====
user_id = event.get('user_id')
updates = event.get('updates', {})
user_index = next((i for i, u in enumerate(data['users']) if u['id'] == user_id), None)
if user_index is not None:
# ユーザー情報を更新
for key, value in updates.items():
if key != 'id': # IDは変更不可
data['users'][user_index][key] = value
data['metadata']['last_updated'] = datetime.now().strftime('%Y-%m-%d')
result = {
"status": "success",
"operation": "update_user",
"updated_user": data['users'][user_index],
"message": f"ユーザーID {user_id} を更新しました"
}
else:
result = {
"status": "error",
"operation": "update_user",
"message": f"ユーザーID {user_id} が見つかりませんでした"
}
enumetate関数については、要素のインデックスと共に取得する関数になります。
この場合は、usersのidを取り出しながら、一致するものを探している感じです。
そして、一致しているものを見つけたら、送られてきた情報に書き換えるという処理をしています。
ここで、デプロイをして、下記のJSONを貼り付けます。
{
"operation": "update_user",
"user_id": 1,
"updates": {
"age": 26,
"email": "new_tanaka@example.com"
}
}
ユーザーの削除
elif operation == 'delete_user':
# ユーザー情報を削除
user_id = event.get('user_id')
user_index = next((i for i, u in enumerate(data['users']) if u['id'] == user_id), None)
if user_index is not None:
deleted_user = data['users'].pop(user_index)
data['company']['employees'] -= 1
data['metadata']['last_ipdated'] = datetime.now().strftime('%Y-%m-%d')
result = {
"status": "success",
"operation": "delete_user",
"data": data,
"deleted_user": deleted_user,
"message": f"ユーザー(ID: {user_id})を削除しました"
}
else:
result = {
"status": "error",
"operation": "delete_user",
"message": f"ユーザー(ID: {user_id})が見つかりませんでした"
}
下記はテストに移動して、下記のJSONに書き換えます。
{
"operation": "delete_user",
"user_id": 2
}
感想
AWS Lambdaまだ全然使いこなせてないです(知識不足)😭
たくさん触って、学習をしていきたいと思います。
頑張る💪
Lambdaの全体のコード
import json
from datetime import datetime
def lambda_handler(event, context):
data = {
"users": [
{
"id": 1,
"name": "田中太郎",
"age": 25,
"email": "tanaka@example.com",
"skills": ["Python", "JavaScript"],
"active": True
},
{
"id": 2,
"name": "鈴木次郎",
"age": 30,
"email": "suzuki@example.com",
"skills": ["Java", "C++"],
"active": False
},
{
"id": 3,
"name": "佐藤花子",
"age": 22,
"email": "sato@example.com",
"skills": ["Ruby", "PHP"],
"active": True
},
],
"company": {
"name": "サンプル株式会社",
"founded": 2020,
"employees": 50
},
"metadata": {
"version": 1.0,
"last_updated": "2023-05-01"
}
}
# イベントからの操作を決定
operation = event.get('operation', 'read_all')
result = {}
try:
if operation == 'read_all':
result = {
"status": "success",
"operation": "read_all",
"data": data,
"message": "All data retrieved successfully"
}
elif operation == 'read_user':
# 特定のユーザーの読み取り
user_id = event.get('user_id', 1)
user = next((u for u in data['users'] if u['id'] == user_id), None)
if user:
result = {
"status": "success",
"operation": "read_user",
"data": user,
"message": f"ユーザーID{user_id}のデータを取得しました"
}
else:
result = {
"status": "error",
"operation": "read_user",
"data": None,
"message": f"ユーザーID{user_id}が見つかりませんでした"
}
elif operation == 'add_user':
# 新しいユーザーを追加
new_user = event.get('new_user', {})
max_id = max([u['id'] for u in data['users']])
new_user['id'] = max_id + 1
if 'active' not in new_user:
new_user['acitve'] =True
if 'skills' not in new_user:
new_user['skills'] = []
data['users'].append(new_user)
data['company']['employees'] += 1
data['metadata']['last_updated'] = datetime.now().strftime('%Y-%m-%d')
result = {
"status": "success",
"operation": "add_user",
"data": data,
"new_user": new_user,
"message": f"新しいユーザー(ID: {new_user['id']})を追加しました"
}
elif operation == 'update_user':
# ==== ユーザー情報を更新 ====
user_id = event.get('user_id')
updates = event.get('updates', {})
user_index = next((i for i, u in enumerate(data['users']) if u['id'] == user_id), None)
if user_index is not None:
# ユーザー情報を更新
for key, value in updates.items():
if key != 'id': # IDは変更不可
data['users'][user_index][key] = value
data['metadata']['last_updated'] = datetime.now().strftime('%Y-%m-%d')
result = {
"status": "success",
"operation": "update_user",
"updated_user": data['users'][user_index],
"message": f"ユーザーID {user_id} を更新しました"
}
else:
result = {
"status": "error",
"operation": "update_user",
"message": f"ユーザーID {user_id} が見つかりませんでした"
}
elif operation == 'delete_user':
# ユーザー情報を削除
user_id = event.get('user_id')
user_index = next((i for i, u in enumerate(data['users']) if u['id'] == user_id), None)
if user_index is not None:
deleted_user = data['users'].pop(user_index)
data['company']['employees'] -= 1
data['metadata']['last_ipdated'] = datetime.now().strftime('%Y-%m-%d')
result = {
"status": "success",
"operation": "delete_user",
"data": data,
"deleted_user": deleted_user,
"message": f"ユーザー(ID: {user_id})を削除しました"
}
else:
result = {
"status": "error",
"operation": "delete_user",
"message": f"ユーザー(ID: {user_id})が見つかりませんでした"
}
except Exception as e:
result = {
"status": "error",
"operation": operation,
"data": None,
"message": f"エラーが発生しました: {str(e)}"
}
return {
'statusCode': 200,
'headers': {
'Content-Type': 'application/json; charset=utf-8'
},
'body': json.dumps(result, ensure_ascii=False, indent=2)
}
参考文献