zmq_ctx_socket_monitor(3)

zmq_ctx_socket_monitor(3)

ØMQ Manual - ØMQ/3.2.6

Name

zmq_socket_monitor - register a monitoring callback

Synopsis

int zmq_socket_monitor (void *socket, char *addr, int events);

Description

The zmq_socket_monitor() function shall spawn a PAIR socket that publishes socket state changes (events) over the inproc:// transport to a given endpoint. Messages are zmq_event_t structs. It's recommended to connect via a PAIR socket in another application thread and handle monitoring events there. It's possible to also supply a bitmask (ZMQ_EVENT_ALL or any combination of the ZMQ_EVENT_* constants) of the events you're interested in.
\/\/ monitoring thread
static void *req_socket_monitor (void *ctx)
{
zmq_event_t event;
int rc;

void *s = zmq_socket (ctx, ZMQ_PAIR);
assert (s);

rc = zmq_connect (s, "inproc:\/\/monitor.req");
assert (rc == 0);
while (true) {
zmq_msg_t msg;
zmq_msg_init (&msg);
rc = zmq_recvmsg (s, &msg, 0);
if (rc == -1 && zmq_errno() == ETERM) break;
assert (rc != -1);
memcpy (&event, zmq_msg_data (&msg), sizeof (event));
switch (event.event) {
case ZMQ_EVENT_CONNECTED:
\/\/ handle socket connected event
break;
case ZMQ_EVENT_CLOSED:
\/\/ handle socket closed event
break;
}
}
zmq_close (s);
return NULL;
}

\/\/ register a monitor endpoint for all socket events
rc = zmq_socket_monitor (req, "inproc:\/\/monitor.req", ZMQ_EVENT_ALL);
assert (rc == 0);

\/\/ spawn a monitoring thread
rc = pthread_create (&threads [0], NULL, req_socket_monitor, ctx);
assert (rc == 0);

// Monitoring thread.
//
// Creates a PAIR socket on the given context, connects it to the monitor
// endpoint "inproc://monitor.req" and handles each received zmq_event_t
// until zmq_recvmsg() fails with ETERM.
//
// ctx: the ØMQ context, received through pthread_create()'s void * argument.
// Returns NULL in all cases.
static void *req_socket_monitor (void *ctx)
{
    zmq_event_t event;
    int rc;

    void *s = zmq_socket (ctx, ZMQ_PAIR);
    assert (s);

    rc = zmq_connect (s, "inproc://monitor.req");
    assert (rc == 0);
    while (true) {
        zmq_msg_t msg;
        zmq_msg_init (&msg);
        rc = zmq_recvmsg (s, &msg, 0);
        if (rc == -1 && zmq_errno() == ETERM) {
            // Leaving the loop: release the initialised message first.
            zmq_msg_close (&msg);
            break;
        }
        assert (rc != -1);
        memcpy (&event, zmq_msg_data (&msg), sizeof (event));
        switch (event.event) {
        case ZMQ_EVENT_CONNECTED:
            // handle socket connected event
            break;
        case ZMQ_EVENT_CLOSED:
            // handle socket closed event
            break;
        default:
            // events this example does not handle
            break;
        }
        // Release the message only after the event has been handled, so
        // that anything the copied event still refers to stays valid
        // while the handlers run.
        zmq_msg_close (&msg);
    }
    zmq_close (s);
    return NULL;
}

// Register a monitor endpoint for all socket events. The endpoint string is
// the same one the monitoring thread passes to zmq_connect().
// NOTE(review): registration happens before the thread is spawned; presumably
// the inproc endpoint has to exist before it is connected to - confirm.
rc = zmq_socket_monitor (req, "inproc://monitor.req", ZMQ_EVENT_ALL);
assert (rc == 0);

// Spawn the monitoring thread, handing it the context so that it can create
// its own PAIR socket.
rc = pthread_create (&threads [0], NULL, req_socket_monitor, ctx);
assert (rc == 0);

Only connection-oriented (tcp and ipc) transports are supported in this initial implementation.

Supported events are:

ZMQ_EVENT_CONNECTED: connection established

The ZMQ_EVENT_CONNECTED event triggers when a connection has been established to a remote peer. This can happen either synchronously or asynchronously.

Event metadata:

data.connected.addr // peer address
data.connected.fd // socket descriptor

ZMQ_EVENT_CONNECT_DELAYED: synchronous connect failed, it's being polled

The ZMQ_EVENT_CONNECT_DELAYED event triggers when an immediate connection attempt is delayed and its completion is being polled for.

Event metadata:

data.connect_delayed.addr // peer address
data.connect_delayed.err // errno value

ZMQ_EVENT_CONNECT_RETRIED: asynchronous connect / reconnection attempt

The ZMQ_EVENT_CONNECT_RETRIED event triggers when a connection attempt is being handled by the reconnect timer. The reconnect interval is recomputed for each attempt.

Event metadata:

data.connect_retried.addr // peer address
data.connect_retried.interval // computed reconnect interval

ZMQ_EVENT_LISTENING: socket bound to an address, ready to accept connections

The ZMQ_EVENT_LISTENING event triggers when a socket is successfully bound to an interface.

Event metadata:

data.listening.addr // listen address
data.listening.fd // socket descriptor

ZMQ_EVENT_BIND_FAILED: socket could not bind to an address

The ZMQ_EVENT_BIND_FAILED event triggers when a socket could not bind to a given interface.

Event metadata:

data.bind_failed.addr // listen address
data.bind_failed.err // errno value

ZMQ_EVENT_ACCEPTED: connection accepted to bound interface

The ZMQ_EVENT_ACCEPTED event triggers when a connection from a remote peer has been established with a socket's listen address.

Event metadata:

data.accepted.addr // listen address
data.accepted.fd // socket descriptor

ZMQ_EVENT_ACCEPT_FAILED: could not accept client connection

The ZMQ_EVENT_ACCEPT_FAILED event triggers when a connection attempt to a socket's bound address fails.

Event metadata:

data.accept_failed.addr // listen address
data.accept_failed.err // errno value

ZMQ_EVENT_CLOSED: connection closed

The ZMQ_EVENT_CLOSED event triggers when a connection's underlying descriptor has been closed.

Event metadata:

data.closed.addr // address
data.closed.fd // socket descriptor

ZMQ_EVENT_CLOSE_FAILED: connection couldn't be closed

The ZMQ_EVENT_CLOSE_FAILED event triggers when a descriptor could not be released back to the OS.

Event metadata:

data.close_failed.addr // address
data.close_failed.err // errno value

ZMQ_EVENT_DISCONNECTED: broken session

The ZMQ_EVENT_DISCONNECTED event triggers when the stream engine (tcp and ipc specific) detects a corrupted / broken session.

Event metadata:

data.disconnected.addr // address
data.disconnected.fd // socket descriptor

Return value

The zmq_socket_monitor() function returns a value of 0 or greater if successful. Otherwise it returns -1 and sets errno to one of the values defined below.

Errors

ETERM
The ØMQ context associated with the specified socket was terminated.
EPROTONOSUPPORT
The requested transport protocol is not supported. Monitor sockets are required to use the inproc:// transport.
EINVAL
The endpoint supplied is invalid.

Example

Observing a rep socket's connection state

// REP socket monitor thread.
//
// Creates a PAIR socket on the given context, connects it to the monitor
// endpoint "inproc://monitor.rep" and prints the descriptor or error code
// and the address carried by each listening / accepted / close-failed /
// closed / disconnected event, until zmq_recvmsg() fails with ETERM.
//
// ctx: the ØMQ context, received through pthread_create()'s void * argument.
// Returns NULL in all cases.
static void *rep_socket_monitor (void *ctx)
{
    zmq_event_t event;
    int rc;

    void *s = zmq_socket (ctx, ZMQ_PAIR);
    assert (s);

    rc = zmq_connect (s, "inproc://monitor.rep");
    assert (rc == 0);
    while (true) {
        zmq_msg_t msg;
        zmq_msg_init (&msg);
        rc = zmq_recvmsg (s, &msg, 0);
        if (rc == -1 && zmq_errno() == ETERM) {
            // Leaving the loop: release the initialised message first, so
            // every zmq_msg_init() is paired with a zmq_msg_close().
            zmq_msg_close (&msg);
            break;
        }
        assert (rc != -1);
        memcpy (&event, zmq_msg_data (&msg), sizeof (event));
        switch (event.event) {
        case ZMQ_EVENT_LISTENING:
            printf ("listening socket descriptor %d\n", event.data.listening.fd);
            printf ("listening socket address %s\n", event.data.listening.addr);
            break;
        case ZMQ_EVENT_ACCEPTED:
            printf ("accepted socket descriptor %d\n", event.data.accepted.fd);
            printf ("accepted socket address %s\n", event.data.accepted.addr);
            break;
        case ZMQ_EVENT_CLOSE_FAILED:
            printf ("socket close failure error code %d\n", event.data.close_failed.err);
            printf ("socket address %s\n", event.data.close_failed.addr);
            break;
        case ZMQ_EVENT_CLOSED:
            printf ("closed socket descriptor %d\n", event.data.closed.fd);
            printf ("closed socket address %s\n", event.data.closed.addr);
            break;
        case ZMQ_EVENT_DISCONNECTED:
            printf ("disconnected socket descriptor %d\n", event.data.disconnected.fd);
            printf ("disconnected socket address %s\n", event.data.disconnected.addr);
            break;
        default:
            // events this example does not report
            break;
        }
        // Released only after the address strings have been printed.
        zmq_msg_close (&msg);
    }
    zmq_close (s);
    return NULL;
}

// Create the infrastructure: one context shared by the REP socket and the
// monitor thread.
void *ctx = zmq_init (1);
assert (ctx);

// REP socket
rep = zmq_socket (ctx, ZMQ_REP);
assert (rep);

// REP socket monitor, all events. The monitor is registered and its thread
// started before zmq_bind() below, so the bind itself can be observed.
rc = zmq_socket_monitor (rep, "inproc://monitor.rep", ZMQ_EVENT_ALL);
assert (rc == 0);
rc = pthread_create (&threads [0], NULL, rep_socket_monitor, ctx);
assert (rc == 0);

// Bind the REP socket; addr is supplied by the surrounding program.
rc = zmq_bind (rep, addr);
assert (rc == 0);

// Allow some time for event detection
zmq_sleep (1);

// Close the REP socket
rc = zmq_close (rep);
assert (rc == 0);
// Terminate the context. The monitor thread leaves its loop once
// zmq_recvmsg() reports ETERM.
 zmq_term (ctx);

See also

zmq(7)

Authors

This ØMQ manual page was written by Lourens Naudé <moc.gnissimdohtem|sneruol#moc.gnissimdohtem|sneruol>