changeset 91:80059db17cc5

sink-log: save juf frames in specified log file and publish them via zmq The log file is stored in a per-serial number directory. Signed-off-by: Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
author Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
date Mon, 22 Feb 2021 10:03:09 -0500
parents 6f668b9005aa
children c71c6a44e69f
files .hgignore CMakeLists.txt sink-log.c
diffstat 3 files changed, 298 insertions(+), 1 deletions(-) [+]
line wrap: on
line diff
--- a/.hgignore	Mon Feb 22 11:12:08 2021 -0500
+++ b/.hgignore	Mon Feb 22 10:03:09 2021 -0500
@@ -16,5 +16,6 @@
 dump-ubx
 print-state
 reframe
+sink-log
 zeromq-log-recv
 zeromq-log-send
--- a/CMakeLists.txt	Mon Feb 22 11:12:08 2021 -0500
+++ b/CMakeLists.txt	Mon Feb 22 10:03:09 2021 -0500
@@ -1,5 +1,5 @@
 #
-# Copyright (c) 2019-2020 Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
+# Copyright (c) 2019-2021 Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
 #
 # Permission is hereby granted, free of charge, to any person obtaining a copy
 # of this software and associated documentation files (the "Software"), to deal
@@ -126,6 +126,24 @@
 	${JEFFPC_LIBRARY}
 )
 
+add_executable(sink-log
+	sink-log.c
+)
+
+set_target_properties(sink-log PROPERTIES
+	LINK_FLAGS "-L${ZEROMQ_LIBRARY_DIRS}"
+)
+
+target_include_directories(sink-log PRIVATE
+	${ZEROMQ_INCLUDE_DIRS}
+)
+
+target_link_libraries(sink-log
+	${JEFFPC_LIBRARY}
+	${ZEROMQ_LIBRARIES}
+	ublox8
+)
+
 add_executable(zeromq-log-recv
 	zeromq-log-recv.c
 )
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sink-log.c	Mon Feb 22 10:03:09 2021 -0500
@@ -0,0 +1,278 @@
+/*
+ * Copyright (c) 2019-2021 Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+#include <unistd.h>
+
+#include <jeffpc/error.h>
+#include <jeffpc/hexdump.h>
+#include <jeffpc/list.h>
+#include <jeffpc/io.h>
+
+#include <zmq.h>
+
+#include "xstdio.h"
+#include "frame.h"
+#include "ubx.h"
+
+struct logentry {
+	uint8_t buf[(1u << 16) + sizeof(struct frame)];
+	size_t payload_len;
+	struct list_node node;
+};
+
+static const char *prog;
+static const char *logname;
+static const char *endpoint;
+static FILE *logfile;
+static bool initialized = false;
+static void *zsock;
+static void *zctx;
+static struct list queue;
+
+static void usage(void)
+{
+	fprintf(stderr, "Usage: %s <basedir> <logname> <endpoint>\n", prog);
+	fprintf(stderr, "\n");
+	fprintf(stderr, "  <stdin>        input framed log\n");
+	fprintf(stderr, "  <basedir>      base directory for per-serial dirs\n");
+	fprintf(stderr, "  <logname>      name of file to use for logging\n");
+	fprintf(stderr, "  <endpoint>     address to send messages to "
+		"(e.g., tcp://example.com:1234)\n");
+	fprintf(stderr, "\n");
+	fprintf(stderr, "The effective log file path is "
+		"<basedir>/<serial>/<logname>\n");
+	fprintf(stderr, "\n");
+	fprintf(stderr, "Note that %s creates and changes to the <serial> "
+		"subdirectory.\n", prog);
+	fprintf(stderr, "Therefore, it is possible to make a per-serial "
+		"zeromq socket via:\n");
+	fprintf(stderr, "\n");
+	fprintf(stderr, "  %s <basedir> <logname> ipc://sock\n", prog);
+	fprintf(stderr, "\n");
+	fprintf(stderr, "This will publish to:\n");
+	fprintf(stderr, "\n");
+	fprintf(stderr, "  ipc://<basedir>/<serial>/sock\n");
+	exit(99);
+}
+
+static void queuemsg(uint8_t *buf, size_t payload_len)
+{
+	struct logentry *entry;
+
+	entry = malloc(sizeof(struct logentry));
+	if (!entry)
+		panic("Error: Failed to allocate queue entry: %s",
+		      strerror(errno));
+
+	memcpy(entry->buf, buf, payload_len);
+	entry->payload_len = payload_len;
+
+	list_insert_tail(&queue, entry);
+}
+
+static void logmsg(uint8_t *buf, size_t payload_len)
+{
+	int ret;
+
+	/*
+	 * log to file
+	 */
+	ret = xfwrite(logfile, buf, sizeof(struct frame) + payload_len);
+	if (ret)
+		panic("Error: failed to write frame to stdout");
+
+	fflush(logfile);
+
+	/*
+	 * Publish to MQ
+	 */
+	ret = zmq_send(zsock, buf, sizeof(struct frame) + payload_len, 0);
+	if (ret < 0)
+		fprintf(stderr, "Warning: failed to send frame: %s\n",
+			strerror(errno));
+	else if (ret != (sizeof(struct frame) + payload_len))
+		fprintf(stderr, "Warning: failed to send the whole "
+			"frame of %zu bytes (%d sent)\n",
+			sizeof(struct frame) + payload_len, ret);
+}
+
+static void initialize(const struct ubx_header *hdr, const uint8_t *ptr,
+		       size_t len, enum ubx_msg_id id, uint64_t dummy)
+{
+	const struct ubx_sec_uniqid *data = (const void *) ptr;
+	char serial[sizeof(data->unique) * 2 + 1];
+	struct logentry *cur, *tmp;
+	int ret;
+
+	/* wait for UBX-SEC-UNIQID, which we need for the serial */
+	if (id != UBX_SEC_UNIQID)
+		return;
+
+	ASSERT3U(len, ==, 9);
+	ASSERT3U(data->version, ==, 1);
+
+	hexdumpz(serial, data->unique, sizeof(data->unique), false);
+
+	/* mkdir the per-serial dir */
+	ret = xmkdir(serial, 0755);
+	if (ret && (ret != -EEXIST))
+		panic("Error: failed to create per-serial directory (%s): %s",
+		      serial, xstrerror(ret));
+
+	/* chdir to it */
+	if (chdir(serial) != 0)
+		panic("Error: failed to change into per-serial directory "
+		      "(%s): %s", serial, xstrerror(-errno));
+
+	/* open log file */
+	logfile = fopen(logname, "wb");
+	if (!logfile)
+		panic("Error: failed to open log file (%s): %s", logname,
+		      xstrerror(-errno));
+
+	/* bind zmq sock */
+	ret = zmq_bind(zsock, endpoint);
+	if (ret)
+		panic("Error: failed to bind socket: %s\n", strerror(errno));
+
+	/* log & publish everything that was queued */
+	list_for_each_safe(cur, tmp, &queue) {
+		logmsg(cur->buf, cur->payload_len);
+
+		list_remove(&queue, cur);
+		free(cur);
+	}
+
+	initialized = true;
+}
+
+int main(int argc, char **argv)
+{
+	size_t frames;
+	int ret;
+
+	prog = argv[0];
+
+	if (argc != 4)
+		usage();
+
+	if (chdir(argv[1]) != 0) {
+		fprintf(stderr, "Error: cannot change to basedir: %s\n",
+			xstrerror(-errno));
+		return 1;
+	}
+
+	logname = argv[2];
+	endpoint = argv[3];
+
+	list_create(&queue, sizeof(struct logentry),
+		    offsetof(struct logentry, node));
+
+	zctx = zmq_ctx_new();
+	if (!zctx) {
+		fprintf(stderr, "Error: failed to create context: %s\n",
+			strerror(errno));
+		return 2;
+	}
+
+	zsock = zmq_socket(zctx, ZMQ_PUB);
+	if (!zsock) {
+		fprintf(stderr, "Error: failed to create socket: %s\n",
+			strerror(errno));
+		return 3;
+	}
+
+	for (frames = 0;; frames++) {
+		uint8_t buf[(1u << 16) + sizeof(struct frame)];
+		struct frame frame;
+		size_t payload_len;
+
+		ret = xfread(stdin, &frame, sizeof(frame));
+		if (ret) {
+			fprintf(stderr, "Error: failed to read frame header\n");
+			break;
+		}
+
+		if (be32_to_cpu(frame.magic) != FRAME_MAGIC) {
+			fprintf(stderr, "Error: frame magic mismatch "
+				"(got %08x exp %08x)\n",
+				be32_to_cpu(frame.magic), FRAME_MAGIC);
+			break;
+		}
+
+		payload_len = be32_to_cpu(frame.len);
+
+		memcpy(buf, &frame, sizeof(frame));
+
+		ret = xfread(stdin, &buf[sizeof(struct frame)], payload_len);
+		if (ret) {
+			fprintf(stderr, "Error: failed to read frame payload\n");
+			break;
+		}
+
+		/*
+		 * Initialize if needed
+		 */
+		if (!initialized) {
+			const uint8_t *ptr = &buf[sizeof(struct frame)];
+
+			if (!parse_and_process_ubx_message(ptr, payload_len,
+							   0 /* dummy */,
+							   initialize)) {
+				/* malformed message, queue it up */
+				queuemsg(buf, payload_len);
+				continue;
+			}
+
+			if (!initialized) {
+				/*
+				 * valid message, but we're still not
+				 * initialized - queue it up
+				 */
+				queuemsg(buf, payload_len);
+				continue;
+			}
+		}
+
+		/* log & publish */
+		ASSERT3P(list_head(&queue), ==, NULL);
+		logmsg(buf, payload_len);
+	}
+
+	fprintf(stderr, "Info: processed %zu frames\n", frames);
+
+	ret = zmq_close(zsock);
+	if (ret) {
+		fprintf(stderr, "Error: failed to close socket: %s\n",
+			strerror(errno));
+		return 8;
+	}
+
+	ret = zmq_ctx_destroy(zctx);
+	if (ret) {
+		fprintf(stderr, "Error: failed to destroy context: %s\n",
+			strerror(errno));
+		return 9;
+	}
+
+	return 0;
+}