Skip to main content

Lambda 函数 - 从S3加载文件保存至DynamoDB

tip

为Lambda函数设置trigger后,每当我们将文件上传至S3桶时,都会触发该Lambda函数,将文件内容保存至DynamoDB表中。

设置 trigger

没有额外需要设置的参数,默认即可。

加载CSV文件

注意这里的写入语句不能对数据库的Schema进行修改。

import json
import boto3
import os
import csv
import codecs
import sys

# Module-level AWS handles, created once when the module is imported.
# DynamoDB resource and the target table for the CSV rows.
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('lendingclub-test')
# S3 resource used by the handler to fetch the source file.
s3 = boto3.resource('s3')

def lambda_handler(event, context):
    """Load ``LendingClub.csv`` from S3 and write each row to DynamoDB.

    Downloads the object ``LendingClub.csv`` from the ``jiacheng-project1``
    bucket, parses it as UTF-8 CSV with a header row, and calls
    ``table.put_item`` once per data row. Every value is stored exactly as
    the string read from the CSV.

    Args:
        event: Lambda invocation event. Not used: the bucket name and object
            key are hard-coded below.
        context: Lambda runtime context. Not used.

    Raises:
        KeyError: If the CSV header lacks one of the expected columns.
    """
    # Function-scope import so the handler does not depend on edits to the
    # file-level import block.
    import io

    # CSV columns copied verbatim into the item under the same attribute name.
    columns = (
        'credit_policy',
        'purpose',
        'int_rate',
        'installment',
        'log_annual_inc',
        'dti',
        'fico',
        'days_with_cr_line',
        'revol_bal',
        'revol_util',
        'inq_last_6mths',
        'delinq_2yrs',
        'pub_rec',
    )

    # Get a handle on the bucket and on the object (the CSV file) inside it.
    bucket = s3.Bucket('jiacheng-project1')
    obj = bucket.Object(key='LendingClub.csv')
    response = obj.get()

    # Decode the raw bytes once. 'utf-8-sig' also accepts plain UTF-8 but
    # strips a leading byte-order mark, which would otherwise become part of
    # the first header name ('ID').
    text = response['Body'].read().decode('utf-8-sig')

    # Let the csv module find the record boundaries. Splitting the text on
    # whitespace (str.split()) would cut any field containing a space into
    # separate bogus rows; newline='' keeps newlines inside quoted fields
    # intact for the parser.
    reader = csv.DictReader(io.StringIO(text, newline=''))

    for row in reader:
        print(row)
        # The table's primary key is lowercase 'id', while the CSV header
        # uses 'ID'. The remaining keys define the item structure written
        # to the table.
        item = {'id': row['ID']}
        item.update((name, row[name]) for name in columns)
        table.put_item(Item=item)

加载JSON文件

import json
import boto3
import os
import csv
import codecs
import sys


# Module-level AWS handles, created once when the module is imported.
# DynamoDB resource used by the handler to look up the target table.
dynamodb = boto3.resource('dynamodb')
# Low-level S3 client used by the handler to download the uploaded object.
s3_client = boto3.client('s3')


def lambda_handler(event, context):
    """Store the JSON document(s) named in an S3 event in DynamoDB.

    For each record in ``event['Records']`` the referenced S3 object is
    downloaded, parsed as JSON, and written to the ``lendingclub-test``
    table with ``put_item``. A top-level JSON object is written as one item;
    a top-level JSON array is written element by element.

    Args:
        event: S3 event notification; each record must provide
            ``['s3']['bucket']['name']`` and ``['s3']['object']['key']``.
        context: Lambda runtime context. Not used.

    Raises:
        KeyError: If the event does not have the structure described above.
        json.JSONDecodeError: If an object's content is not valid JSON.
    """
    # Function-scope imports so the handler does not depend on edits to the
    # file-level import block.
    from decimal import Decimal
    from urllib.parse import unquote_plus

    table = dynamodb.Table('lendingclub-test')

    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        # S3 event notifications deliver the object key URL-encoded (for
        # example a space arrives as '+'), so decode it before get_object.
        key = unquote_plus(record['s3']['object']['key'], encoding='utf-8')

        s3_object = s3_client.get_object(Bucket=bucket, Key=key)
        body = s3_object['Body'].read()

        # The boto3 DynamoDB resource rejects Python floats, so parse JSON
        # numbers that have a fractional part as Decimal instead.
        payload = json.loads(body, parse_float=Decimal)

        # Accept either a single item or a list of items.
        items = payload if isinstance(payload, list) else [payload]
        for item in items:
            table.put_item(Item=item)