【ESP32-WROVER CAM】顔追跡カメラを作った!

Freenove ESP32-WROVER CAMで顔追跡カメラを作りました。

動作の概要

①ESP32-WROVERのカメラで撮った画像をWiFi(UDP)でPCに送信
②PC上で顔検出(OpenCV/YuNet)
③顔が中央に来るようにサーボの位置を計算し、ESP32-WROVERに送信
④ESP32-WROVERで上下-左右(2軸)のサーボを動かす

構成

サーボのマウントはサーボ2台を直交させて接続します。お手軽にラズパイカメラ用2軸マウント(A838)を使用しました。
▼から購入できます。

サーボはSG90(互換品)▼を使用しました。

Freenove ESP32-WROVER CAMは▼で購入しました。技適はついてました。

ソース

■ESP32-WROVER側
“esp32″のボードマネージャをインストールしてください。(今回はv3.1.1で確認)
“ESP32Servo”ライブラリを使用していますので、インストールしてください。(今回はv3.0.6で確認)
WiFiのSSID/パスワードとIPアドレスは変更してください。

ESP32CameraTracking.ino


// カメラ 顔トラッキング
// ESP32 WROVER カメラ画像をUDPで送信する
// サーボ(2個)の角度を受信する

#include "esp_camera.h"
#include <WiFi.h>

// ESP32 WROVER カメラのPIN定義
#define PWDN_GPIO_NUM  -1
#define RESET_GPIO_NUM -1
#define XCLK_GPIO_NUM  21
#define SIOD_GPIO_NUM  26
#define SIOC_GPIO_NUM  27

#define Y9_GPIO_NUM    35
#define Y8_GPIO_NUM    34
#define Y7_GPIO_NUM    39
#define Y6_GPIO_NUM    36
#define Y5_GPIO_NUM    19
#define Y4_GPIO_NUM    18
#define Y3_GPIO_NUM    5
#define Y2_GPIO_NUM    4
#define VSYNC_GPIO_NUM 25
#define HREF_GPIO_NUM  23
#define PCLK_GPIO_NUM  22

// サーボ関連
#include <ESP32Servo.h>
#define SERVO_H_PIN 12
#define SERVO_V_PIN 14
Servo servo_h;
Servo servo_v;

// ネットワーク関連の定義
const char *ssid = "xxxxxxxx";         // 環境に合わせる
const char *password = "xxxxxxxx";     // 同上

// IPを指定する場合
const IPAddress LOCAL_IP(192, 168, 0, 3);
const IPAddress SUBNET(255, 255, 255, 0);
const IPAddress GATEWAY(192, 168, 0, 1);    // 不要な場合はLOCAL_IPと同じにする
const IPAddress DNS(192, 168, 0, 1);        // 不要な場合はLOCAL_IPと同じにする

const unsigned int LOCAL_PORT = 24759;     // 受信ポート

const IPAddress DEST_IP(192, 168, 0, 2);  // 送信先IP(環境に合わせる)
const unsigned int DEST_PORT = 24759;      // 送信先ポート

#define UDP_BUF_SIZE 1400   // 1400バイト以上は分割して送信する

WiFiUDP udp;

void setup() {
  Serial.begin(115200);
  Serial.setDebugOutput(true);
  Serial.println();

  camera_config_t config;
  config.ledc_channel = LEDC_CHANNEL_0;
  config.ledc_timer = LEDC_TIMER_0;
  config.pin_d0 = Y2_GPIO_NUM;
  config.pin_d1 = Y3_GPIO_NUM;
  config.pin_d2 = Y4_GPIO_NUM;
  config.pin_d3 = Y5_GPIO_NUM;
  config.pin_d4 = Y6_GPIO_NUM;
  config.pin_d5 = Y7_GPIO_NUM;
  config.pin_d6 = Y8_GPIO_NUM;
  config.pin_d7 = Y9_GPIO_NUM;
  config.pin_xclk = XCLK_GPIO_NUM;
  config.pin_pclk = PCLK_GPIO_NUM;
  config.pin_vsync = VSYNC_GPIO_NUM;
  config.pin_href = HREF_GPIO_NUM;
  config.pin_sccb_sda = SIOD_GPIO_NUM;
  config.pin_sccb_scl = SIOC_GPIO_NUM;
  config.pin_pwdn = PWDN_GPIO_NUM;
  config.pin_reset = RESET_GPIO_NUM;
  config.xclk_freq_hz = 20000000;
  //config.frame_size = FRAMESIZE_VGA;
  //config.frame_size = FRAMESIZE_320X320;
  config.frame_size = FRAMESIZE_240X240;
  config.pixel_format = PIXFORMAT_JPEG;
  config.grab_mode = CAMERA_GRAB_WHEN_EMPTY;
  config.fb_location = CAMERA_FB_IN_PSRAM;
  config.jpeg_quality = 12;
  config.fb_count = 1;

  esp_err_t err = esp_camera_init(&config);
  if (err != ESP_OK) {
    Serial.printf("Camera init failed with error 0x%x", err);
    return;
  }

  sensor_t *s = esp_camera_sensor_get();
  // initial sensors are flipped vertically and colors are a bit saturated
  if (s->id.PID == OV3660_PID) {
    s->set_vflip(s, 1);        // flip it back
    s->set_brightness(s, 1);   // up the brightness just a bit
    s->set_saturation(s, -2);  // lower the saturation
  }

  // ネットワーク関連
  WiFi.config(LOCAL_IP, GATEWAY, SUBNET, DNS);  // IPを指定する場合 (DHCPで割り当てられる場合は不要)
  
  WiFi.begin(ssid, password);
  WiFi.setSleep(false);

  Serial.print("WiFi connecting");
  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.print(".");
  }
  Serial.println("");
  Serial.println("WiFi connected");
  Serial.println(WiFi.localIP());

  udp.begin(LOCAL_PORT);

  // サーボ関連初期化
  servo_h.attach(SERVO_H_PIN);
  servo_v.attach(SERVO_V_PIN);
  servo_h.write(90);
  servo_v.write(90);

  delay(500);
}

unsigned char img_no = 0;

void loop() {
  // カメラキャプチャ
  camera_fb_t *fb = esp_camera_fb_get();
  if (!fb){
      Serial.println("Camera capture failed");
      return;
  }
  Serial.print("capture size:");
  Serial.println(fb->len);

  // UDP送信用バッファ
  uint8_t send_buf[UDP_BUF_SIZE];

  // 分割してUDP送信
  int send_num = fb->len / (UDP_BUF_SIZE - 3);  // 送信回数
  if (send_num * (UDP_BUF_SIZE - 3) < fb->len) send_num++;

  for (int no = 0; no < send_num; no++) {
    send_buf[0] = img_no;   // 画像番号
    send_buf[1] = no;       // パケット番号
    send_buf[2] = send_num; // 送信回数

    /*
    Serial.print(img_no);
    Serial.print(' ');
    Serial.print(no);
    Serial.print(' ');
    Serial.println(send_num);
    */

    int size = UDP_BUF_SIZE - 3;
    if (no == send_num - 1) size = fb->len - no * (UDP_BUF_SIZE - 3);
    memcpy(&send_buf[3], &fb->buf[no * (UDP_BUF_SIZE - 3)], size);

    // UDP送信
    udp.beginPacket(DEST_IP, DEST_PORT);
    udp.write(send_buf, size + 3);
    udp.endPacket();
  }
  esp_camera_fb_return(fb);

  img_no++;

  // UDP受信
  int packetSize = udp.parsePacket();
  if (packetSize > 0) {
    uint8_t recv_buf[UDP_BUF_SIZE];
    int recvSize = udp.read(recv_buf, UDP_BUF_SIZE);
    for (int i = 0; i < recvSize; i++) {
      Serial.print(recv_buf[i]);
      Serial.print(',');
    }
    Serial.println();
    if (recvSize == 2) {
      servo_h.write(recv_buf[0]);
      servo_v.write(recv_buf[1]);
      delay(100);
    }
  }
}

■PC側
顔検出はOpenCV/YuNetを使用しています。GPU不要でCPUのみで実用的に動作します。
OpenCV/YuNetを使用した顔検出と描画は▼を参考にさせていただきました。
OpenCVの新しい顔検出を試してみる

あらかじめ▼よりface_detection_yunet_2023mar.onnxをダウンロードしておいてください。
https://github.com/opencv/opencv_zoo/tree/main/models/face_detection_yunet

IPアドレスは変更してください。

camera_recv.py


#
# ・カメラデータ UDP受信
# ・顔検出 (OpenCV/YuNet)
# ・サーボ位置 UDP送信

import socket
import threading
import time
import cv2
import numpy as np

DEST_IP = "192.168.0.3"
PORT = 24759
UDP_BUF_SIZE = 1400

# ===========================================
# UDP送信クラス
class RoboConnSend:
    # dest_ip   : 送信先IP   ブロードキャスト->'255.255.255.255' 
    # dest_port : 送信先PORT
    def __init__(self, dest_ip, dest_port):
        self.dest_ip = dest_ip
        self.dest_port = dest_port

        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    # ar_data : bytearray型
    # 1400バイト以上になる場合は分割すること
    def send(self, ar_data):
        send_data = bytes(ar_data)
        self.sock.sendto(send_data, (self.dest_ip, self.dest_port))

    def close(self):
        self.sock.close()

# ===========================================
# UDP受信クラス (別スレッドで動作)
class RoboConnRecv(threading.Thread):
    # port : UDP通信のPORT 送受信で合わせる
    # callback : 受信時に呼ばれる関数 callback(bytearray)
    def __init__(self, port, callback):
        super(RoboConnRecv, self).__init__()

        self.port = port
        self.callback = callback

        self.stop_flag = False  # スレッド停止フラグ

        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.bind(('', self.port))  # すべて受信 
        self.sock.settimeout(0.1)

        self.start()    # スレッド開始

    # 1パケットを受信してdata部を返す(bytearray型)
    # ※受信するまでブロック
    def recv(self):
        ar_data = bytearray()

        # 有効なデータが届くまで待ち
        while True:
            if self.stop_flag:
                return None

            try:
                data, address = self.sock.recvfrom(UDP_BUF_SIZE)
            except:
                # タイムアウト
                continue

            ar_data = bytearray(data)

            return ar_data

    def close(self):
        self.stop()
        self.join()
        self.sock.close()

    # ------------------------
    # 受信スレッド
    def run(self):
        print("受信スレッド開始 " + self.name)

        while True:
            if self.stop_flag:
                print("受信スレッド停止")
                break
            recv_data = self.recv()
            if recv_data is None:
                print("受信スレッド停止")
                break
            self.callback(recv_data)

    # ------------------------
    # スレッド停止
    def stop(self):
        self.stop_flag = True
# ===========================================

img_no = -1     # 画像番号
recv_no = []    # 受信済パケットNo
img_bytes = bytearray()    # 受信バッファ

# 受信時のcallback関数
def recv(recv_data):
    global img_no
    global recv_no
    global img_bytes
    global face_detector
    global servo_h
    global servo_v

    recv_img_no = recv_data.pop(0)  # 画像番号
    no = recv_data.pop(0)           # パケット番号
    send_num = recv_data.pop(0)     # 送信回数
    print("recv_img_no:", recv_img_no, "no:", no, "send_num:", send_num, "size:", len(recv_data))

    if img_no != recv_img_no:
        # 画像番号が変わった
        img_no = recv_img_no
        recv_no = []
        img_bytes = bytearray((UDP_BUF_SIZE - 3) * send_num)    # 受信バッファ

    recv_no.append(no)
    for i in range(len(recv_data)):
        #print("i:", i)
        img_bytes[no * (UDP_BUF_SIZE - 3) + i] = recv_data[i]

    if len(recv_no) < send_num:
        return
    
    # すべて受信完了
    print("show")
    ndarray = np.frombuffer(img_bytes, np.uint8)
    img = cv2.imdecode(ndarray, cv2.IMREAD_COLOR)
    #print(img.shape)

    # 入力サイズを指定
    height, width, _ = img.shape
    face_detector.setInputSize((width, height))
    
    # 顔検出
    _, faces = face_detector.detect(img)
    if faces is not None:
        # 一番大きい顔を探す
        i = 0
        max_face_area = -1       # 最大面積
        max_face_idx = 0
        for face in faces:
            # バウンディングボックス
            box = list(map(int, face[:4]))
            face_area = box[2] * box[3]
            if max_face_area < face_area:
                max_face_area = face_area
                max_face_idx = i
            i += 1

        # 検出した顔のバウンディングボックスとランドマークを描画する
        face = faces[max_face_idx]
        # バウンディングボックス
        box = list(map(int, face[:4]))
        color = (0, 0, 255)
        thickness = 2
        cv2.rectangle(img, box, color, thickness, cv2.LINE_AA)

        # ランドマーク(右目、左目、鼻、右口角、左口角)
        landmarks = list(map(int, face[4:len(face)-1]))
        landmarks = np.array_split(landmarks, len(landmarks) / 2)
        for landmark in landmarks:
            radius = 5
            thickness = -1
            cv2.circle(img, landmark, radius, color, thickness, cv2.LINE_AA)
            
        # 信頼度
        confidence = face[-1]
        confidence = "{:.2f}".format(confidence)
        position = (box[0], box[1] - 10)
        font = cv2.FONT_HERSHEY_SIMPLEX
        scale = 0.5
        thickness = 2
        cv2.putText(img, confidence, position, font, scale, color, thickness, cv2.LINE_AA)

        # 検出した顔の中心座標
        x = box[0] + box[2] / 2
        y = box[1] + box[3] / 2
        if len(landmarks) >= 2:
            # 右目、左目の間
             x = (landmarks[0][0] + landmarks[1][0]) /2 
             y = (landmarks[0][1] + landmarks[1][1]) /2 

        if width / 3 < x and x < width * 2 / 3 and height / 3 < y and y < height * 2 / 3:
            # 中心付近なら動かさない
            pass
        else:
            # サーボ位置
            if x > width / 2:
                servo_h -= 1
            else:
                servo_h += 1

            if y > height / 2:
                servo_v += 1
            else:
                servo_v -= 1

            if servo_h < 0: servo_h = 0
            if servo_h > 180: servo_h = 180
            if servo_v < 0: servo_v = 0
            if servo_v > 180: servo_v = 180

            print(x, y, servo_h, servo_v)

            conn_send.send([servo_h, servo_v])

    cv2.imshow('ESP32 CAM' , img)
    #recv_no = []
    cv2.waitKey(1)
    
#==============================================================================
# UDP受信メイン
#==============================================================================
conn_recv = RoboConnRecv(PORT, recv)
conn_send = RoboConnSend(DEST_IP, PORT)

# FaceDetectorYNの生成
face_detector = cv2.FaceDetectorYN_create("face_detection_yunet_2023mar.onnx", "", [0, 0], 0.6, 0.3, 5000, cv2.dnn.DNN_BACKEND_DEFAULT, target_id=cv2.dnn.DNN_TARGET_CPU)

servo_h = 90
servo_v = 90

while True:
    try:
        time.sleep(0.1)
    except KeyboardInterrupt:  # KeyboardInterruptはExceptionで補足されないので明示する
        print("*** KeyboardInterrupt ***")
        break
    except:
        break

conn_recv.close()
conn_send.close()
cv2.destroyAllWindows()
本格派対局将棋 ぴよ将棋
本格派対局将棋アプリ ぴよ将棋
[Android] [iOS]

かわいい「ひよこ」と対局する将棋アプリ。かわいいけどAIは本格派!
対局後の検討機能や棋譜管理機能も充実!棋譜解析機能も搭載!

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です