From 86510f1d02448b0ba8b14c41b02f8962aaef1b6c Mon Sep 17 00:00:00 2001 From: Pawan Rawal <pawan@dgraph.io> Date: Thu, 19 May 2016 15:40:27 +0530 Subject: [PATCH] Modifying AddMutation and AddLog to fix the commit timestamp bug. --- commit/log.go | 53 +++++++++++++++++++------------------ commit/log_test.go | 66 ++++++++++++++++++++-------------------------- posting/list.go | 46 ++++++++++++++++---------------- 3 files changed, 79 insertions(+), 86 deletions(-) diff --git a/commit/log.go b/commit/log.go index a41ac242..6311bbe3 100644 --- a/commit/log.go +++ b/commit/log.go @@ -379,7 +379,6 @@ func lastTimestamp(c *Cache) (int64, error) { "numrecords": count, }).Fatal("Log file doesn't have monotonically increasing records.") } - reader.Discard(int(h.size)) } return maxTs, nil @@ -448,9 +447,28 @@ func setError(prev *error, n error) { return } -func (l *Logger) AddLog(ts int64, hash uint32, value []byte) error { - if ts < atomic.LoadInt64(&l.lastLogTs) { - return fmt.Errorf("Timestamp lower than last log timestamp.") +func (l *Logger) AddLog(hash uint32, value []byte) (int64, error) { + lbuf := int64(len(value)) + 16 + if l.curFile().Size()+lbuf > l.maxSize { + if err := l.rotateCurrent(); err != nil { + glog.WithError(err).Error("While rotating current file out.") + return 0, err + } + } + + cf := l.curFile() + if cf == nil { + glog.Fatalf("Current file isn't initialized.") + } + + cf.Lock() + defer cf.Unlock() + + ts := time.Now().UnixNano() + lts := atomic.LoadInt64(&l.lastLogTs) + if ts < lts { + ts = lts + 1 + // We don't have to do CompareAndSwap because we've a mutex lock. } buf := new(bytes.Buffer) @@ -461,32 +479,17 @@ func (l *Logger) AddLog(ts int64, hash uint32, value []byte) error { _, nerr := buf.Write(value) setError(&err, nerr) if err != nil { - return err + return ts, err } - glog.WithField("bytes", buf.Len()).WithField("ts", ts). - Debug("Log entry buffer.") + glog.WithField("bytes", buf.Len()).Debug("Log entry buffer.") - if l.curFile().Size()+int64(buf.Len()) > l.maxSize { - if err = l.rotateCurrent(); err != nil { - glog.WithError(err).Error("While rotating current file out.") - return err - } - } - - cf := l.curFile() - if cf == nil { - glog.Fatalf("Current file isn't initialized.") - } - - cf.Lock() - defer cf.Unlock() if _, err = cf.f.Write(buf.Bytes()); err != nil { glog.WithError(err).Error("While writing to current file.") - return err + return ts, err } if _, err = cf.cache().Write(hash, buf.Bytes()); err != nil { glog.WithError(err).Error("While writing to current cache.") - return err + return ts, err } cf.dirtyLogs += 1 cf.size += int64(buf.Len()) @@ -494,9 +497,9 @@ func (l *Logger) AddLog(ts int64, hash uint32, value []byte) error { if l.SyncEvery <= 0 || cf.dirtyLogs >= l.SyncEvery { cf.dirtyLogs = 0 glog.Debug("Syncing file") - return cf.f.Sync() + return ts, cf.f.Sync() } - return nil + return ts, nil } // streamEntries allows for hash to be zero. diff --git a/commit/log_test.go b/commit/log_test.go index 4b6284a8..532d53b1 100644 --- a/commit/log_test.go +++ b/commit/log_test.go @@ -22,7 +22,6 @@ import ( "math/rand" "os" "path/filepath" - "sync/atomic" "testing" "time" ) @@ -68,24 +67,19 @@ func TestAddLog(t *testing.T) { l.Init() defer l.Close() - ts := time.Now().UnixNano() for i := 0; i < 10; i++ { - curts := ts + int64(i) - if err := l.AddLog(curts, 0, []byte("hey")); err != nil { + if _, err := l.AddLog(0, []byte("hey")); err != nil { t.Error(err) - return + t.Fail() } time.Sleep(500 * time.Microsecond) } glog.Debugf("Test curfile path: %v", l.cf.f.Name()) - last, err := lastTimestamp(l.cf.cache()) + _, err = lastTimestamp(l.cf.cache()) if err != nil { t.Error(err) } - if last != ts+9 { - t.Errorf("Expected %v. Got: %v\n", ts+9, last) - } } func TestRotatingLog(t *testing.T) { @@ -102,12 +96,13 @@ func TestRotatingLog(t *testing.T) { l.Init() data := make([]byte, 400) - ts := time.Now().UnixNano() + var ts []int64 for i := 0; i < 9; i++ { - curts := ts + int64(i) - if err := l.AddLog(curts, 0, data); err != nil { + if logts, err := l.AddLog(0, data); err != nil { t.Error(err) return + } else { + ts = append(ts, logts) } } // This should have created 4 files of 832 bytes each (header + data), and @@ -116,7 +111,7 @@ func TestRotatingLog(t *testing.T) { t.Errorf("Expected 4 files. Got: %v", len(l.list)) } for i, lf := range l.list { - exp := ts + int64(2*i+1) + exp := ts[i*2+1] if lf.endTs != exp { t.Errorf("Expected end ts: %v. Got: %v", exp, lf.endTs) } @@ -124,9 +119,6 @@ func TestRotatingLog(t *testing.T) { if l.curFile().Size() != 416 { t.Errorf("Expected size 416. Got: %v", l.curFile().Size()) } - if atomic.LoadInt64(&l.lastLogTs) != ts+int64(8) { - t.Errorf("Expected ts: %v. Got: %v", ts+int64(8), l.lastLogTs) - } l.Close() l = nil // Important to avoid re-use later. @@ -140,31 +132,30 @@ func TestRotatingLog(t *testing.T) { if nl.curFile().Size() != 416 { t.Errorf("Expected size 416. Got: %v", nl.curFile().Size()) } - if err := nl.AddLog(ts, 0, data); err == nil { - t.Error("Adding an entry with older ts should fail.") - } - if err := nl.AddLog(ts+int64(100), 0, data); err != nil { + secondlast, err := nl.AddLog(0, data) + if err != nil { t.Error(err) return } if nl.curFile().Size() != 832 { t.Errorf("Expected size 832. Got: %v", nl.curFile().Size()) } - if err := nl.AddLog(ts+int64(113), 0, data); err != nil { + last, err := nl.AddLog(0, data) + if err != nil { t.Error(err) return } if len(nl.list) != 5 { t.Errorf("Expected 5 files. Got: %v", len(nl.list)) } - if nl.list[4].endTs != ts+int64(100) { - t.Errorf("Expected ts: %v. Got: %v", ts+int64(100), nl.list[4].endTs) + if nl.list[4].endTs != secondlast { + t.Errorf("Expected ts: %v. Got: %v", secondlast, nl.list[4].endTs) } if nl.curFile().Size() != 416 { t.Errorf("Expected size 416. Got: %v", nl.curFile().Size()) } - if nl.lastLogTs != ts+int64(113) { - t.Errorf("Expected last log ts: %v. Got: %v", ts+int64(113), nl.lastLogTs) + if nl.lastLogTs != last { + t.Errorf("Expected last log ts: %v. Got: %v", last, nl.lastLogTs) } } @@ -183,12 +174,13 @@ func TestReadEntries(t *testing.T) { defer l.Close() data := make([]byte, 400) - ts := time.Now().UnixNano() + var ts []int64 for i := 0; i < 9; i++ { - curts := ts + int64(i) - if err := l.AddLog(curts, uint32(i%3), data); err != nil { + if lts, err := l.AddLog(uint32(i%3), data); err != nil { t.Error(err) return + } else { + ts = append(ts, lts) } } // This should have created 4 files of 832 bytes each (header + data), and @@ -197,7 +189,7 @@ func TestReadEntries(t *testing.T) { t.Errorf("Expected 4 files. Got: %v", len(l.list)) } for i, lf := range l.list { - exp := ts + int64(2*i+1) + exp := ts[i*2+1] if lf.endTs != exp { t.Errorf("Expected end ts: %v. Got: %v", exp, lf.endTs) } @@ -205,14 +197,14 @@ func TestReadEntries(t *testing.T) { if l.curFile().Size() != 416 { t.Errorf("Expected size 416. Got: %v", l.curFile().Size()) } - if l.lastLogTs != ts+int64(8) { - t.Errorf("Expected ts: %v. Got: %v", ts+int64(8), l.lastLogTs) + if l.lastLogTs != ts[8] { + t.Errorf("Expected ts: %v. Got: %v", ts[8], l.lastLogTs) } { // Check for hash = 1, ts >= 2. count := 0 - err := l.StreamEntries(ts+2, uint32(1), func(hdr Header, entry []byte) { + err := l.StreamEntries(ts[2], uint32(1), func(hdr Header, entry []byte) { count += 1 if bytes.Compare(data, entry) != 0 { t.Error("Data doesn't equate.") @@ -226,12 +218,13 @@ func TestReadEntries(t *testing.T) { } } { - if err := l.AddLog(ts+int64(9), 1, data); err != nil { + // Add another entry for hash = 1. + if _, err := l.AddLog(1, data); err != nil { t.Error(err) } - // Check for hash = 1, ts >= 2. + // Check for hash = 1, ts >= 0. count := 0 - err := l.StreamEntries(ts, uint32(1), func(hdr Header, entry []byte) { + err := l.StreamEntries(ts[0], uint32(1), func(hdr Header, entry []byte) { count += 1 if bytes.Compare(data, entry) != 0 { t.Error("Data doesn't equate.") @@ -260,10 +253,9 @@ func benchmarkAddLog(n int, b *testing.B) { data := make([]byte, 100) b.ResetTimer() - ts := time.Now().UnixNano() for i := 0; i < b.N; i++ { end := rand.Intn(50) - if err := l.AddLog(ts+int64(i), 0, data[:50+end]); err != nil { + if _, err := l.AddLog(0, data[:50+end]); err != nil { b.Error(err) } } diff --git a/posting/list.go b/posting/list.go index 17450f24..7b695a9e 100644 --- a/posting/list.go +++ b/posting/list.go @@ -62,7 +62,7 @@ type List struct { clog *commit.Logger lastCompact time.Time wg sync.WaitGroup - deleteMe bool + deleteMe int32 // Mutations mlayer map[int]types.Posting // stores only replace instructions. @@ -559,9 +559,7 @@ func (l *List) get(p *types.Posting, i int) bool { func (l *List) SetForDeletion() { l.wg.Wait() - l.Lock() - defer l.Unlock() - l.deleteMe = true + atomic.StoreInt32(&l.deleteMe, 1) } // In benchmarks, the time taken per AddMutation before was @@ -581,23 +579,10 @@ func (l *List) SetForDeletion() { // ok github.com/dgraph-io/dgraph/posting 10.291s func (l *List) AddMutation(t x.DirectedEdge, op byte) error { l.wg.Wait() - l.Lock() - defer l.Unlock() - if l.deleteMe { + if atomic.LoadInt32(&l.deleteMe) == 1 { return E_TMP_ERROR } - if t.Timestamp.UnixNano() < l.maxMutationTs { - return fmt.Errorf("Mutation ts lower than committed ts.") - } - - // Mutation arrives: - // - Check if we had any(SET/DEL) before this, stored in the mutation list. - // - If yes, then replace that mutation. Jump to a) - // a) check if the entity exists in main posting list. - // - If yes, store the mutation. - // - If no, disregard this mutation. - // All edges with a value set, have the same uid. In other words, // an (entity, attribute) can only have one value. if !bytes.Equal(t.Value, nil) { @@ -606,8 +591,24 @@ func (l *List) AddMutation(t x.DirectedEdge, op byte) error { if t.ValueId == 0 { return fmt.Errorf("ValueId cannot be zero.") } - mbuf := newPosting(t, op) + var err error + var ts int64 + if l.clog != nil { + ts, err = l.clog.AddLog(l.hash, mbuf) + if err != nil { + return err + } + } + // Mutation arrives: + // - Check if we had any(SET/DEL) before this, stored in the mutation list. + // - If yes, then replace that mutation. Jump to a) + // a) check if the entity exists in main posting list. + // - If yes, store the mutation. + // - If no, disregard this mutation. + l.Lock() + defer l.Unlock() + uo := flatbuffers.GetUOffsetT(mbuf) mpost := new(types.Posting) mpost.Init(mbuf, uo) @@ -619,17 +620,14 @@ func (l *List) AddMutation(t x.DirectedEdge, op byte) error { }).Debug("Add mutation") l.mergeMutation(mpost) - l.maxMutationTs = t.Timestamp.UnixNano() if len(l.mindex)+len(l.mlayer) > 0 { atomic.StoreInt64(&l.dirtyTs, time.Now().UnixNano()) if dirtymap != nil { dirtymap.Put(l.ghash, true) } } - if l.clog == nil { - return nil - } - return l.clog.AddLog(t.Timestamp.UnixNano(), l.hash, mbuf) + l.maxMutationTs = ts + return nil } func (l *List) MergeIfDirty() (merged bool, err error) { -- GitLab