diff --git a/loader/loader.go b/loader/loader.go index 64fd1b664388cd04cde197310074caa9e0a52cf8..0bf9bd3477e82f8072304813fa41a4403b2833a0 100644 --- a/loader/loader.go +++ b/loader/loader.go @@ -21,6 +21,7 @@ import ( "io" "math/rand" "runtime" + "strconv" "strings" "sync" "sync/atomic" @@ -30,6 +31,7 @@ import ( "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/rdf" "github.com/dgraph-io/dgraph/store" + "github.com/dgraph-io/dgraph/uid" "github.com/dgraph-io/dgraph/x" "github.com/dgryski/go-farm" ) @@ -45,11 +47,13 @@ type counters struct { } type state struct { + sync.RWMutex input chan string cnq chan rdf.NQuad ctr *counters instanceIdx uint64 numInstances uint64 + err error } func Init(uidstore, datastore *store.Store) { @@ -57,6 +61,20 @@ func Init(uidstore, datastore *store.Store) { dataStore = datastore } +func (s *state) Error() error { + s.RLock() + defer s.RUnlock() + return s.err +} + +func (s *state) SetError(err error) { + s.Lock() + defer s.Unlock() + if s.err == nil { + s.err = err + } +} + func (s *state) printCounters(ticker *time.Ticker) { var prev uint64 for _ = range ticker.C { @@ -79,6 +97,7 @@ func (s *state) printCounters(ticker *time.Ticker) { } } +// Only run this in a single goroutine. This function closes s.input channel. 
func (s *state) readLines(r io.Reader) { var buf []string scanner := bufio.NewScanner(r) @@ -106,8 +125,14 @@ func (s *state) readLines(r io.Reader) { close(s.input) } -func (s *state) parseStream(done chan error) { +func (s *state) parseStream(wg *sync.WaitGroup) { + defer wg.Done() + for line := range s.input { + if s.Error() != nil { + return + } + line = strings.Trim(line, " \t") if len(line) == 0 { glog.Info("Empty line.") @@ -117,30 +142,34 @@ func (s *state) parseStream(done chan error) { glog.Debugf("Got line: %q", line) nq, err := rdf.Parse(line) if err != nil { - glog.WithError(err).Errorf("While parsing: %q", line) - done <- err + s.SetError(err) return } s.cnq <- nq atomic.AddUint64(&s.ctr.parsed, 1) } - done <- nil } func (s *state) handleNQuads(wg *sync.WaitGroup) { + defer wg.Done() + for nq := range s.cnq { - edge, err := nq.ToEdge(s.instanceIdx, s.numInstances) + if s.Error() != nil { + return + } + edge, err := nq.ToEdge() for err != nil { // Just put in a retry loop to tackle temporary errors. if err == posting.E_TMP_ERROR { time.Sleep(time.Microsecond) } else { + s.SetError(err) glog.WithError(err).WithField("nq", nq). Error("While converting to edge") return } - edge, err = nq.ToEdge(s.instanceIdx, s.numInstances) + edge, err = nq.ToEdge() } // Only handle this edge if the attribute satisfies the modulo rule @@ -153,25 +182,28 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) { } else { atomic.AddUint64(&s.ctr.ignored, 1) } - } - wg.Done() } -func (s *state) getUidForString(str string) error { - _, err := rdf.GetUid(str, s.instanceIdx, s.numInstances) +func (s *state) assignUid(xid string) error { + if strings.HasPrefix(xid, "_uid_:") { + _, err := strconv.ParseUint(xid[6:], 0, 64) + return err + } + + _, err := uid.GetOrAssign(xid, s.instanceIdx, s.numInstances) for err != nil { // Just put in a retry loop to tackle temporary errors. 
if err == posting.E_TMP_ERROR { time.Sleep(time.Microsecond) - glog.WithError(err).WithField("nq.Subject", str). + glog.WithError(err).WithField("xid", xid). Debug("Temporary error") } else { - glog.WithError(err).WithField("nq.Subject", str). + glog.WithError(err).WithField("xid", xid). Error("While getting UID") return err } - _, err = rdf.GetUid(str, s.instanceIdx, s.numInstances) + _, err = uid.GetOrAssign(xid, s.instanceIdx, s.numInstances) } return nil } @@ -180,18 +212,25 @@ func (s *state) assignUidsOnly(wg *sync.WaitGroup) { defer wg.Done() for nq := range s.cnq { + if s.Error() != nil { + return + } ignored := true if farm.Fingerprint64([]byte(nq.Subject))%s.numInstances == s.instanceIdx { - if err := s.getUidForString(nq.Subject); err != nil { - glog.WithError(err).Fatal("While assigning Uid to subject.") + if err := s.assignUid(nq.Subject); err != nil { + s.SetError(err) + glog.WithError(err).Error("While assigning Uid to subject.") + return } ignored = false } if len(nq.ObjectId) > 0 && farm.Fingerprint64([]byte(nq.ObjectId))%s.numInstances == s.instanceIdx { - if err := s.getUidForString(nq.ObjectId); err != nil { - glog.WithError(err).Fatal("While assigning Uid to object.") + if err := s.assignUid(nq.ObjectId); err != nil { + s.SetError(err) + glog.WithError(err).Error("While assigning Uid to object.") + return } ignored = false } @@ -205,7 +244,7 @@ func (s *state) assignUidsOnly(wg *sync.WaitGroup) { } // Blocking function. 
-func HandleRdfReader(reader io.Reader, instanceIdx uint64, +func LoadEdges(reader io.Reader, instanceIdx uint64, numInstances uint64) (uint64, error) { s := new(state) @@ -221,31 +260,28 @@ func HandleRdfReader(reader io.Reader, instanceIdx uint64, s.cnq = make(chan rdf.NQuad, 10000) numr := runtime.GOMAXPROCS(-1) - done := make(chan error, numr) + var pwg sync.WaitGroup + pwg.Add(numr) for i := 0; i < numr; i++ { - go s.parseStream(done) // Input --> NQuads + go s.parseStream(&pwg) // Input --> NQuads } - wg := new(sync.WaitGroup) - for i := 0; i < 3000; i++ { - wg.Add(1) - go s.handleNQuads(wg) // NQuads --> Posting list [slow]. + nrt := 3000 + var wg sync.WaitGroup + wg.Add(nrt) + for i := 0; i < nrt; i++ { + go s.handleNQuads(&wg) // NQuads --> Posting list [slow]. } // Block until all parseStream goroutines are finished. - for i := 0; i < numr; i++ { - if err := <-done; err != nil { - glog.WithError(err).Fatal("While reading input.") - } - } - + pwg.Wait() close(s.cnq) // Okay, we've stopped input to cnq, and closed it. // Now wait for handleNQuads to finish. wg.Wait() ticker.Stop() - return atomic.LoadUint64(&s.ctr.processed), nil + return atomic.LoadUint64(&s.ctr.processed), s.Error() } // AssignUids would pick up all the external ids in RDFs read, @@ -266,9 +302,10 @@ func AssignUids(reader io.Reader, instanceIdx uint64, s.cnq = make(chan rdf.NQuad, 10000) numr := runtime.GOMAXPROCS(-1) - done := make(chan error, numr) + var pwg sync.WaitGroup + pwg.Add(numr) for i := 0; i < numr; i++ { - go s.parseStream(done) // Input --> NQuads + go s.parseStream(&pwg) // Input --> NQuads } wg := new(sync.WaitGroup) @@ -278,17 +315,12 @@ func AssignUids(reader io.Reader, instanceIdx uint64, } // Block until all parseStream goroutines are finished. - for i := 0; i < numr; i++ { - if err := <-done; err != nil { - glog.WithError(err).Fatal("While reading input.") - } - } - + pwg.Wait() close(s.cnq) // Okay, we've stopped input to cnq, and closed it. 
// Now wait for handleNQuads to finish. wg.Wait() ticker.Stop() - return atomic.LoadUint64(&s.ctr.processed), nil + return atomic.LoadUint64(&s.ctr.processed), s.Error() } diff --git a/posting/list.go b/posting/list.go index cef683f13c7acc948d81354b98e8c7b5f795fa89..73aa2d2f250fdeea0267d16176e6bbf7266abc82 100644 --- a/posting/list.go +++ b/posting/list.go @@ -105,8 +105,7 @@ func samePosting(a *types.Posting, b *types.Posting) bool { // key = (entity uid, attribute) func Key(uid uint64, attr string) []byte { - buf := new(bytes.Buffer) - buf.WriteString(attr) + buf := bytes.NewBufferString(attr) if err := binary.Write(buf, binary.LittleEndian, uid); err != nil { glog.Fatalf("Error while creating key with attr: %v uid: %v\n", attr, uid) } diff --git a/posting/lists.go b/posting/lists.go index a29474cf1c7273549e371065696a7e051fe2deb1..fab45d584aaf9a9744dd5283a2e95360988cee85 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -18,6 +18,7 @@ package posting import ( "flag" + "math/rand" "runtime" "runtime/debug" "sync" @@ -34,6 +35,23 @@ import ( var maxmemory = flag.Uint64("stw_ram_mb", 4096, "If RAM usage exceeds this, we stop the world, and flush our buffers.") +type mergeRoutines struct { + sync.RWMutex + count int +} + +func (mr *mergeRoutines) Count() int { + mr.RLock() + defer mr.RUnlock() + return mr.count +} + +func (mr *mergeRoutines) Add(delta int) { + mr.Lock() + mr.count += delta + mr.Unlock() +} + type counters struct { ticker *time.Ticker added uint64 @@ -97,17 +115,35 @@ func aggressivelyEvict(ms runtime.MemStats) { Info("Memory Usage after calling GC.") } -func gentlyMerge(ms runtime.MemStats) { +func gentlyMerge(mr *mergeRoutines) { + defer mr.Add(-1) ctr := NewCounters() defer ctr.ticker.Stop() - // Pick 1% of the dirty map or 400 keys, whichever is higher. - pick := int(float64(dirtymap.Size()) * 0.01) + // Pick 5% of the dirty map or 400 keys, whichever is higher. 
+ pick := int(float64(dirtymap.Size()) * 0.05) if pick < 400 { pick = 400 } + // We should start picking up elements from a randomly selected index, + // otherwise, the same keys would keep on getting merged, while the + // rest would never get a chance. + var start int + n := dirtymap.Size() - pick + if n <= 0 { + start = 0 + } else { + start = rand.Intn(n) + } + var hs []gotomic.Hashable + idx := 0 dirtymap.Each(func(k gotomic.Hashable, v gotomic.Thing) bool { + if idx < start { + idx += 1 + return false + } + hs = append(hs, k) return len(hs) >= pick }) @@ -136,6 +172,7 @@ func checkMemoryUsage() { MIB = 1 << 20 MAX_MEMORY = *maxmemory * MIB + var mr mergeRoutines for _ = range time.Tick(5 * time.Second) { var ms runtime.MemStats runtime.ReadMemStats(&ms) @@ -143,7 +180,17 @@ func checkMemoryUsage() { aggressivelyEvict(ms) } else { - gentlyMerge(ms) + // If merging is slow, we don't want to end up having too many goroutines + // merging the dirty list. This should keep them in check. + // With a value of 18 and duration of 5 seconds, some goroutines are + // taking over 1.5 mins to finish. + if mr.Count() > 18 { + glog.Info("Skipping gentle merging.") + continue + } + mr.Add(1) + // gentlyMerge can take a while to finish. So, run it in a goroutine. 
+ go gentlyMerge(&mr) } } } diff --git a/rdf/parse.go b/rdf/parse.go index 732e851bfb8857e937a9ea226e768a0648756374..3afe48bc1c30d5fd78eadeb0dc4ff3dae0f6d667 100644 --- a/rdf/parse.go +++ b/rdf/parse.go @@ -36,23 +36,26 @@ type NQuad struct { Language string } -func GetUid(s string, instanceIdx uint64, numInstances uint64) (uint64, error) { - if strings.HasPrefix(s, "_uid_:") { - return strconv.ParseUint(s[6:], 0, 64) +func getUid(xid string) (uint64, error) { + if strings.HasPrefix(xid, "_uid_:") { + return strconv.ParseUint(xid[6:], 0, 64) } - return uid.GetOrAssign(s, instanceIdx, numInstances) + return uid.Get(xid) } -func (nq NQuad) ToEdge(instanceIdx, - numInstances uint64) (result x.DirectedEdge, rerr error) { +// ToEdge is useful when you want to find the UID corresponding to XID for +// just one edge. ToEdgeUsing(map) is useful when you do this conversion +// in bulk, say over a network call. None of these methods generate a UID +// for an XID. +func (nq NQuad) ToEdge() (result x.DirectedEdge, rerr error) { - sid, err := GetUid(nq.Subject, instanceIdx, numInstances) + sid, err := getUid(nq.Subject) if err != nil { return result, err } result.Entity = sid if len(nq.ObjectId) > 0 { - oid, err := GetUid(nq.ObjectId, instanceIdx, numInstances) + oid, err := getUid(nq.ObjectId) if err != nil { return result, err } diff --git a/server/loader/main.go b/server/loader/main.go index 63302a10550b087a60a8ab4acf9e99db715f7286..24d00334751e1a44286e965d46e9b133170b51e1 100644 --- a/server/loader/main.go +++ b/server/loader/main.go @@ -99,7 +99,7 @@ func main() { glog.WithError(err).Fatal("Unable to create gzip reader.") } - count, err := loader.HandleRdfReader(r, *instanceIdx, *numInstances) + count, err := loader.LoadEdges(r, *instanceIdx, *numInstances) if err != nil { glog.WithError(err).Fatal("While handling rdf reader.") } diff --git a/server/loader/main_test.go b/server/loader/main_test.go index 
bbbe8affb9ac5a9812b63a5c46ec83cb8139e82e..1e73a3a80da75e3d003d9850d9b8f4699219193b 100644 --- a/server/loader/main_test.go +++ b/server/loader/main_test.go @@ -39,16 +39,58 @@ func TestQuery(t *testing.T) { uid.Init(ps) loader.Init(ps, ps1) - f, err := os.Open("test_input") - r := bufio.NewReader(f) - count, err := loader.HandleRdfReader(r, 1, 2) - t.Logf("count", count) + var count uint64 + { + f, err := os.Open("test_input") + if err != nil { + t.Error(err) + t.Fail() + } + r := bufio.NewReader(f) + count, err = loader.AssignUids(r, 0, 1) // Assign uids for everything. + t.Logf("count: %v", count) + f.Close() + posting.MergeLists(100) + } + { + f, err := os.Open("test_input") + if err != nil { + t.Error(err) + t.Fail() + } + r := bufio.NewReader(f) + count, err = loader.LoadEdges(r, 1, 2) + t.Logf("count: %v", count) + f.Close() + posting.MergeLists(100) + } - posting.MergeLists(100) + if farm.Fingerprint64([]byte("follows"))%2 != 1 { + t.Error("Expected fp to be 1.") + t.Fail() + } + if count != 4 { + t.Error("loader assignment not as expected") + } - if farm.Fingerprint64([]byte("follows"))%2 == 1 { - if count != 4 { - t.Error("loader assignment not as expected") + { + f, err := os.Open("test_input") + if err != nil { + t.Error(err) + t.Fail() } + r := bufio.NewReader(f) + count, err = loader.LoadEdges(r, 0, 2) + t.Logf("count: %v", count) + f.Close() + posting.MergeLists(100) + } + + if farm.Fingerprint64([]byte("enemy"))%2 != 0 { + t.Error("Expected fp to be 0.") + t.Fail() + } + if count != 4 { + t.Error("loader assignment not as expected") } } diff --git a/server/loader/test_input b/server/loader/test_input index 642eec7fc58f2ffcefab11be805a72a0cf0c240b..3ed3dd853f6d4dbc4bdce2984aa21882c4342d93 100644 --- a/server/loader/test_input +++ b/server/loader/test_input @@ -1,7 +1,8 @@ -`_:alice1 <follows> _:bob0 .` -`_:alice2 <follows> _:bob1 .` -`_:alice3 <follows> _:bob2 .` -`_:alice4 <follows> _:bob3 .` -`_:alice1 <friend1> _:bob5 .` -`_:alice2 <friend1> _:bob6 
.` -`_:alice3 <friend1> _:bob7 .` +<alice> <follows> <bob0> . +<alice> <enemy> <bob1> . +<alice> <follows> <bob2> . +<alice> <enemy> <bob3> . +<alice> <enemy> <bob4> . +<alice> <follows> <bob5> . +<alice> <enemy> <bob6> . +<alice> <follows> <bob7> . diff --git a/server/main_test.go b/server/main_test.go index 962256cdcbc3c1e314d34d0fd9f7272894b98f64..790647726e8ad80c5c6f69807a6006039a66922b 100644 --- a/server/main_test.go +++ b/server/main_test.go @@ -66,15 +66,31 @@ func prepare() (dir1, dir2 string, ps *store.Store, clog *commit.Logger, rerr er uid.Init(ps) loader.Init(ps, ps) - f, err := os.Open("testdata.nq") - if err != nil { - return dir1, dir2, nil, clog, err + { + // Assign Uids first. + f, err := os.Open("testdata.nq") + if err != nil { + return dir1, dir2, nil, clog, err + } + _, err = loader.AssignUids(f, 0, 1) + f.Close() + if err != nil { + return dir1, dir2, nil, clog, err + } } - defer f.Close() - _, err = loader.HandleRdfReader(f, 0, 1) - if err != nil { - return dir1, dir2, nil, clog, err + { + // Then load data. 
+ f, err := os.Open("testdata.nq") + if err != nil { + return dir1, dir2, nil, clog, err + } + _, err = loader.LoadEdges(f, 0, 1) + f.Close() + if err != nil { + return dir1, dir2, nil, clog, err + } } + return dir1, dir2, ps, clog, nil } diff --git a/server/uidassigner/main.go b/server/uidassigner/main.go index 0a00382461b9ad5487637999494c3067f46b0c92..3150f85e4b125473338a9dcc532bfb27774a6b9a 100644 --- a/server/uidassigner/main.go +++ b/server/uidassigner/main.go @@ -24,7 +24,7 @@ var instanceIdx = flag.Uint64("instanceIdx", 0, "Only pick entities, where Fingerprint % numInstance == instanceIdx.") var numInstances = flag.Uint64("numInstances", 1, "Total number of instances among which uid assigning is shared") -var uidDir = flag.String("uidpostings", "", +var uidDir = flag.String("uids", "", "Directory to store xid to uid posting lists") var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file") var memprofile = flag.String("memprofile", "", "write memory profile to file") diff --git a/server/uidassigner/main_test.go b/server/uidassigner/main_test.go index 85ec4aa69831725a9512dc70e85848edaf96b5e7..948a657d0fb820a049c38f8e7114357e48529677 100644 --- a/server/uidassigner/main_test.go +++ b/server/uidassigner/main_test.go @@ -9,7 +9,6 @@ import ( "github.com/Sirupsen/logrus" "github.com/dgraph-io/dgraph/commit" "github.com/dgraph-io/dgraph/posting" - "github.com/dgraph-io/dgraph/rdf" "github.com/dgraph-io/dgraph/store" "github.com/dgraph-io/dgraph/uid" "github.com/dgryski/go-farm" @@ -41,18 +40,18 @@ func TestQuery(t *testing.T) { list := []string{"alice", "bob", "mallory", "ash", "man", "dgraph"} for _, str := range list { if farm.Fingerprint64([]byte(str))%numInstances == 0 { - uid, err := rdf.GetUid(str, 0, numInstances) - if uid < minIdx0 || uid > minIdx0+mod-1 { + u, err := uid.GetOrAssign(str, 0, numInstances) + if u < minIdx0 || u > minIdx0+mod-1 { t.Error("Not the correct UID", err) } - t.Logf("Instance-0 Correct UID", str, uid) + 
t.Logf("Instance-0 Correct UID", str, u) } else { - uid, err := rdf.GetUid(str, 1, numInstances) - if uid < minIdx1 || uid > minIdx1+mod-1 { + u, err := uid.GetOrAssign(str, 1, numInstances) + if u < minIdx1 || u > minIdx1+mod-1 { t.Error("Not the correct UID", err) } - t.Logf("Instance-1 Correct UID", str, uid) + t.Logf("Instance-1 Correct UID", str, u) } } } diff --git a/uid/README.txt b/uid/README.txt new file mode 100644 index 0000000000000000000000000000000000000000..1b9eb3d9d1a80d6c206f61ef74efcc246184928b --- /dev/null +++ b/uid/README.txt @@ -0,0 +1,49 @@ +Dropped 61 nodes (cum <= 6.05MB) +Showing top 10 nodes out of 52 (cum >= 149.99MB) + flat flat% sum% cum cum% + 315.56MB 26.10% 26.10% 315.56MB 26.10% github.com/dgraph-io/dgraph/posting.NewList + 87.51MB 7.24% 33.33% 87.51MB 7.24% github.com/dgraph-io/dgraph/uid.stringKey + 80MB 6.62% 39.95% 105.50MB 8.72% github.com/dgraph-io/dgraph/uid.(*lockManager).newOrExisting + 78.01MB 6.45% 46.40% 78.01MB 6.45% github.com/dgraph-io/dgraph/posting.Key + 77.50MB 6.41% 52.81% 155MB 12.82% github.com/zond/gotomic.(*Hash).getBucketByIndex + 77.50MB 6.41% 59.22% 77.50MB 6.41% github.com/zond/gotomic.newMockEntry + 74.50MB 6.16% 65.38% 74.50MB 6.16% github.com/zond/gotomic.newRealEntryWithHashCode + 51MB 4.22% 69.60% 74.01MB 6.12% github.com/dgraph-io/dgraph/posting.(*List).merge + 48.50MB 4.01% 73.61% 48.50MB 4.01% github.com/dgraph-io/dgraph/loader.(*state).readLines + 43.50MB 3.60% 77.21% 149.99MB 12.40% github.com/zond/gotomic.(*Hash).PutIfMissing +(pprof) list uid.stringKey +Total: 1.18GB +ROUTINE ======================== github.com/dgraph-io/dgraph/uid.stringKey in /home/manishrjain/go/src/github.com/dgraph-io/dgraph/uid/assigner.go + 87.51MB 87.51MB (flat, cum) 7.24% of Total + . . 186: rerr := pl.AddMutation(t, posting.Set) + . . 187: return uid, rerr + . . 188:} + . . 189: + . . 190:func stringKey(xid string) []byte { + 87.51MB 87.51MB 191: var buf bytes.Buffer + . . 192: buf.WriteString("_uid_|") + . . 
193: buf.WriteString(xid) + . . 194: return buf.Bytes() + . . 195:} + . . 196: + +After changing the code to return []byte("_uid_|" + xid), the memory profiler no longer shows it. + +$ go tool pprof uidassigner mem.prof +Entering interactive mode (type "help" for commands) +(pprof) top10 +907.59MB of 1139.29MB total (79.66%) +Dropped 86 nodes (cum <= 5.70MB) +Showing top 10 nodes out of 48 (cum >= 45.01MB) + flat flat% sum% cum cum% + 310.56MB 27.26% 27.26% 310.56MB 27.26% github.com/dgraph-io/dgraph/posting.NewList + 89MB 7.81% 35.07% 89MB 7.81% github.com/zond/gotomic.newMockEntry + 81.50MB 7.15% 42.23% 170.51MB 14.97% github.com/zond/gotomic.(*Hash).getBucketByIndex + 81.50MB 7.15% 49.38% 109MB 9.57% github.com/dgraph-io/dgraph/uid.(*lockManager).newOrExisting + 76.51MB 6.72% 56.09% 76.51MB 6.72% github.com/dgraph-io/dgraph/posting.Key + 72.50MB 6.36% 62.46% 72.50MB 6.36% github.com/zond/gotomic.newRealEntryWithHashCode + 55.50MB 4.87% 67.33% 63.50MB 5.57% github.com/dgraph-io/dgraph/posting.(*List).merge + 50MB 4.39% 71.72% 50MB 4.39% github.com/dgraph-io/dgraph/loader.(*state).readLines + 45.50MB 3.99% 75.71% 150.52MB 13.21% github.com/zond/gotomic.(*Hash).PutIfMissing + 45.01MB 3.95% 79.66% 45.01MB 3.95% github.com/google/flatbuffers/go.(*Builder).growByteBuffer + diff --git a/uid/assigner.go b/uid/assigner.go index d7384fc3f63b191c9be1054c642f6e4f4a220037..ff8239341ec9e263999df51fc37e8bbb4b39e351 100644 --- a/uid/assigner.go +++ b/uid/assigner.go @@ -17,8 +17,8 @@ package uid import ( - "bytes" "errors" + "fmt" "math" "sync" "time" @@ -188,10 +188,23 @@ func assignNew(pl *posting.List, xid string, instanceIdx uint64, } func stringKey(xid string) []byte { - var buf bytes.Buffer - buf.WriteString("_uid_|") - buf.WriteString(xid) - return buf.Bytes() + return []byte("_uid_|" + xid) +} + +func Get(xid string) (uid uint64, rerr error) { + key := stringKey(xid) + pl := posting.GetOrCreate(key, uidStore) + if pl.Length() == 0 { + return 0, fmt.Errorf("xid: %v 
doesn't have any uid assigned.", xid) + } + if pl.Length() > 1 { + glog.Fatalf("We shouldn't have more than 1 uid for xid: %v\n", xid) + } + var p types.Posting + if ok := pl.Get(&p, 0); !ok { + return 0, fmt.Errorf("While retrieving entry from posting list") + } + return p.Uid(), nil } func GetOrAssign(xid string, instanceIdx uint64,