Mercurial > ublox > ublox8
changeset 84:14fb501082c9
zeromq-log-{send,recv}: pub-sub each juf frame via zeromq
This is essentially a beefier version of broadcast-log. Beefier because it
should handle all frame sizes, not just those smaller than the MTU.
Signed-off-by: Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
author | Josef 'Jeff' Sipek <jeffpc@josefsipek.net> |
---|---|
date | Wed, 08 Apr 2020 11:53:23 -0400 |
parents | c3e0cc003206 |
children | a4689cefafeb |
files | .hgignore CMakeLists.txt config.cmake zeromq-log-recv.c zeromq-log-send.c |
diffstat | 5 files changed, 331 insertions(+), 1 deletions(-) [+] |
line wrap: on
line diff
--- a/.hgignore Sat Feb 29 12:26:40 2020 -0500 +++ b/.hgignore Wed Apr 08 11:53:23 2020 -0400 @@ -16,3 +16,5 @@ dump-ubx print-state reframe +zeromq-log-recv +zeromq-log-send
--- a/CMakeLists.txt Sat Feb 29 12:26:40 2020 -0500 +++ b/CMakeLists.txt Wed Apr 08 11:53:23 2020 -0400 @@ -125,3 +125,37 @@ target_link_libraries(reframe ${JEFFPC_LIBRARY} ) + +add_executable(zeromq-log-recv + zeromq-log-recv.c +) + +set_target_properties(zeromq-log-recv PROPERTIES + LINK_FLAGS "-L${ZEROMQ_LIBRARY_DIRS}" +) + +target_include_directories(zeromq-log-recv PRIVATE + ${ZEROMQ_INCLUDE_DIRS} +) + +target_link_libraries(zeromq-log-recv + ${JEFFPC_LIBRARY} + ${ZEROMQ_LIBRARIES} +) + +add_executable(zeromq-log-send + zeromq-log-send.c +) + +set_target_properties(zeromq-log-send PROPERTIES + LINK_FLAGS "-L${ZEROMQ_LIBRARY_DIRS}" +) + +target_include_directories(zeromq-log-send PRIVATE + ${ZEROMQ_INCLUDE_DIRS} +) + +target_link_libraries(zeromq-log-send + ${JEFFPC_LIBRARY} + ${ZEROMQ_LIBRARIES} +)
--- a/config.cmake Sat Feb 29 12:26:40 2020 -0500 +++ b/config.cmake Wed Apr 08 11:53:23 2020 -0400 @@ -1,5 +1,5 @@ # -# Copyright (c) 2019 Josef 'Jeff' Sipek <jeffpc@josefsipek.net> +# Copyright (c) 2019-2020 Josef 'Jeff' Sipek <jeffpc@josefsipek.net> # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal @@ -22,3 +22,6 @@ set(CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake/Modules") find_package(jeffpc) + +include(FindPkgConfig) +pkg_check_modules(ZEROMQ REQUIRED libzmq)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/zeromq-log-recv.c Wed Apr 08 11:53:23 2020 -0400 @@ -0,0 +1,149 @@ +/* + * Copyright (c) 2019-2020 Josef 'Jeff' Sipek <jeffpc@josefsipek.net> + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +#include <jeffpc/error.h> + +#include <zmq.h> + +#include "xstdio.h" +#include "frame.h" + +static const char *prog; + +static void usage(void) +{ + fprintf(stderr, "Usage: %s <endpoint>\n", prog); + fprintf(stderr, "\n"); + fprintf(stderr, " <stdout> received frames\n"); + fprintf(stderr, " <endpoint> address to receive messages from " + "(e.g., tcp://example.com:1234)\n"); + exit(99); +} + +int main(int argc, char **argv) +{ + const char *endpoint; + size_t frames; + void *sock; + void *ctx; + int ret; + + prog = argv[0]; + + if (argc != 2) + usage(); + + endpoint = argv[1]; + + ctx = zmq_ctx_new(); + if (!ctx) { + fprintf(stderr, "Error: failed to create context: %s\n", + strerror(errno)); + return 1; + } + + sock = zmq_socket(ctx, ZMQ_SUB); + if (!sock) { + fprintf(stderr, "Error: failed to create socket: %s\n", + strerror(errno)); + return 2; + } + + ret = zmq_setsockopt(sock, ZMQ_SUBSCRIBE, NULL, 0); + if (ret) { + fprintf(stderr, "Error: failed to subscribe: %s\n", + strerror(errno)); + return 3; + } + + ret = zmq_connect(sock, endpoint); + if (ret) { + fprintf(stderr, "Error: failed to connect socket: %s\n", + strerror(errno)); + return 4; + } + + for (frames = 0;; frames++) { + uint8_t buf[(1u << 16) + sizeof(struct frame)]; + struct frame frame; + size_t payload_len; + + ret = zmq_recv(sock, buf, sizeof(buf), 0); + if (ret < 0) { + fprintf(stderr, "Error: failed to recv a message: %s\n", + strerror(errno)); + return 5; + } + if (ret > sizeof(buf)) { + fprintf(stderr, "Warning: received message truncated: " + "expected max %zu bytes, got %d\n", + sizeof(buf), ret); + continue; + } + + memcpy(&frame, buf, sizeof(frame)); + + if (be32_to_cpu(frame.magic) != FRAME_MAGIC) { + fprintf(stderr, "Warning: frame magic mismatch " + "(got %08x exp %08x)\n", + be32_to_cpu(frame.magic), FRAME_MAGIC); + continue; + } + + payload_len = be32_to_cpu(frame.len); + + if ((sizeof(struct frame) + payload_len) != ret) { + fprintf(stderr, "Warning: frame payload length mismatch: " + "received %d byte message, frame should have " + "%zu byte payload + %zu header (%zu total)\n", + ret, payload_len, sizeof(struct frame), + sizeof(struct frame) + payload_len); + continue; + } + + ret = xfwrite(stdout, buf, sizeof(struct frame) + payload_len); + if (ret) { + fprintf(stderr, "Error: failed to write frame to stdout\n"); + return 4; + } + + fflush(stdout); + } + + fprintf(stderr, "Info: processed %zu frames\n", frames); + + ret = zmq_close(sock); + if (ret) { + fprintf(stderr, "Error: failed to close socket: %s\n", + strerror(errno)); + return 8; + } + + ret = zmq_ctx_destroy(ctx); + if (ret) { + fprintf(stderr, "Error: failed to destroy context: %s\n", + strerror(errno)); + return 9; + } + + return 0; +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/zeromq-log-send.c Wed Apr 08 11:53:23 2020 -0400 @@ -0,0 +1,142 @@ +/* + * Copyright (c) 2019-2020 Josef 'Jeff' Sipek <jeffpc@josefsipek.net> + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +#include <jeffpc/error.h> + +#include <zmq.h> + +#include "xstdio.h" +#include "frame.h" + +static const char *prog; + +static void usage(void) +{ + fprintf(stderr, "Usage: %s <endpoint>\n", prog); + fprintf(stderr, "\n"); + fprintf(stderr, " <stdin> input framed log\n"); + fprintf(stderr, " <stdout> copy of input\n"); + fprintf(stderr, " <endpoint> address to send messages to " + "(e.g., tcp://example.com:1234)\n"); + exit(99); +} + +int main(int argc, char **argv) +{ + const char *endpoint; + size_t frames; + void *sock; + void *ctx; + int ret; + + prog = argv[0]; + + if (argc != 2) + usage(); + + endpoint = argv[1]; + + ctx = zmq_ctx_new(); + if (!ctx) { + fprintf(stderr, "Error: failed to create context: %s\n", + strerror(errno)); + return 1; + } + + sock = zmq_socket(ctx, ZMQ_PUB); + if (!sock) { + fprintf(stderr, "Error: failed to create socket: %s\n", + strerror(errno)); + return 2; + } + + ret = zmq_bind(sock, endpoint); + if (ret) { + fprintf(stderr, "Error: failed to bind socket: %s\n", + strerror(errno)); + return 3; + } + + for (frames = 0;; frames++) { + uint8_t buf[(1u << 16) + sizeof(struct frame)]; + struct frame frame; + size_t payload_len; + + ret = xfread(stdin, &frame, sizeof(frame)); + if (ret) { + fprintf(stderr, "Error: failed to read frame header\n"); + break; + } + + if (be32_to_cpu(frame.magic) != FRAME_MAGIC) { + fprintf(stderr, "Error: frame magic mismatch " + "(got %08x exp %08x)\n", + be32_to_cpu(frame.magic), FRAME_MAGIC); + break; + } + + payload_len = be32_to_cpu(frame.len); + + memcpy(buf, &frame, sizeof(frame)); + + ret = xfread(stdin, &buf[sizeof(struct frame)], payload_len); + if (ret) { + fprintf(stderr, "Error: failed to read frame payload\n"); + break; + } + + ret = xfwrite(stdout, buf, sizeof(struct frame) + payload_len); + if (ret) { + fprintf(stderr, "Error: failed to write frame to stdout\n"); + return 4; + } + + fflush(stdout); + + ret = zmq_send(sock, buf, sizeof(struct frame) + payload_len, 0); + if (ret < 0) + fprintf(stderr, "Warning: failed to send frame: %s\n", + strerror(errno)); + else if (ret != (sizeof(struct frame) + payload_len)) + fprintf(stderr, "Warning: failed to send the whole " + "frame of %zu bytes (%d sent)\n", + sizeof(struct frame) + payload_len, ret); + } + + fprintf(stderr, "Info: processed %zu frames\n", frames); + + ret = zmq_close(sock); + if (ret) { + fprintf(stderr, "Error: failed to close socket: %s\n", + strerror(errno)); + return 8; + } + + ret = zmq_ctx_destroy(ctx); + if (ret) { + fprintf(stderr, "Error: failed to destroy context: %s\n", + strerror(errno)); + return 9; + } + + return 0; +}