1) Create your classes that you will be serialising using Maven. In IntelliJ this is completed by File>New>Project... and selecting Maven from the sidebar. Your groupId will be the package name eg com.yourcompany.core and the artifactId will be the particular bundle name eg orders your pom.xml should look like this:
<groupId>com.yourcompany.core</groupId> <artifactId>orders</artifactId> <version>1.0</version>
2) Create your classes, create getters and setters for each field and encapsulate appropriately, you will also need a default empty constructor. Your class should look like the following example:package com.yourcompany.core;
public class Customer { public Customer() { } private String name; public String getName() { return name; } public void setName(String name) { this.name = name; } }Note:You should not have methods called getX() if you do not have a matching field and setter as it will stop the class from being deserialised later.
3) Run "mvn install" in the console to install these classes into your local maven repository
5) Open \Documents\nifi-dev\nifi-yourname-bundle\nifi-yourname-processors\pom.xml and add the following dependencies
<dependency> <groupId>com.yourcompany.core</groupId> <artifactId>orders</artifactId> <version>1.0</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency>
6) Lets assume that you would like to send and ArrayList of orders between your two processors, SerialisableSend and SerialiseableReceive.
Code for SerialisableSend
@Overridepublic void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile session.create();
ArrayList<Order> orders = new ArrayList<>(); orders.add(new Order()); orders.add(new Order()); ObjectMapper objectMapper = new ObjectMapper(); FlowFile output = session.write(flowFile, new OutputStreamCallback(){
@Override public void process(InputStream in) throws IOException {
objectMapper.writeValue(outputStream, orders);
}
}); session.transfer(output, REL_SUCCESS);
}
Code for SerialisableReceive
@Overridepublic void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { FlowFile flowFile = session.get(); if (flowFile == null) { return; } List<Order>[] _orders = new List[1]; ObjectMapper objectMapper = new ObjectMapper(); session.read(flowFile, new InputStreamCallback() { @Override public void process(InputStream in) throws IOException { _orders[0] = objectMapper.readValue(in, new TypeReference<List<Order>>(){}); } }); ArrayList<Order> orders = (ArrayList<Order>) _orders[0];
}
No comments:
Post a Comment