zmq_ctx_socket_monitor(3)

zmq_ctx_socket_monitor(3)

ØMQ Manual - ØMQ/4.0.8

Name

zmq_socket_monitor - register a monitoring callback

Synopsis

int zmq_socket_monitor (void *socket, char * *addr, int events);

Description

The zmq_socket_monitor() function shall spawn a PAIR socket that publishes socket state changes (events) over the inproc:// transport to a given endpoint.

Messages consist of 2 Frames, the first containing the event-id and the associated value. The second frame holds the affected endpoint as string.

The layout of the first Frame is: 16 bit event id 32 bit event value

event id and value are in the native byte order (for the machine the application is running on). There is no padding between the fields.

The event value has to be interpreted in the context of the event id. See Supported events below for details.
Only connection oriented (tcp and ipc) transports are supported in this initial
implementation.

Only connection oriented (tcp and ipc) transports are supported in this initial
implementation.

Supported events

ZMQ_EVENT_CONNECTED: connection established

The ZMQ_EVENT_CONNECTED event triggers when a connection has been established to a remote peer. This can happen either synchronous or asynchronous. Value is the FD of the newly connected socket.

ZMQ_EVENT_CONNECT_DELAYED: synchronous connect failed, it's being polled

The ZMQ_EVENT_CONNECT_DELAYED event triggers when an immediate connection attempt is delayed and its completion is being polled for. Value has no meaning.

ZMQ_EVENT_CONNECT_RETRIED: asynchronous connect / reconnection attempt

The ZMQ_EVENT_CONNECT_RETRIED event triggers when a connection attempt is being handled by reconnect timer. The reconnect interval's recomputed for each attempt. Value is the reconnect interval.

ZMQ_EVENT_LISTENING: socket bound to an address, ready to accept connections

The ZMQ_EVENT_LISTENING event triggers when a socket's successfully bound to a an interface. Value is the FD of the newly bound socket.

ZMQ_EVENT_BIND_FAILED: socket could not bind to an address

The ZMQ_EVENT_BIND_FAILED event triggers when a socket could not bind to a given interface. Value is the errno generated by the bind call.

ZMQ_EVENT_ACCEPTED: connection accepted to bound interface

The ZMQ_EVENT_ACCEPTED event triggers when a connection from a remote peer has been established with a socket's listen address. Value is the FD of the accepted socket.

ZMQ_EVENT_ACCEPT_FAILED: could not accept client connection

The ZMQ_EVENT_ACCEPT_FAILED event triggers when a connection attempt to a socket's bound address fails. Value is the errno generated by accept.

ZMQ_EVENT_CLOSED: connection closed

The ZMQ_EVENT_CLOSED event triggers when a connection's underlying descriptor has been closed. Value is the former FD of the for the closed socket. FD has been closed already!

ZMQ_EVENT_CLOSE_FAILED: connection couldn't be closed

The ZMQ_EVENT_CLOSE_FAILED event triggers when a descriptor could not be released back to the OS. Implementation note: ONLY FOR IPC SOCKETS. Value is the errno generated by unlink.

ZMQ_EVENT_DISCONNECTED: broken session

The ZMQ_EVENT_DISCONNECTED event triggers when the stream engine (tcp and ipc specific) detects a corrupted / broken session. Value is the FD of the socket.

Return value

The zmq_socket_monitor() function returns a value of 0 or greater if successful. Otherwise it returns -1 and sets errno to one of the values defined below.

Errors

ETERM
The ØMQ context associated with the specified socket was terminated.
EPROTONOSUPPORT
The requested transport protocol is not supported. Monitor sockets are required to use the inproc:// transport.
EINVAL
The endpoint supplied is invalid.

Example

Observing a rep socket's connection state

#include <stdio.h>
#include <zmq.h>
#include <pthread.h>
#include <string.h>
#include <assert.h>

static int read_msg(void* s, zmq_event_t* event, char* ep)
{
    int rc ;
    zmq_msg_t msg1;  // binary part
    zmq_msg_init (&msg1);
    zmq_msg_t msg2;  //  address part
    zmq_msg_init (&msg2);
    rc = zmq_msg_recv (&msg1, s, 0);
    if (rc == -1 && zmq_errno() == ETERM)
        return 1 ;
    assert (rc != -1);
    assert (zmq_msg_more(&msg1) != 0);
    rc = zmq_msg_recv (&msg2, s, 0);
    if (rc == -1 && zmq_errno() == ETERM)
        return 1;
    assert (rc != -1);
    assert (zmq_msg_more(&msg2) == 0);
    // copy binary data to event struct
    const char* data = (char*)zmq_msg_data(&msg1);
    memcpy(&(event->event), data, sizeof(event->event));
    memcpy(&(event->value), data+sizeof(event->event), sizeof(event->value));
    // copy address part
    const size_t len = zmq_msg_size(&msg2) ;
    ep = memcpy(ep, zmq_msg_data(&msg2), len);
    *(ep + len) = 0 ;
    return 0 ;
}

// REP socket monitor thread
static void *rep_socket_monitor (void *ctx)
{
    zmq_event_t event;
    static char addr[1025] ;
    int rc;

    printf("starting monitor...\n");

    void *s = zmq_socket (ctx, ZMQ_PAIR);
    assert (s);

    rc = zmq_connect (s, "inproc://monitor.rep");
    assert (rc == 0);
    while (!read_msg(s, &event, addr)) {
        switch (event.event) {
        case ZMQ_EVENT_LISTENING:
            printf ("listening socket descriptor %d\n", event.value);
            printf ("listening socket address %s\n", addr);
            break;
        case ZMQ_EVENT_ACCEPTED:
            printf ("accepted socket descriptor %d\n", event.value);
            printf ("accepted socket address %s\n", addr);
            break;
        case ZMQ_EVENT_CLOSE_FAILED:
            printf ("socket close failure error code %d\n", event.value);
            printf ("socket address %s\n", addr);
            break;
        case ZMQ_EVENT_CLOSED:
            printf ("closed socket descriptor %d\n", event.value);
            printf ("closed socket address %s\n", addr);
            break;
        case ZMQ_EVENT_DISCONNECTED:
            printf ("disconnected socket descriptor %d\n", event.value);
            printf ("disconnected socket address %s\n", addr);
            break;
        }
    }
    zmq_close (s);
    return NULL;
}

int main()
{
    const char* addr = "tcp://127.0.0.1:6666" ;
    pthread_t thread ;

    //  Create the infrastructure
    void *ctx = zmq_init (1);
    assert (ctx);

    // REP socket
    void* rep = zmq_socket (ctx, ZMQ_REP);
    assert (rep);

    // REP socket monitor, all events
    int rc = zmq_socket_monitor (rep, "inproc://monitor.rep", ZMQ_EVENT_ALL);
    assert (rc == 0);
    rc = pthread_create (&thread, NULL, rep_socket_monitor, ctx);
    assert (rc == 0);

    rc = zmq_bind (rep, addr);
    assert (rc == 0);

    // Allow some time for event detection
    zmq_sleep (1);

    // Close the REP socket
    rc = zmq_close (rep);
    assert (rc == 0);

    zmq_term (ctx);

    return 0 ; }

See also

zmq(7)

Authors

This page was written by the ØMQ community. To make a change please read the ØMQ Contribution Policy at http://www.zeromq.org/docs:contributing.