Monday, 16 October 2017

A Method to Test NiFi with Java Serialization

This is my current method for testing NiFi processors that work with serialised Java objects.

1) Ensure that the FlowFile being sent is a .ser file (it doesn't have to be, but this is the extension Java conventionally uses for serialised objects)
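FlowFile output = session.write(flowFile, new OutputStreamCallback() {
    @Override
    public void process(OutputStream outputStream) throws IOException {
        // Serialise the queries object straight into the FlowFile content
        ObjectOutputStream out = new ObjectOutputStream(outputStream);
        out.writeObject(queries);
        // Flush so that anything still buffered reaches the FlowFile content
        out.flush();
    }
});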

output = session.putAttribute(output, CoreAttributes.FILENAME.key(), "postcode-queries-"+postcodes.get(0).getPostcodeArea()+".ser");
session.transfer(output, REL_QUERIES);
2) In NiFi, route the same relationship to a PutFile processor as well as to the processor that is not working. Here the MySQL queries relationship goes to both PutSQL (where we want the queries to go) and PutFile (which writes out the copy used for testing)
3) Copy the .ser file that PutFile wrote to the nifi-yourbundle-bundle\nifi-yourprocessors-processors\src\test directory
4) In your unit test file you can now do the following:
@Test
public void testProcessor() throws IOException {
    // Let a missing .ser file fail the test instead of running with an empty queue
    try (InputStream in = new FileInputStream(new File("src/test/my-file.ser"))) {
        testRunner.enqueue(in);
    }

    testRunner.run();
}
5) Now you can debug to your heart's content and use System.out.println to print details. Note that deserialisation expects the same version of the class that was originally serialised, so if you change the class that you are serialising and then deserialising, you must run the flow again to create a fresh .ser file and copy it to the test directory.
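
To make the version point in step 5 more concrete, here is a minimal sketch of the same round trip outside NiFi. The class names SerRoundTrip and PostcodeQuery and the helper methods serialise and deserialise are hypothetical stand-ins, not part of the processor above. The sketch also assumes you are free to declare a serialVersionUID on your own class: with an explicit ID, compatible changes such as adding a field should not invalidate an old .ser file, whereas without one the JVM derives an ID from the class structure and a changed class is typically rejected with an InvalidClassException.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

class SerRoundTrip {

    // Hypothetical stand-in for the class whose instances end up in the .ser file.
    static class PostcodeQuery implements Serializable {
        // Assumption: pinning the ID keeps old .ser files readable after
        // compatible changes (for example, adding a field).
        private static final long serialVersionUID = 1L;

        final String postcodeArea;
        final String sql;

        PostcodeQuery(String postcodeArea, String sql) {
            this.postcodeArea = postcodeArea;
            this.sql = sql;
        }
    }

    // Same idea as the OutputStreamCallback in step 1, but into a byte array.
    static byte[] serialise(Serializable value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    // What the receiving processor would do with the FlowFile content.
    static Object deserialise(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        ArrayList<PostcodeQuery> queries = new ArrayList<>();
        queries.add(new PostcodeQuery("AB", "SELECT 1"));

        byte[] data = serialise(queries);

        @SuppressWarnings("unchecked")
        List<PostcodeQuery> restored = (List<PostcodeQuery>) deserialise(data);
        System.out.println(restored.get(0).postcodeArea + " -> " + restored.get(0).sql);
    }
}
```

If you change a class in an incompatible way (for example, changing the type of a field), the old .ser file will still fail to load even with a pinned ID, so regenerating the file as described in step 5 remains the safe option.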
