如何使用boto3访问亚马逊S3对象存储

今日头条

1100人已加入

描述

【关于boto3】

Boto3是亚马逊AWS提供的python SDK,最为常用的功能是S3对象存储的访问。作为标准的S3 SDK,除了访问AWS,也可以访问其他兼容S3 API的云存储厂商。

Boto3的项目地址为:https://github.com/boto/boto3.git

Boto3的AWS doc地址为:https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#service-resource

 

需要指出的是,AWS的文档说明较为混乱,现在给大家提供一个简单的使用说明和范例。

 

【准备】

首先需要安装python,建议使用Python 3.6及以后的版本。

通过pip安装boto3的包即可:

pip install boto3

 

在你的python脚本直接引用boto3即可:

import boto3

 

正式使用boto3之前,你还需要具备云存储厂商的access key和secret_key。先给出一对土星云的测试key,有需要的可以去土星云存储官网注册获取。

param_endpoint='https://s3.local-north-1.saturncloud.com.cn:6666'

param_access_key='3ABC8E3ABC387442B3936F9426B11C1B'

param_secret_key='0428D37B0FDC14805AF0153969C272F3'

 

【创建client】

先初始化一个client对象,我们可以定义一个方法如下:

def setup_client(endpoint, access_key_id, secret_access_key):

return boto3.client(

's3',

aws_access_key_id=access_key_id,

aws_secret_access_key=secret_access_key,

use_ssl=True,

region_name='cn',

endpoint_url=endpoint,

config=Config(s3={"addressing_style": "virtual", "signature_version": 's3v4'}))

再定义一个判断http响应状态的方法:

def is_result_ok(return_obj):

ifcode = return_obj['ResponseMetadata']['HTTPStatusCode']

if ifcode == 200 or ifcode == 204:

return True

else:

return False

 

然后在main函数中调用创建client的方法:

if __name__ == '__main__':

print('Start SCS testing at '+datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))

 

s3=setup_client(param_endpoint, param_access_key, param_secret_key)

 

【创建存储桶】

以当前时间为名创建一个存储桶:

bucket_id=datetime.datetime.now().strftime('test-%Y%m%d-%H%M%S')

 

result=cre_bucket(s3, bucket_id)

 

if is_result_ok(result):

print('bucket ['+bucket_id+'] created ok.')

else:

print('bucket ['+bucket_id+'] created fail.')

print(result)

sys.exit(1)

 

【上传文件】

定义一个上传文件的方法如下:

def put_obj(s3, bucket_id, filename, local_dir):

return s3.put_object(Bucket=bucket_id, Body=open(local_dir+'/'+filename, 'rb'), Key=filename)

选择一个本地文件上传到刚刚创建的存储桶

param_test_local_dir='/tmp/'

filename='test.txt’

result=put_obj(s3, bucket_id, filename, param_test_local_dir)

if is_result_ok(result):

print('put_obj ['+filename+'] ok.')

else:

print('put_obj ['+filename+'] fail.')

print(result)

return

 

【下载文件】

定义一个下载文件的方法:

def get_obj(s3, bucket_id, filename):

return s3.get_object(Bucket=bucket_id, Key=filename)

我们再把刚才上传的文件下载下来

result=get_obj(s3, bucket_id, filename)

 

if is_result_ok(result):

print('get_obj ['+filename+'] ok.')

else:

print('get_obj ['+filename+'] fail.')

print(result)

return

 

【删除文件】

首先定义一个删除S3存储桶文件的方法

def del_obj(s3, bucket_id, filename):

return s3.delete_object(Bucket=bucket_id, Key=filename)

然后调用此方法即可删除刚刚上传到云端的文件:

result=del_obj(s3, bucket_id, filename)

 

if is_result_ok(result):

print('del_obj ['+filename+'] ok.')

else:

print('del_obj ['+filename+'] fail.')

print(result)

return

 

【结语】

上面已经给出了一个简单的使用范例,更多的接口和使用方法,可以查看AWS的文档和云存储厂商的文档。

 

【附 完整代码】

 

############################################################################################################

'''

Random test script for saturn cloud systems.

 

Usage:

1. Change test parameters through modifying param_XXXX in test configurations.

2. Run 'python scs_rand_test.py'

3. For better performance, please set param_test_local_dir to a ramdisk path, such as '/run' in Linux.

 

Requirement:

1. Python 3.6 or higher version.

2. boto3, use 'pip install boto3 --user' to install.

 

by sam 2021-10-06

'''

############################################################################################################

 

 

import time

import datetime

import os

import sys

import subprocess

import random

import numpy as np

import threading

from queue import Queue

 

import boto3

from botocore.config import Config

 

############################################################################################################

'''

Test configurations

'''

############################################################################################################

 

param_endpoint='https://s3.local-north-1.saturncloud.com.cn:6666'

param_access_key='3ABC8E3ABC387442B3936F9426B11C1B'

param_secret_key='0428D37B0FDC14805AF0153969C272F3'

 

param_test_local_dir='/tmp/'

 

param_test_max_blocks=33

param_block_size=256*1024

 

############################################################################################################

'''

Common used API fuctions

'''

############################################################################################################

 

def setup_client(endpoint, access_key_id, secret_access_key):

return boto3.client(

's3',

aws_access_key_id=access_key_id,

aws_secret_access_key=secret_access_key,

use_ssl=True,

region_name='cn',

endpoint_url=endpoint,

config=Config(s3={"addressing_style": "virtual", "signature_version": 's3v4'}))

 

def cre_bucket(s3, bucket_id):

    return s3.create_bucket(Bucket=bucket_id,)

 

def del_bucket(s3, bucket_id):

return s3.delete_bucket(Bucket=bucket_id,)

 

def put_obj(s3, bucket_id, filename, local_dir):

    return s3.put_object(Bucket=bucket_id, Body=open(local_dir+'/'+filename, 'rb'), Key=filename)

 

def get_obj(s3, bucket_id, filename):

return s3.get_object(Bucket=bucket_id, Key=filename)

 

def del_obj(s3, bucket_id, filename):

    return s3.delete_object(Bucket=bucket_id, Key=filename)

 

def is_result_ok(return_obj):

ifcode = return_obj['ResponseMetadata']['HTTPStatusCode']

if ifcode == 200 or ifcode == 204:

return True

else:

return False

 

############################################################################################################

'''

Test applications

'''

############################################################################################################

def create_dummy_file(file_path):

blocks=random.randrange(0, param_test_max_blocks, 1)

last_bytes=random.randrange(1, param_block_size+1, 1)

total_size=blocks*param_block_size+last_bytes

 

file=open(file_path, mode='wb')

file.write(np.random.bytes(total_size))

file.close()

 

def test_task(bucket_id):

s3=setup_client(param_endpoint, param_access_key, param_secret_key)

 

filename='put_obj'

create_dummy_file(param_test_local_dir+'/'+filename)

result=put_obj(s3, bucket_id, filename, param_test_local_dir)

 

if is_result_ok(result):

print('put_obj ['+filename+'] ok.')

else:

print('put_obj ['+filename+'] fail.')

print(result)

return

 

result=get_obj(s3, bucket_id, filename)

 

if is_result_ok(result):

print('get_obj ['+filename+'] ok.')

else:

print('get_obj ['+filename+'] fail.')

print(result)

return

 

filename_new='get_obj'

file=open(param_test_local_dir+'/'+filename_new, mode='wb')

file.write(result['Body'].read())

file.close()

 

cmd_string='cmp '+param_test_local_dir+'/'+filename+' '+param_test_local_dir+'/'+filename_new

cmd_result=subprocess.getoutput(cmd_string)

    

if len(cmd_result)>1:

print('content error at ['+filename+'] and ['+filename_new+']')

return

    

result=del_obj(s3, bucket_id, filename)

 

if is_result_ok(result):

print('del_obj ['+filename+'] ok.')

else:

print('del_obj ['+filename+'] fail.')

print(result)

return

 

cmd_string='rm '+param_test_local_dir+'/'+filename+' '+param_test_local_dir+'/'+filename_new

os.system(cmd_string)

 

 

 

############################################################################################################

'''

Script entrance

'''

############################################################################################################

if __name__ == '__main__':

print('Start SCS random testing at '+datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))

 

s3=setup_client(param_endpoint, param_access_key, param_secret_key)

 

bucket_id=datetime.datetime.now().strftime('test-%Y%m%d-%H%M%S')

 

result=cre_bucket(s3, bucket_id)

 

if is_result_ok(result):

print('bucket ['+bucket_id+'] created ok.')

else:

print('bucket ['+bucket_id+'] created fail.')

print(result)

sys.exit(1)

pass

 

test_task(bucket_id)

 

result=del_bucket(s3, bucket_id)

 

if is_result_ok(result):

print('bucket ['+bucket_id+'] deleted ok.')

else:

print('bucket ['+bucket_id+'] deleted fail.')

print(result)

sys.exit(1)

pass

 

print('End SCS random testing at '+datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))

编辑:fqj

打开APP阅读更多精彩内容
声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉

全部0条评论

快来发表一下你的评论吧 !

×
20
完善资料,
赚取积分