EdgeAI: Google Coral with Coral Environmental Sensors and TPU With NiFi and MiNiFi (Updated EFM)
EdgeAI: Google Coral with Coral Environmental Sensors and TPU With NiFi and MiNiFi
Building MiNiFi IoT Apps with the new Cloudera EFM
It is very easy to build a drag and drop EdgeAI application with EFM and then push to all your MiNiFi agents.
Cloudera Edge Management CEM-1.1.1
Download the newest CEM today!
Download the newest CEM today!
NiFi Flow Receiving From MiNiFi Java Agent
In a cluster in my CDP-DC Cluster I consume Kafka messages sent from my remote NiFi gateway to publish alerts to Kafka and push records to Apache HBase and Apache Kudu. We filter our data with Streaming SQL.
We can use SQL to route, create aggregates like averages, chose a subset of fields and limit data returned. Using the power of Apache Calcite, Streaming SQL in NiFi is a game changer against Record Data Types including CSV, XML, Avro, Parquet, JSON and Grokable text. Read and write different formats and convert when your SQL is done. Or just to SELECT * FROM FLOWFILE to get everything.
We can see this flow from Atlas as we trace the data lineage and provenance from Kafka topic.
We can search Atlas for Kafka Topics.
From coral Kafka topic to NiFi to Kudu.
Details on Coral Kafka Topic
Examining the Hive Metastore Data on the Coral Kudu Table
NiFi Flow Details in Atlas
Details on Alerts Topic
Statistics from Atlas
Example Web Camera Image
Example JSON Record
[{"cputemp":59,"id":"20200221190718_2632409e-f635-48e7-9f32-aa1333f3b8f9","temperature":"39.44","memory":91.1,"score_1":"0.29","starttime":"02/21/2020 14:07:13","label_1":"hair spray","tempf":"102.34","diskusage":"50373.5 MB","message":"Success","ambient_light":"329.92","host":"coralenv","cpu":34.1,"macaddress":"b8:27:eb:99:64:6b","pressure":"102.76","score_2":"0.14","ip":"127.0.1.1","te":"5.10","systemtime":"02/21/2020 14:07:18","label_2":"syringe","humidity":"10.21"}]
Querying Kudu results in Hue
Pushing Alerts to Slack from NiFi
I am running on Apache NiFi 1.11.1 and wanted to point out a new feature. Download flow: Will download the highlighted flow/pgroup as JSON.
Looking at NiFi counters to monitor progress:
We can see how easy it is to ingest IoT sensor data and run AI algorithms on Coral TPUs.
Shell (coralrun.sh)
#!/bin/bash
DATE=$(date +"%Y-%m-%d_%H%M%S")
fswebcam -q -r 1280x720 /opt/demo/images/$DATE.jpg
python3 -W ignore /opt/demo/test.py --image /opt/demo/images/$DATE.jpg 2>/dev/null
Kudu Table DDL
https://github.com/tspannhw/table-ddl
Python 3 (test.py)
import timeimport sysimport subprocessimport osimport base64import uuidimport datetimeimport tracebackimport base64import jsonfrom time import gmtime, strftimeimport mathimport random, stringimport timeimport psutilimport uuidfrom getmac import get_mac_addressfrom coral.enviro.board import EnviroBoardfrom luma.core.render import canvasfrom PIL import Image, ImageDraw, ImageFontimport osimport argparsefrom edgetpu.classification.engine import ClassificationEngine# Importing socket libraryimport socketstart = time.time()starttf = datetime.datetime.now().strftime('%m/%d/%Y %H:%M:%S')def ReadLabelFile(file_path):with open(file_path, 'r') as f:lines = f.readlines()ret = {}for line in lines:pair = line.strip().split(maxsplit=1)ret[int(pair[0])] = pair[1].strip()return ret# Google Example Codedef update_display(display, msg):with canvas(display) as draw:draw.text((0, 0), msg, fill='white')def getCPUtemperature():res = os.popen('vcgencmd measure_temp').readline()return(res.replace("temp=","").replace("'C\n",""))# Get MAC address of a local interfacesdef psutil_iface(iface):# type: (str) -> Optional[str]import psutilnics = psutil.net_if_addrs()if iface in nics:nic = nics[iface]for i in nic:if i.family == psutil.AF_LINK:return i.address# /opt/demo/examples-camera/all_modelsrow = { }try:#i = 1#while i == 1:parser = argparse.ArgumentParser()parser.add_argument('--image', help='File path of the image to be recognized.', required=True)args = parser.parse_args()# Prepare labels.labels = ReadLabelFile('/opt/demo/examples-camera/all_models/imagenet_labels.txt')# Initialize engine.engine = ClassificationEngine('/opt/demo/examples-camera/all_models/inception_v4_299_quant_edgetpu.tflite')# Run inference.img = Image.open(args.image)scores = {}kCount = 1# Iterate Inference Resultsfor result in engine.ClassifyWithImage(img, top_k=5):scores['label_' + str(kCount)] = labels[result[0]]scores['score_' + str(kCount)] = "{:.2f}".format(result[1])kCount = kCount + 1enviro = EnviroBoard()host_name = socket.gethostname()host_ip = socket.gethostbyname(host_name)cpuTemp=int(float(getCPUtemperature()))uuid2 = '{0}_{1}'.format(strftime("%Y%m%d%H%M%S",gmtime()),uuid.uuid4())usage = psutil.disk_usage("/")end = time.time()row.update(scores)row['host'] = os.uname()[1]row['ip'] = host_iprow['macaddress'] = psutil_iface('wlan0')row['cputemp'] = round(cpuTemp,2)row['te'] = "{0:.2f}".format((end-start))row['starttime'] = starttfrow['systemtime'] = datetime.datetime.now().strftime('%m/%d/%Y %H:%M:%S')row['cpu'] = psutil.cpu_percent(interval=1)row['diskusage'] = "{:.1f} MB".format(float(usage.free) / 1024 / 1024)row['memory'] = psutil.virtual_memory().percentrow['id'] = str(uuid2)row['message'] = "Success"row['temperature'] = '{0:.2f}'.format(enviro.temperature)row['humidity'] = '{0:.2f}'.format(enviro.humidity)row['tempf'] = '{0:.2f}'.format((enviro.temperature * 1.8) + 32)row['ambient_light'] = '{0}'.format(enviro.ambient_light)row['pressure'] = '{0:.2f}'.format(enviro.pressure)msg = 'Temp: {0}'.format(row['temperature'])msg += 'IP: {0}'.format(row['ip'])update_display(enviro.display, msg)# i = 2except:row['message'] = "Error"print(json.dumps(row))
Source Code:
Sensors / Devices / Hardware:
- Humdity-HDC2010 humidity sensor
- Light-OPT3002 ambient light sensor
- Barometric-BMP280 barometric pressure sensor
- PS3 Eye Camera and Microphone USB
- Raspberry Pi 3B+
- Google Coral Environmental Sensor Board
- Google Coral USB Accelerator TPU
References:
- https://coral.ai/docs/enviro-board/get-started/
- https://coral.ai/products/accelerator/
- https://coral.ai/docs/enviro-board/datasheet/
- https://github.com/tspannhw/nifi-minifi-coral-env
- https://github.com/tspannhw/nifi-minifi-coral
- https://www.datainmotion.dev/2019/08/google-coral-tpu-with-edge-devices-and.html
- https://github.com/tspannhw/minifi-grove-sensors
- https://coral.ai/docs/enviro-board/get-started/
- https://coral.ai/products/accelerator/
- https://github.com/google/mediapipe/tree/master/mediapipe/examples/coral
- https://github.com/tensorflow/examples/tree/master/lite/examples/image_classification/raspberry_pi
- https://github.com/google-coral/examples-camera
- https://github.com/google-coral/project-keyword-spotter
- https://github.com/google/mediapipe/tree/master/mediapipe/examples/coral