
Reading a CSV from S3 and inserting it into a MySQL table with AWS Lambda


I am trying to automatically load a CSV into a MySQL table whenever it arrives in an S3 bucket.

My strategy is for S3 to fire an event when it receives a file in a given bucket (let's call it 'bucket-file'). That event notifies an AWS Lambda function, which downloads the file and processes it, inserting each row into a MySQL table (let's call it 'target_table').
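
For context, the S3-to-Lambda trigger can be wired up roughly as follows. This is only a minimal sketch using boto3: the Lambda ARN is a placeholder, and it assumes the function already grants s3.amazonaws.com permission to invoke it.

import boto3

s3 = boto3.client('s3')

# Placeholder ARN; replace with the real function ARN.
lambda_arn = 'arn:aws:lambda:us-east-1:123456789012:function:csv-loader'

# Ask S3 to invoke the function for every object created in 'bucket-file'.
# The function must already allow s3.amazonaws.com to invoke it
# (via lambda add-permission), otherwise this call is rejected.
s3.put_bucket_notification_configuration(
    Bucket='bucket-file',
    NotificationConfiguration={
        'LambdaFunctionConfigurations': [
            {
                'LambdaFunctionArn': lambda_arn,
                'Events': ['s3:ObjectCreated:*'],
            }
        ]
    }
)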

Keep in mind that the RDS instance is inside a VPC.

The bucket's current permissions are configured as follows:

{
    "Version": "2008-10-17",
    "Statement": [
        {
            "Sid": "PublicReadForGetBucketObjects",
            "Effect": "Allow",
            "Principal": {
                "AWS": "*"
            },
            "Action": "s3:GetObject",
            "Resource": "arn:aws:s3:::bucket-file/*"
        }
    ]
}

I created a role containing the following policies, attached to the AWS Lambda function: AmazonS3FullAccess and AWSLambdaVPCAccessExecutionRole.
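
For reference, attaching those two managed policies to the execution role would look roughly like this (a sketch; the role name is a placeholder):

import boto3

iam = boto3.client('iam')

# 'lambda-csv-loader-role' is a placeholder for the Lambda execution role name.
for policy_arn in (
    'arn:aws:iam::aws:policy/AmazonS3FullAccess',
    'arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole',
):
    iam.attach_role_policy(RoleName='lambda-csv-loader-role', PolicyArn=policy_arn)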

The Lambda code is:

from __future__ import print_function
import boto3
import logging
import os
import sys
import uuid
import pymysql
import csv
import rds_config


rds_host  = rds_config.rds_host
name = rds_config.db_username
password = rds_config.db_password
db_name = rds_config.db_name


logger = logging.getLogger()
logger.setLevel(logging.INFO)

# The connection is opened outside the handler, at cold start, so it can be
# reused across invocations of the same container.
try:
    conn = pymysql.connect(host=rds_host, user=name, passwd=password, db=db_name, connect_timeout=5)
except Exception as e:
    logger.error("ERROR: Unexpected error: Could not connect to MySql instance.")
    logger.error(e)
    sys.exit()

logger.info("SUCCESS: Connection to RDS mysql instance succeeded")

s3_client = boto3.client('s3')

def handler(event, context):

    # Bucket name and object key come from the S3 event record. Note that the
    # key in the event is URL-encoded; decode it if file names can contain
    # spaces or special characters.
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = event['Records'][0]['s3']['object']['key']
    download_path = '/tmp/{}{}'.format(uuid.uuid4(), key)

    s3_client.download_file(bucket, key, download_path)

    # file() only exists in Python 2; open() works in both Python 2 and 3.
    csv_data = csv.reader(open(download_path))

    with conn.cursor() as cur:
        for idx, row in enumerate(csv_data):

            logger.info(row)
            try:
                # Let pymysql quote the values itself; wrapping %s in quotes
                # would insert the literal quotes into the table.
                cur.execute(
                    'INSERT INTO target_table (column1, column2, column3) '
                    'VALUES (%s, %s, %s)',
                    row)
            except Exception as e:
                logger.error(e)

            # Commit every 100 rows to keep transactions small.
            if idx % 100 == 0:
                conn.commit()

        conn.commit()

    return 'File loaded into RDS:' + str(download_path)

I have been testing the function: S3 sends the event when the file is uploaded, and Lambda connects to the RDS instance and receives the notification. I have checked that the bucket name is 'bucket-file' and that the file name is correct. The problem is that when the function reaches the line s3_client.download_file(bucket, key, download_path) it gets stuck there until the Lambda timeout is reached.

Looking at the logs, they say:

[INFO]  2017-01-24T14:36:52.102Z    SUCCESS: Connection to RDS mysql instance succeeded
[INFO]  2017-01-24T14:36:53.282Z    Starting new HTTPS connection (1): bucket-files.s3.amazonaws.com
[INFO]  2017-01-24T14:37:23.223Z    Starting new HTTPS connection (2): bucket-files.s3.amazonaws.com
2017-01-24T14:37:48.684Z Task timed out after 60.00 seconds
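
The repeated "Starting new HTTPS connection" lines suggest boto3 is retrying after the first attempt fails to connect. As a debugging aid only (it does not fix the connectivity problem), the S3 client can be built with shorter botocore timeouts so the failure surfaces before the Lambda timeout, roughly like this:

import boto3
from botocore.config import Config

# Fail fast instead of hanging for the full Lambda timeout while debugging.
s3_client = boto3.client(
    's3',
    config=Config(connect_timeout=5, read_timeout=10, retries={'max_attempts': 1}),
)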

I have also read that if you are working inside a VPC, in order to reach an S3 bucket you have to create a VPC endpoint that grants the subnet access to S3. I tried that solution as well, and the result is the same.
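
For reference, a gateway VPC endpoint for S3 is normally created along these lines (a sketch; the VPC ID, route table ID and region are placeholders):

import boto3

ec2 = boto3.client('ec2')

# Use the VPC and the route tables of the subnets the Lambda function
# is attached to; the IDs below are placeholders.
ec2.create_vpc_endpoint(
    VpcEndpointType='Gateway',
    VpcId='vpc-0123456789abcdef0',
    ServiceName='com.amazonaws.us-east-1.s3',
    RouteTableIds=['rtb-0123456789abcdef0'],
)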

I would appreciate any ideas.

Thanks in advance!
