Monday, 4 December 2017

How to use DBCPService to connect to a MySQL database in Apache NiFi




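To set up a DBCPService in NiFi, you must first change your pom.xml to add the following dependencies (these versions were correct as of 4 December 2017 and may have been updated since). Note that the nifi-standard-services-api-nar dependency, being of type nar, normally goes in your bundle's nar module pom.xml rather than in the processors module.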

<dependency>
    <groupId>commons-dbcp</groupId>
    <artifactId>commons-dbcp</artifactId>
    <version>1.4</version>
</dependency>
<dependency>
    <groupId>org.apache.nifi</groupId>
    <artifactId>nifi-dbcp-service-api</artifactId>
    <version>1.5.0-SNAPSHOT</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>6.0.6</version>
</dependency>
<dependency>
    <groupId>org.apache.nifi</groupId>
    <artifactId>nifi-standard-services-api-nar</artifactId>
    <type>nar</type>
</dependency>

Go to your processor and create the following property. This creates the dropdown box that lets you select, edit and update the controller service.

public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor
        .Builder().name("DBCPService test processor")
        .description("DBCPService test processor")
        .identifiesControllerService(DBCPService.class)
        .required(true)
        .build();

The code below shows how to get a connection from the service and execute an update. The only difference from plain JDBC code is the use of dbcpService.getConnection() instead of mySQLDataSource.getConnection(), so it is easy to add to existing code.

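import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.processor.exception.ProcessException;

final DBCPService dbcpService = context.getProperty(MY_PROPERTY).asControllerService(DBCPService.class);

// try-with-resources closes the statement and returns the connection to the pool,
// even if the update throws
try (Connection connection = dbcpService.getConnection();
     Statement statement = connection.createStatement()) {
    statement.executeUpdate("INSERT INTO `temp` (`number`) VALUES ('1')");
} catch (SQLException e) {
    // Report the failure to NiFi instead of only printing the stack trace
    throw new ProcessException(e);
}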

Build the .nar file, copy it into NiFi's lib directory and restart NiFi; you can then create the controller service (DBCPConnectionPool). Most of its properties are fairly standard, but here are the two difficult ones.

JDBC URL
jdbc:mysql://localhost:3306/databasename?useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=UTC
JDBC Driver
com.mysql.jdbc.Driver (with Connector/J 6.x this class name is deprecated; the new name is com.mysql.cj.jdbc.Driver)
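The JDBC URL is a base address (jdbc:mysql://host:port/database) followed by query parameters, with a single ? before the first and & between the rest. A small sketch that assembles it from its parts, assuming a hypothetical helper class MySqlJdbcUrl (not part of NiFi or Connector/J) and parameter values that need no URL-encoding:

```java
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper (not part of NiFi or Connector/J) that assembles a MySQL JDBC URL.
final class MySqlJdbcUrl {

    private MySqlJdbcUrl() {
    }

    // Values are appended as-is, so this assumes they need no URL-encoding.
    static String build(String host, int port, String database, Map<String, String> params) {
        StringBuilder url = new StringBuilder("jdbc:mysql://")
                .append(host).append(':').append(port)
                .append('/').append(database);

        // Parameters follow a single '?' and are separated by '&'
        StringJoiner query = new StringJoiner("&", "?", "");
        query.setEmptyValue(""); // no parameters: no trailing '?'
        for (Map.Entry<String, String> param : params.entrySet()) {
            query.add(param.getKey() + "=" + param.getValue());
        }
        return url.append(query).toString();
    }
}
```

Called with localhost, 3306, databasename and the three timezone parameters above inserted into a LinkedHashMap in that order, it produces exactly the URL shown; the LinkedHashMap keeps the parameters in insertion order.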

Monday, 23 October 2017

How to Serialise Java into JSON in Apache NiFi

This tutorial explains how to serialise Java objects into JSON without any extra programming, annotations or comments, so your classes stay independent of the method used for serialisation. The package used is Jackson Databind; its GitHub page has tutorials on how to use it, but here we focus on the following use case. You have a general data object that contains other objects; for example, an Order object may contain a Customer object. These objects are used in many different places, and you do not want to hard-code annotations that would force the package containing Order/Customer to depend on Jackson Databind. You would like to take in a Flowfile from one NiFi processor (for example GetFile), extract some pieces of information and leave the rest, and transfer the serialised list of orders to be saved elsewhere.

1) Create a Maven project for the classes you will be serialising. In IntelliJ this is done via File > New > Project... and selecting Maven from the sidebar. Your groupId will be the package name, e.g. com.yourcompany.core, and the artifactId will be the particular bundle name, e.g. orders. Your pom.xml should contain:
<groupId>com.yourcompany.core</groupId>
<artifactId>orders</artifactId>
<version>1.0</version>

2) Create your classes with a getter and setter for each field, encapsulated appropriately. You will also need a default (no-argument) constructor. Your class should look like the following example:
package com.yourcompany.core;
public class Customer {
    public Customer() {
    }

    private String name;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}
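Note: You should not have methods called getX() without a matching field and setter. Jackson will write X out during serialisation but will then fail to deserialise the class, because it has nowhere to put the value. (Alternatively, calling objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) on the reading side makes Jackson skip such values.)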
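Building on the Customer class, a minimal sketch of the Order class from the use case might look like the following; the field names orderId and items are hypothetical. Every getter has a matching field and setter and each class has a default constructor, so Jackson can both write and read it without annotations. The classes are shown package-private in one listing for brevity; in the orders project each would be a public class in its own file under com.yourcompany.core.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical example classes; in your project make each public and put it in its own file.
class Customer {
    private String name;

    public Customer() {
    }

    public String getName() { return name; }

    public void setName(String name) { this.name = name; }
}

class Order {
    private String orderId;                          // hypothetical field
    private Customer customer;                       // nested object, written as a nested JSON object
    private List<String> items = new ArrayList<>();  // hypothetical field, written as a JSON array

    // Default constructor so Jackson can instantiate Order when deserialising
    public Order() {
    }

    public String getOrderId() { return orderId; }

    public void setOrderId(String orderId) { this.orderId = orderId; }

    public Customer getCustomer() { return customer; }

    public void setCustomer(Customer customer) { this.customer = customer; }

    public List<String> getItems() { return items; }

    public void setItems(List<String> items) { this.items = items; }
}
```

Serialising a list of these with ObjectMapper gives a JSON array whose objects each contain a nested customer object and an items array; the property order in the output is not guaranteed to match this listing.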

3) Run "mvn install" in the console to install these classes into your local Maven repository.


5) Open \Documents\nifi-dev\nifi-yourname-bundle\nifi-yourname-processors\pom.xml and add the following dependencies
<dependency>
    <groupId>com.yourcompany.core</groupId>
    <artifactId>orders</artifactId>
    <version>1.0</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>

6) Let's assume that you would like to send an ArrayList of orders between your two processors, SerialisableSend and SerialisableReceive.

Code for SerialisableSend
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.OutputStream;

@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    FlowFile flowFile = session.create();

    final ArrayList<Order> orders = new ArrayList<>();
    orders.add(new Order());
    orders.add(new Order());

    final ObjectMapper objectMapper = new ObjectMapper();

    // Write the list into the new FlowFile's content as a JSON array
    FlowFile output = session.write(flowFile, new OutputStreamCallback() {
        @Override
        public void process(OutputStream outputStream) throws IOException {
            objectMapper.writeValue(outputStream, orders);
        }
    });
    session.transfer(output, REL_SUCCESS);
}

Code for SerialisableReceive
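import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.concurrent.atomic.AtomicReference;

@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    FlowFile flowFile = session.get();
    if (flowFile == null) {
        return;
    }

    // Anonymous classes can only use effectively final locals, so hold the result in a reference
    final AtomicReference<List<Order>> ordersRef = new AtomicReference<>();
    final ObjectMapper objectMapper = new ObjectMapper();
    session.read(flowFile, new InputStreamCallback() {
        @Override
        public void process(InputStream in) throws IOException {
            // TypeReference keeps the generic type so Jackson builds Order objects, not Maps
            ordersRef.set(objectMapper.readValue(in, new TypeReference<List<Order>>() {}));
        }
    });
    ArrayList<Order> orders = new ArrayList<>(ordersRef.get());

    // ... extract what you need from orders; every FlowFile taken with session.get()
    // must then be transferred or removed
    session.transfer(flowFile, REL_SUCCESS);
}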

Monday, 16 October 2017

A Method to Test NiFi with Java Serialization

This is my current method for testing NiFi processors that use serialised objects.

1) Ensure that the Flowfile being sent is named as a .ser file (it doesn't have to be, but this is the extension Java conventionally uses for serialised objects):
FlowFile output = session.write(flowFile, new OutputStreamCallback() {
    @Override
    public void process(OutputStream outputStream) throws IOException {
        ObjectOutputStream out = new ObjectOutputStream(outputStream);
        out.writeObject(queries);
        // ObjectOutputStream buffers internally, so flush to push every byte to NiFi's stream
        out.flush();
    }
});

output = session.putAttribute(output, CoreAttributes.FILENAME.key(), "postcode-queries-"+postcodes.get(0).getPostcodeArea()+".ser");
session.transfer(output, REL_QUERIES);
2) Route the relationship to a PutFile processor as well as to the downstream processor that is not working. Here we send the MySQL queries relationship to both PutSQL (where we want them to go) and PutFile (where the test copy will go).
3) Copy the .ser file to the nifi-yourbundle-bundle\nifi-yourprocessors-processors\src\test
4) In your unit test file you can now do the following:
@Test
public void testProcessor() throws IOException {
    // Enqueue the captured .ser file as the input FlowFile; a missing file now fails the test
    try (InputStream in = new FileInputStream(new File("src/test/my-file.ser"))) {
        testRunner.enqueue(in);
    }

    testRunner.run();
}
5) Now you can debug to your heart's content and use System.out.println (IntelliJ's sout shortcut) to print details. Note that deserialisation expects the exact version of the original class, so if you change the class you are serialising and deserialising, you must run the flow again to create a new .ser file and copy it to the test directory.
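The version requirement in step 5 comes from serialVersionUID: if a class does not declare one, Java derives it from the class's structure, so most edits produce a different value and old .ser files fail with InvalidClassException. A self-contained sketch of the write/read round trip using only java.io, assuming a hypothetical PostcodeQuery class in place of your own query objects (SerFiles is also a hypothetical helper name):

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for the objects written to the .ser file in step 1
class PostcodeQuery implements Serializable {
    // A fixed serialVersionUID keeps old .ser files readable after compatible changes
    // (e.g. adding a field); incompatible changes such as changing a field's type still fail.
    private static final long serialVersionUID = 1L;

    private final String sql;

    PostcodeQuery(String sql) {
        this.sql = sql;
    }

    String getSql() {
        return sql;
    }
}

// Hypothetical helper for writing and reading .ser files
final class SerFiles {
    private SerFiles() {
    }

    // Mirrors step 1: closing the stream also flushes ObjectOutputStream's buffer
    static void write(File file, Object value) throws IOException {
        try (ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream(file))) {
            out.writeObject(value);
        }
    }

    // Reads the object graph back, as the processor under test would
    static Object read(File file) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new FileInputStream(file))) {
            return in.readObject();
        }
    }
}
```

If your real classes do not declare serialVersionUID, step 5 applies in full and the .ser file must be regenerated after every change to them.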

Friday, 13 October 2017

How to Read a Flowfile Line by Line in NiFi

Edit: The code originally posted was not accurate. IOUtils.toString(in) returns the whole file as one string, so it must be split on \r\n or \n (end of line) into an array, which can then be added to the lines ArrayList using a for-each loop as shown below:

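import org.apache.commons.io.IOUtils;
import java.nio.charset.StandardCharsets;

@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    FlowFile flowFile = session.get();
    if (flowFile == null) {
        return;
    }

    final ArrayList<String> lines = new ArrayList<>();
    session.read(flowFile, new InputStreamCallback() {
        @Override
        public void process(InputStream in) throws IOException {
            // Read the whole content, then split on \n or \r\n;
            // IOExceptions propagate so NiFi reports the failure instead of it being swallowed
            String text = IOUtils.toString(in, StandardCharsets.UTF_8);
            String[] _lines = text.split("\\r?\\n");
            for (String l : _lines) {
                lines.add(l);
            }
        }
    });

    // ... use lines, then transfer the FlowFile (REL_SUCCESS: your processor's relationship)
    session.transfer(flowFile, REL_SUCCESS);
}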

Alternative:

final ArrayList<String> lines = new ArrayList<>();
session.read(flowFile, new InputStreamCallback() {
    @Override
    public void process(InputStream in) throws IOException {
        String text = IOUtils.toString(in, StandardCharsets.UTF_8);
        // Arrays.asList wraps the split array so addAll copies it in one call
        lines.addAll(Arrays.asList(text.split("\\r?\\n")));
    }
});
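Both versions above load the whole FlowFile into one String before splitting it. As an alternative sketch using only the JDK (LineReader is a hypothetical helper name, and UTF-8 content is assumed), a BufferedReader can read the content one line at a time:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: reads an InputStream line by line, assuming UTF-8 text.
final class LineReader {
    private LineReader() {
    }

    // readLine() treats \n, \r and \r\n as line ends and strips them, so no regex is needed.
    // The reader is deliberately not closed: the caller (NiFi, inside session.read) owns the stream.
    static List<String> readLines(InputStream in) throws IOException {
        List<String> lines = new ArrayList<>();
        BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
        String line;
        while ((line = reader.readLine()) != null) {
            lines.add(line);
        }
        return lines;
    }
}
```

Inside the InputStreamCallback, the body of process() would then be lines.addAll(LineReader.readLines(in));. Two behavioural differences from the split version: a lone \r also counts as a line break, and an empty FlowFile yields an empty list rather than a single empty string.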

Thursday, 12 October 2017

How to Add More than One Processor to a Bundle in NiFi (IntelliJ/Windows)

1) Go to nifi-yourbundle-bundle\nifi-yourbundle-processors\src\main\resources\META-INF\services
2) Open the file org.apache.nifi.processor.Processor
3) Add the fully qualified class name of each processor, one per line:
org.apache.nifi.processors.yourbundle.MyProcessor
org.apache.nifi.processors.yourbundle.MyProcessor2
org.apache.nifi.processors.yourbundle.MyThirdProcessor
4) Apache NiFi will now load these processors in the bundle

How to Create Get and Put Processors in NiFi (IntelliJ/Windows)

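Put: takes the incoming Flowfile and puts it into a destination, e.g. PutSQL takes the Flowfile from its input and puts it into a SQL database
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    // Put processors work on an existing FlowFile, so take one from the incoming queue
    FlowFile flowFile = session.get();
    if (flowFile == null) {
        return; // nothing queued yet
    }
    // ... write the FlowFile's content to the destination, then transfer it
}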

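Get: takes raw input and puts it into a new Flowfile
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    // Get processors have no incoming FlowFile, so create a new one
    FlowFile flowFile = session.create();
    // ... write the raw input into flowFile with session.write(...), then transfer it
}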

How to Test a Processor Using MyProcessorTest in NiFi (IntelliJ/Windows)

This guide explains how to test a processor in Apache NiFi using the test class generated for MyProcessor. It is organised like a Q&A, giving a typical problem and its solution, and will be updated as I figure out how to do things.

How to queue a file as test input?
@Test
public void testProcessor() throws IOException {
    // Enqueue the file's content as one FlowFile; a missing file fails the test
    try (InputStream in = new FileInputStream(new File("src/test/data/your_data.txt"))) {
        testRunner.enqueue(in);
    }

    testRunner.run();
}

How to set properties?
@Test
public void testProcessor() {
    // Set the property before running, exactly as a user would in the UI
    testRunner.setProperty(MyProcessor.MY_PROPERTY, "value");
    testRunner.run();
}
How to set Flowfile attributes?
@Test
public void testProcessor() throws IOException {
    // Attributes attached to the enqueued FlowFile
    Map<String, String> attributes = new HashMap<>();
    attributes.put("datasource_id", "2");
    try (InputStream in = new FileInputStream(new File("src/test/data/108124118589781"))) {
        testRunner.enqueue(in, attributes);
    }
    testRunner.run();
}