Athenaのクエリの実行速度は、ファイルの数やサイズ、またそのフォーマット・圧縮の有無によって大きく影響を受ける。この検証では、ファイル形式が実際にAthenaのパフォーマンスにどのような影響を与えるのかを調査した。
なお、今回はファイル数とサイズの関係に着目しての検証であり、集計対象に対する最適化を目的としていない。そのため、Athenaのクエリのパターンや、ファイルフォーマットが与える影響については、最小限のみとしている。また同様の理由から、パーティションなどのAthenaの機能に関する検証の対象外とした。
簡易的な検証であり、データ構造や分析目的によって結果は大きく異なってくる。Athenaの性能傾向全般に通じるものでないことを留意すること。
データ分析基盤を構築する上では、データレイクをどのように作るかが重要になる。AthenaであればデータをSQLで扱えるようになるため、ETLの構築負荷を軽減できる可能性がある。そこで、データレイクのETL処理の一部にAthenaを組み込む事を前提に検証した。
Athenaの課金体系としては、読み込んだデータ量に対しての従量課金が中心となる。そのため、たとえ処理時間が長かったとしても、Athena自体のコストとしては変わらない。一方で、Athenaを呼び出す元になるコンピュートエンジン(GlueやLambda、コンテナ)は実行時間での課金である。結果としてAthenaの処理時間が長くなると、全体としてのコストが大きくなる。そのため、Athenaの処理時間を最小化するためのパターンの考察する必要がある。
以下の表は、異なるファイル数、サイズ、圧縮形式を用いた時のAthenaのクエリ実行時間を示している。
ファイル数(※) | ファイルサイズ(※) | 合計サイズ(※) | CSV(圧縮なし) | CSV(GZIP圧縮) | Parquet形式(Snappy圧縮) |
---|---|---|---|---|---|
1件 | 10GB | 10GB | 3.88秒 | 247.88秒 | エラー発生 |
10件 | 1GB | 10GB | 3.87秒 | 29.27秒 | 1.85秒 |
100件 | 100MB | 10GB | 3.57秒 | 4.48秒 | 1.85秒 |
1,000件 | 10MB | 10GB | 3.36秒 | 3.26秒 | 2.24秒 |
10,000件 | 1MB | 10GB | 4.07秒 | 4.17秒 | 2.76秒 |
100,000件 | 100KB | 10GB | 11.51秒 | 12.62秒 | 11.68秒 |
1,000,000件 | 10KB | 10GB | 92.95秒 | 91.38秒 | 76.35秒 |
10,000,000件 | 1KB | 10GB | 90.17秒 | 900秒超 | 900秒超 |
※ 検証データは、およそ10GBのCSVファイルを元に作成している。そのファイルを、1GB、100MBなど目的のサイズになるように行ごとに分割した。 そのため、合計のレコード数はどのバリエーションでも同じものとなる。 またファイルサイズはCSV形式のものであり、GZIP・Snappyで圧縮されたものは、それより小さくなっている。
※ 測定結果については、データの内容や集計方法によって大きく変わるものであり、常に同様の結果になるとは限らないことに留意すること。
検証内容および時期
ファイルをS3に配置したデータに対して、EC2上のPythonスクリプトからAthenaのクエリを実行する。パターンごとにそれぞれ10回づつクエリを実行し、平均タイムを結果として採用した。
なお実施時期は2023年9月下旬であり、今後のAthenaのエンジンの改良により、性能傾向が変わる可能性があることに留意すること。
データ構造
検証データは、カンマ区切りのCSVファイルである。
データはFakerというPythonのライブラリで作成し、項目としては、名前、メールアドレス、都市名、ランダムの数字(4桁)、ランダムの文字列(500バイト)により構成されている。
実行したクエリ
検証として、下記のクエリを実行した。
列に対しての集計となるため、列志向で保存するParquetは有利であり、行志向のCSV形式は不向きという前提がある。
1.サイズ問題
・ファイルが小さすぎると、パフォーマンスは劣化する。全ての形式で共通するため、ファイル転送待ちによる処理のアイドルが発生しているためと断定できる。
・ファイルサイズが大きい場合は、ファイルの形式によって結果が大きく異なった。これは、AthenaのエンジンであるPrestroの処理の影響を受けていると推測される。
・分散処理を行うため、分析対象のデータは適度な大きさで複数ある方が効率が高い。大きすぎるファイルの場合は、Athena側で分割していると推測される。
・CSVのGZIP圧縮の場合、分割処理ができずに単一処理となっている可能性が高い。
・CSVやParquetの場合は、大きなファイルの場合はAthena側で分割して分散処理されている可能性が高く、処理性能の差は小さい。
・Parquetについては処理可能なサイズ上限があるようで、10GBのデータではエラーが発生した。
2.圧縮の影響
・CSVデータをGZIPで圧縮することで、Athena単体での処理性能の向上効果は見えなかった。
・Parquet形式のSnappy圧縮は、一般的にCSVよりも効果的であり、特にファイルサイズが1GBまたは100MBの場合、明らかに高速である。
3.最適な設定
・この検証に基づくと、ファイルサイズが1GBまたは100MBの範囲で、Parquet形式のSnappy圧縮を使用するのが最も効果的であることが示されている。
4.S3の料金
・Athenaの性能以外の影響として、サイズが小さい大量のファイルを検索すると、S3のAPIリクエストの料金が課題になる。100MBのデータ100件のリクエストのAPI利用料に対して、1KBのデータ1,000万件へのリクエストは、10万倍のAPIリクエスト料になる。
Athenaのパフォーマンスを最適化するには、データのサイズ、ファイルの数、および圧縮形式を適切に選択することが重要である。この検証の結果により、Athenaを利用する際は、ファイルを予め100MB前後にまとめておくことが何よりも重要であることがわかった。そのうえで、用途に応じてファイルフォーマット、圧縮形式の検討が必要である。可能なかぎりParquet形式でSnappy圧縮するのを前提とすると良いが、データソース側の開発負荷を考えて選択すべき。
Athenaと離れたところで考えても、サイズに大量の小さなファイルを転送するのは著しく転送効率が悪い。Athenaの処理の他に、ETLの転送処理の影響も大きい。データレイクにデータを格納する際に、最初からある程度のサイズにまとめておくことが、処理効率・コスト観点からも重要である。
今回の検証に用いた各種スクリプトは、処理要件をChatGPTに伝え生成されたコードをベースに行った。なおコードの安全性について保証はしていないため、留意すること。
検証データ生成スクリプト
CSVデータ作成スクリプト
:generate_data.py
ーーーーーーーーーーーーーーーーーーーーーーーー
import csv
import os
from faker import Faker
import argparse
def generate_data(target_folder, file_size, file_count):
fake = Faker()
header = ["name", "email", "address", "random_number", "random_string"]
# フォルダが存在しない場合は作成
if not os.path.exists(target_folder):
os.makedirs(target_folder)
for i in range(file_count):
filename = os.path.join(target_folder, f"data_{i}.csv")
with open(filename, 'w', newline='') as csvfile:
writer = csv.writer(csvfile)
writer.writerow(header)
while os.path.getsize(filename) < file_size:
random_number = fake.random_int(min=1000, max=9999) # 1000から9999の間のランダムな整数を生成
random_string = fake.pystr(min_chars=500, max_chars=500) # 500文字のランダムな文字列を生成
row = [fake.name(), fake.email(), fake.address().replace('\n', ' '), random_number, random_string]
writer.writerow(row)
print(f"Generated {filename} with size {os.path.getsize(filename)} bytes")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Generate dummy CSV data using Faker")
parser.add_argument("target_folder", help="Folder to store generated files")
parser.add_argument("file_size", type=int, help="Target size of each file in bytes")
parser.add_argument("file_count", type=int, help="Number of files to generate")
args = parser.parse_args()
generate_data(args.target_folder, args.file_size, args.file_count)
ーーーーーーーーーーーーーーーー
CSVデータを任意の数で分割するスクリプト
ーーーーーーーーーーーーーーーー
:split_csv.py
import csv
import os
import sys
def split_csv(input_file, output_folder, number_of_files):
# 現在のスクリプトの実行場所を取得
base_path = os.path.dirname(os.path.abspath(__file__))
# 実行場所からの相対パスに基づいてフルパスを作成
input_file_path = os.path.join(base_path, input_file)
if not os.path.exists(output_folder):
os.makedirs(output_folder)
# 全行数を取得
with open(input_file_path, 'r') as f:
total_rows = sum(1 for row in f) - 1 # ヘッダー分を除く
rows_per_file = total_rows // number_of_files
with open(input_file_path, 'r') as f:
reader = csv.reader(f)
headers = next(reader)
current_file_number = 1
current_row = 0
current_output = open(os.path.join(output_folder, f"output_{current_file_number}.csv"), 'w', newline='')
writer = csv.writer(current_output)
writer.writerow(headers)
for row in reader:
if current_row > rows_per_file and current_file_number < number_of_files:
current_output.close()
current_file_number += 1
current_row = 0
current_output = open(os.path.join(output_folder, f"output_{current_file_number}.csv"), 'w', newline='')
writer = csv.writer(current_output)
writer.writerow(headers)
writer.writerow(row)
current_row += 1
current_output.close()
if __name__ == "__main__":
if len(sys.argv) != 4:
print("Usage: python split_csv.py <input_file> <output_folder> <number_of_files>")
sys.exit(1)
input_file = sys.argv[1]
output_folder = sys.argv[2]
number_of_files = int(sys.argv[3])
split_csv(input_file, output_folder, number_of_files)
ーーーーーーーーーーーーーーーー
CSVデータをParquet形式に変換してSnappy圧縮するスクリプト
ーーーーーーーーーーーーーーーー
:split_csv.py
import os
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
def convert_csv_to_parquet(csv_filepath, output_folder, chunk_size=50000):
parquet_filename = os.path.basename(csv_filepath).replace(".csv", ".parquet")
parquet_filepath = os.path.join(output_folder, parquet_filename)
chunk_iter = pd.read_csv(csv_filepath, chunksize=chunk_size, quotechar='"')
temp_files = []
for i, chunk in enumerate(chunk_iter):
temp_file = os.path.join(output_folder, f"temp_{i}_{parquet_filename}")
chunk.to_parquet(temp_file, index=False, compression='snappy', engine='pyarrow')
temp_files.append(temp_file)
dfs = [pd.read_parquet(temp_file, engine='pyarrow') for temp_file in temp_files]
pd.concat(dfs).to_parquet(parquet_filepath, index=False, compression='snappy', engine='pyarrow')
for temp_file in temp_files:
os.remove(temp_file)
print(f"Converted {csv_filepath} to {parquet_filepath}")
def csv_to_parquet_in_chunks(input_folder, output_folder, chunk_size=50000, max_workers=5):
with ThreadPoolExecutor(max_workers=max_workers) as executor:
for filename in os.listdir(input_folder):
if filename.endswith(".csv"):
csv_filepath = os.path.join(input_folder, filename)
executor.submit(convert_csv_to_parquet, csv_filepath, output_folder, chunk_size)
if __name__ == "__main__":
import sys
if len(sys.argv) != 3:
print("Usage: python script_name.py <path_to_input_folder> <path_to_output_folder>")
sys.exit(1)
input_path = sys.argv[1]
output_path = sys.argv[2]
if not os.path.exists(output_path):
os.makedirs(output_path)
csv_to_parquet_in_chunks(input_path, output_path)
ーーーーーーーーーーーーーーーー
CSVデータをGZIP圧縮するスクリプト
ーーーーーーーーーーーーーーーー
:compress_csv.py
import os
import gzip
import shutil
from concurrent.futures import ProcessPoolExecutor
def compress_file(csv_file, output_folder):
base_name = os.path.basename(csv_file)
gzip_path = os.path.join(output_folder, base_name + '.gz')
with open(csv_file, 'rb') as f_in, gzip.open(gzip_path, 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
print(f"Compressed: {csv_file} -> {gzip_path}")
def main(input_folder, output_folder):
if not os.path.exists(output_folder):
os.makedirs(output_folder)
csv_files = [os.path.join(input_folder, f) for f in os.listdir(input_folder) if f.endswith('.csv')]
with ProcessPoolExecutor() as executor:
executor.map(compress_file, csv_files, [output_folder] * len(csv_files))
if __name__ == "__main__":
import sys
if len(sys.argv) != 3:
print("Usage: python script_name.py <path_to_input_folder> <path_to_output_folder>")
sys.exit(1)
input_path = sys.argv[1]
output_path = sys.argv[2]
main(input_path, output_path)
ーーーーーーーーーーーーーーーー
検証実施スクリプト
対象テーブルに対してAthenaのクエリーを10回実行して平均所要時間を計測するスクリプト
ーーーーーーーーーーーーーーーー
:mesuring_athena_execution_time.py
import boto3
import time
import sys
def execute_athena_query(database, table_name, output_location, region_name="us-west-2"):
client = boto3.client('athena', region_name=region_name)
query = f"SELECT sum(random_number) FROM {table_name} "
response = client.start_query_execution(
QueryString=query,
QueryExecutionContext={
'Database': database
},
ResultConfiguration={
'OutputLocation': output_location
}
)
query_execution_id = response['QueryExecutionId']
final_states = ['SUCCEEDED', 'FAILED', 'CANCELLED']
start_time = time.time()
while True:
response = client.get_query_execution(QueryExecutionId=query_execution_id)
status = response['QueryExecution']['Status']['State']
if status in final_states:
break
time.sleep(1)
end_time = time.time()
elapsed_time = end_time - start_time
return status, elapsed_time
if __name__ == "__main__":
if len(sys.argv) < 2:
print("Usage: python script_name.py TABLE_NAME")
sys.exit(1)
table_name = sys.argv[1]
database_name = 'performance'
output_s3_path = 's3://performance-measurement-20231022/athena/log'
region = 'ap-northeast-1'
total_time = 0
executions = 10
for _ in range(executions):
query_status, execution_time = execute_athena_query(database_name, table_name, output_s3_path, region)
if query_status != 'SUCCEEDED':
print(f"Query {query_status}. Exiting.")
sys.exit(1)
total_time += execution_time
average_time = total_time / executions
print(f"Average execution time over {executions} runs: {average_time:.2f} seconds.")