import ipfix import ipfix.vis from ipaddress import ip_address from datetime import datetime from datetime import timezone from IPython.display import SVG def iso8601(x): return datetime.strptime(x, "%Y-%m-%d %H:%M:%S.%f") def draw_message(msg, length=256): return SVG(ipfix.vis.MessageBufferRenderer(msg, raster=4).render(length=length)) def draw_template(tmpl): ofd = ipfix.vis.OctetFieldDrawing(raster=4) ipfix.vis.draw_template(ofd, tmpl) return SVG(ofd.render((90,30))) ipfix.ie.use_iana_default() ipfix.ie.use_5103_default() tmpl = ipfix.template.for_specs(261, "flowStartMilliseconds", "flowEndMilliseconds", "sourceIPv4Address", "destinationIPv4Address", "sourceTransportPort", "destinationTransportPort", "protocolIdentifier", "octetDeltaCount", "packetDeltaCount") draw_template(tmpl) msg = ipfix.message.MessageBuffer() msg.begin_export(8304) # Observation Domain ID msg.add_template(tmpl) msg.export_new_set(261) # The set ID here refers to the Template ID of the template msg.export_namedict({ 'flowStartMilliseconds': iso8601('2012-10-22 09:29:07.170000'), 'flowEndMilliseconds': iso8601('2012-10-22 09:29:33.916000'), 'sourceIPv4Address': ip_address('192.0.2.11'), 'destinationIPv4Address': ip_address('192.0.2.212'), 'sourceTransportPort': 32798, 'destinationTransportPort': 80, 'protocolIdentifier': 6, 'packetDeltaCount': 17, 'octetDeltaCount': 3329}) draw_message(msg) msg.begin_export(8304) msg.export_new_set(261) msg.export_namedict({ 'flowStartMilliseconds': iso8601('2012-10-22 09:30:01.912000'), 'flowEndMilliseconds': iso8601('2012-10-22 09:31:15.009000'), 'sourceIPv4Address': ip_address('192.0.2.212'), 'destinationIPv4Address': ip_address('192.0.2.11'), 'sourceTransportPort': 80, 'destinationTransportPort': 32801, 'protocolIdentifier': 6, 'packetDeltaCount': 83, 'octetDeltaCount': 97501}) msg.export_namedict({ 'flowStartMilliseconds': iso8601('2012-10-22 09:30:08.182000'), 'flowEndMilliseconds': iso8601('2012-10-22 09:31:16.012000'), 'sourceIPv4Address': ip_address('192.0.2.212'), 'destinationIPv4Address': ip_address('192.0.2.11'), 'sourceTransportPort': 80, 'destinationTransportPort': 32802, 'protocolIdentifier': 6, 'packetDeltaCount': 99, 'octetDeltaCount': 136172}) draw_message(msg) vtmpl = ipfix.template.for_specs(262, "flowStartMilliseconds", "flowEndMilliseconds", "sourceIPv6Address", "destinationIPv6Address", "octetDeltaCount[4]", "packetDeltaCount[4]", "wlanSSID") msg.begin_export(8304) msg.add_template(vtmpl) msg.export_new_set(262) msg.export_namedict({'flowStartMilliseconds': iso8601('2012-10-22 09:31:54.903000'), 'flowEndMilliseconds': iso8601('2012-10-22 09:41:52.627000'), 'sourceIPv6Address': ip_address('2001:db8:c0:ffee::2'), 'destinationIPv6Address': ip_address('2001:bd8:b:ea75::3'), 'packetDeltaCount': 212, 'octetDeltaCount': 553290, 'wlanSSID': 'ietf-a-v6only'}) draw_message(msg) ipfix.ie.for_spec("ambientTemperatureCelsius(35566/2)[4]") ttmpl = ipfix.template.for_specs(263, "observationTimeMilliseconds", "observationPointId[4]", "ambientTemperatureCelsius") msg = ipfix.message.MessageBuffer() msg.begin_export(8304) msg.add_template(ttmpl) msg.export_new_set(263) msg.export_namedict({'observationTimeMilliseconds': datetime.utcnow(), 'observationPointId': 1, 'ambientTemperatureCelsius': 22.3}) draw_message(msg) ipfix.ie.for_spec("relativeHumidityPercent(35566/3)[4]") import socketserver import ipfix.reader import threading msg_length = 512 # Maximum number of bytes per message to render svgbuf = [] svgbuf_mtx = threading.Lock() class StreamRendererHandler(socketserver.StreamRequestHandler): def handle(self): global svgbuf print("connection from "+str(self.client_address)+".") msr = ipfix.vis.MessageStreamRenderer(self.rfile, scale=(90,30), raster=4) while True: try: svgbuf_mtx.acquire() svgbuf.append(msr.render_next_message(msg_length)) svgbuf_mtx.release() except: break print("connection from "+str(self.client_address)+" terminated.") class ThreadingTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): pass srv = None # shut down old server, if any, through loss of reference srv = ThreadingTCPServer(("", 4739), StreamRendererHandler) srvt = threading.Thread(target=srv.serve_forever) srvt.daemon = True srvt.start() %pushd ../raspi !./run_sim.sh localhost 4739 %popd srv.shutdown() SVG(svgbuf[0]) SVG(svgbuf[1]) SVG(svgbuf[2])