From 0495f7d8384f591ce558c9332614cf472c2e3367 Mon Sep 17 00:00:00 2001 From: Manish R Jain <manishrjain@gmail.com> Date: Wed, 25 Nov 2015 11:12:09 +1100 Subject: [PATCH] Also send header in the log iterator. --- commit/log.go | 6 ++++-- commit/log_test.go | 4 ++-- posting/list.go | 31 +++++++++++++++++-------------- 3 files changed, 23 insertions(+), 18 deletions(-) diff --git a/commit/log.go b/commit/log.go index 89b7964e..9a81304d 100644 --- a/commit/log.go +++ b/commit/log.go @@ -493,13 +493,15 @@ func streamEntries(cache *Cache, } if hdr.hash == hash && hdr.ts >= afterTs { + // Iterator expects a copy of the buffer, so create one, instead of + // creating a big buffer upfront and reusing it. data := make([]byte, hdr.size) _, err := reader.Read(data) if err != nil { flog.WithError(err).Fatal("While reading data.") return err } - iter(data) + iter(hdr, data) } else { reader.Discard(int(hdr.size)) @@ -508,7 +510,7 @@ func streamEntries(cache *Cache, return nil } -type LogIterator func(record []byte) +type LogIterator func(hdr Header, record []byte) func (l *Logger) StreamEntries(afterTs int64, hash uint32, iter LogIterator) error { diff --git a/commit/log_test.go b/commit/log_test.go index c0e2ab6b..37286e48 100644 --- a/commit/log_test.go +++ b/commit/log_test.go @@ -212,7 +212,7 @@ func TestReadEntries(t *testing.T) { { // Check for hash = 1, ts >= 2. count := 0 - err := l.StreamEntries(ts+2, uint32(1), func(entry []byte) { + err := l.StreamEntries(ts+2, uint32(1), func(hdr Header, entry []byte) { count += 1 if bytes.Compare(data, entry) != 0 { t.Error("Data doesn't equate.") @@ -231,7 +231,7 @@ func TestReadEntries(t *testing.T) { } // Check for hash = 1, ts >= 2. count := 0 - err := l.StreamEntries(ts, uint32(1), func(entry []byte) { + err := l.StreamEntries(ts, uint32(1), func(hdr Header, entry []byte) { count += 1 if bytes.Compare(data, entry) != 0 { t.Error("Data doesn't equate.") diff --git a/posting/list.go b/posting/list.go index 9c162f4f..e9095a83 100644 --- a/posting/list.go +++ b/posting/list.go @@ -247,20 +247,23 @@ func (l *List) init(key []byte, pstore *store.Store, clog *commit.Logger) { return } glog.Debug("Starting stream entries...") - err := clog.StreamEntries(posting.CommitTs()+1, l.hash, func(buffer []byte) { - uo := flatbuffers.GetUOffsetT(buffer) - m := new(types.Posting) - m.Init(buffer, uo) - if m.Ts() > l.maxMutationTs { - l.maxMutationTs = m.Ts() - } - glog.WithFields(logrus.Fields{ - "uid": m.Uid(), - "source": string(m.Source()), - "ts": m.Ts(), - }).Debug("Got entry from log") - l.mergeMutation(m) - }) + + err := clog.StreamEntries(posting.CommitTs()+1, l.hash, + func(hdr commit.Header, buffer []byte) { + + uo := flatbuffers.GetUOffsetT(buffer) + m := new(types.Posting) + m.Init(buffer, uo) + if m.Ts() > l.maxMutationTs { + l.maxMutationTs = m.Ts() + } + glog.WithFields(logrus.Fields{ + "uid": m.Uid(), + "source": string(m.Source()), + "ts": m.Ts(), + }).Debug("Got entry from log") + l.mergeMutation(m) + }) if err != nil { glog.WithError(err).Error("While streaming entries.") } -- GitLab