Best in Flow Competition Tutorials Part 2 - Tutorial 2

Writing critical syslog events to Apache Iceberg for analysis

A few weeks have passed since you built your data flow with DataFlow Designer to filter out critical syslog events to a dedicated Kafka topic. Now that everyone has better visibility into real-time health, management wants to do historical analysis on the data. Your company is evaluating Apache Iceberg to build an open data lakehouse and you are tasked with building a flow that ingests the most critical syslog events into an Iceberg table.


Ensure your table is built and accessible.



Create an Apache Iceberg Table


  1. From the Home page, click Data Hub Clusters, then navigate to oss-kudu-demo in the Data Hubs list.






  2. Navigate to Hue from the Kudu Data Hub.



  3. Inside Hue you can now create your table. You have your own database to work with. To get to your database, click the ‘<’ icon next to the default database; you should see your database in the format <YourEmailWithUnderscores>_db. Click on your database to open the SQL Editor.



  4. Create your Apache Iceberg table with the SQL below, clicking the play icon to execute the query. Note that the table name must be prefixed with your workload user name (userid).




CREATE TABLE <<userid>>_syslog_critical_archive
(priority int, severity int, facility int, version int, event_timestamp bigint, hostname string,
 body string, appName string, procid string, messageid string,
 structureddata struct<sdid:struct<eventid:string,eventsource:string,iut:string>>)
STORED BY ICEBERG;






  5. Once you have sent data to your table, you can query it; see the sample statements below.
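
To confirm that the table was created correctly, and later to peek at the data as it arrives, you can run a few statements directly in the Hue SQL Editor. The sketch below assumes the Hive or Impala editor in Hue and reuses the <<userid>> placeholder from above, so replace it with your own workload user name.

-- Confirm the table exists and check its columns
DESCRIBE <<userid>>_syslog_critical_archive;

-- Once the flow is writing data, look at a few archived events
SELECT hostname, severity, appname, event_timestamp, body
FROM <<userid>>_syslog_critical_archive
LIMIT 10;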




Additional Documentation




2.1 Open ReadyFlow & start Test Session


  1. Navigate to DataFlow from the Home Page

  2. Navigate to the ReadyFlow Gallery

  3. Explore the ReadyFlow Gallery

  4. Search for the “Kafka to Iceberg” ReadyFlow.

  5. Click on “Create New Draft” to open the ReadyFlow in the Designer and name it yourid_kafkatoiceberg (Ex: tim_kafkatoiceberg).

  6. Start a Test Session by either clicking on the start a test session link in the banner or going to Flow Options and selecting Start in the Test Session section.

  7. In the Test Session creation wizard, select the latest NiFi version and click Start Test Session. Notice how the status at the top now says “Initializing Test Session”. 


2.2 Modifying the flow to read syslog data

The flow consists of three processors and looks very promising for our use case. The first processor reads data from a Kafka topic, the second gives us the option to batch up events and create larger files, and those files are then written out to Iceberg by the PutIceberg processor.
All we have to do now to reach our goal is customize this configuration to our use case.


  1. Provide values for predefined parameters

    1. Navigate to Flow Options → Parameters

    2. Select all parameters that show No value set and provide the following values

Name | Description | Value
CDP Workload User | CDP Workload User | <Your own workload user name>
CDP Workload User Password | CDP Workload User Password | <Your own workload user password>
Data Input Format | This flow supports AVRO, JSON and CSV | JSON
Hive Catalog Namespace | - | <YourEmailWithUnderScores_db>
Iceberg Table Name | - | <<replace_with_userid>>_syslog_critical_archive
Kafka Broker Endpoint | Comma-separated list of Kafka Broker addresses | oss-kafka-demo-corebroker2.oss-demo.qsm5-opic.cloudera.site:9093, oss-kafka-demo-corebroker1.oss-demo.qsm5-opic.cloudera.site:9093, oss-kafka-demo-corebroker0.oss-demo.qsm5-opic.cloudera.site:9093
Kafka Consumer Group Id | - | <<replace_with_userid>>_cdf (Ex: tim_cdf)
Kafka Source Topic | - | <<replace_with_userid>>_syslog_critical (Ex: tim_syslog_critical)
Schema Name | - | syslog
Schema Registry Hostname | - | oss-kafka-demo-master0.oss-demo.qsm5-opic.cloudera.site

  2. Click Apply Changes to save the parameter values.


  3. Start Controller Services

    1. Navigate to Flow Options → Services

    2. Select CDP_Schema_Registry service and click Enable Service and Referencing Components action
       

    3. Start from the top of the list and enable all remaining Controller services including KerberosPasswordUserService, HiveCatalogService, AvroReader, …

    4. Click OK if asked for confirmation.

  4. Make sure all services have been enabled.


  5. Start the ConsumeFromKafka processor using the right-click action menu or the Start button in the configuration drawer. It might already be started.


After starting the processor, you should see events starting to queue up in the success_ConsumeFromKafka-FilterEvents connection.



NOTE: To receive data on your topic, the flow from the first tutorial must still be running, either as a deployment or in another Flow Designer Test Session.


2.3 Changing the flow to modify the schema for Iceberg integration


Our data warehouse team has created an Iceberg table into which they want us to ingest the critical syslog data. A challenge we are facing is that not all column names in the Iceberg table match our syslog record schema, so we have to add functionality to our flow that changes the schema of our syslog records. To do this, we will use the JoltTransformRecord processor.


  1. Add a new JoltTransformRecord to the canvas by dragging the processor icon to the canvas.


  2. In the Add Processor window, select the JoltTransformRecord type and name the processor TransformSchema.


  3. Validate that your new processor now appears on the canvas.


  4. Create connections from ConsumeFromKafka to TransformSchema by hovering over the ConsumeFromKafka processor and dragging the arrow that appears to TransformSchema. Pick the success relationship to connect.


    Now connect the success relationship of TransformSchema to the MergeRecords processor.




  5. Now that we have connected our new TransformSchema processor, we can delete the original connection between ConsumeFromKafka and MergeRecords.

    Make sure that the ConsumeFromKafka processor is stopped, then select the connection, empty the queue if needed, and delete it. Now all syslog events that we receive will go through the TransformSchema processor.



  6. To make sure that our schema transformation works, we have to create a new Record Writer Service and use it as the Record Writer for the TransformSchema processor.

    Select the TransformSchema processor and open the configuration panel. Scroll to the Properties section, click the three dot menu in the Record Writer row and select Add Service to create a new Record Writer.


  7. Select AvroRecordSetWriter, name it TransformedSchemaWriter, and click Add.

    Click Apply in the configuration panel to save your changes.


  8. Now click the three dot menu again and select Go To Service to configure our new Avro Record Writer.


  9. To configure our new Avro Record Writer, provide the following values:


Name | Description | Value
Schema Write Strategy | Specify whether/how CDF should write schema information | Embed Avro Schema
Schema Access Strategy | Specify how CDF identifies the schema to apply | Use ‘Schema Name’ Property
Schema Registry | Specify the Schema Registry that stores our schema | CDP_Schema_Registry
Schema Name | The schema name to look up in the Schema Registry | syslog_transformed

  10. Convert the value that you provided for Schema Name into a parameter. Click on the three dot menu in the Schema Name row and select Convert To Parameter.


  11. Give the parameter the name Schema Name Transformed and click Add. You have now created a new parameter from a value, and it can be reused in more places in your data flow.


  12. Apply your configuration changes and enable the service by clicking the power icon. You have now configured the new Schema Writer and can return to the Flow Designer canvas.


If you have any issues, end the Test Session and restart it. If your login has timed out, close your browser and log in again.

  13. Click Back to Flow Designer to navigate back to the canvas.




  14. Select TransformSchema to configure it and provide the following values:

Name | Description | Value
Record Reader | Service used to parse incoming events | AvroReader
Record Writer | Service used to format outgoing events | TransformedSchemaWriter
Jolt Specification | The specification that describes how to modify the incoming JSON data. We are standardizing on lower case field names and renaming the timestamp field to event_timestamp. | Use the Jolt specification below.

[
  {
    "operation": "shift",
    "spec": {
      "appName": "appname",
      "timestamp": "event_timestamp",
      "structuredData": {
        "SDID": {
          "eventId": "structureddata.sdid.eventid",
          "eventSource": "structureddata.sdid.eventsource",
          "iut": "structureddata.sdid.iut"
        }
      },
      "*": {
        "@": "&"
      }
    }
  }
]


  15. Scroll to Relationships, select Terminate for the failure and original relationships, and click Apply.


  16. Start your ConsumeFromKafka and TransformSchema processors and validate that the transformed data matches our Iceberg table schema.

  17. Once events are queuing up in the connection between TransformSchema and MergeRecords, right-click the connection and select List Queue.


  18. Select any of the queued files and click the book icon to open it in the Data Viewer.


  19. Notice how all field names have been transformed to lower case and how the timestamp field has been renamed to event_timestamp.



2.4 Merging records and writing to Iceberg
Now that we have verified that our schema is being transformed as needed, it’s time to start the remaining processors and write our events into the Iceberg table. The MergeRecords processor is configured to batch events up to increase efficiency when writing to Iceberg. The final processor, WriteToIceberg, takes our Avro records and writes them into a Parquet-formatted table.


  1. Select the MergeRecords processor and explore its configuration. It is configured to batch events up for at least 30 seconds or until the queued events reach the Maximum Bin Size of 1 GB. You will want to lower these values for testing.

  2. Tip: You can change the configuration to something like “30 sec” to speed up processing.


  3. Start the MergeRecords processor and verify that it batches up events and writes them out after 30 seconds.

  4. Select the WriteToIceberg processor and explore its configuration. Notice how it relies on several parameters to establish a connection to the right database and table.


  5. Start the WriteToIceberg processor and verify that it writes records successfully to Iceberg. If the metrics on the processor increase and you don’t see any warnings or events being written to the failure_WriteToIceberg connection, your writes are successful!



Congratulations! With this you have completed the second use case. 


You may want to log into Hue to check that your data has loaded.
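
For example, a quick check in the Hue SQL Editor along the lines below (a sketch; replace the <<userid>> placeholder with your own workload user name) confirms both that rows are arriving and that the fields renamed by the Jolt transform are populated:

-- How many critical events have been archived so far?
SELECT COUNT(*) FROM <<userid>>_syslog_critical_archive;

-- Spot-check the renamed fields coming out of TransformSchema
SELECT appname, event_timestamp, severity, hostname
FROM <<userid>>_syslog_critical_archive
ORDER BY event_timestamp DESC
LIMIT 10;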



Feel free to publish your flow to the catalog and create a deployment just like you did for the first one.