【Raspberry Pi Pico W】昔のPHS用カメラ”Treva”でWiFiカメラ


昔のPHS用カメラ「Treva(トレバ)」をRaspberry Pi Pico WにつないでWiFiカメラを作りました‼
▲の写真のようにバッテリーをつなげばケーブルレスで運用可能‼

Trevaは2000年11月8日発売だったようです。25年前‼
懐かしい方がいる一方、産まれていない方も多いかもですね💦
現在は入手はほぼ不可能でしょうね。
Trevaからの画像の受け取り方法は解析された方がいて、マイコンで使いやすかったため電子工作で良く使われていました。

▼の2009年頃に作ったロボットで使用していたTrevaを懐かしくて掘り起こしてみました。

f:id:studio_k:20100814222605j:image:w480

Trevaで撮った動画

Trevaの解像度は96x72pixelです💦
PCのPythonプログラムで受信して4倍に拡大して表示しています。
工作ルームの様子が公開されてしまったw

プログラムソース

もうTrevaを使う人もいないと思いますが、記録としてソースを載せておきます。
ArduinoからUDPでPC(Python)にバイナリデータ送信するサンプルにはなるかと。

■Raspberry Pi Pico W側
開発環境はArduino(EarlePhilhower版)を使用しています。
画像を1フレーム毎に指定IPへUDPで送信します。
1フレームのデータサイズは96x72x2=13.5KBなので、UDPのパケットサイズの上限(1472バイト)から一度には送れないため、12回に分割して送るようにします。

pico_wifi_camera.ino


// Pico WでWIFI(UDP) Trevaカメラ

#include <WiFi.h>
#include <WiFiUdp.h>

#define TREVA_INP_PIN 16  // TREVA OUT (黄)
#define TREVA_CLK_PIN 17  // TREVA CLK (緑)

const char *SSID = "xxxxxxxx";      // 環境に合わせる
const char *SSID_PASS = "xxxxxxx";  // 環境に合わせる

// IPを指定する場合
const IPAddress LOCAL_IP(192, 168, 0, 2);  // 環境に合わせる
const IPAddress SUBNET(255, 255, 255, 0);
const IPAddress GATEWAY(192, 168, 0, 1);    // 不要な場合はLOCAL_IPと同じにする
const IPAddress DNS(192, 168, 0, 1);        // 不要な場合はLOCAL_IPと同じにする

const IPAddress DEST_IP(192, 168, 0, 3);   // 送信先IP
const unsigned int DEST_PORT = 24759;      // 送信先ポート

#define BUF_SIZE 1400   // 1400バイト以上は分割して送信すること

WiFiUDP udp;

void setup() {
  Serial.begin(115200);
  //while (!Serial){};

  pinMode(TREVA_INP_PIN, INPUT);
  pinMode(TREVA_CLK_PIN, OUTPUT);
  pinMode(LED_BUILTIN, OUTPUT);

  // IPを指定する (DHCPで割り当てられる場合は不要)
  WiFi.config(LOCAL_IP, DNS, GATEWAY, SUBNET);

  WiFi.begin(SSID, SSID_PASS);
  while (WiFi.status() != WL_CONNECTED) {
    Serial.print('.');
    delay(500);
  }
  Serial.print("Connected! local IP: ");
  Serial.println(WiFi.localIP());
}

//───────────────────────────────────
//  TREVA取込処理
//───────────────────────────────────

// TREVA画像データ(YUV形式) 13,824 bytes
unsigned char YUV_data[96*72*2];

// 1bit読み出し処理
int treva_read_bit()
{
  int i, a;

  digitalWrite(TREVA_CLK_PIN, 0);
  //delayMicroseconds(1);
  a = digitalRead(TREVA_INP_PIN);
  digitalWrite(TREVA_CLK_PIN, 1);
  
  // WAIT (約500kHzになるように調整する)
  //delayMicroseconds(1);
  
  return a;
}

// TREVA読み出し処理
void treva_read()
{
  int i, j, k, d, rgb, p, Y, U, V, red, green, blue;
  unsigned char treva_data[96*2];   // 1行分のデータ(YUV形式)

  digitalWrite(LED_BUILTIN, 1);

  // 100bit連続して'1'が来るのを待つ
  i = 0;
  while (i<100){
    if( treva_read_bit() == 1) {
      i++;
    } else {
      i = 0;
    }
  }

  digitalWrite(LED_BUILTIN, 0);

  // 連続65bit分の'0'を検出
  i = 0;
  while (i<65){
    if( treva_read_bit() == 0) {
      i++;
    } else {
      i = 0;
    }
  }

  digitalWrite(LED_BUILTIN, 1);

  // 続く2byte分のデータは無視
  for( i=0; i<2*8; i++ ) {
    treva_read_bit();
  }

  digitalWrite(LED_BUILTIN, 0);

  //
  // ここから画像データ取得開始
  //
  
  rgb = 0;
  for(i=0; i<72; i++){
  
    // 1行分の画像データ取得
    for( j=0; j < 96*2; j++){
      d = 0;
      for (k=0; k<8; k++){
        d <<= 1;
        if(treva_read_bit()) {
          d |= 0x1;
        }
      }
      treva_data[j] = d;    
    }
    
    // 1行分のデータをコピー
    for(j=0; j<96*2; j++){
      YUV_data[rgb++] = treva_data[j];
    }
  }
  
  digitalWrite(LED_BUILTIN, 1);
}

unsigned char img_no = 0;

void loop() {
  treva_read();

  // UDP送信用バッファ
  char sendBuffer[BUF_SIZE];

  // 12回に分割して送信
  int SEND_SIZE = 1152;
  for (int no = 0; no < 12; no++) {
    sendBuffer[0] = img_no;
    sendBuffer[1] = no;
    memcpy(&sendBuffer[2], &YUV_data[no * SEND_SIZE], SEND_SIZE);

    // UDP送信
    udp.beginPacket(DEST_IP, DEST_PORT);
    udp.write(sendBuffer, SEND_SIZE + 2);
    udp.endPacket();
  }
  img_no++;
}

■PC側(Python)
他の処理と組み合わせやすいようにUDPの受信は別スレッドで行っています。

〇カメラデータ受信・表示 camera_recv.py


# Treva UDP受信

import socket
import threading
import time
import cv2
import numpy as np

PORT = 24759

# ===========================================
# UDP受信クラス (別スレッドで動作)
class RoboConnRecv(threading.Thread):
    # port : UDP通信のPORT 送受信で合わせる
    # callback : 受信時に呼ばれる関数 callback(bytearray)
    def __init__(self, port, callback):
        super(RoboConnRecv, self).__init__()

        self.port = port
        self.callback = callback

        self.stop_flag = False  # スレッド停止フラグ

        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.bind(('', self.port))  # すべて受信 
        self.sock.settimeout(0.1)

        self.start()    # スレッド開始

    # 1パケットを受信してdata部を返す(bytearray型)
    # ※受信するまでブロック
    # ※ヘッダーが来るまでのデータは無視, ID・チェックサムが合わない場合は破棄
    def recv(self):
        ar_data = bytearray()

        # 有効なデータが届くまで待ち
        while True:
            if self.stop_flag:
                return None

            try:
                data, address = self.sock.recvfrom(1500)
            except:
                # タイムアウト
                continue

            ar_data = bytearray(data)

            return ar_data

    def close(self):
        self.stop()
        self.join()
        self.sock.close()

    # ------------------------
    # 受信スレッド
    def run(self):
        print("受信スレッド開始 " + self.name)

        while True:
            if self.stop_flag:
                print("受信スレッド停止")
                break
            recv_data = self.recv()
            if recv_data is None:
                print("受信スレッド停止")
                break
            self.callback(recv_data)

    # ------------------------
    # スレッド停止
    def stop(self):
        self.stop_flag = True
# ===========================================

SEND_SIZE = 1152    # 1パケットのサイズ
WIDTH = 96
HEIGHT = 72

YUV_data = bytearray(WIDTH * HEIGHT * 2)    # 受信バッファ
img_no = -1     # 画像番号
recv_no = []    # 受信済パケットNo
img = np.zeros((HEIGHT, WIDTH, 3), np.uint8)

# 受信時のcallback関数
def recv(recv_data):
    global YUV_data
    global img_no
    global recv_no
    global img

    recv_img_no = recv_data.pop(0)   # 画像番号
    if img_no != recv_img_no:
        # 画像番号が変わった
        img_no = recv_img_no
        recv_no = []

    no = recv_data.pop(0)       # パケット番号
    print("no:", no)
    if no in recv_no:
        # すべて埋まってないのに重複して受信 -> 破棄
        recv_no = []
        return

    recv_no.append(no)
    for i in range(SEND_SIZE):
        YUV_data[no * SEND_SIZE + i] = recv_data[i]

    if len(recv_no) < 12:
        return
    
    # すべて受信完了
    print("show")
    for y in range(HEIGHT):
        for x in range(WIDTH):
            p = y * WIDTH * 2 + x * 2
            Y = YUV_data[p + 1]
            
            if x % 2 == 0:
                V =  YUV_data[p]
                U =  YUV_data[p+2]
            else:
                V =  YUV_data[p-2]
                U =  YUV_data[p]
            
            U = U - 128
            V = V - 128
            
            red = U + Y 
            green = int((98*Y - 53*U - 19*V) / 100)
            blue = V + Y
            
            if red > 255:   red = 255
            if red < 0:     red = 0
            if green > 255: green = 255
            if green < 0:   green = 0
            if blue > 255:  blue = 255
            if blue < 0:    blue = 0
            
            img[y, x] = [blue, green, red]

    img2 = cv2.resize(img, (WIDTH * 4, HEIGHT * 4)) # 4倍に拡大
    cv2.imshow('Treva' , img2)
    recv_no = []
    cv2.waitKey(1)
    
#==============================================================================
# UDP受信メイン
#==============================================================================
conn = RoboConnRecv(PORT, recv)

while True:
    try:
        time.sleep(0.1)
    except KeyboardInterrupt:  # KeyboardInterruptはExceptionで補足されないので明示する
        print("*** KeyboardInterrupt ***")
        break
    except:
        break

conn.close()
cv2.destroyAllWindows()
本格派対局将棋 ぴよ将棋
本格派対局将棋アプリ ぴよ将棋
[Android] [iOS]

かわいい「ひよこ」と対局する将棋アプリ。かわいいけどAIは本格派!
対局後の検討機能や棋譜管理機能も充実!棋譜解析機能も搭載!

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です