【Raspberry Pi Pico W】WiFi UDP通信 サンプルプログラム

Raspberry Pi Pico W で WiFi UDP通信するサンプルプログラムを作成しました。
PC側のPythonプログラムとUDPで送受信します。

UDPはパケットが届かない場合があったり、一回で送れるパケットサイズに制限があったりしますが、TCPのように事前にコネクションを張る必要がなく、データを送り付けるイメージで、取り扱いが楽です。シリアル通信の代替としても使えるので、私はマイコンとの通信によく利用しています。

■Raspberry Pi Pico W側
EarlePhilhower版 Arduino環境です。
バイナリデータを受信して、そのまま送信(返信)するサンプルプログラムです。

pico_w_udp.ino


// Pico WでWIFI(UDP)
// ※バイナリデータを送受信

#include <WiFi.h>
#include <WiFiUdp.h>

const char *SSID = "xxxxxxxxxx";     // WiFi環境に合わせて書き換える
const char *SSID_PASS = "xxxxxxxxx"; // 同上

// IPを指定する場合
const IPAddress LOCAL_IP(192, 168, 0, 2);
const IPAddress SUBNET(255, 255, 255, 0);
const IPAddress GATEWAY(192, 168, 0, 1);    // 不要な場合はLOCAL_IPと同じにする
const IPAddress DNS(192, 168, 0, 1);        // 同上

const unsigned int LOCAL_PORT = 56789;     // 受信ポート

const IPAddress DEST_IP(192, 168, 0, 3);  // 送信先IP
const unsigned int DEST_PORT = 56789;      // 送信先ポート

#define BUF_SIZE 1400   // 1400バイト以上は分割して送信すること

WiFiUDP udp;

void setup() {
  Serial.begin(115200);
  while (!Serial){};
  pinMode(LED_BUILTIN, OUTPUT);

  // IPを指定する (DHCPで割り当てられる場合は不要)
  WiFi.config(LOCAL_IP, DNS, GATEWAY, SUBNET);

  WiFi.begin(SSID, SSID_PASS);
  while (WiFi.status() != WL_CONNECTED) {
    Serial.print('.');
    delay(500);
  }
  Serial.print("Connected! local IP: ");
  Serial.println(WiFi.localIP());

  udp.begin(LOCAL_PORT);
}

uint16_t count = 0;

void loop() {
  // UDP受信
  int packetSize = udp.parsePacket();
  if (packetSize > 0) {
    //Serial.printf("Received packet size:%d from:%s:%d to %s:%d\n", 
    //  packetSize, 
    //  udp.remoteIP().toString().c_str(), udp.remotePort(), 
    //  udp.destinationIP().toString().c_str(), udp.localPort());

    uint8_t recvBuffer[BUF_SIZE];
    int recvSize = udp.read(recvBuffer, BUF_SIZE);
    for (int i = 0; i < recvSize; i++) {
      Serial.print(recvBuffer[i]);
      Serial.print(',');
    }
    Serial.println();

    // UDP送信用データ準備
    char sendBuffer[BUF_SIZE];
    for (int i = 0; i < recvSize; i++) {
      sendBuffer[i] = recvBuffer[i];
    }

    // UDP送信
    udp.beginPacket(DEST_IP, DEST_PORT);
    udp.write(sendBuffer, recvSize);
    udp.endPacket();

    count++;
  }

  digitalWrite(LED_BUILTIN, count % 2);
}

■PC側(Python)
バイナリデータをUDP送信し、UDP受信したデータを表示するサンプルです。

udp_send_recv.py


# UDPブロードキャスト 送信・受信

import socket
import threading
import time

DEST_IP = "192.168.0.78"
PORT = 56789

# ===========================================
# UDP送信クラス
class RoboConnSend:
    # dest_ip   : 送信先IP
    # dest_port : 送信先PORT
    def __init__(self, dest_ip, dest_port):
        self.dest_ip = dest_ip
        self.dest_port = dest_port

        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    # ar_data : bytearray型
    # 1400バイト以上になる場合は分割すること
    def send(self, ar_data):
        send_data = bytes(ar_data)
        self.sock.sendto(send_data, (self.dest_ip, self.dest_port))

    def close(self):
        self.sock.close()

# ===========================================
# UDP受信クラス (別スレッドで動作)
class RoboConnRecv(threading.Thread):
    # port : UDP通信のPORT 送受信で合わせる
    # callback : 受信時に呼ばれる関数 callback(bytearray)
    def __init__(self, port, callback):
        super(RoboConnRecv, self).__init__()

        self.port = port
        self.callback = callback

        self.stop_flag = False  # スレッド停止フラグ

        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.bind(('', self.port))  # すべて受信 
        self.sock.settimeout(0.1)

        self.start()    # スレッド開始

    # 1パケットを受信してdata部を返す(bytearray型)
    # ※受信するまでブロック
    def recv(self):
        ar_data = bytearray()

        # 有効なデータが届くまで待ち
        while True:
            if self.stop_flag:
                return None

            try:
                data, address = self.sock.recvfrom(1500)
            except:
                # タイムアウト
                continue

            ar_data = bytearray(data)

            return ar_data

    def close(self):
        self.stop()
        self.join()
        self.sock.close()

    # ------------------------
    # 受信スレッド
    def run(self):
        print("受信スレッド開始 " + self.name)

        while True:
            if self.stop_flag:
                print("受信スレッド停止")
                break
            recv_data = self.recv()
            if recv_data is None:
                print("受信スレッド停止")
                break
            self.callback(recv_data)

    # ------------------------
    # スレッド停止
    def stop(self):
        self.stop_flag = True
# ===========================================

# 受信時のcallback関数
def recv(recv_data):
    s = "recv: "
    for d in recv_data:
        s += str(d) + ", "
    print(s)
#==============================================================================
# UDP受信メイン
#==============================================================================
conn_recv = RoboConnRecv(PORT, recv)
conn_send = RoboConnSend(DEST_IP, PORT)

no = 0
while True:
    try:
        conn_send.send([no, 0, 1, 2])
        no += 1
        if no > 255: no = 0
        time.sleep(1)
    except KeyboardInterrupt:  # KeyboardInterruptはExceptionで補足されないので明示する
        print("*** KeyboardInterrupt ***")
        break
    except:
        break

conn_recv.close()
conn_send.close()

■Raspberry Pi Picoの関連記事
【Raspberry Pi Pico W】WiFi UDP通信 サンプルプログラム
Raspberry Pi Pico+Arduinoでサーボをたくさん動かしたい
会話ができる「ぴよロボ」作りました! (Raspberry Pi + Pico + ChatGPT)
Raspberry Pi Pico W でPCとBluetooth(シリアル)接続する
Raspberry Pi Pico/Pico WをArduino開発環境で使うためのメモ
超音波距離センサー + Raspberry Pi Picoで潜水艦ソナー風
コップの水がこぼれない台 MPU6050 + Raspberry Pi Pico(Arduino)
MPU6050 + Raspberry Pi Pico(Arduino) -> PCで3Dのキューブを回転表示

本格派対局将棋 ぴよ将棋
本格派対局将棋アプリ ぴよ将棋
[Android] [iOS]

かわいい「ひよこ」と対局する将棋アプリ。かわいいけどAIは本格派!
対局後の検討機能や棋譜管理機能も充実!棋譜解析機能も搭載!

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です