Skip to content

Commit cb46c97

Browse files
authored
Enhancement/disk utilization (#642)
* Add config and worker for blobber updates * Manage files as per its lookuphash * Use contenthash instead of lookuphash * Replace file path calculation with new logic Add temporary directory and files * Fix file delete while moving file to cloud Multiple files can have same content hash within an allocation. Remove iterProgress use Remove . import * Modify filestore setup * Add/modify/optimize file operations as per file manager * Reorder functions and implement lock for file write Modify tests * Restructure setup for testing Add mock for file manager * Check if a path is a mountpoint * Fix flag and mount point check for integration * Fix null scan error to int64 value db.commit to unblock postgres transaction * Add negating build tag to unit tests * Revert use of build tags * Fix/Modify download integration test * Skip integration test if not set in flag * Add build tags to separate integration and unit test partially * Distinguish from integration test Put common function in new file * Update comment, delete unused function Add nil checking for integration testing * Add minor code optimization * Validate actual file content hash w.r.t. clien'ts content hash * Delete connection object after its usage * Optimize file write to temporary directory * Remove some code redundancy * Readability change * Modify config for minio setup * Reorganize/Reorder storage related code * Reorder argument of Filestorer interface * Add temporary fix for tests * Change flag name * Update test * Add filestore mocker and update accordingly * Add file store mocker for tests in allocation package * Add unit test for filestore * Add/Modify/Fix/Optimize code * Fix commit/integration tests issues * Add/Optimize update blobber capacity and other functions * Add tag * Delete unnecessary file * Fix variable name * Remove unused function * Relax mountpoint requirement * Fix mountpoint check temporarily * Modify yaml file for custom docker build * Modify content hash calculation and size increment * Fix size update * Fix for padding less data * Move to files_dir flag temporarily for system testing * Fix file store unit test * Modify comparison to length * Fix validation/test * Change back to what staging has * Change error message * Calculate different content hash for thumbnail * Skip thumbnail hash checking * Fix hash assignment issue * Add comments and modify field * Disable lint check * Rename function * Use helper function to check if test is integration * Fix return issues Remove comment * Rename functions * Remove defer for smaller code blocks * Modify viper's config retrieval function call * Modify function to return error and avoid using go routine * Fix double unlock call * Query in batches rather than single query * Fix lint issue * Fix address issue * Set allocations in batch * Rename function
1 parent 6652c59 commit cb46c97

File tree

67 files changed

+2707
-1608
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

67 files changed

+2707
-1608
lines changed

.github/workflows/tests.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,4 +103,4 @@ jobs:
103103
#sudo make integration-tests
104104
go=$(which go)
105105
root=$(pwd)
106-
sudo CGO_ENABLED=1 root=$root integration=1 $go test -tags bn256 ./...
106+
sudo CGO_ENABLED=1 root=$root integration=1 $go test -tags bn256 -tags=integration ./...

code/go/0chain.net/blobber/config.go

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,17 @@ func setupConfig(configDir string, deploymentMode int) {
2020
// setup config file
2121
config.SetupConfig(configDir)
2222

23+
if mountPoint != "" {
24+
config.Configuration.MountPoint = mountPoint
25+
} else {
26+
config.Configuration.MountPoint = viper.GetString("storage.files_dir")
27+
}
28+
29+
if config.Configuration.MountPoint == "" {
30+
panic("Please specify mount point in flag or config file")
31+
}
32+
config.Configuration.AllocDirLevel = viper.GetIntSlice("storage.alloc_dir_level")
33+
config.Configuration.FileDirLevel = viper.GetIntSlice("storage.file_dir_level")
2334
config.Configuration.DeploymentMode = byte(deploymentMode)
2435
config.Configuration.ChainID = viper.GetString("server_chain.id")
2536
config.Configuration.SignatureScheme = viper.GetString("server_chain.signature_scheme")
@@ -40,16 +51,28 @@ func setupConfig(configDir string, deploymentMode int) {
4051
config.Configuration.ChallengeResolveNumWorkers = viper.GetInt("challenge_response.num_workers")
4152
config.Configuration.ChallengeMaxRetires = viper.GetInt("challenge_response.max_retries")
4253

54+
config.Configuration.AutomaticUpdate = viper.GetBool("disk_update.automatic_update")
55+
blobberUpdateIntrv := viper.GetDuration("disk_update.blobber_update_interval")
56+
if blobberUpdateIntrv <= 0 {
57+
blobberUpdateIntrv = 5 * time.Minute
58+
}
59+
config.Configuration.BlobberUpdateInterval = blobberUpdateIntrv
60+
4361
config.Configuration.ColdStorageMinimumFileSize = viper.GetInt64("cold_storage.min_file_size")
4462
config.Configuration.ColdStorageTimeLimitInHours = viper.GetInt64("cold_storage.file_time_limit_in_hours")
45-
config.Configuration.ColdStorageJobQueryLimit = viper.GetInt64("cold_storage.job_query_limit")
46-
config.Configuration.ColdStorageStartCapacitySize = viper.GetInt64("cold_storage.start_capacity_size")
63+
config.Configuration.ColdStorageJobQueryLimit = viper.GetInt("cold_storage.job_query_limit")
64+
config.Configuration.ColdStorageStartCapacitySize = viper.GetUint64("cold_storage.start_capacity_size")
4765
config.Configuration.ColdStorageDeleteLocalCopy = viper.GetBool("cold_storage.delete_local_copy")
4866
config.Configuration.ColdStorageDeleteCloudCopy = viper.GetBool("cold_storage.delete_cloud_copy")
4967

5068
config.Configuration.MinioStart = viper.GetBool("minio.start")
5169
config.Configuration.MinioWorkerFreq = viper.GetInt64("minio.worker_frequency")
5270
config.Configuration.MinioUseSSL = viper.GetBool("minio.use_ssl")
71+
config.Configuration.MinioStorageUrl = viper.GetString("minio.storage_service_url")
72+
config.Configuration.MinioAccessID = viper.GetString("minio.access_id")
73+
config.Configuration.MinioSecretKey = viper.GetString("minio.secret_access_key")
74+
config.Configuration.MinioBucket = viper.GetString("minio.bucket_name")
75+
config.Configuration.MinioRegion = viper.GetString("minio.region")
5376

5477
config.Configuration.DBAutoMigrate = viper.GetBool("db.automigrate")
5578
config.Configuration.PGUserName = viper.GetString("pg.user")

code/go/0chain.net/blobber/filestore.go

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,22 @@ import (
66
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/filestore"
77
)
88

9-
var fsStore filestore.FileStore //nolint:unused // global which might be needed somewhere
10-
119
func setupFileStore() (err error) {
1210
fmt.Print("> setup file store")
11+
var fs filestore.FileStorer
12+
if isIntegrationTest {
13+
fs = &filestore.MockStore{}
14+
} else {
15+
fs = &filestore.FileStore{}
16+
17+
}
1318

14-
fsStore, err = filestore.SetupFSStore(filesDir + "/files")
19+
err = fs.Initialize()
20+
if err != nil {
21+
return
22+
}
1523

16-
fmt.Print(" [OK]\n")
24+
filestore.SetFileStore(fs)
1725

18-
return err
26+
return nil
1927
}

code/go/0chain.net/blobber/flags.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ var (
1010
deploymentMode int
1111
keysFile string
1212
minioFile string
13-
filesDir string
13+
mountPoint string
1414
metadataDB string
1515
logDir string
1616
httpPort int
@@ -28,7 +28,7 @@ func init() {
2828
flag.IntVar(&deploymentMode, "deployment_mode", 2, "deployment mode: 0=dev,1=test, 2=mainnet")
2929
flag.StringVar(&keysFile, "keys_file", "", "keys_file")
3030
flag.StringVar(&minioFile, "minio_file", "", "minio_file")
31-
flag.StringVar(&filesDir, "files_dir", "", "files_dir")
31+
flag.StringVar(&mountPoint, "files_dir", "", "Mounted partition where all files will be stored")
3232
flag.StringVar(&metadataDB, "db_dir", "", "db_dir")
3333
flag.StringVar(&logDir, "log_dir", "", "log_dir")
3434
flag.IntVar(&httpPort, "port", 0, "port")
@@ -47,10 +47,6 @@ func parseFlags() {
4747
fmt.Print("> load flags")
4848
flag.Parse()
4949

50-
if filesDir == "" {
51-
panic("Please specify --files_dir absolute folder name option where uploaded files can be stored")
52-
}
53-
5450
if metadataDB == "" {
5551
panic("Please specify --db_dir absolute folder name option where meta data db can be stored")
5652
}

code/go/0chain.net/blobber/main.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,6 @@ func main() {
2323
panic(err)
2424
}
2525

26-
if err := setupMinio(); err != nil {
27-
logging.Logger.Error("Error setting up minio " + err.Error())
28-
panic(err)
29-
}
30-
3126
if err := setupNode(); err != nil {
3227
logging.Logger.Error("Error setting up blobber node " + err.Error())
3328
panic(err)

code/go/0chain.net/blobber/minio.go

Lines changed: 0 additions & 61 deletions
This file was deleted.

code/go/0chain.net/blobber/worker.go

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
package main
22

33
import (
4+
"context"
45
"time"
56

67
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/allocation"
78
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/challenge"
89
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/config"
910
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore"
11+
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/filestore"
1012
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/handler"
1113
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/readmarker"
1214
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/writemarker"
@@ -22,8 +24,10 @@ func setupWorkers() {
2224
challenge.SetupWorkers(root)
2325
readmarker.SetupWorkers(root)
2426
writemarker.SetupWorkers(root)
25-
allocation.StartUpdateWorker(root,
26-
config.Configuration.UpdateAllocationsInterval)
27+
allocation.StartUpdateWorker(root, config.Configuration.UpdateAllocationsInterval)
28+
if config.Configuration.AutomaticUpdate {
29+
go StartUpdateWorker(root, config.Configuration.BlobberUpdateInterval)
30+
}
2731
}
2832

2933
func refreshPriceOnChain() {
@@ -64,3 +68,33 @@ func startRefreshSettings() {
6468
<-time.After(REPEAT_DELAY * time.Second)
6569
}
6670
}
71+
72+
func StartUpdateWorker(ctx context.Context, interval time.Duration) {
73+
err := filestore.GetFileStore().CalculateCurrentDiskCapacity()
74+
if err != nil {
75+
panic(err)
76+
}
77+
currentCapacity := filestore.GetFileStore().GetCurrentDiskCapacity()
78+
79+
ticker := time.NewTicker(config.Configuration.BlobberUpdateInterval)
80+
for {
81+
select {
82+
case <-ctx.Done():
83+
return
84+
case <-ticker.C:
85+
err := filestore.GetFileStore().CalculateCurrentDiskCapacity()
86+
if err != nil {
87+
logging.Logger.Error("Error while getting capacity", zap.Error(err))
88+
break
89+
}
90+
if currentCapacity != filestore.GetFileStore().GetCurrentDiskCapacity() {
91+
92+
err := handler.UpdateBlobberOnChain(ctx)
93+
if err != nil {
94+
logging.Logger.Error("Error while updating blobber updates on chain", zap.Error(err))
95+
}
96+
}
97+
}
98+
}
99+
100+
}

code/go/0chain.net/blobber/zcn.go

Lines changed: 33 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"time"
66

77
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/config"
8+
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/filestore"
89
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/handler"
910
"github.com/0chain/blobber/code/go/0chain.net/core/chain"
1011
"github.com/0chain/blobber/code/go/0chain.net/core/common"
@@ -23,42 +24,46 @@ func setupOnChain() {
2324
fmt.Print(" + connect to miners: ")
2425
if isIntegrationTest {
2526
fmt.Print(" [SKIP]\n")
26-
} else {
27-
if err := handler.WalletRegister(); err != nil {
28-
fmt.Println(err.Error() + "\n")
29-
panic(err)
30-
}
31-
fmt.Print(" [OK]\n")
27+
return
3228
}
3329

30+
if err := handler.WalletRegister(); err != nil {
31+
fmt.Println(err.Error() + "\n")
32+
panic(err)
33+
}
34+
fmt.Print(" [OK]\n")
35+
36+
var success bool
37+
var err error
3438
// setup blobber (add or update) on the blockchain (multiple attempts)
3539
for i := 1; i <= 10; i++ {
36-
if i == 1 {
37-
fmt.Printf("\r+ connect to sharders:")
38-
} else {
39-
fmt.Printf("\r+ [%v/10]connect to sharders:", i)
40+
fmt.Printf("\r+ [%v/10]connect to sharders:", i)
41+
if err = filestore.GetFileStore().CalculateCurrentDiskCapacity(); err != nil {
42+
fmt.Print("\n", err.Error()+"\n")
43+
goto sleep
4044
}
4145

42-
if isIntegrationTest {
43-
fmt.Print(" [SKIP]\n")
44-
break
45-
} else {
46-
if err := handler.RegisterBlobber(common.GetRootContext()); err != nil {
47-
if i == 10 { // no more attempts
48-
panic(err)
49-
}
50-
fmt.Print("\n", err.Error()+"\n")
51-
} else {
52-
fmt.Print(" [OK]\n")
53-
break
54-
}
55-
for n := 0; n < ATTEMPT_DELAY; n++ {
56-
<-time.After(1 * time.Second)
57-
58-
fmt.Printf("\r- wait %v seconds to retry", ATTEMPT_DELAY-n)
59-
}
46+
if err = handler.RegisterBlobber(common.GetRootContext()); err != nil {
47+
fmt.Print("\n", err.Error()+"\n")
48+
goto sleep
49+
}
50+
51+
fmt.Print(" [OK]\n")
52+
success = true
53+
break
54+
55+
sleep:
56+
for n := 0; n < ATTEMPT_DELAY; n++ {
57+
<-time.After(1 * time.Second)
58+
59+
fmt.Printf("\r- wait %v seconds to retry", ATTEMPT_DELAY-n)
6060
}
6161
}
62+
63+
if !success {
64+
panic(err)
65+
}
66+
6267
if !isIntegrationTest {
6368
go setupWorkers()
6469

0 commit comments

Comments
 (0)