Thursday 12 October 2017

How to Use the Java Serializable Interface to Pass Objects in FlowFiles in NiFi (IntelliJ/Windows)

This guide explains how to pass objects between NiFi processors in FlowFiles using the Java Serializable interface. This is useful if you have a repository of classes that you want to use in many different processors. A typical use case is two different input files that share some, but not all, of their data: the shared data can be turned into objects and passed to a processor dedicated to that data type. For example, if files A and B both contain sales information, including who made each sale, that information can be parsed into a list of SalesPerson objects and passed in a FlowFile to a SalesPersonProcessor, which counts the sales for each person in each timeframe.
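A minimal plain-Java sketch of that SalesPerson example follows. It uses no NiFi classes: a byte array stands in for the FlowFile content, and `SalesPerson`, `SalesCounter`, `serialize` and `countSales` are hypothetical names. It assumes one `SalesPerson` entry per parsed sale and leaves out the per-timeframe breakdown.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical class for the example: one entry per sale, recording who made it.
class SalesPerson implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String name;

    SalesPerson(String name) {
        this.name = name;
    }

    String getName() {
        return name;
    }
}

class SalesCounter {
    // Sending side: serialize the whole list, as a parsing processor would write it into a FlowFile.
    static byte[] serialize(ArrayList<SalesPerson> sales) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(sales);
        }
        return bytes.toByteArray();
    }

    // Receiving side: deserialize the list and count sales per person.
    @SuppressWarnings("unchecked")
    static Map<String, Integer> countSales(byte[] content) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(content))) {
            List<SalesPerson> sales = (List<SalesPerson>) in.readObject();
            Map<String, Integer> counts = new TreeMap<>();
            for (SalesPerson person : sales) {
                counts.merge(person.getName(), 1, Integer::sum);
            }
            return counts;
        }
    }

    public static void main(String[] args) throws Exception {
        ArrayList<SalesPerson> sales = new ArrayList<>(Arrays.asList(
                new SalesPerson("Alice"), new SalesPerson("Bob"), new SalesPerson("Alice")));
        System.out.println(countSales(serialize(sales))); // prints {Alice=2, Bob=1}
    }
}
```

Because `ArrayList` and `String` are themselves serializable, the whole list goes through a single `writeObject` call, as long as every element class implements `Serializable`.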

1) Create a new Maven project via File > New > Project... and select Maven on the left.
2) The GroupId should be the package name, e.g. "com.yourcompany.yourpackage", the ArtifactId should be "yourpackage", and the version "1.0" (or whatever version your package is).
3) Create the classes you want to pass between processors and make them implement the Serializable interface, e.g. "public class Query implements Serializable {"
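4) In the Terminal, run "mvn install". This compiles the classes, packages them into a jar file and installs that jar in your local Maven repository so other projects can depend on it. ("mvn compile" on its own only compiles the classes; it does not build a jar.)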
5) In your processor project, go to \nifi-yourprocessor-bundle\nifi-yourprocessor-processors and open pom.xml.
6) Add the following inside the <dependencies> element:
<dependency>
    <groupId>com.yourcompany.yourpackage</groupId>
    <artifactId>yourpackage</artifactId>
    <version>1.0</version>
</dependency>
This allows you to use the classes from your package inside the processor.
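The snippets in steps 7 to 9 use a class called `MySerialisableClass`, built with `new MySerialisableClass(1)` and read with `getI()`, whose source isn't shown. Below is a hypothetical minimal version of what it could look like in the shared project (in practice it would sit in your package, e.g. com.yourcompany.yourpackage). The explicit `serialVersionUID` is an addition not in the post: pinning it means compatible changes to the class, such as adding a method, don't change the default computed ID and make one processor reject data written by the other with an `InvalidClassException`.

```java
import java.io.Serializable;

// Hypothetical shape of the shared class; only the constructor and getI() come from the post.
public class MySerialisableClass implements Serializable {
    // Pinned so both processors agree on the class version even if compatible members are added later.
    private static final long serialVersionUID = 1L;

    private final int i;

    public MySerialisableClass(int i) {
        this.i = i;
    }

    public int getI() {
        return i;
    }
}
```

The class must be public, since the processors that use it live in other packages.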
7) In the processor that serializes the object, use the following code in onTrigger:
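import com.yourcompany.yourpackage.MySerialisableClass;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.util.UUID;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.io.OutputStreamCallback;

// In onTrigger (flowFile obtained earlier, e.g. via session.get() or session.create()):
final MySerialisableClass mySerialisableClass = new MySerialisableClass(1);
FlowFile output = session.write(flowFile, new OutputStreamCallback() {
    @Override
    public void process(OutputStream outputStream) throws IOException {
        ObjectOutputStream out = new ObjectOutputStream(outputStream);
        out.writeObject(mySerialisableClass);
        out.flush(); // push any buffered bytes into the FlowFile content
    }
});

output = session.putAttribute(output, CoreAttributes.FILENAME.key(), UUID.randomUUID().toString() + ".txt");
session.transfer(output, MY_RELATIONSHIP);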
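The write side of step 7 can be tried outside NiFi by passing a `ByteArrayOutputStream` where NiFi would pass the FlowFile's content stream. In this sketch `Payload` (standing in for `MySerialisableClass`), `WriteSideDemo` and `writeContent` are hypothetical names. It checks that the content starts with `0xACED`, the magic number every Java serialization stream begins with.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;

// Hypothetical payload class standing in for MySerialisableClass.
class Payload implements Serializable {
    private static final long serialVersionUID = 1L;
    private final int i;

    Payload(int i) {
        this.i = i;
    }

    int getI() {
        return i;
    }
}

class WriteSideDemo {
    // Mirrors the body of the OutputStreamCallback in step 7.
    static void writeContent(OutputStream outputStream, Serializable value) throws IOException {
        ObjectOutputStream out = new ObjectOutputStream(outputStream);
        out.writeObject(value);
        // Flush so bytes buffered by ObjectOutputStream (or the stream beneath it) are written out.
        // The underlying stream is left open; in NiFi the framework closes it.
        out.flush();
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream content = new ByteArrayOutputStream();
        writeContent(content, new Payload(1));
        byte[] bytes = content.toByteArray();
        // Java serialization streams start with the magic number 0xACED.
        System.out.printf("%d bytes, header %02X%02X%n", bytes.length, bytes[0] & 0xFF, bytes[1] & 0xFF);
    }
}
```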
8) In the processor that deserializes the object, use the following code in onTrigger:
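import com.yourcompany.yourpackage.MySerialisableClass;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import org.apache.nifi.processor.io.InputStreamCallback;

// In onTrigger: a one-element array lets the anonymous callback hand the object back
final MySerialisableClass[] mySerialisableClass = new MySerialisableClass[1];
session.read(flowFile, new InputStreamCallback() {
    @Override
    public void process(InputStream in) throws IOException {
        ObjectInputStream objectInputStream = new ObjectInputStream(in);
        try {
            mySerialisableClass[0] = (MySerialisableClass) objectInputStream.readObject();
        } catch (ClassNotFoundException ex) {
            // Don't swallow the error: the class is missing from this processor's classpath
            throw new IOException("Could not deserialize FlowFile content", ex);
        }
    }
});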
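The read side can be exercised the same way with a `ByteArrayInputStream`. The sketch below is a variant of step 8 rather than a drop-in copy: `ReadSideDemo`, `readContent` and `Payload` are hypothetical names, and it adds a type check so that content holding some other object fails with a clear `IOException` instead of a `ClassCastException`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical payload class standing in for MySerialisableClass.
class Payload implements Serializable {
    private static final long serialVersionUID = 1L;
    private final int i;

    Payload(int i) {
        this.i = i;
    }

    int getI() {
        return i;
    }
}

class ReadSideDemo {
    // Mirrors the body of the InputStreamCallback in step 8, reporting failures instead of hiding them.
    static <T> T readContent(InputStream in, Class<T> type) throws IOException {
        ObjectInputStream objectInputStream = new ObjectInputStream(in);
        try {
            Object value = objectInputStream.readObject();
            if (!type.isInstance(value)) {
                throw new IOException("Expected " + type.getName() + " but got "
                        + (value == null ? "null" : value.getClass().getName()));
            }
            return type.cast(value);
        } catch (ClassNotFoundException e) {
            // The class is not on this classpath (e.g. the jar dependency is missing).
            throw new IOException("Cannot deserialize content", e);
        }
    }

    public static void main(String[] args) throws IOException {
        // Produce serialized bytes, as the upstream processor would.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(new Payload(42));
        }
        Payload p = readContent(new ByteArrayInputStream(bytes.toByteArray()), Payload.class);
        System.out.println("I = " + p.getI()); // prints "I = 42"
    }
}
```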
9) To test this, write the value of i from the deserialized MySerialisableClass into the FlowFile's content and send the FlowFile to a PutFile processor:
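import java.io.IOException;
import java.io.OutputStream;
import java.util.UUID;
import org.apache.commons.io.IOUtils; // requires the commons-io dependency
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.io.OutputStreamCallback;

FlowFile output = session.write(flowFile, new OutputStreamCallback() {
    @Override
    public void process(OutputStream outputStream) throws IOException {
        IOUtils.write("I = " + mySerialisableClass[0].getI(), outputStream, "UTF-8");
    }
});
output = session.putAttribute(output, CoreAttributes.FILENAME.key(), UUID.randomUUID().toString() + ".txt");
session.transfer(output, MY_RELATIONSHIP);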
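Before wiring up PutFile, the whole chain can also be checked in plain Java. This sketch is a stand-in for the two processors under stated assumptions: `RoundTripCheck` and `Payload` are hypothetical names, byte arrays play the role of FlowFile content, and `String.getBytes(StandardCharsets.UTF_8)` replaces Commons IO's `IOUtils.write`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

// Hypothetical payload class standing in for MySerialisableClass.
class Payload implements Serializable {
    private static final long serialVersionUID = 1L;
    private final int i;

    Payload(int i) {
        this.i = i;
    }

    int getI() {
        return i;
    }
}

class RoundTripCheck {
    // Step 7: what the first processor writes into the FlowFile.
    static byte[] serialize(Payload value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        ObjectOutputStream out = new ObjectOutputStream(bytes);
        out.writeObject(value);
        out.flush();
        return bytes.toByteArray();
    }

    // Steps 8 and 9: what the second processor reads and then writes out as text.
    static byte[] describe(byte[] content) throws IOException, ClassNotFoundException {
        ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(content));
        Payload value = (Payload) in.readObject();
        return ("I = " + value.getI()).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws Exception {
        byte[] text = describe(serialize(new Payload(1)));
        System.out.println(new String(text, StandardCharsets.UTF_8)); // prints "I = 1"
    }
}
```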
