Pilot Stream Data — 实时 NDJSON 数据流传输
v1.0.0通过 Pilot 协议的持久连接实现实时 NDJSON 数据流传输。适用于:实时跨代理流式传输结构化数据、发送连续传感器读取、日志或指标、需要双向流式通信带有背压控制。
详细分析 ▾
运行时依赖
版本
初始发布
安装命令 点击复制
技能文档
通过 Pilot Protocol 的持久连接实现实时 NDJSON 数据流传输。
使用场景
- 需要实时跨代理流式传输结构化数据
- 需要双向流式通信带有背压控制
不建议使用场景
- 需要传输文件(使用
pilot-share或pilot-sync) - 需要发布/订阅广播(使用
pilot-pubsub) - 需要请求/响应模式(使用
pilot-rpc)
命令
启动流式服务器:
pilotctl --json listen 1002 > /tmp/pilot-stream.log &
流式传输 NDJSON 数据:
DEST="1:0001.AAAA.BBBB"
while true; do
MSG="{\"timestamp\":$(date +%s),\"temp\":$(echo "scale=2; 20 + $RANDOM % 10" | bc),\"humidity\":50}"
pilotctl --json send "$DEST" 1002 --data "$MSG"
sleep 1
done
接收和处理流式数据:
pilotctl --json listen 1002 | while read -r line; do
TIMESTAMP=$(echo "$line" | jq -r '.timestamp')
VALUE=$(echo "$line" | jq -r '.value')
echo "[$TIMESTAMP] Value: $VALUE"
done
订阅流式主题:
pilotctl --json subscribe "$DEST" data-stream | while read -r line; do
echo "Data: $(echo $line | jq -r '.value")"
done
工作流示例
#!/bin/bash
# 实时传感器数据流式传输
STREAM_PORT=1002
DEST="1:0001.AAAA.BBBB"# 生产者:流式传输传感器数据
produce_stream() {
echo "Starting stream producer"
# 发送元数据头
pilotctl --json send "$DEST" "$STREAM_PORT" \
--data "{\"type\":\"stream_start\",\"schema\":\"temp_humidity_v1\"}"
# 流式传输数据
counter=0
while true; do
timestamp=$(date +%s)
temp=$(echo "scale=2; 20 + ($counter % 10)" | bc)
humidity=$(echo "scale=2; 50 + ($counter % 20)" | bc)
pilotctl --json send "$DEST" "$STREAM_PORT" \
--data "{\"type\":\"data\",\"timestamp\":$timestamp,\"temp\":$temp,\"humidity\":$humidity,\"seq\":$counter}"
counter=$((counter + 1))
sleep 1
done
}
# 消费者:接收和处理
consume_stream() {
pilotctl --json listen "$STREAM_PORT" | while read -r line; do
type=$(echo "$line" | jq -r '.type')
if [ "$type" = "data" ]; then
temp=$(echo "$line" | jq -r '.temp')
echo "Temperature: ${temp}°C"
fi
done
}
produce_stream
依赖
要求pilot-protocol 技能、运行守护进程、jq、bc,以及可选的 gzip 用于压缩。免费技能或插件可能存在安全风险,如需更匹配、更安全的方案,建议联系付费定制