豆包:
import queue
import threading
import time
from collections import defaultdict


# Data point format
class DataPoint:
    def __init__(self, timestamp, channel, value):
        self.timestamp = timestamp  # timestamp
        self.channel = channel      # channel number
        self.value = value          # sample value


# Concurrent data queue (note: queue.Queue plus the buffer lock make this lock-based, not lock-free)
class ConcurrentDataQueue:
    def __init__(self):
        self.data_queue = queue.Queue(maxsize=100000)  # bounded queue to avoid unbounded memory growth
        self.channel_buffers = defaultdict(list)       # per-channel data buffers
        self.lock = threading.Lock()                   # protects channel_buffers
        self.stop_event = threading.Event()            # signals the worker thread to stop

    def enqueue(self, data_point):
        """Push a data point onto the queue."""
        try:
            self.data_queue.put_nowait(data_point)
        except queue.Full:
            # Queue full: warn and drop the oldest item to make room
            print("Warning: data queue is full; the consumer may be falling behind")
            try:
                self.data_queue.get_nowait()           # drop the oldest data point
                self.data_queue.put_nowait(data_point)
            except (queue.Empty, queue.Full):
                pass

    def start_processing(self):
        """Start the background processing thread."""
        self.processing_thread = threading.Thread(target=self._process_data, daemon=True)
        self.processing_thread.start()

    def _process_data(self):
        """Background thread: sort incoming data points into per-channel buffers."""
        while not self.stop_event.is_set():
            try:
                data_point = self.data_queue.get(timeout=0.1)
                with self.lock:
                    self.channel_buffers[data_point.channel].append(data_point)
                self.data_queue.task_done()
            except queue.Empty:
                continue
            except Exception as e:
                print(f"Error while processing data: {e}")

    def get_batch_by_channel(self, channel, batch_size=640, timeout=None):
        """
        Fetch a batch of data points for a given channel.
        channel: channel number
        batch_size: number of data points to fetch
        timeout: timeout in seconds; None means wait indefinitely
        """
        start_time = time.time()
        while True:
            with self.lock:
                buffer = self.channel_buffers[channel]
                if len(buffer) >= batch_size:
                    batch = buffer[:batch_size]
                    self.channel_buffers[channel] = buffer[batch_size:]
                    return batch
            # Check for timeout
            if timeout is not None and (time.time() - start_time) >= timeout:
                return []  # an empty list signals a timeout
            time.sleep(0.001)  # short sleep to avoid burning CPU

    def stop(self):
        """Stop the background processing thread."""
        self.stop_event.set()
        if hasattr(self, 'processing_thread'):
            self.processing_thread.join(timeout=1.0)


# Example usage
if __name__ == "__main__":
    # Create the concurrent data queue and start its worker thread
    data_queue = ConcurrentDataQueue()
    data_queue.start_processing()

    # Simulated data producer thread
    def data_producer():
        timestamps = 1000   # number of sample instants (enough to fill a 640-point batch per channel)
        channels = 16       # number of channels
        for timestamp in range(timestamps):
            for channel in range(channels):
                # Simulated value (in a real system this comes from the data source)
                value = 1.0 + timestamp * 0.01 + channel * 0.001
                data_point = DataPoint(timestamp, channel, value)
                data_queue.enqueue(data_point)
            time.sleep(1e-6)  # simulate a 1 microsecond sampling interval

    producer_thread = threading.Thread(target=data_producer, daemon=True)
    producer_thread.start()

    # Simulated data consumer: fetch 640 data points from channel 0
    def data_consumer():
        batch = data_queue.get_batch_by_channel(channel=0, batch_size=640, timeout=5.0)
        if batch:
            print(f"Fetched {len(batch)} data points from channel 0")
            # process the batch...
        else:
            print("Timed out or not enough data available")

    consumer_thread = threading.Thread(target=data_consumer)
    consumer_thread.start()
    consumer_thread.join()

    # Stop the queue's worker thread
    data_queue.stop()
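
One caveat on the "lock-free" label: queue.Queue is built on an internal lock, and the per-channel buffers are guarded by an explicit threading.Lock, so the design above is lock-based, just reasonably fine-grained. If profiling ever shows contention on the channel buffers, a per-channel collections.deque is a common lighter-weight alternative in CPython, because deque.append() and deque.popleft() are thread-safe. Below is a minimal sketch under that assumption (one writer and one reader per channel); the names ChannelBuffer and read_batch are illustrative, not from the code above.

import collections
import time

class ChannelBuffer:
    """Per-channel buffer without an explicit lock (single writer, single reader)."""
    def __init__(self, maxlen=100000):
        # A bounded deque drops the oldest item on overflow, which matches
        # the drop-oldest policy of enqueue() above.
        self._buf = collections.deque(maxlen=maxlen)

    def write(self, data_point):
        self._buf.append(data_point)  # thread-safe append in CPython

    def read_batch(self, batch_size=640, timeout=None):
        """Pop batch_size items; return [] if the timeout expires first."""
        deadline = None if timeout is None else time.time() + timeout
        while len(self._buf) < batch_size:
            if deadline is not None and time.time() >= deadline:
                return []
            time.sleep(0.001)  # brief sleep instead of busy-waiting
        return [self._buf.popleft() for _ in range(batch_size)]

The trade-off is that the deque version has no central queue, so there is no single point to apply backpressure; whether that matters depends on how long producers can outrun consumers.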
【 In hgoldfish's post it was mentioned: 】
: From: xmbba (bba), Board: KDE_Qt
: Subject: Asking for help with an industrial-grade application
: Posted on: 水木社区 (Wed Jun 18 23:01:22 2025), on-site
: ...................
--
FROM 223.101.190.*