Skip to content

Instantly share code, notes, and snippets.

@wardbekker
Last active March 6, 2017 09:27
Show Gist options
  • Save wardbekker/a80cbe7d12bc1866f393c5a74bf417a0 to your computer and use it in GitHub Desktop.
Save wardbekker/a80cbe7d12bc1866f393c5a74bf417a0 to your computer and use it in GitHub Desktop.
Mindwave NIFI ingest instructions

Mindwave Neurosky driver installation for OSX Sierra

  1. Download and install the latest driver from http://download.neurosky.com/public/Products/MindWave%20headset/RF%20driver%20for%20Mac/MindWaveDriver5.1.pkg
  2. After the driver is installed, download and install the latest MindWave Manager from http://download.neurosky.com/public/Products/MindWave%20headset/RF%20driver%20for%20Mac/MindWave%20Manager4.0.4.zip
  3. Launch the MindWave Manager, navigate to "Pairing" section and click the "Search for MindWave", then follow the instructions to pair the headset.

Install NIFI on OSX Sierra with Homebrew

  1. Install Homebrew from the terminal: /usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"
  2. Install NIFI (time of writing v1.1.2): brew install nifi

Import NIFI Flow Template

An example flow template can be downloaded using Curl:

curl -O https://gist.githubusercontent.com/wardbekker/a80cbe7d12bc1866f393c5a74bf417a0/raw/9d64daa748dec352ebe8cc350e3fb34e65130ec3/mindwave_nifi_ingest_template.xml

The most important processor here is the ListenTCP processor, which will listen on port 20000 and will receive the JSON payload. The flow also contains a Site 2 Site NIFI connection to a remote processgroup with the URL http://wbekkerhdf0.field.hortonworks.com:9090/nifi. You can remove this, or change it to your own remote NIFI cluster.

Get Ruby 'forward' script

The Mindwave Thinkgear driver will create a socket where we can consume the sensor data as Json messages. To ingest it with the current vanilla version of NIFI, we need to 'forward' the messages from the thinkgear port to the NIFI ListenTCP processor port number. Upcoming versions of NIFI will have a GetTCP processor, making this Ruby script obsolete.

Save this Ruby script as a file under thinkgear.rb. Run it with ruby thinkgear.rb AFTER you have connected your headset AND started ListenTCP processor on the NIFI flow. Otherwise you will run into connection errors.

require 'socket'
require 'json'
require 'date'

thinkgear_server_socket = TCPSocket.new 'localhost', 13854
nifi_server_socket = TCPSocket.new 'localhost', 20000

# trigger json output
thinkgear_server_socket.puts "{\"enableRawOutput\": true, \"format\": \"Json\"}\n"

while line = thinkgear_server_socket.gets # Read lines from socket
  hash = JSON.parse(line)
  hash['timestamp'] = DateTime.now.strftime('%Q')
  hash['user_id'] = 1
  json = JSON.generate(hash)
  puts json
  nifi_server_socket.puts json
end

thinkgear_server_socket.close
nifi_server_socket.close

Start ingestion of your brainwaves

  1. Connect you headset by launching the MindWave Manager, navigate to "Pairing" section and click the "Search for MindWave", then follow the instructions to pair the headset.
  2. Start the NIFI flow, or at least the ListenTCP processor.
  3. Start the ruby script with ruby thinkgear.rb.

At this point you should see JSON output from your Mindwave headset on your terminal, and new flowfiles into NIFI. Have fun with your brainwaves!

<?xml version="1.0" ?>
<template encoding-version="1.0">
<description></description>
<groupId>f39dc2ab-0159-1000-2e1f-e1a464d3df30</groupId>
<name>Mindwave NIFI Ingest template</name>
<snippet>
<connections>
<id>a64737d1-0159-1000-0000-000000000000</id>
<parentGroupId>f39dc2ab-0159-1000-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>f39dc2ab-0159-1000-0000-000000000000</groupId>
<id>a6470076-0159-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>f39dc2ab-0159-1000-0000-000000000000</groupId>
<id>5f70a250-0159-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>a6554b79-0159-1000-0000-000000000000</id>
<parentGroupId>f39dc2ab-0159-1000-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>f39dc2ab-0159-1000-0000-000000000000</groupId>
<id>a6513dec-0159-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>matched</selectedRelationships>
<source>
<groupId>f39dc2ab-0159-1000-0000-000000000000</groupId>
<id>a6470076-0159-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>7bcabfd3-015a-1000-0000-000000000000</id>
<parentGroupId>f39dc2ab-0159-1000-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>7bc84133-015a-1000-0000-000000000000</groupId>
<id>7bacfe02-015a-1000-ffff-ffffd66bfd28</id>
<type>REMOTE_INPUT_PORT</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>hasEegPower</selectedRelationships>
<source>
<groupId>f39dc2ab-0159-1000-0000-000000000000</groupId>
<id>a6513dec-0159-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<processors>
<id>a6470076-0159-1000-0000-000000000000</id>
<parentGroupId>f39dc2ab-0159-1000-0000-000000000000</parentGroupId>
<position>
<x>652.9400634765625</x>
<y>7.999969482421875</y>
</position>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Destination</key>
<value>
<name>Destination</name>
</value>
</entry>
<entry>
<key>Return Type</key>
<value>
<name>Return Type</name>
</value>
</entry>
<entry>
<key>Path Not Found Behavior</key>
<value>
<name>Path Not Found Behavior</name>
</value>
</entry>
<entry>
<key>Null Value Representation</key>
<value>
<name>Null Value Representation</name>
</value>
</entry>
<entry>
<key>hasEegPower</key>
<value>
<name>hasEegPower</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Destination</key>
<value>flowfile-attribute</value>
</entry>
<entry>
<key>Return Type</key>
<value>auto-detect</value>
</entry>
<entry>
<key>Path Not Found Behavior</key>
<value>ignore</value>
</entry>
<entry>
<key>Null Value Representation</key>
<value>empty string</value>
</entry>
<entry>
<key>hasEegPower</key>
<value>$.poorSignalLevel</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>EvaluateJsonPath</name>
<relationships>
<autoTerminate>true</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>matched</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>unmatched</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.standard.EvaluateJsonPath</type>
</processors>
<processors>
<id>a6513dec-0159-1000-0000-000000000000</id>
<parentGroupId>f39dc2ab-0159-1000-0000-000000000000</parentGroupId>
<position>
<x>656.773193359375</x>
<y>250.99996948242188</y>
</position>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Routing Strategy</key>
<value>
<name>Routing Strategy</name>
</value>
</entry>
<entry>
<key>hasEegPower</key>
<value>
<name>hasEegPower</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Routing Strategy</key>
<value>Route to Property name</value>
</entry>
<entry>
<key>hasEegPower</key>
<value>${hasEegPower:isEmpty():not()}</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>RouteOnAttribute</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>hasEegPower</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>unmatched</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.standard.RouteOnAttribute</type>
</processors>
<processors>
<id>5f70a250-0159-1000-0000-000000000000</id>
<parentGroupId>f39dc2ab-0159-1000-0000-000000000000</parentGroupId>
<position>
<x>0.0</x>
<y>0.0</y>
</position>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Local Network Interface</key>
<value>
<name>Local Network Interface</name>
</value>
</entry>
<entry>
<key>Port</key>
<value>
<name>Port</name>
</value>
</entry>
<entry>
<key>Receive Buffer Size</key>
<value>
<name>Receive Buffer Size</name>
</value>
</entry>
<entry>
<key>Max Size of Message Queue</key>
<value>
<name>Max Size of Message Queue</name>
</value>
</entry>
<entry>
<key>Max Size of Socket Buffer</key>
<value>
<name>Max Size of Socket Buffer</name>
</value>
</entry>
<entry>
<key>Character Set</key>
<value>
<name>Character Set</name>
</value>
</entry>
<entry>
<key>Max Batch Size</key>
<value>
<name>Max Batch Size</name>
</value>
</entry>
<entry>
<key>Message Delimiter</key>
<value>
<name>Message Delimiter</name>
</value>
</entry>
<entry>
<key>Max Number of TCP Connections</key>
<value>
<name>Max Number of TCP Connections</name>
</value>
</entry>
<entry>
<key>SSL Context Service</key>
<value>
<identifiesControllerService>org.apache.nifi.ssl.SSLContextService</identifiesControllerService>
<name>SSL Context Service</name>
</value>
</entry>
<entry>
<key>Client Auth</key>
<value>
<name>Client Auth</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Local Network Interface</key>
</entry>
<entry>
<key>Port</key>
<value>20000</value>
</entry>
<entry>
<key>Receive Buffer Size</key>
<value>65507 B</value>
</entry>
<entry>
<key>Max Size of Message Queue</key>
<value>10000</value>
</entry>
<entry>
<key>Max Size of Socket Buffer</key>
<value>1 MB</value>
</entry>
<entry>
<key>Character Set</key>
<value>UTF-8</value>
</entry>
<entry>
<key>Max Batch Size</key>
<value>1</value>
</entry>
<entry>
<key>Message Delimiter</key>
<value>\n</value>
</entry>
<entry>
<key>Max Number of TCP Connections</key>
<value>2</value>
</entry>
<entry>
<key>SSL Context Service</key>
</entry>
<entry>
<key>Client Auth</key>
<value>REQUIRED</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>ListenTCP</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.standard.ListenTCP</type>
</processors>
<remoteProcessGroups>
<id>7bc84133-015a-1000-0000-000000000000</id>
<parentGroupId>f39dc2ab-0159-1000-0000-000000000000</parentGroupId>
<position>
<x>652.0</x>
<y>507.99998474121094</y>
</position>
<communicationsTimeout>30 sec</communicationsTimeout>
<contents>
<inputPorts>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<connected>true</connected>
<exists>true</exists>
<id>7bacfe02-015a-1000-ffff-ffffd66bfd28</id>
<name>input_from_local</name>
<targetRunning>true</targetRunning>
<transmitting>false</transmitting>
<useCompression>false</useCompression>
</inputPorts>
<inputPorts>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<connected>false</connected>
<exists>true</exists>
<id>7bc8c48a-015a-1000-ffff-fffff4312f18</id>
<name>input2_from_local</name>
<targetRunning>true</targetRunning>
<transmitting>false</transmitting>
<useCompression>false</useCompression>
</inputPorts>
</contents>
<proxyHost></proxyHost>
<proxyUser></proxyUser>
<targetUri>http://wbekkerhdf0.field.hortonworks.com:9090/nifi</targetUri>
<transportProtocol>HTTP</transportProtocol>
<yieldDuration>10 sec</yieldDuration>
</remoteProcessGroups>
</snippet>
<timestamp>03/06/2017 10:17:51 CET</timestamp>
</template>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment