Welcome to telecotele.com » 丁寧な暮らし

インターホンの録画と通知

インターホンの画面を録画し、Slackに飛ばします

tags: house intercom

インターホンの録画と通知のカバー画像

インターホンからの通知をインターネット越しに受け取りたい。 最近のインターホンではスマホアプリと連携するようなものもあるようですが、自宅のインターホンでは対応していません。

ところで、AIPH〇NEのプロトコルに関する追試ではインターホン室内端末のLCDはNTSCの信号を表示しているという夢を見ました。 ならこの信号を録画して、Slackか何かに貼り付ければいけるのでは? ということでやってみました。

NTSCの分岐(回路編)

まず、NTSC信号の分岐についてです。 単純に信号線を分岐させるとこんな感じ1になります。

NTSC信号の分岐(単純) NTSC信号の分岐(単純)

もともと受信回路1(既存)だけに繋がっていた信号線を分岐させて、SWを経由して受信回路2(増設)に繋ぐ感じです。 それぞれの受信回路内では、NTSCの特性インピーダンスに合わせて75Ωの終端抵抗が存在します。 受信回路の(Rと75Ωにより分圧された)入力電圧は1Vp-pの想定です。

ここで、SWを閉じる(受信回路2を繋ぐ)と各受信回路内の終端抵抗が合成され、75Ωにはなりません。 これによってインピーダンスマッチングがうまくいかなくなる、またRとの分圧比が変わるため受信回路に入力される信号レベルが下がります。 なら受信回路2の抵抗2を外してしまうか、というのは今回の用途以外で使えなくなるので避けたい。

ではどうするかというと、分岐(正確には分配)させるときは一度終端し、受信回路ごとにアンプで受けてそこから流す構成とするようです。 具体的にはこんな感じに。

NTSC信号の分岐(分配) NTSC信号の分岐(分配)

これなら問題ないですね。 どこかの受信回路が繋がっていようといまいと、他の受信回路に影響を与えることはありません3

ただ、今回はインターホンに関する信号を扱っています。 これを踏まえると、自分で作った回路をインターホンの室内端末側には繋ぎたくありません。 もし回路に問題があって室内端末をぶっ壊しました、というのは避けたいです。 「室内端末」であって幹線に何かするわけではない。 インターホンシステム全体を壊すことに比べるとマシとはいえ……。

そこで、分岐はするが既存の受信回路はそのままにして、増設部分だけアンプを経由して流すようにします。 具体的にはこうです。

NTSC信号の分岐(令和最新版) NTSC信号の分岐(令和最新版)

これなら既存部分はなるべく触らないので安心です。 ただ、もし既存の受信回路を取り外したり(今回の件とは関係なく)何らかの理由で壊れて終端抵抗がかからなくなったりすると、増設側に意図しない電圧がかかる可能性はあります。 その結果、増設した受信回路も壊れる懸念はありますが……ここで使うのは後述の$0.99で買ったデバイス。 インターホン側のほうが大事です。

NTSCの分岐(実装編)

先ほどの回路を実装していきます。

まず電源は5Vです。 これはインターホンの室内端末側から取るのではなく、外部(後述のRaspberry Pi)からUSB経由でもらいます。

また分岐部分については、もともとインターホン室内端末内のメイン基板とLCD基板担当がJST ZHコネクタで接続されていることを利用し、コネクタを使って復元可能な形で信号線を分岐させて取り出しました。

さらにオペアンプにはNJM2722を使い、これは負電源を必要とするのでそれ用にNJW4191も使います。 NJM2722のスルーレートは1000V/us(!)です。 今回の用途だとオーバースペックすぎる。 また、75Ωをドライブできるかは分からず4、本当はビデオ用アンプを使ったほうが良かったでしょう。 具体的にはNJM2575など。 というかこっちのほうが消費電流も小さいし、単電源で動くし、LPFも入っているし良いような……。 今回の実装では手間をかけてビデオ用アンプのLPF抜きを作ったことになってしまった。

基板はKiCadで設計しJLCPCBに発注しました。 この基板自体はPCBのみ(アセンブリなし)で製造も順調。 一方で別件のPCBA込みの基板と混ぜて発注したためにこれに引きずられて、また配送についてもTail Latencyのしっぽ側を引いてしまったようで手元に届くまでちょっと時間がかかった。 同梱したことで送料を節約できたかというとそうでもなく、これならわざわざまとめて注文を出すよりこまめに出したほうが良かったかも知れない。 とりあえず発注して待っている間に、次の基板を設計した方が効率が良い。TCP_NODELAY付けておきます。

この基板に対して実際にNTSCの信号(インターホンとは関係ない)を分岐させた信号を入力し、動作確認をするとこんな感じに。

動作確認 その1 動作確認 その1

C1(黄)が既存の受信回路に入力される信号、C2(ピンク)がオペアンプの出力で増設した受信回路に入力される信号です。 同じように見える。よく分からないので重ねてみます。

動作確認 その2 動作確認 その2

カラーバーストの部分を拡大しました。 ピンクは気持ち遅れている程度で、ちゃんと動いていそうですね。

これを3Dプリンタで作ったケースに収め、インターホン室内端末の隙間から信号線を這わせて壁に貼り付けました。

ケース with 基板 ケース with 基板

壁 with ケース 壁 with ケース

手前に棚があり、わざわざ覗き込まない限り配線は見えないという想定です。 本当は壁の中に通したかったですが、それは難しかった。

キャプチャ編

分岐されてきた信号をCVBSのキャプチャのデバイスでキャプチャします。

インターホン室内端末側の画面が消えているときは無信号(黒い画像が取得される)のため、 とりあえず画面に何か出してキャプチャしてみた画像はこんな感じです。

キャプチャ結果 キャプチャ結果

着荷画面 (追記)着荷画面も載せちゃお

いい感じですね。もちろん、インターホン室内端末側の画面も問題なく映っています。 これを発展させ、最終的にこんなスクリプトを用意し動かしました。

  1. 信号が来るかチェック
    • 画像をYUYV形式・低解像度・低fpsで取得する
    • 全体が黒っぽければ無信号と判断、1.の処理を続ける
    • 信号が来たら2.へ進むついでに3.のスレッドを起動
  2. 録画開始
    • MJPG形式・高解像度・高fpsで画像を取得し、動画ファイルにする
    • MJPG形式のまま保存。ここでは何もエンコードせず、右から左へ受け流すだけ
    • 一定時間続ける。終わったら4.のスレッドを起動して、1.に戻る
  3. 2.と同時に別スレッドで通知処理
    • HomeKitに通知
    • Slackにも通知
  4. 2.の終了後に別スレッドで動画をエンコード
    • ffmpegでH.264形式の動画に直す
    • 終わったらSlackに添付するとともに、ローカルにも保存しておく
具体的なコードはこんな感じ
import cv2
import os
import time
import subprocess
from slack_sdk import WebClient
import paho.mqtt.publish as publish
from concurrent.futures import ThreadPoolExecutor

REC_WIDTH    = 640
REC_HEIGHT   = 480
REC_FPS      = 20
REC_DURATION = 45
SAVE_DIR     = "xxxx"
SLACK_TOKEN  = "xxxx"

def detect_mode():
    print('detect_mode')
    cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*"YUYV"))
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, 320)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 240)
    cap.set(cv2.CAP_PROP_FPS, 5)
    cap.set(cv2.CAP_PROP_CONVERT_RGB, 0.0)

def record_mode():
    print('record_mode')
    cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*"MJPG"))
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, REC_WIDTH)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, REC_HEIGHT)
    cap.set(cv2.CAP_PROP_FPS, REC_FPS)
    cap.set(cv2.CAP_PROP_CONVERT_RGB, 0.0)

def is_frame_black(frame):
    #return cv2.countNonZero(cv2.cvtColor(frame, cv2.COLOR_YUV2GRAY_YUYV)) == 0
    return frame[0][0][0] == 0

def notify(frame):
    print('notify')
    publish.single('xxxx/getSwitch', '1', hostname="xxxx")
    publish.single('xxxx/getMotionDetected', 'true', hostname="xxxx")
    cv2.imwrite('/tmp/frame.png', cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_YUYV))
    client = WebClient(SLACK_TOKEN)
    client.files_upload_v2(
        channel="xxxx",
        title="frame.png",
        file='/tmp/frame.png',
        initial_comment="xxxx"
    )
    os.remove(f'/tmp/frame.png')

def record(cap):
    print('record')
    filename = f'xxxx-{time.strftime("%Y%m%d_%H%M%S")}'
    with open(f'/tmp/{filename}.mjpg', 'wb') as f:
        for _ in range(REC_DURATION * REC_FPS):
            _, frame = cap.read()
            f.write(frame.tobytes())
    return filename

def encode(filename):
    print('encode')
    cmd = [
        'ffmpeg', '-loglevel', 'warning',
        '-r', str(REC_FPS), '-i', f'/tmp/{filename}.mjpg',
        '-c:v', 'h264_v4l2m2m', '-preset', 'ultrafast', '-tune', 'zerolatency',
        '-video_size', f'{REC_WIDTH}x{REC_HEIGHT}', '-framerate', str(REC_FPS), '-b:v', '2M', '-pix_fmt', 'yuv420p', f'{SAVE_DIR}/{filename}.mp4'
    ]
    ret = subprocess.run(cmd)
    if ret.returncode == 0:
        os.remove(f'/tmp/{filename}.mjpg')
        client = WebClient(SLACK_TOKEN)
        client.files_upload_v2(
            channel="xxxx",
            title=f'{filename}.mp4',
            file=f'{SAVE_DIR}/{filename}.mp4',
            initial_comment="xxxx"
        )

if __name__ == '__main__':
    device_path = '/dev/v4l/by-id/usb-MACROSILICON_AV_TO_USB2.0_20200909-video-index0'
    cap = cv2.VideoCapture(device_path, cv2.CAP_V4L2)
    detect_mode()

    with ThreadPoolExecutor() as executor:
        while True:
            status, frame = cap.read()
            if not status:
                print("Status Error")
                exit(1)
            if is_frame_black(frame): continue

            # detect
            record_mode()
            executor.submit(notify, frame)
            filename = record(cap)
            executor.submit(encode, filename)
            detect_mode()

    cap.release()

ここではRaspberry Pi Zero Wで動かしており、できれば負荷をかけたくない。

信号チェックの画像取得をYUYV形式で行っているのは伸張せずに済ませたいため。 さらにここではチェックするだけなので低解像度・低fps(320x240/5fps)で十分です。 無信号のときは本当に全画素が黒のようで楽に判定できます。 この処理にかかるCPU使用率は2%ほど。 多少の誤判定リスクを許容して、YUYV形式のまま最初の1画素のYが0か否か判断してもいけそうでした。 この場合のCPU使用率は1%ほど。 これくらいのCPU使用率なら別に気にしなくても良いと思いますが、無信号か否か判断するだけならハードウェア側でトリガーかけるようにもできましたね。

信号が来たら「高解像度・高fps」で録画を開始しますが……実際は640x480/20fpsほどです。 これでももうYUYV形式では取得できず、この時点でMJPG形式に切り替えます。 この切り替えにかかる時間は300msほど(この間のフレームは欠落)。 このときMJPGの取得だけならCPU使用率はまだ余裕がありますが、伸張してしまうとCPU使用率が100%になりまともに処理できません。 そこで、ここでは何も触れず、取得したデータはMJPG形式の動画ファイルとして保存しておきます。 動画フォーマットとして統一された仕様は無いようでしたが、取得した画像を単純に連結すればいけました5

録画と同時に別スレッドで通知も行います。 まず行うのは、入門スマートホームで構築したHomebridgeとhomebridge-mqttthing pluginを経由したHomeKitへの通知です6。 これによりHomePodが鳴って、ホームアプリでも通知を許可しているとApple WatchやiPhoneなどに通知が来ます。

Apple Watchへの通知 Apple Watchへの通知

本当は画像とともに通知したかったのですが、これを行うには動画のストリーミングも行う必要がある。 それはリソース的に厳しい……ダミーの動画を流すのもなぁ……ということで、代替としてSlackに画像を添付して通知させています。

録画が終わると、別スレッドでMJPG形式の動画をH.264形式に変換します。 この処理が重く、Raspberry Pi Zero Wに備わるハードウェアエンコーダを使っても実時間に対して0.6倍ほどの速度しか出ません。 CPU使用率は100%。 ちなみにソフトウェアでエンコードすると実時間に対して0.1倍の速度になってしまいます。 とはいえ、エンコードせずにMJPGのままにすると取り回しが悪くどうしたものか……という感じです。

おわりに

インターホンの録画と通知を行いました。

「録画」と「通知」だけなので、もし外出先でこの通知を受け取っても会話はできず、解錠の操作もできません。 でも通知があると何か面白くて良かった。

それはそれとして、こんなことしなくて済むようにスマホアプリ連携のあるインターホンに設備更新してくれ。

Footnotes

  1. Rはインピーダンスマッチングのため75Ωとするのが基本のようですが、簡易的に信号レベルだけ適正にしたい場合は必ずしもそうではなく、ここではRとしています。

  2. そもそもレシーバICとかに内蔵されていたらどうしよ……と思い、CVBSのキャプチャのデバイスで確認したところ、このデバイスでは外付けの模様。でも抵抗値は56Ω。IC側の(データシートは見れませんが)仕様に沿ったものなのか、コスト削減でこうしているのかは不明です。

  3. では逆に他の受信回路に影響を与える場合は?たとえば、オペアンプ1個だけ用意して受信回路の数だけ前もって増幅しておく……みたいなことをすると、もし受信回路を付け外しした場合に分圧比が変わって他の受信回路に影響を与えます。

  4. 試した限りだといけましたが……データシートに記載は無いので適正かは分かりません。

  5. 参考: V4L2でカメラからMJPEGのフォーマットで映像を取り出すプログラムをZigで作った

  6. 参考: HomePodをドアベルチャイムにする | DIY Smart Matter