changeset 2702:d92a4e3ea90c

6467522 event-transport: add filter interfaces 6468866 event-transport: server not sending resource.fm.xprt.ack event first
author jrutt
date Fri, 08 Sep 2006 16:31:23 -0700
parents cc4df3b0edec
children 333cd67fe595
files usr/src/cmd/fm/modules/SUNW,SPARC-Enterprise/event-transport/event-transport.conf usr/src/cmd/fm/modules/SUNW,SPARC-Enterprise/event-transport/ex_dscp.c usr/src/cmd/fm/modules/SUNW,SPARC-Enterprise/event-transport/ex_dscp.h usr/src/cmd/fm/modules/common/event-transport/etm.c usr/src/cmd/fm/modules/common/event-transport/etm_xport_api.h
diffstat 5 files changed, 189 insertions(+), 6 deletions(-) [+]
line wrap: on
line diff
--- a/usr/src/cmd/fm/modules/SUNW,SPARC-Enterprise/event-transport/event-transport.conf	Fri Sep 08 11:31:51 2006 -0700
+++ b/usr/src/cmd/fm/modules/SUNW,SPARC-Enterprise/event-transport/event-transport.conf	Fri Sep 08 16:31:23 2006 -0700
@@ -43,6 +43,14 @@
 #		Sets the timeout value for a read or write I/O operation.
 #		Time is in nanoseconds.
 #
+#	setprop filter_path <path>
+#
+#		Specifies the location of a platform-specific filter plugin
+#		for incoming and outgoing events.  No assumptions are made
+#		regarding the relative path or the name of the filter plugin.
+#		Therefore, the absolute path and the name of the binary must
+#		be used.
+#
 #	setprop client_list <FMRI string>
 #
 #		Specifies a list of peers this module will accept a
--- a/usr/src/cmd/fm/modules/SUNW,SPARC-Enterprise/event-transport/ex_dscp.c	Fri Sep 08 11:31:51 2006 -0700
+++ b/usr/src/cmd/fm/modules/SUNW,SPARC-Enterprise/event-transport/ex_dscp.c	Fri Sep 08 16:31:23 2006 -0700
@@ -50,6 +50,9 @@
 exs_hdl_t *Exh_head = NULL;		/* Head of ex_hdl_t list */
 pthread_mutex_t	List_lock = PTHREAD_MUTEX_INITIALIZER;
 					/* Protects linked list of ex_hdl_t */
+static void *Dlp = NULL;		/* Handle for dlopen/dlclose/dlsym */
+static int (*Send_filter)(fmd_hdl_t *hdl, nvlist_t *event, const char *dest);
+static int (*Post_filter)(fmd_hdl_t *hdl, nvlist_t *event, const char *src);
 
 /*
  * * * * * * * * * * * * * *
@@ -86,6 +89,55 @@
 }
 
 /*
+ * dlopen() the platform filter library and dlsym() the filter funcs.
+ */
+static void
+exs_filter_init(fmd_hdl_t *hdl)
+{
+	char *propstr = fmd_prop_get_string(hdl, "filter_path");
+
+	if (propstr == NULL) {
+		fmd_hdl_debug(hdl, "No filter plugin specified");
+		Send_filter = NULL;
+		Post_filter = NULL;
+		return;
+	} else {
+		if ((Dlp = dlopen(propstr, RTLD_LOCAL | RTLD_NOW)) == NULL) {
+			fmd_hdl_debug(hdl, "Failed to dlopen filter plugin");
+			Send_filter = NULL;
+			Post_filter = NULL;
+			fmd_prop_free_string(hdl, propstr);
+			return;
+		}
+
+		if ((Send_filter = (int (*)())dlsym(Dlp, "send_filter"))
+		    == NULL) {
+			fmd_hdl_debug(hdl, "failed to dlsym send_filter()");
+			Send_filter = NULL;
+		}
+
+		if ((Post_filter = (int (*)())dlsym(Dlp, "post_filter"))
+		    == NULL) {
+			fmd_hdl_debug(hdl, "failed to dlsym post_filter()");
+			Post_filter = NULL;
+		}
+	}
+
+	fmd_prop_free_string(hdl, propstr);
+}
+
+/*
+ * If open, dlclose() the platform filter library.
+ */
+/*ARGSUSED*/
+static void
+exs_filter_fini(fmd_hdl_t *hdl)
+{
+	if (Dlp != NULL)
+		(void) dlclose(Dlp);
+}
+
+/*
  * Translate endpoint_id string to int.
  * Return the domain ID via "dom_id".
  * Return 0 for success, nonzero for failure
@@ -413,6 +465,11 @@
 		}
 	}
 
+	if (Exh_head == NULL) {
+		/* Do one-time initializations */
+		exs_filter_init(hdl);
+	}
+
 	hp = exs_hdl_alloc(hdl, endpoint_id, cb_func, cb_func_arg, dom);
 
 	/* Add this transport instance handle to the list */
@@ -469,6 +526,11 @@
 	fmd_hdl_strfree(hdl, hp->h_endpt_id);
 	fmd_hdl_free(hdl, hp, sizeof (exs_hdl_t));
 
+	if (Exh_head == NULL) {
+		/* Undo one-time initializations */
+		exs_filter_fini(hdl);
+	}
+
 	(void) pthread_mutex_unlock(&List_lock);
 
 	return (0);
@@ -659,3 +721,35 @@
 	else
 		return (-1);
 }
+
+/*
+ * * * * * * * * * * * * * * * * * * * *
+ * ETM-to-Transport API Filter routines
+ * * * * * * * * * * * * * * * * * * * *
+ */
+
+/*
+ * Call the platform's send_filter function.
+ * Otherwise return ETM_XPORT_FILTER_OK.
+ */
+int
+etm_xport_send_filter(fmd_hdl_t *hdl, nvlist_t *event, const char *dest)
+{
+	if (Send_filter != NULL)
+		return (Send_filter(hdl, event, dest));
+	else
+		return (ETM_XPORT_FILTER_OK);
+}
+
+/*
+ * Call the platform's post_filter function.
+ * Otherwise return ETM_XPORT_FILTER_OK.
+ */
+int
+etm_xport_post_filter(fmd_hdl_t *hdl, nvlist_t *event, const char *src)
+{
+	if (Post_filter != NULL)
+		return (Post_filter(hdl, event, src));
+	else
+		return (ETM_XPORT_FILTER_OK);
+}
--- a/usr/src/cmd/fm/modules/SUNW,SPARC-Enterprise/event-transport/ex_dscp.h	Fri Sep 08 11:31:51 2006 -0700
+++ b/usr/src/cmd/fm/modules/SUNW,SPARC-Enterprise/event-transport/ex_dscp.h	Fri Sep 08 16:31:23 2006 -0700
@@ -52,6 +52,7 @@
 #include <errno.h>
 #include <time.h>
 #include <poll.h>
+#include <dlfcn.h>
 #include <libdscp.h>
 #include "etm_xport_api.h"
 
--- a/usr/src/cmd/fm/modules/common/event-transport/etm.c	Fri Sep 08 11:31:51 2006 -0700
+++ b/usr/src/cmd/fm/modules/common/event-transport/etm.c	Fri Sep 08 16:31:23 2006 -0700
@@ -33,6 +33,7 @@
  */
 
 #include <netinet/in.h>
+#include <errno.h>
 #include <sys/fm/protocol.h>
 #include <sys/sysmacros.h>
 #include <pthread.h>
@@ -129,16 +130,20 @@
 	fmd_stat_t read_ack;
 	fmd_stat_t read_bytes;
 	fmd_stat_t read_msg;
+	fmd_stat_t post_filter;
 	/* write counters */
 	fmd_stat_t write_ack;
 	fmd_stat_t write_bytes;
 	fmd_stat_t write_msg;
+	fmd_stat_t send_filter;
 	/* error counters */
 	fmd_stat_t error_protocol;
 	fmd_stat_t error_drop_read;
 	fmd_stat_t error_read;
 	fmd_stat_t error_read_badhdr;
 	fmd_stat_t error_write;
+	fmd_stat_t error_send_filter;
+	fmd_stat_t error_post_filter;
 	/* misc */
 	fmd_stat_t peer_count;
 
@@ -147,16 +152,20 @@
 	{ "read_ack", FMD_TYPE_UINT64, "ACKs read" },
 	{ "read_bytes", FMD_TYPE_UINT64, "Bytes read" },
 	{ "read_msg", FMD_TYPE_UINT64, "Messages read" },
+	{ "post_filter", FMD_TYPE_UINT64, "Drops by post_filter" },
 	/* write counters */
 	{ "write_ack", FMD_TYPE_UINT64, "ACKs sent" },
 	{ "write_bytes", FMD_TYPE_UINT64, "Bytes sent" },
 	{ "write_msg", FMD_TYPE_UINT64, "Messages sent" },
+	{ "send_filter", FMD_TYPE_UINT64, "Drops by send_filter" },
 	/* ETM error counters */
 	{ "error_protocol", FMD_TYPE_UINT64, "ETM protocol errors" },
 	{ "error_drop_read", FMD_TYPE_UINT64, "Dropped read messages" },
 	{ "error_read", FMD_TYPE_UINT64, "Read I/O errors" },
 	{ "error_read_badhdr", FMD_TYPE_UINT64, "Bad headers read" },
 	{ "error_write", FMD_TYPE_UINT64, "Write I/O errors" },
+	{ "error_send_filter", FMD_TYPE_UINT64, "Send filter errors" },
+	{ "error_post_filter", FMD_TYPE_UINT64, "Post filter errors" },
 	/* ETM Misc */
 	{ "peer_count", FMD_TYPE_UINT64, "Number of peers initialized" },
 };
@@ -296,6 +305,18 @@
 		return (1);
 	}
 
+	rv = etm_xport_post_filter(hdl, nvl, mp->epm_ep_str);
+	if (rv == ETM_XPORT_FILTER_DROP) {
+		fmd_hdl_debug(hdl, "post_filter dropped event");
+		INCRSTAT(Etm_stats.post_filter.fmds_value.ui64);
+		nvlist_free(nvl);
+		return (0);
+	} else if (rv == ETM_XPORT_FILTER_ERROR) {
+		fmd_hdl_debug(hdl, "post_filter error : %s", strerror(errno));
+		INCRSTAT(Etm_stats.error_post_filter.fmds_value.ui64);
+		/* Still post event */
+	}
+
 	(void) pthread_mutex_lock(&mp->epm_lock);
 	(void) pthread_mutex_lock(&Etm_mod_lock);
 	if (!Etm_exit) {
@@ -1081,9 +1102,10 @@
 {
 	etm_epmap_t *mp;
 	nvlist_t *msgnvl;
-	int hdrstat, rv;
+	int hdrstat, rv, cnt = 0;
 	char *buf, *nvbuf, *class;
 	size_t nvsize, buflen, hdrlen;
+	struct timespec tms;
 
 	(void) pthread_mutex_lock(&Etm_mod_lock);
 	if (Etm_exit) {
@@ -1094,9 +1116,26 @@
 
 	mp = fmd_xprt_getspecific(hdl, xprthdl);
 
-	if (pthread_mutex_trylock(&mp->epm_lock))
-		/* Another thread may be trying to close this fmd_xprt_t */
-		return (FMD_SEND_RETRY);
+	for (;;) {
+		if (pthread_mutex_trylock(&mp->epm_lock) == 0) {
+			break;
+		} else {
+			/*
+			 * Another thread may be (1) trying to close this
+			 * fmd_xprt_t, or (2) posting an event to it.
+			 * If (1), don't want to spend too much time here.
+			 * If (2), allow it to finish and release epm_lock.
+			 */
+			if (cnt++ < 10) {
+				tms.tv_sec = 0;
+				tms.tv_nsec = (cnt * 10000);
+				(void) nanosleep(&tms, NULL);
+
+			} else {
+				return (FMD_SEND_RETRY);
+			}
+		}
+	}
 
 	mp->epm_txbusy++;
 
@@ -1156,6 +1195,21 @@
 		return (FMD_SEND_FAILED);
 	}
 
+	rv = etm_xport_send_filter(hdl, msgnvl, mp->epm_ep_str);
+	if (rv == ETM_XPORT_FILTER_DROP) {
+		mp->epm_txbusy--;
+		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
+		(void) pthread_mutex_unlock(&mp->epm_lock);
+		fmd_hdl_debug(hdl, "send_filter dropped event");
+		nvlist_free(msgnvl);
+		INCRSTAT(Etm_stats.send_filter.fmds_value.ui64);
+		return (FMD_SEND_SUCCESS);
+	} else if (rv == ETM_XPORT_FILTER_ERROR) {
+		fmd_hdl_debug(hdl, "send_filter error : %s", strerror(errno));
+		INCRSTAT(Etm_stats.error_send_filter.fmds_value.ui64);
+		/* Still send event */
+	}
+
 	(void) pthread_mutex_unlock(&mp->epm_lock);
 
 	(void) nvlist_size(msgnvl, &nvsize, NV_ENCODE_XDR);
@@ -1175,6 +1229,7 @@
 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
 		(void) pthread_mutex_unlock(&mp->epm_lock);
 		fmd_hdl_error(hdl, "Failed to pack event : %s\n", strerror(rv));
+		nvlist_free(msgnvl);
 		FREE_BUF(hdl, buf, buflen);
 		return (FMD_SEND_FAILED);
 	}
@@ -1346,8 +1401,9 @@
 	{ "client_list", FMD_TYPE_STRING, NULL },
 	{ "server_list", FMD_TYPE_STRING, NULL },
 	{ "reconnect_interval",	FMD_TYPE_UINT64, "10000000000" },
-	{ "reconnect_timeout", FMD_TYPE_UINT64, "300000000000"},
-	{ "rw_timeout", FMD_TYPE_UINT64, "2000000000"},
+	{ "reconnect_timeout", FMD_TYPE_UINT64, "300000000000" },
+	{ "rw_timeout", FMD_TYPE_UINT64, "2000000000" },
+	{ "filter_path", FMD_TYPE_STRING, NULL },
 	{ NULL, 0, NULL }
 };
 
--- a/usr/src/cmd/fm/modules/common/event-transport/etm_xport_api.h	Fri Sep 08 11:31:51 2006 -0700
+++ b/usr/src/cmd/fm/modules/common/event-transport/etm_xport_api.h	Fri Sep 08 16:31:23 2006 -0700
@@ -115,6 +115,30 @@
 etm_xport_write(fmd_hdl_t *hdl, etm_xport_conn_t conn, hrtime_t timeout,
     void *buf, size_t byte_cnt);
 
+/*
+ * Filter
+ */
+
+#define	ETM_XPORT_FILTER_OK (1)		/* OK to send/post event */
+#define	ETM_XPORT_FILTER_DROP (0)	/* Do not send/post event */
+#define	ETM_XPORT_FILTER_ERROR (-1)	/* Error */
+
+/*
+ * Make a decision whether or not to send an event to a remote endpoint.
+ * Return ETM_XPORT_FILTER_OK, ETM_XPORT_FILTER_DROP, or ETM_XPORT_FILTER_ERROR
+ * and set errno for failure.
+ */
+int
+etm_xport_send_filter(fmd_hdl_t *hdl, nvlist_t *event, const char *dest);
+
+/*
+ * Make a decision whether or not to post an event to FMD.
+ * Return ETM_XPORT_FILTER_OK, ETM_XPORT_FILTER_DROP, or ETM_XPORT_FILTER_ERROR
+ * and set errno for failure.
+ */
+int
+etm_xport_post_filter(fmd_hdl_t *hdl, nvlist_t *event, const char *src);
+
 #ifdef __cplusplus
 }
 #endif