Real-Time Slack Bots Powered By Generative AI and Data Flows

 ---



Using WatsonX.AI LLM foundation models with Cloudera DataFlow, via Apache NiFi, to send, receive and process Slack messages.


Real-time Integration of WatsonX.AI Foundation Models with NiFi

Hi, I am Timothy Spann, Principal Developer Advocate for Streaming at Cloudera.


In this article I will show you how to use Cloudera DataFlow powered by Apache NiFi to interact with IBM WatsonX.AI foundation large language models in real-time. We can work with any of the foundation models such as Google FLAN T5 XXL or IBM Granite models.

I'll show you how easy it is to build a real-time data pipeline that feeds questions from applications like Slack and mobile apps directly to secure WatsonX.AI models running in IBM Cloud. We will handle all the security, management, lineage and governance with Cloudera DataFlow. As part of decision making, we can choose different WatsonX.AI models on the fly based on the type of prompt. For example, we can pick one model to continue a sentence and another to answer a question. For question answering, Google FLAN T5 XXL works well. To continue sentences, I would use one of the IBM Granite models.

You will notice how fast the WatsonX.AI models return the results we need. I do some quick enrichment and transformation and then send the results on their way to Cloudera Apache Kafka to be used for continuous analytics and distribution to many other applications, systems, platforms and downstream consumers. We also send our answers back to the original requester, which could be someone in a Slack channel or someone in an application. All of this happens in real time, with no code, and with full governance, lineage, data management and security at any scale and on any platform.

The power of IBM and Cloudera together in private, public and hybrid cloud environments for real-time data and AI is just getting started. Try it today.

Step by Step Real-Time Flow

First, in Slack I type a question,

"Q: What is a good way to integrate Generative AI and Apache NiFi?"

NiFi Flow Top

Once that question is typed, the Slack server sends these events to our registered service. This can be hosted anywhere publicly facing.

https://api.slack.com/apps/<YOURAPP>/event-subscriptions

Slack API

Once enabled, your server will start receiving JSON events for each Slack post. These are easy to receive and parse in NiFi. Cloudera DataFlow makes it easy to receive secure HTTPS REST calls in the public cloud hosted edition, even in Designer mode.

NiFi Top Flow 2

In the first part of the flow we receive the REST JSON POST, which is as follows.

Slackbot 1.0 (+https://api.slack.com/robots)

application/json

POST

HTTP/1.1


{

  "token" : "qHvJe59yetAp1bao6wmQzH0C",

  "team_id" : "T1SD6MZMF",

  "context_team_id" : "T1SD6MZMF",

  "context_enterprise_id" : null,

  "api_app_id" : "A04U64MN9HS",

  "event" : {

    "type" : "message",

    "subtype" : "bot_message",

    "text" : "==== NiFi to IBM <http://WatsonX.AI|WatsonX.AI> LLM Answers\n\nOn Date: Wed, 15 Nov 2023 15:43:29 GMT Created: 2023-11-15T15:43:29.248Z\n\n  Prompt: Q:   What is a good way to integrate Generative AI and Apache NiFi?\n\nResponse: )\n\nYou can use Apache NiFi to integrate Generative AI models in several ways:\n\n1. Data Preprocessing: Use NiFi to preprocess data before feeding it into your Generative AI model. This can include data cleaning, transformation, and feature engineering.\n2. Model Training: Use NiFi to automate the training process of your Generative AI model. You can use NiFi's PutFile and PutFile_SFTP processors to write the training data to a file, and then use a processor like ExecuteScript to run the training script.\n3. Model Deployment: Once your Generative AI model is trained, you can use NiFi to deploy it. You can create a NiFi flow that takes in input data, runs it through the Generative AI model, and then outputs the generated data.\n4. Real-time Inference: You can use NiFi's StreamingJobs\n\nToken: 200\nReq Duration: 8153\nHTTP TX ID: 89d71099-da23-4e7e-89f9-4e8f5620c0fb\nIBM Msg: This model is a Non-IBM Product governed by a third-party license that may impose use restrictions and other obligations. By using this model you agree to its terms as identified in the following URL. 
URL: <https://dataplatform.cloud.ibm.com/docs/content/wsj/analyze-data/fm-models.html?context=wx>\nIBM Msg ID: disclaimer_warning\nModel ID: meta-llama/llama-2-70b-chat\nStop Reason: max_tokens\nToken Count: 38\nTX ID: NGp0djg-c05f740f84f84b7c80f93f9da05aa756\nUUID: da0806cb-6133-4bf4-808e-1fbf419c09e3\nCorr ID: NGp0djg-c05f740f84f84b7c80f93f9da05aa756\nGlobal TX ID: 20c3a9cf276c38bcdaf26e3c27d0479b\nService Time: 478\nRequest ID: 03c2726a-dcb6-407f-96f1-f83f20fe9c9c\nFile Name: 1a3c4386-86d2-4969-805b-37649c16addb\nRequest Duration: 8153\nRequest URL: <https://us-south.ml.cloud.ibm.com/ml/v1-beta/generation/text?version=2023-05-29>\ncf-ray: 82689bfd28e48ce2-EWR\n\n=====",

    "ts" : "1700063009.661159",

    "bot_id" : "B062HDEJCBH",

    "blocks" : [ {

      "type" : "rich_text",

      "block_id" : "sAFS",

      "elements" : [ {

        "type" : "rich_text_section",

        "elements" : [ {

          "type" : "text",

          "text" : "==== NiFi to IBM "

        }, {

          "type" : "link",

          "url" : "http://WatsonX.AI",

          "text" : "WatsonX.AI"

        }, {

          "type" : "text",

          "text" : " LLM Answers\n\nOn Date: Wed, 15 Nov 2023 15:43:29 GMT Created: 2023-11-15T15:43:29.248Z\n\n  Prompt: Q:   What is a good way to integrate Generative AI and Apache NiFi?\n\nResponse: )\n\nYou can use Apache NiFi to integrate Generative AI models in several ways:\n\n1. Data Preprocessing: Use NiFi to preprocess data before feeding it into your Generative AI model. This can include data cleaning, transformation, and feature engineering.\n2. Model Training: Use NiFi to automate the training process of your Generative AI model. You can use NiFi's PutFile and PutFile_SFTP processors to write the training data to a file, and then use a processor like ExecuteScript to run the training script.\n3. Model Deployment: Once your Generative AI model is trained, you can use NiFi to deploy it. You can create a NiFi flow that takes in input data, runs it through the Generative AI model, and then outputs the generated data.\n4. Real-time Inference: You can use NiFi's StreamingJobs\n\nToken: 200\nReq Duration: 8153\nHTTP TX ID: 89d71099-da23-4e7e-89f9-4e8f5620c0fb\nIBM Msg: This model is a Non-IBM Product governed by a third-party license that may impose use restrictions and other obligations. By using this model you agree to its terms as identified in the following URL. URL: "

        }, {

          "type" : "link",

          "url" : "https://dataplatform.cloud.ibm.com/docs/content/wsj/analyze-data/fm-models.html?context=wx"

        }, {

          "type" : "text",

          "text" : "\nIBM Msg ID: disclaimer_warning\nModel ID: meta-llama/llama-2-70b-chat\nStop Reason: max_tokens\nToken Count: 38\nTX ID: NGp0djg-c05f740f84f84b7c80f93f9da05aa756\nUUID: da0806cb-6133-4bf4-808e-1fbf419c09e3\nCorr ID: NGp0djg-c05f740f84f84b7c80f93f9da05aa756\nGlobal TX ID: 20c3a9cf276c38bcdaf26e3c27d0479b\nService Time: 478\nRequest ID: 03c2726a-dcb6-407f-96f1-f83f20fe9c9c\nFile Name: 1a3c4386-86d2-4969-805b-37649c16addb\nRequest Duration: 8153\nRequest URL: "

        }, {

          "type" : "link",

          "url" : "https://us-south.ml.cloud.ibm.com/ml/v1-beta/generation/text?version=2023-05-29"

        }, {

          "type" : "text",

          "text" : "\ncf-ray: 82689bfd28e48ce2-EWR\n\n====="

        } ]

      } ]

    } ],

    "channel" : "C05QAAVEC0H",

    "event_ts" : "1700063009.661159",

    "channel_type" : "channel"

  },

  "type" : "event_callback",

  "event_id" : "Ev065VTPRMV0",

  "event_time" : 1700063009,

  "authorizations" : [ {

    "enterprise_id" : null,

    "team_id" : "T1SD6MZMF",

    "user_id" : "ULMRENSE4",

    "is_bot" : false,

    "is_enterprise_install" : false

  } ],

  "is_ext_shared_channel" : false,

  "event_context" : "4-eyJldCI6Im1lc3NhZ2UiLCJ0aWQiOiJUMVNENk1aTUYiLCJhaWQiOiJBMDRVNjRNTjlIUyIsImNpZCI6IkMwNVFBQVZFQzBIIn0"

}

This is a very rich, detailed JSON document that we could immediately push raw to an Apache Iceberg open cloud lakehouse, a Kafka topic or an object store. (Enhancement Option) I am just going to parse what I need.

EvaluateJSONPath

We parse out the channel ID and plain text of the post. I only want messages from general ("C1SD6N197"). Then I copy the text to an inputs field, as is required for Hugging Face.
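As a rough illustration of this parsing step outside NiFi, here is a minimal Python sketch. It assumes the channel and text live at `event.channel` and `event.text`, as they do in the sample event above. The function name `extract_inputs` and the shape of the returned dictionary are my own choices for illustration, not part of the flow.

```python
import json

# Channel ID for "general" as given in the article.
GENERAL_CHANNEL_ID = "C1SD6N197"


def extract_inputs(raw_body, allowed_channel=GENERAL_CHANNEL_ID):
    """Pull the channel ID and message text out of a Slack event payload.

    Returns a dict with "channel" and "inputs", or None when the message
    is not from the allowed channel or carries no text.
    """
    payload = json.loads(raw_body)
    event = payload.get("event", {})
    channel = event.get("channel")  # roughly JSONPath $.event.channel
    text = event.get("text")        # roughly JSONPath $.event.text
    if channel != allowed_channel or not text:
        return None
    # Copy the message text into an "inputs" field for the model call.
    return {"channel": channel, "inputs": text}
```

Messages from any other channel simply yield `None`, which plays the role of dropping the flowfile.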

We check our input: if it's about stocks or weather (more to come), we avoid calling the LLM.

SELECT * FROM FLOWFILE

WHERE upper(inputs) like '%WEATHER%'

AND not upper(inputs) like '%LLM SKIPPED%'


SELECT * FROM FLOWFILE

WHERE upper(inputs) like '%STOCK%'

AND not upper(inputs) like '%LLM SKIPPED%'


SELECT * FROM FLOWFILE

WHERE (upper(inputs) like 'QUESTION:%'

OR upper(inputs) like 'Q:%') and not upper(inputs) like '%WEATHER%'

and not upper(inputs) like '%STOCK%'
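To make the routing rules easier to reason about, the three queries above can be mirrored in a few lines of Python. This is only a sketch of the same conditions. The route names `weather`, `stock` and `llm` are labels I made up for illustration, and a message can match more than one route, just as a record can match more than one query.

```python
def matching_routes(inputs):
    """Return the list of routes whose SQL-style condition the text satisfies."""
    text = inputs.upper()
    skipped = "LLM SKIPPED" in text  # our own bot replies carry this marker
    routes = []
    if "WEATHER" in text and not skipped:
        routes.append("weather")
    if "STOCK" in text and not skipped:
        routes.append("stock")
    # LIKE 'QUESTION:%' and LIKE 'Q:%' are prefix matches.
    is_question = text.startswith("QUESTION:") or text.startswith("Q:")
    if is_question and "WEATHER" not in text and "STOCK" not in text:
        routes.append("llm")
    return routes
```

Checking for "LLM SKIPPED" keeps the bot from reacting to its own stock and weather replies, since those messages also contain the trigger words.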

For Stocks processing:

To work out which stock we need, I am using my Open NLP processor.

You will need to download the processor and the entity extraction models.

GitHub - tspannhw/nifi-nlp-processor: Apache NiFi NLP Processor (github.com)

Open NLP Example Apache NiFi Processor (community.cloudera.com)

Then we pass that company name to an HTTP REST endpoint from AlphaVantage that converts company names to stock symbols. Free accounts only get a few calls a day, so if the call fails we bypass this step and try to use whatever you passed in.

https://www.alphavantage.co/query?function=SYMBOL_SEARCH&keywords=${nlp_org_1:trim()}&apikey=GetYourselfAKey&datatype=csv

Using RouteOnContent we filter out any error messages.

Then we use a QueryRecord processor to convert from CSV to JSON and filter.

SELECT name as companyName, symbol  FROM FLOWFILE

ORDER BY matchScore DESC

LIMIT 1
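For readers who want to see the same "best match" selection outside NiFi, here is a small Python sketch. It assumes the CSV has at least the `symbol`, `name` and `matchScore` columns that the query above refers to. The sample rows in the usage note are invented for illustration and are not real API output.

```python
import csv
import io


def best_match(csv_text):
    """Pick the row with the highest matchScore, like ORDER BY ... DESC LIMIT 1.

    Returns {"companyName": ..., "symbol": ...} or None when there are no rows.
    """
    rows = list(csv.DictReader(io.StringIO(csv_text)))
    if not rows:
        return None
    # matchScore arrives as text in CSV, so compare it as a number.
    top = max(rows, key=lambda row: float(row["matchScore"]))
    return {"companyName": top["name"], "symbol": top["symbol"]}
```

With a made-up payload such as `"symbol,name,matchScore\nIBM,International Business Machines Corp,0.9000\n"`, the function returns the IBM row. An empty result set returns `None`, which corresponds to the fallback of using whatever the user typed.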

We do a SplitRecord to ensure we have only one record. We then run EvaluateJsonPath to get our fields as attributes.

In an UpdateAttribute we trim the symbol just in case.

${stockSymbol:trim()}

We then pass that stock symbol to Twelve Data via InvokeHTTP to get our stock data.

https://api.twelvedata.com/time_series?apikey=GetAnAPIKey&interval=1min&symbol=${stockSymbol}&format=JSON

We then get a lot of stock data back.

{

  "meta" : {

    "symbol" : "IBM",

    "interval" : "1min",

    "currency" : "USD",

    "exchange_timezone" : "America/New_York",

    "exchange" : "NYSE",

    "mic_code" : "XNYS",

    "type" : "Common Stock"

  },

  "values" : [ {

    "datetime" : "2023-11-15 10:37:00",

    "open" : "152.07001",

    "high" : "152.08000",

    "low" : "151.99500",

    "close" : "152.00999",

    "volume" : "8525"

  }, {

    "datetime" : "2023-11-15 10:36:00",

    "open" : "152.08501",

    "high" : "152.12250",

    "low" : "152.08000",

    "close" : "152.08501",

    "volume" : "15204"

  } ...

We then run EvaluateJSONPath to grab the exchange information.

We fork the record to just get one record as this is just to return to Slack. We use UpdateRecord calls to enrich the stock data with other values. We then run a QueryRecord to limit us to 1 record to send to Slack.

SELECT * FROM FLOWFILE

ORDER BY "datetime" DESC

LIMIT 1
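The "keep only the newest record" step can also be sketched in Python. This assumes the `values` array and the `datetime` format shown in the sample response above. The helper name `latest_value` is mine.

```python
from datetime import datetime

# Timestamp layout used in the sample time series, e.g. "2023-11-15 10:37:00".
DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"


def latest_value(response):
    """Return the entry from response["values"] with the most recent datetime."""
    return max(
        response["values"],
        key=lambda value: datetime.strptime(value["datetime"], DATETIME_FORMAT),
    )
```

Parsing the timestamp rather than relying on the order of the array keeps the result correct even if the entries arrive unsorted.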

We run an EvaluateJsonPath to get the most valuable fields to display.

We then run a PutSlack with our message.

LLM Skipped. Stock Value for ${companyName} [${nlp_org_1}/${stockSymbol}] on ${date} is ${closeStockValue}.  stock date ${stockdateTime}.  stock exchange ${exchange}

We also have a separate flow that splits off from Company Name.

In the first step we call Yahoo Finance to get RSS headlines for that stock.

https://feeds.finance.yahoo.com/rss/2.0/headline?s=${stockSymbol:trim()}&region=US&lang=en-US

We use QueryRecord to convert RSS/XML Records to JSON.

We then run a SplitJSON to break out the news items.

We run a SplitRecord to limit to 1 record. We use EvaluateJSONPath to get the fields we need for our Slack message.

We then run UpdateRecord to finalize our JSON.

We then send this message to Slack.

LLM Skipped. Stock News Information for ${companyName} [${nlp_org_1}/${stockSymbol}] on ${date}

${title} : ${description}.

${guid} article date ${pubdate}

For those who selected weather, we follow a similar route to stocks (we should add caching with Redis @ Aiven). We use my OpenNLP processor to extract the locations you might want weather for.

The next step is taking the output of the processor and building a value to send to our geocoder.

weatherlocation = ${nlp_location_1:notNull():ifElse(${nlp_location_1}, "New York City")}

If we couldn't find a valid location, I am going to say "New York City". We could use some other lookup. I am doing some work on loading all locations and could do some advanced PostgreSQL searches on that, or perhaps use OpenSearch or a vectorized datastore.

I pass that location to Open Meteo via InvokeHTTP to find its geographic coordinates.

https://geocoding-api.open-meteo.com/v1/search?name=${weatherlocation:trim():urlEncode()}&count=1&language=en&format=json
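Here is a minimal Python sketch of the same two steps: the default location, then the geocoding URL. The function names are hypothetical. Using `quote_plus` for the encoding is my assumption about a reasonable equivalent of `urlEncode()`, and the exact way NiFi encodes spaces may differ.

```python
from urllib.parse import quote_plus

DEFAULT_LOCATION = "New York City"  # fallback named in the article


def weather_location(nlp_location_1):
    """Mirror notNull():ifElse(...): only a missing value triggers the default."""
    return nlp_location_1 if nlp_location_1 is not None else DEFAULT_LOCATION


def geocoding_url(nlp_location_1):
    """Build the Open-Meteo geocoding search URL for the extracted location."""
    name = quote_plus(weather_location(nlp_location_1).strip())
    return (
        "https://geocoding-api.open-meteo.com/v1/search"
        f"?name={name}&count=1&language=en&format=json"
    )
```

Note that an empty string is not null, so it would not fall back to the default. A stricter check may be worth adding if the NLP step can return blank values.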

We then parse the values we need from the results.

{

  "results" : [ {

    "id" : 5128581,

    "name" : "New York",

    "latitude" : 40.71427,

    "longitude" : -74.00597,

    "elevation" : 10.0,

    "feature_code" : "PPL",

    "country_code" : "US",

    "admin1_id" : 5128638,

    "timezone" : "America/New_York",

    "population" : 8175133,

    "postcodes" : [ "10001", "10002", "10003", "10004", "10005", "10006", "10007", "10008", "10009", "10010", "10011", "10012", "10013", "10014", "10016", "10017", "10018", "10019", "10020", "10021", "10022", "10023", "10024", "10025", "10026", "10027", "10028", "10029", "10030", "10031", "10032", "10033", "10034", "10035", "10036", "10037", "10038", "10039", "10040", "10041", "10043", "10044", "10045", "10055", "10060", "10065", "10069", "10080", "10081", "10087", "10090", "10101", "10102", "10103", "10104", "10105", "10106", "10107", "10108", "10109", "10110", "10111", "10112", "10113", "10114", "10115", "10116", "10117", "10118", "10119", "10120", "10121", "10122", "10123", "10124", "10125", "10126", "10128", "10129", "10130", "10131", "10132", "10133", "10138", "10150", "10151", "10152", "10153", "10154", "10155", "10156", "10157", "10158", "10159", "10160", "10161", "10162", "10163", "10164", "10165", "10166", "10167", "10168", "10169", "10170", "10171", "10172", "10173", "10174", "10175", "10176", "10177", "10178", "10179", "10185", "10199", "10203", "10211", "10212", "10213", "10242", "10249", "10256", "10258", "10259", "10260", "10261", "10265", "10268", "10269", "10270", "10271", "10272", "10273", "10274", "10275", "10276", "10277", "10278", "10279", "10280", "10281", "10282", "10285", "10286" ],

    "country_id" : 6252001,

    "country" : "United States",

    "admin1" : "New York"

  } ],

  "generationtime_ms" : 0.92196465

}

We then parse the results so we can call another API to get the current weather for that latitude and longitude via InvokeHTTP.

https://api.weather.gov/points/${latitude:trim()},${longitude:trim()}
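The hand-off from the geocoding result to this URL can be sketched as follows. It assumes the `results[0].latitude` and `results[0].longitude` fields shown in the sample geocoding response. The helper name `points_url` and the choice to return `None` when there are no results are mine.

```python
def points_url(geocoding_response):
    """Build the api.weather.gov points URL from the first geocoding result.

    Returns None when the response carries no results.
    """
    results = geocoding_response.get("results") or []
    if not results:
        return None
    first = results[0]
    latitude = str(first["latitude"]).strip()
    longitude = str(first["longitude"]).strip()
    return f"https://api.weather.gov/points/{latitude},{longitude}"
```

Returning `None` gives the caller a clear signal to fall back to a default location instead of issuing a malformed request.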

The results are GeoJSON.

{

    "@context": [

        "https://geojson.org/geojson-ld/geojson-context.jsonld",

        {

            "@version": "1.1",

            "wx": "https://api.weather.gov/ontology#",

            "s": "https://schema.org/",

            "geo": "http://www.opengis.net/ont/geosparql#",

            "unit": "http://codes.wmo.int/common/unit/",

            "@vocab": "https://api.weather.gov/ontology#",

            "geometry": {

                "@id": "s:GeoCoordinates",

                "@type": "geo:wktLiteral"

            },

            "city": "s:addressLocality",

            "state": "s:addressRegion",

            "distance": {

                "@id": "s:Distance",

                "@type": "s:QuantitativeValue"

            },

            "bearing": {

                "@type": "s:QuantitativeValue"

            },

            "value": {

                "@id": "s:value"

            },

            "unitCode": {

                "@id": "s:unitCode",

                "@type": "@id"

            },

            "forecastOffice": {

                "@type": "@id"

            },

            "forecastGridData": {

                "@type": "@id"

            },

            "publicZone": {

                "@type": "@id"

            },

            "county": {

                "@type": "@id"

            }

        }

    ],

    "id": "https://api.weather.gov/points/40.7143,-74.006",

    "type": "Feature",

    "geometry": {

        "type": "Point",

        "coordinates": [

            -74.006,

            40.714300000000001

        ]

    },

    "properties": {

        "@id": "https://api.weather.gov/points/40.7143,-74.006",

        "@type": "wx:Point",

        "cwa": "OKX",

        "forecastOffice": "https://api.weather.gov/offices/OKX",

        "gridId": "OKX",

        "gridX": 33,

        "gridY": 35,

        "forecast": "https://api.weather.gov/gridpoints/OKX/33,35/forecast",

        "forecastHourly": "https://api.weather.gov/gridpoints/OKX/33,35/forecast/hourly",

        "forecastGridData": "https://api.weather.gov/gridpoints/OKX/33,35",

        "observationStations": "https://api.weather.gov/gridpoints/OKX/33,35/stations",

        "relativeLocation": {

            "type": "Feature",

            "geometry": {

                "type": "Point",

                "coordinates": [

                    -74.0279259,

                    40.745251000000003

                ]

            },

            "properties": {

                "city": "Hoboken",

                "state": "NJ",

                "distance": {

                    "unitCode": "wmoUnit:m",

                    "value": 3906.1522008034999

                },

                "bearing": {

                    "unitCode": "wmoUnit:degree_(angle)",

                    "value": 151

                }

            }

        },

        "forecastZone": "https://api.weather.gov/zones/forecast/NYZ072",

        "county": "https://api.weather.gov/zones/county/NYC061",

        "fireWeatherZone": "https://api.weather.gov/zones/fire/NYZ212",

        "timeZone": "America/New_York",

        "radarStation": "KDIX"

    }

}

We use EvaluateJSONPath to grab a forecast URL.

Then we call that forecast URL via InvokeHTTP.
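As a sketch of these two parsing steps in Python: the first helper reads the forecast URL from `properties.forecast` of the points response, and the second builds a one-line summary from the first entry of `properties.periods` in the forecast response. Which fields actually go back to Slack is my assumption. I picked `name` and `detailedForecast` purely for illustration.

```python
def forecast_url(points_response):
    """Read the forecast URL out of an api.weather.gov points response."""
    return points_response["properties"]["forecast"]


def first_period_summary(forecast_response):
    """Summarize the first forecast period, or return None if there is none."""
    periods = forecast_response["properties"].get("periods") or []
    if not periods:
        return None
    period = periods[0]
    return f"{period['name']}: {period['detailedForecast']}"
```

The first period is normally the current one (for example "Today" or "Tonight"), which is why it is a natural candidate for a short chat reply.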

That produces a larger JSON output that we will parse for the results we want to return to Slack.

{

    "@context": [

        "https://geojson.org/geojson-ld/geojson-context.jsonld",

        {

            "@version": "1.1",

            "wx": "https://api.weather.gov/ontology#",

            "geo": "http://www.opengis.net/ont/geosparql#",

            "unit": "http://codes.wmo.int/common/unit/",

            "@vocab": "https://api.weather.gov/ontology#"

        }

    ],

    "type": "Feature",

    "geometry": {

        "type": "Polygon",

        "coordinates": [

            [

                [

                    -74.025095199999996,

                    40.727052399999998

                ],

                [

                    -74.0295579,

                    40.705361699999997

                ],

                [

                    -74.000948300000005,

                    40.701977499999998

                ],

                [

                    -73.996479800000003,

                    40.723667899999995

                ],

                [

                    -74.025095199999996,

                    40.727052399999998

                ]

            ]

        ]

    },

    "properties": {

        "updated": "2023-11-15T14:34:46+00:00",

        "units": "us",

        "forecastGenerator": "BaselineForecastGenerator",

        "generatedAt": "2023-11-15T15:11:39+00:00",

        "updateTime": "2023-11-15T14:34:46+00:00",

        "validTimes": "2023-11-15T08:00:00+00:00/P7DT17H",

        "elevation": {

            "unitCode": "wmoUnit:m",

            "value": 2.1335999999999999

        },

        "periods": [

            {

                "number": 1,

                "name": "Today",

                "startTime": "2023-11-15T10:00:00-05:00",

                "endTime": "2023-11-15T18:00:00-05:00",

                "isDaytime": true,

                "temperature": 51,

                "temperatureUnit": "F",

                "temperatureTrend": null,

                "probabilityOfPrecipitation": {

                    "unitCode": "wmoUnit:percent",

                    "value": null

                },

                "dewpoint": {

                    "unitCode": "wmoUnit:degC",

                    "value": 2.2222222222222223

                },

                "relativeHumidity": {

                    "unitCode": "wmoUnit:percent",

                    "value": 68

                },

                "windSpeed": "1 to 7 mph",

                "windDirection": "SW",

                "icon": "https://api.weather.gov/icons/land/day/bkn?size=medium",

                "shortForecast": "Partly Sunny",

                "detailedForecast": "Partly sunny, with a high near 51. Southwest wind 1 to 7 mph."

            },

            {

                "number": 2,

                "name": "Tonight",

                "startTime": "2023-11-15T18:00:00-05:00",

                "endTime": "2023-11-16T06:00:00-05:00",

                "isDaytime": false,

                "temperature": 44,

                "temperatureUnit": "F",

                "temperatureTrend": null,

                "probabilityOfPrecipitation": {

                    "unitCode": "wmoUnit:percent",

                    "value": null

                },

                "dewpoint": {

                    "unitCode": "wmoUnit:degC",

                    "value": 3.8888888888888888

                },

                "relativeHumidity": {

                    "unitCode": "wmoUnit:percent",

                    "value": 82

                },

                "windSpeed": "8 mph",

                "windDirection": "SW",

                "icon": "https://api.weather.gov/icons/land/night/sct?size=medium",

                "shortForecast": "Partly Cloudy",

                "detailedForecast": "Partly cloudy, with a low around 44. Southwest wind around 8 mph."

            },

            {

                "number": 3,

                "name": "Thursday",

                "startTime": "2023-11-16T06:00:00-05:00",

                "endTime": "2023-11-16T18:00:00-05:00",

                "isDaytime": true,

                "temperature": 60,

                "temperatureUnit": "F",

                "temperatureTrend": "falling",

                "probabilityOfPrecipitation": {

                    "unitCode": "wmoUnit:percent",

                    "value": null

                },

                "dewpoint": {

                    "unitCode": "wmoUnit:degC",

                    "value": 5.5555555555555554

                },

                "relativeHumidity": {

                    "unitCode": "wmoUnit:percent",

                    "value": 82

                },

                "windSpeed": "6 mph",

                "windDirection": "SW",

                "icon": "https://api.weather.gov/icons/land/day/few?size=medium",

                "shortForecast": "Sunny",

                "detailedForecast": "Sunny. High near 60, with temperatures falling to around 58 in the afternoon. Southwest wind around 6 mph."

            },

            {

                "number": 4,

                "name": "Thursday Night",

                "startTime": "2023-11-16T18:00:00-05:00",

                "endTime": "2023-11-17T06:00:00-05:00",

                "isDaytime": false,

                "temperature": 47,

                "temperatureUnit": "F",

                "temperatureTrend": null,

                "probabilityOfPrecipitation": {

                    "unitCode": "wmoUnit:percent",

                    "value": null

                },

                "dewpoint": {

                    "unitCode": "wmoUnit:degC",

                    "value": 6.1111111111111107

                },

                "relativeHumidity": {

                    "unitCode": "wmoUnit:percent",

                    "value": 80

                },

                "windSpeed": "3 mph",

                "windDirection": "SW",

                "icon": "https://api.weather.gov/icons/land/night/few?size=medium",

                "shortForecast": "Mostly Clear",

                "detailedForecast": "Mostly clear, with a low around 47. Southwest wind around 3 mph."

            },

            {

                "number": 5,

                "name": "Friday",

                "startTime": "2023-11-17T06:00:00-05:00",

                "endTime": "2023-11-17T18:00:00-05:00",

                "isDaytime": true,

                "temperature": 63,

                "temperatureUnit": "F",

                "temperatureTrend": "falling",

                "probabilityOfPrecipitation": {

                    "unitCode": "wmoUnit:percent",

                    "value": 20

                },

                "dewpoint": {

                    "unitCode": "wmoUnit:degC",

                    "value": 12.222222222222221

                },

                "relativeHumidity": {

                    "unitCode": "wmoUnit:percent",

                    "value": 86

                },

                "windSpeed": "2 to 10 mph",

                "windDirection": "S",

                "icon": "https://api.weather.gov/icons/land/day/bkn/rain,20?size=medium",

                "shortForecast": "Partly Sunny then Slight Chance Light Rain",

                "detailedForecast": "A slight chance of rain after 1pm. Partly sunny. High near 63, with temperatures falling to around 61 in the afternoon. South wind 2 to 10 mph. Chance of precipitation is 20%."

            },

            {

                "number": 6,

                "name": "Friday Night",

                "startTime": "2023-11-17T18:00:00-05:00",

                "endTime": "2023-11-18T06:00:00-05:00",

                "isDaytime": false,

                "temperature": 51,

                "temperatureUnit": "F",

                "temperatureTrend": null,

                "probabilityOfPrecipitation": {

                    "unitCode": "wmoUnit:percent",

                    "value": 70

                },

                "dewpoint": {

                    "unitCode": "wmoUnit:degC",

                    "value": 12.777777777777779

                },

                "relativeHumidity": {

                    "unitCode": "wmoUnit:percent",

                    "value": 100

                },

                "windSpeed": "6 to 10 mph",

                "windDirection": "SW",

                "icon": "https://api.weather.gov/icons/land/night/rain,60/rain,70?size=medium",

                "shortForecast": "Light Rain Likely",

                "detailedForecast": "Rain likely. Cloudy, with a low around 51. Chance of precipitation is 70%. New rainfall amounts between a quarter and half of an inch possible."

            },

            {

                "number": 7,

                "name": "Saturday",

                "startTime": "2023-11-18T06:00:00-05:00",

                "endTime": "2023-11-18T18:00:00-05:00",

                "isDaytime": true,

                "temperature": 55,

                "temperatureUnit": "F",

                "temperatureTrend": null,

                "probabilityOfPrecipitation": {

                    "unitCode": "wmoUnit:percent",

                    "value": 70

                },

                "dewpoint": {

                    "unitCode": "wmoUnit:degC",

                    "value": 11.111111111111111

                },

                "relativeHumidity": {

                    "unitCode": "wmoUnit:percent",

                    "value": 100

                },

                "windSpeed": "8 to 18 mph",

                "windDirection": "NW",

                "icon": "https://api.weather.gov/icons/land/day/rain,70/rain,30?size=medium",

                "shortForecast": "Light Rain Likely",

                "detailedForecast": "Rain likely before 1pm. Partly sunny, with a high near 55. Chance of precipitation is 70%."

            },

            {

                "number": 8,

                "name": "Saturday Night",

                "startTime": "2023-11-18T18:00:00-05:00",

                "endTime": "2023-11-19T06:00:00-05:00",

                "isDaytime": false,

                "temperature": 40,

                "temperatureUnit": "F",

                "temperatureTrend": null,

                "probabilityOfPrecipitation": {

                    "unitCode": "wmoUnit:percent",

                    "value": null

                },

                "dewpoint": {

                    "unitCode": "wmoUnit:degC",

                    "value": 1.1111111111111112

                },

                "relativeHumidity": {

                    "unitCode": "wmoUnit:percent",

                    "value": 65

                },

                "windSpeed": "12 to 17 mph",

                "windDirection": "NW",

                "icon": "https://api.weather.gov/icons/land/night/few?size=medium",

                "shortForecast": "Mostly Clear",

                "detailedForecast": "Mostly clear, with a low around 40."

            },

            {

                "number": 9,

                "name": "Sunday",

                "startTime": "2023-11-19T06:00:00-05:00",

                "endTime": "2023-11-19T18:00:00-05:00",

                "isDaytime": true,

                "temperature": 50,

                "temperatureUnit": "F",

                "temperatureTrend": null,

                "probabilityOfPrecipitation": {

                    "unitCode": "wmoUnit:percent",

                    "value": null

                },

                "dewpoint": {

                    "unitCode": "wmoUnit:degC",

                    "value": -0.55555555555555558

                },

                "relativeHumidity": {

                    "unitCode": "wmoUnit:percent",

                    "value": 65

                },

                "windSpeed": "10 to 14 mph",

                "windDirection": "W",

                "icon": "https://api.weather.gov/icons/land/day/few?size=medium",

                "shortForecast": "Sunny",

                "detailedForecast": "Sunny, with a high near 50."

            },

            {

                "number": 10,

                "name": "Sunday Night",

                "startTime": "2023-11-19T18:00:00-05:00",

                "endTime": "2023-11-20T06:00:00-05:00",

                "isDaytime": false,

                "temperature": 38,

                "temperatureUnit": "F",

                "temperatureTrend": null,

                "probabilityOfPrecipitation": {

                    "unitCode": "wmoUnit:percent",

                    "value": null

                },

                "dewpoint": {

                    "unitCode": "wmoUnit:degC",

                    "value": -0.55555555555555558

                },

                "relativeHumidity": {

                    "unitCode": "wmoUnit:percent",

                    "value": 67

                },

                "windSpeed": "13 mph",

                "windDirection": "NW",

                "icon": "https://api.weather.gov/icons/land/night/few?size=medium",

                "shortForecast": "Mostly Clear",

                "detailedForecast": "Mostly clear, with a low around 38."

            },

            {

                "number": 11,

                "name": "Monday",

                "startTime": "2023-11-20T06:00:00-05:00",

                "endTime": "2023-11-20T18:00:00-05:00",

                "isDaytime": true,

                "temperature": 46,

                "temperatureUnit": "F",

                "temperatureTrend": null,

                "probabilityOfPrecipitation": {

                    "unitCode": "wmoUnit:percent",

                    "value": null

                },

                "dewpoint": {

                    "unitCode": "wmoUnit:degC",

                    "value": -1.6666666666666667

                },

                "relativeHumidity": {

                    "unitCode": "wmoUnit:percent",

                    "value": 70

                },

                "windSpeed": "13 mph",

                "windDirection": "NW",

                "icon": "https://api.weather.gov/icons/land/day/sct?size=medium",

                "shortForecast": "Mostly Sunny",

                "detailedForecast": "Mostly sunny, with a high near 46."

            },

            {

                "number": 12,

                "name": "Monday Night",

                "startTime": "2023-11-20T18:00:00-05:00",

                "endTime": "2023-11-21T06:00:00-05:00",

                "isDaytime": false,

                "temperature": 38,

                "temperatureUnit": "F",

                "temperatureTrend": null,

                "probabilityOfPrecipitation": {

                    "unitCode": "wmoUnit:percent",

                    "value": null

                },

                "dewpoint": {

                    "unitCode": "wmoUnit:degC",

                    "value": -1.1111111111111112

                },

                "relativeHumidity": {

                    "unitCode": "wmoUnit:percent",

                    "value": 70

                },

                "windSpeed": "10 mph",

                "windDirection": "N",

                "icon": "https://api.weather.gov/icons/land/night/sct?size=medium",

                "shortForecast": "Partly Cloudy",

                "detailedForecast": "Partly cloudy, with a low around 38."

            },

            {

                "number": 13,

                "name": "Tuesday",

                "startTime": "2023-11-21T06:00:00-05:00",

                "endTime": "2023-11-21T18:00:00-05:00",

                "isDaytime": true,

                "temperature": 49,

                "temperatureUnit": "F",

                "temperatureTrend": null,

                "probabilityOfPrecipitation": {

                    "unitCode": "wmoUnit:percent",

                    "value": 30

                },

                "dewpoint": {

                    "unitCode": "wmoUnit:degC",

                    "value": 2.7777777777777777

                },

                "relativeHumidity": {

                    "unitCode": "wmoUnit:percent",

                    "value": 73

                },

                "windSpeed": "9 to 13 mph",

                "windDirection": "E",

                "icon": "https://api.weather.gov/icons/land/day/bkn/rain,30?size=medium",

                "shortForecast": "Partly Sunny then Chance Light Rain",

                "detailedForecast": "A chance of rain after 1pm. Partly sunny, with a high near 49. Chance of precipitation is 30%."

            },

            {

                "number": 14,

                "name": "Tuesday Night",

                "startTime": "2023-11-21T18:00:00-05:00",

                "endTime": "2023-11-22T06:00:00-05:00",

                "isDaytime": false,

                "temperature": 46,

                "temperatureUnit": "F",

                "temperatureTrend": null,

                "probabilityOfPrecipitation": {

                    "unitCode": "wmoUnit:percent",

                    "value": 50

                },

                "dewpoint": {

                    "unitCode": "wmoUnit:degC",

                    "value": 7.7777777777777777

                },

                "relativeHumidity": {

                    "unitCode": "wmoUnit:percent",

                    "value": 86

                },

                "windSpeed": "13 to 18 mph",

                "windDirection": "S",

                "icon": "https://api.weather.gov/icons/land/night/rain,50?size=medium",

                "shortForecast": "Chance Light Rain",

                "detailedForecast": "A chance of rain. Mostly cloudy, with a low around 46. Chance of precipitation is 50%."

            }

        ]

    }

}

We parse the data with EvaluateJSONPath to get primary fields for weather.
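
As a rough illustration of what this extraction step does, here is a minimal Python sketch that pulls the same kind of fields out of one forecast period. In the flow itself this is done with JSONPath expressions in EvaluateJsonPath, not Python. The function name `extract_weather_fields`, the choice of the first period, and the `properties.periods` path (the usual shape of a weather.gov forecast response) are assumptions made for the example.

```python
import json


def extract_weather_fields(forecast_json: str, period_index: int = 0) -> dict:
    """Pull the primary weather fields out of one forecast period."""
    doc = json.loads(forecast_json)
    # Assumption: periods live under properties.periods.
    period = doc["properties"]["periods"][period_index]
    # Keys are lower-cased to match the attribute names used in the Slack message.
    return {
        "temperature": period["temperature"],
        "temperatureunit": period["temperatureUnit"],
        "temperaturetrend": period["temperatureTrend"],
        "winddirection": period["windDirection"],
        "windspeed": period["windSpeed"],
        "icon": period["icon"],
        "detailedforecast": period["detailedForecast"],
    }


# Trimmed sample built from the "Sunday" period shown above.
sample = json.dumps({
    "properties": {
        "periods": [{
            "name": "Sunday",
            "temperature": 50,
            "temperatureUnit": "F",
            "temperatureTrend": None,
            "windSpeed": "10 to 14 mph",
            "windDirection": "W",
            "icon": "https://api.weather.gov/icons/land/day/few?size=medium",
            "detailedForecast": "Sunny, with a high near 50.",
        }]
    }
})

fields = extract_weather_fields(sample)
print(fields["detailedforecast"])
```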

We then format those fields into a message and send it to Slack with PutSlack.

LLM Skipped. Read forecast on ${date} for ${weatherlocation} @ ${latitude},${longitude}

Used ${forecasturl} ${icon} Temp: ${temperature} ${temperatureunit} - ${temperaturetrend}

There is a wind ${winddirection} at ${windspeed}. ${detailedforecast}
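
To see how a template like this resolves, here is a small Python sketch using `string.Template`, which happens to accept the same `${name}` placeholder syntax. This is only an analogy: NiFi Expression Language is far richer, and the sample attribute values below (location, coordinates, forecast URL, date) are hypothetical placeholders, not values from the flow.

```python
from string import Template

# The three message lines from the flow, joined into one template.
SLACK_TEMPLATE = Template(
    "LLM Skipped. Read forecast on ${date} for ${weatherlocation} @ ${latitude},${longitude}\n"
    "Used ${forecasturl} ${icon} Temp: ${temperature} ${temperatureunit} - ${temperaturetrend}\n"
    "There is a wind ${winddirection} at ${windspeed}. ${detailedforecast}"
)


def render_slack_message(attributes: dict) -> str:
    """Fill the template from a dict of flow-file-style attributes."""
    # safe_substitute leaves an unknown ${name} in place instead of raising.
    return SLACK_TEMPLATE.safe_substitute(attributes)


# Hypothetical attribute values; weather fields reuse the "Sunday" period.
attributes = {
    "date": "2023-11-19",
    "weatherlocation": "Example City",
    "latitude": "40.0",
    "longitude": "-74.0",
    "forecasturl": "https://example.com/forecast",
    "icon": "https://api.weather.gov/icons/land/day/few?size=medium",
    "temperature": 50,
    "temperatureunit": "F",
    "temperaturetrend": "",  # null in the forecast JSON
    "winddirection": "W",
    "windspeed": "10 to 14 mph",
    "detailedforecast": "Sunny, with a high near 50.",
}

message = render_slack_message(attributes)
print(message)
```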

Slack Output


---


If we do have an LLM question, let's make sure it's just one record.

We use a few different models available in IBM WatsonX.AI on IBM Cloud, which our REST prompts can access quickly.

I initially built and tested the prompts in IBM's Prompt Lab and then copied the initial curl statement from there.

IBM watsonx.ai

https://www.ibm.com/docs/en/watsonx-as-a-service?topic=models-

ibm/mpt-7b-instruct2

meta-llama/llama-2-70b-chat

ibm/granite-13b-chat-v1

We have to send our unique secure key to IBM and they will give us a token to use in our next call.

We parse out the question and then send it to WatsonX.AI via the REST API.
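
The two calls can be sketched in plain Python as follows. In the flow they are made by NiFi processors, so treat this as an outline under stated assumptions rather than the actual implementation. The IAM token endpoint and its form fields are the standard IBM Cloud API key exchange and are an assumption here, since the article does not spell them out; the generation URL is the one that appears in the request metadata later in this article. `YOUR_API_KEY`, `YOUR_TOKEN` and `YOUR_PROJECT_ID` are placeholders, and the function names are made up for the example.

```python
import json
import urllib.parse
import urllib.request

# Assumption: standard IBM Cloud IAM endpoint for exchanging an API key for a token.
IAM_TOKEN_URL = "https://iam.cloud.ibm.com/identity/token"
# Taken from the "Request URL" reported in the metadata in this article.
GENERATION_URL = (
    "https://us-south.ml.cloud.ibm.com/ml/v1-beta/generation/text?version=2023-05-29"
)


def build_token_request(api_key: str) -> urllib.request.Request:
    """Step 1: form-encoded POST that trades the API key for a bearer token."""
    body = urllib.parse.urlencode({
        "grant_type": "urn:ibm:params:oauth:grant-type:apikey",
        "apikey": api_key,
    }).encode("utf-8")
    return urllib.request.Request(
        IAM_TOKEN_URL,
        data=body,
        headers={
            "Content-Type": "application/x-www-form-urlencoded",
            "Accept": "application/json",
        },
        method="POST",
    )


def build_generation_request(token: str, question: str, project_id: str,
                             model_id: str = "meta-llama/llama-2-70b-chat"
                             ) -> urllib.request.Request:
    """Step 2: JSON POST carrying the prompt, authorized with the token."""
    payload = {
        "model_id": model_id,
        "input": question,
        "parameters": {
            "decoding_method": "greedy",
            "max_new_tokens": 200,
            "min_new_tokens": 50,
            "stop_sequences": [],
            "repetition_penalty": 1,
        },
        "project_id": project_id,
    }
    return urllib.request.Request(
        GENERATION_URL,
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
            "Accept": "application/json",
        },
        method="POST",
    )


def ask_watsonx(api_key: str, question: str, project_id: str) -> dict:
    """Run both steps against the network and return the decoded JSON reply."""
    with urllib.request.urlopen(build_token_request(api_key), timeout=30) as resp:
        token = json.load(resp)["access_token"]
    request = build_generation_request(token, question, project_id)
    with urllib.request.urlopen(request, timeout=120) as resp:
        return json.load(resp)


# Build (but do not send) both requests to inspect them.
token_request = build_token_request("YOUR_API_KEY")
generation_request = build_generation_request(
    "YOUR_TOKEN",
    "What is a good way to integrate Generative AI and Apache NiFi?",
    "YOUR_PROJECT_ID",
)
print(generation_request.get_method(), generation_request.full_url)
```

Building the body with `json.dumps` takes care of escaping quotes and newlines in the question, which is the job the Expression Language call on the input attribute does in the flow.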

We build a prompt to send to IBM as follows.

{

  "model_id": "meta-llama/llama-2-70b-chat",

  "input": "${inputs:urlEncode()}",

  "parameters": {

    "decoding_method": "greedy",

    "max_new_tokens": 200,

    "min_new_tokens": 50,

    "stop_sequences": [],

    "repetition_penalty": 1

  },

  "project_id": "0ead8ec4-d137-4f9c-8956-50b0da4a7068" }

We parse out the generated text, which is our Generative AI result, plus some helpful metadata on timings.
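
A minimal Python sketch of that parsing step is shown here. It assumes the response is a JSON document with a top-level `results` array whose first entry holds `generated_text`, `generated_token_count`, `input_token_count` and `stop_reason`; that layout, the function name, and the mapping onto the short field names are assumptions for illustration, so check them against a real response. The sample body reuses values from the metadata reported in this article, with the generated text shortened.

```python
import json


def parse_generation_response(body: str) -> dict:
    """Flatten the fields of interest from a text-generation response."""
    doc = json.loads(body)
    # Assumption: the first entry of "results" carries the generated text.
    first = doc["results"][0]
    return {
        "generated_text": first["generated_text"].strip(),
        "generated_token": first.get("generated_token_count"),
        "tokencount": first.get("input_token_count"),
        "stop_reason": first.get("stop_reason"),
        "model_id": doc.get("model_id"),
        "created_at": doc.get("created_at"),
    }


# Sample body in the assumed layout.
sample_body = json.dumps({
    "model_id": "meta-llama/llama-2-70b-chat",
    "created_at": "2023-11-15T15:43:29.248Z",
    "results": [{
        "generated_text": "\nYou can use Apache NiFi to integrate Generative AI models in several ways:",
        "generated_token_count": 200,
        "input_token_count": 38,
        "stop_reason": "max_tokens",
    }],
})

parsed = parse_generation_response(sample_body)
print(parsed["stop_reason"], parsed["generated_token"])
```

A `stop_reason` of `max_tokens` means the answer was cut off at the `max_new_tokens` limit, which is why the response shown in this article ends mid-sentence.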

The result posted to Slack is:

"You can use Apache NiFi to integrate Generative AI models in several ways:1. Data Preprocessing: Use NiFi to preprocess data before feeding it into your Generative AI model. This can include data cleaning, transformation, and feature engineering.

2. Model Training: Use NiFi to automate the training process of your Generative AI model. You can use NiFi's PutFile and PutFile_SFTP processors to write the training data to a file, and then use a processor like ExecuteScript to run the training script.

3. Model Deployment: Once your Generative AI model is trained, you can use NiFi to deploy it. You can create a NiFi flow that takes in input data, runs it through the Generative AI model, and then outputs the generated data.

4. Real-time Inference: You can use NiFi's StreamingJobs"

After the Slack bot posts the results, it also posts metrics and debugging information to the chat channel.

All of the metadata is also posted to another Slack channel for administrator monitoring.

==== NiFi to IBM WatsonX.AI LLM Answers

On Date: Wed, 15 Nov 2023 15:43:29 GMT Created: 2023-11-15T15:43:29.248Z

  Prompt: Q:   What is a good way to integrate Generative AI and Apache NiFi?

Response: )

You can use Apache NiFi to integrate Generative AI models in several ways:

1. Data Preprocessing: Use NiFi to preprocess data before feeding it into your Generative AI model. This can include data cleaning, transformation, and feature engineering.

2. Model Training: Use NiFi to automate the training process of your Generative AI model. You can use NiFi's PutFile and PutFile_SFTP processors to write the training data to a file, and then use a processor like ExecuteScript to run the training script.

3. Model Deployment: Once your Generative AI model is trained, you can use NiFi to deploy it. You can create a NiFi flow that takes in input data, runs it through the Generative AI model, and then outputs the generated data.

4. Real-time Inference: You can use NiFi's StreamingJobs

Token: 200

Req Duration: 8153

HTTP TX ID: 89d71099-da23-4e7e-89f9-4e8f5620c0fb

IBM Msg: This model is a Non-IBM Product governed by a third-party license that may impose use restrictions and other obligations. By using this model you agree to its terms as identified in the following URL. URL: https://dataplatform.cloud.ibm.com/docs/content/wsj/analyze-data/fm-models.html?context=wx

IBM Msg ID: disclaimer_warning

Model ID: meta-llama/llama-2-70b-chat

Stop Reason: max_tokens

Token Count: 38

TX ID: NGp0djg-c05f740f84f84b7c80f93f9da05aa756

UUID: da0806cb-6133-4bf4-808e-1fbf419c09e3

Corr ID: NGp0djg-c05f740f84f84b7c80f93f9da05aa756

Global TX ID: 20c3a9cf276c38bcdaf26e3c27d0479b

Service Time: 478

Request ID: 03c2726a-dcb6-407f-96f1-f83f20fe9c9c

File Name: 1a3c4386-86d2-4969-805b-37649c16addb

Request Duration: 8153

Request URL: https://us-south.ml.cloud.ibm.com/ml/v1-beta/generation/text?version=2023-05-29

cf-ray: 82689bfd28e48ce2-EWR

=====

Source

GitHub - tspannhw/FLaNK-watsonx.ai: FLaNK Stack with watsonx.ai for google/flan-ul2…

FLaNK Stack with watsonx.ai for google/flan-ul2, google/flan-t5-xxl, Granite and other foundation models - GitHub … github.com

Make your own Slack Bot


Slack Output

Kafka Distribute

Apache Flink SQL Table Creation DDL

CREATE TABLE `ssb`.`Meetups`.`watsonairesults` (

  `date` VARCHAR(2147483647),

  `x_global_transaction_id` VARCHAR(2147483647),

  `x_request_id` VARCHAR(2147483647),

  `cf_ray` VARCHAR(2147483647),

  `inputs` VARCHAR(2147483647),

  `created_at` VARCHAR(2147483647),

  `stop_reason` VARCHAR(2147483647),

  `x_correlation_id` VARCHAR(2147483647),

  `x_proxy_upstream_service_time` VARCHAR(2147483647),

  `message_id` VARCHAR(2147483647),

  `model_id` VARCHAR(2147483647),

  `invokehttp_request_duration` VARCHAR(2147483647),

  `message` VARCHAR(2147483647),

  `uuid` VARCHAR(2147483647),

  `generated_text` VARCHAR(2147483647),

  `transaction_id` VARCHAR(2147483647),

  `tokencount` VARCHAR(2147483647),

  `generated_token` VARCHAR(2147483647),

  `ts` VARCHAR(2147483647),

  `advisoryId` VARCHAR(2147483647),

  `eventTimeStamp` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp',

  WATERMARK FOR `eventTimeStamp` AS `eventTimeStamp` - INTERVAL '3' SECOND

) WITH (

  'deserialization.failure.policy' = 'ignore_and_log',

  'properties.request.timeout.ms' = '120000',

  'format' = 'json',

  'properties.bootstrap.servers' = 'kafka:9092',

  'connector' = 'kafka',

  'properties.transaction.timeout.ms' = '900000',

  'topic' = 'watsonxaillmanswers',

  'scan.startup.mode' = 'group-offsets',

  'properties.auto.offset.reset' = 'earliest',

  'properties.group.id' = 'watsonxaillmconsumer'

);


CREATE TABLE `ssb`.`Meetups`.`watsonxresults` (

  `date` VARCHAR(2147483647),

  `x_global_transaction_id` VARCHAR(2147483647),

  `x_request_id` VARCHAR(2147483647),

  `cf_ray` VARCHAR(2147483647),

  `inputs` VARCHAR(2147483647),

  `created_at` VARCHAR(2147483647),

  `stop_reason` VARCHAR(2147483647),

  `x_correlation_id` VARCHAR(2147483647),

  `x_proxy_upstream_service_time` VARCHAR(2147483647),

  `message_id` VARCHAR(2147483647),

  `model_id` VARCHAR(2147483647),

  `invokehttp_request_duration` VARCHAR(2147483647),

  `message` VARCHAR(2147483647),

  `uuid` VARCHAR(2147483647),

  `generated_text` VARCHAR(2147483647),

  `transaction_id` VARCHAR(2147483647),

  `tokencount` VARCHAR(2147483647),

  `generated_token` VARCHAR(2147483647),

  `ts` VARCHAR(2147483647),

  `eventTimeStamp` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp',

  WATERMARK FOR `eventTimeStamp` AS `eventTimeStamp` - INTERVAL '3' SECOND

) WITH (

  'deserialization.failure.policy' = 'ignore_and_log',

  'properties.request.timeout.ms' = '120000',

  'format' = 'json',

  'properties.bootstrap.servers' = 'kafka:9092',

  'connector' = 'kafka',

  'properties.transaction.timeout.ms' = '900000',

  'topic' = 'watsonxaillm',

  'scan.startup.mode' = 'group-offsets',

  'properties.auto.offset.reset' = 'earliest',

  'properties.group.id' = 'allwatsonx1'

);

Example Prompt


{"inputs":"Please answer to the following question. What is the capital of the United States?"}

IBM DB2 SQL

alter table  "DB2INST1"."TRAVELADVISORY"

add column "summary" VARCHAR(2048);


-- DB2INST1.TRAVELADVISORY definition


CREATE TABLE "DB2INST1"."TRAVELADVISORY"  (

    "TITLE" VARCHAR(250 OCTETS) , 

    "PUBDATE" VARCHAR(250 OCTETS) , 

    "LINK" VARCHAR(250 OCTETS) , 

    "GUID" VARCHAR(250 OCTETS) , 

    "ADVISORYID" VARCHAR(250 OCTETS) , 

    "DOMAIN" VARCHAR(250 OCTETS) , 

    "CATEGORY" VARCHAR(4096 OCTETS) , 

    "DESCRIPTION" VARCHAR(4096 OCTETS) , 

    "UUID" VARCHAR(250 OCTETS) NOT NULL , 

    "TS" BIGINT NOT NULL , 

    "summary" VARCHAR(2048 OCTETS) )   

   IN "IBMDB2SAMPLEREL"  

   ORGANIZE BY ROW;


ALTER TABLE "DB2INST1"."TRAVELADVISORY" 

 ADD PRIMARY KEY

  ("UUID")

 ENFORCED;


GRANT CONTROL ON TABLE "DB2INST1"."TRAVELADVISORY" TO USER "DB2INST1";


GRANT CONTROL ON INDEX "SYSIBM  "."SQL230620142604860" TO USER "DB2INST1";


SELECT "summary", TITLE , ADVISORYID , TS, PUBDATE  FROM DB2INST1.TRAVELADVISORY t 

WHERE "summary" IS NOT NULL 

ORDER BY ts DESC;

For an example output email, see:

https://github.com/tspannhw/FLaNK-watsonx.ai/blob/main/example.email.txt

References

The Latest in Real-Tim(e) Analytics: Generative AI, LLM and Beyond

Thursday October 26th, we had a meetup with two Tim's speaking on real-time analytics plus LLM and more. It was a great… medium.com

Streaming LLM with Apache NiFi (HuggingFace)

See my talk on August 23, 2023 at NYC AI Dev Day. medium.com

IBM Documentation

IBM Documentation. www.ibm.com

Supported foundation models available with watsonx.ai

A collection of open source and IBM foundation models are deployed in IBM watsonx.ai. www.ibm.com

Tips for writing foundation model prompts: prompt engineering

Part art, part science, prompt engineering is the process of crafting prompt text to best effect for a given model and… www.ibm.com

Sample foundation model prompts for common tasks

Try these samples to learn how different prompts can guide foundation models to do common tasks. www.ibm.com

IBM Technology Chooses Cloudera as its Preferred Partner for Addressing Real Time Data Movement…

Organizations increasingly rely on streaming data sources not only to bring data into the enterprise but also to… blog.cloudera.com

Travel Advisories

https://travel.state.gov/content/travel/en/traveladvisories/traveladvisories.html/

https://travel.state.gov/_res/rss/TAsTWs.xml

https://medium.com/@tspann/building-a-travel-advisory-app-with-apache-nifi-in-k8-969b44c84958

https://github.com/tspannhw/FLaNK-TravelAdvisory


Video

https://www.youtube.com/watch?v=RPz7Xm4fLF4&t=6s

Source Code

https://github.com/tspannhw/FLaNK-watsonx.ai

Models

https://www.ibm.com/docs/en/watsonx-as-a-service?topic=models-

https://medium.com/cloudera-inc/ingesting-events-into-dockerized-ibm-db2-jdbc-with-apache-nifi-f0ca452d1351

https://www.youtube.com/watch?v=-r8zf_nfxCw

https://medium.com/cloudera-inc/no-code-sentiment-analysis-with-hugging-face-and-apache-nifi-for-article-summaries-cf06d1df1283

https://medium.com/cloudera-inc/cdc-not-cat-data-capture-e43713879c03

https://medium.com/cloudera-inc/building-a-real-time-data-pipeline-a-comprehensive-tutorial-on-minifi-nifi-kafka-and-flink-ee03ee6722cb


Other Related Articles

Evolve NYC 2023 Wrap-Up

Evolve NYC 2023 was an intense event with a ton of speakers, sessions, topics and cool people. medium.com

No Code Sentiment Analysis with Hugging Face and Apache NiFi for Article Summaries

Apache NiFi, Hugging Face, NY Times RSS, Sentiment Analysis, Deep Learning, REST API, Classification, Distilbert… medium.com