diff --git a/commit/log.go b/commit/log.go index 89b7964ecab01f4f356477e564ad82e0e6c17317..9a81304da61dcc85ea74ee3c96fc93dfd0148e43 100644 --- a/commit/log.go +++ b/commit/log.go @@ -493,13 +493,15 @@ func streamEntries(cache *Cache, } if hdr.hash == hash && hdr.ts >= afterTs { + // Iterator expects a copy of the buffer, so create one, instead of + // creating a big buffer upfront and reusing it. data := make([]byte, hdr.size) _, err := reader.Read(data) if err != nil { flog.WithError(err).Fatal("While reading data.") return err } - iter(data) + iter(hdr, data) } else { reader.Discard(int(hdr.size)) @@ -508,7 +510,7 @@ func streamEntries(cache *Cache, return nil } -type LogIterator func(record []byte) +type LogIterator func(hdr Header, record []byte) func (l *Logger) StreamEntries(afterTs int64, hash uint32, iter LogIterator) error { diff --git a/commit/log_test.go b/commit/log_test.go index c0e2ab6bd87e66ab5bea72345a8098ed7aab6e22..37286e4854a1067342a155a7e7786541c280323f 100644 --- a/commit/log_test.go +++ b/commit/log_test.go @@ -212,7 +212,7 @@ func TestReadEntries(t *testing.T) { { // Check for hash = 1, ts >= 2. count := 0 - err := l.StreamEntries(ts+2, uint32(1), func(entry []byte) { + err := l.StreamEntries(ts+2, uint32(1), func(hdr Header, entry []byte) { count += 1 if bytes.Compare(data, entry) != 0 { t.Error("Data doesn't equate.") @@ -231,7 +231,7 @@ func TestReadEntries(t *testing.T) { } // Check for hash = 1, ts >= 2. count := 0 - err := l.StreamEntries(ts, uint32(1), func(entry []byte) { + err := l.StreamEntries(ts, uint32(1), func(hdr Header, entry []byte) { count += 1 if bytes.Compare(data, entry) != 0 { t.Error("Data doesn't equate.") diff --git a/posting/list.go b/posting/list.go index 9c162f4f8a6216e6efb165a6b3c0016342bef3dc..e9095a8366a1fce890d12faf93d47a947f9e6be5 100644 --- a/posting/list.go +++ b/posting/list.go @@ -247,20 +247,23 @@ func (l *List) init(key []byte, pstore *store.Store, clog *commit.Logger) { return } glog.Debug("Starting stream entries...") - err := clog.StreamEntries(posting.CommitTs()+1, l.hash, func(buffer []byte) { - uo := flatbuffers.GetUOffsetT(buffer) - m := new(types.Posting) - m.Init(buffer, uo) - if m.Ts() > l.maxMutationTs { - l.maxMutationTs = m.Ts() - } - glog.WithFields(logrus.Fields{ - "uid": m.Uid(), - "source": string(m.Source()), - "ts": m.Ts(), - }).Debug("Got entry from log") - l.mergeMutation(m) - }) + + err := clog.StreamEntries(posting.CommitTs()+1, l.hash, + func(hdr commit.Header, buffer []byte) { + + uo := flatbuffers.GetUOffsetT(buffer) + m := new(types.Posting) + m.Init(buffer, uo) + if m.Ts() > l.maxMutationTs { + l.maxMutationTs = m.Ts() + } + glog.WithFields(logrus.Fields{ + "uid": m.Uid(), + "source": string(m.Source()), + "ts": m.Ts(), + }).Debug("Got entry from log") + l.mergeMutation(m) + }) if err != nil { glog.WithError(err).Error("While streaming entries.") }