Streaming File Uploads to HDFS

I couldn’t find a good source for this information in one place, and that’s a dead giveaway that I should be writing an article.

Some meta discussion: it looks like I’ve been away a while, and that’s because I’ve been working on organizing my cookbook. I used to just post recipes here (in the ‘before times’), but I’ve found that I hate using a digital device when I need to reference a recipe in the heat of the moment. That leads me to the desire for print media, and I’ve been working a long time on my Markdown processing pipeline, so that’s a natural fit. I’ll post the results here and probably put the book and contribution scripts in a dedicated repo in the future.

Caveats

Unfortunately, this article might be incomplete. I saved it as a draft in Hugo for a long time. I was waiting to publish it because I was still working for the company that paid me to write this solution. Fortunately, that company does not exist anymore. I think the information contained here is useful enough to publish even in its disjointed state.

Here are the portions of the outline that I wrote down but did not complete. I’m not sure I would have had much to say about them, but I felt they were important at the time, so I should include them in the caveats.

  • Frontend
    • Redux actions?
  • Backend
    • Streams
    • HDFS client library
    • Bare PUT requests and why that didn’t work
  • Transport
    • nginx Body Size
    • Kubernetes Annotations
  • Applications
    • Maybe talk about the spark inference process?

Background

So you have a web app. It’s made with React (though that doesn’t matter) and Express (this matters very much). You also have access to HDFS, and that cluster has WebHDFS turned on. That’s great! So you try to upload a file. Being a developer, you use the smallest possible thing that can technically be called a file. Eventually you get that working, maybe using body-parser to serialize the data? Maybe appending it to FormData? Cool, that works. Now let’s try the file your boss is actually interested in: 1.5M rows, 192MiB. That sounds fine, right? You download crap all day. This is where our story begins.
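
If you’re wondering what that first, tiny-file version looks like, it’s usually something like this (a sketch; the endpoint name is illustrative), and it works right up until the file stops being tiny:

// The naive first pass (endpoint name is illustrative): wrap the file in
// FormData and POST it. Fine for tiny files; the server-side body parsing is
// what falls over once the file gets big.
const file = document.querySelector('input[type="file"]').files[0];
const formData = new FormData();
formData.append('fileUpload', file);

fetch('/import/upload', { method: 'POST', body: formData })
    .then((res) => console.log('uploaded with status', res.status));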

Larger Files

So you’ll invariably hit a limitation here. If you use a library like body-parser that attempts to buffer and parse the body of a large request, you’ll fail. Either your connection will time out, some server in the chain will say ‘girl I think my butt gettin’ big’, or you’ll run your heap out of memory like I did:

⮑ Stack Trace
FATAL ERROR: CALL_AND_RETRY_LAST Allocation failed - JavaScript heap out of memory
 1: node::Abort() [node]
 2: 0x1356bec [node]
 3: v8::Utils::ReportOOMFailure(char const*, bool) [node]
 4: v8::internal::V8::FatalProcessOutOfMemory(char const*, bool) [node]
 5: v8::internal::Factory::NewUninitializedFixedArray(int) [node]
 6: 0xf2480c [node]
 7: 0xf3a865 [node]

<--- Last few GCs --->

[16:0x2f5f7c0]   148699 ms: Mark-sweep 824.5 (838.3) -> 824.5 (838.3) MB, 378.4 / 0.0 ms  allocation failure GC in old space requested
[16:0x2f5f7c0]   149081 ms: Mark-sweep 824.5 (838.3) -> 824.4 (835.3) MB, 382.5 / 0.0 ms  last resort
[16:0x2f5f7c0]   149430 ms: Mark-sweep 824.4 (835.3) -> 824.4 (835.3) MB, 348.5 / 0.0 ms  last resort


<--- JS stacktrace --->

==== JS stack trace =========================================

Security context: 0x3e9dbba8799 <JSObject>
    2: SimpleSlice(aka SimpleSlice) [native array.js:1] [bytecode=0x29bde9660b01 offset=41](this=0x1c3717402311 <undefined>,p=0x210e388b9a99 <Uint8Array map = 0x239f21141a79>,O=0,P=124793265,Q=124793265,R=0x210e388b9b59 <JSArray[124793265]>)
    4: ArraySlice [native array.js:1] [bytecode=0x29bde9660529 offset=282](this=0x210e388b9a99 <Uint8Array map = 0x239f21141a79>,as=0,at=0x1c3717402311 <unde...

 8: v8::internal::JSObject::AddDataElement(v8::internal::Handle<v8::internal::JSObject>, unsigned int, v8::internal::Handle<v8::internal::Object>, v8::internal::PropertyAttributes, v8::internal::Object::ShouldThrow) [node]
 9: v8::internal::Object::AddDataProperty(v8::internal::LookupIterator*, v8::internal::Handle<v8::internal::Object>, v8::internal::PropertyAttributes, v8::internal::Object::ShouldThrow, v8::internal::Object::StoreFromKeyed) [node]
10: v8::internal::JSObject::DefineOwnPropertyIgnoreAttributes(v8::internal::LookupIterator*, v8::internal::Handle<v8::internal::Object>, v8::internal::PropertyAttributes, v8::internal::Object::ShouldThrow, v8::internal::JSObject::AccessorInfoHandling) [node]
11: v8::internal::JSObject::CreateDataProperty(v8::internal::LookupIterator*, v8::internal::Handle<v8::internal::Object>, v8::internal::Object::ShouldThrow) [node]
12: v8::internal::JSReceiver::CreateDataProperty(v8::internal::LookupIterator*, v8::internal::Handle<v8::internal::Object>, v8::internal::Object::ShouldThrow) [node]
13: v8::internal::Runtime_CreateDataProperty(int, v8::internal::Object**, v8::internal::Isolate*) [node]
14: 0x184262e988f8
Aborted (core dumped)

So what is the solution?

Well, by default that body would have been a stream in Node.js land. The reason you are in this situation is that middleware which decorates your request objects generally makes your life easier. With body-parser in place, you get a friendly JSON object at req.body, and usually that’s all you wanted.

For large files, though, we need to stream the request body through our handler. We don’t want to suck the whole thing into memory and then put it somewhere else.
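
To make that concrete, here’s the shape of the thing. This is a minimal sketch, not the real handler: the route path and the on-disk destination are stand-ins. The only point is that, with no parsing middleware in front of it, req is an ordinary readable stream you can pipe.

// Sketch only: the route path and the on-disk destination are stand-ins.
// With no body-parser in front of this handler, `req` is a readable stream,
// so the bytes can move along as they arrive instead of materializing a
// ~200 MiB req.body first.
const fs = require('fs');
const router = require('express').Router();

router.post('/import/upload', (req, res) => {
  const out = fs.createWriteStream('/tmp/staged-upload.csv'); // stand-in target
  req.pipe(out).on('finish', () => res.sendStatus(201));
});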

Prerequisite

We need to make sure the raw curl flow for putting files into HDFS works before we try to reproduce it in code.

Note I’m in a Kubernetes environment here, so there are some things that I have to do to even get access to the Name and Data Nodes. I think they’re useful to note, so I’m leaving them in.

First you have to create the port forward to the Name Node. This is analogous to configuring the connection string in your JS code:

kubectl -n testing port-forward $(kubectl -n testing get pods | grep splicedb-hdfs-nn-0 | awk '{print $1}') 50070:50070 &

Then we need to do the first PUT request that the WebHDFS docs describe. This is analogous to the first PUT request in the JS flow.

curl -i -X PUT "http://localhost:50070/webhdfs/v1/tmp/curl/nik.csv?op=CREATE&overwrite=true&noredirect=true"
⮑ curl result
Handling connection for 50070
HTTP/1.1 200 OK
Date: Wed, 29 Jul 2020 16:00:39 GMT
Cache-Control: no-cache
Expires: Wed, 29 Jul 2020 16:00:39 GMT
Date: Wed, 29 Jul 2020 16:00:39 GMT
Pragma: no-cache
X-Content-Type-Options: nosniff
X-FRAME-OPTIONS: SAMEORIGIN
X-XSS-Protection: 1; mode=block
Content-Type: application/json
Transfer-Encoding: chunked

{"Location":"http://splicedb-hdfs-dn-0.splicedb-hdfs-dn.testing.svc.cluster.local:9004/webhdfs/v1/tmp/curl/nik.csv?op=CREATE&namenoderpcaddress=hdfs&createflag=&createparent=true&overwrite=true"}

That first PUT was us just talking to the Name Node. It doesn’t take the file; it tells us where to actually put it, in the Location field of the response. In the JS flow, we’ll just make a new request with this Location. So we’ll set up another forward to the specified Data Node and fire the request.

kubectl -n testing port-forward $(kubectl -n testing get pods | grep splicedb-hdfs-dn-0 | awk '{print $1}') 9004:9004 &
curl -i -X PUT -T ~/1500000_Sales_Records.csv "http://localhost:9004/webhdfs/v1/tmp/curl/nik.csv?op=CREATE&namenoderpcaddress=hdfs&createflag=&createparent=true&overwrite=true"
⮑ curl result
Handling connection for 9004
HTTP/1.1 100 Continue

HTTP/1.1 201 Created
Location: hdfs://hdfs/tmp/curl/nik.csv
Content-Length: 0
Access-Control-Allow-Origin: *
Connection: close
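
Before moving to the frontend, here’s roughly what that same two-hop dance looks like from Node. This is a sketch rather than the production handler: it uses the built-in http module instead of an HDFS client library, the Name Node address matches the port-forward above, and error handling is omitted.

// Sketch: two-step WebHDFS CREATE from Node, using the built-in http module.
// Names, paths, and error handling are simplified.
const http = require('http');

function uploadToHdfs(sourceStream, hdfsPath, done) {
  const createUrl = `http://localhost:50070/webhdfs/v1${hdfsPath}` +
    '?op=CREATE&overwrite=true&noredirect=true';

  // Step 1: ask the Name Node where the file should go.
  http.request(createUrl, { method: 'PUT' }, (nnRes) => {
    let body = '';
    nnRes.on('data', (chunk) => { body += chunk; });
    nnRes.on('end', () => {
      const { Location } = JSON.parse(body); // the Data Node URL, as in the curl output

      // Step 2: stream the file bytes to the Data Node we were pointed at.
      const dnReq = http.request(Location, { method: 'PUT' }, (dnRes) => {
        done(null, dnRes.statusCode); // expect 201 Created
      });
      sourceStream.pipe(dnReq);
    });
  }).end();
}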

Frontend

Let’s begin at the very front. When the user comes to your site, you’ll want to be one of the cool kids and have both the ‘click to browse’ and ‘drag and drop’ interfaces available.

Drop Zone

<div onDropCapture={this._dropHandler} onDragOverCapture={this._dragOverHandler}>
    Drop files or <label htmlFor="fileUploadID">click to browse</label>
</div>
<input id="fileUploadID" type="file" name="fileUpload" accept="*/*" onChange={this._fileInputChangedHandler}/>

The <div> here is the zone that is sensitive to the drop event. You can see that action is bound in React to onDropCapture. The meat of _dropHandler is below; all it does is collect the files from the drop and pass them to the file handler.

There is also an input element that will bring up the file selection dialog in your given OS. This too just collects the file and passes it to the file handler.

Note that this input accepts all file types. I didn’t inspect what kinds of files the drop zone accepts because I accept everything; I’m sure you could do that discrimination in the drop handler.
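
For completeness, here’s roughly what the other two handlers wired up in that markup could look like. This is a sketch: the handler names come from the JSX above, but their bodies (and the _handleFiles helper) are my assumption, not the original code. The one non-obvious part is that the drag-over handler has to call preventDefault(), otherwise the browser never delivers the drop to your element.

// Sketch only: the bodies and the _handleFiles helper are assumptions;
// the handler names come from the JSX above.

// The file input hands us a FileList on change; just forward it along.
_fileInputChangedHandler = (e) => {
    this._handleFiles(e.target.files); // _handleFiles is the "file handler" below
};

// Without preventDefault() here, the browser treats the drop as navigation
// and opens the file instead of firing onDropCapture.
_dragOverHandler = (e) => {
    e.preventDefault();
};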

Drop Handler

let files = [];
if (e.dataTransfer.items) {
    // Use DataTransferItemList interface to access the file(s)
    for (let i = 0; i < e.dataTransfer.items.length; i++) {
        // If dropped items aren't files, reject them
        if (e.dataTransfer.items[i].kind === 'file') {
            files.push(e.dataTransfer.items[i].getAsFile());
        }
    }
} else {
    // Use DataTransfer interface to access the file(s)
    for (let i = 0; i < e.dataTransfer.files.length; i++) {
        files.push(e.dataTransfer.files[i]);
    }
}

There are two APIs to consider for the drag operation. You might have a DataTransferItemList or simply a DataTransfer. This code just lets you get the files out of either of those cases. After you get the file(s), you can pass them along to the file handler.

File Handler

if (!(files instanceof FileList) && !Array.isArray(files)) {
    console.error("Unimplemented Browser API");
    return false;
}

if (files.length > 1) {
    // TODO: Just one file for now
    return false;
}

let file = files[0];

Here I’m just making sure that I get one file. Once you have that, you can pass it along to build up a simple fetch.

Fetch Object

let fetchObj = {
    redirect: 'follow',
    method: 'POST',
    // body: new Blob( [ file.data ], { type: 'text/plain' } ),
    body: file,
    headers: {}
}

Here’s the part that I found really nice. When sending this request to Express, if you don’t have a bunch of middleware in the way, you can just send a file. Everything about fetch, Node.js, and Express is built to handle this; just don’t get in the way. A File is a Blob (file instanceof File and file instanceof Blob are both true), so fetch will happily use it as the request body… yay!
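
Actually firing it off is then just a plain fetch to the upload route. A sketch: the path is my assumption, taken from the /import/upload check in the middleware section below.

// Sketch: the exact path is an assumption, based on the /import/upload
// check in the backend middleware below.
fetch('/import/upload', fetchObj)
    .then((res) => {
        if (!res.ok) throw new Error(`Upload failed with status ${res.status}`);
        return res;
    })
    .catch((err) => console.error(err));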

Backend - Conditional Middleware

This is a feature that I had completely forgotten about. In my day-to-day work on this application, I forget that middleware exists. When you’re working in a handler, it can feel like the req object is this immutable burden that you have to shape to accomplish your task. It’s easy to forget that when you’re working in Express, your perception is shaped by a chain of middleware.

Notably, body-parser.

This is a really handy library, until it ruins your day by trying to slurp a 500MB body into memory as a Buffer. You can’t parse a file like that; you have to deal with it as a stream. But all your sibling routes need body-parser, and you’re not going to go around checking every usage of req.body and refactoring your application.

The solution is to create a little bit of logic and place that high up in your middleware chain.

const bodyParser = require('body-parser');

// The only route that uses raw is the upload route.
router.use(function (req, res, next) {
  if (req.url.includes("/import/upload")) {
    // Hand the upload body over untouched, up to the configured size limit.
    bodyParser.raw({
      type: '*/*',
      limit: process.env.FILE_IMPORT_LIMIT_BYTES,
    })(req, res, next);
  } else {
    // Everything else keeps its friendly, parsed req.body.
    bodyParser.json()(req, res, next);
  }
});

// authorized routes....
router.use(accounts)
router.use(applications)
...
router.use(importData);

That’s it! You’ve preserved the middleware for all the other routes and swapped in raw handling for the one route where it gets in the way.

The better pattern is to make a new router entirely and push the body-parser middleware down the chain to only the routes that need it (a sketch of that is below). But I found this to be my preferred solution because the rest of the /import/* routes really benefited from body-parser; it’s just the /upload route where it got in the way.
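
For reference, here’s a minimal sketch of that dedicated-router pattern. The /preview route and its handler are made up for illustration; only /upload corresponds to a route discussed above.

// Sketch of the dedicated-router alternative. The /preview route and its
// handler are illustrative; only /upload matches a route discussed above.
const express = require('express');
const bodyParser = require('body-parser');

const importRouter = express.Router();

// JSON parsing is attached per-route, only where a parsed body is wanted.
importRouter.post('/preview', bodyParser.json(), (req, res) => {
  res.json({ received: req.body }); // placeholder handler
});

// The upload route never sees body-parser, so the request stays a stream.
importRouter.post('/upload', (req, res) => {
  // ...stream req on to WebHDFS here, as sketched earlier...
  res.sendStatus(201);
});

module.exports = importRouter;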