从阿里云 OSS 传输 S3 对象
本教程介绍如何将 阿里云 OSS 中的数据传输到 Amazon S3。
前提条件
您已经完成了Data Transfer Hub解决方案的部署,并将解决方案部署在 俄勒冈州(us-west-2) 区域。更多信息请参考部署解决方案。
步骤1: 为OSS配置凭证
- 打开 Secrets Manager 控制台。
- 点击左侧边栏的密钥。
- 点击存储新的密钥按钮。
- 在密钥类型中,选择其他类型的密钥。
-
在明文选项卡中输入您的阿里云的凭证,该凭证的格式如下:
{ "access_key_id": "<Your Access Key ID>", "secret_access_key": "<Your Access Key Secret>" }
-
点击下一步。
- 输入密钥名称,例如:
dth-oss-credentials
。 - 点击下一步。
- 禁用自动轮换。
- 点击存储 完成创建。
步骤2: 创建OSS传输任务
-
从创建传输任务页面,选择创建新任务,然后选择下一步。
-
在引擎选项页面的引擎下,选择Amazon S3,然后选择下一步。
-
指定传输任务详细信息。
- 在源类型下,选择数据源为 Aliyun OSS。
-
输入存储桶名称,并选择同步整个存储桶或指定前缀的对象或多个指定前缀的对象。
-
设置目标端S3存储桶信息。
-
在引擎设置中,验证信息,并在必要时修改信息。如果要进行增量数据传输,建议将最小容量设置为至少为1的值。
-
在任务调度设置处,选择您的任务调度配置。
- 如果要以固定频率配置定时任务,以实现定时对比两侧的数据差异,请选择Fixed Rate。
- 如果要通过Cron Expression配置定时任务,以实现定时对比两侧的数据差异,请选择Cron Expression。
- 如果只想执行一次数据同步任务,请选择One Time Transfer。
- 如果您想实现实时增量数据同步,请参考 OSS事件配置.
-
在高级选项中,保留默认值。
-
在是否需要数据比对处,选择您的任务配置。
- 如果要跳过数据对比过程,传输所有文件,请选择No。
- 如果只想同步有差异的文件,请选择Yes。
-
在通知邮箱中提供电子邮件地址。
-
选择下一步并查看您的任务参数详细信息。
-
选择创建任务。
任务创建成功后,会出现在任务页面。
图2:任务页面
选择任务 ID进入任务详情页面,然后选择CloudWatch Dashboard监控任务状态。
通过OSS事件触发器进行实时数据传输
如果您想将数据实时地从阿里云OSS迁移到Amazon S3,可以按照以下步骤启用OSS事件触发器。
在您创建任务之后,前往 SQS 控制台 并记下 Queue URL
和 Queue arn
,我们将在后续步骤中使用。
准备您的AWS账户的AK/SK
-
前往 IAM 控制台。
-
在导航窗格中选择策略,然后选择创建一个新的策略(Create Policy)。
-
点击 JSON,并将下面的权限JSON文件输入到策略中。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "sqs:SendMessage" ], "Resource": "arn:aws:sqs:us-west-2:xxxxxxxxxxx:DTHS3Stack-S3TransferQueue-1TSF4ESFQEFKJ" } ] }
说明
请替换JSON中您的queue ARN。
-
完成创建策略。
-
在导航窗格中,选择用户,然后选择添加用户(Add User)。
-
将您先前创建的策略关联到该用户上。
-
保存 ACCESS_KEY/SECRET_KEY,以备后面的步骤使用。
准备阿里云中的事件发送函数
-
打开终端并输入以下命令,建议使用docker或linux机器。
mkdir tmp cd tmp pip3 install -t . boto3
-
在同一文件夹中创建
index.py
,然后输入代码。import json import logging import os import boto3 def handler(event, context): logger = logging.getLogger() logger.setLevel('INFO') evt = json.loads(event) if 'events' in evt and len(evt['events']) == 1: evt = evt['events'][0] logger.info('Got event {}'.format(evt['eventName'])) obj = evt['oss']['object'] # logger.info(obj) ak = os.environ['ACCESS_KEY'] sk = os.environ['SECRET_KEY'] queue_url = os.environ['QUEUE_URL'] region_name = os.environ['REGION_NAME'] # minimum info of a message obj_msg = { 'key': obj['key'], 'size': obj['size'] } # start sending the msg sqs = boto3.client('sqs', region_name=region_name, aws_access_key_id=ak, aws_secret_access_key=sk) try: sqs.send_message( QueueUrl=queue_url, MessageBody=json.dumps(obj_msg) ) except Exception as e: logger.error( 'Unable to send the message to Amazon SQS, Exception:', e) else: logger.warning('Unknown Message '+evt) return 'Done'
-
打包代码(包括boto3)。
zip -r code.zip *
在阿里云上创建函数
-
打开阿里云 函数计算的服务及函数, 点击 Task。
-
点击创建函数。
-
选择 Python3.x 作为 运行时环境变量。
-
选择 上传ZIP包作为代码上传方式。
-
上传前述步骤中创建的
code.zip
。 -
然后点击 新建。
配置函数环境变量
-
点击配置。
-
在函数的环境变量部分,点击 修改配置。
-
然后在环境变量中输入json配置文件,请使用您自己的
ACCESS_KEY
,SECRET_KEY
和QUEUE_URL
。{ "ACCESS_KEY": "XXX", "QUEUE_URL": "https://sqs.us-west-2.amazonaws.com/xxxx/DTHS3Stack-S3TransferQueue-xxxx", "REGION_NAME": "us-west-2", "SECRET_KEY": "XXXXX" }
-
点击OK。
创建触发器
-
在触发器页签中点击创建触发器以创建函数的触发器。
-
选择OSS作为触发器类型,然后选择桶名称。
-
选择触发事件。
oss:ObjectCreated:PutObject oss:ObjectCreated:PostObject oss:ObjectCreated:CopyObject oss:ObjectCreated:CompleteMultipartUpload oss:ObjectCreated:AppendObject
-
点击OK。