Mercurial > dovecot > original-hg > dovecot-1.2
view src/lib/ostream-file.c @ 1741:9df02b1533b3 HEAD
Removed most of the license comments from src/lib/*.c. It's just fine to
keep them in a single COPYING.MIT file. Changed a few other comments as well.
author | Timo Sirainen <tss@iki.fi> |
---|---|
date | Wed, 27 Aug 2003 00:18:16 +0300 |
parents | e850252cdc7e |
children | 6cfa2a123b7e |
line wrap: on
line source
/* Copyright (c) 2002-2003 Timo Sirainen */

/* @UNSAFE: whole file */

#include "lib.h"
#include "alarm-hup.h"
#include "ioloop.h"
#include "file-set-size.h"
#include "write-full.h"
#include "network.h"
#include "sendfile-util.h"
#include "istream.h"
#include "istream-internal.h"
#include "ostream-internal.h"

#include <unistd.h>
#include <sys/stat.h>
#ifdef HAVE_SYS_UIO_H
#  include <sys/uio.h>
#endif

/* try to keep the buffer size within 4k..128k. ReiserFS may actually return
   128k as optimal size. */
#define DEFAULT_OPTIMAL_BLOCK_SIZE 4096
#define MAX_OPTIMAL_BLOCK_SIZE (128*1024)

/* TRUE if the ring buffer holds no unsent data. head == tail is ambiguous
   (empty or completely full), so the "full" bit disambiguates. */
#define IS_STREAM_EMPTY(fstream) \
	((fstream)->head == (fstream)->tail && !(fstream)->full)

/* Clamp a size to SSIZE_T_MAX. */
#define MAX_SSIZE_T(size) \
	((size) < SSIZE_T_MAX ? (size_t)(size) : SSIZE_T_MAX)

/* NOTE(review): STREAM_IS_BLOCKING() and GET_TIMEOUT_TIME() are used below
   but not defined in this file; presumably they come from one of the
   included project headers - confirm. */

/* Output stream writing to a file descriptor through a ring buffer. */
struct file_ostream {
	struct _ostream ostream;

	int fd;
	struct io *io; /* IO_WRITE handler, set while buffered data waits */

	unsigned char *buffer; /* ring-buffer */
	size_t buffer_size, max_buffer_size, optimal_block_size;
	size_t head, tail; /* first unsent/unused byte */

	int timeout_msecs;
	void (*timeout_cb)(void *);
	void *timeout_context;

	unsigned int full:1; /* if head == tail, is buffer empty or full? */
	unsigned int file:1; /* fd is a regular file (see o_stream_create_file) */
	unsigned int corked:1;
	unsigned int no_socket_cork:1;
	unsigned int no_sendfile:1;
	unsigned int autoclose_fd:1;
};

/* Mark the stream closed: close the fd if we own it, remove the write
   handler and set the closed flag. Does not flush anything. */
static void stream_closed(struct file_ostream *fstream)
{
	if (fstream->autoclose_fd && fstream->fd != -1) {
		if (close(fstream->fd) < 0)
			i_error("file_ostream.close() failed: %m");
		fstream->fd = -1;
	}

	if (fstream->io != NULL) {
		io_remove(fstream->io);
		fstream->io = NULL;
	}

	fstream->ostream.ostream.closed = TRUE;
}

/* iostream close handler: flush, then close. */
static void _close(struct _iostream *stream)
{
	struct file_ostream *fstream = (struct file_ostream *) stream;

	/* flush output before really closing it */
	o_stream_flush(&fstream->ostream.ostream);

	stream_closed(fstream);
}

/* iostream destroy handler: release the ring buffer back to the pool. */
static void _destroy(struct _iostream *stream)
{
	struct file_ostream *fstream = (struct file_ostream *) stream;

	p_free(fstream->ostream.iostream.pool, fstream->buffer);
}

/* iostream handler: change the buffer size limit. 0 is treated as
   "no limit" by _have_space() and o_stream_grow_buffer(). */
static void _set_max_buffer_size(struct _iostream *stream, size_t max_size)
{
	struct file_ostream *fstream = (struct file_ostream *) stream;

	fstream->max_buffer_size = max_size;
}

/* iostream handler: store the timeout settings. For non-file fds the
   descriptor is made non-blocking exactly when timeout_msecs is 0. */
static void _set_blocking(struct _iostream *stream, int timeout_msecs,
			  void (*timeout_cb)(void *), void *context)
{
	struct file_ostream *fstream = (struct file_ostream *) stream;

	fstream->timeout_msecs = timeout_msecs;
	fstream->timeout_cb = timeout_cb;
	fstream->timeout_context = context;

	if (!fstream->file)
		net_set_nonblock(fstream->fd, timeout_msecs == 0);

	if (timeout_msecs != 0)
		alarm_hup_init();
}

/* ostream handler: start corking. If net_set_cork() fails, socket-level
   corking is disabled for this stream, but output is still treated as
   corked (buffered) until _flush(). */
static void _cork(struct _ostream *stream)
{
	struct file_ostream *fstream = (struct file_ostream *) stream;

	if (!fstream->corked) {
		if (!fstream->no_socket_cork) {
			if (net_set_cork(fstream->fd, TRUE) < 0)
				fstream->no_socket_cork = TRUE;
		}
		fstream->corked = TRUE;
	}
}

/* Consume "size" bytes from the front of the iovec array: fully consumed
   entries are cleared, a partially consumed entry is advanced. */
static void update_iovec(struct iovec *iov, unsigned int iov_size, size_t size)
{
	while (size > 0) {
		i_assert(iov_size > 0);

		if ((size_t)iov->iov_len <= size) {
			size -= iov->iov_len;
			iov->iov_base = NULL;
			iov->iov_len = 0;
		} else {
			iov->iov_base = (char *) iov->iov_base + size;
			iov->iov_len -= size;
			size = 0;
		}
		iov++; iov_size--;
	}
}

/* Advance the ring buffer's head past "size" bytes that were written.
   "size" may be larger than the buffered amount (the extra bytes belonged
   to caller-supplied data), in which case the buffer simply becomes
   empty. */
static void update_buffer(struct file_ostream *fstream, size_t size)
{
	size_t used;

	/* Nothing was written: leave the buffer alone. Without this check a
	   0-byte write on a completely full buffer (head == tail) would
	   clear "full" below and then reset head/tail, silently dropping
	   everything buffered. */
	if (size == 0 || IS_STREAM_EMPTY(fstream))
		return;

	if (fstream->head < fstream->tail) {
		/* ...HXXXT... */
		used = fstream->tail - fstream->head;
		fstream->head += I_MIN(used, size);
	} else {
		/* XXXT...HXXX */
		used = fstream->buffer_size - fstream->head;
		if (size > used) {
			size -= used;
			if (size < fstream->tail)
				fstream->head = size;
			else {
				/* whole buffer is sent */
				fstream->head = fstream->tail = 0;
			}
		} else {
			fstream->head += I_MIN(used, size);
		}

		fstream->full = FALSE;
	}

	/* buffer drained: restart from the beginning */
	if (fstream->head == fstream->tail)
		fstream->head = fstream->tail = 0;

	if (fstream->head == fstream->buffer_size)
		fstream->head = 0;
}

/* Write the iovec array to the fd with write()/writev().
   Returns the number of bytes written, 0 on EAGAIN/EINTR, or -1 on any
   other error (stream_errno is set and the stream is closed).
   NOTE: modifies iov */
static ssize_t o_stream_writev(struct file_ostream *fstream,
			       struct iovec *iov, int iov_size)
{
	ssize_t ret;

	/* skip leading empty entries. iov_size must be tested first so we
	   never read past the end of the array when every entry is empty. */
	while (iov_size > 0 && iov->iov_len == 0) {
		iov++;
		iov_size--;
	}

	i_assert(iov_size > 0);

	if (iov_size == 1)
		ret = write(fstream->fd, iov->iov_base, iov->iov_len);
	else
		ret = writev(fstream->fd, iov, iov_size);

	if (ret < 0) {
		if (errno == EAGAIN || errno == EINTR)
			return 0;
		fstream->ostream.ostream.stream_errno = errno;
		stream_closed(fstream);
		return -1;
	}

	update_iovec(iov, iov_size, ret);
	update_buffer(fstream, ret);

	return ret;
}

/* Describe the buffered data with at most two iovec entries (two are
   needed when the data wraps around the end of the ring buffer).
   returns how much of vector was used */
static int o_stream_fill_iovec(struct file_ostream *fstream,
			       struct iovec iov[2])
{
	if (IS_STREAM_EMPTY(fstream))
		return 0;

	if (fstream->head < fstream->tail) {
		iov[0].iov_base = fstream->buffer + fstream->head;
		iov[0].iov_len = fstream->tail - fstream->head;
		return 1;
	} else {
		iov[0].iov_base = fstream->buffer + fstream->head;
		iov[0].iov_len = fstream->buffer_size - fstream->head;
		if (fstream->tail == 0)
			return 1;
		else {
			iov[1].iov_base = fstream->buffer;
			iov[1].iov_len = fstream->tail;
			return 2;
		}
	}
}

/* Write the buffered data followed by "data" (if size > 0), retrying
   until everything is written, an error occurs or the timeout passes.
   Returns 1 on success, -1 on error/timeout (stream_errno is EAGAIN on
   timeout and timeout_cb is called if set). */
static int o_stream_send_blocking(struct file_ostream *fstream,
				  const void *data, size_t size)
{
	time_t timeout_time;
	struct iovec iov[3];
	int iov_len, first;

	iov_len = o_stream_fill_iovec(fstream, iov);
	if (size > 0) {
		iov[iov_len].iov_base = (void *) data;
		iov[iov_len].iov_len = size;
		iov_len++;
	}

	if (iov_len == 0) {
		/* nothing buffered and nothing to send - avoid indexing
		   iov[-1] below */
		return 1;
	}

	first = TRUE;

	timeout_time = GET_TIMEOUT_TIME(fstream);
	/* entries are consumed front to back, so the last one becoming
	   empty means everything was written */
	while (iov[iov_len-1].iov_len != 0) {
		if (first)
			first = FALSE;
		else if (timeout_time > 0 && time(NULL) > timeout_time) {
			/* timeouted */
			if (fstream->timeout_cb != NULL)
				fstream->timeout_cb(fstream->timeout_context);
			fstream->ostream.ostream.stream_errno = EAGAIN;
			return -1;
		}

		if (o_stream_writev(fstream, iov, iov_len) < 0)
			return -1;
	}

	return 1;
}

/* Write out all buffered data: one direct attempt first, then the
   blocking path for whatever is left. Returns 1 on success, -1 on error. */
static int buffer_flush(struct file_ostream *fstream)
{
	struct iovec iov[2];
	int iov_len;

	if (!IS_STREAM_EMPTY(fstream)) {
		iov_len = o_stream_fill_iovec(fstream, iov);
		if (o_stream_writev(fstream, iov, iov_len) < 0)
			return -1;

		if (!IS_STREAM_EMPTY(fstream)) {
			if (o_stream_send_blocking(fstream, NULL, 0) < 0)
				return -1;
		}
	}

	return 1;
}

/* ostream handler: flush buffered data and remove the cork. The cork is
   removed even if flushing failed; the flush result is returned. */
static int _flush(struct _ostream *stream)
{
	struct file_ostream *fstream = (struct file_ostream *) stream;
	int ret;

	ret = buffer_flush(fstream);

	if (fstream->corked) {
		/* remove cork */
		if (!fstream->no_socket_cork) {
			if (net_set_cork(fstream->fd, FALSE) < 0)
				i_error("net_set_cork() failed: %m");
		}
		fstream->corked = FALSE;
	}

	return ret;
}

/* Returns the number of free bytes in the currently allocated buffer. */
static size_t get_unused_space(struct file_ostream *fstream)
{
	if (fstream->head > fstream->tail) {
		/* XXXT...HXXX */
		return fstream->head - fstream->tail;
	} else if (fstream->head < fstream->tail) {
		/* ...HXXXT... */
		return (fstream->buffer_size - fstream->tail) + fstream->head;
	} else {
		/* either fully unused or fully used */
		return fstream->full ? 0 : fstream->buffer_size;
	}
}

/* ostream handler: returns 1 if "size" bytes could be buffered, counting
   the room the buffer may still grow up to max_buffer_size, otherwise 0.
   Always 1 when there is no size limit (max_buffer_size == 0). */
static int _have_space(struct _ostream *stream, size_t size)
{
	struct file_ostream *fstream = (struct file_ostream *) stream;
	size_t unused;

	unused = get_unused_space(fstream);
	if (size <= unused)
		return 1;

	if (fstream->max_buffer_size == 0)
		return 1;

	/* only count growth room if there is any. The limit may have been
	   lowered below the current buffer size, and an unguarded
	   subtraction would then wrap around. */
	if (fstream->max_buffer_size > fstream->buffer_size)
		unused += fstream->max_buffer_size - fstream->buffer_size;
	return size <= unused ? 1 : 0;
}

/* ostream handler: flush buffered data, then lseek() the fd to "offset".
   Returns 1 on success, -1 on error. stream_errno is set for the offset
   range check and lseek() failures. */
static int _seek(struct _ostream *stream, uoff_t offset)
{
	struct file_ostream *fstream = (struct file_ostream *) stream;
	off_t ret;

	if (offset > OFF_T_MAX) {
		stream->ostream.stream_errno = EINVAL;
		return -1;
	}

	if (buffer_flush(fstream) < 0)
		return -1;

	ret = lseek(fstream->fd, (off_t)offset, SEEK_SET);
	if (ret < 0) {
		stream->ostream.stream_errno = errno;
		return -1;
	}

	if (ret != (off_t)offset) {
		stream->ostream.stream_errno = EINVAL;
		return -1;
	}

	stream->ostream.stream_errno = 0;
	stream->ostream.offset = offset;
	return 1;
}

/* Grow the ring buffer so that at least "bytes" more fit, limited by
   max_buffer_size when that is non-zero. Does nothing if the buffer can't
   be made larger than it already is. */
static void o_stream_grow_buffer(struct file_ostream *fstream, size_t bytes)
{
	size_t size, end_size;

	size = nearest_power(fstream->buffer_size + bytes);
	if (fstream->max_buffer_size != 0) {
		if (size > fstream->max_buffer_size) {
			/* limit the size */
			size = fstream->max_buffer_size;
		} else if (fstream->corked) {
			/* use the largest possible buffer with corking */
			size = fstream->max_buffer_size;
		}
	}

	/* never shrink: max_buffer_size may have been lowered below the
	   current buffer size */
	if (size <= fstream->buffer_size)
		return;

	fstream->buffer = p_realloc(fstream->ostream.iostream.pool,
				    fstream->buffer,
				    fstream->buffer_size, size);

	if (fstream->tail <= fstream->head && !IS_STREAM_EMPTY(fstream)) {
		/* XXXT...HXXX - the data wraps around the old end of the
		   buffer. Move the [head, old end) segment to the end of the
		   enlarged buffer so the free space stays contiguous between
		   tail and head. Only valid bytes are moved; bytes in
		   [tail, head) are stale and must not become part of the
		   stream. The regions may overlap, hence memmove(). */
		end_size = fstream->buffer_size - fstream->head;
		memmove(fstream->buffer + size - end_size,
			fstream->buffer + fstream->head, end_size);
		fstream->head = size - end_size;
	}

	fstream->full = FALSE;
	fstream->buffer_size = size;
}

/* IO_WRITE handler: try to write buffered data; the handler removes
   itself once everything is sent or an error occurs. */
static void stream_send_io(void *context)
{
	struct file_ostream *fstream = context;
	struct iovec iov[2];
	int iov_len;

	iov_len = o_stream_fill_iovec(fstream, iov);

	if (iov_len == 0 || o_stream_writev(fstream, iov, iov_len) < 0 ||
	    iov[iov_len-1].iov_len == 0) {
		/* error / all sent */
		if (fstream->io != NULL) {
			io_remove(fstream->io);
			fstream->io = NULL;
		}
	}
}

/* Append data to the ring buffer, growing it if needed. Returns the number
   of bytes buffered, which may be less than "size" if the buffer couldn't
   grow enough. Registers the IO_WRITE handler when needed. */
static size_t o_stream_add(struct file_ostream *fstream,
			   const void *data, size_t size)
{
	size_t unused, sent;
	int i;

	unused = get_unused_space(fstream);
	if (unused < size)
		o_stream_grow_buffer(fstream, size-unused);

	sent = 0;
	/* at most two passes: up to the end of the buffer, then from its
	   beginning after wrapping around */
	for (i = 0; i < 2 && sent < size && !fstream->full; i++) {
		unused = fstream->tail >= fstream->head ?
			fstream->buffer_size - fstream->tail :
			fstream->head - fstream->tail;

		if (unused > size-sent)
			unused = size-sent;
		/* continue from where the previous pass stopped, not from
		   the start of data */
		memcpy(fstream->buffer + fstream->tail,
		       (const char *) data + sent, unused);
		sent += unused;

		fstream->tail += unused;
		if (fstream->tail == fstream->buffer_size)
			fstream->tail = 0;

		if (fstream->head == fstream->tail)
			fstream->full = TRUE;
	}

	if (sent != 0 && fstream->io == NULL &&
	    !fstream->corked && !fstream->file) {
		fstream->io = io_add(fstream->fd, IO_WRITE,
				     stream_send_io, fstream);
	}

	i_assert(!STREAM_IS_BLOCKING(fstream) || sent == size);
	return sent;
}

/* ostream handler: send data, writing it directly when possible and
   buffering the rest. Returns the number of bytes accepted (written or
   buffered), or -1 on error. ostream.offset is advanced by exactly the
   returned amount. */
static ssize_t _send(struct _ostream *stream, const void *data, size_t size)
{
	struct file_ostream *fstream = (struct file_ostream *) stream;
	struct iovec iov;
	ssize_t ret = 0;

	i_assert(size <= SSIZE_T_MAX);

	stream->ostream.stream_errno = 0;

	/* never try sending immediately if fd is blocking,
	   so we don't need to deal with timeout issues here */
	if (IS_STREAM_EMPTY(fstream) && !STREAM_IS_BLOCKING(fstream) &&
	    (!fstream->corked || !_have_space(stream, size))) {
		iov.iov_base = (void *) data;
		iov.iov_len = size;
		ret = o_stream_writev(fstream, &iov, 1);
		if (ret < 0)
			return ret;
		if ((size_t)ret == size) {
			stream->ostream.offset += ret;
			return ret;
		}

		/* partial write: "ret" keeps the directly written count and
		   is added to the offset once, together with the buffered
		   remainder, at the end */
		data = (const char *) data + ret;
		size -= ret;
	}

	if (!_have_space(stream, size) && STREAM_IS_BLOCKING(fstream)) {
		/* send it blocking */
		if (o_stream_send_blocking(fstream, data, size) < 0)
			return -1;
		ret += (ssize_t)size;
	} else {
		/* buffer it, at least partly */
		ret += (ssize_t)o_stream_add(fstream, data, size);
	}

	stream->ostream.offset += ret;
	return ret;
}

/* Send instream's remaining data ([v_offset, v_limit)) from in_fd using
   safe_sendfile(). Returns the number of bytes the input stream advanced,
   or -1 on error. On EINVAL the stream is left open so the caller can fall
   back to regular copying. */
static off_t io_stream_sendfile(struct _ostream *outstream,
				struct istream *instream, int in_fd)
{
	struct file_ostream *foutstream = (struct file_ostream *) outstream;
	time_t timeout_time;
	uoff_t start_offset;
	uoff_t offset, send_size, v_offset;
	ssize_t ret;
	int first;

	/* set timeout time before flushing existing buffer which may block */
	timeout_time = GET_TIMEOUT_TIME(foutstream);
	start_offset = instream->v_offset;

	/* flush out any data in buffer */
	if (buffer_flush(foutstream) < 0)
		return -1;

	v_offset = instream->v_offset;

	first = TRUE;
	do {
		if (first)
			first = FALSE;
		else if (timeout_time > 0 && time(NULL) > timeout_time) {
			/* timeouted */
			if (foutstream->timeout_cb != NULL) {
				foutstream->timeout_cb(
					foutstream->timeout_context);
			}
			outstream->ostream.stream_errno = EAGAIN;
			ret = -1;
			break;
		}

		offset = instream->start_offset + v_offset;
		send_size = instream->v_limit - v_offset;

		ret = safe_sendfile(foutstream->fd, in_fd, &offset,
				    MAX_SSIZE_T(send_size));
		if (ret < 0) {
			if (errno != EINTR && errno != EAGAIN) {
				outstream->ostream.stream_errno = errno;
				if (errno != EINVAL) {
					/* close only if error wasn't because
					   sendfile() isn't supported */
					stream_closed(foutstream);
				}
				break;
			}

			ret = 0;
			if (!STREAM_IS_BLOCKING(foutstream)) {
				/* don't block */
				break;
			}
		}

		v_offset += ret;
		outstream->ostream.offset += ret;
	} while ((uoff_t)ret != send_size);

	i_stream_seek(instream, v_offset);
	return ret < 0 ? -1 :
		(off_t)(instream->v_offset - start_offset);
}

/* Copy instream's data with read + write/writev. Data already in our
   buffer is written first and isn't counted in ostream.offset here.
   "overlapping" is non-zero when input and output share the fd; both
   streams are then re-seeked before each read and write, and the output
   buffer must be empty. Returns the number of bytes the input stream
   advanced, or -1 on error/timeout. */
static off_t io_stream_copy(struct _ostream *outstream,
			    struct istream *instream, int overlapping)
{
	struct file_ostream *foutstream = (struct file_ostream *) outstream;
	time_t timeout_time;
	uoff_t start_offset;
	struct iovec iov[3];
	int iov_len;
	const unsigned char *data;
	size_t size, skip_size;
	ssize_t ret;
	int pos;

	timeout_time = GET_TIMEOUT_TIME(foutstream);
	iov_len = o_stream_fill_iovec(foutstream, iov);

	/* bytes that belong to previously buffered data */
	skip_size = 0;
	for (pos = 0; pos < iov_len; pos++)
		skip_size += iov[pos].iov_len;

	i_assert(!overlapping || iov_len == 0);

	start_offset = instream->v_offset;
	for (;;) {
		if (overlapping)
			i_stream_seek(instream, instream->v_offset);
		(void)i_stream_read_data(instream, &data, &size,
					 foutstream->optimal_block_size-1);

		if (size == 0) {
			/* all sent */
			break;
		}

		pos = iov_len++;
		iov[pos].iov_base = (void *) data;
		iov[pos].iov_len = size;

		if (overlapping) {
			if (o_stream_seek(&outstream->ostream,
					  outstream->ostream.offset) < 0)
				return -1;
		}

		ret = o_stream_writev(foutstream, iov, iov_len);
		if (ret < 0) {
			/* error */
			return -1;
		}

		if (ret == 0 && !STREAM_IS_BLOCKING(foutstream)) {
			/* don't block */
			break;
		}

		if (skip_size > 0) {
			if ((size_t)ret < skip_size) {
				skip_size -= ret;
				ret = 0;
			} else {
				ret -= skip_size;
				skip_size = 0;
			}
		}
		outstream->ostream.offset += ret;

		if (timeout_time > 0 && time(NULL) > timeout_time) {
			/* timeouted */
			if (foutstream->timeout_cb != NULL) {
				foutstream->timeout_cb(
					foutstream->timeout_context);
			}
			outstream->ostream.stream_errno = EAGAIN;
			return -1;
		}

		/* o_stream_writev() shrank iov[pos] by the amount of input
		   data it wrote */
		i_stream_skip(instream, size - iov[pos].iov_len);
		iov_len--;

		/* if we already sent the iov[0] and iov[1], we can just
		   remove them from future calls */
		while (iov_len > 0 && iov[0].iov_len == 0) {
			iov[0] = iov[1];
			if (iov_len > 1) iov[1] = iov[2];
			iov_len--;
		}
	}

	return (off_t) (instream->v_offset - start_offset);
}

/* Copy instream's data within the same fd to a higher offset, processing
   blocks from the end towards the beginning so unread input isn't
   overwritten. The output buffer must be empty. Returns -1 on
   error/timeout. */
static off_t io_stream_copy_backwards(struct _ostream *outstream,
				      struct istream *instream)
{
	struct file_ostream *foutstream = (struct file_ostream *) outstream;
	time_t timeout_time;
	uoff_t in_start_offset, in_offset, out_offset;
	const unsigned char *data;
	size_t buffer_size, size, read_size;
	ssize_t ret;

	i_assert(IS_STREAM_EMPTY(foutstream));

	timeout_time = GET_TIMEOUT_TIME(foutstream);

	/* figure out optimal buffer size */
	buffer_size = instream->real_stream->buffer_size;
	if (buffer_size == 0 || buffer_size > foutstream->buffer_size) {
		if (foutstream->optimal_block_size > foutstream->buffer_size) {
			o_stream_grow_buffer(foutstream,
					     foutstream->optimal_block_size -
					     foutstream->buffer_size);
		}

		buffer_size = foutstream->buffer_size;
	}

	in_start_offset = instream->v_offset;
	in_offset = instream->v_limit;
	out_offset = outstream->ostream.offset +
		(instream->v_limit - instream->v_offset);

	i_assert(out_offset <= instream->start_offset + instream->v_size);

	while (in_offset > in_start_offset) {
		if (in_offset - in_start_offset <= buffer_size)
			read_size = in_offset - in_start_offset;
		else
			read_size = buffer_size;
		in_offset -= read_size;
		out_offset -= read_size;

		for (;;) {
			i_assert(in_offset <= instream->v_limit);
			i_stream_seek(instream, in_offset);
			read_size = instream->v_limit - in_offset;

			(void)i_stream_read_data(instream, &data, &size,
						 read_size-1);
			if (size == read_size) {
				if (instream->mmaped) {
					/* we'll have to write it through
					   buffer or the file gets corrupted */
					i_assert(size <=
						 foutstream->buffer_size);
					memcpy(foutstream->buffer, data, size);
					data = foutstream->buffer;
				}
				break;
			}

			i_assert(size < read_size);
			if (size < read_size) {
				/* buffer too large probably,
				   try with smaller */
				read_size -= size;
				in_offset += read_size;
				out_offset += read_size;
				buffer_size -= read_size;
			}
		}

		if (o_stream_seek(&outstream->ostream, out_offset) < 0)
			return -1;

		ret = write_full(foutstream->fd, data, size);
		if (ret < 0) {
			/* error */
			outstream->ostream.stream_errno = errno;
			return -1;
		}

		if (timeout_time > 0 && time(NULL) > timeout_time) {
			/* timeouted */
			if (foutstream->timeout_cb != NULL) {
				foutstream->timeout_cb(
					foutstream->timeout_context);
			}
			outstream->ostream.stream_errno = EAGAIN;
			return -1;
		}

		/* the block just written is done; hide it from later reads */
		i_stream_set_read_limit(instream, in_offset);
	}

	/* NOTE(review): the read limit was lowered on every pass above. If
	   i_stream_set_read_limit() updates v_limit, this may evaluate to 0
	   rather than the number of bytes copied - confirm. */
	return (off_t) (instream->v_limit - in_start_offset);
}

/* ostream handler: send instream's data from v_offset to v_limit.
   Chooses between sendfile, forward copy and backward copy depending on
   whether both streams use the same fd and in which direction the data
   moves. Returns the number of bytes sent or -1 on error. */
static off_t _send_istream(struct _ostream *outstream, struct istream *instream)
{
	struct file_ostream *foutstream = (struct file_ostream *) outstream;
	uoff_t old_limit;
	off_t ret;
	int in_fd, overlapping;

	i_assert(instream->v_limit <= OFF_T_MAX);
	i_assert(instream->v_offset <= instream->v_limit);

	outstream->ostream.stream_errno = 0;
	if (instream->v_offset == instream->v_limit)
		return 0;

	in_fd = i_stream_get_fd(instream);
	if (in_fd != foutstream->fd)
		overlapping = 0;
	else {
		/* copying data within same fd. we'll have to be careful with
		   seeks and overlapping writes. */
		ret = (off_t)outstream->ostream.offset -
			(off_t)(instream->start_offset + instream->v_offset);
		if (ret == 0) {
			/* copying data over itself. we don't really
			   need to do that, just fake it. */
			return instream->v_limit - instream->v_offset;
		}
		/* -1 = output is before input, 1 = output is after input */
		overlapping = ret < 0 ? -1 : 1;

		if (o_stream_seek(&outstream->ostream,
				  outstream->ostream.offset) < 0)
			return -1;
	}

	if (!foutstream->no_sendfile && in_fd != -1 && overlapping <= 0) {
		ret = io_stream_sendfile(outstream, instream, in_fd);
		if (ret >= 0 || outstream->ostream.stream_errno != EINVAL)
			return ret;

		/* sendfile() not supported (with this fd), fallback to
		   regular sending. */
		outstream->ostream.stream_errno = 0;
		foutstream->no_sendfile = TRUE;
	}

	if (overlapping <= 0)
		return io_stream_copy(outstream, instream, overlapping);
	else {
		/* the backwards copy lowers the read limit as it goes;
		   restore it afterwards */
		old_limit = instream->v_limit;
		ret = io_stream_copy_backwards(outstream, instream);
		i_stream_set_read_limit(instream, old_limit);

		return ret;
	}
}

/* Create an output stream for fd, allocated from pool.
   max_buffer_size limits the ring buffer size (0 = no limit). If
   autoclose_fd is set, the fd is closed when the stream is closed.
   For seekable fds the stream offset starts at the fd's current position,
   and regular files are set blocking with a 60000 msec timeout. */
struct ostream *
o_stream_create_file(int fd, pool_t pool, size_t max_buffer_size,
		     int autoclose_fd)
{
	struct file_ostream *fstream;
	struct ostream *ostream;
	struct stat st;
	off_t offset;

	fstream = p_new(pool, struct file_ostream, 1);
	fstream->fd = fd;
	fstream->max_buffer_size = max_buffer_size;
	fstream->autoclose_fd = autoclose_fd;
	fstream->optimal_block_size = DEFAULT_OPTIMAL_BLOCK_SIZE;

	fstream->ostream.iostream.close = _close;
	fstream->ostream.iostream.destroy = _destroy;
	fstream->ostream.iostream.set_max_buffer_size = _set_max_buffer_size;
	fstream->ostream.iostream.set_blocking = _set_blocking;

	fstream->ostream.cork = _cork;
	fstream->ostream.flush = _flush;
	fstream->ostream.have_space = _have_space;
	fstream->ostream.seek = _seek;
	fstream->ostream.send = _send;
	fstream->ostream.send_istream = _send_istream;

	ostream = _o_stream_create(&fstream->ostream, pool);

	/* lseek() succeeding tells us the fd is seekable */
	offset = lseek(fd, 0, SEEK_CUR);
	if (offset >= 0) {
		ostream->offset = offset;

		if (fstat(fd, &st) == 0) {
			if ((uoff_t)st.st_blksize >
			    fstream->optimal_block_size) {
				/* use the optimal block size, but with a
				   reasonable limit */
				fstream->optimal_block_size =
					I_MIN(st.st_blksize,
					      MAX_OPTIMAL_BLOCK_SIZE);
			}

			if (S_ISREG(st.st_mode)) {
				fstream->no_socket_cork = TRUE;
				fstream->file = TRUE;

				o_stream_set_blocking(ostream, 60000, 0, NULL);
			}
		}
#ifndef HAVE_LINUX_SENDFILE
		/* only Linux supports sendfile() with non-sockets. Other
		   systems fail more or less gracefully if it's tried, so
		   don't bother to even try with them. */
		fstream->no_sendfile = TRUE;
#endif
	} else {
		if (net_getsockname(fd, NULL, NULL) < 0) {
			fstream->no_sendfile = TRUE;
			fstream->no_socket_cork = TRUE;
		}
	}

	return ostream;
}