昔のPHS用カメラ「Treva(トレバ)」をRaspberry Pi Pico WにつないでWiFiカメラを作りました‼
▲の写真のようにバッテリーをつなげばケーブルレスで運用可能‼
Trevaは2000年11月8日発売だったようです。25年前‼
懐かしい方がいる一方、産まれていない方も多いかもですね💦
現在は入手はほぼ不可能でしょうね。
Trevaからの画像の受け取り方法は解析された方がいて、マイコンで使いやすかったため電子工作で良く使われていました。
▼の2009年頃に作ったロボットで使用していたTrevaを懐かしくて掘り起こしてみました。
Trevaで撮った動画
Trevaの解像度は96x72pixelです💦
PCのPythonプログラムで受信して4倍に拡大して表示しています。
工作ルームの様子が公開されてしまったw
プログラムソース
もうTrevaを使う人もいないと思いますが、記録としてソースを載せておきます。
ArduinoからUDPでPC(Python)にバイナリデータ送信するサンプルにはなるかと。
■Raspberry Pi Pico W側
開発環境はArduino(EarlePhilhower版)を使用しています。
画像を1フレーム毎に指定IPへUDPで送信します。
1フレームのデータサイズは96x72x2=13.5KBなので、UDPのパケットサイズの上限(1472バイト)から一度には送れないため、12回に分割して送るようにします。
pico_wifi_camera.ino
// Pico WでWIFI(UDP) Trevaカメラ
#include <WiFi.h>
#include <WiFiUdp.h>
#define TREVA_INP_PIN 16 // TREVA OUT (黄)
#define TREVA_CLK_PIN 17 // TREVA CLK (緑)
const char *SSID = "xxxxxxxx"; // 環境に合わせる
const char *SSID_PASS = "xxxxxxx"; // 環境に合わせる
// IPを指定する場合
const IPAddress LOCAL_IP(192, 168, 0, 2); // 環境に合わせる
const IPAddress SUBNET(255, 255, 255, 0);
const IPAddress GATEWAY(192, 168, 0, 1); // 不要な場合はLOCAL_IPと同じにする
const IPAddress DNS(192, 168, 0, 1); // 不要な場合はLOCAL_IPと同じにする
const IPAddress DEST_IP(192, 168, 0, 3); // 送信先IP
const unsigned int DEST_PORT = 24759; // 送信先ポート
#define BUF_SIZE 1400 // 1400バイト以上は分割して送信すること
WiFiUDP udp;
void setup() {
Serial.begin(115200);
//while (!Serial){};
pinMode(TREVA_INP_PIN, INPUT);
pinMode(TREVA_CLK_PIN, OUTPUT);
pinMode(LED_BUILTIN, OUTPUT);
// IPを指定する (DHCPで割り当てられる場合は不要)
WiFi.config(LOCAL_IP, DNS, GATEWAY, SUBNET);
WiFi.begin(SSID, SSID_PASS);
while (WiFi.status() != WL_CONNECTED) {
Serial.print('.');
delay(500);
}
Serial.print("Connected! local IP: ");
Serial.println(WiFi.localIP());
}
//───────────────────────────────────
// TREVA取込処理
//───────────────────────────────────
// TREVA画像データ(YUV形式) 13,824 bytes
unsigned char YUV_data[96*72*2];
// 1bit読み出し処理
int treva_read_bit()
{
int i, a;
digitalWrite(TREVA_CLK_PIN, 0);
//delayMicroseconds(1);
a = digitalRead(TREVA_INP_PIN);
digitalWrite(TREVA_CLK_PIN, 1);
// WAIT (約500kHzになるように調整する)
//delayMicroseconds(1);
return a;
}
// TREVA読み出し処理
void treva_read()
{
int i, j, k, d, rgb, p, Y, U, V, red, green, blue;
unsigned char treva_data[96*2]; // 1行分のデータ(YUV形式)
digitalWrite(LED_BUILTIN, 1);
// 100bit連続して'1'が来るのを待つ
i = 0;
while (i<100){
if( treva_read_bit() == 1) {
i++;
} else {
i = 0;
}
}
digitalWrite(LED_BUILTIN, 0);
// 連続65bit分の'0'を検出
i = 0;
while (i<65){
if( treva_read_bit() == 0) {
i++;
} else {
i = 0;
}
}
digitalWrite(LED_BUILTIN, 1);
// 続く2byte分のデータは無視
for( i=0; i<2*8; i++ ) {
treva_read_bit();
}
digitalWrite(LED_BUILTIN, 0);
//
// ここから画像データ取得開始
//
rgb = 0;
for(i=0; i<72; i++){
// 1行分の画像データ取得
for( j=0; j < 96*2; j++){
d = 0;
for (k=0; k<8; k++){
d <<= 1;
if(treva_read_bit()) {
d |= 0x1;
}
}
treva_data[j] = d;
}
// 1行分のデータをコピー
for(j=0; j<96*2; j++){
YUV_data[rgb++] = treva_data[j];
}
}
digitalWrite(LED_BUILTIN, 1);
}
unsigned char img_no = 0;
void loop() {
treva_read();
// UDP送信用バッファ
char sendBuffer[BUF_SIZE];
// 12回に分割して送信
int SEND_SIZE = 1152;
for (int no = 0; no < 12; no++) {
sendBuffer[0] = img_no;
sendBuffer[1] = no;
memcpy(&sendBuffer[2], &YUV_data[no * SEND_SIZE], SEND_SIZE);
// UDP送信
udp.beginPacket(DEST_IP, DEST_PORT);
udp.write(sendBuffer, SEND_SIZE + 2);
udp.endPacket();
}
img_no++;
}
■PC側(Python)
他の処理と組み合わせやすいようにUDPの受信は別スレッドで行っています。
〇カメラデータ受信・表示 camera_recv.py
# Treva UDP受信
import socket
import threading
import time
import cv2
import numpy as np
PORT = 24759
# ===========================================
# UDP受信クラス (別スレッドで動作)
class RoboConnRecv(threading.Thread):
# port : UDP通信のPORT 送受信で合わせる
# callback : 受信時に呼ばれる関数 callback(bytearray)
def __init__(self, port, callback):
super(RoboConnRecv, self).__init__()
self.port = port
self.callback = callback
self.stop_flag = False # スレッド停止フラグ
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.bind(('', self.port)) # すべて受信
self.sock.settimeout(0.1)
self.start() # スレッド開始
# 1パケットを受信してdata部を返す(bytearray型)
# ※受信するまでブロック
# ※ヘッダーが来るまでのデータは無視, ID・チェックサムが合わない場合は破棄
def recv(self):
ar_data = bytearray()
# 有効なデータが届くまで待ち
while True:
if self.stop_flag:
return None
try:
data, address = self.sock.recvfrom(1500)
except:
# タイムアウト
continue
ar_data = bytearray(data)
return ar_data
def close(self):
self.stop()
self.join()
self.sock.close()
# ------------------------
# 受信スレッド
def run(self):
print("受信スレッド開始 " + self.name)
while True:
if self.stop_flag:
print("受信スレッド停止")
break
recv_data = self.recv()
if recv_data is None:
print("受信スレッド停止")
break
self.callback(recv_data)
# ------------------------
# スレッド停止
def stop(self):
self.stop_flag = True
# ===========================================
SEND_SIZE = 1152 # 1パケットのサイズ
WIDTH = 96
HEIGHT = 72
YUV_data = bytearray(WIDTH * HEIGHT * 2) # 受信バッファ
img_no = -1 # 画像番号
recv_no = [] # 受信済パケットNo
img = np.zeros((HEIGHT, WIDTH, 3), np.uint8)
# 受信時のcallback関数
def recv(recv_data):
global YUV_data
global img_no
global recv_no
global img
recv_img_no = recv_data.pop(0) # 画像番号
if img_no != recv_img_no:
# 画像番号が変わった
img_no = recv_img_no
recv_no = []
no = recv_data.pop(0) # パケット番号
print("no:", no)
if no in recv_no:
# すべて埋まってないのに重複して受信 -> 破棄
recv_no = []
return
recv_no.append(no)
for i in range(SEND_SIZE):
YUV_data[no * SEND_SIZE + i] = recv_data[i]
if len(recv_no) < 12:
return
# すべて受信完了
print("show")
for y in range(HEIGHT):
for x in range(WIDTH):
p = y * WIDTH * 2 + x * 2
Y = YUV_data[p + 1]
if x % 2 == 0:
V = YUV_data[p]
U = YUV_data[p+2]
else:
V = YUV_data[p-2]
U = YUV_data[p]
U = U - 128
V = V - 128
red = U + Y
green = int((98*Y - 53*U - 19*V) / 100)
blue = V + Y
if red > 255: red = 255
if red < 0: red = 0
if green > 255: green = 255
if green < 0: green = 0
if blue > 255: blue = 255
if blue < 0: blue = 0
img[y, x] = [blue, green, red]
img2 = cv2.resize(img, (WIDTH * 4, HEIGHT * 4)) # 4倍に拡大
cv2.imshow('Treva' , img2)
recv_no = []
cv2.waitKey(1)
#==============================================================================
# UDP受信メイン
#==============================================================================
conn = RoboConnRecv(PORT, recv)
while True:
try:
time.sleep(0.1)
except KeyboardInterrupt: # KeyboardInterruptはExceptionで補足されないので明示する
print("*** KeyboardInterrupt ***")
break
except:
break
conn.close()
cv2.destroyAllWindows()