imap: poll events after mutating the mailbox

This commit is contained in:
emersion 2018-01-13 11:35:10 +01:00
parent 7db5627275
commit 3e54e0b9ea
No known key found for this signature in database
GPG Key ID: 0FDE7BE0E88F5E48
3 changed files with 88 additions and 29 deletions

View File

@ -10,25 +10,26 @@ import (
const pollInterval = 30 * time.Second const pollInterval = 30 * time.Second
type receiver struct { type Receiver struct {
channels []chan<- *protonmail.Event c *protonmail.Client
locker sync.Mutex locker sync.Mutex
channels []chan<- *protonmail.Event
poll chan struct{}
} }
func (r *receiver) receiveEvents(c *protonmail.Client, last string) { func (r *Receiver) receiveEvents() {
t := time.NewTicker(pollInterval) t := time.NewTicker(pollInterval)
defer t.Stop() defer t.Stop()
for range t.C { var last string
event, err := c.GetEvent(last) for {
event, err := r.c.GetEvent(last)
if err != nil { if err != nil {
log.Println("cannot receive event:", err) log.Println("cannot receive event:", err)
continue continue
} }
if event.ID == last {
continue
}
last = event.ID last = event.ID
r.locker.Lock() r.locker.Lock()
@ -41,21 +42,30 @@ func (r *receiver) receiveEvents(c *protonmail.Client, last string) {
if n == 0 { if n == 0 {
break break
} }
select {
case <-t.C:
case <-r.poll:
}
} }
} }
func (r *Receiver) Poll() {
r.poll <- struct{}{}
}
type Manager struct { type Manager struct {
receivers map[string]*receiver receivers map[string]*Receiver
locker sync.Mutex locker sync.Mutex
} }
func NewManager() *Manager { func NewManager() *Manager {
return &Manager{ return &Manager{
receivers: make(map[string]*receiver), receivers: make(map[string]*Receiver),
} }
} }
func (m *Manager) Register(c *protonmail.Client, username string, ch chan<- *protonmail.Event, done <-chan struct{}) { func (m *Manager) Register(c *protonmail.Client, username string, ch chan<- *protonmail.Event, done <-chan struct{}) *Receiver {
m.locker.Lock() m.locker.Lock()
defer m.locker.Unlock() defer m.locker.Unlock()
@ -65,12 +75,14 @@ func (m *Manager) Register(c *protonmail.Client, username string, ch chan<- *pro
r.channels = append(r.channels, ch) r.channels = append(r.channels, ch)
r.locker.Unlock() r.locker.Unlock()
} else { } else {
r = &receiver{ r = &Receiver{
c: c,
channels: []chan<- *protonmail.Event{ch}, channels: []chan<- *protonmail.Event{ch},
poll: make(chan struct{}),
} }
go func() { go func() {
r.receiveEvents(c, "") r.receiveEvents()
m.locker.Lock() m.locker.Lock()
delete(m.receivers, username) delete(m.receivers, username)
@ -95,4 +107,6 @@ func (m *Manager) Register(c *protonmail.Client, username string, ch chan<- *pro
close(ch) close(ch)
}() }()
} }
return r
} }

View File

@ -82,6 +82,11 @@ func (mbox *mailbox) Check() error {
func (mbox *mailbox) sync() error { func (mbox *mailbox) sync() error {
log.Printf("Synchronizing mailbox %v...", mbox.name) log.Printf("Synchronizing mailbox %v...", mbox.name)
// TODO: don't do this without incrementing UIDVALIDITY
if err := mbox.db.Reset(); err != nil {
return err
}
filter := &protonmail.MessageFilter{ filter := &protonmail.MessageFilter{
PageSize: 150, PageSize: 150,
Label: mbox.label, Label: mbox.label,
@ -362,7 +367,11 @@ func (mbox *mailbox) CreateMessage(flags []string, date time.Time, body imap.Lit
} }
_, err := createMessage(mbox.u.c, mbox.u.u, mbox.u.privateKeys, body) _, err := createMessage(mbox.u.c, mbox.u.u, mbox.u.privateKeys, body)
return err if err != nil {
return err
}
return mbox.Poll()
} }
func (mbox *mailbox) fromSeqSet(isUID bool, seqSet *imap.SeqSet) ([]string, error) { func (mbox *mailbox) fromSeqSet(isUID bool, seqSet *imap.SeqSet) ([]string, error) {
@ -430,7 +439,7 @@ func (mbox *mailbox) UpdateMessagesFlags(uid bool, seqSet *imap.SeqSet, op imap.
} }
} }
return nil return mbox.Poll()
} }
func (mbox *mailbox) CopyMessages(uid bool, seqSet *imap.SeqSet, destName string) error { func (mbox *mailbox) CopyMessages(uid bool, seqSet *imap.SeqSet, destName string) error {
@ -443,7 +452,7 @@ func (mbox *mailbox) CopyMessages(uid bool, seqSet *imap.SeqSet, destName string
return err return err
} }
dest := mbox.u.getMailboxByLabel(destName) dest := mbox.u.getMailbox(destName)
if dest == nil { if dest == nil {
return imapbackend.ErrNoSuchMailbox return imapbackend.ErrNoSuchMailbox
} }
@ -451,7 +460,7 @@ func (mbox *mailbox) CopyMessages(uid bool, seqSet *imap.SeqSet, destName string
if err := mbox.u.c.LabelMessages(dest.label, apiIDs); err != nil { if err := mbox.u.c.LabelMessages(dest.label, apiIDs); err != nil {
return err return err
} }
return nil return mbox.Poll()
} }
func (mbox *mailbox) MoveMessages(uid bool, seqSet *imap.SeqSet, destName string) error { func (mbox *mailbox) MoveMessages(uid bool, seqSet *imap.SeqSet, destName string) error {
@ -464,7 +473,7 @@ func (mbox *mailbox) MoveMessages(uid bool, seqSet *imap.SeqSet, destName string
return err return err
} }
dest := mbox.u.getMailboxByLabel(destName) dest := mbox.u.getMailbox(destName)
if dest == nil { if dest == nil {
return imapbackend.ErrNoSuchMailbox return imapbackend.ErrNoSuchMailbox
} }
@ -475,7 +484,7 @@ func (mbox *mailbox) MoveMessages(uid bool, seqSet *imap.SeqSet, destName string
if err := mbox.u.c.UnlabelMessages(mbox.label, apiIDs); err != nil { if err := mbox.u.c.UnlabelMessages(mbox.label, apiIDs); err != nil {
return err return err
} }
return nil return mbox.Poll()
} }
func (mbox *mailbox) Expunge() error { func (mbox *mailbox) Expunge() error {
@ -492,5 +501,10 @@ func (mbox *mailbox) Expunge() error {
return err return err
} }
return mbox.Poll()
}
func (mbox *mailbox) Poll() error {
mbox.u.poll()
return nil return nil
} }

View File

@ -9,6 +9,7 @@ import (
imapbackend "github.com/emersion/go-imap/backend" imapbackend "github.com/emersion/go-imap/backend"
"github.com/emersion/go-imap-specialuse" "github.com/emersion/go-imap-specialuse"
"github.com/emersion/hydroxide/events"
"github.com/emersion/hydroxide/imap/database" "github.com/emersion/hydroxide/imap/database"
"github.com/emersion/hydroxide/protonmail" "github.com/emersion/hydroxide/protonmail"
) )
@ -34,11 +35,13 @@ type user struct {
privateKeys openpgp.EntityList privateKeys openpgp.EntityList
db *database.User db *database.User
eventsReceiver *events.Receiver
locker sync.Mutex locker sync.Mutex
mailboxes map[string]*mailbox mailboxes map[string]*mailbox
done chan<- struct{} done chan<- struct{}
eventSent chan struct{}
} }
func newUser(be *backend, c *protonmail.Client, u *protonmail.User, privateKeys openpgp.EntityList) (*user, error) { func newUser(be *backend, c *protonmail.Client, u *protonmail.User, privateKeys openpgp.EntityList) (*user, error) {
@ -46,6 +49,7 @@ func newUser(be *backend, c *protonmail.Client, u *protonmail.User, privateKeys
c: c, c: c,
u: u, u: u,
privateKeys: privateKeys, privateKeys: privateKeys,
eventSent: make(chan struct{}),
} }
db, err := database.Open(u.Name+".db") db, err := database.Open(u.Name+".db")
@ -62,7 +66,7 @@ func newUser(be *backend, c *protonmail.Client, u *protonmail.User, privateKeys
uu.done = done uu.done = done
ch := make(chan *protonmail.Event) ch := make(chan *protonmail.Event)
go uu.receiveEvents(be.updates, ch) go uu.receiveEvents(be.updates, ch)
be.eventsManager.Register(c, u.Name, ch, done) uu.eventsReceiver = be.eventsManager.Register(c, u.Name, ch, done)
return uu, nil return uu, nil
} }
@ -125,16 +129,24 @@ func (u *user) getMailboxByLabel(labelID string) *mailbox {
return u.mailboxes[labelID] return u.mailboxes[labelID]
} }
func (u *user) GetMailbox(name string) (imapbackend.Mailbox, error) { func (u *user) getMailbox(name string) *mailbox {
u.locker.Lock() u.locker.Lock()
defer u.locker.Unlock() defer u.locker.Unlock()
for _, mbox := range u.mailboxes { for _, mbox := range u.mailboxes {
if mbox.name == name { if mbox.name == name {
return mbox, nil return mbox
} }
} }
return nil, imapbackend.ErrNoSuchMailbox return nil
}
func (u *user) GetMailbox(name string) (imapbackend.Mailbox, error) {
mbox := u.getMailbox(name)
if mbox == nil {
return nil, imapbackend.ErrNoSuchMailbox
}
return mbox, nil
} }
func (u *user) CreateMailbox(name string) error { func (u *user) CreateMailbox(name string) error {
@ -162,8 +174,15 @@ func (u *user) Logout() error {
return nil return nil
} }
func (u *user) poll() {
go u.eventsReceiver.Poll()
<-u.eventSent
}
func (u *user) receiveEvents(updates chan<- interface{}, events <-chan *protonmail.Event) { func (u *user) receiveEvents(updates chan<- interface{}, events <-chan *protonmail.Event) {
for event := range events { for event := range events {
var eventUpdates []interface{}
if event.Refresh&protonmail.EventRefreshMail != 0 { if event.Refresh&protonmail.EventRefreshMail != 0 {
log.Println("Reinitializing the whole IMAP database") log.Println("Reinitializing the whole IMAP database")
@ -201,7 +220,7 @@ func (u *user) receiveEvents(updates chan<- interface{}, events <-chan *protonma
update.Mailbox = mbox.name update.Mailbox = mbox.name
update.MailboxStatus = imap.NewMailboxStatus(mbox.name, []imap.StatusItem{imap.StatusMessages}) update.MailboxStatus = imap.NewMailboxStatus(mbox.name, []imap.StatusItem{imap.StatusMessages})
update.MailboxStatus.Messages = seqNum update.MailboxStatus.Messages = seqNum
updates <- update eventUpdates = append(eventUpdates, update)
} }
} }
case protonmail.EventUpdate, protonmail.EventUpdateFlags: case protonmail.EventUpdate, protonmail.EventUpdateFlags:
@ -219,7 +238,7 @@ func (u *user) receiveEvents(updates chan<- interface{}, events <-chan *protonma
update.Mailbox = mbox.name update.Mailbox = mbox.name
update.MailboxStatus = imap.NewMailboxStatus(mbox.name, []imap.StatusItem{imap.StatusMessages}) update.MailboxStatus = imap.NewMailboxStatus(mbox.name, []imap.StatusItem{imap.StatusMessages})
update.MailboxStatus.Messages = seqNum update.MailboxStatus.Messages = seqNum
updates <- update eventUpdates = append(eventUpdates, update)
} }
} }
for labelID, seqNum := range deletedSeqNums { for labelID, seqNum := range deletedSeqNums {
@ -228,7 +247,7 @@ func (u *user) receiveEvents(updates chan<- interface{}, events <-chan *protonma
update.Username = u.u.Name update.Username = u.u.Name
update.Mailbox = mbox.name update.Mailbox = mbox.name
update.SeqNum = seqNum update.SeqNum = seqNum
updates <- update eventUpdates = append(eventUpdates, update)
} }
} }
@ -257,7 +276,7 @@ func (u *user) receiveEvents(updates chan<- interface{}, events <-chan *protonma
update.Mailbox = mbox.name update.Mailbox = mbox.name
update.Message = imap.NewMessage(seqNum, []imap.FetchItem{imap.FetchFlags}) update.Message = imap.NewMessage(seqNum, []imap.FetchItem{imap.FetchFlags})
update.Message.Flags = fetchFlags(msg) update.Message.Flags = fetchFlags(msg)
updates <- update eventUpdates = append(eventUpdates, update)
} }
} }
case protonmail.EventDelete: case protonmail.EventDelete:
@ -274,7 +293,7 @@ func (u *user) receiveEvents(updates chan<- interface{}, events <-chan *protonma
update.Username = u.u.Name update.Username = u.u.Name
update.Mailbox = mbox.name update.Mailbox = mbox.name
update.SeqNum = seqNum update.SeqNum = seqNum
updates <- update eventUpdates = append(eventUpdates, update)
} }
} }
} }
@ -289,5 +308,17 @@ func (u *user) receiveEvents(updates chan<- interface{}, events <-chan *protonma
} }
u.locker.Unlock() u.locker.Unlock()
} }
done := imapbackend.WaitUpdates(eventUpdates...)
for _, update := range eventUpdates {
updates <- update
}
go func() {
<-done
select {
case u.eventSent <- struct{}{}:
default:
}
}()
} }
} }