zsock(3)

zsock(3)

CZMQ Manual - CZMQ/3.0.1

Name

zsock - high-level socket API that hides libzmq contexts and sockets

Synopsis

//  Create a new socket. Returns the new socket, or NULL if the new socket
//  could not be created. Note that the symbol zsock_new (and other
//  constructors/destructors for zsock) are redirected to the *_checked
//  variant, enabling intelligent socket leak detection. This can have
//  performance implications if you use a LOT of sockets. To turn off this
//  redirection behaviour, define ZSOCK_NOCHECK.
CZMQ_EXPORT zsock_t *
    zsock_new (int type);

//  Destroy the socket. You must use this for any socket created via the
//  zsock_new method.
CZMQ_EXPORT void
    zsock_destroy (zsock_t **self_p);

//  Create a PUB socket. Default action is bind.
//  The caller is responsible for destroying the return value when finished with it.
CZMQ_EXPORT zsock_t *
    zsock_new_pub (const char *endpoint);

//  Create a SUB socket, and optionally subscribe to some prefix string. Default
//  action is connect.
//  The caller is responsible for destroying the return value when finished with it.
CZMQ_EXPORT zsock_t *
    zsock_new_sub (const char *endpoint, const char *subscribe);

//  Create a REQ socket. Default action is connect.
//  The caller is responsible for destroying the return value when finished with it.
CZMQ_EXPORT zsock_t *
    zsock_new_req (const char *endpoint);

//  Create a REP socket. Default action is bind.
//  The caller is responsible for destroying the return value when finished with it.
CZMQ_EXPORT zsock_t *
    zsock_new_rep (const char *endpoint);

//  Create a DEALER socket. Default action is connect.
//  The caller is responsible for destroying the return value when finished with it.
CZMQ_EXPORT zsock_t *
    zsock_new_dealer (const char *endpoint);

//  Create a ROUTER socket. Default action is bind.
//  The caller is responsible for destroying the return value when finished with it.
CZMQ_EXPORT zsock_t *
    zsock_new_router (const char *endpoint);

//  Create a PUSH socket. Default action is connect.
//  The caller is responsible for destroying the return value when finished with it.
CZMQ_EXPORT zsock_t *
    zsock_new_push (const char *endpoint);

//  Create a PULL socket. Default action is bind.
//  The caller is responsible for destroying the return value when finished with it.
CZMQ_EXPORT zsock_t *
    zsock_new_pull (const char *endpoint);

//  Create an XPUB socket. Default action is bind.
//  The caller is responsible for destroying the return value when finished with it.
CZMQ_EXPORT zsock_t *
    zsock_new_xpub (const char *endpoint);

//  Create an XSUB socket. Default action is connect.
//  The caller is responsible for destroying the return value when finished with it.
CZMQ_EXPORT zsock_t *
    zsock_new_xsub (const char *endpoint);

//  Create a PAIR socket. Default action is connect.
//  The caller is responsible for destroying the return value when finished with it.
CZMQ_EXPORT zsock_t *
    zsock_new_pair (const char *endpoint);

//  Create a STREAM socket. Default action is connect.
//  The caller is responsible for destroying the return value when finished with it.
CZMQ_EXPORT zsock_t *
    zsock_new_stream (const char *endpoint);

//  Bind a socket to a formatted endpoint. For tcp:// endpoints, supports
//  ephemeral ports, if you specify the port number as "*". By default
//  zsock uses the IANA designated range from C000 (49152) to FFFF (65535).
//  To override this range, follow the "*" with "[first-last]". Either or
//  both first and last may be empty. To bind to a random port within the
//  range, use "!" in place of "*".
//
//  Examples:
//      tcp://127.0.0.1:*           bind to first free port from C000 up
//      tcp://127.0.0.1:!           bind to random port from C000 to FFFF
//      tcp://127.0.0.1:*[60000-]   bind to first free port from 60000 up
//      tcp://127.0.0.1:![-60000]   bind to random port from C000 to 60000
//      tcp://127.0.0.1:![55000-55999]
//                                  bind to random port from 55000 to 55999
//
//  On success, returns the actual port number used, for tcp:// endpoints,
//  and 0 for other transports. On failure, returns -1. Note that when using
//  ephemeral ports, a port may be reused by different services without
//  clients being aware. Protocols that run on ephemeral ports should take
//  this into account.
CZMQ_EXPORT int
    zsock_bind (zsock_t *self, const char *format, ...) CHECK_PRINTF (2);

//  Returns last bound endpoint, if any.
CZMQ_EXPORT const char *
    zsock_endpoint (zsock_t *self);

//  Unbind a socket from a formatted endpoint.
//  Returns 0 if OK, -1 if the endpoint was invalid or the function
//  isn't supported.
CZMQ_EXPORT int
    zsock_unbind (zsock_t *self, const char *format, ...) CHECK_PRINTF (2);

//  Connect a socket to a formatted endpoint
//  Returns 0 if OK, -1 if the endpoint was invalid.
CZMQ_EXPORT int
    zsock_connect (zsock_t *self, const char *format, ...) CHECK_PRINTF (2);

//  Disconnect a socket from a formatted endpoint
//  Returns 0 if OK, -1 if the endpoint was invalid or the function
//  isn't supported.
CZMQ_EXPORT int
    zsock_disconnect (zsock_t *self, const char *format, ...) CHECK_PRINTF (2);

//  Attach a socket to zero or more endpoints. If endpoints is not null,
//  parses as list of ZeroMQ endpoints, separated by commas, and prefixed by
//  '@' (to bind the socket) or '>' (to connect the socket). Returns 0 if all
//  endpoints were valid, or -1 if there was a syntax error. If the endpoint
//  does not start with '@' or '>', the serverish argument defines whether
//  it is used to bind (serverish = true) or connect (serverish = false).
CZMQ_EXPORT int
    zsock_attach (zsock_t *self, const char *endpoints, bool serverish);

//  Returns socket type as printable constant string.
CZMQ_EXPORT const char *
    zsock_type_str (zsock_t *self);

//  Send a 'picture' message to the socket (or actor). The picture is a
//  string that defines the type of each frame. This makes it easy to send
//  a complex multiframe message in one call. The picture can contain any
//  of these characters, each corresponding to one or two arguments:
//
//      i = int (signed)
//      1 = uint8_t
//      2 = uint16_t
//      4 = uint32_t
//      8 = uint64_t
//      s = char *
//      b = byte *, size_t (2 arguments)
//      c = zchunk_t *
//      f = zframe_t *
//      h = zhashx_t *
//      U = zuuid_t *
//      p = void * (sends the pointer value, only meaningful over inproc)
//      m = zmsg_t * (sends all frames in the zmsg)
//      z = sends zero-sized frame (0 arguments)
//      u = uint (deprecated)
//
//  Note that s, b, c, and f are encoded the same way and the choice is
//  offered as a convenience to the sender, which may or may not already
//  have data in a zchunk or zframe. Does not change or take ownership of
//  any arguments. Returns 0 if successful, -1 if sending failed for any
//  reason.
CZMQ_EXPORT int
    zsock_send (void *self, const char *picture, ...);

//  Send a 'picture' message to the socket (or actor). This is a va_list
//  version of zsock_send (), so please consult its documentation for the
//  details.
CZMQ_EXPORT int
    zsock_vsend (void *self, const char *picture, va_list argptr);

//  Receive a 'picture' message to the socket (or actor). See zsock_send for
//  the format and meaning of the picture. Returns the picture elements into
//  a series of pointers as provided by the caller:
//
//      i = int * (stores signed integer)
//      4 = uint32_t * (stores 32-bit unsigned integer)
//      8 = uint64_t * (stores 64-bit unsigned integer)
//      s = char ** (allocates new string)
//      b = byte **, size_t * (2 arguments) (allocates memory)
//      c = zchunk_t ** (creates zchunk)
//      f = zframe_t ** (creates zframe)
//      U = zuuid_t * (creates a zuuid with the data)
//      h = zhashx_t ** (creates zhashx)
//      p = void ** (stores pointer)
//      m = zmsg_t ** (creates a zmsg with the remaing frames)
//      z = null, asserts empty frame (0 arguments)
//      u = uint * (stores unsigned integer, deprecated)
//
//  Note that zsock_recv creates the returned objects, and the caller must
//  destroy them when finished with them. The supplied pointers do not need
//  to be initialized. Returns 0 if successful, or -1 if it failed to recv
//  a message, in which case the pointers are not modified. When message
//  frames are truncated (a short message), sets return values to zero/null.
//  If an argument pointer is NULL, does not store any value (skips it).
//  An 'n' picture matches an empty frame; if the message does not match,
//  the method will return -1.
CZMQ_EXPORT int
    zsock_recv (void *self, const char *picture, ...);

//  Receive a 'picture' message from the socket (or actor). This is a
//  va_list version of zsock_recv (), so please consult its documentation
//  for the details.
CZMQ_EXPORT int
    zsock_vrecv (void *self, const char *picture, va_list argptr);

//  Send a binary encoded 'picture' message to the socket (or actor). This
//  method is similar to zsock_send, except the arguments are encoded in a
//  binary format that is compatible with zproto, and is designed to reduce
//  memory allocations. The pattern argument is a string that defines the
//  type of each argument. Supports these argument types:
//
//   pattern    C type                  zproto type:
//      1       uint8_t                 type = "number" size = "1"
//      2       uint16_t                type = "number" size = "2"
//      4       uint32_t                type = "number" size = "3"
//      8       uint64_t                type = "number" size = "4"
//      s       char *, 0-255 chars     type = "string"
//      S       char *, 0-2^32-1 chars  type = "longstr"
//      c       zchunk_t *              type = "chunk"
//      f       zframe_t *              type = "frame"
//      u       zuuid_t *               type = "uuid"
//      m       zmsg_t *                type = "msg"
//      p       void *, sends pointer value, only over inproc
//
//  Does not change or take ownership of any arguments. Returns 0 if
//  successful, -1 if sending failed for any reason.
CZMQ_EXPORT int
    zsock_bsend (void *self, const char *picture, ...);

//  Receive a binary encoded 'picture' message from the socket (or actor).
//  This method is similar to zsock_recv, except the arguments are encoded
//  in a binary format that is compatible with zproto, and is designed to
//  reduce memory allocations. The pattern argument is a string that defines
//  the type of each argument. See zsock_bsend for the supported argument
//  types. All arguments must be pointers; this call sets them to point to
//  values held on a per-socket basis. Do not modify or destroy the returned
//  values. Returns 0 if successful, or -1 if it failed to read a message.
CZMQ_EXPORT int
    zsock_brecv (void *self, const char *picture, ...);

//  Set socket to use unbounded pipes (HWM=0); use this in cases when you are
//  totally certain the message volume can fit in memory. This method works
//  across all versions of ZeroMQ. Takes a polymorphic socket reference.
CZMQ_EXPORT void
    zsock_set_unbounded (void *self);

//  Send a signal over a socket. A signal is a short message carrying a
//  success/failure code (by convention, 0 means OK). Signals are encoded
//  to be distinguishable from "normal" messages. Accepts a zock_t or a
//  zactor_t argument, and returns 0 if successful, -1 if the signal could
//  not be sent. Takes a polymorphic socket reference.
CZMQ_EXPORT int
    zsock_signal (void *self, byte status);

//  Wait on a signal. Use this to coordinate between threads, over pipe
//  pairs. Blocks until the signal is received. Returns -1 on error, 0 or
//  greater on success. Accepts a zsock_t or a zactor_t as argument.
//  Takes a polymorphic socket reference.
CZMQ_EXPORT int
    zsock_wait (void *self);

//  If there is a partial message still waiting on the socket, remove and
//  discard it. This is useful when reading partial messages, to get specific
//  message types.
CZMQ_EXPORT void
    zsock_flush (void *self);

//  Probe the supplied object, and report if it looks like a zsock_t.
//  Takes a polymorphic socket reference.
CZMQ_EXPORT bool
    zsock_is (void *self);

//  Probe the supplied reference. If it looks like a zsock_t instance, return
//  the underlying libzmq socket handle; else if it looks like a file
//  descriptor, return NULL; else if it looks like a libzmq socket handle,
//  return the supplied value. Takes a polymorphic socket reference.
CZMQ_EXPORT void *
    zsock_resolve (void *self);

//  Self test of this class
CZMQ_EXPORT void
    zsock_test (bool verbose);

Description

The zsock class wraps the libzmq socket handle (a void *) with a proper structure that follows the CLASS rules for construction and destruction. Some zsock methods take a void * "polymorphic" reference, which can be either a zsock_t or a zactor_t reference, or a libzmq void *.

Please add @discuss section in ../src/zsock.c.

Example

From zsock_test method

zsock_t *writer = zsock_new_push ("@tcp://127.0.0.1:5560");
assert (writer);
assert (zsock_resolve (writer) != writer);
assert (streq (zsock_type_str (writer), "PUSH"));

int rc;
#if (ZMQ_VERSION >= ZMQ_MAKE_VERSION (3, 2, 0))
// Check unbind
rc = zsock_unbind (writer, "tcp://127.0.0.1:%d", 5560);
assert (rc == 0);

// In some cases and especially when running under Valgrind, doing
// a bind immediately after an unbind causes an EADDRINUSE error.
// Even a short sleep allows the OS to release the port for reuse.
zclock_sleep (100);

// Bind again
rc = zsock_bind (writer, "tcp://127.0.0.1:%d", 5560);
assert (rc == 5560);
assert (streq (zsock_endpoint (writer), "tcp://127.0.0.1:5560"));
#endif

zsock_t *reader = zsock_new_pull (">tcp://127.0.0.1:5560");
assert (reader);
assert (zsock_resolve (reader) != reader);
assert (streq (zsock_type_str (reader), "PULL"));

// Basic Hello, World
zstr_send (writer, "Hello, World");
zmsg_t *msg = zmsg_recv (reader);
assert (msg);
char *string = zmsg_popstr (msg);
assert (streq (string, "Hello, World"));
free (string);
zmsg_destroy (&msg);

// Test resolve FD
SOCKET fd = zsock_fd (reader);
assert (zsock_resolve ((void *) &fd) == NULL);

// Test binding to ephemeral ports, sequential and random
int port = zsock_bind (writer, "tcp://127.0.0.1:*");
assert (port >= DYNAMIC_FIRST && port <= DYNAMIC_LAST);
port = zsock_bind (writer, "tcp://127.0.0.1:*[50000-]");
assert (port >= 50000 && port <= DYNAMIC_LAST);
port = zsock_bind (writer, "tcp://127.0.0.1:*[-50001]");
assert (port >= DYNAMIC_FIRST && port <= 50001);
port = zsock_bind (writer, "tcp://127.0.0.1:*[60000-60050]");
assert (port >= 60000 && port <= 60050);

port = zsock_bind (writer, "tcp://127.0.0.1:!");
assert (port >= DYNAMIC_FIRST && port <= DYNAMIC_LAST);
port = zsock_bind (writer, "tcp://127.0.0.1:![50000-]");
assert (port >= 50000 && port <= DYNAMIC_LAST);
port = zsock_bind (writer, "tcp://127.0.0.1:![-50001]");
assert (port >= DYNAMIC_FIRST && port <= 50001);
port = zsock_bind (writer, "tcp://127.0.0.1:![60000-60050]");
assert (port >= 60000 && port <= 60050);

// Test zsock_attach method
zsock_t *server = zsock_new (ZMQ_DEALER);
assert (server);
rc = zsock_attach (server, "@inproc://myendpoint,tcp://127.0.0.1:5556,inproc://others", true);
assert (rc == 0);
rc = zsock_attach (server, "", false);
assert (rc == 0);
rc = zsock_attach (server, NULL, true);
assert (rc == 0);
rc = zsock_attach (server, ">a,@b, c,, ", false);
assert (rc == -1);
zsock_destroy (&server);

// Test zsock_endpoint method
rc = zsock_bind (writer, "inproc://test.%s", "writer");
assert (rc == 0);
assert (streq (zsock_endpoint (writer), "inproc://test.writer"));

// Test error state when connecting to an invalid socket type
// ('txp://' instead of 'tcp://', typo intentional)
rc = zsock_connect (reader, "txp://127.0.0.1:5560");
assert (rc == -1);

// Test signal/wait methods
rc = zsock_signal (writer, 123);
assert (rc == 0);
rc = zsock_wait (reader);
assert (rc == 123);

// Test zsock_send/recv pictures
uint8_t number1 = 123;
uint16_t number2 = 123 * 123;
uint32_t number4 = 123 * 123 * 123;
uint64_t number8 = 123 * 123 * 123 * 123;

zchunk_t *chunk = zchunk_new ("HELLO", 5);
assert (chunk);
zframe_t *frame = zframe_new ("WORLD", 5);
assert (frame);
zhashx_t *hash = zhashx_new ();
assert (hash);
zuuid_t *uuid = zuuid_new ();
assert (uuid);
zhashx_autofree (hash);
zhashx_insert (hash, "1", "value A");
zhashx_insert (hash, "2", "value B");
char *original = "pointer";

// Test zsock_recv into each supported type
zsock_send (writer, "i1248zsbcfUhp",
 -12345, number1, number2, number4, number8,
 "This is a string", "ABCDE", 5,
 chunk, frame, uuid, hash, original);
char *uuid_str = strdup (zuuid_str (uuid));
zchunk_destroy (&chunk);
zframe_destroy (&frame);
zuuid_destroy (&uuid);
zhashx_destroy (&hash);

int integer;
byte *data;
size_t size;
char *pointer;
number8 = number4 = number2 = number1 = 0;
rc = zsock_recv (reader, "i1248zsbcfUhp",
 &integer, &number1, &number2, &number4, &number8,
 &string, &data, &size, &chunk, &frame, &uuid, &hash, &pointer);
assert (rc == 0);
assert (integer == -12345);
assert (number1 == 123);
assert (number2 == 123 * 123);
assert (number4 == 123 * 123 * 123);
assert (number8 == 123 * 123 * 123 * 123);
assert (streq (string, "This is a string"));
assert (memcmp (data, "ABCDE", 5) == 0);
assert (size == 5);
assert (memcmp (zchunk_data (chunk), "HELLO", 5) == 0);
assert (zchunk_size (chunk) == 5);
assert (streq (uuid_str, zuuid_str (uuid)));
assert (memcmp (zframe_data (frame), "WORLD", 5) == 0);
assert (zframe_size (frame) == 5);
char *value = (char *) zhashx_lookup (hash, "1");
assert (streq (value, "value A"));
value = (char *) zhashx_lookup (hash, "2");
assert (streq (value, "value B"));
assert (original == pointer);
free (string);
free (data);
free (uuid_str);
zframe_destroy (&frame);
zchunk_destroy (&chunk);
zhashx_destroy (&hash);

// Test zsock_recv of short message; this lets us return a failure
// with a status code and then nothing else; the receiver will get
// the status code and NULL/zero for all other values
zsock_send (writer, "i", -1);
zsock_recv (reader, "izsbcfp",
 &integer, &string, &data, &size, &chunk, &frame, &pointer);
assert (integer == -1);
assert (string == NULL);
assert (data == NULL);
assert (size == 0);
assert (chunk == NULL);
assert (frame == NULL);
assert (pointer == NULL);

msg = zmsg_new ();
zmsg_addstr (msg, "frame 1");
zmsg_addstr (msg, "frame 2");
zsock_send (writer, "szm", "header", msg);
zmsg_destroy (&msg);

zsock_recv (reader, "szm", &string, &msg);

assert (streq ("header", string));
assert (zmsg_size (msg) == 2);
assert (zframe_streq (zmsg_first (msg), "frame 1"));
assert (zframe_streq (zmsg_next (msg), "frame 2"));
zstr_free (&string);
zmsg_destroy (&msg);

// Test zsock_recv with null arguments
chunk = zchunk_new ("HELLO", 5);
assert (chunk);
frame = zframe_new ("WORLD", 5);
assert (frame);
zsock_send (writer, "izsbcfp",
 -12345, "This is a string", "ABCDE", 5, chunk, frame, original);
zframe_destroy (&frame);
zchunk_destroy (&chunk);
zsock_recv (reader, "izsbcfp", &integer, NULL, NULL, NULL, &chunk, NULL, NULL);
assert (integer == -12345);
assert (memcmp (zchunk_data (chunk), "HELLO", 5) == 0);
assert (zchunk_size (chunk) == 5);
zchunk_destroy (&chunk);

// Test zsock_bsend/brecv pictures with binary encoding
frame = zframe_new ("Hello", 5);
chunk = zchunk_new ("World", 5);

msg = zmsg_new ();
zmsg_addstr (msg, "Hello");
zmsg_addstr (msg, "World");

zsock_bsend (writer, "1248sSpcfm",
 number1, number2, number4, number8,
 "Hello, World",
 "Goodbye cruel World!",
 original,
 chunk, frame, msg);
zchunk_destroy (&chunk);
zframe_destroy (&frame);
zmsg_destroy (&msg);

number8 = number4 = number2 = number1 = 0;
char *longstr;
zsock_brecv (reader, "1248sSpcfm",
 &number1, &number2, &number4, &number8,
 &string, &longstr,
 &pointer,
 &chunk, &frame, &msg);
assert (number1 == 123);
assert (number2 == 123 * 123);
assert (number4 == 123 * 123 * 123);
assert (number8 == 123 * 123 * 123 * 123);
assert (streq (string, "Hello, World"));
assert (streq (longstr, "Goodbye cruel World!"));
assert (pointer == original);
zstr_free (&longstr);
zchunk_destroy (&chunk);
zframe_destroy (&frame);
zmsg_destroy (&msg);

// Check that we can send a zproto format message
zsock_bsend (writer, "1111sS4", 0xAA, 0xA0, 0x02, 0x01, "key", "value", 1234);
zgossip_msg_t *gossip = zgossip_msg_new ();
zgossip_msg_recv (gossip, reader);
assert (zgossip_msg_id (gossip) == ZGOSSIP_MSG_PUBLISH);
zgossip_msg_destroy (&gossip);

zsock_destroy (&reader); zsock_destroy (&writer);

Authors

The czmq manual was written by the authors in the AUTHORS file.

Resources

Main web site:

Report bugs to the email <gro.qmorez.stsil|ved-qmorez#gro.qmorez.stsil|ved-qmorez>

Copyright

Copyright (c) 1991-2012 iMatix Corporation — http://www.imatix.com Copyright other contributors as noted in the AUTHORS file. This file is part of CZMQ, the high-level C binding for ØMQ: http://czmq.zeromq.org This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0. If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/. LICENSE included with the czmq distribution.