AWS Lambdaを利用してS3かのputイベントをトリガーにredshiftへcopy

· ·

GCP 側で作っていたものを AWS 側で作るということで、色々とハマりながらとりあえず実装していました。 今回は広告の adjust から AWS の S3 に吐き出されたファイルを lambda で S3 put イベントを検知しつつ redshift へ追加という流れです。

  1. 事前に Adjust データを S3 へ吐き出す設定は完了させておく
  2. VPC、subnet、security group の設定と redshift を準備
  3. lambda を準備(イベントトリガーは S3 の特定バケットに対する PUT)
  4. lambda に layer 追加

今回は python3.8 で組んだのですが code はこんな感じ。一旦環境変数とか使わずに生で書いてしまったので、しばらく稼働確認したら直そう…

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import psycopg2

hostName = "Redshiftのホスト名"
databaseName = "RedshiftのDB名"
portNo = "5439"
userName = "Redshiftのユーザー名"
password = "Redshiftのパスワード"

def lambda_handler(event, context):
   file_key_name = event['Records'][0]['s3']['object']['key']

   file = 's3://*bucket-name*/' + file_key_name
   query = """
      COPY *table-name*
      FROM '{}'
      credentials 'aws_iam_role=*role*'
      gzip
      delimiter ','
      csv
      ignoreheader 1
      region 'ap-northeast-1'
   """.format(file)

    conn = psycopg2.connect(
        host=hostName,
        database=databaseName,
        port=portNo,
        user=userName,
        password=password
    )

   cur = conn.cursor()
   cur.execute(query)
   conn.commit()
   cur.close()
   conn.close()

psycopg2 の追加は色々やり方がありそうですが、一番簡単なこちらの記事を参考にした layer 追加にとどめました。

1
arn:aws:lambda:ap-northeast-1:898466741470:layer:psycopg2-py38:1

S3->redshift の COPY と逆の redshift->S3 の UNLOAD はどちらも lambda で実装しました。たぶん scheduled query でも対応可能のような感じだったのですが、ちょっとハマって時間が惜しかったということもあります。

参考にしたサイト 🔗

comments powered by Disqus