AWS
以下パイソンのコードはAmazonのQで生成していますが、
全て動作を確認済みです。
AWS CLI
https://awscli.amazonaws.com/AWSCLIV2.msi
を導入してコマンドプロンプトから使用できるようにする。
API接続に必要な情報のの取得箇所
「コンソール右上アカウント名」→「セキュリティ認証情報」→「アクセスキーを作成」
※一度しか表示されない為、控えておく
※安全の為に毎回CMDを開く度にsetで設定している。開き直すと消えている。
python -m pip install boto3
python -m pip install aws-sam-cli
set AWS_ACCESS_KEY_ID=アクセスキー
set AWS_SECRET_ACCESS_KEY=シークレットアクセスキー
パイソンから接続確認
test.py
import boto3
from botocore.exceptions import ClientError
def verify_aws_connection():
try:
# STSクライアントを作成
sts_client = boto3.client('sts')
# get_caller_identity()を呼び出し
response = sts_client.get_caller_identity()
print("AWS接続成功!")
print(f"アカウントID: {response['Account']}")
print(f"ARN: {response['Arn']}")
print(f"ユーザーID: {response['UserId']}")
return response
except ClientError as e:
print(f"AWS接続エラー: {e}")
return None
# 関数を実行
verify_aws_connection()
S3のバケットを作成
中国に居るのでVPNをつけないと接続できなかった。
test.py
import boto3
from botocore.exceptions import ClientError
def create_s3_bucket(bucket_name, region=None):
"""
指定された地域にS3バケットを作成
:param bucket_name: 作成するバケット名
:param region: バケットを作成する地域(例:'ap-northeast-1')
:return: 作成成功時はTrue、失敗時はFalse
"""
try:
if region is None:
s3_client = boto3.client('s3')
s3_client.create_bucket(Bucket=bucket_name)
else:
s3_client = boto3.client('s3', region_name=region)
location = {'LocationConstraint': region}
s3_client.create_bucket(Bucket=bucket_name,
CreateBucketConfiguration=location)
except ClientError as e:
print(f"エラーが発生しました: {e}")
return False
return True
# 使用例
bucket_name = '**************' # 世界で唯一の名前に変更
region = 'ap-northeast-1' # 東京
if create_s3_bucket(bucket_name, region):
print(f"バケット {bucket_name} が {region} に正常に作成されました")
else:
print("バケットの作成に失敗しました")
作成したバケットを確認
test.py
import boto3
from botocore.exceptions import ClientError
def list_my_buckets():
"""
自分のS3バケット一覧を表示
"""
try:
s3_client = boto3.client('s3')
response = s3_client.list_buckets()
print("あなたのS3バケット一覧:")
for bucket in response['Buckets']:
print(f" - {bucket['Name']} (作成日: {bucket['CreationDate']})")
except ClientError as e:
print(f"エラーが発生しました: {e}")
# バケット一覧を確認
list_my_buckets()
作成したバケットへファイルを上げる
※testfile.txtはtest.pyと同じフォルダ内にある。
test.py
import boto3
import logging
from botocore.exceptions import ClientError
# ログ設定
logging.basicConfig(level=logging.INFO)
def upload_file_to_s3(file_name, bucket, object_name=None):
"""
S3バケットにファイルをアップロード
:param file_name: アップロードするファイルのパス
:param bucket: アップロード先のバケット名
:param object_name: S3でのオブジェクト名(指定しない場合はfile_nameを使用)
:return: アップロード成功時はTrue、失敗時はFalse
"""
# S3のオブジェクト名が指定されていない場合、ファイル名を使用
if object_name is None:
object_name = file_name
try:
s3_client = boto3.client('s3')
s3_client.upload_file(file_name, bucket, object_name)
print(f"✓ ファイル {file_name} を {bucket}/{object_name} にアップロードしました")
return True
except ClientError as e:
print(f"✗ アップロードエラー: {e}")
return False
except FileNotFoundError:
print(f"✗ ファイルが見つかりません: {file_name}")
return False
# 使用例
bucket_name = '**************' # 自分バケット名に変更
file_to_upload = 'testfile.txt' # アップロードしたいファイル
s3_object_name = 'uploaded-files/testfile.txt' # S3での保存先パス
upload_file_to_s3(file_to_upload, bucket_name, s3_object_name)
バケット内のファイルを表示
test.py
import boto3
from botocore.exceptions import ClientError
def list_bucket_files(bucket_name):
"""
バケット内のファイル一覧を表示
"""
try:
s3_client = boto3.client('s3')
response = s3_client.list_objects_v2(Bucket=bucket_name)
if 'Contents' in response:
print(f"\n=== {bucket_name} 内のファイル一覧 ===")
for obj in response['Contents']:
print(f"📄 {obj['Key']} ({obj['Size']} bytes)")
return [obj['Key'] for obj in response['Contents']]
else:
print(f"{bucket_name} は空です")
return []
except ClientError as e:
print(f"✗ エラー: {e}")
return []
def display_file_content(bucket_name, file_key):
"""
S3のファイル内容を表示
"""
try:
s3_client = boto3.client('s3')
response = s3_client.get_object(Bucket=bucket_name, Key=file_key)
# ファイル内容を読み取り(テキストファイルの場合)
file_content = response['Body'].read().decode('utf-8')
print(f"\n=== {file_key} の内容 ===")
print(file_content)
print("=" * 50)
except ClientError as e:
print(f"✗ ファイル読み取りエラー: {e}")
except UnicodeDecodeError:
print(f"✗ {file_key} はテキストファイルではありません(バイナリファイル)")
# 使用例
bucket_name = '**************' # 自分のバケット名に変更
# 1. ファイル一覧を表示
files = list_bucket_files(bucket_name)
# 2. 最初のファイルの内容を表示(ファイルがある場合)
if files:
display_file_content(bucket_name, files[0])
Lambda関数を試す
検索から「Lambda」と検索
関数の作成:一から作成
関数名:HelloWorldFunction
ランタイム:Python 3.13
アーキテクチャ:x86_64
※コンソール上部で設定してある地域とboto3.clientの引数で設定している
地域が一致しないと動かなかった。
lambda_function.py
import json
def lambda_handler(event, context):
# TODO implement
return {
'statusCode': 200,
'body': json.dumps('Hello from Lambda!')
}
test.py
import boto3
import json
from botocore.exceptions import ClientError
def invoke_lambda_function():
"""
us-east-1リージョンのLambda関数を呼び出す
"""
try:
# us-east-1リージョンでLambdaクライアントを作成
lambda_client = boto3.client('lambda', region_name='us-east-1')
# あなたのLambda関数名(実際の関数名に変更してください)
function_name = 'HelloWorldFunction'
print(f"Lambda関数 '{function_name}' を呼び出し中...")
# Lambda関数を呼び出し
response = lambda_client.invoke(
FunctionName=function_name,
InvocationType='RequestResponse', # 同期呼び出し
Payload=json.dumps({}) # 空のデータ
)
# レスポンスをチェック
if response['StatusCode'] == 200:
# 成功した場合
result = json.loads(response['Payload'].read().decode('utf-8'))
C:\Users\oe\awstest>python test.py
Lambda関数 'HelloWorldFunction' を呼び出し中...
✓ Lambda関数の実行が成功しました!
ステータスコード: 200
メッセージ: Hello from Lambda!
C:\Users\oe\awstest>