From d47d5c6930a345e825f50eb1629e82c2ae6d3a9c Mon Sep 17 00:00:00 2001 From: Manish R Jain <manishrjain@gmail.com> Date: Fri, 16 Oct 2015 18:39:48 +1100 Subject: [PATCH] Posting List can store posting lists and mutations in/from different stores. --- posting/list.go | 56 ++++++++++++++++++++++++++++++--------- posting/list_test.go | 63 +++++++++++++++++++++++++++++++++++++------- store/store.go | 22 +++++++--------- store/store_test.go | 9 ++++--- 4 files changed, 112 insertions(+), 38 deletions(-) diff --git a/posting/list.go b/posting/list.go index 3fe43415..ee4d5183 100644 --- a/posting/list.go +++ b/posting/list.go @@ -21,6 +21,7 @@ import ( "github.com/google/flatbuffers/go" "github.com/manishrjain/dgraph/posting/types" + "github.com/manishrjain/dgraph/store" "github.com/manishrjain/dgraph/x" linked "container/list" @@ -32,9 +33,12 @@ const Set = 0x01 const Del = 0x02 type List struct { + key []byte mutex sync.RWMutex buffer []byte mutations []byte + pstore *store.Store // postinglist store + mstore *store.Store // mutation store } func addTripleToPosting(b *flatbuffers.Builder, @@ -61,21 +65,38 @@ func addPosting(b *flatbuffers.Builder, p types.Posting) flatbuffers.UOffsetT { var empty []byte -func (l *List) Init() { +// package level init +func init() { + b := flatbuffers.NewBuilder(0) + types.PostingListStart(b) + of := types.PostingListEnd(b) + b.Finish(of) + empty = b.Bytes[b.Head():] +} + +func (l *List) Init(key []byte, pstore, mstore *store.Store) { l.mutex.Lock() defer l.mutex.Unlock() if len(empty) == 0 { - b := flatbuffers.NewBuilder(0) - types.PostingListStart(b) - of := types.PostingListEnd(b) - b.Finish(of) - empty = b.Bytes[b.Head():] + log.Fatal("empty should have some bytes.") + } + l.key = key + l.pstore = pstore + l.mstore = mstore + + var err error + if l.buffer, err = pstore.Get(key); err != nil { + log.Errorf("While retrieving posting list from db: %v\n", err) + l.buffer = make([]byte, len(empty)) + copy(l.buffer, empty) + } + + if l.mutations, err = mstore.Get(key); err != nil { + log.Debugf("While retrieving mutation list from db: %v\n", err) + l.mutations = make([]byte, len(empty)) + copy(l.mutations, empty) } - l.buffer = make([]byte, len(empty)) - l.mutations = make([]byte, len(empty)) - copy(l.buffer, empty) - copy(l.mutations, empty) } func (l *List) Root() *types.PostingList { @@ -85,7 +106,7 @@ func (l *List) Root() *types.PostingList { return types.GetRootAsPostingList(l.buffer, 0) } -func (l *List) AddMutation(t x.Triple, op byte) { +func (l *List) AddMutation(t x.Triple, op byte) error { l.mutex.Lock() defer l.mutex.Unlock() @@ -114,6 +135,7 @@ func (l *List) AddMutation(t x.Triple, op byte) { b.Finish(end) l.mutations = b.Bytes[b.Head():] + return l.mstore.SetOne(l.key, l.mutations) } func addOrSet(ll *linked.List, p *types.Posting) { @@ -180,7 +202,7 @@ func (l *List) generateLinkedList() *linked.List { return ll } -func (l *List) Commit() { +func (l *List) Commit() error { l.mutex.Lock() defer l.mutex.Unlock() @@ -206,6 +228,16 @@ func (l *List) Commit() { b.Finish(end) l.buffer = b.Bytes[b.Head():] + if err := l.pstore.SetOne(l.key, l.buffer); err != nil { + log.WithField("error", err).Errorf("While storing posting list") + return err + } + + if err := l.mstore.Delete(l.key); err != nil { + log.WithField("error", err).Errorf("While deleting mutation list") + return err + } l.mutations = make([]byte, len(empty)) copy(l.mutations, empty) + return nil } diff --git a/posting/list_test.go b/posting/list_test.go index c21b314d..66ce0e81 100644 --- a/posting/list_test.go +++ b/posting/list_test.go @@ -17,10 +17,13 @@ package posting import ( + "io/ioutil" + "os" "testing" "time" "github.com/manishrjain/dgraph/posting/types" + "github.com/manishrjain/dgraph/store" "github.com/manishrjain/dgraph/x" ) @@ -40,17 +43,42 @@ func checkUids(t *testing.T, l List, uids ...uint64) { } } +func NewStore(t *testing.T) string { + path, err := ioutil.TempDir("", "storetest_") + if err != nil { + t.Error(err) + t.Fail() + return "" + } + return path +} + func TestAddTriple(t *testing.T) { var l List - l.Init() + key := store.Key("name", 1) + pdir := NewStore(t) + defer os.RemoveAll(pdir) + ps := new(store.Store) + ps.Init(pdir) + + mdir := NewStore(t) + defer os.RemoveAll(mdir) + ms := new(store.Store) + ms.Init(mdir) + + l.Init(key, ps, ms) triple := x.Triple{ ValueId: 9, Source: "testing", Timestamp: time.Now(), } - l.AddMutation(triple, Set) - l.Commit() + if err := l.AddMutation(triple, Set); err != nil { + t.Error(err) + } + if err := l.Commit(); err != nil { + t.Error(err) + } if l.Root().PostingsLength() != 1 { t.Error("Unable to find added elements in posting list") @@ -93,21 +121,33 @@ func TestAddTriple(t *testing.T) { 9, 49, 81, } triple.ValueId = 49 - l.AddMutation(triple, Set) - l.Commit() + if err := l.AddMutation(triple, Set); err != nil { + t.Error(err) + } + if err := l.Commit(); err != nil { + t.Error(err) + } checkUids(t, l, uids...) // Delete a triple, add a triple, replace a triple triple.ValueId = 49 - l.AddMutation(triple, Del) + if err := l.AddMutation(triple, Del); err != nil { + t.Error(err) + } triple.ValueId = 69 - l.AddMutation(triple, Set) + if err := l.AddMutation(triple, Set); err != nil { + t.Error(err) + } triple.ValueId = 9 triple.Source = "anti-testing" - l.AddMutation(triple, Set) - l.Commit() + if err := l.AddMutation(triple, Set); err != nil { + t.Error(err) + } + if err := l.Commit(); err != nil { + t.Error(err) + } uids = []uint64{9, 69, 81} checkUids(t, l, uids...) @@ -116,4 +156,9 @@ func TestAddTriple(t *testing.T) { if string(p.Source()) != "anti-testing" { t.Errorf("Expected: anti-testing. Got: %v", p.Source()) } + + // Try reading the same data in another PostingList. + var dl List + dl.Init(key, ps, ms) + checkUids(t, dl, uids...) } diff --git a/store/store.go b/store/store.go index 7a7a2454..51090b31 100644 --- a/store/store.go +++ b/store/store.go @@ -47,33 +47,29 @@ func (s *Store) IsNew(id uint64) bool { } // key = (attribute, entity id) -func key(attr string, eid uint64) (ret []byte, rerr error) { +func Key(attr string, eid uint64) []byte { buf := new(bytes.Buffer) buf.WriteString(attr) if err := binary.Write(buf, binary.LittleEndian, eid); err != nil { - return ret, err + log.Fatalf("Error while creating key with attr: %v eid: %v\n", attr, eid) } - return buf.Bytes(), nil + return buf.Bytes() } -func (s *Store) Get(attr string, eid uint64) (val []byte, rerr error) { - k, err := key(attr, eid) - if err != nil { - return val, err - } +func (s *Store) Get(k []byte) (val []byte, rerr error) { return s.db.Get(k, nil) } -func (s *Store) SetOne(attr string, eid uint64, val []byte) error { - k, err := key(attr, eid) - if err != nil { - return err - } +func (s *Store) SetOne(k []byte, val []byte) error { wb := new(leveldb.Batch) wb.Put(k, val) return s.db.Write(wb, nil) } +func (s *Store) Delete(k []byte) error { + return s.db.Delete(k, nil) +} + func (s *Store) Close() error { return s.db.Close() } diff --git a/store/store_test.go b/store/store_test.go index 76f02481..cffc93ca 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -33,24 +33,25 @@ func TestGet(t *testing.T) { var s Store s.Init(path) - if err := s.SetOne("name", 1, []byte("neo")); err != nil { + k := Key("name", 1) + if err := s.SetOne(k, []byte("neo")); err != nil { t.Error(err) t.Fail() } - if val, err := s.Get("name", 1); err != nil { + if val, err := s.Get(k); err != nil { t.Error(err) t.Fail() } else if string(val) != "neo" { t.Errorf("Expected 'neo'. Found: %s", string(val)) } - if err := s.SetOne("name", 1, []byte("the one")); err != nil { + if err := s.SetOne(k, []byte("the one")); err != nil { t.Error(err) t.Fail() } - if val, err := s.Get("name", 1); err != nil { + if val, err := s.Get(k); err != nil { t.Error(err) t.Fail() } else if string(val) != "the one" { -- GitLab