From 3e54e0b9eaddf422f0bab8ef40d6b406a6d3cdaf Mon Sep 17 00:00:00 2001 From: emersion Date: Sat, 13 Jan 2018 11:35:10 +0100 Subject: [PATCH] imap: poll events after mutating the mailbox --- events/events.go | 42 +++++++++++++++++++++++++++-------------- imap/mailbox.go | 26 +++++++++++++++++++------ imap/user.go | 49 +++++++++++++++++++++++++++++++++++++++--------- 3 files changed, 88 insertions(+), 29 deletions(-) diff --git a/events/events.go b/events/events.go index 3b83f88..9a1505e 100644 --- a/events/events.go +++ b/events/events.go @@ -10,25 +10,26 @@ import ( const pollInterval = 30 * time.Second -type receiver struct { - channels []chan<- *protonmail.Event +type Receiver struct { + c *protonmail.Client + locker sync.Mutex + channels []chan<- *protonmail.Event + + poll chan struct{} } -func (r *receiver) receiveEvents(c *protonmail.Client, last string) { +func (r *Receiver) receiveEvents() { t := time.NewTicker(pollInterval) defer t.Stop() - for range t.C { - event, err := c.GetEvent(last) + var last string + for { + event, err := r.c.GetEvent(last) if err != nil { log.Println("cannot receive event:", err) continue } - - if event.ID == last { - continue - } last = event.ID r.locker.Lock() @@ -41,21 +42,30 @@ func (r *receiver) receiveEvents(c *protonmail.Client, last string) { if n == 0 { break } + + select { + case <-t.C: + case <-r.poll: + } } } +func (r *Receiver) Poll() { + r.poll <- struct{}{} +} + type Manager struct { - receivers map[string]*receiver + receivers map[string]*Receiver locker sync.Mutex } func NewManager() *Manager { return &Manager{ - receivers: make(map[string]*receiver), + receivers: make(map[string]*Receiver), } } -func (m *Manager) Register(c *protonmail.Client, username string, ch chan<- *protonmail.Event, done <-chan struct{}) { +func (m *Manager) Register(c *protonmail.Client, username string, ch chan<- *protonmail.Event, done <-chan struct{}) *Receiver { m.locker.Lock() defer m.locker.Unlock() @@ -65,12 +75,14 @@ func (m *Manager) Register(c *protonmail.Client, username string, ch chan<- *pro r.channels = append(r.channels, ch) r.locker.Unlock() } else { - r = &receiver{ + r = &Receiver{ + c: c, channels: []chan<- *protonmail.Event{ch}, + poll: make(chan struct{}), } go func() { - r.receiveEvents(c, "") + r.receiveEvents() m.locker.Lock() delete(m.receivers, username) @@ -95,4 +107,6 @@ func (m *Manager) Register(c *protonmail.Client, username string, ch chan<- *pro close(ch) }() } + + return r } diff --git a/imap/mailbox.go b/imap/mailbox.go index 980bda1..6a4c3df 100644 --- a/imap/mailbox.go +++ b/imap/mailbox.go @@ -82,6 +82,11 @@ func (mbox *mailbox) Check() error { func (mbox *mailbox) sync() error { log.Printf("Synchronizing mailbox %v...", mbox.name) + // TODO: don't do this without incrementing UIDVALIDITY + if err := mbox.db.Reset(); err != nil { + return err + } + filter := &protonmail.MessageFilter{ PageSize: 150, Label: mbox.label, @@ -362,7 +367,11 @@ func (mbox *mailbox) CreateMessage(flags []string, date time.Time, body imap.Lit } _, err := createMessage(mbox.u.c, mbox.u.u, mbox.u.privateKeys, body) - return err + if err != nil { + return err + } + + return mbox.Poll() } func (mbox *mailbox) fromSeqSet(isUID bool, seqSet *imap.SeqSet) ([]string, error) { @@ -430,7 +439,7 @@ func (mbox *mailbox) UpdateMessagesFlags(uid bool, seqSet *imap.SeqSet, op imap. } } - return nil + return mbox.Poll() } func (mbox *mailbox) CopyMessages(uid bool, seqSet *imap.SeqSet, destName string) error { @@ -443,7 +452,7 @@ func (mbox *mailbox) CopyMessages(uid bool, seqSet *imap.SeqSet, destName string return err } - dest := mbox.u.getMailboxByLabel(destName) + dest := mbox.u.getMailbox(destName) if dest == nil { return imapbackend.ErrNoSuchMailbox } @@ -451,7 +460,7 @@ func (mbox *mailbox) CopyMessages(uid bool, seqSet *imap.SeqSet, destName string if err := mbox.u.c.LabelMessages(dest.label, apiIDs); err != nil { return err } - return nil + return mbox.Poll() } func (mbox *mailbox) MoveMessages(uid bool, seqSet *imap.SeqSet, destName string) error { @@ -464,7 +473,7 @@ func (mbox *mailbox) MoveMessages(uid bool, seqSet *imap.SeqSet, destName string return err } - dest := mbox.u.getMailboxByLabel(destName) + dest := mbox.u.getMailbox(destName) if dest == nil { return imapbackend.ErrNoSuchMailbox } @@ -475,7 +484,7 @@ func (mbox *mailbox) MoveMessages(uid bool, seqSet *imap.SeqSet, destName string if err := mbox.u.c.UnlabelMessages(mbox.label, apiIDs); err != nil { return err } - return nil + return mbox.Poll() } func (mbox *mailbox) Expunge() error { @@ -492,5 +501,10 @@ func (mbox *mailbox) Expunge() error { return err } + return mbox.Poll() +} + +func (mbox *mailbox) Poll() error { + mbox.u.poll() return nil } diff --git a/imap/user.go b/imap/user.go index 13cc729..fcb7f4b 100644 --- a/imap/user.go +++ b/imap/user.go @@ -9,6 +9,7 @@ import ( imapbackend "github.com/emersion/go-imap/backend" "github.com/emersion/go-imap-specialuse" + "github.com/emersion/hydroxide/events" "github.com/emersion/hydroxide/imap/database" "github.com/emersion/hydroxide/protonmail" ) @@ -34,11 +35,13 @@ type user struct { privateKeys openpgp.EntityList db *database.User + eventsReceiver *events.Receiver locker sync.Mutex mailboxes map[string]*mailbox done chan<- struct{} + eventSent chan struct{} } func newUser(be *backend, c *protonmail.Client, u *protonmail.User, privateKeys openpgp.EntityList) (*user, error) { @@ -46,6 +49,7 @@ func newUser(be *backend, c *protonmail.Client, u *protonmail.User, privateKeys c: c, u: u, privateKeys: privateKeys, + eventSent: make(chan struct{}), } db, err := database.Open(u.Name+".db") @@ -62,7 +66,7 @@ func newUser(be *backend, c *protonmail.Client, u *protonmail.User, privateKeys uu.done = done ch := make(chan *protonmail.Event) go uu.receiveEvents(be.updates, ch) - be.eventsManager.Register(c, u.Name, ch, done) + uu.eventsReceiver = be.eventsManager.Register(c, u.Name, ch, done) return uu, nil } @@ -125,16 +129,24 @@ func (u *user) getMailboxByLabel(labelID string) *mailbox { return u.mailboxes[labelID] } -func (u *user) GetMailbox(name string) (imapbackend.Mailbox, error) { +func (u *user) getMailbox(name string) *mailbox { u.locker.Lock() defer u.locker.Unlock() for _, mbox := range u.mailboxes { if mbox.name == name { - return mbox, nil + return mbox } } - return nil, imapbackend.ErrNoSuchMailbox + return nil +} + +func (u *user) GetMailbox(name string) (imapbackend.Mailbox, error) { + mbox := u.getMailbox(name) + if mbox == nil { + return nil, imapbackend.ErrNoSuchMailbox + } + return mbox, nil } func (u *user) CreateMailbox(name string) error { @@ -162,8 +174,15 @@ func (u *user) Logout() error { return nil } +func (u *user) poll() { + go u.eventsReceiver.Poll() + <-u.eventSent +} + func (u *user) receiveEvents(updates chan<- interface{}, events <-chan *protonmail.Event) { for event := range events { + var eventUpdates []interface{} + if event.Refresh&protonmail.EventRefreshMail != 0 { log.Println("Reinitializing the whole IMAP database") @@ -201,7 +220,7 @@ func (u *user) receiveEvents(updates chan<- interface{}, events <-chan *protonma update.Mailbox = mbox.name update.MailboxStatus = imap.NewMailboxStatus(mbox.name, []imap.StatusItem{imap.StatusMessages}) update.MailboxStatus.Messages = seqNum - updates <- update + eventUpdates = append(eventUpdates, update) } } case protonmail.EventUpdate, protonmail.EventUpdateFlags: @@ -219,7 +238,7 @@ func (u *user) receiveEvents(updates chan<- interface{}, events <-chan *protonma update.Mailbox = mbox.name update.MailboxStatus = imap.NewMailboxStatus(mbox.name, []imap.StatusItem{imap.StatusMessages}) update.MailboxStatus.Messages = seqNum - updates <- update + eventUpdates = append(eventUpdates, update) } } for labelID, seqNum := range deletedSeqNums { @@ -228,7 +247,7 @@ func (u *user) receiveEvents(updates chan<- interface{}, events <-chan *protonma update.Username = u.u.Name update.Mailbox = mbox.name update.SeqNum = seqNum - updates <- update + eventUpdates = append(eventUpdates, update) } } @@ -257,7 +276,7 @@ func (u *user) receiveEvents(updates chan<- interface{}, events <-chan *protonma update.Mailbox = mbox.name update.Message = imap.NewMessage(seqNum, []imap.FetchItem{imap.FetchFlags}) update.Message.Flags = fetchFlags(msg) - updates <- update + eventUpdates = append(eventUpdates, update) } } case protonmail.EventDelete: @@ -274,7 +293,7 @@ func (u *user) receiveEvents(updates chan<- interface{}, events <-chan *protonma update.Username = u.u.Name update.Mailbox = mbox.name update.SeqNum = seqNum - updates <- update + eventUpdates = append(eventUpdates, update) } } } @@ -289,5 +308,17 @@ func (u *user) receiveEvents(updates chan<- interface{}, events <-chan *protonma } u.locker.Unlock() } + + done := imapbackend.WaitUpdates(eventUpdates...) + for _, update := range eventUpdates { + updates <- update + } + go func() { + <-done + select { + case u.eventSent <- struct{}{}: + default: + } + }() } }