Skip to content
Snippets Groups Projects
Unverified Commit 24189bfe authored by Manish R Jain's avatar Manish R Jain
Browse files

Improve dgraphloader output. Also, reuse byte slices to avoid a 35% memory usage by Marshal.

parent b2addd6b
Branches
No related tags found
No related merge requests found
......@@ -41,15 +41,24 @@ type response struct {
Message string `json:"message"`
}
type status struct {
rdfs uint64
mutations uint64
start time.Time
}
var hc http.Client
var r response
var s *status
func makeRequest(mutation chan string, c *uint64, wg *sync.WaitGroup) {
var counter uint64
func makeRequests(mutation chan string, wg *sync.WaitGroup) {
for m := range mutation {
counter = atomic.AddUint64(c, 1)
counter := atomic.AddUint64(&s.mutations, 1)
if counter%100 == 0 {
fmt.Printf("Request: %v\n", counter)
num := atomic.LoadUint64(&s.rdfs)
dur := time.Since(s.start)
rate := float64(num) / dur.Seconds()
fmt.Printf("[Request: %6d] Total RDFs done: %8d RDFs per second: %7.0f\r", counter, num, rate)
}
RETRY:
req, err := http.NewRequest("POST", *dgraph, strings.NewReader(body(m)))
......@@ -99,7 +108,7 @@ func readLine(r *bufio.Reader, buf *bytes.Buffer) error {
// processFile sends mutations for a given gz file.
func processFile(file string) {
fmt.Printf("Processing %s\n", file)
fmt.Printf("Processing %s\n\n", file)
f, err := os.Open(file)
x.Check(err)
defer f.Close()
......@@ -109,31 +118,29 @@ func processFile(file string) {
hc = http.Client{Timeout: time.Minute}
mutation := make(chan string, 3*(*concurrent))
var count uint64
var wg sync.WaitGroup
for i := 0; i < *concurrent; i++ {
wg.Add(1)
go makeRequest(mutation, &count, &wg)
go makeRequests(mutation, &wg)
}
var buf bytes.Buffer
bufReader := bufio.NewReader(gr)
num := 0
var rdfCount uint64
for {
err = readLine(bufReader, &buf)
if err != nil {
break
}
buf.WriteRune('\n')
atomic.AddUint64(&s.rdfs, 1)
num++
if num >= *numRdf {
mutation <- buf.String()
buf.Reset()
num = 0
}
rdfCount++
num++
}
if err != io.EOF {
x.Checkf(err, "Error while reading file")
......@@ -144,16 +151,21 @@ func processFile(file string) {
close(mutation)
wg.Wait()
fmt.Println("Number of RDF's parsed: ", rdfCount)
fmt.Println("Number of mutations run: ", count)
}
func main() {
x.Init()
s = &status{
start: time.Now(),
}
filesList := strings.Split(*files, ",")
x.AssertTrue(len(filesList) > 0)
for _, file := range filesList {
processFile(file)
}
fmt.Printf("Number of mutations run : %d\n", s.mutations)
fmt.Printf("Number of RDFs processed : %d\n", s.rdfs)
fmt.Printf("Time spent : %v\n", time.Since(s.start))
fmt.Printf("RDFs processed per second : %d\n", s.rdfs/uint64(time.Since(s.start).Seconds()))
}
......@@ -175,16 +175,30 @@ func (h *header) Decode(in []byte) {
h.msgId = binary.LittleEndian.Uint16(in[4:6])
}
var slicePool = sync.Pool{
New: func() interface{} {
return make([]byte, 256<<10)
},
}
func (n *node) ProposeAndWait(ctx context.Context, proposal *task.Proposal) error {
if n.raft == nil {
return x.Errorf("RAFT isn't initialized yet")
}
proposal.Id = rand.Uint32()
proposalData, err := proposal.Marshal()
slice := slicePool.Get().([]byte)
if len(slice) < proposal.Size() {
slice = make([]byte, proposal.Size())
}
defer slicePool.Put(slice)
upto, err := proposal.MarshalTo(slice)
if err != nil {
return err
}
proposalData := slice[:upto]
che := make(chan error, 1)
n.props.Store(proposal.Id, che)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment