module.exports = function (context, IoTHubMessages) { var i64={}; Int64Buffer(i64); var Uint64BE, Int64BE, Uint64LE, Int64LE; const cosmosdbEnabled = true; items = []; IoTHubMessages.forEach(message => { var packet = decode(message,message.toString('hex')); packet.records.map((record)=>{ record.deviceId=packet.deviceId; return record; }).forEach(processSingleRecord); context.bindings.outputDocument = JSON.stringify(items); }); function processSingleRecord(record) { if (cosmosdbEnabled) { // save all records to cosmosdb table var item = { "deviceId" : record.deviceId, "tag" : record.tag, "value" : record.value, "type" : getDataTypeString(record.type), "timestamp" : convertTimestamp(record.timestamp), "timezone" : getTimezoneString(record.timezone) }; items.push(item); context.log('Item: ', item); } }; function decode(buffer,s) { var packet = {}; var readIndex = 0; //context.log('bytes', buffer.toString('hex')); packet.version = buffer.readInt8(readIndex); readIndex++ let deviceIdLength = buffer.readInt16LE(readIndex); readIndex += 2; packet.deviceId = buffer.toString('utf-8', readIndex, readIndex + deviceIdLength); readIndex += deviceIdLength; //context.log("deviceId: ",packet.deviceId); packet.records = []; const advance=(bytesCount)=>{ // console.log("payload :",s.substring(readIndex*2,(readIndex+n)*2)); readIndex+=bytesCount; }; while (readIndex < buffer.length) { //context.log("---next packet"); var record = {}; // record.buffertype = buffer.readInt8(readIndex); // for push packets it's always 2 readIndex++; // context.log("bufferType :",record.buffertype); let tagLength = buffer.readInt16LE(readIndex); readIndex += 2; // context.log("tagLength :",tagLength); record.tag = buffer.toString('utf-8', readIndex, readIndex + tagLength); readIndex += tagLength; // context.log("tag :",record.tag); record.timestamp = winFileTimeToUnixTime(new i64.Int64LE(buffer, readIndex).toString()); readIndex += 8; // context.log("timestamp :",new Date(record.fileTime).toLocaleString()); record.timezone = buffer.readInt16LE(readIndex); readIndex += 2; // context.log("timezone :",record.timezone); record.type = buffer.readInt8(readIndex); readIndex++; // context.log("type:",record.type); let isArray = buffer.readInt8(readIndex); readIndex++; // context.log("isArray :",record.isArray); // arrays are not supported yet record.value=getValue(record.type,buffer,readIndex,advance); // context.log("value :",record.value); packet.records.push(record); } //context.log("Packet: "", packet); return packet; } function winFileTimeToUnixTime(time) { return time / 10000 - 11644473600000; } function getValue(type,buffer,index,advance) { let result; switch (type) { case 0: result=null; break; case 1: result = buffer.readInt8(index); advance(1); break; case 2: result = buffer.readInt16LE(index); advance(2); break; case 3: result = buffer.readInt32LE(index); advance(4); break; case 4: result = new i64.Int64LE(buffer, index).toString(); advance(8); break; case 5: result = buffer.readUInt8(index); advance(1); break; case 6: result = buffer.readUInt16LE(index); advance(2); break; case 7: result = buffer.readUInt32LE(index); advance(4); break; case 8: result = new i64.UInt64LE(buffer, index).toString(); advance(8); break; case 9: result = buffer.readInt8(index); advance(1); break; case 10: result = buffer.readFloatLE(index); advance(4); break; case 11: result = buffer.readDoubleLE(index); advance(8); break; case 12: var length = buffer.readInt32LE(index); index += 4; result = buffer.toString('utf-8', index, index + length); advance(4 + result.length); break; case 14: result = new Int64Buffer.Int64LE(buffer, index).toString(); advance(8); break; case 15: // GUID type result = {}; result['Data1']=buffer.readUInt32LE(index); index+=4; result['Data2']=buffer.readUInt16LE(index); index+=2; result['Data3']=buffer.readUInt16LE(index); index+=2; result['Data4']=[]; for (let i=0;i<8;i++) result['Data4'].push(buffer.readUInt8(index+i)); advance(16); break; default: break; } return result; } /* The following code is extracted and adapted from the library int64-buffer, available at the address https://github.com/kawanet/int64-buffer The MIT License (MIT) Copyright (c) 2015-2016 Yusuke Kawasaki Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ // int64-buffer.js /*jshint -W018 */ // Confusing use of '!'. /*jshint -W030 */ // Expected an assignment or function call and instead saw an expression. /*jshint -W093 */ // Did you mean to return a conditional instead of an assignment? function Int64Buffer(exports) { // constants var UNDEFINED = "undefined"; var BUFFER = (UNDEFINED !== typeof Buffer) && Buffer; var UINT8ARRAY = (UNDEFINED !== typeof Uint8Array) && Uint8Array; var ARRAYBUFFER = (UNDEFINED !== typeof ArrayBuffer) && ArrayBuffer; var ZERO = [0, 0, 0, 0, 0, 0, 0, 0]; var isArray = Array.isArray || _isArray; var BIT32 = 4294967296; var BIT24 = 16777216; // storage class var storage; // Array; // generate classes Uint64BE = factory("Uint64BE", true, true); Int64BE = factory("Int64BE", true, false); Uint64LE = factory("Uint64LE", false, true); Int64LE = factory("Int64LE", false, false); // class factory function factory(name, bigendian, unsigned) { var posH = bigendian ? 0 : 4; var posL = bigendian ? 4 : 0; var pos0 = bigendian ? 0 : 3; var pos1 = bigendian ? 1 : 2; var pos2 = bigendian ? 2 : 1; var pos3 = bigendian ? 3 : 0; var fromPositive = bigendian ? fromPositiveBE : fromPositiveLE; var fromNegative = bigendian ? fromNegativeBE : fromNegativeLE; var proto = Int64.prototype; var isName = "is" + name; var _isInt64 = "_" + isName; // properties proto.buffer = void 0; proto.offset = 0; proto[_isInt64] = true; // methods proto.toNumber = toNumber; proto.toString = toString; proto.toJSON = toNumber; proto.toArray = toArray; // add .toBuffer() method only when Buffer available if (BUFFER) proto.toBuffer = toBuffer; // add .toArrayBuffer() method only when Uint8Array available if (UINT8ARRAY) proto.toArrayBuffer = toArrayBuffer; // isUint64BE, isInt64BE Int64[isName] = isInt64; // CommonJS exports[name] = Int64; return Int64; // constructor function Int64(buffer, offset, value, raddix) { if (!(this instanceof Int64)) return new Int64(buffer, offset, value, raddix); return init(this, buffer, offset, value, raddix); } // isUint64BE, isInt64BE function isInt64(b) { return !!(b && b[_isInt64]); } // initializer function init(that, buffer, offset, value, raddix) { if (UINT8ARRAY && ARRAYBUFFER) { if (buffer instanceof ARRAYBUFFER) buffer = new UINT8ARRAY(buffer); if (value instanceof ARRAYBUFFER) value = new UINT8ARRAY(value); } // Int64BE() style if (!buffer && !offset && !value && !storage) { // shortcut to initialize with zero that.buffer = newArray(ZERO, 0); return; } // Int64BE(value, raddix) style if (!isValidBuffer(buffer, offset)) { var _storage = storage || Array; raddix = offset; value = buffer; offset = 0; buffer = new _storage(8); } that.buffer = buffer; that.offset = offset |= 0; // Int64BE(buffer, offset) style if (UNDEFINED === typeof value) return; // Int64BE(buffer, offset, value, raddix) style if ("string" === typeof value) { fromString(buffer, offset, value, raddix || 10); } else if (isValidBuffer(value, raddix)) { fromArray(buffer, offset, value, raddix); } else if ("number" === typeof raddix) { writeInt32(buffer, offset + posH, value); // high writeInt32(buffer, offset + posL, raddix); // low } else if (value > 0) { fromPositive(buffer, offset, value); // positive } else if (value < 0) { fromNegative(buffer, offset, value); // negative } else { fromArray(buffer, offset, ZERO, 0); // zero, NaN and others } } function fromString(buffer, offset, str, raddix) { var pos = 0; var len = str.length; var high = 0; var low = 0; if (str[0] === "-") pos++; var sign = pos; while (pos < len) { var chr = parseInt(str[pos++], raddix); if (!(chr >= 0)) break; // NaN low = low * raddix + chr; high = high * raddix + Math.floor(low / BIT32); low %= BIT32; } if (sign) { high = ~high; if (low) { low = BIT32 - low; } else { high++; } } writeInt32(buffer, offset + posH, high); writeInt32(buffer, offset + posL, low); } function toNumber() { var buffer = this.buffer; var offset = this.offset; var high = readInt32(buffer, offset + posH); var low = readInt32(buffer, offset + posL); if (!unsigned) high |= 0; // a trick to get signed return high ? (high * BIT32 + low) : low; } function toString(radix) { var buffer = this.buffer; var offset = this.offset; var high = readInt32(buffer, offset + posH); var low = readInt32(buffer, offset + posL); var str = ""; var sign = !unsigned && (high & 0x80000000); if (sign) { high = ~high; low = BIT32 - low; } radix = radix || 10; while (1) { var mod = (high % radix) * BIT32 + low; high = Math.floor(high / radix); low = Math.floor(mod / radix); str = (mod % radix).toString(radix) + str; if (!high && !low) break; } if (sign) { str = "-" + str; } return str; } function writeInt32(buffer, offset, value) { buffer[offset + pos3] = value & 255; value = value >> 8; buffer[offset + pos2] = value & 255; value = value >> 8; buffer[offset + pos1] = value & 255; value = value >> 8; buffer[offset + pos0] = value & 255; } function readInt32(buffer, offset) { return (buffer[offset + pos0] * BIT24) + (buffer[offset + pos1] << 16) + (buffer[offset + pos2] << 8) + buffer[offset + pos3]; } } function toArray(raw) { var buffer = this.buffer; var offset = this.offset; storage = null; // Array if (raw !== false && offset === 0 && buffer.length === 8 && isArray(buffer)) return buffer; return newArray(buffer, offset); } function toBuffer(raw) { var buffer = this.buffer; var offset = this.offset; storage = BUFFER; if (raw !== false && offset === 0 && buffer.length === 8 && Buffer.isBuffer(buffer)) return buffer; var dest = new BUFFER(8); fromArray(dest, 0, buffer, offset); return dest; } function toArrayBuffer(raw) { var buffer = this.buffer; var offset = this.offset; var arrbuf = buffer.buffer; storage = UINT8ARRAY; if (raw !== false && offset === 0 && (arrbuf instanceof ARRAYBUFFER) && arrbuf.byteLength === 8) return arrbuf; var dest = new UINT8ARRAY(8); fromArray(dest, 0, buffer, offset); return dest.buffer; } function isValidBuffer(buffer, offset) { var len = buffer && buffer.length; offset |= 0; return len && (offset + 8 <= len) && ("string" !== typeof buffer[offset]); } function fromArray(destbuf, destoff, srcbuf, srcoff) { destoff |= 0; srcoff |= 0; for (var i = 0; i < 8; i++) { destbuf[destoff++] = srcbuf[srcoff++] & 255; } } function newArray(buffer, offset) { return Array.prototype.slice.call(buffer, offset, offset + 8); } function fromPositiveBE(buffer, offset, value) { var pos = offset + 8; while (pos > offset) { buffer[--pos] = value & 255; value /= 256; } } function fromNegativeBE(buffer, offset, value) { var pos = offset + 8; value++; while (pos > offset) { buffer[--pos] = ((-value) & 255) ^ 255; value /= 256; } } function fromPositiveLE(buffer, offset, value) { var end = offset + 8; while (offset < end) { buffer[offset++] = value & 255; value /= 256; } } function fromNegativeLE(buffer, offset, value) { var end = offset + 8; value++; while (offset < end) { buffer[offset++] = ((-value) & 255) ^ 255; value /= 256; } } // https://github.com/retrofox/is-array function _isArray(val) { return !!val && "[object Array]" == Object.prototype.toString.call(val); } } // Convert unix timestamp to timestamp with format MMM-DD-YYYY HH:MM:SS function convertTimestamp(unixtimestamp){ // Months array var months_arr = ['Jan','Feb','Mar','Apr','May','Jun','Jul','Aug','Sep','Oct','Nov','Dec']; // Convert timestamp var date = new Date(unixtimestamp); // Year var year = date.getFullYear(); // Month var month = months_arr[date.getMonth()]; // Day var day = date.getDate(); // Hours var hours = date.getHours(); // Minutes var minutes = "0" + date.getMinutes(); // Seconds var seconds = "0" + date.getSeconds(); // Display date time in MMM-dd-yyyy h:m:s format var convdataTime = month+'-'+day+'-'+year+' '+hours + ':' + minutes.substr(-2) + ':' + seconds.substr(-2); return convdataTime; } // Get descriptive name of data type associated with integer from IoT device message function getDataTypeString(typeVal){ var dataType; switch (typeVal) { case 0: dataType = "Null"; break; case 1: dataType = "Int8"; break; case 2: dataType = "Int16"; break; case 3: dataType = "Int32"; break; case 4: dataType = "Int64"; break; case 5: dataType = "UInt8"; break; case 6: dataType = 'UInt16'; case 7: dataType = "UInt32"; break; case 8: dataType = "UInt64"; break; case 9: dataType = "Bool"; break; case 10: dataType = "Float"; break; case 11: dataType = "Double"; break; case 12: dataType = "String"; break; case 14: dataType = 'DateTime'; case 15: dataType = 'GUID' } return dataType; } // Convert minute UTC timezone offset to format UTC-HH:MM function getTimezoneString(timezone){ var timezoneString; var hourString; var minuteString; var hours = Math.floor(timezone/60); var minutes = timezone % 60; if ((hours < 10) && (hours > 0)){ hourString = '+0' + hours.toString(10); } else if (hours >= 10) { hourString = '+' + hours.toString(10); } else if ((hours > -10) && (hours < 0)){ hourString = '-0' + (-1*hours).toString(10); } else if (hours <= -10){ hourString = hours.toString(10); } if (minutes == 0){ minuteString = '00'; } else { minuteString = minutes.toString(10); } if (hours > 0){ var timezoneString = 'UTC' + hourString + ':' + minuteString ; } else if (hours < 0){ var timezoneString = 'UTC' + hourString + ':' + minuteString ; } else { var timezoneString = 'UTC+00:00'; } return timezoneString; } context.done(); };