Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Make batch sealing retry correct #367

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
7 changes: 3 additions & 4 deletions cmd/curio/tasks/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,18 +328,17 @@ func addSealingTasks(
}

if cfg.Subsystems.EnableBatchSeal {
slotMgr = slotmgr.NewSlotMgr()

batchSealTask, err := sealsupra.NewSupraSeal(
batchSealTask, sm, err := sealsupra.NewSupraSeal(
cfg.Seal.BatchSealSectorSize,
cfg.Seal.BatchSealBatchSize,
cfg.Seal.BatchSealPipelines,
!cfg.Seal.SingleHasherPerThread,
cfg.Seal.LayerNVMEDevices,
machineHostPort, slotMgr, db, full, stor, si)
machineHostPort, db, full, stor, si)
if err != nil {
return nil, xerrors.Errorf("setting up batch sealer: %w", err)
}
slotMgr = sm
activeTasks = append(activeTasks, batchSealTask)
addFinalize = true
}
Expand Down
51 changes: 32 additions & 19 deletions extern/supraseal/sealing/supra_seal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@ static void init_ctx(size_t sector_size) {
}
}

extern "C"
int supra_version() {
return 0x100001;
}

extern "C"
void supra_seal_init(size_t sector_size, const char* config_file) {
printf("INIT called %s\n", config_file);
Expand Down Expand Up @@ -207,29 +212,37 @@ int c1(size_t block_offset, size_t num_sectors, size_t sector_slot,
const uint8_t* ticket, const char* cache_path,
const char* parents_filename, const char* replica_path,
size_t sector_size) {
size_t qpair = sealing_ctx->topology->c1_qpair;
int node_reader_core = sealing_ctx->topology->c1_reader;
const char* output_dir = cache_path;
try {
size_t qpair = sealing_ctx->topology->c1_qpair;
int node_reader_core = sealing_ctx->topology->c1_reader;
const char* output_dir = cache_path;

init_ctx(sector_size);
init_ctx(sector_size);

#define CALL_C1(C) \
{ \
streaming_node_reader_t<C> reader(sealing_ctx->controllers, qpair, \
block_offset, node_reader_core, \
sealing_ctx->topology->c1_sleep_time); \
return do_c1<C>(reader, \
num_sectors, sector_slot, \
replica_id, seed, \
ticket, cache_path, \
parents_filename, replica_path, \
output_dir); \
}
#define CALL_C1(C) \
{ \
streaming_node_reader_t<C> reader(sealing_ctx->controllers, qpair, \
block_offset, node_reader_core, \
sealing_ctx->topology->c1_sleep_time); \
return do_c1<C>(reader, \
num_sectors, sector_slot, \
replica_id, seed, \
ticket, cache_path, \
parents_filename, replica_path, \
output_dir); \
}

SECTOR_PARAMS_TABLE(SECTOR_CALL_TABLE(CALL_C1));
#undef CALL_C1
SECTOR_PARAMS_TABLE(SECTOR_CALL_TABLE(CALL_C1));
#undef CALL_C1

return 0;
return 0;
} catch (const std::exception& e) {
fprintf(stderr, "Exception in c1: %s\n", e.what());
std::terminate();
} catch (...) {
fprintf(stderr, "Unknown exception in c1\n");
std::terminate();
}
}

template<class C>
Expand Down
1 change: 1 addition & 0 deletions extern/supraseal/sealing/supra_seal.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
extern "C" {
#endif

int supra_version();

// Optional init function.
// config_file - topology config file. Defaults to supra_config.cfg
Expand Down
84 changes: 52 additions & 32 deletions lib/paths/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -1145,6 +1145,8 @@ func (st *Local) ReadSnapVanillaProof(ctx context.Context, sr storiface.SectorRe
return out, nil
}

var supraC1Token = make(chan struct{}, 1)

func (st *Local) supraPoRepVanillaProof(src storiface.SectorPaths, sr storiface.SectorRef, sealed, unsealed cid.Cid, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness) ([]byte, error) {
batchMetaPath := filepath.Join(src.Cache, BatchMetaFile)
bmdata, err := os.ReadFile(batchMetaPath)
Expand Down Expand Up @@ -1178,49 +1180,67 @@ func (st *Local) supraPoRepVanillaProof(src storiface.SectorPaths, sr storiface.

// first see if commit-phase1-output is there
commitPhase1OutputPath := filepath.Join(src.Cache, CommitPhase1OutputFileSupra)
if _, err := os.Stat(commitPhase1OutputPath); err != nil {
if !os.IsNotExist(err) {
return nil, xerrors.Errorf("stat commit phase1 output: %w", err)
}

parentsPath, err := ParentsForProof(sr.ProofType)
if err != nil {
return nil, xerrors.Errorf("parents for proof: %w", err)
}
var retry bool

// not found, compute it
res := supraffi.C1(bm.BlockOffset, bm.BatchSectors, bm.NumInPipeline, replicaID[:], seed, ticket, src.Cache, parentsPath, src.Sealed, uint64(ssize))
if res != 0 {
return nil, xerrors.Errorf("c1 failed: %d", res)
for {
if retry {
if err := os.Remove(commitPhase1OutputPath); err != nil {
return nil, xerrors.Errorf("remove bad commit phase 1 output file: %w", err)
}
}
retry = true

// check again
if _, err := os.Stat(commitPhase1OutputPath); err != nil {
return nil, xerrors.Errorf("stat commit phase1 output after compute: %w", err)
if !os.IsNotExist(err) {
return nil, xerrors.Errorf("stat commit phase1 output: %w", err)
}

parentsPath, err := ParentsForProof(sr.ProofType)
if err != nil {
return nil, xerrors.Errorf("parents for proof: %w", err)
}

// not found, compute it
supraC1Token <- struct{}{}
res := supraffi.C1(bm.BlockOffset, bm.BatchSectors, bm.NumInPipeline, replicaID[:], seed, ticket, src.Cache, parentsPath, src.Sealed, uint64(ssize))
<-supraC1Token

if res != 0 {
return nil, xerrors.Errorf("c1 failed: %d", res)
}

// check again
if _, err := os.Stat(commitPhase1OutputPath); err != nil {
return nil, xerrors.Errorf("stat commit phase1 output after compute: %w", err)
}
}
}

// read the output
rawOut, err := os.ReadFile(commitPhase1OutputPath)
if err != nil {
return nil, xerrors.Errorf("read commit phase1 output: %w", err)
}
// read the output
rawOut, err := os.ReadFile(commitPhase1OutputPath)
if err != nil {
return nil, xerrors.Errorf("read commit phase1 output: %w", err)
}

// decode
dec, err := cuproof.DecodeCommit1OutRaw(bytes.NewReader(rawOut))
if err != nil {
return nil, xerrors.Errorf("decode commit phase1 output: %w", err)
}
// decode
dec, err := cuproof.DecodeCommit1OutRaw(bytes.NewReader(rawOut))
if err != nil {
log.Errorw("failed to decode commit phase1 output, will retry", "err", err)
time.Sleep(1 * time.Second)
continue
}

log.Infow("supraPoRepVanillaProof", "sref", sr, "replicaID", replicaID, "seed", seed, "ticket", ticket, "decrepl", dec.ReplicaID, "decr", dec.CommR, "decd", dec.CommD)
log.Infow("supraPoRepVanillaProof", "sref", sr, "replicaID", replicaID, "seed", seed, "ticket", ticket, "decrepl", dec.ReplicaID, "decr", dec.CommR, "decd", dec.CommD)

// out is json, so we need to marshal it back
out, err := json.Marshal(dec)
if err != nil {
return nil, xerrors.Errorf("marshal commit phase1 output: %w", err)
}
// out is json, so we need to marshal it back
out, err := json.Marshal(dec)
if err != nil {
log.Errorw("failed to decode commit phase1 output", "err", err)
time.Sleep(1 * time.Second)
}

return out, nil
return out, nil
}
}

var _ Store = &Local{}
13 changes: 11 additions & 2 deletions lib/proof/porep_vproof_bin_decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"encoding/binary"
"fmt"
"io"

"golang.org/x/xerrors"
)

// This file contains a bincode decoder for Commit1OutRaw.
Expand Down Expand Up @@ -67,8 +69,15 @@ func DecodeCommit1OutRaw(r io.Reader) (Commit1OutRaw, error) {
}

// Read last byte, require EOF
if _, err := r.Read(make([]byte, 1)); err != io.EOF {
return out, fmt.Errorf("expected EOF")
b := make([]byte, 1)
if n, err := r.Read(b); err != io.EOF {
if err != nil {
return out, xerrors.Errorf("expected EOF, got: %w (n:%d, b:%x)", err, n, b)
}

n2, derr := io.Copy(io.Discard, r)

return out, xerrors.Errorf("expected EOF (ndc:%d, b:%x, derr:%x)", int64(n)+n2, b, derr)
}

return out, nil
Expand Down
36 changes: 34 additions & 2 deletions lib/slotmgr/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@ package slotmgr
import (
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
)

var (
pre = "slotmgr_"

// KeySlotOffset tags metrics with the slot offset.
KeySlotOffset, _ = tag.NewKey("slot_offset")
)

// SlotMgrMeasures groups all slotmgr metrics.
// SlotMgrMeasures groups the high-level slotmgr metrics.
var SlotMgrMeasures = struct {
SlotsAvailable *stats.Int64Measure
SlotsAcquired *stats.Int64Measure
Expand All @@ -22,25 +26,53 @@ var SlotMgrMeasures = struct {
SlotErrors: stats.Int64(pre+"slot_errors", "Total number of slot errors (e.g., failed to put).", stats.UnitDimensionless),
}

// init registers the views for slotmgr metrics.
// SlotMgrSlotMeasures groups per-slot metrics.
var SlotMgrSlotMeasures = struct {
SlotInUse *stats.Int64Measure
SlotSectorCount *stats.Int64Measure
}{
SlotInUse: stats.Int64(pre+"slot_in_use", "Slot actively in use (batch sealing). 1=in use, 0=not in use", stats.UnitDimensionless),
SlotSectorCount: stats.Int64(pre+"slot_sector_count", "Number of sectors in the slot", stats.UnitDimensionless),
}

func init() {
err := view.Register(
&view.View{
Measure: SlotMgrMeasures.SlotsAvailable,
Description: "Number of available slots",
Aggregation: view.LastValue(),
},
&view.View{
Measure: SlotMgrMeasures.SlotsAcquired,
Description: "Total number of slots acquired",
Aggregation: view.Sum(),
},
&view.View{
Measure: SlotMgrMeasures.SlotsReleased,
Description: "Total number of slots released",
Aggregation: view.Sum(),
},
&view.View{
Measure: SlotMgrMeasures.SlotErrors,
Description: "Total number of slot errors",
Aggregation: view.Sum(),
},

// Register per-slot metrics
&view.View{
Name: pre + "slot_in_use",
Measure: SlotMgrSlotMeasures.SlotInUse,
Description: "Slot is in use (1) or not (0)",
TagKeys: []tag.Key{KeySlotOffset},
Aggregation: view.LastValue(),
},
&view.View{
Name: pre + "slot_sector_count",
Measure: SlotMgrSlotMeasures.SlotSectorCount,
Description: "Number of sectors in a slot",
TagKeys: []tag.Key{KeySlotOffset},
Aggregation: view.LastValue(),
},
)
if err != nil {
panic(err)
Expand Down
Loading
Loading