Mercurial > ublox > ublox8
changeset 91:80059db17cc5
sink-log: save juf frames in specified log file and publish them via zmq
The log file is stored in a per-serial number directory.
Signed-off-by: Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
author | Josef 'Jeff' Sipek <jeffpc@josefsipek.net> |
---|---|
date | Mon, 22 Feb 2021 10:03:09 -0500 |
parents | 6f668b9005aa |
children | c71c6a44e69f |
files | .hgignore CMakeLists.txt sink-log.c |
diffstat | 3 files changed, 298 insertions(+), 1 deletions(-) [+] |
line wrap: on
line diff
--- a/.hgignore Mon Feb 22 11:12:08 2021 -0500 +++ b/.hgignore Mon Feb 22 10:03:09 2021 -0500 @@ -16,5 +16,6 @@ dump-ubx print-state reframe +sink-log zeromq-log-recv zeromq-log-send
--- a/CMakeLists.txt Mon Feb 22 11:12:08 2021 -0500 +++ b/CMakeLists.txt Mon Feb 22 10:03:09 2021 -0500 @@ -1,5 +1,5 @@ # -# Copyright (c) 2019-2020 Josef 'Jeff' Sipek <jeffpc@josefsipek.net> +# Copyright (c) 2019-2021 Josef 'Jeff' Sipek <jeffpc@josefsipek.net> # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal @@ -126,6 +126,24 @@ ${JEFFPC_LIBRARY} ) +add_executable(sink-log + sink-log.c +) + +set_target_properties(sink-log PROPERTIES + LINK_FLAGS "-L${ZEROMQ_LIBRARY_DIRS}" +) + +target_include_directories(sink-log PRIVATE + ${ZEROMQ_INCLUDE_DIRS} +) + +target_link_libraries(sink-log + ${JEFFPC_LIBRARY} + ${ZEROMQ_LIBRARIES} + ublox8 +) + add_executable(zeromq-log-recv zeromq-log-recv.c )
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sink-log.c Mon Feb 22 10:03:09 2021 -0500 @@ -0,0 +1,278 @@ +/* + * Copyright (c) 2019-2021 Josef 'Jeff' Sipek <jeffpc@josefsipek.net> + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +#include <unistd.h> + +#include <jeffpc/error.h> +#include <jeffpc/hexdump.h> +#include <jeffpc/list.h> +#include <jeffpc/io.h> + +#include <zmq.h> + +#include "xstdio.h" +#include "frame.h" +#include "ubx.h" + +struct logentry { + uint8_t buf[(1u << 16) + sizeof(struct frame)]; + size_t payload_len; + struct list_node node; +}; + +static const char *prog; +static const char *logname; +static const char *endpoint; +static FILE *logfile; +static bool initialized = false; +static void *zsock; +static void *zctx; +static struct list queue; + +static void usage(void) +{ + fprintf(stderr, "Usage: %s <basedir> <logname> <endpoint>\n", prog); + fprintf(stderr, "\n"); + fprintf(stderr, " <stdin> input framed log\n"); + fprintf(stderr, " <basedir> base directory for per-serial dirs\n"); + fprintf(stderr, " <logname> name of file to use for logging\n"); + fprintf(stderr, " <endpoint> address to send messages to " + "(e.g., tcp://example.com:1234)\n"); + fprintf(stderr, "\n"); + fprintf(stderr, "The effective log file path is " + "<basedir>/<serial>/<logname>\n"); + fprintf(stderr, "\n"); + fprintf(stderr, "Note that %s creates and changes to the <serial> " + "subdirectory.\n", prog); + fprintf(stderr, "Therefore, it is possible to make a per-serial " + "zeromq socket via:\n"); + fprintf(stderr, "\n"); + fprintf(stderr, " %s <basedir> <logname> ipc://sock\n", prog); + fprintf(stderr, "\n"); + fprintf(stderr, "This will publish to:\n"); + fprintf(stderr, "\n"); + fprintf(stderr, " ipc://<basedir>/<serial>/sock\n"); + exit(99); +} + +static void queuemsg(uint8_t *buf, size_t payload_len) +{ + struct logentry *entry; + + entry = malloc(sizeof(struct logentry)); + if (!entry) + panic("Error: Failed to allocate queue entry: %s", + strerror(errno)); + + memcpy(entry->buf, buf, payload_len); + entry->payload_len = payload_len; + + list_insert_tail(&queue, entry); +} + +static void logmsg(uint8_t *buf, size_t payload_len) +{ + int ret; + + /* + * log to file + */ + ret = xfwrite(logfile, buf, sizeof(struct frame) + payload_len); + if (ret) + panic("Error: failed to write frame to stdout"); + + fflush(logfile); + + /* + * Publish to MQ + */ + ret = zmq_send(zsock, buf, sizeof(struct frame) + payload_len, 0); + if (ret < 0) + fprintf(stderr, "Warning: failed to send frame: %s\n", + strerror(errno)); + else if (ret != (sizeof(struct frame) + payload_len)) + fprintf(stderr, "Warning: failed to send the whole " + "frame of %zu bytes (%d sent)\n", + sizeof(struct frame) + payload_len, ret); +} + +static void initialize(const struct ubx_header *hdr, const uint8_t *ptr, + size_t len, enum ubx_msg_id id, uint64_t dummy) +{ + const struct ubx_sec_uniqid *data = (const void *) ptr; + char serial[sizeof(data->unique) * 2 + 1]; + struct logentry *cur, *tmp; + int ret; + + /* wait for UBX-SEC-UNIQID, which we need for the serial */ + if (id != UBX_SEC_UNIQID) + return; + + ASSERT3U(len, ==, 9); + ASSERT3U(data->version, ==, 1); + + hexdumpz(serial, data->unique, sizeof(data->unique), false); + + /* mkdir the per-serial dir */ + ret = xmkdir(serial, 0755); + if (ret && (ret != -EEXIST)) + panic("Error: failed to create per-serial directory (%s): %s", + serial, xstrerror(ret)); + + /* chdir to it */ + if (chdir(serial) != 0) + panic("Error: failed to change into per-serial directory " + "(%s): %s", serial, xstrerror(-errno)); + + /* open log file */ + logfile = fopen(logname, "wb"); + if (!logfile) + panic("Error: failed to open log file (%s): %s", logname, + xstrerror(-errno)); + + /* bind zmq sock */ + ret = zmq_bind(zsock, endpoint); + if (ret) + panic("Error: failed to bind socket: %s\n", strerror(errno)); + + /* log & publish everything that was queued */ + list_for_each_safe(cur, tmp, &queue) { + logmsg(cur->buf, cur->payload_len); + + list_remove(&queue, cur); + free(cur); + } + + initialized = true; +} + +int main(int argc, char **argv) +{ + size_t frames; + int ret; + + prog = argv[0]; + + if (argc != 4) + usage(); + + if (chdir(argv[1]) != 0) { + fprintf(stderr, "Error: cannot change to basedir: %s\n", + xstrerror(-errno)); + return 1; + } + + logname = argv[2]; + endpoint = argv[3]; + + list_create(&queue, sizeof(struct logentry), + offsetof(struct logentry, node)); + + zctx = zmq_ctx_new(); + if (!zctx) { + fprintf(stderr, "Error: failed to create context: %s\n", + strerror(errno)); + return 2; + } + + zsock = zmq_socket(zctx, ZMQ_PUB); + if (!zsock) { + fprintf(stderr, "Error: failed to create socket: %s\n", + strerror(errno)); + return 3; + } + + for (frames = 0;; frames++) { + uint8_t buf[(1u << 16) + sizeof(struct frame)]; + struct frame frame; + size_t payload_len; + + ret = xfread(stdin, &frame, sizeof(frame)); + if (ret) { + fprintf(stderr, "Error: failed to read frame header\n"); + break; + } + + if (be32_to_cpu(frame.magic) != FRAME_MAGIC) { + fprintf(stderr, "Error: frame magic mismatch " + "(got %08x exp %08x)\n", + be32_to_cpu(frame.magic), FRAME_MAGIC); + break; + } + + payload_len = be32_to_cpu(frame.len); + + memcpy(buf, &frame, sizeof(frame)); + + ret = xfread(stdin, &buf[sizeof(struct frame)], payload_len); + if (ret) { + fprintf(stderr, "Error: failed to read frame payload\n"); + break; + } + + /* + * Initialize if needed + */ + if (!initialized) { + const uint8_t *ptr = &buf[sizeof(struct frame)]; + + if (!parse_and_process_ubx_message(ptr, payload_len, + 0 /* dummy */, + initialize)) { + /* malformed message, queue it up */ + queuemsg(buf, payload_len); + continue; + } + + if (!initialized) { + /* + * valid message, but we're still not + * initialized - queue it up + */ + queuemsg(buf, payload_len); + continue; + } + } + + /* log & publish */ + ASSERT3P(list_head(&queue), ==, NULL); + logmsg(buf, payload_len); + } + + fprintf(stderr, "Info: processed %zu frames\n", frames); + + ret = zmq_close(zsock); + if (ret) { + fprintf(stderr, "Error: failed to close socket: %s\n", + strerror(errno)); + return 8; + } + + ret = zmq_ctx_destroy(zctx); + if (ret) { + fprintf(stderr, "Error: failed to destroy context: %s\n", + strerror(errno)); + return 9; + } + + return 0; +}