無気力生活 (ノ ´ω`)ノ ~゜

脱力系エンジニア。てきとーに生きてます。

AuroraからOutfile S3で出力したpartファイルを1つに結合したかった話

# 本記事の環境
ruby: 2.5.0
Rails: 5.1

Amazon Auroraには、実行したクエリの結果をそのままS3に出力する機能があります。

docs.aws.amazon.com

Aurora v1.13くらいから使えるようになった機能で、弊チームではデータ分析基盤に流し込むために使っています。Selectした結果をTSVにしてS3として保存してくれるので、出力さえしてしまえば世の中の大方の分析環境で使えるのがいいですね( ・`ω・´)

実際に使うときは、こんな感じ(Aurora側にS3に吐き出すための権限が付与されているかつ、SQLの実行ユーザーにSELECT INTO S3のGrant設定をする必要があり。詳しくは↑のリンク)

SELECT *
FROM table
WHERE created_at > DATE_ADD(CURRENT_DATE(), interval 1 day)
INTO OUTFILE S3 's3-us-west-2://bucket/dir/key' 
OVERWRITE ON

楽ちん。これだけでS3にファイルをはきだせるので、ちょっとしたデータをサクッと集めたいときに便利そうですね。

この機能を使って全テーブルの全データを一日1回S3に投げつけようとします。Railsであれば対象となるModelに対し、columnsを駆使してSQLを組み立てればいけます。

model = User
query = <<~SQL
SELECT #{model.columns.map(&:name).join(',') }
FROM #{model.table_name}
INTO OUTFILE S3 's3-us-west-2://bucket/dir/#{model.table_name}' 
OVERWRITE ON
SQL

model.connection.execute(query)

確かこんな感じで行けるはず( ・`ω・´)

適切な権限があり、Bucketが作成済みであればこれでS3にファイルが出力されます。17000行、6.2MBくらいのデータで出力に700msほど時間がかかりましたが、そこまで遅くはない印象。

今回の困ったこと

さて、今回の本題はここから。 この方法で、例えばbucket/dir/usersとして出力した場合、実際には以下の名前で出力されます。

bucket/dir/users.part_00000

名前の後ろに勝手にpart_xxxxxがついてしまいます。S3への出力オプションを見ても制御する設定はなかったので、おそらく常にこの形式の名前で吐き出される模様(´・ω・`)

名前を見て想像できる通り、データセットのサイズが大きすぎる場合にファイル分割して出力されます。Amazonのドキュメントに記載されている内容をみると、

Amazon S3 バケットに書き込まれるファイルの数は、SELECT INTO OUTFILE S3 ステートメントで選択したデータの量と Aurora MySQL のファイルサイズのしきい値によって異なります。デフォルトのファイルサイズのしきい値は 6 GB です。ステートメントで選択したデータがファイルサイズのしきい値より少ない場合は、1 つのファイルが作成されます。それ以外の場合は、複数のファイルが作成されます。

とあり、6GBを閾値としてファイル分割するみたいです。

複数ファイルに分かれてしまう。特に事情がなければそのまま使ってもいいんですが、複数ファイルに分割すると分析基盤に取り込む際に考慮しなくてはいけないので、可能な限り1ファイルに固めたいもの。

そこで、マルチパートアップロード

S3には、大きすぎるファイルを転送する際、ファイルを複数に分割して送信しS3側で1つにまとめる『マルチパートアップロード』という機能がサポートされています。今回の1ファイルに固める際はこれを使います。

docs.aws.amazon.com

マルチパートは、基本的に以下の手順でファイル送信 + 結合を実現しているようです。

  1. AWS::S3::Client.create_multipart_uploadで、バケット名と出力ファイル名を渡す。実行結果として1つのMultipartを指すOrderIdが取得できる
  2. AWS::S3::Client.upload_partでファイルを送信する。保存できたらetagという固有のIDを返却する。
  3. ファイルをすべて転送した後、2で取得したetagとpart_numberを送信して、ファイルを結合する

これが基本の流れ。Auroraから吐き出したデータをこれに食わせて1ファイルに結合できればすべて解決できそうな気がします。Railsから試すには、gem aws-sdk-3をGemfileに追加します。

まず、今回はAuroraからS3への出力を行っているため、ファイルが存在しています。2の工程はスキップできそうです。後、3の工程で必要なetagですが、以下のコードで取得できます。

def bucket
  @bucket ||= Aws::S3::Resource.new(
    client: ::Aws::S3::Client.new(region: "us-west-2")
  ).bucket("bucket")
end

# キー名をprefixに指定すると、.part_xxxxxの要素が引っかかる。それのetagとpart_number
etag_and_partnum = bucket.objects(prefix: "dir/users").to_a.map{|obj| {etag: obj.etag, part_number: obj.key.match("\d+$").to_s.to_i})

この取得したetag指定してmultipartを実行します。

shared_options = { bucket: bucket.name, key: "dir/users" }
create_result = bucket.client.create_multipart_upload(shared_options)
# complete_multipart_upload: https://docs.aws.amazon.com/sdk-for-ruby/v3/api/Aws/S3/Client.html#complete_multipart_upload-instance_method
bucket.client.complete_multipart_upload(
    shared_options.merge(
        multipart_upload: { parts: etag_and_partnum },
        upload_id: create_result.upload_id
    )
)

これでいけるかと思いきや、無残に失敗します(´・ω・`)マルチパートのetagじゃないぞ、というお怒りだったはず。

やはり正しく1,2,3の工程を実施する必要はありそうです。upload_partでファイルを送信し直すなんで、なんのためにOutfile S3をやったのかという気持ちになりますね(´・ω・`)

そんな折、こんなQiitaをみつけました。

qiita.com

つまり、upload_part_copyメソッドを用いて、バケットに存在するOutfile S3で出力したファイルをcopy_sourceに指定することで、改めてファイルを送信せずに目的にことが実現できる、ということになります。

最終的に、以下のコードで実現できました。

  def join_multipart_object(s3_path_key)
    # upload_part_copyのcopy_source指定のため、出力キーの一覧を取得しておく
    upload_object_keys = bucket.objects(prefix: s3_path_key).to_a.map(&:key)
    return if upload_object_keys.blank?
    
    shared_options = { bucket: bucket_name, key: s3_path_key }
    crete_result = bucket.client.create_multipart_upload(shared_options)
    begin
      # 一度上げたものを結合することはできないので、コピーを元に生成する
      etag_and_partnum = upload_object_keys.map.with_index(1) do |updated_key, idx|
        upload_result = bucket.client.upload_part_copy(
          shared_options.merge(
            copy_source: "#{bucket.name}/#{updated_key}",
            part_number: idx,
            upload_id: crete_result.upload_id
          )
        )
        {etag: upload_result.copy_part_result.etag, part_number: idx}
      end

      # 分割したファイルを結合する
      bucket.client.complete_multipart_upload(
        shared_options.merge(
          multipart_upload: { parts: etag_and_partnum },
          upload_id: crete_result.upload_id
        )
      )
    rescue
      # 処理失敗の場合は、abortを叩いてマルチパートを終了させる
      bucket.client.abort_multipart_upload(
        shared_options.merge(upload_id: part_struct.upload_id)
      )
      raise $!
    end

    # マルチパートを合成したら、元のファイルを削除して完了
    bucket.delete_objects(
       delete: {
         objects: upload_object_keys.map { |key| {key: key} }
       }
    )
end

def bucket
  @bucket ||= Aws::S3::Resource.new(
    client: ::Aws::S3::Client.new(region: "us-west-2")
  ).bucket("bucket")
end