From f9cb9383e45043f664fdf3691e651019d16ef703 Mon Sep 17 00:00:00 2001 From: emersion Date: Fri, 12 Jan 2018 13:20:17 +0100 Subject: [PATCH] imap: listen for events --- cmd/hydroxide/hydroxide.go | 11 ++++++----- events/events.go | 38 ++++++++++++++++++++++++++++++++++---- imap/database/user.go | 8 ++++---- imap/imap.go | 8 +++++--- imap/mailbox.go | 4 +++- imap/user.go | 21 +++++++++++++++------ protonmail/events.go | 8 ++------ 7 files changed, 69 insertions(+), 29 deletions(-) diff --git a/cmd/hydroxide/hydroxide.go b/cmd/hydroxide/hydroxide.go index 8a25b75..ecbd865 100644 --- a/cmd/hydroxide/hydroxide.go +++ b/cmd/hydroxide/hydroxide.go @@ -32,10 +32,11 @@ func newClient() *protonmail.Client { } } -func receiveEvents(c *protonmail.Client, last string, ch chan<- *protonmail.Event) { +func receiveEvents(c *protonmail.Client, ch chan<- *protonmail.Event) { t := time.NewTicker(time.Minute) defer t.Stop() + var last string for range t.C { event, err := c.GetEvent(last) if err != nil { @@ -160,8 +161,9 @@ func main() { } sessions := auth.NewManager(newClient) + eventsManager := events.NewManager() - be := imapbackend.New(sessions) + be := imapbackend.New(sessions, eventsManager) s := imapserver.New(be) s.Addr = "127.0.0.1:" + port s.AllowInsecureAuth = true // TODO: remove this @@ -176,9 +178,8 @@ func main() { port = "8080" } - eventsManager := events.NewManager() - sessions := auth.NewManager(newClient) + eventsManager := events.NewManager() handlers := make(map[string]http.Handler) s := &http.Server{ @@ -207,7 +208,7 @@ func main() { h, ok := handlers[username] if !ok { ch := make(chan *protonmail.Event) - eventsManager.Register(c, username, ch) + eventsManager.Register(c, username, ch, nil) h = carddav.NewHandler(c, privateKeys, ch) handlers[username] = h diff --git a/events/events.go b/events/events.go index 6f513ff..3b83f88 100644 --- a/events/events.go +++ b/events/events.go @@ -8,7 +8,7 @@ import ( "github.com/emersion/hydroxide/protonmail" ) -const pollInterval = time.Minute +const pollInterval = 30 * time.Second type receiver struct { channels []chan<- *protonmail.Event @@ -32,10 +32,15 @@ func (r *receiver) receiveEvents(c *protonmail.Client, last string) { last = event.ID r.locker.Lock() + n := len(r.channels) for _, ch := range r.channels { ch <- event } r.locker.Unlock() + + if n == 0 { + break + } } } @@ -50,11 +55,12 @@ func NewManager() *Manager { } } -func (m *Manager) Register(c *protonmail.Client, username string, ch chan<- *protonmail.Event) { +func (m *Manager) Register(c *protonmail.Client, username string, ch chan<- *protonmail.Event, done <-chan struct{}) { m.locker.Lock() defer m.locker.Unlock() - if r, ok := m.receivers[username]; ok { + r, ok := m.receivers[username] + if ok { r.locker.Lock() r.channels = append(r.channels, ch) r.locker.Unlock() @@ -62,7 +68,31 @@ func (m *Manager) Register(c *protonmail.Client, username string, ch chan<- *pro r = &receiver{ channels: []chan<- *protonmail.Event{ch}, } - go r.receiveEvents(c, "") + + go func() { + r.receiveEvents(c, "") + + m.locker.Lock() + delete(m.receivers, username) + m.locker.Unlock() + }() + m.receivers[username] = r } + + if done != nil { + go func() { + <-done + + r.locker.Lock() + for i, c := range r.channels { + if c == ch { + r.channels = append(r.channels[:i], r.channels[i+1:]...) + } + } + r.locker.Unlock() + + close(ch) + }() + } } diff --git a/imap/database/user.go b/imap/database/user.go index 66fd508..7526173 100644 --- a/imap/database/user.go +++ b/imap/database/user.go @@ -123,14 +123,14 @@ func (u *User) CreateMessage(msg *protonmail.Message) error { }) } -func (u *User) UpdateMessage(update *protonmail.EventMessageUpdate) error { +func (u *User) UpdateMessage(apiID string, update *protonmail.EventMessageUpdate) error { return u.db.Update(func(tx *bolt.Tx) error { messages := tx.Bucket(messagesBucket) if messages == nil { return errors.New("cannot update message in local DB: messages bucket doesn't exist") } - msg, err := userMessage(messages, update.ID) + msg, err := userMessage(messages, apiID) if err != nil { return err } @@ -147,7 +147,7 @@ func (u *User) UpdateMessage(update *protonmail.EventMessageUpdate) error { return err } - if err := mailboxCreateMessage(mbox, update.ID); err != nil { + if err := mailboxCreateMessage(mbox, apiID); err != nil { return err } } @@ -157,7 +157,7 @@ func (u *User) UpdateMessage(update *protonmail.EventMessageUpdate) error { continue } - if err := mailboxDeleteMessage(mbox, update.ID); err != nil { + if err := mailboxDeleteMessage(mbox, apiID); err != nil { return err } } diff --git a/imap/imap.go b/imap/imap.go index 563291d..b049409 100644 --- a/imap/imap.go +++ b/imap/imap.go @@ -6,12 +6,14 @@ import ( imapbackend "github.com/emersion/go-imap/backend" "github.com/emersion/hydroxide/auth" + "github.com/emersion/hydroxide/events" ) var errNotYetImplemented = errors.New("not yet implemented") type backend struct { sessions *auth.Manager + eventsManager *events.Manager } func (be *backend) Login(username, password string) (imapbackend.User, error) { @@ -27,9 +29,9 @@ func (be *backend) Login(username, password string) (imapbackend.User, error) { // TODO: decrypt private keys in u.Addresses - return newUser(c, u, privateKeys) + return newUser(be, c, u, privateKeys) } -func New(sessions *auth.Manager) imapbackend.Backend { - return &backend{sessions} +func New(sessions *auth.Manager, eventsManager *events.Manager) imapbackend.Backend { + return &backend{sessions, eventsManager} } diff --git a/imap/mailbox.go b/imap/mailbox.go index cb2d0d6..ad47f1c 100644 --- a/imap/mailbox.go +++ b/imap/mailbox.go @@ -225,7 +225,9 @@ func (mbox *mailbox) ListMessages(uid bool, seqSet *imap.SeqSet, items []imap.Fe for i := start; i <= stop; i++ { msg, err := mbox.fetchMessage(uid, i, items) - if err != nil { + if err == database.ErrNotFound { + continue + } else if err != nil { return err } if msg != nil { diff --git a/imap/user.go b/imap/user.go index 7ee11ae..62ed0e4 100644 --- a/imap/user.go +++ b/imap/user.go @@ -37,9 +37,11 @@ type user struct { locker sync.Mutex mailboxes map[string]*mailbox + + done chan<- struct{} } -func newUser(c *protonmail.Client, u *protonmail.User, privateKeys openpgp.EntityList) (*user, error) { +func newUser(be *backend, c *protonmail.Client, u *protonmail.User, privateKeys openpgp.EntityList) (*user, error) { uu := &user{ c: c, u: u, @@ -56,7 +58,11 @@ func newUser(c *protonmail.Client, u *protonmail.User, privateKeys openpgp.Entit return nil, err } - // TODO: go uu.receiveEvents(events) + done := make(chan struct{}) + uu.done = done + ch := make(chan *protonmail.Event) + go uu.receiveEvents(ch) + be.eventsManager.Register(c, u.Name, ch, done) return uu, nil } @@ -137,6 +143,8 @@ func (u *user) RenameMailbox(existingName, newName string) error { } func (u *user) Logout() error { + close(u.done) + if err := u.db.Close(); err != nil { return err } @@ -171,22 +179,23 @@ func (u *user) receiveEvents(events <-chan *protonmail.Event) { for _, eventMessage := range event.Messages { switch eventMessage.Action { case protonmail.EventCreate: + log.Println("Received create event for message", eventMessage.ID) if err := u.db.CreateMessage(eventMessage.Created); err != nil { log.Printf("cannot handle create event for message %s: cannot create message in local DB: %v", eventMessage.ID, err) break } // TODO: send updates - case protonmail.EventUpdate: - // No-op - case protonmail.EventUpdateFlags: - if err := u.db.UpdateMessage(eventMessage.Updated); err != nil { + case protonmail.EventUpdate, protonmail.EventUpdateFlags: + log.Println("Received update event for message", eventMessage.ID) + if err := u.db.UpdateMessage(eventMessage.ID, eventMessage.Updated); err != nil { log.Printf("cannot handle update event for message %s: cannot update message in local DB: %v", eventMessage.ID, err) break } // TODO: send updates case protonmail.EventDelete: + log.Println("Received delete event for message", eventMessage.ID) if err := u.db.DeleteMessage(eventMessage.ID); err != nil { log.Printf("cannot handle delete event for message %s: cannot delete message from local DB: %v", eventMessage.ID, err) break diff --git a/protonmail/events.go b/protonmail/events.go index 7696bb9..ba41e22 100644 --- a/protonmail/events.go +++ b/protonmail/events.go @@ -51,8 +51,6 @@ type EventMessage struct { } type EventMessageUpdate struct { - ID string - IsRead *int Type *MessageType Time int64 @@ -98,10 +96,6 @@ func (update *EventMessageUpdate) DiffLabelIDs(current []string) (added, removed } func (update *EventMessageUpdate) Patch(msg *Message) { - if update.ID != msg.ID { - return - } - msg.Time = update.Time if update.IsRead != nil { msg.IsRead = *update.IsRead @@ -151,8 +145,10 @@ func (em *EventMessage) UnmarshalJSON(b []byte) error { em.Action = raw.Action switch raw.Action { case EventCreate: + em.Created = new(Message) return json.Unmarshal(raw.Message, em.Created) case EventUpdate, EventUpdateFlags: + em.Updated = new(EventMessageUpdate) return json.Unmarshal(raw.Message, em.Updated) } return nil