9559
|
1 /* Copyright (c) 2009 Dovecot authors, see the included COPYING file */
|
|
2
|
|
3 #include "lib.h"
|
|
4 #include "array.h"
|
|
5 #include "hash.h"
|
|
6 #include "master-service.h"
|
|
7 #include "dsync-worker.h"
|
|
8 #include "dsync-brain-private.h"
|
|
9
|
|
10 static void
|
|
11 dsync_brain_mailbox_list_deinit(struct dsync_brain_mailbox_list **list);
|
|
12 static void dsync_brain_msg_sync_deinit(struct dsync_brain_mailbox_sync **sync);
|
|
13
|
|
14 struct dsync_brain *dsync_brain_init(struct dsync_worker *src_worker,
|
|
15 struct dsync_worker *dest_worker)
|
|
16 {
|
|
17 struct dsync_brain *brain;
|
|
18
|
|
19 brain = i_new(struct dsync_brain, 1);
|
|
20 brain->src_worker = src_worker;
|
|
21 brain->dest_worker = dest_worker;
|
|
22 return brain;
|
|
23 }
|
|
24
|
|
25 static void dsync_brain_fail(struct dsync_brain *brain)
|
|
26 {
|
|
27 brain->failed = TRUE;
|
|
28 master_service_stop(master_service);
|
|
29 }
|
|
30
|
|
31 int dsync_brain_deinit(struct dsync_brain **_brain)
|
|
32 {
|
|
33 struct dsync_brain *brain = *_brain;
|
|
34 int ret = brain->failed ? -1 : 0;
|
|
35
|
|
36 if (brain->mailbox_sync != NULL)
|
|
37 dsync_brain_msg_sync_deinit(&brain->mailbox_sync);
|
|
38 if (brain->src_mailbox_list != NULL)
|
|
39 dsync_brain_mailbox_list_deinit(&brain->src_mailbox_list);
|
|
40 if (brain->dest_mailbox_list != NULL)
|
|
41 dsync_brain_mailbox_list_deinit(&brain->dest_mailbox_list);
|
|
42
|
|
43 *_brain = NULL;
|
|
44 i_free(brain);
|
|
45 return ret;
|
|
46 }
|
|
47
|
|
48 static void dsync_brain_mailbox_list_finished(struct dsync_brain *brain)
|
|
49 {
|
|
50 if (brain->src_mailbox_list->iter != NULL ||
|
|
51 brain->dest_mailbox_list->iter != NULL)
|
|
52 return;
|
|
53
|
|
54 /* both lists are finished */
|
|
55 brain->state++;
|
|
56 dsync_brain_sync(brain);
|
|
57 }
|
|
58
|
|
59 static void dsync_worker_mailbox_input(void *context)
|
|
60 {
|
|
61 struct dsync_brain_mailbox_list *list = context;
|
|
62 struct dsync_mailbox dsync_box, *dup_box;
|
|
63 int ret;
|
|
64
|
|
65 while ((ret = dsync_worker_mailbox_iter_next(list->iter,
|
|
66 &dsync_box)) > 0) {
|
|
67 dup_box = dsync_mailbox_dup(list->pool, &dsync_box);
|
|
68 array_append(&list->mailboxes, &dup_box, 1);
|
|
69 }
|
|
70 if (ret < 0) {
|
|
71 /* finished listing mailboxes */
|
|
72 if (dsync_worker_mailbox_iter_deinit(&list->iter) < 0)
|
|
73 dsync_brain_fail(list->brain);
|
|
74 array_sort(&list->mailboxes, dsync_mailbox_p_guid_cmp);
|
|
75 dsync_brain_mailbox_list_finished(list->brain);
|
|
76 }
|
|
77 }
|
|
78
|
|
79 static struct dsync_brain_mailbox_list *
|
|
80 dsync_brain_mailbox_list_init(struct dsync_brain *brain,
|
|
81 struct dsync_worker *worker)
|
|
82 {
|
|
83 struct dsync_brain_mailbox_list *list;
|
|
84 pool_t pool;
|
|
85
|
|
86 pool = pool_alloconly_create("dsync brain mailbox list", 10240);
|
|
87 list = p_new(pool, struct dsync_brain_mailbox_list, 1);
|
|
88 list->pool = pool;
|
|
89 list->brain = brain;
|
|
90 list->worker = worker;
|
|
91 list->iter = dsync_worker_mailbox_iter_init(worker);
|
|
92 p_array_init(&list->mailboxes, pool, 128);
|
|
93 dsync_worker_set_input_callback(worker, dsync_worker_mailbox_input,
|
|
94 list);
|
|
95 return list;
|
|
96 }
|
|
97
|
|
98 static void
|
|
99 dsync_brain_mailbox_list_deinit(struct dsync_brain_mailbox_list **_list)
|
|
100 {
|
|
101 struct dsync_brain_mailbox_list *list = *_list;
|
|
102
|
|
103 *_list = NULL;
|
|
104
|
|
105 if (list->iter != NULL)
|
|
106 (void)dsync_worker_mailbox_iter_deinit(&list->iter);
|
|
107 pool_unref(&list->pool);
|
|
108 }
|
|
109
|
|
110 static void dsync_brain_create_missing_mailboxes(struct dsync_brain *brain)
|
|
111 {
|
|
112 struct dsync_mailbox *const *src_boxes, *const *dest_boxes, new_box;
|
|
113 unsigned int src, dest, src_count, dest_count;
|
|
114 int ret;
|
|
115
|
|
116 /* FIXME: handle different hierarchy separators? */
|
|
117
|
|
118 memset(&new_box, 0, sizeof(new_box));
|
|
119
|
|
120 /* find mailboxes from source whose GUIDs don't exist in dest.
|
|
121 the mailboxes are sorted by GUID, so we can do this quickly. */
|
|
122 src_boxes = array_get(&brain->src_mailbox_list->mailboxes, &src_count);
|
|
123 dest_boxes = array_get(&brain->dest_mailbox_list->mailboxes, &dest_count);
|
|
124 for (src = dest = 0; src < src_count && dest < dest_count; ) {
|
|
125 ret = dsync_mailbox_guid_cmp(src_boxes[src], dest_boxes[dest]);
|
|
126 if (ret == 0) {
|
|
127 src++; dest++;
|
|
128 } else if (ret < 0) {
|
|
129 /* exists only in source */
|
|
130 new_box = *src_boxes[src];
|
|
131 new_box.uid_next = 0;
|
|
132 new_box.highest_modseq = 0;
|
|
133 dsync_worker_create_mailbox(brain->dest_worker,
|
|
134 &new_box);
|
|
135 src++;
|
|
136 } else {
|
|
137 /* exists only in dest */
|
|
138 dest++;
|
|
139 }
|
|
140 }
|
|
141 for (; src < src_count; src++) {
|
|
142 new_box = *src_boxes[src];
|
|
143 new_box.uid_next = 0;
|
|
144 new_box.highest_modseq = 0;
|
|
145 dsync_worker_create_mailbox(brain->dest_worker, &new_box);
|
|
146 }
|
|
147 }
|
|
148
|
|
149 static void dsync_brain_guid_add(struct dsync_brain_mailbox_sync *sync,
|
|
150 struct dsync_brain_msg_iter *iter)
|
|
151 {
|
|
152 struct dsync_brain_guid_instance *inst, *prev_inst;
|
|
153
|
|
154 inst = p_new(sync->pool, struct dsync_brain_guid_instance, 1);
|
|
155 inst->mailbox_idx = iter->mailbox_idx;
|
|
156 inst->uid = iter->msg.uid;
|
|
157
|
|
158 prev_inst = hash_table_lookup(sync->guid_hash, iter->msg.guid);
|
|
159 if (prev_inst == NULL) {
|
|
160 hash_table_insert(sync->guid_hash,
|
|
161 p_strdup(sync->pool, iter->msg.guid), inst);
|
|
162 } else {
|
|
163 inst->next = prev_inst->next;
|
|
164 prev_inst->next = inst;
|
|
165 }
|
|
166 }
|
|
167
|
|
168 static int dsync_brain_msg_iter_next(struct dsync_brain_msg_iter *iter)
|
|
169 {
|
|
170 int ret = 1;
|
|
171
|
|
172 if (iter->msg.guid == NULL) {
|
|
173 ret = dsync_worker_msg_iter_next(iter->iter, &iter->mailbox_idx,
|
|
174 &iter->msg);
|
|
175 if (ret > 0) {
|
|
176 if (iter->save_guids)
|
|
177 dsync_brain_guid_add(iter->sync, iter);
|
|
178 }
|
|
179 }
|
|
180
|
|
181 if (iter->wanted_mailbox_idx != iter->mailbox_idx) {
|
|
182 /* finished with this mailbox */
|
|
183 return -1;
|
|
184 }
|
|
185 return ret;
|
|
186 }
|
|
187
|
|
188 static int dsync_brain_msg_iter_next_pair(struct dsync_brain_mailbox_sync *sync)
|
|
189 {
|
|
190 int ret;
|
|
191
|
|
192 if ((ret = dsync_brain_msg_iter_next(sync->src_msg_iter)) <= 0)
|
|
193 return ret;
|
|
194 if ((ret = dsync_brain_msg_iter_next(sync->dest_msg_iter)) <= 0)
|
|
195 return ret;
|
|
196 return 1;
|
|
197 }
|
|
198
|
|
199 static void
|
|
200 dsync_brain_msg_sync_save_source(struct dsync_brain_mailbox_sync *sync)
|
|
201 {
|
|
202 struct dsync_brain_new_msg *new_msg;
|
|
203
|
|
204 new_msg = array_append_space(&sync->new_msgs);
|
|
205 new_msg->mailbox_idx = sync->src_msg_iter->mailbox_idx;
|
|
206 new_msg->msg = dsync_message_dup(sync->pool, &sync->src_msg_iter->msg);
|
|
207 }
|
|
208
|
|
209 static void dsync_brain_msg_sync_existing(struct dsync_brain *brain,
|
|
210 struct dsync_message *src_msg,
|
|
211 struct dsync_message *dest_msg)
|
|
212 {
|
|
213 if (src_msg->flags != dest_msg->flags ||
|
|
214 src_msg->modseq > dest_msg->modseq ||
|
|
215 !dsync_keyword_list_equals(src_msg->keywords, dest_msg->keywords))
|
|
216 dsync_worker_msg_update_metadata(brain->dest_worker, src_msg);
|
|
217 }
|
|
218
|
|
219 static int dsync_brain_msg_sync_pair(struct dsync_brain_mailbox_sync *sync)
|
|
220 {
|
|
221 struct dsync_message *src_msg = &sync->src_msg_iter->msg;
|
|
222 struct dsync_message *dest_msg = &sync->dest_msg_iter->msg;
|
|
223 struct dsync_brain_uid_conflict *conflict;
|
|
224
|
|
225 if (src_msg->uid < dest_msg->uid) {
|
|
226 /* message has been expunged from dest. ignore it, unless
|
|
227 we're in uid-conflict mode. */
|
|
228 if (sync->uid_conflict)
|
|
229 dsync_brain_msg_sync_save_source(sync);
|
|
230 src_msg->guid = NULL;
|
|
231 } else if (src_msg->uid > dest_msg->uid) {
|
|
232 /* message has been expunged from src. expunge it from dest
|
|
233 too, unless we're in uid-conflict mode. */
|
|
234 if (!sync->uid_conflict) {
|
|
235 dsync_worker_msg_expunge(sync->brain->dest_worker,
|
|
236 dest_msg->uid);
|
|
237 }
|
|
238 dest_msg->guid = NULL;
|
|
239 } else if (strcmp(src_msg->guid, dest_msg->guid) == 0) {
|
|
240 /* message exists, sync metadata */
|
|
241 dsync_brain_msg_sync_existing(sync->brain, src_msg, dest_msg);
|
|
242 src_msg->guid = NULL;
|
|
243 dest_msg->guid = NULL;
|
|
244 } else {
|
|
245 /* UID conflict */
|
|
246 sync->uid_conflict = TRUE;
|
|
247 conflict = array_append_space(&sync->uid_conflicts);
|
|
248 conflict->mailbox_idx = sync->src_msg_iter->mailbox_idx;
|
|
249 conflict->uid = dest_msg->uid;
|
|
250 dsync_brain_msg_sync_save_source(sync);
|
|
251 src_msg->guid = NULL;
|
|
252 dest_msg->guid = NULL;
|
|
253 }
|
|
254 return 0;
|
|
255 }
|
|
256
|
|
257 static bool
|
|
258 dsync_brain_msg_sync_mailbox_more(struct dsync_brain_mailbox_sync *sync)
|
|
259 {
|
|
260 struct dsync_mailbox *const *boxp;
|
|
261 int ret;
|
|
262
|
|
263 while ((ret = dsync_brain_msg_iter_next_pair(sync)) > 0) {
|
|
264 if (dsync_brain_msg_sync_pair(sync) < 0)
|
|
265 break;
|
|
266 if (dsync_worker_is_output_full(sync->brain->dest_worker))
|
|
267 return FALSE;
|
|
268 }
|
|
269 if (ret == 0)
|
|
270 return FALSE;
|
|
271
|
|
272 /* finished syncing messages in this mailbox that exist in both source
|
|
273 and destination. if there are any messages left in destination,
|
|
274 expunge them if possible and add their GUIDs to hash in any case. */
|
|
275
|
|
276 boxp = array_idx(&sync->brain->src_mailbox_list->mailboxes,
|
|
277 sync->src_msg_iter->wanted_mailbox_idx);
|
|
278 while ((ret = dsync_brain_msg_iter_next(sync->dest_msg_iter)) > 0) {
|
|
279 if (sync->dest_msg_iter->msg.uid >= (*boxp)->uid_next)
|
|
280 sync->uid_conflict = TRUE;
|
|
281 if (!sync->uid_conflict) {
|
|
282 dsync_worker_msg_expunge(sync->brain->dest_worker,
|
|
283 sync->dest_msg_iter->msg.uid);
|
|
284 }
|
|
285
|
|
286 sync->dest_msg_iter->msg.guid = NULL;
|
|
287 }
|
|
288 if (ret == 0)
|
|
289 return FALSE;
|
|
290
|
|
291 /* if there are any messages left in source, we'll copy all of them */
|
|
292 while ((ret = dsync_brain_msg_iter_next(sync->src_msg_iter)) > 0) {
|
|
293 dsync_brain_msg_sync_save_source(sync);
|
|
294 sync->src_msg_iter->msg.guid = NULL;
|
|
295 }
|
|
296 if (ret == 0)
|
|
297 return FALSE;
|
|
298 /* done with this mailbox. the same iterator is still used for
|
|
299 getting messages from other mailboxes. */
|
|
300 return TRUE;
|
|
301 }
|
|
302
|
|
303 static void dsync_brain_msg_sync_finish(struct dsync_brain_mailbox_sync *sync)
|
|
304 {
|
|
305 /* synced all existing messages. now add the new messages. */
|
|
306 if (dsync_worker_msg_iter_deinit(&sync->src_msg_iter->iter) < 0 ||
|
|
307 dsync_worker_msg_iter_deinit(&sync->dest_msg_iter->iter))
|
|
308 dsync_brain_fail(sync->brain);
|
|
309
|
|
310 sync->brain->state++;
|
|
311 dsync_brain_sync(sync->brain);
|
|
312 }
|
|
313
|
|
314 static void dsync_brain_msg_sync_more(struct dsync_brain_mailbox_sync *sync)
|
|
315 {
|
|
316 struct dsync_mailbox *const *mailboxes;
|
|
317 unsigned int count, mailbox_idx;
|
|
318
|
|
319 mailboxes = array_get(&sync->brain->src_mailbox_list->mailboxes,
|
|
320 &count);
|
|
321 while (dsync_brain_msg_sync_mailbox_more(sync)) {
|
|
322 /* sync the next mailbox */
|
|
323 mailbox_idx = ++sync->src_msg_iter->wanted_mailbox_idx;
|
|
324 sync->dest_msg_iter->wanted_mailbox_idx++;
|
|
325 if (mailbox_idx == count) {
|
|
326 dsync_brain_msg_sync_finish(sync);
|
|
327 return;
|
|
328 }
|
|
329 dsync_worker_select_mailbox(sync->brain->dest_worker,
|
|
330 &mailboxes[mailbox_idx]->guid);
|
|
331 }
|
|
332 }
|
|
333
|
|
/* Worker input/output callback used during message sync; context is the
   struct dsync_brain_mailbox_sync. Simply resumes the sync. */
static void dsync_worker_msg_callback(void *context)
{
	struct dsync_brain_mailbox_sync *mailbox_sync = context;

	dsync_brain_msg_sync_more(mailbox_sync);
}
|
|
340
|
|
341 static struct dsync_brain_msg_iter *
|
|
342 dsync_brain_msg_iter_init(struct dsync_brain_mailbox_sync *sync,
|
|
343 struct dsync_worker *worker,
|
|
344 const mailbox_guid_t mailboxes[],
|
|
345 unsigned int mailbox_count)
|
|
346 {
|
|
347 struct dsync_brain_msg_iter *iter;
|
|
348
|
|
349 iter = p_new(sync->pool, struct dsync_brain_msg_iter, 1);
|
|
350 iter->sync = sync;
|
|
351 iter->worker = worker;
|
|
352 iter->iter = dsync_worker_msg_iter_init(worker, mailboxes,
|
|
353 mailbox_count);
|
|
354 dsync_worker_set_input_callback(worker,
|
|
355 dsync_worker_msg_callback, sync);
|
|
356 dsync_worker_set_output_callback(worker,
|
|
357 dsync_worker_msg_callback, sync);
|
|
358 return iter;
|
|
359 }
|
|
360
|
|
361 static struct dsync_brain_mailbox_sync *
|
|
362 dsync_brain_msg_sync_init(struct dsync_brain *brain)
|
|
363 {
|
|
364 struct dsync_brain_mailbox_sync *sync;
|
|
365 ARRAY_DEFINE(guids, mailbox_guid_t);
|
|
366 struct dsync_mailbox *const *mailboxes;
|
|
367 unsigned int i, count;
|
|
368 pool_t pool;
|
|
369
|
|
370 /* initialize message iteration on both workers */
|
|
371 mailboxes = array_get(&brain->src_mailbox_list->mailboxes, &count);
|
|
372 t_array_init(&guids, count);
|
|
373 for (i = 0; i < count; i++)
|
|
374 array_append(&guids, &mailboxes[i]->guid, 1);
|
|
375
|
|
376 pool = pool_alloconly_create("dsync brain mailbox sync", 1024*256);
|
|
377 sync = p_new(pool, struct dsync_brain_mailbox_sync, 1);
|
|
378 sync->pool = pool;
|
|
379 sync->brain = brain;
|
|
380
|
|
381 i_array_init(&sync->uid_conflicts, 128);
|
|
382 i_array_init(&sync->new_msgs, 128);
|
|
383 i_array_init(&sync->copy_retry_indexes, 32);
|
|
384 sync->src_msg_iter =
|
|
385 dsync_brain_msg_iter_init(sync, brain->src_worker,
|
|
386 array_idx(&guids, 0), count);
|
|
387 sync->dest_msg_iter =
|
|
388 dsync_brain_msg_iter_init(sync, brain->dest_worker,
|
|
389 array_idx(&guids, 0), count);
|
|
390
|
|
391 sync->guid_hash = hash_table_create(default_pool, pool, 10000,
|
|
392 strcase_hash,
|
|
393 (hash_cmp_callback_t *)strcasecmp);
|
|
394 sync->dest_msg_iter->save_guids = TRUE;
|
|
395 return sync;
|
|
396 }
|
|
397
|
|
398 static void dsync_brain_msg_sync_deinit(struct dsync_brain_mailbox_sync **_sync)
|
|
399 {
|
|
400 struct dsync_brain_mailbox_sync *sync = *_sync;
|
|
401
|
|
402 *_sync = NULL;
|
|
403
|
|
404 if (sync->src_msg_iter->iter != NULL)
|
|
405 (void)dsync_worker_msg_iter_deinit(&sync->src_msg_iter->iter);
|
|
406 if (sync->dest_msg_iter->iter != NULL)
|
|
407 (void)dsync_worker_msg_iter_deinit(&sync->dest_msg_iter->iter);
|
|
408
|
|
409 hash_table_destroy(&sync->guid_hash);
|
|
410 array_free(&sync->uid_conflicts);
|
|
411 array_free(&sync->new_msgs);
|
|
412 array_free(&sync->copy_retry_indexes);
|
|
413 pool_unref(&sync->pool);
|
|
414 }
|
|
415
|
|
416 static void dsync_brain_sync_existing_mailboxes(struct dsync_brain *brain)
|
|
417 {
|
|
418 brain->mailbox_sync = dsync_brain_msg_sync_init(brain);
|
|
419 dsync_brain_msg_sync_more(brain->mailbox_sync);
|
|
420 }
|
|
421
|
|
422 static int
|
|
423 dsync_brain_msg_sync_add_new_msg(struct dsync_brain_mailbox_sync *sync,
|
|
424 const struct dsync_mailbox *src_mailbox,
|
|
425 unsigned int msg_idx,
|
|
426 const struct dsync_message *msg)
|
|
427 {
|
|
428 const struct dsync_brain_guid_instance *inst;
|
|
429 struct dsync_mailbox *const *inst_box;
|
|
430 struct dsync_msg_static_data data;
|
|
431 int ret;
|
|
432
|
|
433 inst = hash_table_lookup(sync->guid_hash, msg->guid);
|
|
434 if (inst != NULL) {
|
|
435 /* we can save this by copying an existing message */
|
|
436 dsync_worker_select_mailbox(sync->brain->dest_worker,
|
|
437 &src_mailbox->guid);
|
|
438 dsync_worker_set_next_result_tag(sync->brain->dest_worker,
|
|
439 msg_idx+1);
|
|
440 inst_box = array_idx(&sync->brain->src_mailbox_list->mailboxes,
|
|
441 inst->mailbox_idx);
|
|
442 dsync_worker_msg_copy(sync->brain->dest_worker,
|
|
443 &(*inst_box)->guid, inst->uid, msg);
|
|
444 sync->copy_results_left++;
|
|
445 } else {
|
|
446 dsync_worker_select_mailbox(sync->brain->src_worker,
|
|
447 &src_mailbox->guid);
|
|
448 ret = dsync_worker_msg_get(sync->brain->src_worker,
|
|
449 msg->uid, &data);
|
|
450 if (ret <= 0) {
|
|
451 if (ret == 0) {
|
|
452 /* mail got expunged during sync.
|
|
453 just skip this. */
|
|
454 return 1;
|
|
455 } else {
|
|
456 dsync_brain_fail(sync->brain);
|
|
457 return -1;
|
|
458 }
|
|
459 }
|
|
460 dsync_worker_select_mailbox(sync->brain->dest_worker,
|
|
461 &src_mailbox->guid);
|
|
462 dsync_worker_msg_save(sync->brain->dest_worker, msg, &data);
|
|
463 }
|
|
464 return dsync_worker_is_output_full(sync->brain->dest_worker) ? 0 : 1;
|
|
465 }
|
|
466
|
|
467 static void
|
|
468 dsync_brain_msg_sync_add_new_msgs(struct dsync_brain_mailbox_sync *sync)
|
|
469 {
|
|
470 struct dsync_mailbox *const *mailboxes, *mailbox;
|
|
471 const struct dsync_brain_new_msg *msgs;
|
|
472 unsigned int i, mailbox_count, msg_count;
|
|
473
|
|
474 mailboxes = array_get(&sync->brain->src_mailbox_list->mailboxes,
|
|
475 &mailbox_count);
|
|
476 msgs = array_get(&sync->new_msgs, &msg_count);
|
|
477 for (i = sync->next_new_msg; i < msg_count; i++) {
|
|
478 mailbox = mailboxes[msgs[i].mailbox_idx];
|
|
479 if (dsync_brain_msg_sync_add_new_msg(sync, mailbox, i,
|
|
480 msgs[i].msg) <= 0) {
|
|
481 /* failed / continue later */
|
|
482 sync->next_new_msg = i + 1;
|
|
483 return;
|
|
484 }
|
|
485 }
|
|
486
|
|
487 /* all messages sent */
|
|
488 if (sync->copy_results_left == 0) {
|
|
489 sync->brain->state++;
|
|
490 dsync_brain_sync(sync->brain);
|
|
491 }
|
|
492 }
|
|
493
|
|
494 static void dsync_worker_copy_input(void *context)
|
|
495 {
|
|
496 struct dsync_brain_mailbox_sync *sync = context;
|
|
497 struct dsync_brain_guid_instance *inst;
|
|
498 const struct dsync_brain_new_msg *msgs;
|
|
499 unsigned int count;
|
|
500 uint32_t tag;
|
|
501 int result;
|
|
502
|
|
503 msgs = array_get(&sync->new_msgs, &count);
|
|
504 while (dsync_worker_get_next_result(sync->brain->dest_worker,
|
|
505 &tag, &result)) {
|
|
506 if (tag == 0 || tag > count) {
|
|
507 i_error("Worker sent result with invalid tag %u", tag);
|
|
508 dsync_brain_fail(sync->brain);
|
|
509 return;
|
|
510 }
|
|
511 tag--;
|
|
512 if (sync->copy_results_left == 0) {
|
|
513 i_error("Worker sent unexpected result");
|
|
514 dsync_brain_fail(sync->brain);
|
|
515 return;
|
|
516 }
|
|
517 sync->copy_results_left--;
|
|
518 if (result < 0) {
|
|
519 /* mark the guid instance invalid and try again later */
|
|
520 inst = hash_table_lookup(sync->guid_hash,
|
|
521 msgs[tag].msg->guid);
|
|
522 inst->failed = TRUE;
|
|
523 array_append(&sync->copy_retry_indexes, &tag, 1);
|
|
524 }
|
|
525 }
|
|
526 if (sync->copy_results_left == 0) {
|
|
527 sync->brain->state++;
|
|
528 dsync_brain_sync(sync->brain);
|
|
529 }
|
|
530 }
|
|
531
|
|
/* Destination worker output callback while new messages are being sent;
   context is the struct dsync_brain_mailbox_sync. Continues sending. */
static void dsync_worker_new_msg_output(void *context)
{
	struct dsync_brain_mailbox_sync *mailbox_sync = context;

	dsync_brain_msg_sync_add_new_msgs(mailbox_sync);
}
|
|
538
|
|
539 static void
|
|
540 dsync_brain_msg_sync_new_msgs(struct dsync_brain_mailbox_sync *sync)
|
|
541 {
|
|
542 dsync_worker_set_input_callback(sync->brain->dest_worker,
|
|
543 dsync_worker_copy_input, sync);
|
|
544 dsync_worker_set_output_callback(sync->brain->dest_worker,
|
|
545 dsync_worker_new_msg_output, sync);
|
|
546 dsync_brain_msg_sync_add_new_msgs(sync);
|
|
547 }
|
|
548
|
|
549 static void
|
|
550 dsync_brain_msg_sync_retry_copies(struct dsync_brain_mailbox_sync *sync)
|
|
551 {
|
|
552 const uint32_t *indexes;
|
|
553 struct dsync_mailbox *const *mailboxes, *mailbox;
|
|
554 const struct dsync_brain_new_msg *msgs;
|
|
555 unsigned int i, msg_idx, idx_count, msg_count, mailbox_count;
|
|
556 struct dsync_brain_guid_instance *inst;
|
|
557 const char *guid_str;
|
|
558 void *orig_key, *orig_value;
|
|
559
|
|
560 /* first remove GUID instances that had failed. */
|
|
561 msgs = array_get(&sync->new_msgs, &msg_count);
|
|
562 indexes = array_get(&sync->copy_retry_indexes, &idx_count);
|
|
563 for (i = 0; i < idx_count; i++) {
|
|
564 guid_str = msgs[indexes[i]].msg->guid;
|
|
565 if (hash_table_lookup_full(sync->guid_hash, guid_str,
|
|
566 &orig_key, &orig_value))
|
|
567 inst = orig_value;
|
|
568 else
|
|
569 inst = NULL;
|
|
570 if (inst != NULL && inst->failed) {
|
|
571 inst = inst->next;
|
|
572 if (inst == NULL)
|
|
573 hash_table_remove(sync->guid_hash, guid_str);
|
|
574 else {
|
|
575 hash_table_update(sync->guid_hash, orig_key,
|
|
576 inst);
|
|
577 }
|
|
578 }
|
|
579 }
|
|
580
|
|
581 /* try saving again. there probably weren't many of them, so don't
|
|
582 worry about filling output buffer. */
|
|
583 mailboxes = array_get(&sync->brain->src_mailbox_list->mailboxes,
|
|
584 &mailbox_count);
|
|
585 for (i = 0; i < idx_count; i++) {
|
|
586 msg_idx = indexes[i];
|
|
587 mailbox = mailboxes[msgs[msg_idx].mailbox_idx];
|
|
588 (void)dsync_brain_msg_sync_add_new_msg(sync, mailbox, msg_idx,
|
|
589 msgs[msg_idx].msg);
|
|
590 }
|
|
591
|
|
592 /* if we copied anything, we'll again have to wait for the results */
|
|
593 array_clear(&sync->copy_retry_indexes);
|
|
594 dsync_worker_set_output_callback(sync->brain->dest_worker, NULL, NULL);
|
|
595
|
|
596 if (sync->copy_results_left == 0) {
|
|
597 dsync_worker_set_input_callback(sync->brain->dest_worker,
|
|
598 NULL, NULL);
|
|
599 sync->brain->state++;
|
|
600 dsync_brain_sync(sync->brain);
|
|
601 } else {
|
|
602 /* temporarily move back the state. once copies have returned
|
|
603 success/failures, we'll get back to this function and see
|
|
604 if we need to retry again */
|
|
605 sync->brain->state--;
|
|
606 }
|
|
607 }
|
|
608
|
|
609 static void
|
|
610 dsync_brain_msg_sync_update_mailbox(struct dsync_brain *brain)
|
|
611 {
|
|
612 struct dsync_mailbox *const *mailboxes;
|
|
613 unsigned int i, count;
|
|
614
|
|
615 mailboxes = array_get(&brain->src_mailbox_list->mailboxes, &count);
|
|
616 for (i = 0; i < count; i++)
|
|
617 dsync_worker_update_mailbox(brain->dest_worker, mailboxes[i]);
|
|
618 }
|
|
619
|
|
620 static void
|
|
621 dsync_brain_msg_sync_resolve_uid_conflicts(struct dsync_brain_mailbox_sync *sync)
|
|
622 {
|
|
623 const struct dsync_brain_uid_conflict *conflicts;
|
|
624 struct dsync_mailbox *const *mailboxes, *mailbox;
|
|
625 unsigned int i, count, mailbox_count;
|
|
626
|
|
627 mailboxes = array_get(&sync->brain->src_mailbox_list->mailboxes,
|
|
628 &mailbox_count);
|
|
629 conflicts = array_get(&sync->uid_conflicts, &count);
|
|
630 for (i = 0; i < count; i++) {
|
|
631 mailbox = mailboxes[conflicts[i].mailbox_idx];
|
|
632 dsync_worker_select_mailbox(sync->brain->dest_worker,
|
|
633 &mailbox->guid);
|
|
634 dsync_worker_msg_update_uid(sync->brain->dest_worker,
|
|
635 conflicts[i].uid);
|
|
636 }
|
|
637 }
|
|
638
|
|
639 static void dsync_worker_flush_callback(void *context)
|
|
640 {
|
|
641 struct dsync_brain *brain = context;
|
|
642 int ret;
|
|
643
|
|
644 if ((ret = dsync_worker_output_flush(brain->dest_worker)) <= 0) {
|
|
645 if (ret < 0)
|
|
646 dsync_brain_fail(brain);
|
|
647 return;
|
|
648 }
|
|
649 brain->state++;
|
|
650 dsync_brain_sync(brain);
|
|
651 }
|
|
652
|
|
653 void dsync_brain_sync(struct dsync_brain *brain)
|
|
654 {
|
|
655 switch (brain->state) {
|
|
656 case DSYNC_STATE_GET_MAILBOXES:
|
|
657 i_assert(brain->src_mailbox_list == NULL);
|
|
658 brain->src_mailbox_list =
|
|
659 dsync_brain_mailbox_list_init(brain, brain->src_worker);
|
|
660 brain->dest_mailbox_list =
|
|
661 dsync_brain_mailbox_list_init(brain, brain->dest_worker);
|
|
662 dsync_worker_mailbox_input(brain->src_mailbox_list);
|
|
663 dsync_worker_mailbox_input(brain->dest_mailbox_list);
|
|
664 break;
|
|
665 case DSYNC_STATE_CREATE_MAILBOXES:
|
|
666 if (array_count(&brain->src_mailbox_list->mailboxes) == 0) {
|
|
667 /* no mailboxes */
|
|
668 i_error("No source mailboxes");
|
|
669 dsync_brain_fail(brain);
|
|
670 }
|
|
671
|
|
672 /* FIXME: maybe wait and verify that all mailboxes are
|
|
673 created successfully? */
|
|
674 dsync_brain_create_missing_mailboxes(brain);
|
|
675 brain->state++;
|
|
676 /* fall through */
|
|
677 case DSYNC_STATE_SYNC_EXISTING_MSGS:
|
|
678 dsync_brain_sync_existing_mailboxes(brain);
|
|
679 break;
|
|
680 case DSYNC_STATE_SYNC_NEW_MSGS:
|
|
681 dsync_brain_msg_sync_new_msgs(brain->mailbox_sync);
|
|
682 break;
|
|
683 case DSYNC_STATE_SYNC_RETRY_COPIES:
|
|
684 dsync_brain_msg_sync_retry_copies(brain->mailbox_sync);
|
|
685 break;
|
|
686 case DSYNC_STATE_SYNC_UPDATE_MAILBOX:
|
|
687 dsync_brain_msg_sync_update_mailbox(brain);
|
|
688 brain->state++;
|
|
689 /* fall through */
|
|
690 case DSYNC_STATE_SYNC_RESOLVE_UID_CONFLICTS:
|
|
691 /* resolve uid conflicts after uid_nexts have been updated,
|
|
692 so that it won't again collide uids */
|
|
693 dsync_brain_msg_sync_resolve_uid_conflicts(brain->mailbox_sync);
|
|
694 brain->state++;
|
|
695 /* fall through */
|
|
696 case DSYNC_STATE_SYNC_FLUSH:
|
|
697 dsync_worker_set_output_callback(brain->dest_worker,
|
|
698 dsync_worker_flush_callback,
|
|
699 brain);
|
|
700 dsync_worker_flush_callback(brain);
|
|
701 break;
|
|
702 case DSYNC_STATE_SYNC_END:
|
|
703 master_service_stop(master_service);
|
|
704 break;
|
|
705 }
|
|
706 }
|