Rapiroに赤外線の受信機を搭載してみた

丸パンダ研究所にはSwitchBotがある。
来客のピンポンが聞こえづらいので、インターホンのモニタにセンサーをつけて検知をしたらハブミニから発信された赤外線を受け取って、「誰かが来たよ」としゃべってもらうために赤外線の受信機を搭載することにした。

「Rapiroをしゃべらせてみた」の記事はこちら

Rapiroをしゃべらせてみた
Rapiroは耳の部分にスピーカーを取り付けられるように設計されている。せっかくなのでOpen JTalkを使ってRapiroをしゃべらせることにした。

赤外線受信モジュールを取り付ける

用意したもの

あくまで丸パンダ研究所のRapiroにつけたものなので、下記のものでなくてもよい。

赤外線受信モジュールの取り付け

赤外線リモコン受信モジュールとRaspberry Piを接続する。

丸パンダ研究所のRapiroはGPIO17を使用。
耳の部分からケーブルを出してツノ(?)の先に赤外線リモコン受信モジュールを取り付けた。

pigpioを使って赤外線を受信したらしゃべるようにする

pigpioとは

pigpioは、GPIOピンを制御できるRaspberry Pi用のライブラリで、PythonやCなどのプログラミング言語から、赤外線通信やLEDの点灯、サーボモーターの制御などを柔軟に扱えるのが特徴。

pigpioのインストール

pigpiod(本体)とpython3-pigpio(クライアントライブラリ)をインストールする。

sudo apt-get install pigpio python3-pigpio

デーモンの自動起動を有効化

pigpioを使用するには、事前にデーモン(pigpiod)を起動する必要がある。
毎回手動で起動するのは手間なので、Raspberry Piの起動時に自動で起動するように設定しておく。

sudo systemctl enable pigpiod.service

ここで一度再起動を行い、自動で起動するか確認をしておくのがオススメ。

手動で起動する場合は、都度以下のコマンドを叩けばよい。

sudo systemctl start pigpiod

pigpioのステータスを確認。
「Active: active (running)」となっていれば、正常に起動している。

sudo systemctl status pigpiod

赤外線データの学習

赤外線データを学習するためのプログラムをダウンロードする。

wget https://abyz.me.uk/rpi/pigpio/code/irrp_py.zip

ダウンロードが終わったら、解凍する。

unzip ./irrp_py.zip

irrp.pyを実行。

パラメータで以下を指定。

  • レコードモード(-r)
  • 赤外線受信モジュールを接続したGPIOピン番号(-g)
  • 任意のファイル名(-f)
  • ボタン名(スペース区切りで複数指定することができる)

丸パンダ研究所では、インターホンを検知してSwitchBot に赤外線を発信してもらうため、ファイル名をswitchbot、ボタン名をinterphoneとした。

python3 irrp.py -r -g17 -f switchbot interphone

「Press key for ‘interphone’」、「Press key for ‘interphone’ to confirm」と表示されたら、それぞれのタイミングでリモコンの赤外線を発信する。
確認のため、同じ信号を2回受信させる必要がある。

Press key for 'interphone'
Okay
Press key for 'interphone' to confirm
Okay

赤外線受信時の処理

赤外線受信時の処理を行うプログラムを作成する。
説明は要点のみにとどめるため、プログラム全体は後述のソースコードを参照。

プログラム冒頭の「IR_RX_PIN」に、赤外線受信モジュールを接続したGPIOピン番号を設定する。

IR_RX_PIN  = 17

赤外線受信の処理を実装するために、「赤外線データの学習の項」で使用したirrp.pyから以下の関数を流用。

  • normalise(信号の標準化)
  • compare(赤外線の判定)
  • end_of_code(受信の終了処理)
  • cbf(赤外線受信のメイン処理)

下記のコードブロックの一行目で、学習済みの赤外線データのファイル名を指定する。
後半のif文ではボタンごとの挙動を指定する。key_nameには学習済みのボタン名が入る。
ここでは、あらかじめ用意しておいた音声ファイルを再生させている。

with open('/[homeディレクトリ]/switchbot') as f:
    key_config = json.load(f)

    pi.set_mode(IR_RX_PIN, pigpio.INPUT) # IR RX connected to this IR_RX_PIN.

    pi.set_glitch_filter(IR_RX_PIN, GLITCH) # Ignore glitches.

    cb = pi.callback(IR_RX_PIN, pigpio.EITHER_EDGE, cbf)

    try:  
        while True:
            code = []
            fetching_code = True
            while fetching_code:
                time.sleep(0.1)
            time.sleep(0.5)
            key_name = "-"
            for key, val in key_config.items():
                if compare(val, code[:]):
                    key_name = key
            if key_name == "interphone":
                subprocess.call("aplay /[homeディレクトリ]/wav/interphone.wav", shell=True)

    except KeyboardInterrupt:
        pass
    finally:
        pi.stop() # Disconnect from Pi.

ボタンが複数ある場合は、以下のようにelifを追加して処理を分ける。

if key_name == "interphone":
    subprocess.call("aplay /[homeディレクトリ]/wav/interphone.wav", shell=True)
elif key_name == "hoge":
    hogeボタンが押下されたときの処理
#!/usr/bin/env python

import time
import os
import json
import subprocess

import RPi.GPIO as GPIO
import pigpio

IR_RX_PIN  = 17
GLITCH     = 100
PRE_MS     = 200
POST_MS    = 15
FREQ       = 38.0
SHORT      = 10
TOLERANCE  = 15

POST_US    = POST_MS * 1000
PRE_US     = PRE_MS  * 1000
TOLER_MIN =  (100 - TOLERANCE) / 100.0
TOLER_MAX =  (100 + TOLERANCE) / 100.0

last_tick = 0
in_code = False
code = []
fetching_code = False

def normalise(c):
    entries = len(c)
    p = [0]*entries # Set all entries not processed.
    for i in range(entries):
        if not p[i]: # Not processed?
            v = c[i]
            tot = v
            similar = 1.0

            # Find all pulses with similar lengths to the start pulse.
            for j in range(i+2, entries, 2):
                if not p[j]: # Unprocessed.
                    if (c[j]*TOLER_MIN) < v < (c[j]*TOLER_MAX): # Similar.
                        tot = tot + c[j]
                        similar += 1.0

            # Calculate the average pulse length.
            newv = round(tot / similar, 2)
            c[i] = newv

            # Set all similar pulses to the average value.
            for j in range(i+2, entries, 2):
                if not p[j]: # Unprocessed.
                    if (c[j]*TOLER_MIN) < v < (c[j]*TOLER_MAX): # Similar.
                        c[j] = newv
                        p[j] = 1

def compare(p1, p2):
    if len(p1) != len(p2):
        return False

    for i in range(len(p1)):
        v = p1[i] / p2[i]
        if (v < TOLER_MIN) or (v > TOLER_MAX):
            return False

    for i in range(len(p1)):
        p1[i] = int(round((p1[i]+p2[i])/2.0))

    return True


def end_of_code():
    global code, fetching_code
    if len(code) > SHORT:
        normalise(code)
        fetching_code = False
    else:
        code = []
        print("Short code, probably a repeat, try again")

def cbf(gpio, level, tick):
    global last_tick, in_code, code, fetching_code

    if level != pigpio.TIMEOUT:

        edge = pigpio.tickDiff(last_tick, tick)
        last_tick = tick

        if fetching_code:

            if (edge > PRE_US) and (not in_code): # Start of a code.
                in_code = True
                pi.set_watchdog(IR_RX_PIN, POST_MS) # Start watchdog.

            elif (edge > POST_US) and in_code: # End of a code.
                in_code = False
                pi.set_watchdog(IR_RX_PIN, 0) # Cancel watchdog.
                end_of_code()

            elif in_code:
                code.append(edge)

    else:
        pi.set_watchdog(IR_RX_PIN, 0) # Cancel watchdog.
        if in_code:
            in_code = False
            end_of_code()

pi = pigpio.pi() # Connect to Pi.

if not pi.connected:
    exit(0)

with open('/[homeディレクトリ]/switchbot') as f:
    key_config = json.load(f)

    pi.set_mode(IR_RX_PIN, pigpio.INPUT) # IR RX connected to this IR_RX_PIN.

    pi.set_glitch_filter(IR_RX_PIN, GLITCH) # Ignore glitches.

    cb = pi.callback(IR_RX_PIN, pigpio.EITHER_EDGE, cbf)

    try:  
        while True:
            code = []
            fetching_code = True
            while fetching_code:
                time.sleep(0.1)
            time.sleep(0.5)
            key_name = "-"
            for key, val in key_config.items():
                if compare(val, code[:]):
                    key_name = key
            if key_name == "interphone":
                subprocess.call("aplay /[homeディレクトリ]/wav/interphone.wav", shell=True)

    except KeyboardInterrupt:
        pass
    finally:
        pi.stop() # Disconnect from Pi.

起動時の設定

赤外線受信のプログラムを常にバックグラウンドで動かしておく必要があるので、起動時に自動で実行するようにする。

方法はいくつかあるが、rc.localを使って自動実行するようにする。

rc.localを開く。
ここではvimを使用しているが、好きなエディタで構わない。

sudo vim /etc/rc.local

一番下に書かれている「exit 0」の上に赤外線受信のプログラムを実行するコマンドを書き込む。

python /[homeディレクトリ]/py_code/rapiro_irrp.py  > /dev/null 2>&1 &

exit 0