Skip to content
Snippets Groups Projects
Commit 2b6a2e66 authored by Manish R Jain's avatar Manish R Jain
Browse files

BREAKING changes to map.go

parent 471cca3e
No related branches found
No related tags found
No related merge requests found
...@@ -18,6 +18,7 @@ package concurrent ...@@ -18,6 +18,7 @@ package concurrent
import ( import (
"log" "log"
"math/rand"
"sync/atomic" "sync/atomic"
"unsafe" "unsafe"
) )
...@@ -31,14 +32,21 @@ type bucket struct { ...@@ -31,14 +32,21 @@ type bucket struct {
elems [8]kv elems [8]kv
} }
const (
MUTABLE = iota
IMMUTABLE
)
type container struct { type container struct {
active int32 status int32
sz uint64 sz uint64
list []*bucket list []*bucket
numElems uint32
} }
type Map struct { type Map struct {
cs [2]container cs [2]unsafe.Pointer
size uint32
} }
func powOf2(sz int) bool { func powOf2(sz int) bool {
...@@ -46,7 +54,7 @@ func powOf2(sz int) bool { ...@@ -46,7 +54,7 @@ func powOf2(sz int) bool {
} }
func initContainer(cs *container, sz uint64) { func initContainer(cs *container, sz uint64) {
cs.active = 1 cs.status = MUTABLE
cs.sz = sz cs.sz = sz
cs.list = make([]*bucket, sz) cs.list = make([]*bucket, sz)
for i := range cs.list { for i := range cs.list {
...@@ -59,47 +67,98 @@ func NewMap(sz int) *Map { ...@@ -59,47 +67,98 @@ func NewMap(sz int) *Map {
log.Fatal("Map can only be created for a power of 2.") log.Fatal("Map can only be created for a power of 2.")
} }
c := new(container)
initContainer(c, uint64(sz))
m := new(Map) m := new(Map)
initContainer(&m.cs[0], uint64(sz)) m.cs[MUTABLE] = unsafe.Pointer(c)
m.cs[IMMUTABLE] = nil
return m return m
} }
func (m *Map) Get(k uint64) unsafe.Pointer { func (c *container) get(k uint64) unsafe.Pointer {
for _, c := range m.cs { bi := k & (c.sz - 1)
if atomic.LoadInt32(&c.active) == 0 { b := c.list[bi]
continue for i := range b.elems {
e := &b.elems[i]
if ek := atomic.LoadUint64(&e.k); ek == k {
return e.v
} }
bi := k & (c.sz - 1) }
b := c.list[bi] return nil
for i := range b.elems { }
e := &b.elems[i]
ek := atomic.LoadUint64(&e.k) func (c *container) getOrInsert(k uint64, v unsafe.Pointer) unsafe.Pointer {
if ek == k { bi := k & (c.sz - 1)
return e.v b := c.list[bi]
for i := range b.elems {
e := &b.elems[i]
// Once allocated a valid key, it would never change. So, first check if
// it's allocated. If not, then allocate it. If can't, or not allocated,
// then check if it's k. If it is, then replace value. Otherwise continue.
// This sequence could be problematic, if this happens:
// Main thread runs Step 1. Check
if atomic.CompareAndSwapUint64(&e.k, 0, k) { // Step 1.
atomic.AddUint32(&c.numElems, 1)
if atomic.CompareAndSwapPointer(&e.v, nil, v) {
return v
} }
return atomic.LoadPointer(&e.v)
}
if atomic.LoadUint64(&e.k) == k {
// Swap if previous pointer is nil.
if atomic.CompareAndSwapPointer(&e.v, nil, v) {
return v
}
return atomic.LoadPointer(&e.v)
}
}
return nil
}
func (m *Map) GetOrInsert(k uint64, v unsafe.Pointer) unsafe.Pointer {
if v == nil {
log.Fatal("GetOrInsert doesn't allow setting nil pointers.")
return nil
}
// Check immutable first.
cval := atomic.LoadPointer(&m.cs[IMMUTABLE])
if cval != nil {
c := (*container)(cval)
if pv := c.get(k); pv != nil {
return pv
} }
} }
// Okay, deal with mutable container now.
cval = atomic.LoadPointer(&m.cs[MUTABLE])
if cval == nil {
log.Fatal("This is disruptive in a bad way.")
}
c := (*container)(cval)
if pv := c.getOrInsert(k, v); pv != nil {
return pv
}
// We still couldn't insert the key. Time to grow.
// TODO: Handle this case.
return nil return nil
} }
func (m *Map) Put(k uint64, v unsafe.Pointer) bool { func (m *Map) SetNilIfPresent(k uint64) bool {
for _, c := range m.cs { for _, c := range m.cs {
if atomic.LoadInt32(&c.active) == 0 { if atomic.LoadInt32(&c.status) == 0 {
continue continue
} }
bi := k & (c.sz - 1) bi := k & (c.sz - 1)
b := c.list[bi] b := c.list[bi]
for i := range b.elems { for i := range b.elems {
e := &b.elems[i] e := &b.elems[i]
// Once allocated a valid key, it would never change. So, first check if
// it's allocated. If not, then allocate it. If can't, or not allocated,
// then check if it's k. If it is, then replace value. Otherwise continue.
if atomic.CompareAndSwapUint64(&e.k, 0, k) {
atomic.StorePointer(&e.v, v)
return true
}
if atomic.LoadUint64(&e.k) == k { if atomic.LoadUint64(&e.k) == k {
atomic.StorePointer(&e.v, v) // Set to nil.
atomic.StorePointer(&e.v, nil)
return true return true
} }
} }
...@@ -107,24 +166,30 @@ func (m *Map) Put(k uint64, v unsafe.Pointer) bool { ...@@ -107,24 +166,30 @@ func (m *Map) Put(k uint64, v unsafe.Pointer) bool {
return false return false
} }
/*
func (m *Map) StreamUntilCap(ch chan uint64) { func (m *Map) StreamUntilCap(ch chan uint64) {
for _, c := range m.cs { for {
if atomic.LoadInt32(&c.active) == 0 { ci := rand.Intn(2)
continue c := m.cs[ci]
if atomic.LoadInt32(&c.status) == 0 {
ci += 1
c = m.cs[ci%2] // use the other.
} }
for { bi := rand.Intn(int(c.sz))
bi := rand.Intn(int(c.sz))
for len(ch) < cap(ch) { for _, e := range c.list[bi].elems {
if len(ch) >= cap(ch) {
return
}
if k := atomic.LoadUint64(&e.k); k > 0 {
ch <- k
} }
} }
} }
} }
*/
func (m *Map) StreamAll(ch chan uint64) { func (m *Map) StreamAll(ch chan uint64) {
for _, c := range m.cs { for _, c := range m.cs {
if atomic.LoadInt32(&c.active) == 0 { if atomic.LoadInt32(&c.status) == 0 {
continue continue
} }
for i := 0; i < int(c.sz); i++ { for i := 0; i < int(c.sz); i++ {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment