imap: listen for events

This commit is contained in:
emersion 2018-01-12 13:20:17 +01:00
parent 12cccff787
commit f9cb9383e4
No known key found for this signature in database
GPG Key ID: 0FDE7BE0E88F5E48
7 changed files with 69 additions and 29 deletions

View File

@ -32,10 +32,11 @@ func newClient() *protonmail.Client {
} }
} }
func receiveEvents(c *protonmail.Client, last string, ch chan<- *protonmail.Event) { func receiveEvents(c *protonmail.Client, ch chan<- *protonmail.Event) {
t := time.NewTicker(time.Minute) t := time.NewTicker(time.Minute)
defer t.Stop() defer t.Stop()
var last string
for range t.C { for range t.C {
event, err := c.GetEvent(last) event, err := c.GetEvent(last)
if err != nil { if err != nil {
@ -160,8 +161,9 @@ func main() {
} }
sessions := auth.NewManager(newClient) sessions := auth.NewManager(newClient)
eventsManager := events.NewManager()
be := imapbackend.New(sessions) be := imapbackend.New(sessions, eventsManager)
s := imapserver.New(be) s := imapserver.New(be)
s.Addr = "127.0.0.1:" + port s.Addr = "127.0.0.1:" + port
s.AllowInsecureAuth = true // TODO: remove this s.AllowInsecureAuth = true // TODO: remove this
@ -176,9 +178,8 @@ func main() {
port = "8080" port = "8080"
} }
eventsManager := events.NewManager()
sessions := auth.NewManager(newClient) sessions := auth.NewManager(newClient)
eventsManager := events.NewManager()
handlers := make(map[string]http.Handler) handlers := make(map[string]http.Handler)
s := &http.Server{ s := &http.Server{
@ -207,7 +208,7 @@ func main() {
h, ok := handlers[username] h, ok := handlers[username]
if !ok { if !ok {
ch := make(chan *protonmail.Event) ch := make(chan *protonmail.Event)
eventsManager.Register(c, username, ch) eventsManager.Register(c, username, ch, nil)
h = carddav.NewHandler(c, privateKeys, ch) h = carddav.NewHandler(c, privateKeys, ch)
handlers[username] = h handlers[username] = h

View File

@ -8,7 +8,7 @@ import (
"github.com/emersion/hydroxide/protonmail" "github.com/emersion/hydroxide/protonmail"
) )
const pollInterval = time.Minute const pollInterval = 30 * time.Second
type receiver struct { type receiver struct {
channels []chan<- *protonmail.Event channels []chan<- *protonmail.Event
@ -32,10 +32,15 @@ func (r *receiver) receiveEvents(c *protonmail.Client, last string) {
last = event.ID last = event.ID
r.locker.Lock() r.locker.Lock()
n := len(r.channels)
for _, ch := range r.channels { for _, ch := range r.channels {
ch <- event ch <- event
} }
r.locker.Unlock() r.locker.Unlock()
if n == 0 {
break
}
} }
} }
@ -50,11 +55,12 @@ func NewManager() *Manager {
} }
} }
func (m *Manager) Register(c *protonmail.Client, username string, ch chan<- *protonmail.Event) { func (m *Manager) Register(c *protonmail.Client, username string, ch chan<- *protonmail.Event, done <-chan struct{}) {
m.locker.Lock() m.locker.Lock()
defer m.locker.Unlock() defer m.locker.Unlock()
if r, ok := m.receivers[username]; ok { r, ok := m.receivers[username]
if ok {
r.locker.Lock() r.locker.Lock()
r.channels = append(r.channels, ch) r.channels = append(r.channels, ch)
r.locker.Unlock() r.locker.Unlock()
@ -62,7 +68,31 @@ func (m *Manager) Register(c *protonmail.Client, username string, ch chan<- *pro
r = &receiver{ r = &receiver{
channels: []chan<- *protonmail.Event{ch}, channels: []chan<- *protonmail.Event{ch},
} }
go r.receiveEvents(c, "")
go func() {
r.receiveEvents(c, "")
m.locker.Lock()
delete(m.receivers, username)
m.locker.Unlock()
}()
m.receivers[username] = r m.receivers[username] = r
} }
if done != nil {
go func() {
<-done
r.locker.Lock()
for i, c := range r.channels {
if c == ch {
r.channels = append(r.channels[:i], r.channels[i+1:]...)
}
}
r.locker.Unlock()
close(ch)
}()
}
} }

View File

@ -123,14 +123,14 @@ func (u *User) CreateMessage(msg *protonmail.Message) error {
}) })
} }
func (u *User) UpdateMessage(update *protonmail.EventMessageUpdate) error { func (u *User) UpdateMessage(apiID string, update *protonmail.EventMessageUpdate) error {
return u.db.Update(func(tx *bolt.Tx) error { return u.db.Update(func(tx *bolt.Tx) error {
messages := tx.Bucket(messagesBucket) messages := tx.Bucket(messagesBucket)
if messages == nil { if messages == nil {
return errors.New("cannot update message in local DB: messages bucket doesn't exist") return errors.New("cannot update message in local DB: messages bucket doesn't exist")
} }
msg, err := userMessage(messages, update.ID) msg, err := userMessage(messages, apiID)
if err != nil { if err != nil {
return err return err
} }
@ -147,7 +147,7 @@ func (u *User) UpdateMessage(update *protonmail.EventMessageUpdate) error {
return err return err
} }
if err := mailboxCreateMessage(mbox, update.ID); err != nil { if err := mailboxCreateMessage(mbox, apiID); err != nil {
return err return err
} }
} }
@ -157,7 +157,7 @@ func (u *User) UpdateMessage(update *protonmail.EventMessageUpdate) error {
continue continue
} }
if err := mailboxDeleteMessage(mbox, update.ID); err != nil { if err := mailboxDeleteMessage(mbox, apiID); err != nil {
return err return err
} }
} }

View File

@ -6,12 +6,14 @@ import (
imapbackend "github.com/emersion/go-imap/backend" imapbackend "github.com/emersion/go-imap/backend"
"github.com/emersion/hydroxide/auth" "github.com/emersion/hydroxide/auth"
"github.com/emersion/hydroxide/events"
) )
var errNotYetImplemented = errors.New("not yet implemented") var errNotYetImplemented = errors.New("not yet implemented")
type backend struct { type backend struct {
sessions *auth.Manager sessions *auth.Manager
eventsManager *events.Manager
} }
func (be *backend) Login(username, password string) (imapbackend.User, error) { func (be *backend) Login(username, password string) (imapbackend.User, error) {
@ -27,9 +29,9 @@ func (be *backend) Login(username, password string) (imapbackend.User, error) {
// TODO: decrypt private keys in u.Addresses // TODO: decrypt private keys in u.Addresses
return newUser(c, u, privateKeys) return newUser(be, c, u, privateKeys)
} }
func New(sessions *auth.Manager) imapbackend.Backend { func New(sessions *auth.Manager, eventsManager *events.Manager) imapbackend.Backend {
return &backend{sessions} return &backend{sessions, eventsManager}
} }

View File

@ -225,7 +225,9 @@ func (mbox *mailbox) ListMessages(uid bool, seqSet *imap.SeqSet, items []imap.Fe
for i := start; i <= stop; i++ { for i := start; i <= stop; i++ {
msg, err := mbox.fetchMessage(uid, i, items) msg, err := mbox.fetchMessage(uid, i, items)
if err != nil { if err == database.ErrNotFound {
continue
} else if err != nil {
return err return err
} }
if msg != nil { if msg != nil {

View File

@ -37,9 +37,11 @@ type user struct {
locker sync.Mutex locker sync.Mutex
mailboxes map[string]*mailbox mailboxes map[string]*mailbox
done chan<- struct{}
} }
func newUser(c *protonmail.Client, u *protonmail.User, privateKeys openpgp.EntityList) (*user, error) { func newUser(be *backend, c *protonmail.Client, u *protonmail.User, privateKeys openpgp.EntityList) (*user, error) {
uu := &user{ uu := &user{
c: c, c: c,
u: u, u: u,
@ -56,7 +58,11 @@ func newUser(c *protonmail.Client, u *protonmail.User, privateKeys openpgp.Entit
return nil, err return nil, err
} }
// TODO: go uu.receiveEvents(events) done := make(chan struct{})
uu.done = done
ch := make(chan *protonmail.Event)
go uu.receiveEvents(ch)
be.eventsManager.Register(c, u.Name, ch, done)
return uu, nil return uu, nil
} }
@ -137,6 +143,8 @@ func (u *user) RenameMailbox(existingName, newName string) error {
} }
func (u *user) Logout() error { func (u *user) Logout() error {
close(u.done)
if err := u.db.Close(); err != nil { if err := u.db.Close(); err != nil {
return err return err
} }
@ -171,22 +179,23 @@ func (u *user) receiveEvents(events <-chan *protonmail.Event) {
for _, eventMessage := range event.Messages { for _, eventMessage := range event.Messages {
switch eventMessage.Action { switch eventMessage.Action {
case protonmail.EventCreate: case protonmail.EventCreate:
log.Println("Received create event for message", eventMessage.ID)
if err := u.db.CreateMessage(eventMessage.Created); err != nil { if err := u.db.CreateMessage(eventMessage.Created); err != nil {
log.Printf("cannot handle create event for message %s: cannot create message in local DB: %v", eventMessage.ID, err) log.Printf("cannot handle create event for message %s: cannot create message in local DB: %v", eventMessage.ID, err)
break break
} }
// TODO: send updates // TODO: send updates
case protonmail.EventUpdate: case protonmail.EventUpdate, protonmail.EventUpdateFlags:
// No-op log.Println("Received update event for message", eventMessage.ID)
case protonmail.EventUpdateFlags: if err := u.db.UpdateMessage(eventMessage.ID, eventMessage.Updated); err != nil {
if err := u.db.UpdateMessage(eventMessage.Updated); err != nil {
log.Printf("cannot handle update event for message %s: cannot update message in local DB: %v", eventMessage.ID, err) log.Printf("cannot handle update event for message %s: cannot update message in local DB: %v", eventMessage.ID, err)
break break
} }
// TODO: send updates // TODO: send updates
case protonmail.EventDelete: case protonmail.EventDelete:
log.Println("Received delete event for message", eventMessage.ID)
if err := u.db.DeleteMessage(eventMessage.ID); err != nil { if err := u.db.DeleteMessage(eventMessage.ID); err != nil {
log.Printf("cannot handle delete event for message %s: cannot delete message from local DB: %v", eventMessage.ID, err) log.Printf("cannot handle delete event for message %s: cannot delete message from local DB: %v", eventMessage.ID, err)
break break

View File

@ -51,8 +51,6 @@ type EventMessage struct {
} }
type EventMessageUpdate struct { type EventMessageUpdate struct {
ID string
IsRead *int IsRead *int
Type *MessageType Type *MessageType
Time int64 Time int64
@ -98,10 +96,6 @@ func (update *EventMessageUpdate) DiffLabelIDs(current []string) (added, removed
} }
func (update *EventMessageUpdate) Patch(msg *Message) { func (update *EventMessageUpdate) Patch(msg *Message) {
if update.ID != msg.ID {
return
}
msg.Time = update.Time msg.Time = update.Time
if update.IsRead != nil { if update.IsRead != nil {
msg.IsRead = *update.IsRead msg.IsRead = *update.IsRead
@ -151,8 +145,10 @@ func (em *EventMessage) UnmarshalJSON(b []byte) error {
em.Action = raw.Action em.Action = raw.Action
switch raw.Action { switch raw.Action {
case EventCreate: case EventCreate:
em.Created = new(Message)
return json.Unmarshal(raw.Message, em.Created) return json.Unmarshal(raw.Message, em.Created)
case EventUpdate, EventUpdateFlags: case EventUpdate, EventUpdateFlags:
em.Updated = new(EventMessageUpdate)
return json.Unmarshal(raw.Message, em.Updated) return json.Unmarshal(raw.Message, em.Updated)
} }
return nil return nil