Skip to content

Commit 00c4d25

Browse files
committed
Fixes, updates and refactor.
1 parent 3b7b1f2 commit 00c4d25

29 files changed

+425
-102
lines changed

Gopkg.lock

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

RequestStream.js

Lines changed: 1 addition & 0 deletions
Large diffs are not rendered by default.

configuration.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,10 @@ Backend = "chord-single-offer" # chord-random, chord-single-offer, chord-mult
1010
SpreadOffersInterval = "5m"
1111
RefreshingInterval = "5m"
1212
# Debug performance flags
13-
SpreadOffers = false
13+
SpreadOffers = true
1414
SpreadPartitionsState = true
15+
GUIDEstimatedNetworkSize = 5000
16+
GUIDScaleFactor = 3
1517
[Caravela.DiscoveryBackend.RandomChordBackend]
1618
RandBackendMaxRetries = 4
1719
[Caravela.Resources]

engine/engine.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ func NewEngine(metricsCollector *metrics.Collector, simConfig *configuration.Con
5252

5353
nodes: make([]*caravelaNode.Node, simConfig.TotalNumberOfNodes()),
5454
overlayMock: nil,
55-
feeder: feeder.Create(simConfig, baseRngSeed),
55+
feeder: feeder.Create(simConfig, caravelaConfigurations, baseRngSeed),
5656

5757
metricsCollector: metricsCollector,
5858
workersPool: grpool.NewPool(maxWorkers, maxWorkers*6),
@@ -66,7 +66,7 @@ func (e *Engine) Init() {
6666
util.Log.Info(util.LogTag(engineLogTag) + "Initializing...")
6767

6868
// Init CARAVELA's packages structures.
69-
caravela.Init(e.simulatorConfigs.CaravelaLogsLevel())
69+
caravela.Init(e.simulatorConfigs.CaravelaLogsLevel(), e.caravelaConfigs)
7070

7171
// External node's component mocks (Creation and initialization).
7272
apiServerMock := caravela.NewAPIServerMock()
@@ -110,15 +110,16 @@ func (e *Engine) Init() {
110110
e.metricsCollector.InitNewSimulation(e.caravelaConfigs.DiscoveryBackend(), maxNodesResources)
111111

112112
// Initialize request feeder.
113-
e.feeder.Init(e.metricsCollector)
113+
cpus, memory := dockerClientMock.MaxResourcesAvailable()
114+
e.feeder.Init(e.metricsCollector, types.Resources{CPUs: cpus, Memory: memory})
114115

115116
e.isInit = true
116117
util.Log.Info(util.LogTag(engineLogTag) + "Initialized")
117118
}
118119

119120
// Start starts the simulator engine.
120121
func (e *Engine) Start() {
121-
const ticksPerPersist = 1
122+
const ticksPerPersist = 5
122123

123124
if !e.isInit {
124125
panic(errors.New("simulator is not initialized"))
@@ -148,16 +149,17 @@ func (e *Engine) Start() {
148149
simCurrentTime = simCurrentTime + e.simulatorConfigs.TicksInterval()
149150
numTicks++
150151
if numTicks == e.simulatorConfigs.MaximumTicks() {
151-
close(ticksChan) // Alert feeder that the engine has ended.
152152
break
153153
}
154+
154155
if numTicks != 0 && (numTicks%ticksPerPersist) == 0 {
155156
e.metricsCollector.Persist(simCurrentTime)
156157
continue
157158
}
158159
e.metricsCollector.CreateNewGlobalSnapshot(simCurrentTime)
159160
}
160161

162+
close(ticksChan) // Alert feeder that the engine has ended.
161163
e.metricsCollector.EndSimulation(simCurrentTime)
162164

163165
util.Log.Info(util.LogTag(engineLogTag) + "Simulation Ended")

engine/feeder/feeder.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,14 @@ package feeder
22

33
import (
44
"github.com/strabox/caravela-sim/engine/metrics"
5+
"github.com/strabox/caravela/api/types"
56
"github.com/strabox/caravela/node"
67
"time"
78
)
89

910
type RequestTask func(randNodeIndex int, randNode *node.Node, currentTime time.Duration)
1011

1112
type Feeder interface {
12-
Init(metricsCollector *metrics.Collector)
13+
Init(metricsCollector *metrics.Collector, systemTotalResources types.Resources)
1314
Start(ticksChannel <-chan chan RequestTask)
1415
}

engine/feeder/feeder_factory.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,21 @@ import (
55
"fmt"
66
"github.com/strabox/caravela-sim/configuration"
77
"github.com/strabox/caravela-sim/util"
8+
caravelaConfigs "github.com/strabox/caravela/configuration"
89
"log"
910
"strings"
1011
)
1112

1213
// Factory represents a method that creates new requests feeders.
13-
type Factory func(config *configuration.Configuration, rngSeed int64) (Feeder, error)
14+
type Factory func(simConfig *configuration.Configuration, caravelaConfigs *caravelaConfigs.Configuration, rngSeed int64) (Feeder, error)
1415

1516
// feeders holds all the registered requests feeder available.
1617
var feeders = make(map[string]Factory)
1718

1819
// init initializes our predefined request feeders.
1920
func init() {
2021
Register("random", newRandomFeeder)
22+
Register("json", newJsonFeeder)
2123
}
2224

2325
// Register can be used to register a new request feeder in order to be available.
@@ -33,7 +35,7 @@ func Register(feederName string, factory Factory) {
3335
}
3436

3537
// Create is used to obtain a request feeder based on the configurations.
36-
func Create(config *configuration.Configuration, rngSeed int64) Feeder {
38+
func Create(config *configuration.Configuration, caravelaConfigs *caravelaConfigs.Configuration, rngSeed int64) Feeder {
3739
configuredFeeder := config.Feeder()
3840

3941
feederFactory, exist := feeders[configuredFeeder]
@@ -47,7 +49,7 @@ func Create(config *configuration.Configuration, rngSeed int64) Feeder {
4749
log.Panic(err)
4850
}
4951

50-
feeder, err := feederFactory(config, rngSeed)
52+
feeder, err := feederFactory(config, caravelaConfigs, rngSeed)
5153
if err != nil {
5254
log.Panic(err)
5355
}

engine/feeder/json_feeder.go

Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
package feeder
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"github.com/strabox/caravela-sim/configuration"
8+
"github.com/strabox/caravela-sim/engine/metrics"
9+
"github.com/strabox/caravela-sim/mocks/caravela"
10+
"github.com/strabox/caravela-sim/util"
11+
"github.com/strabox/caravela/api/types"
12+
caravelaConfigs "github.com/strabox/caravela/configuration"
13+
"github.com/strabox/caravela/node"
14+
caravelaUtil "github.com/strabox/caravela/util"
15+
"math"
16+
"math/rand"
17+
"os"
18+
"strconv"
19+
"sync"
20+
"time"
21+
)
22+
23+
const logJsonFeederTag = "JS-FEEDER"
24+
25+
// jsonFeeder generates a stream of user requests reading from a json file.
26+
type jsonFeeder struct {
27+
collector *metrics.Collector // Metrics collector that collects system level metrics.
28+
reqInjectionNode sync.Map // Map of ContainerID<->NodeIndex.
29+
systemTotalResources types.Resources // Caravela's maximum resources.
30+
randomGenerator *rand.Rand // Pseudo-random generator.
31+
simConfigs *configuration.Configuration // Simulator's configurations.
32+
caravelaConfigs *caravelaConfigs.Configuration // Caravela's configurations.
33+
}
34+
35+
// newJsonFeeder creates a new json feeder.
36+
func newJsonFeeder(simConfigs *configuration.Configuration, caravelaConfigs *caravelaConfigs.Configuration, rngSeed int64) (Feeder, error) {
37+
return &jsonFeeder{
38+
collector: nil,
39+
reqInjectionNode: sync.Map{},
40+
randomGenerator: rand.New(caravelaUtil.NewSourceSafe(rand.NewSource(rngSeed))),
41+
simConfigs: simConfigs,
42+
caravelaConfigs: caravelaConfigs,
43+
}, nil
44+
}
45+
46+
func (j *jsonFeeder) Init(metricsCollector *metrics.Collector, systemTotalResources types.Resources) {
47+
j.collector = metricsCollector
48+
j.systemTotalResources = systemTotalResources
49+
}
50+
51+
func (j *jsonFeeder) Start(ticksChannel <-chan chan RequestTask) {
52+
type Request struct {
53+
Time int64 `json:"Time"`
54+
JobID int64 `json:"job id"`
55+
EventType int `json:"event type"`
56+
User string `json:"user"`
57+
CPUs float64 `json:"CPU request"`
58+
Memory float64 `json:"memory request"`
59+
}
60+
tick := 0
61+
62+
fileReader, err := os.Open("RequestStream.js")
63+
if err != nil {
64+
panic(fmt.Errorf("json feeder invalid request stream file: %s", err))
65+
}
66+
67+
jsonRequestStream := json.NewDecoder(fileReader)
68+
_, err = jsonRequestStream.Token() // Read open bracket
69+
if err != nil {
70+
panic(err)
71+
}
72+
73+
for {
74+
select {
75+
case newTickChan, more := <-ticksChannel: // Send all the requests for this tickChan
76+
if more {
77+
tickCpusAcc := 0
78+
tickMemoryAcc := 0
79+
80+
// Generate the requests from the json request stream.
81+
for jsonRequestStream.More() && (j.ratioSystemResources(tickCpusAcc, tickMemoryAcc) < 0.05) {
82+
var reqJson Request
83+
err := jsonRequestStream.Decode(&reqJson)
84+
if err != nil {
85+
panic(err)
86+
}
87+
88+
requestID := strconv.FormatInt(reqJson.JobID, 10)
89+
reqResources := j.generateRequestResources(reqJson.CPUs, reqJson.Memory)
90+
tickCpusAcc += reqResources.CPUs
91+
tickMemoryAcc += reqResources.Memory
92+
93+
if reqJson.EventType == 1 && !j.requestExists(requestID) { // Deploy container request.
94+
95+
newTickChan <- func(nodeIndex int, injectedNode *node.Node, currentTime time.Duration) {
96+
requestCtx := context.WithValue(context.Background(), types.RequestIDKey, requestID)
97+
j.collector.CreateRunRequest(nodeIndex, requestID, reqResources, currentTime)
98+
contStatus, err := injectedNode.SubmitContainers(
99+
requestCtx,
100+
[]types.ContainerConfig{{
101+
ImageKey: util.RandomName(),
102+
Name: util.RandomName(),
103+
PortMappings: caravela.EmptyPortMappings(),
104+
Args: caravela.EmptyContainerArgs(),
105+
Resources: reqResources,
106+
GroupPolicy: types.SpreadGroupPolicy,
107+
}})
108+
if err == nil {
109+
j.reqInjectionNode.Store(contStatus[0].ContainerID, injectedNode)
110+
j.collector.RunRequestSucceeded()
111+
}
112+
j.collector.ArchiveRunRequest(requestID)
113+
}
114+
115+
} else if (reqJson.EventType == 2 || reqJson.EventType == 3 || reqJson.EventType == 4 ||
116+
reqJson.EventType == 5 || reqJson.EventType == 6) && j.requestExists(requestID) { // Stop container request.
117+
118+
j.reqInjectionNode.Range(func(key, value interface{}) bool {
119+
containerID, _ := key.(string)
120+
injectionNode, _ := value.(*node.Node)
121+
newTickChan <- func(_ int, _ *node.Node, _ time.Duration) {
122+
err := injectionNode.StopContainers(context.Background(), []string{containerID})
123+
if err != nil {
124+
//util.Log.Infof(util.LogTag(logRandFeederTag)+"Stop container FAILED, err: %s", err)
125+
}
126+
j.reqInjectionNode.Delete(containerID)
127+
}
128+
return false
129+
})
130+
131+
}
132+
133+
}
134+
135+
if !jsonRequestStream.More() {
136+
fileReader.Close() // Close the request file.
137+
close(newTickChan) // No more user requests for this tick.
138+
return
139+
}
140+
141+
close(newTickChan) // No more user requests for this tick.
142+
} else { // Simulator closed ticks channel.
143+
fileReader.Close() // Close the request file.
144+
return // Stop feeding engine
145+
}
146+
}
147+
tick++
148+
}
149+
}
150+
151+
// generateRequestResources ...
152+
func (j *jsonFeeder) generateRequestResources(normalizedCpus, normalizedMemory float64) types.Resources {
153+
cpuClasses := make([]int, len(j.caravelaConfigs.ResourcesPartitions().CPUClasses))
154+
maxCpus := 0
155+
maxMemory := 0
156+
157+
cpuClassAcc := 0
158+
for i, cpuClass := range j.caravelaConfigs.ResourcesPartitions().CPUClasses {
159+
cpuClasses[i] = cpuClass.Percentage + cpuClassAcc
160+
cpuClassAcc += cpuClass.Percentage
161+
for _, cpus := range cpuClass.CPUCores {
162+
if cpus.Value > maxCpus {
163+
maxCpus = cpus.Value
164+
}
165+
for _, memory := range cpus.Memory {
166+
if memory.Value > maxMemory {
167+
maxMemory = memory.Value
168+
}
169+
}
170+
}
171+
}
172+
173+
chosenCpuClass := 0
174+
randInt := j.randomGenerator.Intn(101)
175+
for i, cpuClass := range cpuClasses {
176+
if randInt <= cpuClass {
177+
chosenCpuClass = i
178+
}
179+
}
180+
181+
return types.Resources{
182+
CPUClass: types.CPUClass(chosenCpuClass),
183+
CPUs: int(math.Ceil(normalizedCpus * float64(maxCpus))),
184+
Memory: int(math.Ceil(normalizedMemory * float64(maxMemory))),
185+
}
186+
}
187+
188+
// ratioSystemResources returns the ratio of resources given considered the system's total resources.
189+
func (j *jsonFeeder) ratioSystemResources(cpus, memory int) float64 {
190+
cpusRatio := float64(cpus) / float64(j.systemTotalResources.CPUs)
191+
memoryRatio := float64(memory) / float64(j.systemTotalResources.Memory)
192+
return (cpusRatio + memoryRatio) / 2
193+
}
194+
195+
// requestExists verifies if a request was already injected in the request stream.
196+
func (j *jsonFeeder) requestExists(requestID string) bool {
197+
_, ok := j.reqInjectionNode.Load(requestID)
198+
return ok
199+
}

0 commit comments

Comments
 (0)