Python - 腾讯云云存储cos简单使用
获取cos上传临时凭证pip install -U cos-python-sdk-v5
# -*- coding=utf-8 # appid 已在配置中移除,请在参数 Bucket 中带上 appid。Bucket 由 BucketName-APPID 组成 # 1. 设置用户配置, 包括 secretId,secretKey 以及 Region from qcloud_cos import CosConfig from qcloud_cos import CosS3Client import sys import logging logging.basicConfig(level=logging.INFO, stream=sys.stdout) secret_id = ‘COS_SECRETID‘ # 替换为用户的 secretId secret_key = ‘COS_SECRETKEY‘ # 替换为用户的 secretKey region = ‘ap-chengdu‘ # 替换为用户的 Region token = None # 使用临时密钥需要传入 Token,默认为空,可不填 scheme = ‘https‘ # 指定使用 http/https 协议来访问 COS,默认为 https,可不填 config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token, Scheme=scheme) # 2. 获取客户端对象 client = CosS3Client(config)
创建桶
response = client.create_bucket( Bucket=‘examplebucket-1250000000‘ )
上传文件
#### 高级上传接口(推荐) response = client.upload_file( Bucket=‘zhoagang-1301531359‘, LocalFilePath=‘local.txt‘, #//代指本地文件路径 Key=‘picture.jpg‘, #//上传到桶之后的文件名 PartSize=1, MAXThread=10, EnableMD5=False ) print(response[‘ETag‘])
python上传实例代码
# -*- coding=utf-8 # appid 已在配置中移除,请在参数 Bucket 中带上 appid。Bucket 由 BucketName-APPID 组成 # 1. 设置用户配置, 包括 secretId,secretKey 以及 Region from qcloud_cos import CosConfig from qcloud_cos import CosS3Client import sys secret_id = ‘自己的id‘ # 替换为用户的 secretId secret_key = ‘自己的key‘ # 替换为用户的 secretKey region = ‘ap-chengdu‘ # 替换为用户的 Region config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key) # 2. 获取客户端对象 client = CosS3Client(config) #文件上传 response = client.upload_file( Bucket=‘zhoagang-1301531359‘, LocalFilePath=‘code.png‘, #//代指本地文件路径 Key=‘p1.png‘, #//上传到桶之后的文件名 ) print(response[‘ETag‘])
python创建桶的实例代码
# 1. 设置用户配置, 包括 secretId,secretKey 以及 Region from qcloud_cos import CosConfig from qcloud_cos import CosS3Client import sys secret_id = ‘自己的id‘ # 替换为用户的 secretId secret_key = ‘自己的key‘ # 替换为用户的 secretKey region = ‘ap-chengdu‘ # 替换为用户的 Region config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key) client = CosS3Client(config) response = client.create_bucket( Bucket=‘examplebucket-1250000000‘, ACL="public-read",# private 私有读私有写 /public-read 公共读 私有写 / public-read-write /公共读/公共写 )
python实例删除桶
from qcloud_cos import CosConfig from qcloud_cos import CosS3Client secret_id = ‘555555‘ # 替换为用户的 secretId secret_key = ‘6666666‘ # 替换为用户的 secretKey region = ‘ap-chengdu‘ # 替换为用户的 Region config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key) client = CosS3Client(config) # client.delete_object( # Bucket=‘wangyang-1251317460‘, # Key=‘p1.png‘ # ) objects = { "Quiet": "true", "Object": [ { "Key": "day2牛存果.py" }, { "Key": "小米CC9e.jpg" } ] } client.delete_objects( Bucket=‘wangyang-1251317460‘, Delete=objects )
获取cos上传临时凭证及删除
def credential(bucket, region): """ 获取cos上传临时凭证 """ from sts.sts import Sts config = { # 临时密钥有效时长,单位是秒(30分钟=1800秒) ‘duration_seconds‘: 5, # 固定密钥 id ‘secret_id‘: settings.TENCENT_COS_ID, # 固定密钥 key ‘secret_key‘: settings.TENCENT_COS_KEY, # 换成你的 bucket ‘bucket‘: bucket, # 换成 bucket 所在地区 ‘region‘: region, # 这里改成允许的路径前缀,可以根据自己网站的用户登录态判断允许上传的具体路径 # 例子: a.jpg 或者 a/* 或者 * (使用通配符*存在重大安全风险, 请谨慎评估使用) ‘allow_prefix‘: ‘*‘, # 密钥的权限列表。简单上传和分片需要以下的权限,其他权限列表请看 https://cloud.tencent.com/document/product/436/31923 ‘allow_actions‘: [ # "name/cos:PutObject", # ‘name/cos:PostObject‘, # ‘name/cos:DeleteObject‘, # "name/cos:UploadPart", # "name/cos:UploadPartCopy", # "name/cos:CompleteMultipartUpload", # "name/cos:AbortMultipartUpload", "*", ], } sts = Sts(config) result_dict = sts.get_credential() return result_dict def delete_bucket(bucket, region): """ 删除桶 """ # 删除桶中所有文件 # 删除桶中所有碎片 # 删除桶 config = CosConfig(Region=region, SecretId=settings.TENCENT_COS_ID, SecretKey=settings.TENCENT_COS_KEY) client = CosS3Client(config) try: # 找到文件 & 删除 while True: part_objects = client.list_objects(bucket) # 已经删除完毕,获取不到值 contents = part_objects.get(‘Contents‘) if not contents: break # 批量删除 objects = { "Quiet": "true", "Object": [{‘Key‘: item["Key"]} for item in contents] } client.delete_objects(bucket, objects) if part_objects[‘IsTruncated‘] == "false": break # 找到碎片 & 删除 while True: part_uploads = client.list_multipart_uploads(bucket) uploads = part_uploads.get(‘Upload‘) if not uploads: break for item in uploads: client.abort_multipart_upload(bucket, item[‘Key‘], item[‘UploadId‘]) if part_uploads[‘IsTruncated‘] == "false": break client.delete_bucket(bucket) except CosServiceError as e: pass
相关推荐
86193952 2020-10-27
Mynamezhuang 2020-09-18
充满诗意的联盟 2020-08-23
raksmart0 2020-08-17
yfightfors 2020-08-16
84487600 2020-08-16
qdqht00 2020-07-26
WebVincent 2020-07-21
hzyuhz 2020-07-04
fengchao000 2020-07-04
javaraylu 2020-06-28
baijinswpu 2020-06-28
ZHANGRENXIANG00 2020-06-28
Teamomc 2020-06-28
Catastrophe 2020-06-26
hell0kitty 2020-06-17
我欲疾风前行 2020-06-18