Skip to content

Instantly share code, notes, and snippets.

@voluntas
Last active June 19, 2024 03:22
Show Gist options
  • Save voluntas/14a3421585bb08379314 to your computer and use it in GitHub Desktop.
Save voluntas/14a3421585bb08379314 to your computer and use it in GitHub Desktop.
時雨堂 MQTT ブローカー開発ログまとめ

時雨堂 MQTT ブローカー開発ログまとめ

日時:2015-01-10
作:@voluntas
バージョン:0.3.4
url:http://voluntas.github.io/

注意

発表用の資料として作られているため、 2014 年 9 月 5 日時点でのまとめです。

  • 2014 年 8 月 29 日に行われた MQTT Meetup Tokyo 2014.08 の発表資料でしたがそこから色々書き換えられています
  • 2014 年 9 月 5 日に行われた (COPY) MQTT Meetup Tokyo 2014.08 の発表資料です

概要

もともと開発ログを書いていたのですが、これは苦労話をまとめたものです。

時雨堂 MQTT ブローカー開発ログ

MQTT ブローカーは仕様を守ったり、細かいことを気にしなければあまり難しくありません。 プロトコル自体はとてもシンプルですし、やってることもたいしたことではありません。

ただし、細かい所を見ていくとかなり深い闇が見えてくるのでその辺を話していきます。

で、だれ?

時雨堂という会社から来ました。

Twitter も GitHub も ID は @voluntas です。

Erlang/OTP で何かを書いて飯食べています。

MQTT 歴は 9 ヶ月目です。

なにはなすの?

MQTT ブローカーを Erlang/OTP で実装しているので、その苦労話します。

目次

  • MQTT ブローカーの実装
    • QoS 1,2 の実装
    • リトライ の実装
    • クラスター の実装
    • will の実装
    • retain の実装
    • セッションの実装
    • ステートフル + タイマーの実装
    • $SYS の実装
  • 時雨堂 MQTT ブローカーの実装
    • 接続数制限
    • パーミッション制限
    • ペイロードサイズ制限
    • セッション有効期限
    • Retain 有効期限
    • メッセージ流量制限
    • セッションメッセージ保存数制限
    • 強制リトライ機能
    • テスト
  • 今後
    • REST API
    • Socket.IO 対応
    • セッションのクラスター同期
    • MQTT ブローカーサービス

MQTT ブローカーの実装

MQTT 3.1.1 の仕様を前提の話

QoS 1-2 の実装

  • QoS 0 は投げっぱなしなので誰でも
  • QoS 1-2 は投げてから戻ってくるのを確認してから PUBLISH メッセージを削除する必要がある
    • 例えば QoS 1 で投げたメッセージが、送り先 1000 個ある場合は 1000 からの PUBACK を全て受け取って確認する必要がある
      • 一つでも抜けていた場合はそこに対してリトライを発行する必要がある
      • 全ての想定しているクライアントに正常に送りきり、PUBACK が全部から返ってくるのを確認してメッセージを PUBLISH 削除する
  • QoS 1-2 はリトライやダウングレードなどが存在し、ただただ複雑になって行く
  • それぞれのクライアントに対して packet_id をちゃんとインクリメントして届ける

QoS ダウングレードの実装

  • 真面目に実装すると最初に引っかかる難関がこの QoS のダウングレード機能
  • SUBSCRIBE 側が QoS 0 で、PUBLISH 側が QoS 2 の場合は QoS 0 で処理しなければならない
  • 1:1 でメッセージで送る場合はいいが、 1:N なので QoS 2 のメッセージが来たときは 0,1,2 の可能性がある
    • 全てのメッセージをそれぞれリトライも考えなければいけない
  • 秒間 1 万のメッセージでダウングレードを考えると恐ろしい ...

リトライ の実装

  • QoS 1-2 の場合はクライアントから戻りが返ってこなかった場合はリトライを行う必要がある
  • リトライ回数やタイミングなどは仕様で決められていないため、判断はブローカー実装者にゆだねられている
  • QoS 2 の場合は PUBLISH だけでなく PUBREL もリトライする必要がある
    • セッション時に PUBREL だけを保存するのは現実的で無い為、リトライするメッセージを PUBLISH に戻す必要がある
  • リトライ回数は最大 10 回、リトライ間隔は 3 秒をデフォルトとしてみた
  • 2 回目からは dup_flag を有効にする
  • packet_id は変更してはならない

クラスター の実装

  • MQTT では bridge と呼ばれる機能がよく実装される
  • 時雨堂では bridge は実装せずクラスター一本で行くことにした。
  • クラスターは複数台の MQTT ブローカーが一つの大きな MQTT ブローカーに見えるようになるのが最終的なゴール
  • どのノードへ PUBLISH しても、どのノードで SUBSCRIBE していても、あたかも一つのブローカーで動いているように見える
  • セッション時のメッセージの同期が最大の課題
    • 一つのメッセージの配信が本人に対して終わったら一つずつメッセージを同期して削除していく必要がある。
  • スプリットブレインは多数決で対応する

Will の実装

  • CONNECT 時に設定した will メッセージ を配信する
  • クライアントが DISCONNECT を送って来る以外の終了処理全てで発生させる
    • TCP の切断やエラー、それ以外の切断系
  • Will も QoS のダウングレードはもちろん存在する
    • QoS 0,1,2 全てに対して対応する
    • あれ ... PUBREL は ?
  • 終了時にクライアントが 1 メッセージ送ってきたという動きにする
    • ただしそのメッセージは CONNECT 時に送られてきたメッセージを使う
  • QoS 1,2 であればリトライも必要

Retain の実装

  • Subscriber への PUBLISH メッセージ配信と真逆
  • SUBSCRIBE してきた Topic にマッチするメッセージを今まで送られてきた 全ての Retain メッセージから探し出す
    • SUBSCRIBE Topic が # で来た場合は全てのメッセージを戻すことになる
    • Retain メッセージが 1 億あったら 1 億のメッセージを Subscriber に戻すことになる
    • 10 の Subscriber が同時に # で接続してきたら ...
  • 配信とは逆のツリーが必要になる
    • /a/+/b/# にマッチしているメッセージ リスト を探し出す。
  • Retain は削除されないかぎり残り続ける
    • 1 年か使われない Topic で Retain されるとそのメッセージは無限に残り続ける
    • どうにかして管理をする手段が必要になる
  • Retain も QoS のダウングレードが存在する
    • これが送られてきた QoS と送り先の QoS を比較しつつ全ての Retain メッセージで処理を変えていく必要がある
  • パケット Id はもちろんクライアント単位でインクリメントしていくこと
  • Retain を送っている間にも別のクライアントからメッセージは送られている
  • Retain を送っている間にもセッションで保存していたメッセージは送られている

セッションの実装

  • 接続が切断したタイミングで自分の代わりにメッセージを受け取る代理人(セッション)を立てる
  • 代理人はメッセージを受け取ったら自分自身のキューに詰め込む
  • 再接続時に代理人を探して存在している場合は代理人から溜め込んでいたメッセージを受け取る
  • 代理人と本人の区別は優先順位を採用する
    • 本人が存在したら本人へ
    • 本人が不在だったら代理人へ
  • 代理人から本人へ溜め込んだメッセージを配信中に本人が不在になった場合の処理
    • 代理人はすぐさま本人へのメッセージ配信をやめる
  • 溜め込んだメッセージでも QoS のダウングレードは発生する
  • セッションに溜め込んだメッセージを配信している間にも他のメッセージは送られてくる
  • リトライが発生したメッセージで、明確に本人が居なくなった場合、代理でリトライメッセージを受け取れなければならない
    • 例えば QoS 1 で PUBACK が返ってこないので、リトライしようと思ったけど本人がいなくて、代理人が居た場合
  • CONNECT 時にセッションがあるかどうかのフラグが MQTT 3.1.1 で追加された
  • PUBREL をそもそもリトライするの嬉しいのか?
  • PUBREL はセッションとして残すべきなのかどうか
    • 残さないで PUBLISH (QoS2) を残すことにした

ステートフル + タイマーの実装

  • MQTT では一定期間クライアントからメッセージが送られてこない場合は CONNECT を切断する必要がある
  • メッセージが送られてくる毎にタイマーはリセットされる
    • タイマーが切れた場合は Will を確認しつつコネクションを切断する
    • もちろんセッションが有効であればセッションも有効にしてからの切断
  • 接続開始時の CONNECT に必要な情報は全て入ってくるので保持している必要あり
  • PacketId は古いのを使わない、今使われているのを使わない
  • 10 万接続であれば、タイマーは 10 万以上必要

$SYS の実装

$SYS は統計情報です。# では取得できず、明示的に $SYS を指定する必要があります。これは仕様で決まっています。

mosquitto の $SYS 一覧が出ているので、この辺を同実装していく必要がありそうかを解説していきます。

http://mosquitto.org/man/mosquitto-8.html

mosquitto 固有の $SYS は省略しています。

  • $SYS/broker/bytes/received
    • ブローカーが受信したメッセージのバイト数
    • 受け取ったパケットでデコードできたものなのか
    • 受け取ったパケットすべてなのか
  • $SYS/broker/bytes/sent
    • ブローカーが送信したメッセージのバイト数
    • 10 バイトを 10000 箇所に配信したら 100000 バイト
  • $SYS/broker/clients/active
    • 現在接続中のクライアント一覧を取得するには、接続時に +1 して離脱時に -1 する
    • マイナスになってはいけない
  • $SYS/broker/clients/expired
    • セッション状態で期限が切れた場合カウントアップ
    • 期限を持っているということはタイマー、セッション数が 10 ならいいけど
  • $SYS/broker/clients/inactive
    • セッション状態の数を数えられるようにする
  • $SYS/broker/clients/maximum
    • 今までで最大接続したクライアント数
    • $SYS/broker/clients/active の最大値
    • $SYS 更新したときに比較する
  • $SYS/broker/clients/total
    • 今までの acive + inactive の合計値
    • カウントアップする場所が増える
    • 接続時、セッション時に +1 していく
  • $SYS/broker/messages/inflight
    • QoS 1,2 で完了していないメッセージ数
    • リトライ中だったり反応待ちだったり
    • 完全に終了したら -1
  • $SYS/broker/messages/received
    • ブローカーが起動してから受信したすべてのメッセージ
    • 種類は関係ない
  • $SYS/broker/messages/sent
    • ブローカーが起動してから送信したすべてのメッセージ
    • 種類は関係ない
  • $SYS/broker/messages/stored
    • retain や session 用のキューとしてメッセージをブローカーが責任持った数
  • $SYS/broker/publish/messages/dropped
    • 何かしらの制限によってサイレントディスカードされたメッセージ数
  • $SYS/broker/publish/messages/received
    • 受信した PUBLISH 数
  • $SYS/broker/publish/messages/sent
    • 送信した PUBLISH 数
  • $SYS/broker/retained messages/count
    • retain したメッセージ数
  • $SYS/broker/subscriptions/count
    • 現在接続している SUBSCRIBE の数
    • 今の接続している topic 一覧の数
  • $SYS/broker/uptime
    • ブローカーが起動してからの時間

WebSocket

  • sec-websocket-protocol が 3.1 と 3.1.1 で違う
  • そして複数投げてサーバ側に選択権をゆだねる
  • 想像していたより人気 ...
  • paho は QoS 2 を実装していない

時雨堂 MQTT ブローカーの実装

MQTT 3.1.1 の仕様には一切関係ない、時雨堂の MQTT ブローカーの独自仕様の話

接続数制限

  • ユーザ名単位で接続制限を付けられるようにしたい
  • 1 ユーザが 10 万接続とかできないようにしたい
  • サービスをする場合は必須

パーミッション制限

  • ユーザ名単位で制限
  • どの Topic に送れるのか
  • どの Topic を受け取れるのか
  • マルチテナントを考えると、他の人のメッセージを受け取れるのは無理
  • QoS の制限もかけられる
    • SUBSCRIBE 時に 128 を返すと その Topic/QoS は使えない という事をブローカーからクライアントに伝えられる
  • サービスをする場合は必須
  • Will メッセージにもパーミッションは必須なわけで

ペイロードサイズ制限

  • PUBLISH メッセージは最大 256MB だが、現実的なサイズを制限できるべき
  • 不可を考えると大きくても 1 MB 程度
  • 転送料に直接的に影響する
  • パーサの長さ部分ではじく必要がある
    • Length チェックしてそもそも長そうなペイロードが来そうな場合ははじく
    • Buffer を真面目に受け取ってたら 256MB 受け取ってからはじく感じになる
  • サービスをする場合は必須

セッション有効期限

  • セッションは再接続が無い限り無制限に持ち続けてしまう
  • 1 日接続が無い場合はセッション自体の破棄ができる用にする
  • サービスをする場合は必須
  • 無限に保持されるとデータベースが太る
  • タイマーを持たせて一定時間経過したらセッションを削除する
    • 溜め込んだデータも丸ごと削除

Retain 有効期限

  • ユーザ名単位で有効期限を指定
  • Retain はクライアントが消さないかぎり消えない
  • Retain は好き勝手に溜め込むことができる
  • Retain 取得時に期限が切れていたら削除する
  • Retain の中身を見れる機能が必要
  • バッチで期限切れを削除できるようにする

メッセージ流量制限

  • メッセージがどれだけ流れているか確認したい
  • ユーザ名単位でメッセージの流量を制限する
  • クラスターでは同期できていない
    • クラスター時にメッセージ流量制限を有効にして同期していたら破綻する可能性が高い
    • ただ全体での流量は欲しくなりそう、SPoF になる予感はしている
  • メッセージはブローカーが受信したメッセージのみをカウントしている
    • 将来的には送信もカウントする予定

セッションメッセージ保存数制限

  • セッション時に送られてきたメッセージをどれだけため込めるか
    • クライアント ID 単位で管理している
  • 10 であれば、10 メッセージだけしかため込めずそれ以外は破棄
    • 将来的には最新の 10 件という機能を追加予定
    • 古いのは消えていく

強制リトライ機能

個人的に一番気に入っている機能

  • 本人が受けきれず代理人にリトライを送る際、本人は最後にリトライを管理している人に代理人に今すぐリトライをするように依頼を投げる。
  • リトライ管理のコストも減り、メッセージも確実に代理人へため込まれる。

テスト

ブローカーのテストは難しいため、MQTT 非同期クライアントを Erlang/OTP で実装し、単体テストとして実際に TCP 接続をして色々なテストを実行できるような仕組みを作った。

非同期である必要性がある。1 つは CONNECT しておいて、別の接続からパケットを投げるをテストで実現する。その後最初に CONNECT しておいた方にメッセージが届いているかどうかを確認したい。

今後

REST API

  • HTTP ベースで QoS1 までのメッセージを PUBLISH できる
  • HTTP (XHR/WebSocket) ベースで QOS1 までのメッセージを SUBSCRIBE できる

Socket.IO 対応

WebSocket + MQTT 需要ありそうなので、Socket.IO に対応すると面白そうなので実装してみたい

  • Socket.IO っぽい実装は経験あるので、対応できそう

セッションのクラスター同期

LevelDB + Raft あたりで実装してみたい

  • 他のノードにセッション情報が同期される
  • 一つのノードでセッションからメッセージがクライアントに配信されたら、他のノードでは削除される
  • 多数決システムをとり、スプリットブレイン対策を入れる

無償で使える MQTT ブローカーサービス

MQTT as a Service Sango

https://sango.shiguredo.jp

皆様是非使ってフィードバックをばしばし送っていただけると嬉しいです。

商用 MQTT ブローカー

MQTT Broker Akane

http://akane.shiguredo.jp

興味ある方はご相談ください。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment