Mercurial > dovecot > original-hg > dovecot-1.2
changeset 1870:c972ea085643 HEAD
istream rewrite. instead of directly setting any limits to stream, you now
have to use i_stream_create_limit() to existing stream. this should make the
istreams much easier to create and understand how they work.
author | Timo Sirainen <tss@iki.fi> |
---|---|
date | Sun, 09 Nov 2003 20:26:25 +0200 |
parents | 39d0548792d8 |
children | 638944ab7753 |
files | src/imap/cmd-append.c src/lib-index/mbox/istream-mbox.c src/lib-index/mbox/mbox-append.c src/lib-index/mbox/mbox-index.c src/lib-index/mbox/mbox-index.h src/lib-index/mbox/mbox-open.c src/lib-index/mbox/mbox-rewrite.c src/lib-index/mbox/mbox-sync-full.c src/lib-mail/message-body-search.c src/lib-storage/index/mbox/mbox-expunge.c src/lib-storage/mail-save.c src/lib/Makefile.am src/lib/iostream-internal.h src/lib/istream-data.c src/lib/istream-file.c src/lib/istream-internal.h src/lib/istream-limit.c src/lib/istream-mmap.c src/lib/istream.c src/lib/istream.h src/lib/ostream-file.c |
diffstat | 21 files changed, 512 insertions(+), 461 deletions(-) [+] |
line wrap: on
line diff
--- a/src/imap/cmd-append.c Sun Nov 09 20:23:20 2003 +0200 +++ b/src/imap/cmd-append.c Sun Nov 09 20:26:25 2003 +0200 @@ -54,6 +54,7 @@ struct imap_arg_list *flags_list; struct mailbox_custom_flags old_flags; struct mail_full_flags flags; + struct istream *input; time_t internal_date; const char *mailbox, *internal_date_str; uoff_t msg_size; @@ -180,14 +181,16 @@ } /* save the mail */ - i_stream_set_read_limit(client->input, - client->input->v_offset + msg_size); + input = i_stream_create_limit(default_pool, client->input, + client->input->v_offset, + msg_size); if (!box->save_next(ctx, &flags, internal_date, - timezone_offset, client->input)) { + timezone_offset, input)) { + i_stream_unref(input); client_send_storage_error(client, storage); break; } - i_stream_set_read_limit(client->input, 0); + i_stream_unref(input); if (client->input->closed) break;
--- a/src/lib-index/mbox/istream-mbox.c Sun Nov 09 20:23:20 2003 +0200 +++ b/src/lib-index/mbox/istream-mbox.c Sun Nov 09 20:26:25 2003 +0200 @@ -12,8 +12,7 @@ struct istream *input; buffer_t *headers; - uoff_t body_offset, body_size; - struct message_size header_size; + uoff_t v_header_size, body_offset, body_size; }; static void _close(struct _iostream *stream __attr_unused__) @@ -49,42 +48,27 @@ struct mbox_istream *mstream = (struct mbox_istream *) stream; ssize_t ret; size_t pos; - uoff_t limit, old_limit; - off_t vsize_diff; + uoff_t offset; - if (stream->istream.v_offset < mstream->header_size.virtual_size) { + if (stream->istream.v_offset < mstream->v_header_size) { /* we don't support mixing headers and body. it shouldn't be needed. */ return -2; } - /* may be positive or negative, depending on how much there was CRs - and how much headers were hidden */ - vsize_diff = mstream->header_size.virtual_size - - mstream->header_size.physical_size; - - limit = stream->istream.v_limit - vsize_diff; - old_limit = mstream->input->v_limit; - if (limit != old_limit) - i_stream_set_read_limit(mstream->input, limit); - - if (mstream->input->v_offset != stream->istream.v_offset - vsize_diff) { - i_stream_seek(mstream->input, - stream->istream.v_offset - vsize_diff); - } + offset = stream->istream.v_offset - mstream->v_header_size; + if (mstream->input->v_offset != offset) + i_stream_seek(mstream->input, offset); ret = i_stream_read(mstream->input); - mstream->istream.pos -= mstream->istream.skip; - mstream->istream.skip = 0; - mstream->istream.buffer = i_stream_get_data(mstream->input, &pos); + stream->pos -= stream->skip; + stream->skip = 0; + stream->buffer = i_stream_get_data(mstream->input, &pos); - ret = pos <= mstream->istream.pos ? -1 : - (ssize_t) (pos - mstream->istream.pos); + ret = pos <= stream->pos ? -1 : + (ssize_t) (pos - stream->pos); mstream->istream.pos = pos; - - if (limit != old_limit) - i_stream_set_read_limit(mstream->input, old_limit); return ret; } @@ -93,54 +77,51 @@ struct mbox_istream *mstream = (struct mbox_istream *) stream; stream->istream.v_offset = v_offset; - if (v_offset < mstream->header_size.virtual_size) { + if (v_offset < mstream->v_header_size) { /* still in headers */ stream->skip = v_offset; - stream->pos = stream->high_pos = - mstream->header_size.virtual_size; + stream->pos = mstream->v_header_size; stream->buffer = buffer_get_data(mstream->headers, NULL); } else { /* body - use our real input stream */ - stream->skip = stream->pos = stream->high_pos = 0; + stream->skip = stream->pos = 0; stream->buffer = NULL; - - v_offset += (off_t)mstream->header_size.physical_size - - (off_t)mstream->header_size.virtual_size; - i_stream_seek(mstream->input, v_offset); } } -static void _skip(struct _istream *stream, uoff_t count) -{ - i_stream_seek(&stream->istream, stream->istream.v_offset + count); -} - struct istream *i_stream_create_mbox(pool_t pool, struct istream *input, - uoff_t body_size) + uoff_t offset, uoff_t body_size) { struct mbox_istream *mstream; + struct istream *hdr_input; mstream = p_new(pool, struct mbox_istream, 1); - mstream->input = input; mstream->body_size = body_size; if (body_size == 0) { /* possibly broken message, find the next From-line and make sure header parser won't pass it. */ mbox_skip_header(input); - i_stream_set_read_limit(input, input->v_offset); - i_stream_seek(input, 0); + hdr_input = i_stream_create_limit(pool, input, + 0, input->v_offset); + } else { + hdr_input = input; + i_stream_ref(input); } mstream->headers = buffer_create_dynamic(default_pool, 8192, (size_t)-1); - mbox_hide_headers(input, mstream->headers, - &mstream->header_size); - mstream->body_offset = input->v_offset; - i_stream_set_read_limit(input, mstream->body_offset + body_size); + i_stream_seek(hdr_input, offset); + mbox_read_headers(hdr_input, mstream->headers); + mstream->v_header_size = buffer_get_used_size(mstream->headers); + mstream->body_offset = hdr_input->v_offset; + i_stream_unref(hdr_input); + + mstream->input = i_stream_create_limit(pool, input, + mstream->body_offset, body_size); mstream->istream.buffer = buffer_get_data(mstream->headers, NULL); - mstream->istream.pos = mstream->header_size.virtual_size; + mstream->istream.pos = mstream->v_header_size; mstream->istream.iostream.close = _close; mstream->istream.iostream.destroy = _destroy; @@ -148,9 +129,8 @@ mstream->istream.iostream.set_blocking = _set_blocking; mstream->istream.read = _read; - mstream->istream.skip_count = _skip; mstream->istream.seek = _seek; - return _i_stream_create(&mstream->istream, pool, -1, 0, - mstream->header_size.virtual_size + body_size); + return _i_stream_create(&mstream->istream, pool, -1, + mstream->v_header_size + body_size); }
--- a/src/lib-index/mbox/mbox-append.c Sun Nov 09 20:23:20 2003 +0200 +++ b/src/lib-index/mbox/mbox-append.c Sun Nov 09 20:26:25 2003 +0200 @@ -14,10 +14,11 @@ struct istream *input) { struct mail_index_record *rec; - struct mbox_header_context ctx; + struct mbox_header_context ctx; + struct istream *hdr_stream; enum mail_index_record_flag index_flags; time_t received_date; - uoff_t abs_start_offset, eoh_offset; + uoff_t hdr_offset, body_offset, end_offset; const unsigned char *data; unsigned char md5_digest[16]; size_t size, pos; @@ -38,8 +39,7 @@ if (size == 0) return -1; - if (pos == size || size <= 5 || - strncmp((const char *) data, "From ", 5) != 0) { + if (pos == size || size <= 5 || memcmp(data, "From ", 5) != 0) { /* a) no \n found, or line too long b) not a From-line */ index_set_error(index, "Error indexing mbox file %s: " @@ -55,12 +55,12 @@ received_date = ioloop_time; i_stream_skip(input, pos+1); - abs_start_offset = input->start_offset + input->v_offset; + hdr_offset = input->v_offset; /* now, find the end of header. also stops at "\nFrom " if it's found (broken messages) */ mbox_skip_header(input); - eoh_offset = input->v_offset; + body_offset = input->v_offset; index_flags = 0; @@ -72,17 +72,31 @@ reading the headers. it uses Content-Length if available or finds the next From-line. */ mbox_header_init_context(&ctx, index, input); - ctx.set_read_limit = TRUE; - i_stream_seek(input, abs_start_offset - input->start_offset); - i_stream_set_read_limit(input, eoh_offset); + hdr_stream = i_stream_create_limit(default_pool, input, + hdr_offset, + body_offset - hdr_offset); + i_stream_seek(hdr_stream, 0); + message_parse_header(NULL, hdr_stream, NULL, mbox_header_cb, &ctx); + i_stream_unref(hdr_stream); + + dirty = FALSE; - message_parse_header(NULL, input, NULL, mbox_header_cb, &ctx); + /* try Content-Length */ + end_offset = body_offset + ctx.content_length; + if (ctx.content_length == (uoff_t)-1 || + !mbox_verify_end_of_body(input, end_offset)) { + /* failed, search for From-line */ + if (ctx.content_length != (uoff_t)-1) { + /* broken, rewrite it */ + dirty = TRUE; + } - i_stream_seek(input, input->v_limit); - i_stream_set_read_limit(input, 0); + i_stream_seek(input, body_offset); + mbox_skip_message(input); + ctx.content_length = input->v_offset - body_offset; + } - dirty = ctx.content_length_broken; if (index->header->messages_count == 0 && ctx.uid_validity != index->header->uid_validity) { /* UID validity is different */ @@ -139,7 +153,7 @@ /* location offset = beginning of headers in message */ if (!mail_cache_add(trans_ctx, rec, MAIL_CACHE_LOCATION_OFFSET, - &abs_start_offset, sizeof(abs_start_offset))) + &hdr_offset, sizeof(hdr_offset))) return -1; if (!mail_cache_add(trans_ctx, rec, MAIL_CACHE_RECEIVED_DATE, @@ -167,11 +181,6 @@ uoff_t offset; int ret; - if (input->eof) { - /* no new data */ - return TRUE; - } - if (!index->set_lock(index, MAIL_LOCK_EXCLUSIVE)) return FALSE; @@ -180,7 +189,7 @@ do { offset = input->v_offset; - if (input->start_offset + input->v_offset != 0) { + if (input->v_offset != 0) { /* we're at the [\r]\n before the From-line, skip it */ if (!mbox_skip_crlf(input)) { @@ -195,15 +204,15 @@ } } - t_push(); - ret = mbox_index_append_next(index, trans_ctx, input); - t_pop(); - if (input->eof) { ret = 1; break; } + t_push(); + ret = mbox_index_append_next(index, trans_ctx, input); + t_pop(); + if (ret == 0) { /* we want to rescan this message with exclusive locking */
--- a/src/lib-index/mbox/mbox-index.c Sun Nov 09 20:23:20 2003 +0200 +++ b/src/lib-index/mbox/mbox-index.c Sun Nov 09 20:26:25 2003 +0200 @@ -55,11 +55,9 @@ return TRUE; } -struct istream *mbox_get_stream(struct mail_index *index, uoff_t offset, +struct istream *mbox_get_stream(struct mail_index *index, enum mail_lock_type lock_type) { - i_assert(offset < OFF_T_MAX); - switch (lock_type) { case MAIL_LOCK_SHARED: case MAIL_LOCK_EXCLUSIVE: @@ -93,10 +91,7 @@ } } - i_stream_set_read_limit(index->mbox_stream, 0); - i_stream_set_start_offset(index->mbox_stream, (uoff_t)offset); i_stream_seek(index->mbox_stream, 0); - i_stream_ref(index->mbox_stream); return index->mbox_stream; } @@ -245,37 +240,10 @@ struct message_header_line *hdr, void *context) { struct mbox_header_context *ctx = context; - uoff_t start_offset, end_offset; size_t i; int fixed = FALSE; - if (hdr == NULL) { - /* End of headers */ - if (!ctx->set_read_limit) - return; - - /* a) use Content-Length, b) search for "From "-line */ - start_offset = ctx->input->v_offset; - i_stream_set_read_limit(ctx->input, 0); - - end_offset = start_offset + ctx->content_length; - if (ctx->content_length == (uoff_t)-1 || - !mbox_verify_end_of_body(ctx->input, end_offset)) { - if (ctx->content_length != (uoff_t)-1) { - i_stream_seek(ctx->input, start_offset); - ctx->content_length_broken = TRUE; - } - mbox_skip_message(ctx->input); - end_offset = ctx->input->v_offset; - ctx->content_length = end_offset - start_offset; - } - - i_stream_seek(ctx->input, start_offset); - i_stream_set_read_limit(ctx->input, end_offset); - return; - } - - if (hdr->eoh) + if (hdr == NULL || hdr->eoh) return; /* Pretty much copy&pasted from popa3d by Solar Designer */ @@ -293,8 +261,7 @@ case 'C': case 'c': - if (ctx->set_read_limit && - strcasecmp(hdr->name, "Content-Length") == 0) { + if (strcasecmp(hdr->name, "Content-Length") == 0) { /* manual parsing, so we can deal with uoff_t */ ctx->content_length = 0; for (i = 0; i < hdr->value_len; i++) { @@ -655,12 +622,6 @@ if (i_stream_read_data(input, &data, &size, 6) < 0) return FALSE; - if (input->eof) { - /* end of file. a bit unexpected though, - since \n is missing. */ - return TRUE; - } - /* either there should be the next From-line, or [\r]\n at end of file */ if (size > 0 && data[0] == '\r') { @@ -725,20 +686,15 @@ return TRUE; } -void mbox_hide_headers(struct istream *input, buffer_t *dest, - struct message_size *hdr_size) +void mbox_read_headers(struct istream *input, buffer_t *dest) { struct message_header_parser_ctx *hdr_ctx; struct message_header_line *hdr; - uoff_t virtual_size = 0; - hdr_ctx = message_parse_header_init(input, hdr_size); + hdr_ctx = message_parse_header_init(input, NULL); while ((hdr = message_parse_header_next(hdr_ctx)) != NULL) { if (hdr->eoh) { - if (dest != NULL) - buffer_append(dest, "\r\n", 2); - else - virtual_size += 2; + buffer_append(dest, "\r\n", 2); break; } @@ -750,26 +706,16 @@ strcasecmp(hdr->name, "Content-Length") == 0 || strcasecmp(hdr->name, "Status") == 0) { /* ignore */ - } else if (dest != NULL) { + } else { if (!hdr->continued) { buffer_append(dest, hdr->name, hdr->name_len); buffer_append(dest, ": ", 2); } buffer_append(dest, hdr->value, hdr->value_len); buffer_append(dest, "\r\n", 2); - } else { - if (!hdr->continued) - virtual_size += hdr->name_len + 2; - virtual_size += hdr->value_len + 2; } } message_parse_header_deinit(hdr_ctx); - - if (dest != NULL) - virtual_size = buffer_get_used_size(dest); - - hdr_size->virtual_size = virtual_size; - hdr_size->lines = 0; } struct mail_index *
--- a/src/lib-index/mbox/mbox-index.h Sun Nov 09 20:23:20 2003 +0200 +++ b/src/lib-index/mbox/mbox-index.h Sun Nov 09 20:26:25 2003 +0200 @@ -18,7 +18,6 @@ struct istream *input; uoff_t content_length; - int set_read_limit, content_length_broken; }; int mbox_set_syscall_error(struct mail_index *index, const char *function); @@ -27,7 +26,7 @@ which is useful when you want to be sure you're not accessing a deleted mbox file. */ int mbox_file_open(struct mail_index *index); -struct istream *mbox_get_stream(struct mail_index *index, uoff_t offset, +struct istream *mbox_get_stream(struct mail_index *index, enum mail_lock_type lock_type); void mbox_file_close_stream(struct mail_index *index); void mbox_file_close_fd(struct mail_index *index); @@ -50,8 +49,7 @@ int mbox_mail_get_location(struct mail_index *index, struct mail_index_record *rec, uoff_t *offset, uoff_t *body_size); -void mbox_hide_headers(struct istream *input, buffer_t *dest, - struct message_size *hdr_size); +void mbox_read_headers(struct istream *input, buffer_t *dest); struct mail_index * mbox_index_alloc(const char *mbox_path, const char *index_dir, @@ -71,6 +69,6 @@ int mbox_index_rewrite(struct mail_index *index); struct istream *i_stream_create_mbox(pool_t pool, struct istream *input, - uoff_t body_size); + uoff_t offset, uoff_t body_size); #endif
--- a/src/lib-index/mbox/mbox-open.c Sun Nov 09 20:23:20 2003 +0200 +++ b/src/lib-index/mbox/mbox-open.c Sun Nov 09 20:26:25 2003 +0200 @@ -27,7 +27,7 @@ if (!mbox_mail_get_location(index, rec, &offset, &body_size)) return NULL; - input = mbox_get_stream(index, offset, MAIL_LOCK_SHARED); + input = mbox_get_stream(index, MAIL_LOCK_SHARED); if (input == NULL) return NULL; @@ -35,6 +35,5 @@ *received_date = index->get_received_date(index, rec); i_assert(index->mbox_sync_counter == index->mbox_lock_counter); - - return i_stream_create_mbox(default_pool, input, body_size); + return i_stream_create_mbox(default_pool, input, offset, body_size); }
--- a/src/lib-index/mbox/mbox-rewrite.c Sun Nov 09 20:23:20 2003 +0200 +++ b/src/lib-index/mbox/mbox-rewrite.c Sun Nov 09 20:26:25 2003 +0200 @@ -69,13 +69,11 @@ static int mbox_write(struct mail_index *index, struct istream *input, struct ostream *output, uoff_t end_offset) { - uoff_t old_limit; int failed; i_assert(input->v_offset <= end_offset); - old_limit = input->v_limit; - i_stream_set_read_limit(input, end_offset); + input = i_stream_create_limit(default_pool, input, 0, end_offset); if (o_stream_send_istream(output, input) < 0) { index_set_error(index, "Error rewriting mbox file %s: %s", index->mailbox_path, @@ -90,7 +88,7 @@ failed = FALSE; } - i_stream_set_read_limit(input, old_limit); + i_stream_unref(input); return !failed; } @@ -406,6 +404,7 @@ struct message_header_parser_ctx *hdr_ctx; struct message_header_line *hdr; struct message_size hdr_size; + struct istream *hdr_input; uoff_t offset; int force_filler; @@ -422,16 +421,19 @@ ctx.uid_last = index->header->next_uid-1; ctx.custom_flags = mail_custom_flags_list_get(index->custom_flags); - if (body_size == 0) { + if (body_size != 0) { + hdr_input = input; + i_stream_ref(hdr_input); + } else { /* possibly broken message, find the next From-line and make sure header parser won't pass it. */ offset = input->v_offset; mbox_skip_header(input); - i_stream_set_read_limit(input, input->v_offset); - i_stream_seek(input, offset); - } + hdr_input = i_stream_create_limit(default_pool, input, offset, + input->v_offset); + } - hdr_ctx = message_parse_header_init(input, &hdr_size); + hdr_ctx = message_parse_header_init(hdr_input, &hdr_size); while ((hdr = message_parse_header_next(hdr_ctx)) != NULL) { t_push(); write_header(&ctx, hdr); @@ -440,7 +442,7 @@ message_parse_header_deinit(hdr_ctx); *hdr_input_size = hdr_size.physical_size; - i_stream_set_read_limit(input, 0); + i_stream_unref(hdr_input); /* append the flag fields */ if (seq == 1 && !ctx.ximapbase_found) { @@ -476,7 +478,7 @@ static int fd_copy(struct mail_index *index, int in_fd, int out_fd, uoff_t out_offset, uoff_t size) { - struct istream *input; + struct istream *input, *input2; struct ostream *output; struct stat st; int ret; @@ -506,21 +508,22 @@ t_push(); - input = i_stream_create_mmap(in_fd, pool_datastack_create(), - 1024*256, 0, 0, FALSE); - i_stream_set_read_limit(input, size); + input = i_stream_create_file(in_fd, pool_datastack_create(), + 1024*256, FALSE); + input2 = i_stream_create_limit(pool_datastack_create(), input, 0, size); output = o_stream_create_file(out_fd, pool_datastack_create(), 1024, FALSE); o_stream_set_blocking(output, 60000, NULL, NULL); - ret = o_stream_send_istream(output, input); + ret = o_stream_send_istream(output, input2); if (ret < 0) { errno = output->stream_errno; mbox_set_syscall_error(index, "o_stream_send_istream()"); } o_stream_unref(output); + i_stream_unref(input2); i_stream_unref(input); t_pop(); @@ -625,7 +628,7 @@ break; } - input = mbox_get_stream(index, 0, MAIL_LOCK_EXCLUSIVE); + input = mbox_get_stream(index, MAIL_LOCK_EXCLUSIVE); if (input == NULL) break; @@ -714,6 +717,7 @@ break; } offset += hdr_size; + i_assert(input->v_offset == offset); if (dirty_found && offset - dirty_offset == output->offset) {
--- a/src/lib-index/mbox/mbox-sync-full.c Sun Nov 09 20:23:20 2003 +0200 +++ b/src/lib-index/mbox/mbox-sync-full.c Sun Nov 09 20:26:25 2003 +0200 @@ -77,49 +77,58 @@ unsigned int *seq, struct istream *input, struct mail_index_record **next_rec, int *dirty) { - struct message_size hdr_parsed_size; struct mbox_header_context ctx; struct mail_index_record *first_rec, *last_rec; + struct istream *hdr_input; enum mail_index_record_flag index_flags; - uoff_t header_offset, body_offset, offset; - uoff_t hdr_size, body_size; + uoff_t header_offset, body_offset, offset, body_size, eoh_offset; unsigned char current_digest[16]; unsigned int first_seq, last_seq; - int ret, hdr_size_fixed; + int ret, hdr_parsed; *next_rec = NULL; /* skip the From-line */ skip_line(input); - if (input->eof) - return -1; header_offset = input->v_offset; first_rec = last_rec = NULL; first_seq = last_seq = 0; - ret = 0; hdr_size = 0; body_offset = 0; hdr_size_fixed = FALSE; + ret = 0; body_offset = 0; eoh_offset = (uoff_t)-1; hdr_parsed = FALSE; do { if (!mbox_mail_get_location(index, rec, &offset, &body_size)) return -1; - i_stream_seek(input, header_offset); - - if (body_size == 0 && !hdr_size_fixed) { + if (body_size == 0 && eoh_offset == (uoff_t)-1) { /* possibly broken message, find the next From-line and make sure header parser won't pass it. */ + i_stream_seek(input, header_offset); mbox_skip_header(input); - i_stream_set_read_limit(input, input->v_offset); - i_stream_seek(input, header_offset); - hdr_size_fixed = TRUE; - hdr_size = 0; + eoh_offset = input->v_offset; + hdr_parsed = FALSE; } - if (hdr_size == 0) { + if (!hdr_parsed) { /* get the MD5 sum of fixed headers and the current message flags in Status and X-Status fields */ - mbox_header_init_context(&ctx, index, input); - message_parse_header(NULL, input, &hdr_parsed_size, + if (eoh_offset == (uoff_t)-1) + hdr_input = input; + else { + hdr_input = i_stream_create_limit(default_pool, + input, 0, eoh_offset); + } + i_stream_seek(hdr_input, header_offset); + + mbox_header_init_context(&ctx, index, hdr_input); + message_parse_header(NULL, hdr_input, NULL, mbox_header_cb, &ctx); + + hdr_parsed = TRUE; + body_offset = hdr_input->v_offset; + + if (eoh_offset != (uoff_t)-1) + i_stream_unref(hdr_input); + hdr_input = NULL; md5_final(&ctx.md5, current_digest); if (*seq == 1) { @@ -135,10 +144,6 @@ ctx.uid_last+1; } } - - i_stream_set_read_limit(input, 0); - - body_offset = input->v_offset; } if (verify_header(index, rec, ctx.uid, current_digest) && @@ -277,7 +282,7 @@ index->header->flags &= ~MAIL_INDEX_HDR_FLAG_DIRTY_MESSAGES; } - if (input->eof || (index->set_flags & MAIL_INDEX_HDR_FLAG_REBUILD)) + if ((index->set_flags & MAIL_INDEX_HDR_FLAG_REBUILD)) return TRUE; else return mbox_index_append_stream(index, input); @@ -292,7 +297,7 @@ i_assert(index->lock_type == MAIL_LOCK_EXCLUSIVE); - input = mbox_get_stream(index, 0, MAIL_LOCK_SHARED); + input = mbox_get_stream(index, MAIL_LOCK_SHARED); if (input == NULL) return FALSE; @@ -316,7 +321,7 @@ if (!mbox_unlock(index)) return FALSE; - input = mbox_get_stream(index, 0, MAIL_LOCK_EXCLUSIVE); + input = mbox_get_stream(index, MAIL_LOCK_EXCLUSIVE); if (input == NULL) return FALSE;
--- a/src/lib-mail/message-body-search.c Sun Nov 09 20:23:20 2003 +0200 +++ b/src/lib-mail/message-body-search.c Sun Nov 09 20:26:25 2003 +0200 @@ -275,7 +275,6 @@ buffer_t *decodebuf; pool_t pool; size_t data_size, pos; - uoff_t old_limit; ssize_t ret; int found; @@ -302,9 +301,8 @@ i_stream_skip(input, part->physical_pos + part->header_size.physical_size - input->v_offset); - old_limit = input->v_limit; - i_stream_set_read_limit(input, input->v_offset + - part->body_size.physical_size); + input = i_stream_create_limit(default_pool, input, 0, + part->body_size.physical_size); found = FALSE; pos = 0; while (i_stream_read_data(input, &data, &data_size, pos) > 0) { @@ -347,7 +345,7 @@ pos -= data_size; } - i_stream_set_read_limit(input, old_limit); + i_stream_unref(input); if (ctx->translation != NULL) charset_to_utf8_end(ctx->translation);
--- a/src/lib-storage/index/mbox/mbox-expunge.c Sun Nov 09 20:23:20 2003 +0200 +++ b/src/lib-storage/index/mbox/mbox-expunge.c Sun Nov 09 20:26:25 2003 +0200 @@ -37,7 +37,7 @@ /* mbox must be already opened, synced and locked at this point. we just want the istream. */ - input = mbox_get_stream(ibox->index, 0, MAIL_LOCK_EXCLUSIVE); + input = mbox_get_stream(ibox->index, MAIL_LOCK_EXCLUSIVE); if (input == NULL) return NULL; @@ -58,9 +58,9 @@ static int mbox_move_data(struct mbox_expunge_context *ctx) { + struct istream *input; const unsigned char *data; size_t size; - uoff_t old_limit; int failed; i_stream_seek(ctx->input, ctx->move_offset); @@ -76,15 +76,16 @@ i_stream_skip(ctx->input, 2); } - old_limit = ctx->input->v_limit; - i_stream_set_read_limit(ctx->input, ctx->from_offset); - failed = o_stream_send_istream(ctx->output, ctx->input) < 0; - i_stream_set_read_limit(ctx->input, old_limit); + if (ctx->from_offset == 0) + failed = o_stream_send_istream(ctx->output, ctx->input) < 0; + else { + input = i_stream_create_limit(default_pool, ctx->input, + 0, ctx->from_offset); + failed = o_stream_send_istream(ctx->output, ctx->input) < 0; + i_stream_unref(input); + } - if (failed || (ctx->input->v_offset != ctx->from_offset && - ctx->from_offset != 0)) - return FALSE; - return TRUE; + return !failed; } int mbox_storage_expunge_deinit(struct mail_expunge_context *_ctx) @@ -94,7 +95,7 @@ if (ctx->expunges) { if (!failed && ctx->move_offset != (uoff_t)-1) { - ctx->from_offset = ctx->input->v_limit; + ctx->from_offset = 0; if (!mbox_move_data(ctx)) failed = TRUE; } else if (failed && ctx->output->offset > 0) {
--- a/src/lib-storage/mail-save.c Sun Nov 09 20:23:20 2003 +0200 +++ b/src/lib-storage/mail-save.c Sun Nov 09 20:26:25 2003 +0200 @@ -167,8 +167,7 @@ errno = input->stream_errno; if (errno == 0) { /* EOF */ - if (input->v_offset != input->v_limit && - input->v_limit != 0) { + if (input->eof) { /* too early */ mail_storage_set_error(storage, "Unexpected EOF");
--- a/src/lib/Makefile.am Sun Nov 09 20:23:20 2003 +0200 +++ b/src/lib/Makefile.am Sun Nov 09 20:26:25 2003 +0200 @@ -23,6 +23,7 @@ istream.c \ istream-data.c \ istream-file.c \ + istream-limit.c \ istream-mmap.c \ ioloop.c \ ioloop-notify-none.c \
--- a/src/lib/iostream-internal.h Sun Nov 09 20:23:20 2003 +0200 +++ b/src/lib/iostream-internal.h Sun Nov 09 20:26:25 2003 +0200 @@ -23,7 +23,7 @@ void (*timeout_cb)(void *), void *context); #define GET_TIMEOUT_TIME(fstream) \ - ((fstream)->timeout_msecs == 0 ? 0 : \ + ((fstream)->timeout_msecs <= 0 ? 0 : \ time(NULL) + ((fstream)->timeout_msecs / 1000)) #define STREAM_IS_BLOCKING(fstream) \ ((fstream)->timeout_msecs != 0)
--- a/src/lib/istream-data.c Sun Nov 09 20:23:20 2003 +0200 +++ b/src/lib/istream-data.c Sun Nov 09 20:26:25 2003 +0200 @@ -23,8 +23,9 @@ { } -static ssize_t _read(struct _istream *stream __attr_unused__) +static ssize_t _read(struct _istream *stream) { + stream->istream.eof = TRUE; return -1; } @@ -34,10 +35,9 @@ stream->istream.v_offset = v_offset; } -static void _skip(struct _istream *stream, uoff_t count) +static uoff_t _get_size(struct _istream *stream) { - stream->skip += count; - stream->istream.v_offset += count; + return stream->pos; } struct istream *i_stream_create_from_data(pool_t pool, const void *data, @@ -55,8 +55,8 @@ stream->iostream.set_blocking = _set_blocking; stream->read = _read; - stream->skip_count = _skip; stream->seek = _seek; + stream->get_size = _get_size; - return _i_stream_create(stream, pool, -1, 0, size); + return _i_stream_create(stream, pool, -1, 0); }
--- a/src/lib/istream-file.c Sun Nov 09 20:23:20 2003 +0200 +++ b/src/lib/istream-file.c Sun Nov 09 20:26:25 2003 +0200 @@ -106,7 +106,6 @@ { struct file_istream *fstream = (struct file_istream *) stream; time_t timeout_time; - uoff_t read_limit; size_t size; ssize_t ret; @@ -130,22 +129,6 @@ } size = stream->buffer_size - stream->pos; - if (stream->istream.v_limit > 0) { - i_assert(stream->istream.v_limit >= stream->istream.v_offset); - - read_limit = stream->istream.v_limit - - stream->istream.v_offset + fstream->skip_left; - if (read_limit <= stream->pos - stream->skip) { - /* virtual limit reached == EOF */ - stream->istream.eof = TRUE; - return -1; - } - - read_limit -= stream->pos - stream->skip; - if (size > read_limit) - size = read_limit; - } - timeout_time = GET_TIMEOUT_TIME(fstream); ret = -1; @@ -161,7 +144,7 @@ if (fstream->file) { ret = pread(stream->fd, stream->w_buffer + stream->pos, size, - stream->istream.start_offset + + stream->abs_start_offset + stream->istream.v_offset + (stream->pos - stream->skip)); } else { @@ -170,7 +153,6 @@ } if (ret == 0) { /* EOF */ - stream->istream.stream_errno = 0; stream->istream.eof = TRUE; return -1; } @@ -178,7 +160,6 @@ if (ret < 0) { if (errno == ECONNRESET || errno == ETIMEDOUT) { /* treat as disconnection */ - stream->istream.stream_errno = 0; stream->istream.eof = TRUE; return -1; } @@ -193,6 +174,8 @@ if (ret > 0 && fstream->skip_left > 0) { i_assert(!fstream->file); + i_assert(stream->skip == stream->pos); + if (fstream->skip_left >= (size_t)ret) { fstream->skip_left -= ret; ret = 0; @@ -209,25 +192,16 @@ return ret; } -static void _skip(struct _istream *stream, uoff_t count) -{ - struct file_istream *fstream = (struct file_istream *) stream; - - i_assert(stream->skip == stream->pos); - - if (!fstream->file) - fstream->skip_left += count; - stream->istream.v_offset += count; - stream->skip = stream->pos = 0; -} - static void _seek(struct _istream *stream, uoff_t v_offset) { struct file_istream *fstream = (struct file_istream *) stream; if (!fstream->file) { - stream->istream.stream_errno = ESPIPE; - return; + if (v_offset < stream->istream.v_offset) { + stream->istream.stream_errno = ESPIPE; + return; + } + fstream->skip_left += v_offset - stream->istream.v_offset; } stream->istream.stream_errno = 0; @@ -235,6 +209,17 @@ stream->skip = stream->pos = 0; } +static uoff_t _get_size(struct _istream *stream) +{ + struct file_istream *fstream = (struct file_istream *) stream; + struct stat st; + + if (fstream->file && fstat(stream->fd, &st) == 0 && S_ISREG(st.st_mode)) + return (uoff_t)st.st_size; + else + return (uoff_t)-1; +} + struct istream *i_stream_create_file(int fd, pool_t pool, size_t max_buffer_size, int autoclose_fd) { @@ -251,12 +236,12 @@ fstream->istream.iostream.set_blocking = _set_blocking; fstream->istream.read = _read; - fstream->istream.skip_count = _skip; fstream->istream.seek = _seek; + fstream->istream.get_size = _get_size; /* get size of fd if it's a file */ if (fstat(fd, &st) == 0 && S_ISREG(st.st_mode)) fstream->file = TRUE; - return _i_stream_create(&fstream->istream, pool, fd, 0, 0); + return _i_stream_create(&fstream->istream, pool, fd, 0); }
--- a/src/lib/istream-internal.h Sun Nov 09 20:23:20 2003 +0200 +++ b/src/lib/istream-internal.h Sun Nov 09 20:26:25 2003 +0200 @@ -10,8 +10,8 @@ /* methods: */ ssize_t (*read)(struct _istream *stream); - void (*skip_count)(struct _istream *stream, uoff_t count); void (*seek)(struct _istream *stream, uoff_t v_offset); + uoff_t (*get_size)(struct _istream *stream); /* data: */ struct istream istream; @@ -20,11 +20,12 @@ const unsigned char *buffer; unsigned char *w_buffer; /* may be NULL */ size_t buffer_size; + uoff_t abs_start_offset; - size_t skip, pos, high_pos; + size_t skip, pos; }; struct istream *_i_stream_create(struct _istream *_buf, pool_t pool, int fd, - uoff_t start_offset, uoff_t v_size); + uoff_t abs_start_offset); #endif
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/lib/istream-limit.c Sun Nov 09 20:26:25 2003 +0200 @@ -0,0 +1,123 @@ +/* Copyright (C) 2003 Timo Sirainen */ + +#include "lib.h" +#include "istream-internal.h" + +struct limit_istream { + struct _istream istream; + + struct istream *input; + uoff_t v_start_offset, v_size; +}; + +static void _close(struct _iostream *stream __attr_unused__) +{ +} + +static void _destroy(struct _iostream *stream) +{ + struct limit_istream *lstream = (struct limit_istream *) stream; + + /* get to same position in parent stream */ + i_stream_seek(lstream->input, lstream->v_start_offset + + lstream->istream.istream.v_offset); + i_stream_unref(lstream->input); +} + +static void _set_max_buffer_size(struct _iostream *stream, size_t max_size) +{ + struct limit_istream *lstream = (struct limit_istream *) stream; + + i_stream_set_max_buffer_size(lstream->input, max_size); +} + +static void _set_blocking(struct _iostream *stream, int timeout_msecs, + void (*timeout_cb)(void *), void *context) +{ + struct limit_istream *lstream = (struct limit_istream *) stream; + + i_stream_set_blocking(lstream->input, timeout_msecs, + timeout_cb, context); +} + +static ssize_t _read(struct _istream *stream) +{ + struct limit_istream *lstream = (struct limit_istream *) stream; + uoff_t left; + ssize_t ret; + size_t pos; + + if (stream->istream.v_offset >= lstream->v_size) + return -1; + + if (lstream->input->v_offset != + lstream->v_start_offset + stream->istream.v_offset) { + i_stream_seek(lstream->input, + lstream->v_start_offset + stream->istream.v_offset); + } + + if (i_stream_read(lstream->input) == -2 && stream->buffer != NULL) { + if (lstream->istream.skip == 0) + return -2; + stream->istream.eof = lstream->input->eof; + } + + stream->pos -= stream->skip; + stream->skip = 0; + stream->buffer = i_stream_get_data(lstream->input, &pos); + + left = lstream->v_size - stream->istream.v_offset; + if (pos > left) + pos = left; + + ret = pos <= lstream->istream.pos ? -1 : + (ssize_t) (pos - stream->pos); + lstream->istream.pos = pos; + return ret; +} + +static void _seek(struct _istream *stream, uoff_t v_offset) +{ + stream->istream.stream_errno = 0; + stream->istream.v_offset = v_offset; + stream->skip = stream->pos = 0; +} + +static uoff_t _get_size(struct _istream *stream) +{ + struct limit_istream *lstream = (struct limit_istream *) stream; + + return lstream->v_size != (uoff_t)-1 ? lstream->v_size : + i_stream_get_size(lstream->input); +} + +struct istream *i_stream_create_limit(pool_t pool, struct istream *input, + uoff_t v_start_offset, uoff_t v_size) +{ + struct limit_istream *lstream; + + i_stream_ref(input); + + lstream = p_new(pool, struct limit_istream, 1); + lstream->input = input; + lstream->v_start_offset = v_start_offset; + lstream->v_size = v_size; + + lstream->istream.istream.v_offset = + input->v_offset < v_start_offset ? 0 : + input->v_offset - v_start_offset > v_size ? v_size : + input->v_offset - v_start_offset; + + lstream->istream.iostream.close = _close; + lstream->istream.iostream.destroy = _destroy; + lstream->istream.iostream.set_max_buffer_size = _set_max_buffer_size; + lstream->istream.iostream.set_blocking = _set_blocking; + + lstream->istream.read = _read; + lstream->istream.seek = _seek; + lstream->istream.get_size = _get_size; + + return _i_stream_create(&lstream->istream, pool, i_stream_get_fd(input), + input->real_stream->abs_start_offset + + v_start_offset); +}
--- a/src/lib/istream-mmap.c Sun Nov 09 20:23:20 2003 +0200 +++ b/src/lib/istream-mmap.c Sun Nov 09 20:26:25 2003 +0200 @@ -14,6 +14,7 @@ void *mmap_base; off_t mmap_offset; size_t mmap_block_size; + uoff_t v_size; unsigned int autoclose_fd:1; }; @@ -75,38 +76,23 @@ /* we never block */ } -static ssize_t io_stream_set_mmaped_pos(struct _istream *stream) -{ - struct mmap_istream *mstream = (struct mmap_istream *) stream; - uoff_t top; - - i_assert((uoff_t)mstream->mmap_offset <= - stream->istream.start_offset + stream->istream.v_limit); - - top = stream->istream.start_offset + stream->istream.v_limit - - mstream->mmap_offset; - stream->pos = I_MIN(top, stream->buffer_size); - - return stream->pos - stream->skip; -} - static ssize_t _read(struct _istream *stream) { struct mmap_istream *mstream = (struct mmap_istream *) stream; - size_t aligned_skip, limit_size; + size_t aligned_skip; uoff_t top; - if (stream->istream.start_offset + stream->istream.v_limit <= - (uoff_t)mstream->mmap_offset + stream->pos) { - /* end of file */ - stream->istream.stream_errno = 0; - stream->istream.eof = TRUE; - return -1; - } + stream->istream.stream_errno = 0; if (stream->pos < stream->buffer_size) { /* more bytes available without needing to mmap() */ - return io_stream_set_mmaped_pos(stream); + stream->pos = stream->buffer_size; + return stream->pos - stream->skip; + } + + if (stream->istream.v_offset >= mstream->v_size) { + stream->istream.eof = TRUE; + return -1; } aligned_skip = stream->skip & ~mmap_pagemask; @@ -123,12 +109,11 @@ i_error("io_stream_read_mmaped(): munmap() failed: %m"); } - top = stream->istream.start_offset + stream->istream.v_size - - mstream->mmap_offset; + top = stream->abs_start_offset + mstream->v_size - mstream->mmap_offset; stream->buffer_size = I_MIN(top, mstream->mmap_block_size); i_assert((uoff_t)mstream->mmap_offset + stream->buffer_size <= - stream->istream.start_offset + stream->istream.v_size); + stream->abs_start_offset + mstream->v_size); mstream->mmap_base = mmap(NULL, stream->buffer_size, PROT_READ, MAP_PRIVATE, @@ -144,20 +129,14 @@ } stream->buffer = mstream->mmap_base; - /* madvise() only if non-limited mmap()ed buffer area larger than - page size */ - limit_size = stream->istream.start_offset + stream->istream.v_limit - - mstream->mmap_offset; - if (limit_size > mmap_pagesize) { - if (limit_size > stream->buffer_size) - limit_size = stream->buffer_size; - - if (madvise(mstream->mmap_base, limit_size, + if (stream->buffer_size > mmap_pagesize) { + if (madvise(mstream->mmap_base, stream->buffer_size, MADV_SEQUENTIAL) < 0) i_error("mmap_istream.madvise(): %m"); } - return io_stream_set_mmaped_pos(stream); + stream->pos = stream->buffer_size; + return stream->pos - stream->skip; } static void _seek(struct _istream *stream, uoff_t v_offset) @@ -165,7 +144,7 @@ struct mmap_istream *mstream = (struct mmap_istream *) stream; uoff_t abs_offset; - abs_offset = stream->istream.start_offset + v_offset; + abs_offset = stream->abs_start_offset + v_offset; if (stream->buffer_size != 0 && (uoff_t)mstream->mmap_offset <= abs_offset && (uoff_t)mstream->mmap_offset + stream->buffer_size > abs_offset) { @@ -180,9 +159,11 @@ stream->istream.v_offset = v_offset; } -static void _skip(struct _istream *stream, uoff_t count) +static uoff_t _get_size(struct _istream *stream) { - _seek(stream, stream->istream.v_offset + count); + struct mmap_istream *mstream = (struct mmap_istream *) stream; + + return mstream->v_size; } struct istream *i_stream_create_mmap(int fd, pool_t pool, size_t block_size, @@ -199,10 +180,9 @@ } if (v_size == 0) { - if (fstat(fd, &st) < 0) { + if (fstat(fd, &st) < 0) i_error("i_stream_create_mmap(): fstat() failed: %m"); - v_size = 0; - } else { + else { v_size = st.st_size; if (start_offset > v_size) start_offset = v_size; @@ -214,6 +194,7 @@ mstream->fd = fd; _set_max_buffer_size(&mstream->istream.iostream, block_size); mstream->autoclose_fd = autoclose_fd; + mstream->v_size = v_size; mstream->istream.iostream.close = _close; mstream->istream.iostream.destroy = _destroy; @@ -221,11 +202,10 @@ mstream->istream.iostream.set_blocking = _set_blocking; mstream->istream.read = _read; - mstream->istream.skip_count = _skip; mstream->istream.seek = _seek; + mstream->istream.get_size = _get_size; - istream = _i_stream_create(&mstream->istream, pool, fd, - start_offset, v_size); + istream = _i_stream_create(&mstream->istream, pool, fd, start_offset); istream->mmaped = TRUE; return istream; }
--- a/src/lib/istream.c Sun Nov 09 20:23:20 2003 +0200 +++ b/src/lib/istream.c Sun Nov 09 20:26:25 2003 +0200 @@ -39,56 +39,6 @@ timeout_cb, context); } -void i_stream_set_start_offset(struct istream *stream, uoff_t offset) -{ - struct _istream *_stream = stream->real_stream; - off_t diff; - - i_assert(stream->v_size == 0 || - offset <= stream->start_offset + stream->v_size); - - if (offset == stream->start_offset) - return; - - diff = (off_t)stream->start_offset - (off_t)offset; - stream->start_offset = offset; - stream->v_offset += diff; - if (stream->v_size != 0) - stream->v_size += diff; - if (stream->v_limit != 0) - stream->v_limit += diff; - - /* reset buffer data */ - _stream->skip = _stream->pos = _stream->high_pos = 0; -} - -void i_stream_set_read_limit(struct istream *stream, uoff_t v_offset) -{ - struct _istream *_stream = stream->real_stream; - uoff_t max_pos; - - i_assert(stream->v_size == 0 || v_offset <= stream->v_size); - - stream->eof = FALSE; - if (_stream->high_pos != 0) { - _stream->pos = _stream->high_pos; - _stream->high_pos = 0; - } - - if (v_offset == 0) - stream->v_limit = stream->v_size; - else { - i_assert(v_offset >= stream->v_offset); - - stream->v_limit = v_offset; - max_pos = v_offset - stream->v_offset + _stream->skip; - if (_stream->pos > max_pos) { - _stream->high_pos = _stream->pos; - _stream->pos = max_pos; - } - } -} - ssize_t i_stream_read(struct istream *stream) { struct _istream *_stream = stream->real_stream; @@ -96,11 +46,6 @@ if (stream->closed) return -1; - if (_stream->pos < _stream->high_pos) { - /* virtual limit reached */ - return -1; - } - stream->eof = FALSE; return _stream->read(_stream); } @@ -110,44 +55,44 @@ struct _istream *_stream = stream->real_stream; size_t data_size; - i_assert(stream->v_size == 0 || - stream->v_offset + count <= stream->v_size); - data_size = _stream->pos - _stream->skip; if (count <= data_size) { + /* within buffer */ stream->v_offset += count; _stream->skip += count; return; } - if (stream->closed) - return; - + /* have to seek forward */ count -= data_size; _stream->skip = _stream->pos; stream->v_offset += data_size; - if (_stream->pos < _stream->high_pos) { - /* virtual limit reached */ - } else { - _stream->skip_count(_stream, count); - } + if (stream->closed) + return; + + _stream->seek(_stream, stream->v_offset + count); } void i_stream_seek(struct istream *stream, uoff_t v_offset) { struct _istream *_stream = stream->real_stream; - i_assert(stream->v_size == 0 || v_offset <= stream->v_size); - if (stream->closed) return; - stream->eof = FALSE; - _stream->high_pos = 0; + if (v_offset < stream->v_offset) + stream->eof = FALSE; _stream->seek(_stream, v_offset); } +uoff_t i_stream_get_size(struct istream *stream) +{ + struct _istream *_stream = stream->real_stream; + + return _stream->get_size(_stream); +} + char *i_stream_next_line(struct istream *stream) { struct _istream *_stream = stream->real_stream; @@ -247,14 +192,128 @@ } struct istream *_i_stream_create(struct _istream *_stream, pool_t pool, int fd, - uoff_t start_offset, uoff_t v_size) + uoff_t abs_start_offset) { _stream->fd = fd; - _stream->istream.start_offset = start_offset; - _stream->istream.v_size = v_size; - _stream->istream.v_limit = v_size; + _stream->abs_start_offset = abs_start_offset; _stream->istream.real_stream = _stream; _io_stream_init(pool, &_stream->iostream); return &_stream->istream; } + +#ifdef STREAM_TEST +/* gcc istream.c -o teststream liblib.a -Wall -DHAVE_CONFIG_H -DSTREAM_TEST -g */ + +#include <fcntl.h> +#include <unistd.h> +#include "ostream.h" + +#define BUF_VALUE(offset) \ + (((offset) % 256) ^ ((offset) / 256)) + +static void check_buffer(const unsigned char *data, size_t size, size_t offset) +{ + size_t i; + + for (i = 0; i < size; i++) + i_assert(data[i] == BUF_VALUE(i+offset)); +} + +int main(void) +{ + struct istream *input, *l_input; + struct ostream *output1, *output2; + int i, fd1, fd2; + unsigned char buf[1024]; + const unsigned char *data; + size_t size; + + lib_init(); + + fd1 = open("teststream.1", O_RDWR | O_CREAT | O_TRUNC, 0600); + if (fd1 < 0) + i_fatal("open() failed: %m"); + fd2 = open("teststream.2", O_RDWR | O_CREAT | O_TRUNC, 0600); + if (fd2 < 0) + i_fatal("open() failed: %m"); + + /* write initial data */ + for (i = 0; i < sizeof(buf); i++) + buf[i] = BUF_VALUE(i); + write(fd1, buf, sizeof(buf)); + + /* test reading */ + input = i_stream_create_file(fd1, default_pool, 512, FALSE); + i_assert(i_stream_get_size(input) == sizeof(buf)); + + i_assert(i_stream_read_data(input, &data, &size, 0) > 0); + i_assert(size == 512); + check_buffer(data, size, 0); + + i_stream_seek(input, 256); + i_assert(i_stream_read_data(input, &data, &size, 0) > 0); + i_assert(size == 512); + check_buffer(data, size, 256); + + i_stream_seek(input, 0); + i_assert(i_stream_read_data(input, &data, &size, 512) == -2); + i_assert(size == 512); + check_buffer(data, size, 0); + + i_stream_skip(input, 900); + i_assert(i_stream_read_data(input, &data, &size, 0) > 0); + i_assert(size == sizeof(buf) - 900); + check_buffer(data, size, 900); + + /* test moving data */ + output1 = o_stream_create_file(fd1, default_pool, 512, FALSE); + output2 = o_stream_create_file(fd2, default_pool, 512, FALSE); + + i_stream_seek(input, 1); size = sizeof(buf)-1; + i_assert(o_stream_send_istream(output2, input) == size); + o_stream_flush(output2); + + lseek(fd2, 0, SEEK_SET); + i_assert(read(fd2, buf, sizeof(buf)) == size); + check_buffer(buf, size, 1); + + i_stream_seek(input, 0); + o_stream_seek(output1, sizeof(buf)); + i_assert(o_stream_send_istream(output1, input) == sizeof(buf)); + + /* test moving with limits */ + l_input = i_stream_create_limit(default_pool, input, + sizeof(buf)/2, 512); + i_stream_seek(l_input, 0); + o_stream_seek(output1, 10); + i_assert(o_stream_send_istream(output1, l_input) == 512); + + i_stream_set_max_buffer_size(input, sizeof(buf)); + + i_stream_seek(input, 0); + i_assert(i_stream_read_data(input, &data, &size, sizeof(buf)-1) > 0); + i_assert(size == sizeof(buf)); + check_buffer(data, 10, 0); + check_buffer(data + 10, 512, sizeof(buf)/2); + check_buffer(data + 10 + 512, + size - (10 + 512), 10 + 512); + + /* reading within limits */ + i_stream_seek(l_input, 0); + i_assert(i_stream_read_data(l_input, &data, &size, 511) > 0); + i_assert(size == 512); + i_assert(i_stream_read_data(l_input, &data, &size, 512) == -2); + i_assert(size == 512); + i_stream_skip(l_input, 511); + i_assert(i_stream_read_data(l_input, &data, &size, 0) > 0); + i_assert(size == 1); + i_stream_skip(l_input, 1); + i_assert(i_stream_read_data(l_input, &data, &size, 0) == -1); + i_assert(size == 0); + + unlink("teststream.1"); + unlink("teststream.2"); + return 0; +} +#endif
--- a/src/lib/istream.h Sun Nov 09 20:23:20 2003 +0200 +++ b/src/lib/istream.h Sun Nov 09 20:26:25 2003 +0200 @@ -2,8 +2,7 @@ #define __ISTREAM_H struct istream { - uoff_t start_offset; - uoff_t v_offset, v_size, v_limit; /* relative to start_offset */ + uoff_t v_offset; int stream_errno; unsigned int mmaped:1; /* be careful when copying data */ @@ -20,6 +19,8 @@ int autoclose_fd); struct istream *i_stream_create_from_data(pool_t pool, const void *data, size_t size); +struct istream *i_stream_create_limit(pool_t pool, struct istream *input, + uoff_t v_start_offset, uoff_t v_size); /* Reference counting. References start from 1, so calling i_stream_unref() destroys the stream if i_stream_ref() is never used. */ @@ -36,12 +37,6 @@ /* Change the maximum size for stream's input buffer to grow. Useful only for buffered streams (currently only file). */ void i_stream_set_max_buffer_size(struct istream *stream, size_t max_size); -/* Change the start_offset and drop all data in buffers. Doesn't do anything - if offset is the same as existing start_offset. */ -void i_stream_set_start_offset(struct istream *stream, uoff_t offset); -/* Stream won't be read past specified offset. Giving 0 as offset - removes the limit. */ -void i_stream_set_read_limit(struct istream *stream, uoff_t v_offset); /* Makes reads blocking until at least one byte is read. timeout_cb is called if nothing is read in specified time. Setting timeout_msecs to 0 makes it non-blocking. This call changes non-blocking state of file @@ -58,6 +53,8 @@ /* Seek to specified position from beginning of file. Never fails, the next read tells if it was successful. This works only for files. */ void i_stream_seek(struct istream *stream, uoff_t v_offset); +/* Returns size of the stream, or (uoff_t)-1 if unknown */ +uoff_t i_stream_get_size(struct istream *stream); /* Gets the next line from stream and returns it, or NULL if more data is needed to make a full line. NOTE: modifies the data in buffer for the \0, so it works only with buffered streams (currently only file). */
--- a/src/lib/ostream-file.c Sun Nov 09 20:23:20 2003 +0200 +++ b/src/lib/ostream-file.c Sun Nov 09 20:26:25 2003 +0200 @@ -502,7 +502,8 @@ } static off_t io_stream_sendfile(struct _ostream *outstream, - struct istream *instream, int in_fd) + struct istream *instream, + int in_fd, uoff_t in_size) { struct file_ostream *foutstream = (struct file_ostream *) outstream; time_t timeout_time; @@ -536,12 +537,17 @@ break; } - offset = instream->start_offset + v_offset; - send_size = instream->v_limit - v_offset; + offset = instream->real_stream->abs_start_offset + v_offset; + send_size = in_size - v_offset; ret = safe_sendfile(foutstream->fd, in_fd, &offset, MAX_SSIZE_T(send_size)); - if (ret < 0) { + if (ret <= 0) { + if (ret == 0) { + /* EOF */ + break; + } + if (errno != EINTR && errno != EAGAIN) { outstream->ostream.stream_errno = errno; if (errno != EINVAL) { @@ -568,7 +574,7 @@ } static off_t io_stream_copy(struct _ostream *outstream, - struct istream *instream, int overlapping) + struct istream *instream, uoff_t in_size) { struct file_ostream *foutstream = (struct file_ostream *) outstream; time_t timeout_time; @@ -576,7 +582,7 @@ struct iovec iov[3]; int iov_len; const unsigned char *data; - size_t size, skip_size; + size_t size, skip_size, block_size; ssize_t ret; int pos; @@ -587,14 +593,11 @@ for (pos = 0; pos < iov_len; pos++) skip_size += iov[pos].iov_len; - i_assert(!overlapping || iov_len == 0); - start_offset = instream->v_offset; - for (;;) { - if (overlapping) - i_stream_seek(instream, instream->v_offset); - (void)i_stream_read_data(instream, &data, &size, - foutstream->optimal_block_size-1); + while (in_size > 0) { + block_size = I_MIN(foutstream->optimal_block_size, in_size); + (void)i_stream_read_data(instream, &data, &size, block_size-1); + in_size -= size; if (size == 0) { /* all sent */ @@ -605,12 +608,6 @@ iov[pos].iov_base = (void *) data; iov[pos].iov_len = size; - if (overlapping) { - if (o_stream_seek(&outstream->ostream, - outstream->ostream.offset) < 0) - return -1; - } - ret = o_stream_writev(foutstream, iov, iov_len); if (ret < 0) { /* error */ @@ -659,11 +656,11 @@ } static off_t io_stream_copy_backwards(struct _ostream *outstream, - struct istream *instream) + struct istream *instream, uoff_t in_size) { struct file_ostream *foutstream = (struct file_ostream *) outstream; time_t timeout_time; - uoff_t in_start_offset, in_offset, out_offset; + uoff_t in_start_offset, in_offset, in_limit, out_offset; const unsigned char *data; size_t buffer_size, size, read_size; ssize_t ret; @@ -685,12 +682,8 @@ } in_start_offset = instream->v_offset; - in_offset = instream->v_limit; - out_offset = outstream->ostream.offset + - (instream->v_limit - instream->v_offset); - - i_assert(instream->v_size == 0 || - out_offset <= instream->start_offset + instream->v_size); + in_offset = in_limit = in_size; + out_offset = outstream->ostream.offset + (in_offset - in_start_offset); while (in_offset > in_start_offset) { if (in_offset - in_start_offset <= buffer_size) @@ -701,10 +694,10 @@ out_offset -= read_size; for (;;) { - i_assert(in_offset <= instream->v_limit); + i_assert(in_offset <= in_limit); i_stream_seek(instream, in_offset); - read_size = instream->v_limit - in_offset; + read_size = in_limit - in_offset; (void)i_stream_read_data(instream, &data, &size, read_size-1); @@ -730,6 +723,7 @@ buffer_size -= read_size; } } + in_limit -= size; if (o_stream_seek(&outstream->ostream, out_offset) < 0) return -1; @@ -750,50 +744,46 @@ outstream->ostream.stream_errno = EAGAIN; return -1; } - - i_stream_set_read_limit(instream, in_offset); } - return (off_t) (instream->v_limit - in_start_offset); + return (off_t) (in_size - in_start_offset); } -static off_t send_istream_fd(struct _ostream *outstream, - struct istream *instream, int in_fd) +static off_t _send_istream(struct _ostream *outstream, struct istream *instream) { struct file_ostream *foutstream = (struct file_ostream *) outstream; - uoff_t old_limit; + uoff_t in_size; off_t ret; - int overlapping; + int in_fd, overlapping; - i_assert(instream->v_limit <= OFF_T_MAX); - i_assert(instream->v_offset <= instream->v_limit); + in_fd = i_stream_get_fd(instream); + in_size = i_stream_get_size(instream); + i_assert(instream->v_offset <= in_size); outstream->ostream.stream_errno = 0; - - if (instream->v_offset == instream->v_limit) - return 0; - if (in_fd != foutstream->fd) overlapping = 0; else { /* copying data within same fd. we'll have to be careful with seeks and overlapping writes. */ + if (in_size == (uoff_t)-1) { + outstream->ostream.stream_errno = EINVAL; + return -1; + } + ret = (off_t)outstream->ostream.offset - - (off_t)(instream->start_offset + instream->v_offset); + (off_t)(instream->real_stream->abs_start_offset + + instream->v_offset); if (ret == 0) { /* copying data over itself. we don't really need to do that, just fake it. */ - return instream->v_limit - instream->v_offset; + return in_size - instream->v_offset; } overlapping = ret < 0 ? -1 : 1; - - if (o_stream_seek(&outstream->ostream, - outstream->ostream.offset) < 0) - return -1; } if (!foutstream->no_sendfile && in_fd != -1 && overlapping <= 0) { - ret = io_stream_sendfile(outstream, instream, in_fd); + ret = io_stream_sendfile(outstream, instream, in_fd, in_size); if (ret >= 0 || outstream->ostream.stream_errno != EINVAL) return ret; @@ -804,36 +794,9 @@ } if (overlapping <= 0) - return io_stream_copy(outstream, instream, overlapping); - else { - old_limit = instream->v_limit; - ret = io_stream_copy_backwards(outstream, instream); - i_stream_set_read_limit(instream, old_limit); - return ret; - } -} - -static off_t _send_istream(struct _ostream *outstream, struct istream *instream) -{ - struct stat st; - int in_fd, ret; - - in_fd = i_stream_get_fd(instream); - if (fstat(in_fd, &st) < 0) { - outstream->ostream.stream_errno = errno; - return -1; - } - - if (instream->v_limit != 0) - return send_istream_fd(outstream, instream, in_fd); - else { - /* easier this way so we know exactly how much data we're - moving */ - i_stream_set_read_limit(instream, st.st_size); - ret = send_istream_fd(outstream, instream, in_fd); - i_stream_set_read_limit(instream, 0); - return ret; - } + return io_stream_copy(outstream, instream, in_size); + else + return io_stream_copy_backwards(outstream, instream, in_size); } struct ostream * @@ -883,7 +846,7 @@ fstream->no_socket_cork = TRUE; fstream->file = TRUE; - o_stream_set_blocking(ostream, 60000, 0, NULL); + o_stream_set_blocking(ostream, -1, 0, NULL); } } #ifndef HAVE_LINUX_SENDFILE