imap: send updates
This commit is contained in:
parent
f9cb9383e4
commit
7fa9293eeb
|
@ -14,6 +14,7 @@ var errNotYetImplemented = errors.New("not yet implemented")
|
|||
type backend struct {
|
||||
sessions *auth.Manager
|
||||
eventsManager *events.Manager
|
||||
updates chan interface{}
|
||||
}
|
||||
|
||||
func (be *backend) Login(username, password string) (imapbackend.User, error) {
|
||||
|
@ -32,6 +33,10 @@ func (be *backend) Login(username, password string) (imapbackend.User, error) {
|
|||
return newUser(be, c, u, privateKeys)
|
||||
}
|
||||
|
||||
func New(sessions *auth.Manager, eventsManager *events.Manager) imapbackend.Backend {
|
||||
return &backend{sessions, eventsManager}
|
||||
func (be *backend) Updates() <-chan interface{} {
|
||||
return be.updates
|
||||
}
|
||||
|
||||
func New(sessions *auth.Manager, eventsManager *events.Manager) imapbackend.Backend {
|
||||
return &backend{sessions, eventsManager, make(chan interface{}, 50)}
|
||||
}
|
|
@ -20,29 +20,33 @@ func unserializeUID(b []byte) uint32 {
|
|||
return binary.BigEndian.Uint32(b)
|
||||
}
|
||||
|
||||
func mailboxCreateMessage(b *bolt.Bucket, apiID string) error {
|
||||
func mailboxCreateMessage(b *bolt.Bucket, apiID string) (seqNum uint32, err error) {
|
||||
want := []byte(apiID)
|
||||
c := b.Cursor()
|
||||
var n uint32 = 1
|
||||
for k, v := c.First(); k != nil; k, v = c.Next() {
|
||||
if bytes.Equal(v, want) {
|
||||
return nil
|
||||
return n, nil
|
||||
}
|
||||
n++
|
||||
}
|
||||
|
||||
id, _ := b.NextSequence()
|
||||
uid := uint32(id)
|
||||
return b.Put(serializeUID(uid), want)
|
||||
return n, b.Put(serializeUID(uid), want)
|
||||
}
|
||||
|
||||
func mailboxDeleteMessage(b *bolt.Bucket, apiID string) error {
|
||||
func mailboxDeleteMessage(b *bolt.Bucket, apiID string) (seqNum uint32, err error) {
|
||||
want := []byte(apiID)
|
||||
c := b.Cursor()
|
||||
var n uint32 = 1
|
||||
for k, v := c.First(); k != nil; k, v = c.Next() {
|
||||
if bytes.Equal(v, want) {
|
||||
return b.Delete(k)
|
||||
return n, b.Delete(k)
|
||||
}
|
||||
n++
|
||||
}
|
||||
return nil
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
type Mailbox struct {
|
||||
|
@ -70,7 +74,7 @@ func (mbox *Mailbox) Sync(messages []*protonmail.Message) error {
|
|||
}
|
||||
|
||||
for _, msg := range messages {
|
||||
if err := mailboxCreateMessage(b, msg.ID); err != nil {
|
||||
if _, err := mailboxCreateMessage(b, msg.ID); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -93,9 +97,8 @@ func (mbox *Mailbox) UidNext() (uint32, error) {
|
|||
return uid, err
|
||||
}
|
||||
|
||||
func (mbox *Mailbox) FromUid(uid uint32) (string, error) {
|
||||
var apiID string
|
||||
err := mbox.u.db.View(func(tx *bolt.Tx) error {
|
||||
func (mbox *Mailbox) FromUid(uid uint32) (apiID string, err error) {
|
||||
err = mbox.u.db.View(func(tx *bolt.Tx) error {
|
||||
b, err := mbox.bucket(tx)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -109,12 +112,11 @@ func (mbox *Mailbox) FromUid(uid uint32) (string, error) {
|
|||
apiID = string(v)
|
||||
return nil
|
||||
})
|
||||
return apiID, err
|
||||
return
|
||||
}
|
||||
|
||||
func (mbox *Mailbox) FromSeqNum(seqNum uint32) (string, error) {
|
||||
var apiID string
|
||||
err := mbox.u.db.View(func(tx *bolt.Tx) error {
|
||||
func (mbox *Mailbox) FromSeqNum(seqNum uint32) (apiID string, err error) {
|
||||
err = mbox.u.db.View(func(tx *bolt.Tx) error {
|
||||
b, err := mbox.bucket(tx)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -132,12 +134,11 @@ func (mbox *Mailbox) FromSeqNum(seqNum uint32) (string, error) {
|
|||
|
||||
return ErrNotFound
|
||||
})
|
||||
return apiID, err
|
||||
return
|
||||
}
|
||||
|
||||
func (mbox *Mailbox) FromApiID(apiID string) (uint32, uint32, error) {
|
||||
var seqNum, uid uint32
|
||||
err := mbox.u.db.View(func(tx *bolt.Tx) error {
|
||||
func (mbox *Mailbox) FromApiID(apiID string) (seqNum uint32, uid uint32, err error) {
|
||||
err = mbox.u.db.View(func(tx *bolt.Tx) error {
|
||||
b, err := mbox.bucket(tx)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -157,7 +158,7 @@ func (mbox *Mailbox) FromApiID(apiID string) (uint32, uint32, error) {
|
|||
|
||||
return ErrNotFound
|
||||
})
|
||||
return seqNum, uid, err
|
||||
return
|
||||
}
|
||||
|
||||
func (mbox *Mailbox) ForEach(f func(seqNum, uid uint32, apiID string) error) error {
|
||||
|
|
|
@ -93,8 +93,9 @@ func (u *User) ResetMessages() error {
|
|||
})
|
||||
}
|
||||
|
||||
func (u *User) CreateMessage(msg *protonmail.Message) error {
|
||||
return u.db.Update(func(tx *bolt.Tx) error {
|
||||
func (u *User) CreateMessage(msg *protonmail.Message) (seqNums map[string]uint32, err error) {
|
||||
seqNums = make(map[string]uint32)
|
||||
err = u.db.Update(func(tx *bolt.Tx) error {
|
||||
messages, err := tx.CreateBucketIfNotExists(messagesBucket)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -114,17 +115,22 @@ func (u *User) CreateMessage(msg *protonmail.Message) error {
|
|||
return err
|
||||
}
|
||||
|
||||
if err := mailboxCreateMessage(mbox, msg.ID); err != nil {
|
||||
seqNum, err := mailboxCreateMessage(mbox, msg.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
seqNums[labelID] = seqNum
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
func (u *User) UpdateMessage(apiID string, update *protonmail.EventMessageUpdate) error {
|
||||
return u.db.Update(func(tx *bolt.Tx) error {
|
||||
func (u *User) UpdateMessage(apiID string, update *protonmail.EventMessageUpdate) (createdSeqNums map[string]uint32, deletedSeqNums map[string]uint32, err error) {
|
||||
createdSeqNums = make(map[string]uint32)
|
||||
deletedSeqNums = make(map[string]uint32)
|
||||
err = u.db.Update(func(tx *bolt.Tx) error {
|
||||
messages := tx.Bucket(messagesBucket)
|
||||
if messages == nil {
|
||||
return errors.New("cannot update message in local DB: messages bucket doesn't exist")
|
||||
|
@ -147,9 +153,11 @@ func (u *User) UpdateMessage(apiID string, update *protonmail.EventMessageUpdate
|
|||
return err
|
||||
}
|
||||
|
||||
if err := mailboxCreateMessage(mbox, apiID); err != nil {
|
||||
seqNum, err := mailboxCreateMessage(mbox, apiID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
createdSeqNums[labelID] = seqNum
|
||||
}
|
||||
for _, labelID := range removedLabels {
|
||||
mbox := mailboxes.Bucket([]byte(labelID))
|
||||
|
@ -157,18 +165,22 @@ func (u *User) UpdateMessage(apiID string, update *protonmail.EventMessageUpdate
|
|||
continue
|
||||
}
|
||||
|
||||
if err := mailboxDeleteMessage(mbox, apiID); err != nil {
|
||||
seqNum, err := mailboxDeleteMessage(mbox, apiID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
deletedSeqNums[labelID] = seqNum
|
||||
}
|
||||
|
||||
update.Patch(msg)
|
||||
return userCreateMessage(messages, msg)
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
func (u *User) DeleteMessage(apiID string) error {
|
||||
return u.db.Update(func(tx *bolt.Tx) error {
|
||||
func (u *User) DeleteMessage(apiID string) (seqNums map[string]uint32, err error) {
|
||||
seqNums = make(map[string]uint32)
|
||||
err = u.db.Update(func(tx *bolt.Tx) error {
|
||||
messages:= tx.Bucket(messagesBucket)
|
||||
if messages == nil {
|
||||
return nil
|
||||
|
@ -195,13 +207,16 @@ func (u *User) DeleteMessage(apiID string) error {
|
|||
continue
|
||||
}
|
||||
|
||||
if err := mailboxDeleteMessage(mbox, msg.ID); err != nil {
|
||||
seqNum, err := mailboxDeleteMessage(mbox, msg.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
seqNums[labelID] = seqNum
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
func (u *User) Close() error {
|
||||
|
|
90
imap/user.go
90
imap/user.go
|
@ -61,7 +61,7 @@ func newUser(be *backend, c *protonmail.Client, u *protonmail.User, privateKeys
|
|||
done := make(chan struct{})
|
||||
uu.done = done
|
||||
ch := make(chan *protonmail.Event)
|
||||
go uu.receiveEvents(ch)
|
||||
go uu.receiveEvents(be.updates, ch)
|
||||
be.eventsManager.Register(c, u.Name, ch, done)
|
||||
|
||||
return uu, nil
|
||||
|
@ -118,6 +118,12 @@ func (u *user) ListMailboxes(subscribed bool) ([]imapbackend.Mailbox, error) {
|
|||
return list, nil
|
||||
}
|
||||
|
||||
func (u *user) getMailboxByLabel(labelID string) *mailbox {
|
||||
u.locker.Lock()
|
||||
defer u.locker.Unlock()
|
||||
return u.mailboxes[labelID]
|
||||
}
|
||||
|
||||
func (u *user) GetMailbox(name string) (imapbackend.Mailbox, error) {
|
||||
u.locker.Lock()
|
||||
defer u.locker.Unlock()
|
||||
|
@ -155,7 +161,7 @@ func (u *user) Logout() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (u *user) receiveEvents(events <-chan *protonmail.Event) {
|
||||
func (u *user) receiveEvents(updates chan<- interface{}, events <-chan *protonmail.Event) {
|
||||
for event := range events {
|
||||
if event.Refresh&protonmail.EventRefreshMail != 0 {
|
||||
log.Println("Reinitializing the whole IMAP database")
|
||||
|
@ -180,28 +186,96 @@ func (u *user) receiveEvents(events <-chan *protonmail.Event) {
|
|||
switch eventMessage.Action {
|
||||
case protonmail.EventCreate:
|
||||
log.Println("Received create event for message", eventMessage.ID)
|
||||
if err := u.db.CreateMessage(eventMessage.Created); err != nil {
|
||||
seqNums, err := u.db.CreateMessage(eventMessage.Created)
|
||||
if err != nil {
|
||||
log.Printf("cannot handle create event for message %s: cannot create message in local DB: %v", eventMessage.ID, err)
|
||||
break
|
||||
}
|
||||
|
||||
// TODO: send updates
|
||||
// TODO: what if the message was already in the local DB?
|
||||
for labelID, seqNum := range seqNums {
|
||||
if mbox := u.getMailboxByLabel(labelID); mbox != nil {
|
||||
update := new(imapbackend.MailboxUpdate)
|
||||
update.Username = u.u.Name
|
||||
update.Mailbox = mbox.name
|
||||
update.MailboxStatus = imap.NewMailboxStatus(mbox.name, []imap.StatusItem{imap.StatusMessages})
|
||||
update.MailboxStatus.Messages = seqNum
|
||||
updates <- update
|
||||
}
|
||||
}
|
||||
case protonmail.EventUpdate, protonmail.EventUpdateFlags:
|
||||
log.Println("Received update event for message", eventMessage.ID)
|
||||
if err := u.db.UpdateMessage(eventMessage.ID, eventMessage.Updated); err != nil {
|
||||
createdSeqNums, deletedSeqNums, err := u.db.UpdateMessage(eventMessage.ID, eventMessage.Updated)
|
||||
if err != nil {
|
||||
log.Printf("cannot handle update event for message %s: cannot update message in local DB: %v", eventMessage.ID, err)
|
||||
break
|
||||
}
|
||||
|
||||
// TODO: send updates
|
||||
for labelID, seqNum := range createdSeqNums {
|
||||
if mbox := u.getMailboxByLabel(labelID); mbox != nil {
|
||||
update := new(imapbackend.MailboxUpdate)
|
||||
update.Username = u.u.Name
|
||||
update.Mailbox = mbox.name
|
||||
update.MailboxStatus = imap.NewMailboxStatus(mbox.name, []imap.StatusItem{imap.StatusMessages})
|
||||
update.MailboxStatus.Messages = seqNum
|
||||
updates <- update
|
||||
}
|
||||
}
|
||||
for labelID, seqNum := range deletedSeqNums {
|
||||
if mbox := u.getMailboxByLabel(labelID); mbox != nil {
|
||||
update := new(imapbackend.ExpungeUpdate)
|
||||
update.Username = u.u.Name
|
||||
update.Mailbox = mbox.name
|
||||
update.SeqNum = seqNum
|
||||
updates <- update
|
||||
}
|
||||
}
|
||||
|
||||
// Send message updates
|
||||
msg, err := u.db.Message(eventMessage.ID)
|
||||
if err != nil {
|
||||
log.Printf("cannot handle update event for message %s: cannot get updated message from local DB: %v", eventMessage.ID, err)
|
||||
break
|
||||
}
|
||||
for _, labelID := range msg.LabelIDs {
|
||||
if _, created := createdSeqNums[labelID]; created {
|
||||
// This message has been added to the label's mailbox
|
||||
// No need to send a message update
|
||||
continue
|
||||
}
|
||||
|
||||
if mbox := u.getMailboxByLabel(labelID); mbox != nil {
|
||||
seqNum, _, err := mbox.db.FromApiID(eventMessage.ID)
|
||||
if err != nil {
|
||||
log.Printf("cannot handle update event for message %s: cannot get message sequence number in %s: %v", eventMessage.ID, mbox.name, err)
|
||||
continue
|
||||
}
|
||||
|
||||
update := new(imapbackend.MessageUpdate)
|
||||
update.Username = u.u.Name
|
||||
update.Mailbox = mbox.name
|
||||
update.Message = imap.NewMessage(seqNum, []imap.FetchItem{imap.FetchFlags})
|
||||
update.Message.Flags = fetchFlags(msg)
|
||||
updates <- update
|
||||
}
|
||||
}
|
||||
case protonmail.EventDelete:
|
||||
log.Println("Received delete event for message", eventMessage.ID)
|
||||
if err := u.db.DeleteMessage(eventMessage.ID); err != nil {
|
||||
seqNums, err := u.db.DeleteMessage(eventMessage.ID)
|
||||
if err != nil {
|
||||
log.Printf("cannot handle delete event for message %s: cannot delete message from local DB: %v", eventMessage.ID, err)
|
||||
break
|
||||
}
|
||||
|
||||
// TODO: send updates
|
||||
for labelID, seqNum := range seqNums {
|
||||
if mbox := u.getMailboxByLabel(labelID); mbox != nil {
|
||||
update := new(imapbackend.ExpungeUpdate)
|
||||
update.Username = u.u.Name
|
||||
update.Mailbox = mbox.name
|
||||
update.SeqNum = seqNum
|
||||
updates <- update
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue