diff --git a/commit/log.go b/commit/log.go index 552f0d1efc83280e7900d72c66d2c5d70ed0719f..fc478310ee710b77b2b49819c794fc0e86b4982a 100644 --- a/commit/log.go +++ b/commit/log.go @@ -385,6 +385,9 @@ func (l *Logger) AddLog(ts int64, hash uint32, value []byte) error { func streamEntriesInFile(path string, afterTs int64, hash uint32, ch chan []byte) error { + // HACK HACK HACK + return nil + flog := glog.WithField("path", path) f, err := os.Open(path) if err != nil { diff --git a/concurrent/map.go b/concurrent/map.go new file mode 100644 index 0000000000000000000000000000000000000000..41612fe93ebaa52083bab491e0dcdc3e6da23dd2 --- /dev/null +++ b/concurrent/map.go @@ -0,0 +1,138 @@ +/* + * Copyright 2015 Manish R Jain <manishrjain@gmail.com> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package concurrent + +import ( + "log" + "sync/atomic" + "unsafe" +) + +type kv struct { + k uint64 + v unsafe.Pointer +} + +type bucket struct { + elems [8]kv +} + +type container struct { + active int32 + sz uint64 + list []*bucket +} + +type Map struct { + cs [2]container +} + +func powOf2(sz int) bool { + return sz > 0 && (sz&(sz-1)) == 0 +} + +func initContainer(cs *container, sz uint64) { + cs.active = 1 + cs.sz = sz + cs.list = make([]*bucket, sz) + for i := range cs.list { + cs.list[i] = new(bucket) + } +} + +func NewMap(sz int) *Map { + if !powOf2(sz) { + log.Fatal("Map can only be created for a power of 2.") + } + + m := new(Map) + initContainer(&m.cs[0], uint64(sz)) + return m +} + +func (m *Map) Get(k uint64) unsafe.Pointer { + for _, c := range m.cs { + if atomic.LoadInt32(&c.active) == 0 { + continue + } + bi := k & (c.sz - 1) + b := c.list[bi] + for i := range b.elems { + e := &b.elems[i] + ek := atomic.LoadUint64(&e.k) + if ek == k { + return e.v + } + } + } + return nil +} + +func (m *Map) Put(k uint64, v unsafe.Pointer) bool { + for _, c := range m.cs { + if atomic.LoadInt32(&c.active) == 0 { + continue + } + bi := k & (c.sz - 1) + b := c.list[bi] + for i := range b.elems { + e := &b.elems[i] + // Once allocated a valid key, it would never change. So, first check if + // it's allocated. If not, then allocate it. If can't, or not allocated, + // then check if it's k. If it is, then replace value. Otherwise continue. + if atomic.CompareAndSwapUint64(&e.k, 0, k) { + atomic.StorePointer(&e.v, v) + return true + } + if atomic.LoadUint64(&e.k) == k { + atomic.StorePointer(&e.v, v) + return true + } + } + } + return false +} + +/* +func (m *Map) StreamUntilCap(ch chan uint64) { + for _, c := range m.cs { + if atomic.LoadInt32(&c.active) == 0 { + continue + } + for { + bi := rand.Intn(int(c.sz)) + for len(ch) < cap(ch) { + } + } + } +} +*/ + +func (m *Map) StreamAll(ch chan uint64) { + for _, c := range m.cs { + if atomic.LoadInt32(&c.active) == 0 { + continue + } + for i := 0; i < int(c.sz); i++ { + for _, e := range c.list[i].elems { + if k := atomic.LoadUint64(&e.k); k > 0 { + ch <- k + } + } + } + } +} diff --git a/concurrent/map_test.go b/concurrent/map_test.go new file mode 100644 index 0000000000000000000000000000000000000000..7090eff5a09a4fc76b1b69998a06e4bf487851f3 --- /dev/null +++ b/concurrent/map_test.go @@ -0,0 +1,78 @@ +/* + * Copyright 2015 Manish R Jain <manishrjain@gmail.com> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package concurrent + +import ( + "math/rand" + "testing" + "unsafe" + + "github.com/dgraph-io/dgraph/posting" + "github.com/zond/gotomic" +) + +func TestGetAndPut(t *testing.T) { + m := NewMap(1024) + var i uint64 + for i = 1; i < 100; i++ { + v := new(uint64) + *v = i + b := unsafe.Pointer(v) + if ok := m.Put(i, b); !ok { + t.Errorf("Couldn't put key: %v", i) + } + } + for i = 1; i < 100; i++ { + p := m.Get(i) + v := (*uint64)(p) + if v == nil { + t.Errorf("Didn't expect nil for i: %v", i) + return + } + if *v != i { + t.Errorf("Expected: %v. Got: %v", i, *v) + } + } +} + +func BenchmarkGetAndPut(b *testing.B) { + m := NewMap(1 << 16) + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + key := uint64(rand.Int63()) + p := m.Get(key) + if p == nil { + l := posting.NewList() + m.Put(key, unsafe.Pointer(l)) + } + } + }) +} + +func BenchmarkGotomic(b *testing.B) { + h := gotomic.NewHash() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + key := uint64(rand.Int63()) + _, has := h.Get(gotomic.IntKey(key)) + if !has { + l := posting.NewList() + h.Put(gotomic.IntKey(key), l) + } + } + }) +} diff --git a/posting/lists.go b/posting/lists.go index 40d8410c173c02cf87a42c9ec18a66c6d999d1aa..0b7a79a1ae1b90641949af5461c9106e01218acf 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -20,9 +20,11 @@ import ( "sync" "sync/atomic" "time" + "unsafe" "github.com/Sirupsen/logrus" "github.com/dgraph-io/dgraph/commit" + "github.com/dgraph-io/dgraph/concurrent" "github.com/dgraph-io/dgraph/store" "github.com/dgryski/go-farm" ) @@ -46,14 +48,14 @@ func (c *counters) periodicLog() { } } -var lmap *Map +var lcmap *concurrent.Map var pstore *store.Store var clog *commit.Logger var ch chan uint64 var lc *lcounters func Init(posting *store.Store, log *commit.Logger) { - lmap = NewMap(true) + lcmap = concurrent.NewMap(1 << 20) pstore = posting clog = log ch = make(chan uint64, 10000) @@ -77,31 +79,33 @@ func (lc *lcounters) periodicLog() { func Get(key []byte) *List { uid := farm.Fingerprint64(key) - l, added := lmap.Get(uid) - if added { - atomic.AddUint64(&lc.miss, 1) + lp := lcmap.Get(uid) + if lp == nil { + l := NewList() l.init(key, pstore, clog) - - } else { - atomic.AddUint64(&lc.hit, 1) + lcmap.Put(uid, unsafe.Pointer(l)) + return l } - return l + return (*List)(lp) } +/* func periodicQueueForProcessing(c *counters) { ticker := time.NewTicker(time.Minute) for _ = range ticker.C { lmap.StreamUntilCap(ch) } } +*/ func process(c *counters, wg *sync.WaitGroup) { for eid := range ch { - l, added := lmap.Get(eid) - if l == nil || added { + lp := lcmap.Get(eid) + if lp == nil { continue } atomic.AddUint64(&c.merged, 1) + l := (*List)(lp) if err := l.MergeIfDirty(); err != nil { glog.WithError(err).Error("While commiting dirty list.") } @@ -119,15 +123,17 @@ func periodicProcess(c *counters) { } func queueAll(c *counters) { - lmap.StreamAllKeys(ch) + lcmap.StreamAll(ch) close(ch) } +/* func StartPeriodicMerging() { ctr := new(counters) go periodicQueueForProcessing(ctr) go periodicProcess(ctr) } +*/ func MergeLists(numRoutines int) { c := new(counters) diff --git a/posting/notes.txt b/posting/notes.txt index c3456c88b3e9b568721b0f6da1616a3c0d772ca1..24d4a343fb5dd3045cd7a74d391f8e233fb1e728 100644 --- a/posting/notes.txt +++ b/posting/notes.txt @@ -13,3 +13,6 @@ In this case, u1 has 110K Gets, while u10 has only ~3000. So, that's good. But we see a sudden jump to 37k Gets (16%), which took over 1 millisecond! And 24k requests (~10%) took over a second to return. That's very erratic behaviour. Overall, only 72% of Gets took under 10 microseconds. + +Using my own concurrent map, without mutex locks is *really* *really* fast. Able to +load a million edges in under 30 seconds. Memory usage is now a concern. diff --git a/server/loader/main.go b/server/loader/main.go index 3c3af7d3d05dff6ad9f1f855491481b086e77ab5..03ddb90c402368dbe7f5a3397d9cdb36d36898e6 100644 --- a/server/loader/main.go +++ b/server/loader/main.go @@ -98,5 +98,5 @@ func main() { f.Close() } glog.Info("Calling merge lists") - posting.MergeLists(100) + posting.MergeLists(10000) } diff --git a/server/main.go b/server/main.go index 880bf99b74543bdb21cca7e7fab7bc49b234c700..35d94a163ed15e0ed7b50df319a7b4c337616035 100644 --- a/server/main.go +++ b/server/main.go @@ -97,7 +97,7 @@ func main() { defer clog.Close() posting.Init(ps, clog) - posting.StartPeriodicMerging() + // posting.StartPeriodicMerging() http.HandleFunc("/query", queryHandler) glog.WithField("port", *port).Info("Listening for requests...")