Bulk inserting into HBase with NiFi

Bulk inserting into HBase with NiFi

Mike Thomsen
We have a very large body of CSV files (well over 1TB) that need to be imported into HBase. For a single 20GB segment, we are looking at pushing easily 100M flow files into HBase, and most of the generated JSON files are quite small (roughly 20-250 bytes).

It's going very slowly, and I assume that's because we're taxing the disk so heavily with the content and provenance repositories coming into play. So I'm wondering if anyone has a suggestion for a good NiFiesque way of solving this. Right now, I'm considering two options:

1. Looking for a way to inject the HBase controller service into an ExecuteScript processor so I can handle the data in large chunks (splitting the text and generating a List<Put> inside the processor myself and doing one big batched put; rough sketch below)

2. Creating a library that lets me generate HFiles from within an ExecuteScript processor.

What I really need is something fast within NiFi that would let me generate huge blocks of updates for HBase and push them out. Any ideas?
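
To make option 1 concrete, here is a rough sketch of the kind of batched put I have in mind, using the plain HBase client API. This is only an illustration: the table name, column family, and sample lines are made up, and a script would get its data from the flow file rather than a hard-coded array.

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class BatchPutSketch {
    public static void main(String[] args) throws Exception {
        // createConnection() picks up hbase-site.xml from the classpath
        try (Connection conn = ConnectionFactory.createConnection();
             Table table = conn.getTable(TableName.valueOf("genomes"))) {      // table name is made up
            List<Put> puts = new ArrayList<>();
            for (String line : new String[] {"ABC,XYZ", "DEF,LMN"}) {          // stand-in for split CSV lines
                String[] parts = line.split(",");
                Put put = new Put(Bytes.toBytes(parts[0]));                    // first column as the row key
                put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("value"), Bytes.toBytes(parts[1]));
                puts.add(put);
            }
            table.put(puts);    // one batched call instead of one tiny put per flow file
        }
    }
}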

Thanks,

Mike

Re: Bulk inserting into HBase with NiFi

Bryan Bende
Mike,

With the record-oriented processors that have come out recently, a good solution would be to implement a PutHBaseRecord processor that has a Record Reader configured. That way the processor could read a large CSV without having to convert it to individual JSON documents.

One thing to consider is how many records/puts to send in a single call to HBase. Assuming multi-GB CSV files, you'll want to send portions at a time to avoid having the whole content in memory (some kind of record batch size property), but then you also have to deal with what happens when things fail halfway through. If the puts are idempotent, then it may be fine to route the whole flow file to failure and try again, even if some data was already inserted.
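
To illustrate, a record-based processor's batching loop might look roughly like the sketch below. This is not a real implementation: it assumes a RecordReader has already been created for the flow file content, it goes straight at an HBase Table just to keep the sketch short (a real processor would go through the HBase client controller service), and the "key"/"value" field names, the "cf" family, and the batch handling are placeholders.

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.record.Record;

public class RecordBatchSketch {

    // Read records and flush them to HBase in fixed-size batches.
    static void putRecords(RecordReader reader, Table table, int batchSize) throws Exception {
        List<Put> batch = new ArrayList<>(batchSize);
        Record record;
        while ((record = reader.nextRecord()) != null) {
            Put put = new Put(Bytes.toBytes(record.getAsString("key")));       // row id field (placeholder)
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("value"),
                    Bytes.toBytes(record.getAsString("value")));               // single column for the example
            batch.add(put);
            if (batch.size() >= batchSize) {
                // If this throws partway through the file, the whole flow file would be
                // routed to failure and retried, which is why the puts need to be idempotent.
                table.put(batch);
                batch.clear();
            }
        }
        if (!batch.isEmpty()) {
            table.put(batch);
        }
    }
}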

Feel free to create a JIRA for HBase record processors, or I can do it later.

Hope that helps.

-Bryan



Re: Bulk inserting into HBase with NiFi

Bryan Bende
Mike,

Just following up on this...

I created this JIRA to track the idea of record-based HBase processors:
https://issues.apache.org/jira/browse/NIFI-4034

Also wanted to mention that with the existing processors, the main way
to scale up would be to increase the concurrent tasks on PutHBaseJson
and also to increase the Batch Size property, which defaults to 25. The
Batch Size controls the maximum number of flow files that a concurrent
task will attempt to pull from the queue and send to HBase in one put
operation.

Even with those tweaks your flow may still be getting hammered by
lots of small flow files, but I thought I would mention it in case it
helps at all.

-Bryan



Re: Bulk inserting into HBase with NiFi

Mike Thomsen
Yeah, it's really getting hammered by the small files. I took a look at the new record APIs and they looked really promising. In fact, I'm taking a shot at creating a variant of PutHBaseJSON that uses the record API. Looks fairly straightforward so far. My strategy is roughly like this:

GetFile -> SplitText -> ExecuteScript -> RouteOnAttribute -> PutHBaseJSONRecord

ExecuteScript generates a larger flowfile that contains a structure like this now:

[
  { "key": "XYZ", "value": "ABC" }
]


My intention is to have a JsonPathReader take that bigger flowfile which is a JSON array and iterate over it as a bunch of records to turn into Puts with the new HBase processor. I'm borrowing some code for wiring in the reader from the QueryRecord processor.

So my only question now is, what is the best way to serialize the Record objects to JSON? The PutHBaseJson processor already has a Jackson setup internally. Any suggestions on doing this in a way that doesn't tie me to a particular reader implementation?
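
One reader-agnostic idea I'm toying with is to just walk the record's schema and hand a plain map to Jackson, along these lines. This is only a sketch, and it assumes flat records whose values Jackson can serialize directly.

import java.util.LinkedHashMap;
import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.serialization.record.Record;

public class RecordToJsonSketch {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Copy the record's fields into a plain map and let Jackson serialize it.
    static String toJson(Record record) throws Exception {
        Map<String, Object> fields = new LinkedHashMap<>();
        for (String fieldName : record.getSchema().getFieldNames()) {
            fields.put(fieldName, record.getValue(fieldName));
        }
        return MAPPER.writeValueAsString(fields);
    }
}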

Thanks,

Mike



Re: Bulk inserting into HBase with NiFi

Bryan Bende
Mike,

Glad to hear that the record API looks promising for what you are trying to do!

Here are a couple of thoughts, and please correct me if I am not
understanding your flow correctly...

We should be able to make a generic PutHBaseRecord processor that uses
any record reader to read the incoming flow file and then converts
each record directly into a PutFlowFile (more on this in a minute).

Once we have PutHBaseRecord, then there may be no need for you to
convert your data from CSV to JSON (unless there is another reason I
am missing) because you can send your CSV data directly into
PutHBaseRecord configured with a CSVRecordReader.

If you are doing other processing/enrichment while going from CSV to
JSON, then you may be able to achieve some of the same things with
processors like UpdateRecord, PartitionRecord, and LookupRecord.
Essentially keeping the initial CSV intact and treating it like
records through the entire flow.

Now back to PutHBaseRecord and the question of how to go from a Record
to a PutFlowFile...

We basically need to know the rowId, column family, and then a list of
column-qualifier/value pairs. I haven't fully thought this through yet,
but...

For the row id, we could have a similar strategy as PutHBaseJson,
where the value comes from a "Row Id" property in the processor or
from a "Row Id Record Path" which would evaluate the record path
against the record and use that value for the row id.
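
For the record path option, the evaluation itself could look something like the sketch below. Just an illustration, assuming the record path API; the "/key" path and the null handling are only examples.

import java.util.Optional;

import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPath;
import org.apache.nifi.record.path.RecordPathResult;
import org.apache.nifi.serialization.record.Record;

public class RowIdFromRecordPathSketch {

    // Evaluate a configured "Row Id Record Path" against the record and return the value as a string.
    static String rowId(Record record, String rowIdPath) {
        RecordPath recordPath = RecordPath.compile(rowIdPath);                 // e.g. "/key"
        RecordPathResult result = recordPath.evaluate(record);
        Optional<FieldValue> first = result.getSelectedFields().findFirst();
        return first.map(fv -> fv.getValue() == null ? null : fv.getValue().toString())
                    .orElse(null);                                             // null if the path matched nothing
    }
}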

For column family, we could probably do the same as above, where it
could be from a property or a record path.

For the list of column-qualifier/value pairs, we can loop over all
fields in the record (skipping the row id and family if using record
fields) and then convert each one into a PutColumn. The bulk of the
work here is going to be taking the value of a field and turning it
into an appropriate byte[], so you'll likely want to use the type of
the field to cast into an appropriate Java type and then figure out
how to represent that as bytes.
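
Roughly, that per-field conversion might look like the sketch below. Again just an illustration; the exact Record accessor names may differ slightly, and anything not handled explicitly falls back to the string form.

import org.apache.hadoop.hbase.util.Bytes;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;

public class FieldToBytesSketch {

    // Convert one record field into the byte[] value for a PutColumn,
    // using the field's declared type to pick the Bytes conversion.
    static byte[] toBytes(Record record, RecordField field) {
        String name = field.getFieldName();
        if (record.getValue(name) == null) {
            return null;    // caller decides whether to skip null columns
        }
        switch (field.getDataType().getFieldType()) {
            case INT:
                return Bytes.toBytes(record.getAsInt(name));
            case LONG:
                return Bytes.toBytes(record.getAsLong(name));
            case FLOAT:
                return Bytes.toBytes(record.getAsFloat(name));
            case DOUBLE:
                return Bytes.toBytes(record.getAsDouble(name));
            case BOOLEAN:
                return Bytes.toBytes(record.getAsBoolean(name));
            default:
                return Bytes.toBytes(record.getAsString(name));    // fall back to the string form
        }
    }
}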

I know this was a lot of information, but I hope this helps, and let
me know if anything is not making sense.

Thanks,

Bryan



Re: Bulk inserting into HBase with NiFi

Mike Thomsen
Bryan,

I have the processor somewhat operational now, but I'm running into a problem with the record readers. What I've done is basically this:

Ex. JSON:

[
   {
       "key": "ABC", "value": "XYZ"
   },
   {
       "key": "DEF", "value": "LMN"
   }
]

Avro schema:

{
  "type": "record",
  "name": "GenomeRecord",
  "fields": [
    { "name": "value", "type": "string" }
  ]
}

1. ExecuteScript iterates over each line and builds a JSON array as shown above.
2. PutHBaseRecord is wired to use a JsonPathReader that uses an AvroSchemaRegistry.
  - I put a lot of logging in and can verify it is identifying the schema based on the attribute on the flowfile and looking at the appropriate field while looping over the Record to turn it into a serializable form for a Put.
  - All I get are nulls.
3. I have tried both $.value and $[*].value as the JsonPath. It just does not seem to want to parse that JSON.

The strategy I was going for is to use the "key" field in each JSON object to set the row key for the Put.

Any ideas would be great.

Thanks,

Mike


Re: Bulk inserting into HBase with NiFi

Bryan Bende
Mike,

Just out of curiosity, what would the original data for your example
look like that produced that JSON?

Is it a CSV with two lines, like:

ABC, XYZ
DEF, LMN

and then ExecuteScript is turning that into the JSON array?


As far as reading the JSON, I created a simple flow of GenerateFlowFile
-> ConvertRecord -> LogAttribute, where ConvertRecord uses the
JsonPathReader with $.value

https://gist.github.com/bbende/3789a6907a9af09aa7c32413040e7e2b

LogAttribute ends up logging:

[ {
  "value" : "XYZ"
}, {
  "value" : "LMN"
} ]

Which seems correct, given that it's reading in the JSON with a schema
that only has the field "value" in it.

Let me know if that is not what you are looking for.




Re: Bulk inserting into HBase with NiFi

Mike Thomsen
Bryan,

Looks like I have approval to open-source our implementation of PutHBaseRecord. I'm working on cleaning it up, and I will hopefully have time in the near future to submit a pull request against 1.4.0-SNAPSHOT.

Thanks,

Mike
