Mercurial > illumos > onarm
view usr/src/cmd/fm/modules/SUNW,SPARC-Enterprise/event-transport/ex_dscp.c @ 4:1a15d5aaf794
synchronized with onnv_86 (6202) in onnv-gate
author | Koji Uno <koji.uno@sun.com> |
---|---|
date | Mon, 31 Aug 2009 14:38:03 +0900 |
parents | c9caec207d52 |
children |
line wrap: on
line source
/* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License (the "License"). * You may not use this file except in compliance with the License. * * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE * or http://www.opensolaris.org/os/licensing. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at usr/src/OPENSOLARIS.LICENSE. * If applicable, add the following below this CDDL HEADER, with the * fields enclosed by brackets "[]" replaced with your own identifying * information: Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END */ /* * Copyright 2006 Sun Microsystems, Inc. All rights reserved. * Use is subject to license terms. */ #pragma ident "%Z%%M% %I% %E% SMI" /* * FMA Event Transport Module Transport Layer API implementation. * * Library for establishing connections and transporting FMA events between * ETMs (event-transport modules) in separate fault domains. * * The transport for this library is internet socket based and uses the DSCP * client services library (libdscp). */ #include "ex_dscp.h" /* * On the SP, there is one DSCP interface for every domain. * Each domain has one and only one DSCP interface to the SP. * * The DSCP interface is created when the domain powers-on. On the SP, * a sysevent will be generated when the DSCP interface is up. On the domain, * the DSCP interface should be up when ETM loads. */ exs_conn_t Acc; /* Connection for accepting/listening */ pthread_t Acc_tid; /* Thread ID for accepting conns */ int Acc_quit; /* Signal to quit the acceptor thread */ int Acc_destroy; /* Destroy accept/listen thread? */ exs_hdl_t *Exh_head = NULL; /* Head of ex_hdl_t list */ pthread_mutex_t List_lock = PTHREAD_MUTEX_INITIALIZER; /* Protects linked list of ex_hdl_t */ static void *Dlp = NULL; /* Handle for dlopen/dlclose/dlsym */ static int (*Send_filter)(fmd_hdl_t *hdl, nvlist_t *event, const char *dest); static int (*Post_filter)(fmd_hdl_t *hdl, nvlist_t *event, const char *src); /* * * * * * * * * * * * * * * * Module specific routines * * * * * * * * * * * * * * */ /* * Allocate and initialize a transport instance handle. * Return hdl pointer for success, NULL for failure. */ static exs_hdl_t * exs_hdl_alloc(fmd_hdl_t *hdl, char *endpoint_id, int (*cb_func)(fmd_hdl_t *hdl, etm_xport_conn_t conn, etm_cb_flag_t flag, void *arg), void *cb_func_arg, int dom) { exs_hdl_t *hp; hp = fmd_hdl_zalloc(hdl, sizeof (exs_hdl_t), FMD_SLEEP); hp->h_endpt_id = fmd_hdl_strdup(hdl, endpoint_id, FMD_SLEEP); hp->h_dom = dom; hp->h_client.c_sd = EXS_SD_FREE; hp->h_server.c_sd = EXS_SD_FREE; hp->h_tid = EXS_TID_FREE; hp->h_destroy = 0; hp->h_hdl = hdl; hp->h_cb_func = cb_func; hp->h_cb_func_arg = cb_func_arg; hp->h_quit = 0; return (hp); } /* * dlopen() the platform filter library and dlsym() the filter funcs. */ static void exs_filter_init(fmd_hdl_t *hdl) { char *propstr = fmd_prop_get_string(hdl, "filter_path"); if (propstr == NULL) { fmd_hdl_debug(hdl, "No filter plugin specified"); Send_filter = NULL; Post_filter = NULL; return; } else { if ((Dlp = dlopen(propstr, RTLD_LOCAL | RTLD_NOW)) == NULL) { fmd_hdl_debug(hdl, "Failed to dlopen filter plugin"); Send_filter = NULL; Post_filter = NULL; fmd_prop_free_string(hdl, propstr); return; } if ((Send_filter = (int (*)())dlsym(Dlp, "send_filter")) == NULL) { fmd_hdl_debug(hdl, "failed to dlsym send_filter()"); Send_filter = NULL; } if ((Post_filter = (int (*)())dlsym(Dlp, "post_filter")) == NULL) { fmd_hdl_debug(hdl, "failed to dlsym post_filter()"); Post_filter = NULL; } } fmd_prop_free_string(hdl, propstr); } /* * If open, dlclose() the platform filter library. */ /*ARGSUSED*/ static void exs_filter_fini(fmd_hdl_t *hdl) { if (Dlp != NULL) (void) dlclose(Dlp); } /* * Translate endpoint_id string to int. * Return the domain ID via "dom_id". * Return 0 for success, nonzero for failure */ static int exs_get_id(fmd_hdl_t *hdl, char *endpoint_id, int *dom_id) { char *ptr; if (strstr(endpoint_id, EXS_SP_PREFIX) != NULL) { /* Remote endpoint is the SP */ *dom_id = DSCP_IDENT_SP; return (0); } else { if ((ptr = strstr(endpoint_id, EXS_DOMAIN_PREFIX)) == NULL) { fmd_hdl_error(hdl, "Property parsing error : %s not " "found in %s. Check event-transport.conf\n", EXS_DOMAIN_PREFIX, endpoint_id); return (1); } ptr += EXS_DOMAIN_PREFIX_LEN; if ((sscanf(ptr, "%d", dom_id)) != 1) { fmd_hdl_error(hdl, "Property parsing error : no " "integer found in %s. Check event-transport.conf\n", endpoint_id); return (2); } } return (0); } /* * Prepare the client connection. * Return 0 for success, nonzero for failure. */ static int exs_prep_client(exs_hdl_t *hp) { int rv, optval = 1; struct linger ling; /* Find the DSCP address for the remote endpoint */ if ((rv = dscpAddr(hp->h_dom, DSCP_ADDR_REMOTE, (struct sockaddr *)&hp->h_client.c_saddr, &hp->h_client.c_len)) != DSCP_OK) { fmd_hdl_debug(hp->h_hdl, "dscpAddr on the client socket " "failed for %s : rv = %d\n", hp->h_endpt_id, rv); return (1); } if ((hp->h_client.c_sd = socket(AF_INET, SOCK_STREAM, 0)) == -1) { fmd_hdl_error(hp->h_hdl, "Failed to create the client socket " "for %s", hp->h_endpt_id); return (2); } if (setsockopt(hp->h_client.c_sd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof (optval))) { fmd_hdl_error(hp->h_hdl, "Failed to set REUSEADDR on the " "client socket for %s", hp->h_endpt_id); EXS_CLOSE_CLR(hp->h_client); return (3); } /* * Set SO_LINGER so TCP aborts the connection when closed. * If the domain's client socket goes into the TIME_WAIT state, * ETM will be unable to connect to the SP until this clears. * This connection is over DSCP, which is a simple point-to-point * connection and therefore has no routers or multiple forwarding. * The risk of receiving old packets from a previously terminated * connection is very small. */ ling.l_onoff = 1; ling.l_linger = 0; if (setsockopt(hp->h_client.c_sd, SOL_SOCKET, SO_LINGER, &ling, sizeof (ling))) { fmd_hdl_error(hp->h_hdl, "Failed to set SO_LINGER on the " "client socket for %s", hp->h_endpt_id); EXS_CLOSE_CLR(hp->h_client); return (4); } /* Bind the socket to the local IP address of the DSCP link */ if ((rv = dscpBind(hp->h_dom, hp->h_client.c_sd, EXS_CLIENT_PORT)) != DSCP_OK) { if (rv == DSCP_ERROR_DOWN) { fmd_hdl_debug(hp->h_hdl, "xport - dscp link for %s " "is down", hp->h_endpt_id); } else { fmd_hdl_error(hp->h_hdl, "dscpBind on the client " "socket failed : rv = %d\n", rv); } EXS_CLOSE_CLR(hp->h_client); return (5); } hp->h_client.c_saddr.sin_port = htons(EXS_SERVER_PORT); /* Set IPsec security policy for this socket */ if ((rv = dscpSecure(hp->h_dom, hp->h_client.c_sd)) != DSCP_OK) { fmd_hdl_error(hp->h_hdl, "dscpSecure on the client socket " "failed for %s : rv = %d\n", hp->h_endpt_id, rv); EXS_CLOSE_CLR(hp->h_client); return (6); } return (0); } /* * Server function/thread. There is one thread per endpoint. * Accepts incoming connections and notifies ETM of incoming data. */ void exs_server(void *arg) { exs_hdl_t *hp = (exs_hdl_t *)arg; struct pollfd pfd; while (!hp->h_quit) { pfd.events = POLLIN; pfd.revents = 0; pfd.fd = hp->h_server.c_sd; if (poll(&pfd, 1, -1) <= 0) continue; /* loop around and check h_quit */ if (pfd.revents & (POLLHUP | POLLERR)) { fmd_hdl_debug(hp->h_hdl, "xport - poll hangup/err for " "%s server socket", hp->h_endpt_id); EXS_CLOSE_CLR(hp->h_server); hp->h_destroy++; break; /* thread exits */ } if (pfd.revents & POLLIN) { /* Notify ETM that incoming data is available */ if (hp->h_cb_func(hp->h_hdl, &hp->h_server, ETM_CBFLAG_RECV, hp->h_cb_func_arg)) { /* * For any non-zero return, close the * connection and exit the thread. */ EXS_CLOSE_CLR(hp->h_server); hp->h_destroy++; break; /* thread exits */ } } } fmd_hdl_debug(hp->h_hdl, "xport - exiting server thread for %s", hp->h_endpt_id); } /* * Accept a new incoming connection. */ static void exs_accept(fmd_hdl_t *hdl) { int new_sd, dom, flags, rv; struct sockaddr_in new_saddr; socklen_t new_len = sizeof (struct sockaddr); exs_hdl_t *hp; if ((new_sd = accept(Acc.c_sd, (struct sockaddr *)&new_saddr, &new_len)) != -1) { /* Translate saddr to domain id */ if ((rv = dscpIdent((struct sockaddr *)&new_saddr, (int)new_len, &dom)) != DSCP_OK) { fmd_hdl_error(hdl, "dscpIdent failed : rv = %d\n", rv); (void) close(new_sd); return; } /* Find the exs_hdl_t for the domain trying to connect */ (void) pthread_mutex_lock(&List_lock); for (hp = Exh_head; hp; hp = hp->h_next) { if (hp->h_dom == dom) break; } (void) pthread_mutex_unlock(&List_lock); if (hp == NULL) { fmd_hdl_error(hdl, "Not configured to accept a " "connection from domain %d. Check " "event-transport.conf\n", dom); (void) close(new_sd); return; } /* Authenticate this connection request */ if ((rv = dscpAuth(dom, (struct sockaddr *)&new_saddr, (int)new_len)) != DSCP_OK) { fmd_hdl_error(hdl, "dscpAuth failed for %s : rv = %d ", " Possible spoofing attack\n", hp->h_endpt_id, rv); (void) close(new_sd); return; } if (hp->h_tid != EXS_TID_FREE) { hp->h_quit = 1; fmd_thr_signal(hp->h_hdl, hp->h_tid); fmd_thr_destroy(hp->h_hdl, hp->h_tid); hp->h_destroy = 0; hp->h_quit = 0; } if (hp->h_server.c_sd != EXS_SD_FREE) EXS_CLOSE_CLR(hp->h_server); /* Set the socket to be non-blocking */ flags = fcntl(new_sd, F_GETFL, 0); (void) fcntl(new_sd, F_SETFL, flags | O_NONBLOCK); hp->h_server.c_sd = new_sd; hp->h_tid = fmd_thr_create(hdl, exs_server, hp); } else { fmd_hdl_error(hp->h_hdl, "Failed to accept() a new connection"); } } /* * Listen for and accept incoming connections. * There is only one such thread. */ void exs_listen(void *arg) { fmd_hdl_t *hdl = (fmd_hdl_t *)arg; struct pollfd pfd; while (!Acc_quit) { pfd.events = POLLIN; pfd.revents = 0; pfd.fd = Acc.c_sd; if (poll(&pfd, 1, -1) <= 0) continue; /* loop around and check Acc_quit */ if (pfd.revents & (POLLHUP | POLLERR)) { fmd_hdl_debug(hdl, "xport - poll hangup/err on " "accept socket"); EXS_CLOSE_CLR(Acc); Acc_destroy++; break; /* thread exits */ } if (pfd.revents & POLLIN) exs_accept(hdl); } fmd_hdl_debug(hdl, "xport - exiting accept-listen thread"); } /* * Prepare to accept a connection. * Return 0 for success, nonzero for failure. */ void exs_prep_accept(fmd_hdl_t *hdl, int dom) { int flags, optval = 1; int rv; if (Acc.c_sd != EXS_SD_FREE) return; /* nothing to do */ if (Acc_destroy) { fmd_thr_destroy(hdl, Acc_tid); Acc_tid = EXS_TID_FREE; } /* Check to see if the DSCP interface is configured */ if ((rv = dscpAddr(dom, DSCP_ADDR_LOCAL, (struct sockaddr *)&Acc.c_saddr, &Acc.c_len)) != DSCP_OK) { fmd_hdl_debug(hdl, "xport - dscpAddr on the accept socket " "failed for domain %d : rv = %d", dom, rv); return; } if ((Acc.c_sd = socket(AF_INET, SOCK_STREAM, 0)) == -1) { fmd_hdl_error(hdl, "Failed to create the accept socket"); return; } if (setsockopt(Acc.c_sd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof (optval))) { fmd_hdl_error(hdl, "Failed to set REUSEADDR for the accept " "socket"); EXS_CLOSE_CLR(Acc); return; } /* Bind the socket to the local IP address of the DSCP link */ if ((rv = dscpBind(dom, Acc.c_sd, EXS_SERVER_PORT)) != DSCP_OK) { if (rv == DSCP_ERROR_DOWN) { fmd_hdl_debug(hdl, "xport - dscp link for domain %d " "is down", dom); } else { fmd_hdl_error(hdl, "dscpBind on the accept socket " "failed : rv = %d\n", rv); } EXS_CLOSE_CLR(Acc); return; } /* Activate IPsec security policy for this socket */ if ((rv = dscpSecure(dom, Acc.c_sd)) != DSCP_OK) { fmd_hdl_error(hdl, "dscpSecure on the accept socket failed : " "rv = %d\n", dom, rv); EXS_CLOSE_CLR(Acc); return; } if ((listen(Acc.c_sd, EXS_NUM_SOCKS)) == -1) { fmd_hdl_debug(hdl, "Failed to listen() for connections"); EXS_CLOSE_CLR(Acc); return; } flags = fcntl(Acc.c_sd, F_GETFL, 0); (void) fcntl(Acc.c_sd, F_SETFL, flags | O_NONBLOCK); Acc_tid = fmd_thr_create(hdl, exs_listen, hdl); } /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * ETM-to-Transport API Connection Management routines * * * * * * * * * * * * * * * * * * * * * * * * * * * */ /* * Initialize and setup any transport infrastructure before any connections * are opened. * Return etm_xport_hdl_t for success, NULL for failure. */ etm_xport_hdl_t etm_xport_init(fmd_hdl_t *hdl, char *endpoint_id, int (*cb_func)(fmd_hdl_t *hdl, etm_xport_conn_t conn, etm_cb_flag_t flag, void *arg), void *cb_func_arg) { exs_hdl_t *hp, *curr; int dom; if (exs_get_id(hdl, endpoint_id, &dom)) return (NULL); (void) pthread_mutex_lock(&List_lock); /* Check for a duplicate endpoint_id on the list */ for (curr = Exh_head; curr; curr = curr->h_next) { if (dom == curr->h_dom) { fmd_hdl_debug(hdl, "xport - init failed, " "duplicate domain id : %d\n", dom); (void) pthread_mutex_unlock(&List_lock); return (NULL); } } if (Exh_head == NULL) { /* Do one-time initializations */ exs_filter_init(hdl); /* Initialize the accept/listen vars */ Acc.c_sd = EXS_SD_FREE; Acc_tid = EXS_TID_FREE; Acc_destroy = 0; Acc_quit = 0; } hp = exs_hdl_alloc(hdl, endpoint_id, cb_func, cb_func_arg, dom); /* Add this transport instance handle to the list */ hp->h_next = Exh_head; Exh_head = hp; (void) pthread_mutex_unlock(&List_lock); exs_prep_accept(hdl, dom); return ((etm_xport_hdl_t)hp); } /* * Teardown any transport infrastructure after all connections are closed. * Return 0 for success, or nonzero for failure. */ int etm_xport_fini(fmd_hdl_t *hdl, etm_xport_hdl_t tlhdl) { exs_hdl_t *hp = (exs_hdl_t *)tlhdl; exs_hdl_t *xp, **ppx; (void) pthread_mutex_lock(&List_lock); ppx = &Exh_head; for (xp = *ppx; xp; xp = xp->h_next) { if (xp != hp) ppx = &xp->h_next; else break; } if (xp != hp) { (void) pthread_mutex_unlock(&List_lock); fmd_hdl_abort(hdl, "xport - fini failed, tlhdl %p not on list", (void *)hp); } *ppx = hp->h_next; hp->h_next = NULL; if (hp->h_tid != EXS_TID_FREE) { hp->h_quit = 1; fmd_thr_signal(hdl, hp->h_tid); fmd_thr_destroy(hdl, hp->h_tid); } if (hp->h_server.c_sd != EXS_SD_FREE) (void) close(hp->h_server.c_sd); if (hp->h_client.c_sd != EXS_SD_FREE) (void) close(hp->h_client.c_sd); fmd_hdl_strfree(hdl, hp->h_endpt_id); fmd_hdl_free(hdl, hp, sizeof (exs_hdl_t)); if (Exh_head == NULL) { /* Undo one-time initializations */ exs_filter_fini(hdl); /* Destroy the accept/listen thread */ if (Acc_tid != EXS_TID_FREE) { Acc_quit = 1; fmd_thr_signal(hdl, Acc_tid); fmd_thr_destroy(hdl, Acc_tid); } if (Acc.c_sd != EXS_SD_FREE) EXS_CLOSE_CLR(Acc); } (void) pthread_mutex_unlock(&List_lock); return (0); } /* * Open a connection with the given endpoint, * Return etm_xport_conn_t for success, NULL and set errno for failure. */ etm_xport_conn_t etm_xport_open(fmd_hdl_t *hdl, etm_xport_hdl_t tlhdl) { int flags; exs_hdl_t *hp = (exs_hdl_t *)tlhdl; if (hp->h_destroy) { fmd_thr_destroy(hp->h_hdl, hp->h_tid); hp->h_tid = EXS_TID_FREE; hp->h_destroy = 0; } if (hp->h_client.c_sd == EXS_SD_FREE) { if (exs_prep_client(hp) != 0) return (NULL); } /* Set the socket to be non-blocking */ flags = fcntl(hp->h_client.c_sd, F_GETFL, 0); (void) fcntl(hp->h_client.c_sd, F_SETFL, flags | O_NONBLOCK); if ((connect(hp->h_client.c_sd, (struct sockaddr *)&hp->h_client.c_saddr, hp->h_client.c_len)) == -1) { if (errno != EINPROGRESS) { fmd_hdl_debug(hdl, "xport - failed to connect to %s", hp->h_endpt_id); EXS_CLOSE_CLR(hp->h_client); return (NULL); } } fmd_hdl_debug(hdl, "xport - connected client socket for %s", hp->h_endpt_id); return (&hp->h_client); } /* * Close a connection from either endpoint. * Return zero for success, nonzero for failure. */ /*ARGSUSED*/ int etm_xport_close(fmd_hdl_t *hdl, etm_xport_conn_t conn) { exs_conn_t *cp = (exs_conn_t *)conn; if (cp->c_sd == EXS_SD_FREE) return (0); /* Connection already closed */ (void) close(cp->c_sd); cp->c_sd = EXS_SD_FREE; return (0); } /* * * * * * * * * * * * * * * * * * * * ETM-to-Transport API I/O routines * * * * * * * * * * * * * * * * * * */ /* * Try to read byte_cnt bytes from the connection into the given buffer. * Return how many bytes actually read for success, negative value for failure. */ ssize_t etm_xport_read(fmd_hdl_t *hdl, etm_xport_conn_t conn, hrtime_t timeout, void *buf, size_t byte_cnt) { ssize_t len, nbytes = 0; hrtime_t endtime, sleeptime; struct timespec tms; char *ptr = (char *)buf; exs_conn_t *cp = (exs_conn_t *)conn; if (cp->c_sd == EXS_SD_FREE) { fmd_hdl_debug(hdl, "xport - read socket %d is closed\n", cp->c_sd); return (-EBADF); } endtime = gethrtime() + timeout; sleeptime = timeout / EXS_IO_SLEEP_DIV; tms.tv_sec = 0; tms.tv_nsec = sleeptime; while (nbytes < byte_cnt) { if (gethrtime() < endtime) { if ((len = recv(cp->c_sd, ptr, byte_cnt - nbytes, 0)) < 0) { if (errno != EINTR && errno != EWOULDBLOCK) { fmd_hdl_debug(hdl, "xport - recv " "failed for socket %d", cp->c_sd); } (void) nanosleep(&tms, 0); continue; } else if (len == 0) { fmd_hdl_debug(hdl, "xport - remote endpt " "closed for socket %d", cp->c_sd); return (0); } ptr += len; nbytes += len; } else { fmd_hdl_debug(hdl, "xport - read timed out for socket " "%d", cp->c_sd); break; } } if (nbytes) return (nbytes); else return (-1); } /* * Try to write byte_cnt bytes to the connection from the given buffer. * Return how many bytes actually written for success, negative value * for failure. */ ssize_t etm_xport_write(fmd_hdl_t *hdl, etm_xport_conn_t conn, hrtime_t timeout, void *buf, size_t byte_cnt) { ssize_t len, nbytes = 0; hrtime_t endtime, sleeptime; struct timespec tms; char *ptr = (char *)buf; exs_conn_t *cp = (exs_conn_t *)conn; if (cp->c_sd == EXS_SD_FREE) { fmd_hdl_debug(hdl, "xport - write socket %d is closed\n", cp->c_sd); return (-EBADF); } endtime = gethrtime() + timeout; sleeptime = timeout / EXS_IO_SLEEP_DIV; tms.tv_sec = 0; tms.tv_nsec = sleeptime; while (nbytes < byte_cnt) { if (gethrtime() < endtime) { if ((len = send(cp->c_sd, ptr, byte_cnt - nbytes, 0)) < 0) { if (errno != EINTR && errno != EWOULDBLOCK) { fmd_hdl_debug(hdl, "xport - send " "failed for socket %d", cp->c_sd); } (void) nanosleep(&tms, 0); continue; } ptr += len; nbytes += len; } else { fmd_hdl_debug(hdl, "xport - write timed out for socket " "%d", cp->c_sd); break; } } if (nbytes) return (nbytes); else return (-1); } /* * * * * * * * * * * * * * * * * * * * * * ETM-to-Transport API Filter routines * * * * * * * * * * * * * * * * * * * * */ /* * Call the platform's send_filter function. * Otherwise return ETM_XPORT_FILTER_OK. */ int etm_xport_send_filter(fmd_hdl_t *hdl, nvlist_t *event, const char *dest) { if (Send_filter != NULL) return (Send_filter(hdl, event, dest)); else return (ETM_XPORT_FILTER_OK); } /* * Call the platform's post_filter function. * Otherwise return ETM_XPORT_FILTER_OK. */ int etm_xport_post_filter(fmd_hdl_t *hdl, nvlist_t *event, const char *src) { if (Post_filter != NULL) return (Post_filter(hdl, event, src)); else return (ETM_XPORT_FILTER_OK); }