Skip to content
Snippets Groups Projects
Commit 471cca3e authored by Manish R Jain's avatar Manish R Jain
Browse files

Using concurrent.map, able to load up 1M edges in under 30 seconds. Memory is the new issue.

parent 3c60a5e6
No related branches found
No related tags found
No related merge requests found
......@@ -385,6 +385,9 @@ func (l *Logger) AddLog(ts int64, hash uint32, value []byte) error {
func streamEntriesInFile(path string,
afterTs int64, hash uint32, ch chan []byte) error {
// HACK HACK HACK
return nil
flog := glog.WithField("path", path)
f, err := os.Open(path)
if err != nil {
......
/*
* Copyright 2015 Manish R Jain <manishrjain@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package concurrent
import (
"log"
"sync/atomic"
"unsafe"
)
type kv struct {
k uint64
v unsafe.Pointer
}
type bucket struct {
elems [8]kv
}
type container struct {
active int32
sz uint64
list []*bucket
}
type Map struct {
cs [2]container
}
func powOf2(sz int) bool {
return sz > 0 && (sz&(sz-1)) == 0
}
func initContainer(cs *container, sz uint64) {
cs.active = 1
cs.sz = sz
cs.list = make([]*bucket, sz)
for i := range cs.list {
cs.list[i] = new(bucket)
}
}
func NewMap(sz int) *Map {
if !powOf2(sz) {
log.Fatal("Map can only be created for a power of 2.")
}
m := new(Map)
initContainer(&m.cs[0], uint64(sz))
return m
}
func (m *Map) Get(k uint64) unsafe.Pointer {
for _, c := range m.cs {
if atomic.LoadInt32(&c.active) == 0 {
continue
}
bi := k & (c.sz - 1)
b := c.list[bi]
for i := range b.elems {
e := &b.elems[i]
ek := atomic.LoadUint64(&e.k)
if ek == k {
return e.v
}
}
}
return nil
}
func (m *Map) Put(k uint64, v unsafe.Pointer) bool {
for _, c := range m.cs {
if atomic.LoadInt32(&c.active) == 0 {
continue
}
bi := k & (c.sz - 1)
b := c.list[bi]
for i := range b.elems {
e := &b.elems[i]
// Once allocated a valid key, it would never change. So, first check if
// it's allocated. If not, then allocate it. If can't, or not allocated,
// then check if it's k. If it is, then replace value. Otherwise continue.
if atomic.CompareAndSwapUint64(&e.k, 0, k) {
atomic.StorePointer(&e.v, v)
return true
}
if atomic.LoadUint64(&e.k) == k {
atomic.StorePointer(&e.v, v)
return true
}
}
}
return false
}
/*
func (m *Map) StreamUntilCap(ch chan uint64) {
for _, c := range m.cs {
if atomic.LoadInt32(&c.active) == 0 {
continue
}
for {
bi := rand.Intn(int(c.sz))
for len(ch) < cap(ch) {
}
}
}
}
*/
func (m *Map) StreamAll(ch chan uint64) {
for _, c := range m.cs {
if atomic.LoadInt32(&c.active) == 0 {
continue
}
for i := 0; i < int(c.sz); i++ {
for _, e := range c.list[i].elems {
if k := atomic.LoadUint64(&e.k); k > 0 {
ch <- k
}
}
}
}
}
/*
* Copyright 2015 Manish R Jain <manishrjain@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package concurrent
import (
"math/rand"
"testing"
"unsafe"
"github.com/dgraph-io/dgraph/posting"
"github.com/zond/gotomic"
)
func TestGetAndPut(t *testing.T) {
m := NewMap(1024)
var i uint64
for i = 1; i < 100; i++ {
v := new(uint64)
*v = i
b := unsafe.Pointer(v)
if ok := m.Put(i, b); !ok {
t.Errorf("Couldn't put key: %v", i)
}
}
for i = 1; i < 100; i++ {
p := m.Get(i)
v := (*uint64)(p)
if v == nil {
t.Errorf("Didn't expect nil for i: %v", i)
return
}
if *v != i {
t.Errorf("Expected: %v. Got: %v", i, *v)
}
}
}
func BenchmarkGetAndPut(b *testing.B) {
m := NewMap(1 << 16)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
key := uint64(rand.Int63())
p := m.Get(key)
if p == nil {
l := posting.NewList()
m.Put(key, unsafe.Pointer(l))
}
}
})
}
func BenchmarkGotomic(b *testing.B) {
h := gotomic.NewHash()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
key := uint64(rand.Int63())
_, has := h.Get(gotomic.IntKey(key))
if !has {
l := posting.NewList()
h.Put(gotomic.IntKey(key), l)
}
}
})
}
......@@ -20,9 +20,11 @@ import (
"sync"
"sync/atomic"
"time"
"unsafe"
"github.com/Sirupsen/logrus"
"github.com/dgraph-io/dgraph/commit"
"github.com/dgraph-io/dgraph/concurrent"
"github.com/dgraph-io/dgraph/store"
"github.com/dgryski/go-farm"
)
......@@ -46,14 +48,14 @@ func (c *counters) periodicLog() {
}
}
var lmap *Map
var lcmap *concurrent.Map
var pstore *store.Store
var clog *commit.Logger
var ch chan uint64
var lc *lcounters
func Init(posting *store.Store, log *commit.Logger) {
lmap = NewMap(true)
lcmap = concurrent.NewMap(1 << 20)
pstore = posting
clog = log
ch = make(chan uint64, 10000)
......@@ -77,31 +79,33 @@ func (lc *lcounters) periodicLog() {
func Get(key []byte) *List {
uid := farm.Fingerprint64(key)
l, added := lmap.Get(uid)
if added {
atomic.AddUint64(&lc.miss, 1)
lp := lcmap.Get(uid)
if lp == nil {
l := NewList()
l.init(key, pstore, clog)
} else {
atomic.AddUint64(&lc.hit, 1)
lcmap.Put(uid, unsafe.Pointer(l))
return l
}
return l
return (*List)(lp)
}
/*
func periodicQueueForProcessing(c *counters) {
ticker := time.NewTicker(time.Minute)
for _ = range ticker.C {
lmap.StreamUntilCap(ch)
}
}
*/
func process(c *counters, wg *sync.WaitGroup) {
for eid := range ch {
l, added := lmap.Get(eid)
if l == nil || added {
lp := lcmap.Get(eid)
if lp == nil {
continue
}
atomic.AddUint64(&c.merged, 1)
l := (*List)(lp)
if err := l.MergeIfDirty(); err != nil {
glog.WithError(err).Error("While commiting dirty list.")
}
......@@ -119,15 +123,17 @@ func periodicProcess(c *counters) {
}
func queueAll(c *counters) {
lmap.StreamAllKeys(ch)
lcmap.StreamAll(ch)
close(ch)
}
/*
func StartPeriodicMerging() {
ctr := new(counters)
go periodicQueueForProcessing(ctr)
go periodicProcess(ctr)
}
*/
func MergeLists(numRoutines int) {
c := new(counters)
......
......@@ -13,3 +13,6 @@ In this case, u1 has 110K Gets, while u10 has only ~3000. So, that's good.
But we see a sudden jump to 37k Gets (16%), which took over 1 millisecond!
And 24k requests (~10%) took over a second to return. That's very erratic behaviour.
Overall, only 72% of Gets took under 10 microseconds.
Using my own concurrent map, without mutex locks is *really* *really* fast. Able to
load a million edges in under 30 seconds. Memory usage is now a concern.
......@@ -98,5 +98,5 @@ func main() {
f.Close()
}
glog.Info("Calling merge lists")
posting.MergeLists(100)
posting.MergeLists(10000)
}
......@@ -97,7 +97,7 @@ func main() {
defer clog.Close()
posting.Init(ps, clog)
posting.StartPeriodicMerging()
// posting.StartPeriodicMerging()
http.HandleFunc("/query", queryHandler)
glog.WithField("port", *port).Info("Listening for requests...")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment