# Live per-topic message monitor for a ZeroMQ publisher.
#
# A REQ socket asks the endpoint at addr:port for 'SUB_PORT'; the reply is used
# as the port of a SUB socket subscribed to every topic. For each topic the
# table stores the time of the latest message and how many were received, and
# roughly once per `wait` seconds a styled snapshot is passed to update().
#
# NOTE(review): `zmq`, `pd`, `time`, `loads` and `update` are defined outside
# this section. `update` presumably re-renders the styled table - confirm.
context = zmq.Context()
addr = "127.0.0.1"
port = "50020"

# Ask the request endpoint which port the publisher is on.
req = context.socket(zmq.REQ)
req.connect(f"tcp://{addr}:{port}")
req.send_string('SUB_PORT')
sub_port = req.recv_string()

# An empty subscription prefix matches every topic.
sub = context.socket(zmq.SUB)
sub.connect(f"tcp://{addr}:{sub_port}")
sub.setsockopt_string(zmq.SUBSCRIBE, '')

wait = 1  # minimum number of seconds between two table refreshes
data = pd.DataFrame(columns=("last", "count"))

# NOTE(review): nothing in this section closes `req`, `sub` or `context`;
# they stay open after the loop is interrupted. Close them once no later
# code needs them.
try:
    t = time.time()
    while True:
        # Read the whole message at once. Reading exactly two frames with
        # recv_string() + recv() leaves any additional frames in the queue,
        # where they would be mistaken for the next message's topic.
        frames = sub.recv_multipart()
        topic = frames[0].decode("utf-8")
        if len(frames) > 1:
            # NOTE(review): if `loads` is msgpack's, the `encoding` keyword is
            # rejected by msgpack >= 1.0 (use raw=False there) - confirm which
            # `loads` is imported. The decoded payload is not used below.
            msg = loads(frames[1], encoding='utf-8')
        now = time.time()

        if topic in data.index:
            data.loc[topic, "last"] = now
            data.loc[topic, "count"] += 1
        else:
            # First message for this topic: start at 1. The previous code
            # inserted 1 and then incremented, reporting 2 for one message.
            data.loc[topic] = {"last": now, "count": 1}

        if now - t > wait:
            # Show "last" as seconds elapsed since the latest message,
            # most recently active topics first.
            update(
                data.assign(last=now - data["last"])
                .sort_values(by=["last"])
                .style
                .format({"last": "{:.2f}", "count": int})
                .background_gradient(cmap="summer", axis="rows", subset=["last"])
            )
            t = now
except KeyboardInterrupt:
    # Interrupting (e.g. stopping the notebook cell) is the intended way to
    # end the otherwise endless loop.
    pass