Raspberry Pi Pico W で WiFi UDP通信するサンプルプログラムを作成しました。
PC側のPythonプログラムとUDPで送受信します。
UDPはパケットが届かない場合があったり、一回で送れるパケットサイズに制限があったりしますが、TCPのように事前にコネクションを張る必要がなく、データを送り付けるイメージで、取り扱いが楽です。シリアル通信の代替としても使えるので、私はマイコンとの通信によく利用しています。
■Raspberry Pi Pico W側
EarlePhilhower版 Arduino環境です。
バイナリデータを受信して、そのまま送信(返信)するサンプルプログラムです。
pico_w_udp.ino
// Pico WでWIFI(UDP)
// ※バイナリデータを送受信
#include <WiFi.h>
#include <WiFiUdp.h>
const char *SSID = "xxxxxxxxxx"; // WiFi環境に合わせて書き換える
const char *SSID_PASS = "xxxxxxxxx"; // 同上
// IPを指定する場合
const IPAddress LOCAL_IP(192, 168, 0, 2);
const IPAddress SUBNET(255, 255, 255, 0);
const IPAddress GATEWAY(192, 168, 0, 1); // 不要な場合はLOCAL_IPと同じにする
const IPAddress DNS(192, 168, 0, 1); // 同上
const unsigned int LOCAL_PORT = 56789; // 受信ポート
const IPAddress DEST_IP(192, 168, 0, 3); // 送信先IP
const unsigned int DEST_PORT = 56789; // 送信先ポート
#define BUF_SIZE 1400 // 1400バイト以上は分割して送信すること
WiFiUDP udp;
void setup() {
Serial.begin(115200);
while (!Serial){};
pinMode(LED_BUILTIN, OUTPUT);
// IPを指定する (DHCPで割り当てられる場合は不要)
WiFi.config(LOCAL_IP, DNS, GATEWAY, SUBNET);
WiFi.begin(SSID, SSID_PASS);
while (WiFi.status() != WL_CONNECTED) {
Serial.print('.');
delay(500);
}
Serial.print("Connected! local IP: ");
Serial.println(WiFi.localIP());
udp.begin(LOCAL_PORT);
}
uint16_t count = 0;
void loop() {
// UDP受信
int packetSize = udp.parsePacket();
if (packetSize > 0) {
//Serial.printf("Received packet size:%d from:%s:%d to %s:%d\n",
// packetSize,
// udp.remoteIP().toString().c_str(), udp.remotePort(),
// udp.destinationIP().toString().c_str(), udp.localPort());
uint8_t recvBuffer[BUF_SIZE];
int recvSize = udp.read(recvBuffer, BUF_SIZE);
for (int i = 0; i < recvSize; i++) {
Serial.print(recvBuffer[i]);
Serial.print(',');
}
Serial.println();
// UDP送信用データ準備
char sendBuffer[BUF_SIZE];
for (int i = 0; i < recvSize; i++) {
sendBuffer[i] = recvBuffer[i];
}
// UDP送信
udp.beginPacket(DEST_IP, DEST_PORT);
udp.write(sendBuffer, recvSize);
udp.endPacket();
count++;
}
digitalWrite(LED_BUILTIN, count % 2);
}
■PC側(Python)
バイナリデータをUDP送信し、UDP受信したデータを表示するサンプルです。
udp_send_recv.py
# UDPブロードキャスト 送信・受信
import socket
import threading
import time
DEST_IP = "192.168.0.78"
PORT = 56789
# ===========================================
# UDP送信クラス
class RoboConnSend:
# dest_ip : 送信先IP
# dest_port : 送信先PORT
def __init__(self, dest_ip, dest_port):
self.dest_ip = dest_ip
self.dest_port = dest_port
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# ar_data : bytearray型
# 1400バイト以上になる場合は分割すること
def send(self, ar_data):
send_data = bytes(ar_data)
self.sock.sendto(send_data, (self.dest_ip, self.dest_port))
def close(self):
self.sock.close()
# ===========================================
# UDP受信クラス (別スレッドで動作)
class RoboConnRecv(threading.Thread):
# port : UDP通信のPORT 送受信で合わせる
# callback : 受信時に呼ばれる関数 callback(bytearray)
def __init__(self, port, callback):
super(RoboConnRecv, self).__init__()
self.port = port
self.callback = callback
self.stop_flag = False # スレッド停止フラグ
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.bind(('', self.port)) # すべて受信
self.sock.settimeout(0.1)
self.start() # スレッド開始
# 1パケットを受信してdata部を返す(bytearray型)
# ※受信するまでブロック
def recv(self):
ar_data = bytearray()
# 有効なデータが届くまで待ち
while True:
if self.stop_flag:
return None
try:
data, address = self.sock.recvfrom(1500)
except:
# タイムアウト
continue
ar_data = bytearray(data)
return ar_data
def close(self):
self.stop()
self.join()
self.sock.close()
# ------------------------
# 受信スレッド
def run(self):
print("受信スレッド開始 " + self.name)
while True:
if self.stop_flag:
print("受信スレッド停止")
break
recv_data = self.recv()
if recv_data is None:
print("受信スレッド停止")
break
self.callback(recv_data)
# ------------------------
# スレッド停止
def stop(self):
self.stop_flag = True
# ===========================================
# 受信時のcallback関数
def recv(recv_data):
s = "recv: "
for d in recv_data:
s += str(d) + ", "
print(s)
#==============================================================================
# UDP受信メイン
#==============================================================================
conn_recv = RoboConnRecv(PORT, recv)
conn_send = RoboConnSend(DEST_IP, PORT)
no = 0
while True:
try:
conn_send.send([no, 0, 1, 2])
no += 1
if no > 255: no = 0
time.sleep(1)
except KeyboardInterrupt: # KeyboardInterruptはExceptionで補足されないので明示する
print("*** KeyboardInterrupt ***")
break
except:
break
conn_recv.close()
conn_send.close()
■Raspberry Pi Picoの関連記事
【Raspberry Pi Pico W】WiFi UDP通信 サンプルプログラム
Raspberry Pi Pico+Arduinoでサーボをたくさん動かしたい
会話ができる「ぴよロボ」作りました! (Raspberry Pi + Pico + ChatGPT)
Raspberry Pi Pico W でPCとBluetooth(シリアル)接続する
Raspberry Pi Pico/Pico WをArduino開発環境で使うためのメモ
超音波距離センサー + Raspberry Pi Picoで潜水艦ソナー風
コップの水がこぼれない台 MPU6050 + Raspberry Pi Pico(Arduino)
MPU6050 + Raspberry Pi Pico(Arduino) -> PCで3Dのキューブを回転表示