Lambda 函数 - 从S3加载文件保存至DynamoDB
tip
使用Lambda函数,设置trigger,当我们每次将文件上传至S3桶中,触发Lambda函数将文件保存至DynamoDB表中。
设置 trigger
没有额外需要设置的参数,默认即可。
加载CSV文件
注意这里的写入语句不能对数据库的Schema进行修改。
import json
import boto3
import os
import csv
import codecs
import sys
# get a handle on s3
s3 = boto3.resource('s3')
dynamodb=boto3.resource('dynamodb')
table=dynamodb.Table('lendingclub-test')
def lambda_handler(event,context):
    
    # get a handle on the bucket that holds your file
    bucket = s3.Bucket('jiacheng-project1')
    
    # get a handle on the object you want (i.e. your file)
    obj = bucket.Object(key='LendingClub.csv')
    
    # get the object
    response = obj.get()
    
    # read the contents of the file and split it into a list of lines
    # for python 3 you need to decode the incoming bytes:
    lines = response['Body'].read().decode('utf-8').split()
    
    # # now iterate over those lines
    for row in csv.DictReader(lines):
    #     # here you get a sequence of dicts
    #     # do whatever you want with each line here
        print(row)
    # for row in csv.DictReader(codecs.getreader('utf-8')(response['Body'])):
        # print(rec_data)
        table.put_item(
			Item={
       # 这里我表的主键是小写id
       # 这里的k-v是要传入表的结构
			'id': row['ID'],
			'credit_policy':row['credit_policy'],
			'purpose':row['purpose'],
			'int_rate':row['int_rate'],
			'installment':row['installment'],
			'log_annual_inc':row['log_annual_inc'],
			'dti':row['dti'],
			'fico':row['fico'],
			'days_with_cr_line':row['days_with_cr_line'],
			'revol_bal':row['revol_bal'],
			'revol_util':row['revol_util'],
			'inq_last_6mths':row['inq_last_6mths'],
			'delinq_2yrs':row['delinq_2yrs'],
			'pub_rec':row['pub_rec']
			})
加载Json文件
import json
import boto3
import os
import csv
import codecs
import sys
s3_client = boto3.client('s3')
dynamodb = boto3.resource('dynamodb')
def lambda_handler(event, context):
    bucket = event['Records'][0]['s3']['bucket']['name']
    csv_file_name = event['Records'][0]['s3']['object']['key']
    csv_object = s3_client.get_object(Bucket=bucket,Key=csv_file_name)
    csvFileReader = csv_object['Body'].read()
    csvDict = json.loads(csvFileReader)
    table = dynamodb.Table('lendingclub-test')
    table.put_item(Item=csvDict)