Best in Flow Competition Tutorials Part 2 - Tutorial 2
Writing critical syslog events to Apache Iceberg for analysis
A few weeks have passed since you built your data flow with DataFlow Designer to filter out critical syslog events to a dedicated Kafka topic. Now that everyone has better visibility into real-time health, management wants to do historical analysis on the data. Your company is evaluating Apache Iceberg to build an open data lakehouse and you are tasked with building a flow that ingests the most critical syslog events into an Iceberg table.
Ensure your table is built and accessible.
Create an Apache Iceberg Table
From the Home page, click Data Hub Clusters, then navigate to oss-kudu-demo in the Data Hubs list.
Navigate to Hue from the Kudu Data Hub.
Inside Hue you can now create your table. You will have your own database to work with. To get to your database, click the ‘<’ icon next to the default database. You should see your specific database in the format <YourEmailWithUnderscores>_db. Click on your database to go to the SQL Editor.
Create your Apache Iceberg table with the SQL below, clicking the play icon to execute the query. Note that the table name must be prefixed with your Workload User Name (userid).
CREATE TABLE <<userid>>_syslog_critical_archive
(priority int, severity int, facility int, version int, event_timestamp bigint, hostname string,
body string, appName string, procid string, messageid string,
structureddata struct<sdid:struct<eventid:string,eventsource:string,iut:string>>)
STORED BY ICEBERG;
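To confirm the table was built and is accessible before moving on, you can describe it from the same SQL Editor. This is a minimal check, assuming <<userid>> is replaced with your own Workload User Name:
-- Replace <<userid>> with your Workload User Name before running.
DESCRIBE <<userid>>_syslog_critical_archive;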
Once you have sent data to your table, you can query it.
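For example, once the flow in this tutorial is running and writing to the table, a simple query like the one below (again replacing <<userid>> with your Workload User Name) returns a sample of the ingested events:
-- Replace <<userid>> with your Workload User Name before running.
SELECT * FROM <<userid>>_syslog_critical_archive LIMIT 10;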
Additional Documentation
2.1 Open ReadyFlow & start Test Session
Navigate to DataFlow from the Home Page
Navigate to the ReadyFlow Gallery
Explore the ReadyFlow Gallery
Search for the “Kafka to Iceberg” ReadyFlow.
Click “Create New Draft” to open the ReadyFlow in the Designer and name it yourid_kafkatoiceberg, e.g. tim_kafkatoiceberg.
Start a Test Session by either clicking on the start a test session link in the banner or going to Flow Options and selecting Start in the Test Session section.
In the Test Session creation wizard, select the latest NiFi version and click Start Test Session. Notice how the status at the top now says “Initializing Test Session”.
2.2 Modifying the flow to read syslog data
The flow consists of three processors and looks very promising for our use case. The first processor reads data from a Kafka topic, the second gives us the option to batch up events and create larger files, and those files are then written out to Iceberg by the PutIceberg processor.
All we have to do now to reach our goal is to customize its configuration to our use case.
Provide values for predefined parameters
Navigate to Flow Options→ Parameters
Select all parameters that show No value set and provide the following values
Click Apply Changes to save the parameter values
Start Controller Services
Navigate to Flow Options → Services
Select CDP_Schema_Registry service and click Enable Service and Referencing Components action
Start from the top of the list and enable all remaining Controller services including KerberosPasswordUserService, HiveCatalogService, AvroReader, …
Click OK if asked for confirmation.
Make sure all services have been enabled
Start the ConsumeFromKafka processor using the right-click action menu or the Start button in the configuration drawer. It might already be started.
After starting the processor, you should see events starting to queue up in the success_ConsumeFromKafka-FilterEvents connection.
NOTE:
To receive data on your topic, the first flow must still be running, either as the deployment you created earlier or in another Flow Designer Test Session.
2.3 Changing the flow to modify the schema for Iceberg integration
Our data warehouse team has created an Iceberg table into which they want us to ingest the critical syslog data. A challenge we are facing is that not all column names in the Iceberg table match our syslog record schema, so we have to add functionality to our flow that changes the schema of our syslog records. To do this, we will use the JoltTransformRecord processor.
Add a new JoltTransformRecord to the canvas by dragging the processor icon to the canvas.
In the Add Processor window, select the JoltTransformRecord type and name the processor TransformSchema.
Validate that your new processor now appears on the canvas.
Create connections from ConsumeFromKafka to TransformSchema by hovering over the ConsumeFromKafka processor and dragging the arrow that appears to TransformSchema. Pick the success relationship to connect.
Now connect the success relationship of TransformSchema to the MergeRecords processor.
Now that we have connected our new TransformSchema processor, we can delete the original connection between ConsumeFromKafka and MergeRecords.
Make sure that the ConsumeFromKafka processor is stopped. Then select the connection, empty the queue if needed, and delete it. Now all syslog events that we receive will go through the TransformSchema processor.
To make sure that our schema transformation works, we have to create a new Record Writer Service and use it as the Record Writer for the TransformSchema processor.
Select the TransformSchema processor and open the configuration panel. Scroll to the Properties section, click the three-dot menu in the Record Writer row and select Add Service to create a new Record Writer.
Select AvroRecordSetWriter, name it TransformedSchemaWriter and click Add.
Click Apply in the configuration panel to save your changes.
Now click the three-dot menu again and select Go To Service to configure our new Avro Record Writer.
To configure our new Avro Record Writer, provide the following values:
Convert the value that you provided for Schema Name into a parameter. Click on the three dot menu in the Schema Name row and select Convert To Parameter.
Give the parameter the name Schema Name Transformed and click Add. You have now created a new parameter from a value, and this parameter can be reused elsewhere in your data flow.
Apply your configuration changes and enable the service by clicking the power icon. You have now configured the new Schema Writer and can return to the Flow Designer canvas.
If you have any issues, end the test session and restart it. If your login timed out, close your browser and log in again.
Click Back to Flow Designer to navigate back to the canvas.
Select TransformSchema to configure it and provide the following values:
Scroll to Relationships and select Terminate for the failure, original relationships and click Apply.
Start your ConsumeFromKafka and TransformSchema processor and validate that the transformed data matches our Iceberg table schema.
Once events are queuing up in the connection between TransformSchema and MergeRecords, right-click the connection and select List Queue.
Select any of the queued files and click the book icon to open it in the Data Viewer.
Notice how all field names have been transformed to lower case and how the timestamp field has been renamed to event_timestamp.
2.4 Merging records and start writing to Iceberg
Now that we have verified that our schema is being transformed as needed, it’s time to start the remaining processors and write our events into the Iceberg table. The MergeRecords processor is configured to batch events up to increase efficiency when writing to Iceberg. The final processor, WriteToIceberg, takes our Avro records and writes them into a Parquet-formatted table.
Select the MergeRecords processor and explore its configuration. It is configured to batch events up for at least 30 seconds or until the queued-up events have reached the Maximum Bin Size of 1 GB. You will want to lower these values for testing.
Tip: You can change the configuration to something like “30 sec” to speed up processing.
Start the MergeRecords processor and verify that it batches up events and writes them out after 30 seconds.
Select the WriteToIceberg processor and explore its configuration. Notice how it relies on several parameters to establish a connection to the right database and table.
Start the WriteToIceberg processor and verify that it writes records successfully to Iceberg. If the metrics on the processor increase and you don’t see any warnings or events being written to the failure_WriteToIceberg connection, your writes are successful!
Congratulations! With this you have completed the second use case.
You may want to log into Hue to check that your data has loaded.
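For example, a quick count in the Hue SQL Editor is enough to confirm rows are arriving; as before, <<userid>> is a placeholder for your own Workload User Name:
-- Replace <<userid>> with your Workload User Name before running.
SELECT count(*) AS events_loaded FROM <<userid>>_syslog_critical_archive;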
Feel free to publish your flow to the catalog and create a deployment just like you did for the first one.