Uploading measured data using Socket.IO

Task

The current task is it to upload a .csv file to the web server using Socket.IO. Socket.IO is a realtime framework for web applications, but can be used for communication between two NodeJS instances as well. It provides improved communication mechanisms over websockets and some fallback options as well.

What we want to implement will consist of a client, which will read data from a .csv file and transmit this data row by to the web server, which will receive the data, parse and transform it into JSON and then save it in the database.

The Client

The client consists only of a few lines of code. A connection to the server will be established. After that, the file will be read and the content will be split up into rows (by splitting between each new line: ‘\n’ ). Then each row will be sent via the websocket to the server. After sending each row, a ‘done’ sequence will be sent as a simple workaround for the server to see whether the whole file was sent.

var io = require('socket.io-client'),
 fs = require('fs');
 _ = require('lodash');

var socket = io.connect('http://localhost:3002');

fs.readFile('test_data/newFormat.csv', function(err, data) {
   data = data.toString('utf-8');
   data = data.split('\n');
   console.log('Scanned file with '+data.length+' rows');
   _.each(data, function(row) {
     socket.emit('upload', row);
   });
   socket.emit('upload', "####done####");
});

The Server

The server consists of a simple http server, on which the Socket.IO framework will listen for incoming connections. For each connection, a new socket will be opened and each it will receive the data row by row. The data is then parsed into JSON and stored in an array in order to buffer. Only if 1000 elements are in our buffer, the content will be saved in the database. This is because MongoDBs maximum bulk insert size is 1000 objects, and these settings resulted in the fastest execution of both receiving and storing the objects.

'use strict';
var app = require('express')();
var server = require('http').Server(app);
var io = require('socket.io')(server);
var _ = require('lodash');
var Promise = require('bluebird');
var mongoose = Promise.promisifyAll(require('mongoose')),
Schema = mongoose.Schema,
Measurement = Promise.promisifyAll(require('./model/Measurement'));

mongoose.connectAsync('mongodb://localhost/roadstar_csv')
.then(server.listen(3002));

io.on('connection', function (socket) {
    socket.on('upload', receiveData);
});

var buffer = [];
var ops = [];
var firstTime = true;

function receiveData(chunk) {
     var op;

    if(firstTime) {
       console.time('receiving rows');
       console.time('writing to db');
       firstTime = false;
    } else if(chunk === '####done####') {
       op = Measurement.collection.insertAsync(buffer);
       ops.push(op);
       buffer = [];
 
       console.timeEnd('receiving rows');
       Promise.all(ops)
       .then(function() {
         console.timeEnd('writing to db');
         firstTime = true;
     });
   } else {
         try {
             chunk = dataToJSON(chunk);
             if(buffer.length < 1000) {
                 buffer.push(chunk);
             } else {
                 op = Measurement.collection.insertAsync(buffer);
                 ops.push(op);
                 buffer = [];
                 buffer.push(chunk);
            }
         } catch (err) {
             console.log(err);
         }
     }
}