使用Python还原s3的GLACIER对象的代码
注意:发起还原之前要先检查是不是有正在进行的还原任务,或者有已经还原的临时对象。
以下是发起任务后调用head-object返回的json。
```json
{
    "AcceptRanges": "bytes",
    "Restore": "ongoing-request=\"true\"",
    "LastModified": "Tue, 26 Nov 2019 08:26:24 GMT",
    "ContentLength": 735,
    "ETag": "\"1e0284fa4441896a4a267777caef067a\"",
    "ContentType": "text/plain",
    "Metadata": {},
    "StorageClass": "GLACIER"
}
```
"Restore"
表示存在正在进行的还原任务,或者存在已经还原的临时对象。值为"ongoing-request=\"true\""时表示对象正在还原;值为"ongoing-request=\"false\", expiry-date=\"Sun, 05 Jan 2020 00:00:00 GMT\""时表示对象已经还原成功,临时副本保留到05 Jan 2020 00:00:00 GMT。
测试脚本
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
|
import logging import boto3 from botocore.exceptions import ClientError import sys import os import time import botocore
# Placeholder bucket name and object key read by the test_* helpers below;
# the __main__ block assigns the real values before calling them.
BUCKET_NAME = ""
OBJECT_NAME = ""
def restore_object(bucket_name, object_name, days, retrieval_type='Standard'):
    """Submit a restore request for an archived S3 object.

    Args:
        bucket_name: Name of the bucket that holds the object.
        object_name: Key of the object to restore.
        days: Value sent as ``Days`` in the restore request.
        retrieval_type: Value sent as the Glacier job ``Tier``
            (defaults to 'Standard').

    Returns:
        True if the request was submitted without a ClientError, otherwise
        False (the error is logged).
    """
    request = {'Days': days, 'GlacierJobParameters': {'Tier': retrieval_type}}
    s3 = boto3.client('s3')
    try:
        s3.restore_object(Bucket=bucket_name, Key=object_name, RestoreRequest=request)
    except ClientError as e:
        # Log the code S3 actually returned instead of asserting a cause:
        # besides a missing bucket/key or a non-GLACIER storage class, the
        # request is also rejected while a previous restore is in progress.
        error_code = e.response.get('Error', {}).get('Code', 'Unknown')
        logging.error(e)
        logging.error(
            f"restore_object failed with error code {error_code} "
            f"(commonly NoSuchBucket, NoSuchKey, InvalidObjectState when the "
            f"storage class is not GLACIER, or RestoreAlreadyInProgress). "
            f"{bucket_name} {object_name} "
        )
        return False
    return True
def head_object(bucket_name, object_name):
    """Fetch an object's metadata through the S3 ``head_object`` call.

    Args:
        bucket_name: Name of the bucket that holds the object.
        object_name: Key of the object to inspect.

    Returns:
        The response returned by ``head_object``, or None if the call raised
        a ClientError (the error is logged).
    """
    s3 = boto3.client('s3')
    try:
        return s3.head_object(Bucket=bucket_name, Key=object_name)
    except ClientError as e:
        # Report the real error code rather than a restore-specific guess.
        # NOTE(review): HEAD responses carry no body, so the code is likely a
        # bare HTTP status such as '404' or '403' -- confirm against boto3.
        error_code = e.response.get('Error', {}).get('Code', 'Unknown')
        logging.error(e)
        logging.error(
            f"head_object failed with error code {error_code}. "
            f"{bucket_name} {object_name} "
        )
        return None
def test_restore_object():
    """Request a one-day 'Expedited' restore of the configured object.

    Reads the target from the module-level BUCKET_NAME / OBJECT_NAME.

    Returns:
        The boolean returned by restore_object().
    """
    bucket, key = BUCKET_NAME, OBJECT_NAME
    submitted = restore_object(bucket, key, 1, 'Expedited')
    if submitted:
        logging.info(f'Submitted request to restore {key} in {bucket}')
    # The outcome is logged whether or not the request was accepted.
    logging.info(f'restore object {submitted}')
    return submitted
def test_head_object():
    """Look up the configured object and log its restore state.

    Reads the target from the module-level BUCKET_NAME / OBJECT_NAME and
    inspects the ``Restore`` field of the head_object() response.

    Returns:
        Whatever head_object() returned: the response, or None on failure.
    """
    test_bucket_name = BUCKET_NAME
    test_object_name = OBJECT_NAME
    response = head_object(test_bucket_name, test_object_name)
    if not response:
        # head_object() has already logged the failure.
        return response

    # This is a metadata lookup, so do not report it as a restore submission.
    logging.info(f'Fetched metadata for {test_object_name} in {test_bucket_name}')

    restore_status = response.get('Restore')
    if not restore_status:
        # No Restore field: no restore is running and no restored copy exists.
        logging.info('need restore object {} {}'.format(test_bucket_name, test_object_name))
        return response

    logging.info('Restore {}'.format(restore_status))
    if 'ongoing-request="false"' not in restore_status:
        logging.info("正在还原...")
        return response

    # Only slice when the marker is present; slicing from find()'s -1 would
    # log just the last character of the field.
    expiry_index = restore_status.find('expiry-date=')
    if expiry_index != -1:
        logging.info(restore_status[expiry_index:])
    logging.info("还原成功.")
    return response
if __name__ == '__main__':
    # Script entry point: set up INFO-level logging with timestamps.
    log_settings = {
        'level': logging.INFO,
        'format': '%(levelname)s: %(asctime)s: %(message)s',
        'datefmt': '%Y-%m-%d %H:%M:%S',
    }
    logging.basicConfig(**log_settings)

    # Point the module-level placeholders at the object to operate on.
    BUCKET_NAME = "mybucket"
    OBJECT_NAME = "obj/test.png"

    # Submit the restore request, then inspect the object's restore state.
    test_restore_object()
    test_head_object()
|