Commit 879a3329 authored by Manish R Jain


Loads of fixes and tweaks to make data loading work. Largely, the system is really fast at inserting data, but memory becomes a problem. So, we now run continuous memory checks; if the DB exceeds the set memory limit, we stop the world in batch-load mode and deal with it first. In server mode, the same code runs but without stopping the world.
parent 2b6a2e66
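In code terms, the stop-the-world gate described above is just a sync.RWMutex: mutators take the read lock, the memory checker takes the write lock while it merges lists and runs GC. A minimal self-contained sketch of that pattern follows (package and function names are illustrative; the real symbols introduced in this commit are stopTheWorld, checkMemoryUsage and MAX_MEMORY in posting/lists.go):

// Sketch only: the shape of the batch-load memory gate, with assumed names.
package loadgate

import (
	"runtime"
	"sync"
	"time"
)

var stopTheWorld sync.RWMutex

const maxMemory = 2 << 30 // ~2 GiB threshold, mirroring MAX_MEMORY below.

// applyMutation stands in for the normal write path; it only needs a read lock.
func applyMutation(mutate func()) {
	stopTheWorld.RLock()
	defer stopTheWorld.RUnlock()
	mutate()
}

// checkMemory stops the world when allocations cross the threshold, merges
// dirty lists (elided), and lets GC reclaim memory before resuming writers.
func checkMemory(mergeAll func()) {
	for range time.Tick(5 * time.Second) {
		var ms runtime.MemStats
		runtime.ReadMemStats(&ms)
		if ms.Alloc < maxMemory {
			continue
		}
		stopTheWorld.Lock()
		mergeAll()
		runtime.GC()
		stopTheWorld.Unlock()
	}
}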
/*
* Copyright 2015 Manish R Jain <manishrjain@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package concurrent
import (
"log"
"math/rand"
"sync/atomic"
"unsafe"
)
type kv struct {
k uint64
v unsafe.Pointer
}
type bucket struct {
elems [8]kv
}
const (
MUTABLE = iota
IMMUTABLE
)
type container struct {
status int32
sz uint64
list []*bucket
numElems uint32
}
type Map struct {
cs [2]unsafe.Pointer
size uint32
}
func powOf2(sz int) bool {
return sz > 0 && (sz&(sz-1)) == 0
}
func initContainer(cs *container, sz uint64) {
cs.status = MUTABLE
cs.sz = sz
cs.list = make([]*bucket, sz)
for i := range cs.list {
cs.list[i] = new(bucket)
}
}
func NewMap(sz int) *Map {
if !powOf2(sz) {
log.Fatal("Map can only be created for a power of 2.")
}
c := new(container)
initContainer(c, uint64(sz))
m := new(Map)
m.cs[MUTABLE] = unsafe.Pointer(c)
m.cs[IMMUTABLE] = nil
return m
}
func (c *container) get(k uint64) unsafe.Pointer {
bi := k & (c.sz - 1)
b := c.list[bi]
for i := range b.elems {
e := &b.elems[i]
if ek := atomic.LoadUint64(&e.k); ek == k {
return e.v
}
}
return nil
}
func (c *container) getOrInsert(k uint64, v unsafe.Pointer) unsafe.Pointer {
bi := k & (c.sz - 1)
b := c.list[bi]
for i := range b.elems {
e := &b.elems[i]
// Once a slot is allocated a valid key, that key never changes. So, first
// try to claim an empty slot (Step 1). If we claim it, or the slot already
// holds k, set the value only if it is currently nil; otherwise return
// whatever value is already stored. If the slot holds some other key, move
// on to the next slot in the bucket. Note that between the key CAS and the
// value CAS another goroutine can briefly observe key k with a nil value,
// which is why both branches re-read the value before returning.
if atomic.CompareAndSwapUint64(&e.k, 0, k) { // Step 1.
atomic.AddUint32(&c.numElems, 1)
if atomic.CompareAndSwapPointer(&e.v, nil, v) {
return v
}
return atomic.LoadPointer(&e.v)
}
if atomic.LoadUint64(&e.k) == k {
// Swap if previous pointer is nil.
if atomic.CompareAndSwapPointer(&e.v, nil, v) {
return v
}
return atomic.LoadPointer(&e.v)
}
}
return nil
}
func (m *Map) GetOrInsert(k uint64, v unsafe.Pointer) unsafe.Pointer {
if v == nil {
log.Fatal("GetOrInsert doesn't allow setting nil pointers.")
return nil
}
// Check immutable first.
cval := atomic.LoadPointer(&m.cs[IMMUTABLE])
if cval != nil {
c := (*container)(cval)
if pv := c.get(k); pv != nil {
return pv
}
}
// Okay, deal with mutable container now.
cval = atomic.LoadPointer(&m.cs[MUTABLE])
if cval == nil {
log.Fatal("This is disruptive in a bad way.")
}
c := (*container)(cval)
if pv := c.getOrInsert(k, v); pv != nil {
return pv
}
// We still couldn't insert the key. Time to grow.
// TODO: Handle this case.
return nil
}
func (m *Map) SetNilIfPresent(k uint64) bool {
for i := range m.cs {
// Skip containers that haven't been allocated yet.
cval := atomic.LoadPointer(&m.cs[i])
if cval == nil {
continue
}
c := (*container)(cval)
bi := k & (c.sz - 1)
b := c.list[bi]
for j := range b.elems {
e := &b.elems[j]
if atomic.LoadUint64(&e.k) == k {
// Set to nil.
atomic.StorePointer(&e.v, nil)
return true
}
}
}
return false
}
func (m *Map) StreamUntilCap(ch chan uint64) {
for {
ci := rand.Intn(2)
cval := atomic.LoadPointer(&m.cs[ci])
if cval == nil {
cval = atomic.LoadPointer(&m.cs[(ci+1)%2]) // use the other.
}
c := (*container)(cval)
bi := rand.Intn(int(c.sz))
for i := range c.list[bi].elems {
if len(ch) >= cap(ch) {
return
}
if k := atomic.LoadUint64(&c.list[bi].elems[i].k); k > 0 {
ch <- k
}
}
}
}
func (m *Map) StreamAll(ch chan uint64) {
for i := range m.cs {
cval := atomic.LoadPointer(&m.cs[i])
if cval == nil {
continue
}
c := (*container)(cval)
for bi := 0; bi < int(c.sz); bi++ {
for j := range c.list[bi].elems {
if k := atomic.LoadUint64(&c.list[bi].elems[j].k); k > 0 {
ch <- k
}
}
}
}
}
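A hedged usage sketch of GetOrInsert follows (the package and helper names are illustrative; in this commit the posting package actually switches to gotomic for its list map, see further down): allocate the value optimistically and keep whichever pointer wins the race.

package example

import (
	"unsafe"

	"github.com/dgraph-io/dgraph/concurrent"
	"github.com/dgraph-io/dgraph/posting"
)

// getOrCreateList mirrors what a caller of concurrent.Map would do: build a
// list up front, then use whatever pointer GetOrInsert settles on.
func getOrCreateList(m *concurrent.Map, uid uint64) *posting.List {
	l := posting.NewList()
	p := m.GetOrInsert(uid, unsafe.Pointer(l))
	// p == unsafe.Pointer(l) means our list was inserted; otherwise another
	// goroutine's list won and ours is dropped. A nil p means the bucket for
	// uid is full (the TODO'd grow case above).
	return (*posting.List)(p)
}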
/*
* Copyright 2015 Manish R Jain <manishrjain@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package concurrent
import (
"math/rand"
"testing"
"unsafe"
"github.com/dgraph-io/dgraph/posting"
"github.com/zond/gotomic"
)
func TestGetAndPut(t *testing.T) {
m := NewMap(1024)
var i uint64
for i = 1; i < 100; i++ {
v := new(uint64)
*v = i
b := unsafe.Pointer(v)
// Map only exposes GetOrInsert; it returns b when our value wins the slot.
if p := m.GetOrInsert(i, b); p != b {
t.Errorf("Couldn't insert key: %v", i)
}
}
for i = 1; i < 100; i++ {
// Read back with a throwaway value; the already stored pointer is returned.
p := m.GetOrInsert(i, unsafe.Pointer(new(uint64)))
v := (*uint64)(p)
if v == nil {
t.Errorf("Didn't expect nil for i: %v", i)
return
}
if *v != i {
t.Errorf("Expected: %v. Got: %v", i, *v)
}
}
}
func BenchmarkGetAndPut(b *testing.B) {
m := NewMap(1 << 16)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
key := uint64(rand.Int63())
// GetOrInsert does the lookup and the insert in a single call.
l := posting.NewList()
m.GetOrInsert(key, unsafe.Pointer(l))
}
})
}
func BenchmarkGotomic(b *testing.B) {
h := gotomic.NewHash()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
key := uint64(rand.Int63())
_, has := h.Get(gotomic.IntKey(key))
if !has {
l := posting.NewList()
h.Put(gotomic.IntKey(key), l)
}
}
})
}
......@@ -50,10 +50,15 @@ type state struct {
}
func (s *state) printCounters(ticker *time.Ticker) {
var prev uint64
for _ = range ticker.C {
processed := atomic.LoadUint64(&s.ctr.processed)
if prev == processed {
continue
}
prev = processed
parsed := atomic.LoadUint64(&s.ctr.parsed)
ignored := atomic.LoadUint64(&s.ctr.ignored)
processed := atomic.LoadUint64(&s.ctr.processed)
pending := parsed - ignored - processed
glog.WithFields(logrus.Fields{
"read": atomic.LoadUint64(&s.ctr.read),
......@@ -123,13 +128,21 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) {
}
edge, err := nq.ToEdge()
if err != nil {
glog.WithError(err).WithField("nq", nq).Error("While converting to edge")
return
for err != nil {
// Just put in a retry loop to tackle temporary errors.
if err == posting.E_TMP_ERROR {
time.Sleep(time.Microsecond)
} else {
glog.WithError(err).WithField("nq", nq).
Error("While converting to edge")
return
}
edge, err = nq.ToEdge()
}
key := posting.Key(edge.Entity, edge.Attribute)
plist := posting.Get(key)
plist := posting.GetOrCreate(key)
plist.AddMutation(edge, posting.Set)
atomic.AddUint64(&s.ctr.processed, 1)
}
......
......@@ -24,7 +24,9 @@ import (
"fmt"
"math"
"sync"
"sync/atomic"
"time"
"unsafe"
"github.com/Sirupsen/logrus"
"github.com/dgraph-io/dgraph/commit"
......@@ -40,6 +42,12 @@ var glog = x.Log("posting")
const Set = 0x01
const Del = 0x02
var E_TMP_ERROR = errors.New("Temporary Error. Please retry.")
type buffer struct {
d []byte
}
type MutationLink struct {
idx int
moveidx int
......@@ -50,11 +58,12 @@ type List struct {
sync.RWMutex
key []byte
hash uint32
buffer []byte
pbuffer unsafe.Pointer
pstore *store.Store // postinglist store
clog *commit.Logger
lastCompact time.Time
wg sync.WaitGroup
deleteMe bool
// Mutations
mlayer map[int]types.Posting // stores only replace instructions.
......@@ -212,15 +221,7 @@ func (l *List) init(key []byte, pstore *store.Store, clog *commit.Logger) {
l.pstore = pstore
l.clog = clog
var err error
if l.buffer, err = pstore.Get(key); err != nil {
// glog.Debugf("While retrieving posting list from db: %v\n", err)
// Error. Just set to empty.
l.buffer = make([]byte, len(empty))
copy(l.buffer, empty)
}
posting := types.GetRootAsPostingList(l.buffer, 0)
posting := l.getPostingList()
l.maxMutationTs = posting.CommitTs()
l.hash = farm.Fingerprint32(key)
l.mlayer = make(map[int]types.Posting)
......@@ -248,12 +249,36 @@ func (l *List) init(key []byte, pstore *store.Store, clog *commit.Logger) {
glog.WithError(err).Error("While streaming entries.")
}
glog.Debug("Done streaming entries.")
// l.regenerateIndex()
}
// There's no need for lock acquisition for this.
func (l *List) getPostingList() *types.PostingList {
pb := atomic.LoadPointer(&l.pbuffer)
buf := (*buffer)(pb)
if buf == nil || len(buf.d) == 0 {
nbuf := new(buffer)
var err error
if nbuf.d, err = l.pstore.Get(l.key); err != nil {
// glog.Debugf("While retrieving posting list from db: %v\n", err)
// Error. Just set to empty.
nbuf.d = make([]byte, len(empty))
copy(nbuf.d, empty)
}
if atomic.CompareAndSwapPointer(&l.pbuffer, pb, unsafe.Pointer(nbuf)) {
return types.GetRootAsPostingList(nbuf.d, 0)
} else {
// Someone else replaced the pointer in the meantime. Retry recursively.
return l.getPostingList()
}
}
return types.GetRootAsPostingList(buf.d, 0)
}
// Caller must hold at least a read lock.
func (l *List) lePostingIndex(maxUid uint64) (int, uint64) {
posting := types.GetRootAsPostingList(l.buffer, 0)
posting := l.getPostingList()
left, right := 0, posting.PostingsLength()-1
sofar := -1
p := new(types.Posting)
......@@ -447,7 +472,7 @@ func (l *List) mergeMutation(mp *types.Posting) {
// Caller must hold at least a read lock.
func (l *List) length() int {
plist := types.GetRootAsPostingList(l.buffer, 0)
plist := l.getPostingList()
return plist.PostingsLength() + l.mdelta
}
......@@ -469,7 +494,7 @@ func (l *List) Get(p *types.Posting, i int) bool {
// Caller must hold at least a read lock.
func (l *List) get(p *types.Posting, i int) bool {
plist := types.GetRootAsPostingList(l.buffer, 0)
plist := l.getPostingList()
if len(l.mindex) == 0 {
if val, ok := l.mlayer[i]; ok {
*p = val
......@@ -529,6 +554,13 @@ func (l *List) get(p *types.Posting, i int) bool {
return plist.Postings(p, newidx)
}
func (l *List) SetForDeletion() {
l.wg.Wait()
l.Lock()
defer l.Unlock()
l.deleteMe = true
}
// In benchmarks, the time taken per AddMutation before was
// plateauing at 2.5 ms with sync per 10 log entries, and increasing
// for sync per 100 log entries (to 3 ms per AddMutation), largely because
......@@ -548,6 +580,9 @@ func (l *List) AddMutation(t x.DirectedEdge, op byte) error {
l.wg.Wait()
l.Lock()
defer l.Unlock()
if l.deleteMe {
return E_TMP_ERROR
}
if t.Timestamp.UnixNano() < l.maxMutationTs {
return fmt.Errorf("Mutation ts lower than committed ts.")
......@@ -629,13 +664,13 @@ func (l *List) merge() error {
end := types.PostingListEnd(b)
b.Finish(end)
l.buffer = b.Bytes[b.Head():]
if err := l.pstore.SetOne(l.key, l.buffer); err != nil {
if err := l.pstore.SetOne(l.key, b.Bytes[b.Head():]); err != nil {
glog.WithField("error", err).Errorf("While storing posting list")
return err
}
// Now reset the mutation variables.
atomic.StorePointer(&l.pbuffer, nil) // Make prev buffer eligible for GC.
l.lastCompact = time.Now()
l.mlayer = make(map[int]types.Posting)
l.mdelta = 0
......
......@@ -33,7 +33,7 @@ import (
func checkUids(t *testing.T, l *List, uids ...uint64) error {
if l.Length() != len(uids) {
return fmt.Errorf("Length: %d", l.Length())
return fmt.Errorf("Expected: %d. Length: %d", len(uids), l.Length())
}
for i := 0; i < len(uids); i++ {
var p types.Posting
......
......@@ -17,25 +17,28 @@
package posting
import (
"math/rand"
"runtime"
"sync"
"sync/atomic"
"time"
"unsafe"
"github.com/Sirupsen/logrus"
"github.com/dgraph-io/dgraph/commit"
"github.com/dgraph-io/dgraph/concurrent"
"github.com/dgraph-io/dgraph/store"
"github.com/dgryski/go-farm"
"github.com/zond/gotomic"
)
type counters struct {
ticker *time.Ticker
added uint64
merged uint64
}
func (c *counters) periodicLog() {
for _ = range time.Tick(time.Second) {
for _ = range c.ticker.C {
mapSize := lhmap.Size()
added := atomic.LoadUint64(&c.added)
merged := atomic.LoadUint64(&c.merged)
pending := added - merged
......@@ -44,23 +47,59 @@ func (c *counters) periodicLog() {
"added": added,
"merged": merged,
"pending": pending,
}).Info("Merge counters")
"mapsize": mapSize,
}).Info("List Merge counters")
}
}
var lcmap *concurrent.Map
var MAX_MEMORY uint64
var MIB uint64
func checkMemoryUsage() {
MIB = 1 << 20
MAX_MEMORY = 2 * (1 << 30)
for _ = range time.Tick(5 * time.Second) {
var ms runtime.MemStats
runtime.ReadMemStats(&ms)
if ms.Alloc < MAX_MEMORY {
continue
}
// Okay, we exceed the max memory threshold.
// Stop the world, and deal with this first.
stopTheWorld.Lock()
megs := ms.Alloc / MIB
glog.WithField("allocated_MB", megs).
Info("Memory usage over threshold. STOPPED THE WORLD!")
glog.Info("Calling merge on all lists.")
MergeLists(100 * runtime.GOMAXPROCS(-1))
glog.Info("Merged lists. Calling GC.")
runtime.GC() // Call GC to do some cleanup.
runtime.ReadMemStats(&ms)
megs = ms.Alloc / MIB
glog.WithField("allocated_MB", megs).
Info("Memory Usage after calling GC.")
stopTheWorld.Unlock()
}
}
var stopTheWorld sync.RWMutex
var lhmap *gotomic.Hash
var pstore *store.Store
var clog *commit.Logger
var ch chan uint64
var lc *lcounters
func Init(posting *store.Store, log *commit.Logger) {
lcmap = concurrent.NewMap(1 << 20)
lhmap = gotomic.NewHash()
pstore = posting
clog = log
ch = make(chan uint64, 10000)
lc = new(lcounters)
go lc.periodicLog()
go checkMemoryUsage()
}
type lcounters struct {
......@@ -77,73 +116,124 @@ func (lc *lcounters) periodicLog() {
}
}
func Get(key []byte) *List {
func GetOrCreate(key []byte) *List {
stopTheWorld.RLock()
defer stopTheWorld.RUnlock()
uid := farm.Fingerprint64(key)
lp := lcmap.Get(uid)
if lp == nil {
l := NewList()
l.init(key, pstore, clog)
lcmap.Put(uid, unsafe.Pointer(l))
ukey := gotomic.IntKey(uid)
lp, _ := lhmap.Get(ukey)
if lp != nil {
return lp.(*List)
}
l := NewList()
l.init(key, pstore, clog)
if inserted := lhmap.PutIfMissing(ukey, l); inserted {
return l
} else {
lp, _ = lhmap.Get(ukey)
return lp.(*List)
}
return (*List)(lp)
}
/*
func periodicQueueForProcessing(c *counters) {
ticker := time.NewTicker(time.Minute)
for _ = range ticker.C {
lmap.StreamUntilCap(ch)
func processOne(k gotomic.Hashable, c *counters) {
ret, _ := lhmap.Delete(k)
if ret == nil {
return
}
l := ret.(*List)
l.SetForDeletion() // No more AddMutation.
if err := l.MergeIfDirty(); err != nil {
glog.WithError(err).Error("While committing dirty list.")
}
atomic.AddUint64(&c.merged, 1)
}
*/
func process(c *counters, wg *sync.WaitGroup) {
for eid := range ch {
lp := lcmap.Get(eid)
if lp == nil {
continue
}
atomic.AddUint64(&c.merged, 1)
l := (*List)(lp)
if err := l.MergeIfDirty(); err != nil {
glog.WithError(err).Error("While committing dirty list.")
}
// For on-demand merging of all lists.
func process(ch chan gotomic.Hashable, c *counters, wg *sync.WaitGroup) {
for l := range ch {
processOne(l, c)
}
if wg != nil {
wg.Done()
}
}
func periodicProcess(c *counters) {
ticker := time.NewTicker(100 * time.Millisecond)
for _ = range ticker.C {
process(c, nil)
}
}
func queueAll(c *counters) {
lcmap.StreamAll(ch)
func queueAll(ch chan gotomic.Hashable, c *counters) {
lhmap.Each(func(k gotomic.Hashable, v gotomic.Thing) bool {
ch <- k
atomic.AddUint64(&c.added, 1)
return false // If this returns true, Each would break.
})
close(ch)
}
/*
func StartPeriodicMerging() {
ctr := new(counters)
go periodicQueueForProcessing(ctr)
go periodicProcess(ctr)
}
*/
func MergeLists(numRoutines int) {
ch := make(chan gotomic.Hashable, 10000)
c := new(counters)
c.ticker = time.NewTicker(time.Second)
go c.periodicLog()
go queueAll(c)
go queueAll(ch, c)
wg := new(sync.WaitGroup)
for i := 0; i < numRoutines; i++ {
wg.Add(1)
go process(c, wg)
go process(ch, c, wg)
}
wg.Wait()
c.ticker.Stop()
}
// For periodic merging of lists.
func queueRandomLists(ch chan gotomic.Hashable, c *counters) {
var buf []gotomic.Hashable
var count int
needed := cap(ch) - len(ch)
if needed < 100 {
return
}
// Reservoir-sample up to `needed` keys from the map to queue for merging.
lhmap.Each(func(k gotomic.Hashable, v gotomic.Thing) bool {
if count < needed {
buf = append(buf, k)
} else {
j := rand.Intn(count)
if j < len(buf) {
buf[j] = k
}
}
count += 1
return false
})
for _, k := range buf {
ch <- k
atomic.AddUint64(&c.added, 1)
}
}
func periodicQueueForProcessing(ch chan gotomic.Hashable, c *counters) {
ticker := time.NewTicker(time.Minute)
for _ = range ticker.C {
queueRandomLists(ch, c)
}
}
func periodicProcess(ch chan gotomic.Hashable, c *counters) {
ticker := time.NewTicker(100 * time.Millisecond)
for _ = range ticker.C {
hid := <-ch
processOne(hid, c)
}
}
func StartPeriodicMerging() {
ctr := new(counters)
ch := make(chan gotomic.Hashable, 10000)
go periodicQueueForProcessing(ch, ctr)
go periodicProcess(ch, ctr)
}
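Roughly, the two merge entry points above line up with the commit message as follows (the batchMode flag and wiring below are assumptions for illustration, not part of this commit): the batch loader relies on checkMemoryUsage stopping the world plus a final MergeLists call, while a long-running server would use StartPeriodicMerging and never block writers globally.

package wiring // illustrative

import "github.com/dgraph-io/dgraph/posting"

// startMergers sketches the two modes; batchMode is an assumed flag.
func startMergers(batchMode bool) {
	if batchMode {
		// Batch load: posting.Init already started checkMemoryUsage, which
		// stops the world past MAX_MEMORY; main calls posting.MergeLists once
		// all RDF files are loaded (see the loader main.go hunk below).
		return
	}
	// Server mode: keep merging random lists in the background instead.
	posting.StartPeriodicMerging()
}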
......@@ -19,7 +19,7 @@ func ProcessTask(query []byte) (result []byte, rerr error) {
for i := 0; i < q.UidsLength(); i++ {
uid := q.Uids(i)
key := Key(uid, attr)
pl := Get(key)
pl := GetOrCreate(key)
task.ValueStart(b)
var valoffset flatbuffers.UOffsetT
......
......@@ -63,22 +63,22 @@ func TestProcessTask(t *testing.T) {
Source: "author0",
Timestamp: time.Now(),
}
addEdge(t, edge, Get(Key(10, "friend")))
addEdge(t, edge, Get(Key(11, "friend")))
addEdge(t, edge, Get(Key(12, "friend")))
addEdge(t, edge, GetOrCreate(Key(10, "friend")))
addEdge(t, edge, GetOrCreate(Key(11, "friend")))
addEdge(t, edge, GetOrCreate(Key(12, "friend")))
edge.ValueId = 25
addEdge(t, edge, Get(Key(12, "friend")))
addEdge(t, edge, GetOrCreate(Key(12, "friend")))
edge.ValueId = 26
addEdge(t, edge, Get(Key(12, "friend")))
addEdge(t, edge, GetOrCreate(Key(12, "friend")))
edge.ValueId = 31
addEdge(t, edge, Get(Key(10, "friend")))
addEdge(t, edge, Get(Key(12, "friend")))
addEdge(t, edge, GetOrCreate(Key(10, "friend")))
addEdge(t, edge, GetOrCreate(Key(12, "friend")))
edge.Value = "photon"
addEdge(t, edge, Get(Key(12, "friend")))
addEdge(t, edge, GetOrCreate(Key(12, "friend")))
query := NewQuery("friend", []uint64{10, 11, 12})
result, err := ProcessTask(query)
......
......@@ -214,42 +214,42 @@ func populateGraph(t *testing.T) {
Source: "testing",
Timestamp: time.Now(),
}
addEdge(t, edge, posting.Get(posting.Key(1, "friend")))
addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend")))
edge.ValueId = 24
addEdge(t, edge, posting.Get(posting.Key(1, "friend")))
addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend")))
edge.ValueId = 25
addEdge(t, edge, posting.Get(posting.Key(1, "friend")))
addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend")))
edge.ValueId = 31
addEdge(t, edge, posting.Get(posting.Key(1, "friend")))
addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend")))
edge.ValueId = 101
addEdge(t, edge, posting.Get(posting.Key(1, "friend")))
addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend")))
// Now let's add a few properties for the main user.
edge.Value = "Michonne"
addEdge(t, edge, posting.Get(posting.Key(1, "name")))
addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "name")))
edge.Value = "female"
addEdge(t, edge, posting.Get(posting.Key(1, "gender")))
addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "gender")))
edge.Value = "alive"
addEdge(t, edge, posting.Get(posting.Key(1, "status")))
addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "status")))
// Now let's add a name for each of the friends, except 101.
edge.Value = "Rick Grimes"
addEdge(t, edge, posting.Get(posting.Key(23, "name")))
addEdge(t, edge, posting.GetOrCreate(posting.Key(23, "name")))
edge.Value = "Glenn Rhee"
addEdge(t, edge, posting.Get(posting.Key(24, "name")))
addEdge(t, edge, posting.GetOrCreate(posting.Key(24, "name")))
edge.Value = "Daryl Dixon"
addEdge(t, edge, posting.Get(posting.Key(25, "name")))
addEdge(t, edge, posting.GetOrCreate(posting.Key(25, "name")))
edge.Value = "Andrea"
addEdge(t, edge, posting.Get(posting.Key(31, "name")))
addEdge(t, edge, posting.GetOrCreate(posting.Key(31, "name")))
}
func TestProcessGraph(t *testing.T) {
......
......@@ -58,7 +58,8 @@ func main() {
}
logrus.SetLevel(logrus.InfoLevel)
glog.WithField("gomaxprocs", runtime.GOMAXPROCS(-1)).Info("Number of CPUs")
numCpus := runtime.GOMAXPROCS(-1)
glog.WithField("gomaxprocs", numCpus).Info("Number of CPUs")
if len(*rdfGzips) == 0 {
glog.Fatal("No RDF GZIP files specified")
......@@ -98,5 +99,5 @@ func main() {
f.Close()
}
glog.Info("Calling merge lists")
posting.MergeLists(10000)
posting.MergeLists(100 * numCpus) // 100 per core.
}
......@@ -104,7 +104,7 @@ func allocateUniqueUid(xid string) (uid uint64, rerr error) {
// Check if this uid has already been allocated.
key := posting.Key(uid, "_xid_") // uid -> "_xid_" -> xid
pl := posting.Get(key)
pl := posting.GetOrCreate(key)
if pl.Length() > 0 {
// Something already present here.
......@@ -175,7 +175,7 @@ func stringKey(xid string) []byte {
func GetOrAssign(xid string) (uid uint64, rerr error) {
key := stringKey(xid)
pl := posting.Get(key)
pl := posting.GetOrCreate(key)
if pl.Length() == 0 {
return assignNew(pl, xid)
......@@ -196,7 +196,7 @@ func GetOrAssign(xid string) (uid uint64, rerr error) {
func ExternalId(uid uint64) (xid string, rerr error) {
key := posting.Key(uid, "_xid_") // uid -> "_xid_" -> xid
pl := posting.Get(key)
pl := posting.GetOrCreate(key)
if pl.Length() == 0 {
return "", errors.New("NO external id")
}
......