1) Create a new Maven project via File > New > Project... and select Maven on the left
2) The groupId should be the package name, e.g. "com.yourcompany.yourpackage", the artifactId should be "yourpackage", and the version "1.0" (or whatever version your package is)
3) Create the classes you want and have them implement the Serializable interface, e.g. "public class Query implements Serializable {"
4) In the Terminal run "mvn install"; this compiles the project, packages it into a jar file and copies the jar to your local Maven repository
5) In your processor go to \nifi-yourprocessor-bundle\nifi-yourprocessor-processors and open pom.xml
6) In dependencies add:
<dependency>
    <groupId>com.yourcompany.yourpackage</groupId>
    <artifactId>yourpackage</artifactId>
    <version>1.0</version>
</dependency>
This will allow you to use this class inside the processor.
7) In the processor where you want to serialize the object, use the following code in onTrigger:
final MySerialisableClass mySerialisableClass = new MySerialisableClass(1);
FlowFile output = session.write(flowFile, new OutputStreamCallback() {
    @Override
    public void process(OutputStream outputStream) throws IOException {
        // Write the object into the FlowFile content using Java serialization
        ObjectOutputStream out = new ObjectOutputStream(outputStream);
        out.writeObject(mySerialisableClass);
        // Flush so any buffered bytes reach the FlowFile before the callback returns
        out.flush();
    }
});
output = session.putAttribute(output, CoreAttributes.FILENAME.key(), UUID.randomUUID().toString() + ".txt");
session.transfer(output, MY_RELATIONSHIP);
8) In the processor where you want to deserialize the object, use the following code in onTrigger:
// A one-element array lets the anonymous callback hand the result back to onTrigger
final MySerialisableClass[] mySerialisableClass = new MySerialisableClass[1];
session.read(flowFile, new InputStreamCallback() {
    @Override
    public void process(InputStream in) throws IOException {
        // Read the serialized object back from the FlowFile content
        try (ObjectInputStream objectInputStream = new ObjectInputStream(in)) {
            mySerialisableClass[0] = (MySerialisableClass) objectInputStream.readObject();
        } catch (ClassNotFoundException ex) {
            // Report the failure instead of silently swallowing it
            throw new IOException("Could not deserialize MySerialisableClass", ex);
        }
    }
});
9) You can test this by writing the value of "i" from MySerialisableClass into a FlowFile and sending that FlowFile to PutFile:
FlowFile output = session.write(flowFile, new OutputStreamCallback() {
    @Override
    public void process(OutputStream outputStream) throws IOException {
        IOUtils.write("I = " + mySerialisableClass[0].getI(), outputStream, "UTF-8");
    }
});
output = session.putAttribute(output, CoreAttributes.FILENAME.key(), UUID.randomUUID().toString() + ".txt");
session.transfer(output, MY_RELATIONSHIP);
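
If you want to check the serialization logic before wiring it into NiFi, something like the following standalone sketch may help. It runs the same write and read steps as the serialize and deserialize code above, but against in-memory byte streams instead of a FlowFile. The shape of MySerialisableClass (a single int field "i", a one-argument constructor and getI()) is an assumption based on how the class is used above, and SerializationRoundTrip, writeTo and readFrom are hypothetical names used only for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;

// Assumed shape of the shared class: one int field "i" with a getter.
// It is package-private here so the sketch fits in a single file; in the
// shared Maven package it would be public, in its own source file.
class MySerialisableClass implements Serializable {
    // Explicit version id, so the value used for the compatibility check
    // does not depend on a compiler-generated default.
    private static final long serialVersionUID = 1L;

    private final int i;

    MySerialisableClass(int i) {
        this.i = i;
    }

    int getI() {
        return i;
    }
}

// Hypothetical helper class, not part of NiFi.
class SerializationRoundTrip {

    // Same logic as the body of the OutputStreamCallback used for serializing.
    static void writeTo(OutputStream outputStream, MySerialisableClass value) throws IOException {
        ObjectOutputStream out = new ObjectOutputStream(outputStream);
        out.writeObject(value);
        out.flush();
    }

    // Same logic as the body of the InputStreamCallback used for deserializing.
    static MySerialisableClass readFrom(InputStream in) throws IOException {
        try {
            ObjectInputStream objectInputStream = new ObjectInputStream(in);
            return (MySerialisableClass) objectInputStream.readObject();
        } catch (ClassNotFoundException ex) {
            throw new IOException("Could not deserialize MySerialisableClass", ex);
        }
    }

    public static void main(String[] args) throws IOException {
        // Serialize into memory, then read the bytes back into a new object.
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        writeTo(buffer, new MySerialisableClass(1));
        MySerialisableClass copy = readFrom(new ByteArrayInputStream(buffer.toByteArray()));
        System.out.println("I = " + copy.getI()); // prints "I = 1"
    }
}
```

If this round trip works but the NiFi flow does not, the problem is more likely in the flow itself (for example the two processors being built against different versions of the shared jar) than in the serialization code.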