Amazon Web ServiceでSESとSQSを使って、大量に高速に送ることが出来るメールマガジン配信システムを構築してみました。


もともと、別の開発者の人が開発していたものを、引き継いで受託していたサービスが、日々会員数が増えるたびにメールの配信時間が伸びていき、最終的に2万人に配信するのに6時間ほどかかるシステムになっていました。

システム

システム構成はシンプルでAWSのEC2のインスタンス上でDBやアプリケーションが動いており、画像などのリソースの保存にS3を、メール送信にSESを使っています。
Screenshot 2014-09-14 18.41.18

ちなみにアプリケーションはJavaEEです。最近ではちょっと珍しい?ですね。

問題点

もともとのシステムは、管理画面から登録されたメルマガ情報からメルマガを受診する対象のユーザーを取得し、名前などを埋め込んだスプールを作成する「スプール生成バッチ」と、DBに登録されたスプールを順番に送信する「スプール送信バッチ」が存在しました。
また、スプールの送信が追いつかない場合はバッチが複数呼び出されるようになっています。

このシステムが設計されたのは、会員数の少ない頃では耐え抜いてきましたが、次第に会員数が増えると全体の送信に要する時間が増えてきました。
原因を探ると、スプール送信バッチが複数呼び出されて同じメールを複数送信しないようDBをロックしていたことが問題でした。

図にすると以下の感じです。
Screenshot 2014-09-14 21.25.30

解決策

ここで、求められる要件は以下の3つ。

  • スプール生成バッチとスプール送信バッチが重なることがある
  • 同じメールを二度送信してはいけない

そこで、便利なのがAWSのSQS(Simple Queue Service)です。
シンプルなキューイングサービスですが、自分で構築するよりもメリットが多く、以下の様な特徴があります。

  • 高速
  • 信頼性が高い
  • 低コスト
  • 分散マネージドキュー
  • スケーラビリティに優れている

SQSについては以下のスライドで詳しく解説されています。

SQSを使ったシステムを図にすると以下の感じです。
Screenshot 2014-09-14 22.30.30

SQSを使うことで、バッチが同時に動いていても、スプール生成バッチでスプールをキューイングし、スプール送信バッチでキューからスプールを取得しメールを送信することが出来るようになります。

SQSの機能

SQSのQueueの作成方法などはいろんなページで紹介されてるので割愛します。

SQSの機能はQueueの設定画面から詳細な設定をすることが出来ますが、そのうちいくつかを紹介します。
Screenshot 2014-09-14 23.46.12

Default Visibility Timeout と Delivery Delay

Delay_Queues
Default Visibility TimeoutはSQSで一番の肝となる機能です。
SQSでは、キューを送信してからDefault Visibility Timeoutで指定されている時間はそのキューを送信することはありません。ということは、Default Visibility Timeoutで指定されている時間を過ぎた場合また、同じキューを送信してしまいます。
Default Visibility Timeoutではキューの処理にかかる時間より多めの時間をセットしておくことで、キューの処理に失敗した場合に、またそのキューを処理することが出来ます。

また、Delivery Delayを設定することにより、SQSにキューが登録されてからキューの対象になるまで間隔を開けることが出来ます。

Dead Letter Queue

Dead Letter Queueを設定することにより、n回以上取り残されたキューを別のQueueに移動することが出来ます。移動されたキューに対してもう一度バッチを回したり原因究明に使ったりすることが可能です。

実装

以下、SystemConfigMailSpoolなどのクラスが登場しますが、雰囲気で察してください。

汎用クラス

AWSのサービスを使う際にいちいち認証周りを基にしなくてもいいよう、AmazonSQSClientを提供してくれる汎用的なクラスを作っておきます。

public class AWS {
  public static AmazonSQSClient getSQSClient() {
    final String accessKey = SystemConfig.get("AMAZON.ACCESS_KEY", "");
    final String secretKey = SystemConfig.get("AMAZON.SECRET_KEY", "");
    final BasicAWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);
    return new AmazonSQSClient(credentials);
  }
}

スプールをキューイング

メールのデータとなるスプールをSQSにキューイングします。

ここで、メールのデータをキューに埋め込むのですが、SQSでは、Message Bodyの他にも、構造化されたデータを埋め込めるMessage Attributesがあります。Message BodyにJSON化したデータを埋め込むのも手ですが、Message AttributesはString、Number、Binaryなど特定の型のデータを埋め込むことが出来ます。

尚、埋め込めるデータは全部で256KB以内です。

private void addSQSQueue(MailSpool spool) {
  final String queueUrl = SystemConfig.get("MAIL.QUEUE.SQS_QUEUE_URL", "");
  final AmazonSQSClient sqs = AWS.getSQSClient();

  final Map<String, MessageAttributeValue> attributes = new HashMap<String, MessageAttributeValue>();
  attributes.put("fromAddress", new MessageAttributeValue().withDataType("String").withStringValue(spool.getFromAddress()));
  attributes.put("toAddress", new MessageAttributeValue().withDataType("String").withStringValue(spool.getToAddress()));
  attributes.put("subject", new MessageAttributeValue().withDataType("String").withStringValue(spool.getSubject()));
  attributes.put("body", new MessageAttributeValue().withDataType("String").withStringValue(spool.getBody()));
  sqs.sendMessage(new SendMessageRequest().withQueueUrl(queueUrl).withMessageBody("mailSpool").withMessageAttributes(attributes));
}

キューからスプールを送信

SQSからキューを取り出し、スプールを送信します。

いくつか注意事項があり、SQSは取得するMessage Attributesを指定しない場合、Message Attributesを返してきません。そのため、明示的に指定する必要があります。

また、これが一番の肝ですがSQSのシステムは最低一度のメッセージ到達を保証です。ということは、一度以上同じキューが配信される場合があります。
そのため、システム側で同じキューを処理したかのチェックが必要となります。

private static int sendSpool() {
  int count = 0;

  final String queueUrl = SystemConfig.get("MAIL.QUEUE.SQS_QUEUE_URL", "");
  final AmazonSQSClient sqs = AWS.getSQSClient();

  while (true) {
    List<Message> messages = sqs.receiveMessage(new ReceiveMessageRequest().withQueueUrl(queueUrl).withMessageAttributeNames("fromAddress", "toAddress", "subject", "body")).getMessages();

    if (messages.size() == 0) {
      break;
    }

    for (com.amazonaws.services.sqs.model.Message message : messages) {
      // SQSでは"最低一度のメッセージ到達を保証"しているためすでに送信している場合はキューを削除し、次のキューへ
      if ( <alreadySent?> ) {
        sqs.deleteMessage(new DeleteMessageRequest().withQueueUrl(queueUrl).withReceiptHandle(message.getReceiptHandle()));
        continue;
      }

      boolean result = createMailSpool(message).send();
      if (result) {
        count++;
        sqs.deleteMessage(new DeleteMessageRequest().withQueueUrl(queueUrl).withReceiptHandle(message.getReceiptHandle()));
      }
    }
  }

  return count;
}

まとめ

この仕組みで、約1万件を10分くらいで処理することが出来ました。おそらく、もう少しチューニングすればもう少し速く送信することが出来ると思います。
AWSのシステムを組み合わせてどんどんアプリケーションを作りこむことができるので、AWSは面白いですね。

ちなみに、コスト面ですが最初の100万件までは無料で、それ以降、100万件につき$0.50ということで、この規模であればタダで使う事が出来ました。

また、詳しい使い方や仕様については、公式ドキュメントをご覧ください。