Nifi as a gateway for our transfer system

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

Nifi as a gateway for our transfer system

Frank, Daniel
Hello,

I just found Apache Nifi recently and I decided to use it to take care of our ftp transfer system, which basically means to monitor the incoming data and generate Flowfiles for each incoming file.
The basic flow I have in mind is:
1. GetFile to monitor the incoming directory
2. Lookup the actions for each Flowfile based on their path and filename attributes in a database and store them as an attribute
3. Figure out the next action to do by looking at that attribute and route the Flowfile into the correct flow implementing that action (e.g. execute a command or send an email)
4. After that action has completed, send the Flowfile back to step 3 if there are more actions that need to be done

I'm somewhat stumped on how to implement that in a sensible way with Nifi. Step 2 will result in something like the following json and I guess I have to write my own processor for that:
[ { "unzip": null }, {"email": {"to": "[hidden email]", "cc": "[hidden email]", "withAttachment": false}, { "execute": "/bin/true" }, { "execute": "/bin/anothercommand " } ]

Now I can store that as an attribute, but apparently EvaluateJsonPath like all other Evaluate* processors are unable to parse attributes and storing them as content is not possible, as the existing content is what I have to work on later.
I could also store just the next action type (unzip, email, execute) as an Attribute for the RouteOnAttribute processor, but that means I need to run the Flowfile through yet another script and even more scripts once it's been routed to the flow implementing the action.

Due to the amount of scripts involved I'm beginning to question if Nifi is the right tool for this job. Am I missing an obvious solution to this?

Regards,
Daniel Frank

Reply | Threaded
Open this post in threaded view
|

Re: Nifi as a gateway for our transfer system

Bryan Bende
Hi Daniel,

This seems like a task that NiFi can help with...

For step #2 I think you are correct that there isn't an existing DB
processor that looks up a row and writes it to a flow file attribute.
You could definitely implement a custom processor to do this, or use
ExecuteScript.

In the most recent release (1.3.0), there is a new concept of a
"lookup service" and there is a processor LookupAttribute [1] that can
pass keys to a lookup service and add the results to a flow file.
There are a few available lookup services, such as properties file,
CSV, XML, etc., but it might be possible to implement one against a
relational database table which might help with what you are trying to
do, then you would use LookupAttribute configured with the DB
LookupService.

As far as the JSON processing... NiFi has an expression language (EL)
that can be used in properties and there is a jsonPath function in EL
[2]. After you have the JSON in an attribute, you could use an
UpdateAttribute processor and declare a new property like email =
${myJson:jsonPath('$.email')} which would add a new attribute called
email with the value being everything from the email , and you could
add as many of these as needed to parse out the information you want
from the JSON.

Hope that helps.

-Bryan

[1] https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.3.0/org.apache.nifi.processors.standard.LookupAttribute/index.html
[2] https://nifi.apache.org/docs/nifi-docs/html/expression-language-guide.html

On Tue, Jun 13, 2017 at 1:17 PM, Frank, Daniel
<[hidden email]> wrote:

> Hello,
>
> I just found Apache Nifi recently and I decided to use it to take care of our ftp transfer system, which basically means to monitor the incoming data and generate Flowfiles for each incoming file.
> The basic flow I have in mind is:
> 1. GetFile to monitor the incoming directory
> 2. Lookup the actions for each Flowfile based on their path and filename attributes in a database and store them as an attribute
> 3. Figure out the next action to do by looking at that attribute and route the Flowfile into the correct flow implementing that action (e.g. execute a command or send an email)
> 4. After that action has completed, send the Flowfile back to step 3 if there are more actions that need to be done
>
> I'm somewhat stumped on how to implement that in a sensible way with Nifi. Step 2 will result in something like the following json and I guess I have to write my own processor for that:
> [ { "unzip": null }, {"email": {"to": "[hidden email]", "cc": "[hidden email]", "withAttachment": false}, { "execute": "/bin/true" }, { "execute": "/bin/anothercommand " } ]
>
> Now I can store that as an attribute, but apparently EvaluateJsonPath like all other Evaluate* processors are unable to parse attributes and storing them as content is not possible, as the existing content is what I have to work on later.
> I could also store just the next action type (unzip, email, execute) as an Attribute for the RouteOnAttribute processor, but that means I need to run the Flowfile through yet another script and even more scripts once it's been routed to the flow implementing the action.
>
> Due to the amount of scripts involved I'm beginning to question if Nifi is the right tool for this job. Am I missing an obvious solution to this?
>
> Regards,
> Daniel Frank
>
Reply | Threaded
Open this post in threaded view
|

AW: Nifi as a gateway for our transfer system

Frank, Daniel
Hello Bryan,

thank you, that seems to be exactly what I was missing. I'll have a closer look later today.

Regards,
Daniel


> -----Urspr√ľngliche Nachricht-----
> Von: Bryan Bende [mailto:[hidden email]]
> Gesendet: Dienstag, 13. Juni 2017 21:31
> An: [hidden email]
> Betreff: Re: Nifi as a gateway for our transfer system
>
> Hi Daniel,
>
> This seems like a task that NiFi can help with...
>
> For step #2 I think you are correct that there isn't an existing DB
> processor that looks up a row and writes it to a flow file attribute.
> You could definitely implement a custom processor to do this, or use
> ExecuteScript.
>
> In the most recent release (1.3.0), there is a new concept of a
> "lookup service" and there is a processor LookupAttribute [1] that can
> pass keys to a lookup service and add the results to a flow file.
> There are a few available lookup services, such as properties file,
> CSV, XML, etc., but it might be possible to implement one against a
> relational database table which might help with what you are trying to
> do, then you would use LookupAttribute configured with the DB
> LookupService.
>
> As far as the JSON processing... NiFi has an expression language (EL)
> that can be used in properties and there is a jsonPath function in EL
> [2]. After you have the JSON in an attribute, you could use an
> UpdateAttribute processor and declare a new property like email =
> ${myJson:jsonPath('$.email')} which would add a new attribute called
> email with the value being everything from the email , and you could
> add as many of these as needed to parse out the information you want
> from the JSON.
>
> Hope that helps.
>
> -Bryan
>
> [1] https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-
> standard-
> nar/1.3.0/org.apache.nifi.processors.standard.LookupAttribute/index.html
> [2] https://nifi.apache.org/docs/nifi-docs/html/expression-language-
> guide.html
>
> On Tue, Jun 13, 2017 at 1:17 PM, Frank, Daniel
> <[hidden email]> wrote:
> > Hello,
> >
> > I just found Apache Nifi recently and I decided to use it to take care of our
> ftp transfer system, which basically means to monitor the incoming data and
> generate Flowfiles for each incoming file.
> > The basic flow I have in mind is:
> > 1. GetFile to monitor the incoming directory
> > 2. Lookup the actions for each Flowfile based on their path and filename
> attributes in a database and store them as an attribute
> > 3. Figure out the next action to do by looking at that attribute and route the
> Flowfile into the correct flow implementing that action (e.g. execute a
> command or send an email)
> > 4. After that action has completed, send the Flowfile back to step 3 if there
> are more actions that need to be done
> >
> > I'm somewhat stumped on how to implement that in a sensible way with
> Nifi. Step 2 will result in something like the following json and I guess I have
> to write my own processor for that:
> > [ { "unzip": null }, {"email": {"to": "[hidden email]", "cc":
> "[hidden email]", "withAttachment": false}, { "execute":
> "/bin/true" }, { "execute": "/bin/anothercommand " } ]
> >
> > Now I can store that as an attribute, but apparently EvaluateJsonPath like
> all other Evaluate* processors are unable to parse attributes and storing
> them as content is not possible, as the existing content is what I have to work
> on later.
> > I could also store just the next action type (unzip, email, execute) as an
> Attribute for the RouteOnAttribute processor, but that means I need to run
> the Flowfile through yet another script and even more scripts once it's been
> routed to the flow implementing the action.
> >
> > Due to the amount of scripts involved I'm beginning to question if Nifi is the
> right tool for this job. Am I missing an obvious solution to this?
> >
> > Regards,
> > Daniel Frank
> >