Freenove ESP32-WROVER CAMで顔追跡カメラを作りました。
動作の概要
①ESP32-WROVERのカメラで撮った画像をWiFi(UDP)でPCに送信
②PC上で顔検出(OpenCV/YuNet)
③顔が中央に来るようにサーボの位置を計算し、ESP32-WROVERに送信
④ESP32-WROVERで上下-左右(2軸)のサーボを動かす
構成
サーボのマウントはサーボ2台を直交させて接続します。お手軽にラズパイカメラ用2軸マウント(A838)を使用しました。
▼から購入できます。
サーボはSG90(互換品)▼を使用しました。
Freenove ESP32-WROVER CAMは▼で購入しました。技適はついてました。
ソース
■ESP32-WROVER側
“esp32″のボードマネージャをインストールしてください。(今回はv3.1.1で確認)
“ESP32Servo”ライブラリを使用していますので、インストールしてください。(今回はv3.0.6で確認)
WiFiのSSID/パスワードとIPアドレスは変更してください。
ESP32CameraTracking.ino
// カメラ 顔トラッキング
// ESP32 WROVER カメラ画像をUDPで送信する
// サーボ(2個)の角度を受信する
#include "esp_camera.h"
#include <WiFi.h>
// ESP32 WROVER カメラのPIN定義
#define PWDN_GPIO_NUM -1
#define RESET_GPIO_NUM -1
#define XCLK_GPIO_NUM 21
#define SIOD_GPIO_NUM 26
#define SIOC_GPIO_NUM 27
#define Y9_GPIO_NUM 35
#define Y8_GPIO_NUM 34
#define Y7_GPIO_NUM 39
#define Y6_GPIO_NUM 36
#define Y5_GPIO_NUM 19
#define Y4_GPIO_NUM 18
#define Y3_GPIO_NUM 5
#define Y2_GPIO_NUM 4
#define VSYNC_GPIO_NUM 25
#define HREF_GPIO_NUM 23
#define PCLK_GPIO_NUM 22
// サーボ関連
#include <ESP32Servo.h>
#define SERVO_H_PIN 12
#define SERVO_V_PIN 14
Servo servo_h;
Servo servo_v;
// ネットワーク関連の定義
const char *ssid = "xxxxxxxx"; // 環境に合わせる
const char *password = "xxxxxxxx"; // 同上
// IPを指定する場合
const IPAddress LOCAL_IP(192, 168, 0, 3);
const IPAddress SUBNET(255, 255, 255, 0);
const IPAddress GATEWAY(192, 168, 0, 1); // 不要な場合はLOCAL_IPと同じにする
const IPAddress DNS(192, 168, 0, 1); // 不要な場合はLOCAL_IPと同じにする
const unsigned int LOCAL_PORT = 24759; // 受信ポート
const IPAddress DEST_IP(192, 168, 0, 2); // 送信先IP(環境に合わせる)
const unsigned int DEST_PORT = 24759; // 送信先ポート
#define UDP_BUF_SIZE 1400 // 1400バイト以上は分割して送信する
WiFiUDP udp;
void setup() {
Serial.begin(115200);
Serial.setDebugOutput(true);
Serial.println();
camera_config_t config;
config.ledc_channel = LEDC_CHANNEL_0;
config.ledc_timer = LEDC_TIMER_0;
config.pin_d0 = Y2_GPIO_NUM;
config.pin_d1 = Y3_GPIO_NUM;
config.pin_d2 = Y4_GPIO_NUM;
config.pin_d3 = Y5_GPIO_NUM;
config.pin_d4 = Y6_GPIO_NUM;
config.pin_d5 = Y7_GPIO_NUM;
config.pin_d6 = Y8_GPIO_NUM;
config.pin_d7 = Y9_GPIO_NUM;
config.pin_xclk = XCLK_GPIO_NUM;
config.pin_pclk = PCLK_GPIO_NUM;
config.pin_vsync = VSYNC_GPIO_NUM;
config.pin_href = HREF_GPIO_NUM;
config.pin_sccb_sda = SIOD_GPIO_NUM;
config.pin_sccb_scl = SIOC_GPIO_NUM;
config.pin_pwdn = PWDN_GPIO_NUM;
config.pin_reset = RESET_GPIO_NUM;
config.xclk_freq_hz = 20000000;
//config.frame_size = FRAMESIZE_VGA;
//config.frame_size = FRAMESIZE_320X320;
config.frame_size = FRAMESIZE_240X240;
config.pixel_format = PIXFORMAT_JPEG;
config.grab_mode = CAMERA_GRAB_WHEN_EMPTY;
config.fb_location = CAMERA_FB_IN_PSRAM;
config.jpeg_quality = 12;
config.fb_count = 1;
esp_err_t err = esp_camera_init(&config);
if (err != ESP_OK) {
Serial.printf("Camera init failed with error 0x%x", err);
return;
}
sensor_t *s = esp_camera_sensor_get();
// initial sensors are flipped vertically and colors are a bit saturated
if (s->id.PID == OV3660_PID) {
s->set_vflip(s, 1); // flip it back
s->set_brightness(s, 1); // up the brightness just a bit
s->set_saturation(s, -2); // lower the saturation
}
// ネットワーク関連
WiFi.config(LOCAL_IP, GATEWAY, SUBNET, DNS); // IPを指定する場合 (DHCPで割り当てられる場合は不要)
WiFi.begin(ssid, password);
WiFi.setSleep(false);
Serial.print("WiFi connecting");
while (WiFi.status() != WL_CONNECTED) {
delay(500);
Serial.print(".");
}
Serial.println("");
Serial.println("WiFi connected");
Serial.println(WiFi.localIP());
udp.begin(LOCAL_PORT);
// サーボ関連初期化
servo_h.attach(SERVO_H_PIN);
servo_v.attach(SERVO_V_PIN);
servo_h.write(90);
servo_v.write(90);
delay(500);
}
unsigned char img_no = 0;
void loop() {
// カメラキャプチャ
camera_fb_t *fb = esp_camera_fb_get();
if (!fb){
Serial.println("Camera capture failed");
return;
}
Serial.print("capture size:");
Serial.println(fb->len);
// UDP送信用バッファ
uint8_t send_buf[UDP_BUF_SIZE];
// 分割してUDP送信
int send_num = fb->len / (UDP_BUF_SIZE - 3); // 送信回数
if (send_num * (UDP_BUF_SIZE - 3) < fb->len) send_num++;
for (int no = 0; no < send_num; no++) {
send_buf[0] = img_no; // 画像番号
send_buf[1] = no; // パケット番号
send_buf[2] = send_num; // 送信回数
/*
Serial.print(img_no);
Serial.print(' ');
Serial.print(no);
Serial.print(' ');
Serial.println(send_num);
*/
int size = UDP_BUF_SIZE - 3;
if (no == send_num - 1) size = fb->len - no * (UDP_BUF_SIZE - 3);
memcpy(&send_buf[3], &fb->buf[no * (UDP_BUF_SIZE - 3)], size);
// UDP送信
udp.beginPacket(DEST_IP, DEST_PORT);
udp.write(send_buf, size + 3);
udp.endPacket();
}
esp_camera_fb_return(fb);
img_no++;
// UDP受信
int packetSize = udp.parsePacket();
if (packetSize > 0) {
uint8_t recv_buf[UDP_BUF_SIZE];
int recvSize = udp.read(recv_buf, UDP_BUF_SIZE);
for (int i = 0; i < recvSize; i++) {
Serial.print(recv_buf[i]);
Serial.print(',');
}
Serial.println();
if (recvSize == 2) {
servo_h.write(recv_buf[0]);
servo_v.write(recv_buf[1]);
delay(100);
}
}
}
■PC側
顔検出はOpenCV/YuNetを使用しています。GPU不要でCPUのみで実用的に動作します。
OpenCV/YuNetを使用した顔検出と描画は▼を参考にさせていただきました。
OpenCVの新しい顔検出を試してみる
あらかじめ▼よりface_detection_yunet_2023mar.onnxをダウンロードしておいてください。
https://github.com/opencv/opencv_zoo/tree/main/models/face_detection_yunet
IPアドレスは変更してください。
camera_recv.py
#
# ・カメラデータ UDP受信
# ・顔検出 (OpenCV/YuNet)
# ・サーボ位置 UDP送信
import socket
import threading
import time
import cv2
import numpy as np
DEST_IP = "192.168.0.3"
PORT = 24759
UDP_BUF_SIZE = 1400
# ===========================================
# UDP送信クラス
class RoboConnSend:
# dest_ip : 送信先IP ブロードキャスト->'255.255.255.255'
# dest_port : 送信先PORT
def __init__(self, dest_ip, dest_port):
self.dest_ip = dest_ip
self.dest_port = dest_port
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# ar_data : bytearray型
# 1400バイト以上になる場合は分割すること
def send(self, ar_data):
send_data = bytes(ar_data)
self.sock.sendto(send_data, (self.dest_ip, self.dest_port))
def close(self):
self.sock.close()
# ===========================================
# UDP受信クラス (別スレッドで動作)
class RoboConnRecv(threading.Thread):
# port : UDP通信のPORT 送受信で合わせる
# callback : 受信時に呼ばれる関数 callback(bytearray)
def __init__(self, port, callback):
super(RoboConnRecv, self).__init__()
self.port = port
self.callback = callback
self.stop_flag = False # スレッド停止フラグ
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.bind(('', self.port)) # すべて受信
self.sock.settimeout(0.1)
self.start() # スレッド開始
# 1パケットを受信してdata部を返す(bytearray型)
# ※受信するまでブロック
def recv(self):
ar_data = bytearray()
# 有効なデータが届くまで待ち
while True:
if self.stop_flag:
return None
try:
data, address = self.sock.recvfrom(UDP_BUF_SIZE)
except:
# タイムアウト
continue
ar_data = bytearray(data)
return ar_data
def close(self):
self.stop()
self.join()
self.sock.close()
# ------------------------
# 受信スレッド
def run(self):
print("受信スレッド開始 " + self.name)
while True:
if self.stop_flag:
print("受信スレッド停止")
break
recv_data = self.recv()
if recv_data is None:
print("受信スレッド停止")
break
self.callback(recv_data)
# ------------------------
# スレッド停止
def stop(self):
self.stop_flag = True
# ===========================================
img_no = -1 # 画像番号
recv_no = [] # 受信済パケットNo
img_bytes = bytearray() # 受信バッファ
# 受信時のcallback関数
def recv(recv_data):
global img_no
global recv_no
global img_bytes
global face_detector
global servo_h
global servo_v
recv_img_no = recv_data.pop(0) # 画像番号
no = recv_data.pop(0) # パケット番号
send_num = recv_data.pop(0) # 送信回数
print("recv_img_no:", recv_img_no, "no:", no, "send_num:", send_num, "size:", len(recv_data))
if img_no != recv_img_no:
# 画像番号が変わった
img_no = recv_img_no
recv_no = []
img_bytes = bytearray((UDP_BUF_SIZE - 3) * send_num) # 受信バッファ
recv_no.append(no)
for i in range(len(recv_data)):
#print("i:", i)
img_bytes[no * (UDP_BUF_SIZE - 3) + i] = recv_data[i]
if len(recv_no) < send_num:
return
# すべて受信完了
print("show")
ndarray = np.frombuffer(img_bytes, np.uint8)
img = cv2.imdecode(ndarray, cv2.IMREAD_COLOR)
#print(img.shape)
# 入力サイズを指定
height, width, _ = img.shape
face_detector.setInputSize((width, height))
# 顔検出
_, faces = face_detector.detect(img)
if faces is not None:
# 一番大きい顔を探す
i = 0
max_face_area = -1 # 最大面積
max_face_idx = 0
for face in faces:
# バウンディングボックス
box = list(map(int, face[:4]))
face_area = box[2] * box[3]
if max_face_area < face_area:
max_face_area = face_area
max_face_idx = i
i += 1
# 検出した顔のバウンディングボックスとランドマークを描画する
face = faces[max_face_idx]
# バウンディングボックス
box = list(map(int, face[:4]))
color = (0, 0, 255)
thickness = 2
cv2.rectangle(img, box, color, thickness, cv2.LINE_AA)
# ランドマーク(右目、左目、鼻、右口角、左口角)
landmarks = list(map(int, face[4:len(face)-1]))
landmarks = np.array_split(landmarks, len(landmarks) / 2)
for landmark in landmarks:
radius = 5
thickness = -1
cv2.circle(img, landmark, radius, color, thickness, cv2.LINE_AA)
# 信頼度
confidence = face[-1]
confidence = "{:.2f}".format(confidence)
position = (box[0], box[1] - 10)
font = cv2.FONT_HERSHEY_SIMPLEX
scale = 0.5
thickness = 2
cv2.putText(img, confidence, position, font, scale, color, thickness, cv2.LINE_AA)
# 検出した顔の中心座標
x = box[0] + box[2] / 2
y = box[1] + box[3] / 2
if len(landmarks) >= 2:
# 右目、左目の間
x = (landmarks[0][0] + landmarks[1][0]) /2
y = (landmarks[0][1] + landmarks[1][1]) /2
if width / 3 < x and x < width * 2 / 3 and height / 3 < y and y < height * 2 / 3:
# 中心付近なら動かさない
pass
else:
# サーボ位置
if x > width / 2:
servo_h -= 1
else:
servo_h += 1
if y > height / 2:
servo_v += 1
else:
servo_v -= 1
if servo_h < 0: servo_h = 0
if servo_h > 180: servo_h = 180
if servo_v < 0: servo_v = 0
if servo_v > 180: servo_v = 180
print(x, y, servo_h, servo_v)
conn_send.send([servo_h, servo_v])
cv2.imshow('ESP32 CAM' , img)
#recv_no = []
cv2.waitKey(1)
#==============================================================================
# UDP受信メイン
#==============================================================================
conn_recv = RoboConnRecv(PORT, recv)
conn_send = RoboConnSend(DEST_IP, PORT)
# FaceDetectorYNの生成
face_detector = cv2.FaceDetectorYN_create("face_detection_yunet_2023mar.onnx", "", [0, 0], 0.6, 0.3, 5000, cv2.dnn.DNN_BACKEND_DEFAULT, target_id=cv2.dnn.DNN_TARGET_CPU)
servo_h = 90
servo_v = 90
while True:
try:
time.sleep(0.1)
except KeyboardInterrupt: # KeyboardInterruptはExceptionで補足されないので明示する
print("*** KeyboardInterrupt ***")
break
except:
break
conn_recv.close()
conn_send.close()
cv2.destroyAllWindows()