Framed Msgpack RPC for node.js
npm install framed-msgpack-rpcFramed Msgpack RPC (FMPRPC) is an RPC system for node.js. It allows
clients to call remote procedures on servers. An RPC consists of: (1)
a simple string name; (2) an argument that is a single JSON object;
(3) a reply that is also a single JSON object. Of course, those
objects can be arrays, or dictionaries, so arguments and return values
can be complex and interesting.
FMPRPC is a variant of the
Msgpack-RPC
protocol specification for node.js. Msgpack-RPC communicates
binary JSON objects that are efficiently encoded and decoded with the
MessagePack serialization format.
"Framed" Msgpack-RPC differs from standard Msgpack-RPC in a small way:
the encoding of the length of the packet is prepended to each
packet. This way, receivers can efficiently buffer data until a full
packet is available to decode. In an event-based context like node.js,
framing simplifies implementation, and yields a faster decoder,
especially for very large messages.
By convention, RPCs are grouped into _programs_, which can have
one or more _versions_. Each (prog,vers) pair then has a collection
of procedures, meaning an RPC is identified unabmiguously by a
(prog,vers,proc) triple. In practice, these three strings are
joined with "." characters, and the dotted triple is the RPC name.
Due to framing, this protocol is not compatible with existing
Msgpack-RPC systems. This implementation supports TCP transports only
at the current time.
The simplest way to write a server is with the Server
class as below:
``javascript`
var rpc = require('framed-msgpack-rpc');
var srv= new rpc.Server ({
programs : {
"myprog.1" : {
add : function(arg, response) {
response.result(arg.a + arg.b);
}
}
},
port : 8000
});
srv.listen(function (err) {
if (err) {
console.log("Error binding: " + err);
} else {
console.log("Listening!");
}
});
a corresponding client might look like:
`javascript`
var x = rpc.createTransport({ host: '127.0.0.1', port : 8000 });
x.connect(function (err) {
if (err) {
console.log("error connecting");
} else {
var c = new rpc.Client(x, "myprog.1");
c.invoke('add', { a : 5, b : 4}, function(err, response) {
if (err) {
console.log("error in RPC: " + err);
} else {
assert.equal(9, response);
}
x.close();
});
}
});
Or, equivalently, in beautiful
IcedCoffeeScript:
`coffee`
x = rpc.createTransport { host: '127.0.0.1', port : 8000 }
await x.connect defer err
if err
console.log "error connecting"
else
c = new rpc.Client x, "myprog.1"
await c.invoke 'add', { a : 5, b : 4}, defer err, response
if err? then console.log "error in RPC: #{err}"
else assert.equal 9, response
x.close()
It should work to just install with npm:
npm install -g framed-msgpack-rpc
If you install by hand, you will need to install the one dependency,
which is the the Purepack Msgpack library,
available as purepack on npm:
npm install -g purepack
If you are building real applications, it's good to look deeper than
the simple API introduced above. The full library is based on an
abstraction called an FMPRPC Transport. This class represents a
stream of FMPRPC packets. Clients and servers are built on top of
these streams, but not in one-to-one correspondence. That is, several
clients and several servers can share the same Transport object. Thus,
FMPRPC supports multiplexing of many logically separated
application-level streams over the same underlying TCP stream.
The transport mechanics are available via the submodule transport:
`javascript`
var rpc = require('framed-msgpack-rpc');
var transport = rpc.transport;
Transports are auto-allocated in the case of servers (as part of the listen
and connect process), but for clients, you'll find yourself allocating and
connecting them explicitly.
All transports are stream transports and for now are built atop TCP
streams. Eventually we'll roll out support for Unix domain sockets, but there
is no plan for UDP support right now.
#### transport.Transport
`javascript`
var x = new transport.Transport(opts);opts
Make a new TCP transport, where are:
* port - the port to connect tohost
* - the host to connect to, or localhost if none was givenpath
* - the path to connect to, if using Unix domain socketstcp_opts
* - TCP options to pass to node's net.connect method, which {}
is by defaultlog_obj
* - An object to use to log info, warnings, and errors on this console.log
transport. By default, the default logging to will be used.do_tcp_delay
See Logging below.
* - By default, the Transport will setNoDelay onhooks
TCP streams, but if you specify this flag as true, that behavior will
be suppressed.
* - Hooks to be called on connection error and EOF. EspeciallyRobustTransport
useful for s (see below). The known hooks arehooks.connected
* - Called when a transport is connectedhooks.eof
* - Called when a transport hits EOF.dbgr
* - A debugging object. If set, it will turn on RPC tracingdebugger
via the given debugging object. See _Debugging_ below. I would have liked
to call this a , but that's a reserved keyword in node.
The following two options are used internally by Server and Listenertcp_stream
classes, and should not be accessed directly:
* - Wrap an existing TCP stream parent
* - A parent listener object
#### transport.RobustTransport
`javascript`
var x = new transport.RobustTransport(opts, ropts);
A subclass of the above; with some more features:
* If disconnected, will attempt to reconnect until successful.
* Will queue calls issued in between a disconnect and a reconnect.
* Will warn of RPCs that are outstanding for more than the given
threshholds.
The opts dictionary is as in Transport, but there are additionalropts
options that can be specified via :
* reconnect_delay - a float - the number of seconds to wait betweenqueue_max
connection attempts.
* - the maximum number of RPCs to queue while reconnectingwarn_threshhold
* - RPCs that take more than this number of secondserror_threshhold
are warned about via the logging object.
* - RPCs that take more than this number of seconds
are errored about via the logging object. Also, a timer will be set
up to warn after this many seconds if the RPC isn't completed in time,
while the RPC is still outstanding.
#### transport.Transport.connect
`javascript`
x.connect(function (err) { if (!err) { console.log("connected!") } });
Connect a transport if it's not already connected. Takes a single callback,
which takes one parameter --- an error that's null in the case of a
success, and non-null otherwise. In the case of a RobustTransport, theinfo
callback will be fired after the initial connection attempt, but will continue
to reconnect in the background. Additional error and warnings are issued
via the logger object, and an is issued when a connection succeeds.hooks.connected
Also, if a was passed, it will be called on a successful
connection, both the first time, and after any subsequent reconnect.
#### transport.Transport.is_connected
`javascript`
var b = x.is_connected();
Returns a bool, which is true if the transport is currently connected,false
and otherwise.
#### transport.Transport.close
`javascript`
x.close()
Call to actively close the given connection. It will trigger all of the
regular hooks and warnings that an implicit close would. In the case
of a RobustTransport, the transport will not attempt a reconnection.
#### transport.Transport.remote_address
`javascript`
var ip = x.remote_address();
Get the IP address of the remote side of the connection. Note that this
can change for a RobustTransport, if the DNS resolution for the given
hostname was updated and the connection was reestablished. Will
return a string in dotted-quad notation.
#### transport.Transport.get_generation
`javascript`
var g = x.get_generation()
Get the generation number of this stream connection. In the case of a
regular Transport, it's always going to be 1. In the case of a
RobustTransport, this number is incremented every time the
connection is reestablished.
#### transport.Transport.get_logger
`javascript`
var l = x.get_logger()
If you want to grab to the logger on the given transport, use this
method. For instance, you can change the verbosity level with
x.get_logger().set_level(2) if you are using the standard logging
object.
#### transport.Transport.set_logger
`javascript`
x.set_logger(new logger.Logger({prefix : ">", level : logger.WARN}));
Set the logger object on this Transport to be the passed logger.
You can pass a subclass of the given Logger class if you need
custom behavior to fit in with your logging system.
#### transport.Transport.set_debugger
`javascript`
x.set_debugger(obj)
Set a debugging object on a transport. After this is done, the core
will report that an RPC call was made or answered, either on the
server or client. See Debugging below for more details.
#### transport.Transport.set_debug_flags
`javascript`
x.set_debug_flags(flags)
Call set_debugger as above but with an object that will be allocated.debug.Debugger
The object is of type , and is initialized with flagsflags
given by . All debug traces are set to transport's logger objectlog.levels.DEBUG
at the level.
These flags can either be in numerical form (e.g., 0xfff ) or string"a1m"
literal form (e.g., ). If in the latter form, the flags willsflags_to_flags
be converted into the numerical form via .
#### transport.createTransport or rpc.createTransport
`javascript`
var x = rpc.createTransport(opts)
Create either a new Transport or RobustTransport with just one call.opts
The array is as above, but with a few differences. First, theopts here is the merge of the opts and ropts above for the caseRobustTransport
of s; and second, an option of robust : true will
enable the robust variety of the transport.
Note that by default, I like function to use underscores rather than
camel case, but there's a lot of functions like createConnection
in the standard library, so this particular function is in camel
case. Sorry for the inconsistency.
Clients are thin wrappers around Transports, allowing RPC client
calls. Several clients can share the same Transport. Import the
client libraries as a submodule:
`javascript`
var client = require('framed-msgpack-rpc').client;
The API is as follows:
#### client.Client
Make a new RPC client:
``
var c = new client.Client(x, prog);
Where x is a transport.Transport and prog is the name of an RPCprog
program. Examples for are of the form myprog.1, meaning themyprog
program is called and the version is 1.
Given a client, you can now make RPC calls over the specified connection:
#### client.Client.invoke
Use a Client to invoke an RPC as follows:
`javscript`
c.invoke(proc, arg, function(err, res) {});
The parameters are:
* proc - The name of the RPC procedure. It is joined with the
RPC program.version specified when the client was allocated, yieldingerr
a dotted triple that's sent over the wire.
* arg - A JSON object that's the argument to the RPC.
* cb - A callback that's fired once there is a reply to the RPC. null
is in the success case, and non-null otherwise. The res object isnull
optionally returned in a success case, giving the reply to the RPC. If
the server supplied a result, then res can still be null in
the case of success.
#### client.Client.notify
As above, but don't wait for a reply:
`javscript`
c.notify(proc, arg);
Here, there is no callback, and no way to check if the sever received
the message (or got an error). Notifying seems weird to me, but it
was in the original MsgpackRpc system, so it's reproduced here.
To write a server, the programmer must specify a series of hooks
that handle individual RPCs. There are a few ways to achieve these
ends with this library. The big difference is what is the thisserver.Server
object for the hook. In the case of the andserver.SimpleServer classes, the this object is the server itself.server.ContextualServer
In the class, the this object is a
per-connection context object. The first two are good for most cases.
You can get the server library through the submodule server:
`javascript`
var server = require('framed-msgpack-rpc').server;
But most of the classes are also rexported from the top-level module.
#### server.Server
Create a new server object; specify a port to bind to, a host IP
address to bind to, and also a set of RPC handlers.
`javascript`
var s = new server.Server(opts);
For opts, the fields are:
* port - A port to bind tohost
* - A host IP to bind topath
* - A socket path to bind to, if being run on as Unix domain socket.TransportClass
* - A transport class to use when allocating a newtransport.Transport
Transport for an incoming connection. By default, it's log_obj
* - A log object to log errors, and also to assign to make_child
(via ) to child connections. Use the default log classconsole.log
(which logs to ) if unspecified.programs
* - Programs to support, following this JSON schema:
`javascript`
{
prog_1 : {
proc_1 : function (arg, res, x) { / ... / },
proc_2 : function (arg, res, x) { / ... / },
/ etc ... /
},
prog_2 : {
proc_1 : function (arg, res, x) { / ... / }
}
}
Each hook in the object is called once per RPC. The arg argument isres
the argument specified by the remote client. The argument isres.result(some_object)
what the hook should call to send its reply to the client (by calling). A server can also reject the RPC viares.error(some_error_string)). The final argument, x, is thex.remote_address()
transport over which the RPC came in to the server. For instance, the
server can call to figure out who the remote
client is.
#### server.SimpleServer
A SimpleServer behaves like a Server but is simplified in someh_foo
ways. First off, it only handles one program, which is typically
set on object construction. Second off, it depends on inheritance;
I've used CoffeeScript here, but you can use hand-rolled JavaScript
style inheritance too. Finally, it infers your method hooks: on
construction, it iterates over all methods in the current object,
and infers that a hook of the form handles the RPC foo.
Here's an example:
`coffeescript
class MyServer extends server.SimpleServer
constructor : (d) ->
super d
@set_program_name "myprog.1"
h_reflect : (arg, res, x) -> res.result arg
h_null : (arg, res, x) -> res.result null
h_add : (arg, res, x) -> res.result { sum : arg.x + arg.y }
`
Most methods below are good for both SimpleServer and Server.
The former has a few extra; see the code in server.iced.
#### server.ContextualServer
Here's an example:
`coffeescript
class Prog1 extends server.Handler
h_foo : (arg, res) ->
console.log "RPC to foo() from #{@transport.remote_address()}"
res.result { y : arg.i + 2 }
h_bar : (arg, res) -> res.result { y : arg.j * arg.k }
class Prog2 extends server.Handler
h_foo : (arg, res) -> res.error "not yet implemented"
h_bar : (arg, res) -> res.error "not yet implemented"
s = new server.ContextualServer
port : 8881
classes :
"prog.1" : Prog1
"prog.2" : Prog2
await s.listen defer err
console.log "Error: #{err}" if err?
`
This code constructs a server.ContextualServer with a classesProg1
object that maps program names to classes. Whenever a new connection
is established in the above example, a new object and a newProg2 object is created. The former will handle all RPCs toprog.1 on that connection; the latter will handle all RPCs toprog.2. Note that the this object here is per-connection, not
per-server. This allows you to store all sorts of interesting
per-connection state. For more info, please see
server.iced.
#### server.Server.listen
Bind to a port, and listen for incoming connections
`javascript`
s.listen(function(err) {});
On success, the callback is fired with null, and otherwise,
an error object is passed.
#### server.Server.listen_retry
As above, but keep retrying if binding failed:
`javascript`
s.listen_retry(delay, function(err) {});
The retry happens every delay seconds. The given function is callednull
back with once the reconnection happens, or with the actualerr.code = 'EADDRINUSE'
error if it was other than .
#### server.Server.close
Close a server, and give back its port to the OS.
#### server.Server.set_port
Before calling listen, you can use this method to set the portServer
that the is going to bind to.
#### server.Server.walk_children
Walk the list of children, calling the specified function on each
child connection in the list:
`javascript`
s.walk_children (function(ch) {});
As you could imagine, an RPC can generate a lot of errors, warnings, and
informational messages. Examples include: unmarshalling failures,
unexpected EOFs, connection breaking, unhandled RPCs, etc.
This package has an extensible logging system to fit in with your
application, and a default logging system that should work for a lot
of cases too.
The basic classes can be found in the log submodule, accessible as:
`javascript`
var log = require('framed-msgpack-rpc').log;
When a new Listener or Transport class is instantiated, it willListener
need a new logger object (note that is the base class for theServer
various classes). It will try the following steps to pick alog.Logger object:
1. Access the opt.log_obj passed to the Transport or Listener log.Logger
constructor. This is often times an object of a custom subclass
of .log.Logger
1. If that is was not specified, allocate a new object:log.set_default_logger_class
1. If was previous called, allocatelog.Logger
one of those objects.
1. Otherwise, allocate a object.
Once this log.Logger object is allocated, the Transport orListener class will call set_remote on it, so that subsequent log
lines will show which client or server generated the message.
Logging is via the following methods, in ascending order of severity:
`javascript`
log.Logger.debug(msg)
log.Logger.info(msg)
log.Logger.warn(msg)
log.Logger.error(msg)
log.Logger.fatal(msg)
They all, by default, write the message msg to console.log whileremote_address
prepending the supplied above. The default log levellog.levels.INFO
is set to , but you can set it to log.levels.WARN,log.levels.ERRORS, etc. Warnings at lower levels will be silentlylog.Logger.set_level
swallowed. For the default logger object, the method can be used to set the logging level as desired.
To make a custom logger class, you can subclass log.Logger, or useset_remote
duck-typing: just make sure your class implements and the
five leveled logging methods above.
See VLogger in test/all.iced for one example of a different logger
--- it's used to make the regression tests look pretty.
See log.iced for more details.
An debugger is a JavaScript object that is passed into the FMPRPC core,
and if available, is used to dump RPC debug traces. These debuggers can
be installed when a Transport is allocated, by specifying theopts.dbgr option, or by calling set_debugger on most
FMPRPC objects.
If a debugging object is active, it is called with debug.Messagedebug.Message
object when an RPC comes in or goes out. The object
contains a bunch of fields:
`coffeescript`
F =
METHOD : 0x1
REMOTE : 0x2
SEQID : 0x4
TIMESTAMP : 0x8
ERR : 0x10
ARG : 0x20
RES : 0x40
TYPE : 0x80
DIR : 0x100 # which direction -- incoming or outgoing?
Debugging objects can choose to spam some or all of these fields,
depending on how bad the bug is. For most purposes, the supplied
debug.Debugger makes a nice debugger object that you can easily tuneflags
to print only the fields of your choosing (via the parameter).
See debug.iced for more details.
```
npm i
./node_modules/.bin/icake build
To come. See packetizer.iced for details.
To come. See dispatch.iced for details.