Python实现PROFINET数据采集方案
- Python
- 2小时前
- 3热度
- 0评论
PROFINET数据采集
下面给你一套可直接运行的 Python 采集 PROFINET(你写的 profine 应该是笔误)数据的方案,包含:环境准备、依赖安装、设备扫描、循环读 I/O、简易存储。
一、背景与方案选择
PROFINET 是工业以太网协议,分 RT(实时)/IRT(等时实时),Python 采集一般有三条路:
profinet-py(推荐,开源)
支持 DCP 扫描、循环 I/O、报警、诊断
底层用 raw socket(Linux)/Npcap(Windows)
需要 root / 管理员权限
商业 SDK + Python 绑定
- 如 Enatalk、Softing,稳定但收费
抓包解析(pyshark/scapy)
- 适合监听,不适合主动读写
下面用 profinet-py 做完整示例。
二、环境准备
1. 系统与权限
Linux(推荐 Ubuntu 22.04)或 Windows 10+
必须 root / 管理员
网卡直连 PROFINET 设备,或接同交换机
2. 安装依赖
# Python 3.10+
sudo apt install python3 python3-pip libpcap-dev
pip install "profinet-py>=0.3.0" construct>=2.10三、扫描网络中的 PROFINET 设备(DCP)
from profinet_py import scan
# 扫描 5 秒,返回所有在线 PN 设备
devices = scan(timeout=5)
for dev in devices:
print("-" * 40)
print(f"MAC: {dev.mac}")
print(f"IP: {dev.ip}")
print(f"Name: {dev.name}")
print(f"Vendor ID: {dev.vendor_id}")输出示例:
MAC: 00:11:22:33:44:55
IP: 192.168.0.100
Name: PN-IO-Device-01
Vendor ID: 0x002a四、循环采集 I/O 数据(RT Class 1)
假设从站:
IP:
192\.168\.0\.100输入区:10 字节(Slot=1, Subslot=1)
输出区:4 字节(Slot=1, Subslot=2)
import time
from profinet_py import ProfinetDevice
# 1. 连接设备
dev = ProfinetDevice("192.168.0.100")
dev.connect()
print("设备已连接")
# 2. 配置循环 I/O(根据你的 GSDML 调整)
dev.configure_io(
input_length=10, # 输入字节数
output_length=4 # 输出字节数
)
# 3. 循环采集 + 存 CSV
import csv
with open("profinet_data.csv", "w", newline="") as f:
writer = csv.writer(f)
writer.writerow(["timestamp", "input_bytes", "output_bytes"])
while True:
# 读输入
input_data = dev.read_input() # bytes
# 读输出(从站回发的输出状态)
output_data = dev.read_output()
ts = time.strftime("%Y-%m-%d %H:%M:%S")
print(f"[{ts}] IN: {input_data.hex()} | OUT: {output_data.hex()}")
writer.writerow([ts, input_data.hex(), output_data.hex()])
time.sleep(0.1) # 100ms 轮询,可改 50ms五、关键配置说明
Slot/Subslot/Index
从站 I/O 地址在 GSDML 文件 里定义
用 TIA Portal 或 GSDML Viewer 查看映射
数据解析 示例:前 2 字节是温度(INT16),后 4 字节是压力(FLOAT32)
import struct temp, = struct.unpack(">h", input_data[0:2]) # 大端 INT16 press, = struct.unpack(">f", input_data[2:6]) # 大端 FLOAT32 print(f"温度: {temp} ℃ | 压力: {press:.2f} bar")断线重连
try: input_data = dev.read_input() except Exception as e: print("连接断开,重连中...", e) dev.disconnect() time.sleep(2) dev.connect()
六、Windows 特别说明
安装 Npcap(https://nmap.org/npcap/)
命令行用 管理员身份 运行
扫描时指定网卡:
devices = scan(interface="Ethernet", timeout=5)
七、常见问题
扫描不到设备:检查网线、IP 同网段、防火墙关闭
权限错误:Linux 加 sudo,Windows 管理员
数据乱码:核对 GSDML 的数据类型与字节序(通常大端)