diff --git a/posting/list.go b/posting/list.go index ee4d51839472b56cb972608e2eeed4c56185cc86..0e9b757809a6fdfa51353735ecef3d8a45940d8e 100644 --- a/posting/list.go +++ b/posting/list.go @@ -39,6 +39,8 @@ type List struct { mutations []byte pstore *store.Store // postinglist store mstore *store.Store // mutation store + dirty bool + postings *types.PostingList } func addTripleToPosting(b *flatbuffers.Builder, @@ -88,28 +90,38 @@ func (l *List) Init(key []byte, pstore, mstore *store.Store) { var err error if l.buffer, err = pstore.Get(key); err != nil { log.Errorf("While retrieving posting list from db: %v\n", err) + // Error. Just set to empty. l.buffer = make([]byte, len(empty)) copy(l.buffer, empty) } if l.mutations, err = mstore.Get(key); err != nil { log.Debugf("While retrieving mutation list from db: %v\n", err) + // Error. Just set to empty. l.mutations = make([]byte, len(empty)) copy(l.mutations, empty) } + + l.postings = types.GetRootAsPostingList(l.buffer, 0) } -func (l *List) Root() *types.PostingList { +func (l *List) Length() int { l.mutex.RLock() defer l.mutex.RUnlock() - return types.GetRootAsPostingList(l.buffer, 0) + return l.postings.PostingsLength() +} + +func (l *List) Get(p *types.Posting, i int) bool { + return l.postings.Postings(p, i) } func (l *List) AddMutation(t x.Triple, op byte) error { l.mutex.Lock() defer l.mutex.Unlock() + l.dirty = true // Mark as dirty. + b := flatbuffers.NewBuilder(0) muts := types.GetRootAsPostingList(l.mutations, 0) var offsets []flatbuffers.UOffsetT @@ -202,7 +214,17 @@ func (l *List) generateLinkedList() *linked.List { return ll } -func (l *List) Commit() error { +func (l *List) isDirty() bool { + l.mutex.RLock() + defer l.mutex.RUnlock() + return l.dirty +} + +func (l *List) CommitIfDirty() error { + if !l.isDirty() { + return nil + } + l.mutex.Lock() defer l.mutex.Unlock() @@ -239,5 +261,7 @@ func (l *List) Commit() error { } l.mutations = make([]byte, len(empty)) copy(l.mutations, empty) + + l.postings = types.GetRootAsPostingList(l.buffer, 0) return nil } diff --git a/posting/list_test.go b/posting/list_test.go index 66ce0e81622fdfd163733d086546800c657dd15e..1927cc2251166e93e521f2716c359f3b8b0bdc1a 100644 --- a/posting/list_test.go +++ b/posting/list_test.go @@ -28,13 +28,13 @@ import ( ) func checkUids(t *testing.T, l List, uids ...uint64) { - if l.Root().PostingsLength() != len(uids) { - t.Errorf("Length: %d", l.Root().PostingsLength()) + if l.Length() != len(uids) { + t.Errorf("Length: %d", l.Length()) t.Fail() } for i := 0; i < len(uids); i++ { var p types.Posting - if ok := l.Root().Postings(&p, i); !ok { + if ok := l.Get(&p, i); !ok { t.Error("Unable to retrieve posting at 2nd iter") } if p.Uid() != uids[i] { @@ -76,15 +76,15 @@ func TestAddTriple(t *testing.T) { if err := l.AddMutation(triple, Set); err != nil { t.Error(err) } - if err := l.Commit(); err != nil { + if err := l.CommitIfDirty(); err != nil { t.Error(err) } - if l.Root().PostingsLength() != 1 { + if l.Length() != 1 { t.Error("Unable to find added elements in posting list") } var p types.Posting - if ok := l.Root().Postings(&p, 0); !ok { + if ok := l.Get(&p, 0); !ok { t.Error("Unable to retrieve posting at 1st iter") t.Fail() } @@ -98,16 +98,16 @@ func TestAddTriple(t *testing.T) { // Add another triple now. triple.ValueId = 81 l.AddMutation(triple, Set) - l.Commit() - if l.Root().PostingsLength() != 2 { - t.Errorf("Length: %d", l.Root().PostingsLength()) + l.CommitIfDirty() + if l.Length() != 2 { + t.Errorf("Length: %d", l.Length()) t.Fail() } var uid uint64 uid = 1 - for i := 0; i < l.Root().PostingsLength(); i++ { - if ok := l.Root().Postings(&p, i); !ok { + for i := 0; i < l.Length(); i++ { + if ok := l.Get(&p, i); !ok { t.Error("Unable to retrieve posting at 2nd iter") } uid *= 9 @@ -124,7 +124,7 @@ func TestAddTriple(t *testing.T) { if err := l.AddMutation(triple, Set); err != nil { t.Error(err) } - if err := l.Commit(); err != nil { + if err := l.CommitIfDirty(); err != nil { t.Error(err) } checkUids(t, l, uids...) @@ -145,14 +145,14 @@ func TestAddTriple(t *testing.T) { if err := l.AddMutation(triple, Set); err != nil { t.Error(err) } - if err := l.Commit(); err != nil { + if err := l.CommitIfDirty(); err != nil { t.Error(err) } uids = []uint64{9, 69, 81} checkUids(t, l, uids...) - l.Root().Postings(&p, 0) + l.Get(&p, 0) if string(p.Source()) != "anti-testing" { t.Errorf("Expected: anti-testing. Got: %v", p.Source()) } diff --git a/posting/types.fbs b/posting/types.fbs index 193d011bd32d19728a66101b0afdd4bff1bdeb24..55e7edff3aaba575eb7dd2b9a0c1548e9cfdedbd 100644 --- a/posting/types.fbs +++ b/posting/types.fbs @@ -2,6 +2,7 @@ namespace types; table Posting { uid:ulong; + value:[ubyte]; source:string; ts:long; op:ubyte; @@ -9,7 +10,6 @@ table Posting { table PostingList { postings:[Posting]; - value:[byte]; } root_type PostingList; diff --git a/posting/types/Posting.go b/posting/types/Posting.go index e58b3ce87be1314904fd2fee8a89bd0c683bd747..58c2b36948296e935a2bdc8de05eeff752b691e2 100644 --- a/posting/types/Posting.go +++ b/posting/types/Posting.go @@ -22,7 +22,24 @@ func (rcv *Posting) Uid() uint64 { return 0 } -func (rcv *Posting) Source() []byte { +func (rcv *Posting) Value(j int) byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.GetByte(a + flatbuffers.UOffsetT(j * 1)) + } + return 0 +} + +func (rcv *Posting) ValueLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func (rcv *Posting) ValueBytes() []byte { o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) if o != 0 { return rcv._tab.ByteVector(o + rcv._tab.Pos) @@ -30,8 +47,16 @@ func (rcv *Posting) Source() []byte { return nil } -func (rcv *Posting) Ts() int64 { +func (rcv *Posting) Source() []byte { o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func (rcv *Posting) Ts() int64 { + o := flatbuffers.UOffsetT(rcv._tab.Offset(10)) if o != 0 { return rcv._tab.GetInt64(o + rcv._tab.Pos) } @@ -39,16 +64,19 @@ func (rcv *Posting) Ts() int64 { } func (rcv *Posting) Op() byte { - o := flatbuffers.UOffsetT(rcv._tab.Offset(10)) + o := flatbuffers.UOffsetT(rcv._tab.Offset(12)) if o != 0 { return rcv._tab.GetByte(o + rcv._tab.Pos) } return 0 } -func PostingStart(builder *flatbuffers.Builder) { builder.StartObject(4) } +func PostingStart(builder *flatbuffers.Builder) { builder.StartObject(5) } func PostingAddUid(builder *flatbuffers.Builder, uid uint64) { builder.PrependUint64Slot(0, uid, 0) } -func PostingAddSource(builder *flatbuffers.Builder, source flatbuffers.UOffsetT) { builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(source), 0) } -func PostingAddTs(builder *flatbuffers.Builder, ts int64) { builder.PrependInt64Slot(2, ts, 0) } -func PostingAddOp(builder *flatbuffers.Builder, op byte) { builder.PrependByteSlot(3, op, 0) } +func PostingAddValue(builder *flatbuffers.Builder, value flatbuffers.UOffsetT) { builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(value), 0) } +func PostingStartValueVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { return builder.StartVector(1, numElems, 1) +} +func PostingAddSource(builder *flatbuffers.Builder, source flatbuffers.UOffsetT) { builder.PrependUOffsetTSlot(2, flatbuffers.UOffsetT(source), 0) } +func PostingAddTs(builder *flatbuffers.Builder, ts int64) { builder.PrependInt64Slot(3, ts, 0) } +func PostingAddOp(builder *flatbuffers.Builder, op byte) { builder.PrependByteSlot(4, op, 0) } func PostingEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { return builder.EndObject() } diff --git a/posting/types/PostingList.go b/posting/types/PostingList.go index d477edaa81229d8df99cb0f9bc5e2cc6782fd0d1..c1b54045bd73ecaa82d4295b66ac96f558dbee79 100644 --- a/posting/types/PostingList.go +++ b/posting/types/PostingList.go @@ -44,28 +44,8 @@ func (rcv *PostingList) PostingsLength() int { return 0 } -func (rcv *PostingList) Value(j int) int8 { - o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) - if o != 0 { - a := rcv._tab.Vector(o) - return rcv._tab.GetInt8(a + flatbuffers.UOffsetT(j * 1)) - } - return 0 -} - -func (rcv *PostingList) ValueLength() int { - o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) - if o != 0 { - return rcv._tab.VectorLen(o) - } - return 0 -} - -func PostingListStart(builder *flatbuffers.Builder) { builder.StartObject(2) } +func PostingListStart(builder *flatbuffers.Builder) { builder.StartObject(1) } func PostingListAddPostings(builder *flatbuffers.Builder, postings flatbuffers.UOffsetT) { builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(postings), 0) } func PostingListStartPostingsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { return builder.StartVector(4, numElems, 4) } -func PostingListAddValue(builder *flatbuffers.Builder, value flatbuffers.UOffsetT) { builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(value), 0) } -func PostingListStartValueVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { return builder.StartVector(1, numElems, 1) -} func PostingListEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { return builder.EndObject() } diff --git a/uid/assigner.go b/uid/assigner.go new file mode 100644 index 0000000000000000000000000000000000000000..084b61b63c5926f205d00041030dfad82ef7e918 --- /dev/null +++ b/uid/assigner.go @@ -0,0 +1,134 @@ +/* + * Copyright 2015 Manish R Jain <manishrjain@gmail.com> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package uid + +import ( + "bytes" + "errors" + "time" + + "github.com/dgryski/go-farm" + "github.com/manishrjain/dgraph/posting" + "github.com/manishrjain/dgraph/posting/types" + "github.com/manishrjain/dgraph/store" + "github.com/manishrjain/dgraph/x" +) + +var log = x.Log("uid") + +type Assigner struct { + mstore *store.Store + pstore *store.Store +} + +func (a *Assigner) Init(pstore, mstore *store.Store) { + a.mstore = mstore + a.pstore = pstore +} + +func (a *Assigner) allocateNew(xid string) (uid uint64, rerr error) { + for sp := ""; ; sp += " " { + txid := xid + sp + uid = farm.Hash64([]byte(txid)) // Generate from hash. + log.Debugf("xid: [%v] uid: [%v]", txid, uid) + + // Check if this uid has already been allocated. + pl := new(posting.List) + key := store.Key("_xid_", uid) // uid -> "_xid_" -> xid + pl.Init(key, a.pstore, a.mstore) + if pl.Length() > 0 { + // Something already present here. + var p types.Posting + pl.Get(&p, 0) + log.Debug("Found existing xid: [%v]. Continuing...", string(p.ValueBytes())) + + } else { + // Uid hasn't been assigned yet. + t := x.Triple{ + Entity: uid, + Attribute: "_xid_", + Value: xid, + Source: "_assigner_", + Timestamp: time.Now(), + } + rerr = pl.AddMutation(t, posting.Set) + pl.CommitIfDirty() + return uid, rerr + } + } + return 0, errors.New("Some unhandled route lead me here." + + " Wake the stupid developer up.") +} + +func Key(xid string) []byte { + buf := new(bytes.Buffer) + buf.WriteString("_uid_") + buf.WriteString("|") + buf.WriteString(xid) + return buf.Bytes() +} + +func (a *Assigner) GetOrAssign(xid string) (uid uint64, rerr error) { + key := Key(xid) + pl := new(posting.List) + pl.Init(key, a.pstore, a.mstore) + if pl.Length() == 0 { + // No current id exists. Create one. + uid, err := a.allocateNew(xid) + if err != nil { + return 0, err + } + t := x.Triple{ + ValueId: uid, + Source: "_assigner_", + Timestamp: time.Now(), + } + rerr = pl.AddMutation(t, posting.Set) + return uid, rerr + + } else if pl.Length() > 1 { + log.Fatalf("We shouldn't have more than 1 uid for xid: %v\n", xid) + + } else { + // We found one posting. + var p types.Posting + if ok := pl.Get(&p, 0); !ok { + return 0, errors.New("While retrieving entry from posting list") + } + return p.Uid(), nil + } + return 0, errors.New("Some unhandled route lead me here." + + " Wake the stupid developer up.") +} + +func (a *Assigner) ExternalId(uid uint64) (xid string, rerr error) { + pl := new(posting.List) + key := store.Key("_xid_", uid) // uid -> "_xid_" -> xid + pl.Init(key, a.pstore, a.mstore) + if pl.Length() == 0 { + return "", errors.New("NO_EXTERNAL_ID") + } + if pl.Length() > 1 { + log.WithField("uid", uid).Fatal("This shouldn't be happening.") + } + var p types.Posting + if ok := pl.Get(&p, 0); !ok { + log.WithField("uid", uid).Error("While retrieving posting") + return "", errors.New("While retrieving posting") + } + return string(p.ValueBytes()), nil +} diff --git a/uid/assigner_test.go b/uid/assigner_test.go new file mode 100644 index 0000000000000000000000000000000000000000..e6936a87ef492b2068762dca3aa9321449b611ec --- /dev/null +++ b/uid/assigner_test.go @@ -0,0 +1,103 @@ +/* + * Copyright 2015 Manish R Jain <manishrjain@gmail.com> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package uid + +import ( + "io/ioutil" + "os" + "testing" + + "github.com/manishrjain/dgraph/store" +) + +func NewStore(t *testing.T) string { + path, err := ioutil.TempDir("", "storetest_") + if err != nil { + t.Error(err) + t.Fail() + return "" + } + return path +} + +func TestGetOrAssign(t *testing.T) { + pdir := NewStore(t) + defer os.RemoveAll(pdir) + ps := new(store.Store) + ps.Init(pdir) + + mdir := NewStore(t) + defer os.RemoveAll(mdir) + ms := new(store.Store) + ms.Init(mdir) + + var a Assigner + a.Init(ps, ms) + + var u1, u2 uint64 + { + uid, err := a.GetOrAssign("externalid0") + if err != nil { + t.Error(err) + } + t.Logf("Found uid: [%v]", uid) + u1 = uid + } + + { + uid, err := a.GetOrAssign("externalid1") + if err != nil { + t.Error(err) + } + t.Logf("Found uid: [%v]", uid) + u2 = uid + } + + if u1 == u2 { + t.Error("Uid1 and Uid2 shouldn't be the same") + } + + { + uid, err := a.GetOrAssign("externalid0") + if err != nil { + t.Error(err) + } + t.Logf("Found uid: [%v]", uid) + if u1 != uid { + t.Error("Uid should be the same.") + } + } + + { + xid, err := a.ExternalId(u1) + if err != nil { + t.Error(err) + } + if xid != "externalid0" { + t.Errorf("Expected externalid0. Found: [%v]", xid) + } + } + { + xid, err := a.ExternalId(u2) + if err != nil { + t.Error(err) + } + if xid != "externalid1" { + t.Errorf("Expected externalid1. Found: [%v]", xid) + } + } +}