Disk ARchive 2.7.18
Full featured and portable backup and archiving tool
Loading...
Searching...
No Matches
parallel_tronconneuse.hpp
Go to the documentation of this file.
1/*********************************************************************/
2// dar - disk archive - a backup/restoration program
3// Copyright (C) 2002-2025 Denis Corbin
4//
5// This program is free software; you can redistribute it and/or
6// modify it under the terms of the GNU General Public License
7// as published by the Free Software Foundation; either version 2
8// of the License, or (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14//
15// You should have received a copy of the GNU General Public License
16// along with this program; if not, write to the Free Software
17// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18//
19// to contact the author, see the AUTHOR file
20/*********************************************************************/
21
25
34
35#ifndef PARALLEL_TRONCONNEUSE_HPP
36#define PARALLEL_TRONCONNEUSE_HPP
37
38#include "../my_config.h"
39#include <string>
40
41#include "infinint.hpp"
42#include "archive_version.hpp"
43#include "crypto_segment.hpp"
44#include "heap.hpp"
45#include "crypto_module.hpp"
46#include "proto_tronco.hpp"
47
48#include <libthreadar/libthreadar.hpp>
49
50namespace libdar
51{
52
55
56 // these classes are used by the parallel_tronconneuse class to wrap the different
57 // types of threads. They are defined just after the parallel_tronconneuse definition
58 class read_below;
59 class write_below;
60 class crypto_worker;
61
63
/// status flags used between parallel_tronconneuse and its sub-threads
enum class tronco_flags
{
    normal = 0,
    stop = 1,
    eof = 2,
    die = 3,
    data_error = 4,
    exception_below = 5,
    exception_worker = 6,
    exception_error = 7
};
65
66
68 //
69 // the parallel_tronconneuse class that orchestrates all that
70 //
71 //
72
73
75
87
// parallel_tronconneuse: implementation of the proto_tronco interface that
// relies on sub-threads (read_below, write_below and crypto_worker, all three
// forward-declared in this header) to cipher/decipher data block by block.
//
// NOTE(review): this listing comes from a documentation export in which some
// declarations are not visible (copy/move constructors, assignment operator,
// destructor, several private helpers and fields such as current_position,
// clear_block_size or reading_ver that are used below) - refer to the
// original header before editing this class.
88 class parallel_tronconneuse : public proto_tronco
89 {
90 public:
92
// This is the constructor.
// NOTE(review): the line carrying the constructor name and its first
// parameter is missing from this listing, as is the line carrying the
// parameter located between encrypted_side and ptr; only the remaining
// parameters are visible below.
101 U_32 block_size,
102 generic_file & encrypted_side,
104 std::unique_ptr<crypto_module> & ptr);
105
108
111
114
117
120
// inherited from generic_file
122 virtual bool skippable(skippability direction, const infinint & amount) override;
// inherited from generic_file
124 virtual bool skip(const infinint & pos) override;
// inherited from generic_file
126 virtual bool skip_to_eof() override;
// inherited from generic_file
128 virtual bool skip_relative(S_I x) override;
// inherited from generic_file; truncation is never possible on this class (always returns false)
130 virtual bool truncatable(const infinint & pos) const override { return false; };
// inherited from generic_file; throws SRC_BUG when the object is terminated,
// else returns current_position
132 virtual infinint get_position() const override { if(is_terminated()) throw SRC_BUG; return current_position; };
133
135
// in write_only mode indicate that end of file is reached; throws SRC_BUG when
// the object is terminated, else delegates to sync_write()
140 virtual void write_end_of_file() override { if(is_terminated()) throw SRC_BUG; sync_write(); };
141
142
144
// this method modifies the initial shift
145 virtual void set_initial_shift(const infinint & x) override;
146
151
// returns the block size given to constructor
153 virtual U_32 get_clear_block_size() const override { return clear_block_size; };
154
155 private:
156
157 // inherited from generic_file
158
// this protected inherited method is now private for inherited classes of tronconneuse
160 virtual void inherited_read_ahead(const infinint & amount) override;
161
// this protected inherited method is now private for inherited classes of tronconneuse
163 virtual U_I inherited_read(char *a, U_I size) override;
164
166
// inherited from generic_file
168 virtual void inherited_write(const char *a, U_I size) override;
169
171
// this protected inherited method is now private for inherited classes of tronconneuse;
// it unconditionally throws SRC_BUG (consistent with truncatable() returning false)
174 virtual void inherited_truncate(const infinint & pos) override { throw SRC_BUG; };
175
// this protected inherited method is now private for inherited classes of tronconneuse
177 virtual void inherited_sync_write() override;
178
179
// this protected inherited method is now private for inherited classes of tronconneuse
181 virtual void inherited_flush_read() override;
182
// this protected inherited method is now private for inherited classes of tronconneuse
184 virtual void inherited_terminate() override;
185
// read-only access to the reading_ver field
186 const archive_version & get_reading_version() const { return reading_ver; };
187
188 // internal data structure
189 enum class thread_status { running, suspended, dead };
190
191 // the fields
192
// the crypto module used to cipher / uncipher blocks of data
198 std::unique_ptr<crypto_module> crypto;
// callback taking the layer below and the archive version and returning an infinint
// NOTE(review): presumably the trailing clear data callback - confirm against the implementation
199 infinint (*mycallback)(generic_file & below, const archive_version & reading_ver);
// NOTE(review): presumably points to the encrypted_side given to the constructor - confirm
200 generic_file* encrypted;
201
202 // fields used to represent possible status of subthreads and communication channel (the pipe)
203
// whether child threads are waiting for us on the barrier
205 thread_status t_status;
206
207
208 // the following stores data from the ratelier_gather to be provided for read() operation
209 // the lus_data/lus_flags is what is extracted from the ratelier_gather, both constitute
210 // the feedback channel from sub-threads to provide order acks and normal data
211
212 std::deque<std::unique_ptr<crypto_segment> > lus_data;
213 std::deque<signed int> lus_flags;
214 bool lus_eof;
216
217 // the following stores data going to ratelier_scatter for the write() operation
218
219 std::unique_ptr<crypto_segment> tempo_write;
220 infinint block_num;
221
222 // the datastructures shared among threads
223
224 std::shared_ptr<libthreadar::ratelier_scatter<crypto_segment> > scatter;
225 std::shared_ptr<libthreadar::ratelier_gather<crypto_segment> > gather;
226 std::shared_ptr<libthreadar::barrier> waiter;
227 std::shared_ptr<heap<crypto_segment> > tas;
228
229 // the child threads
230
231 std::deque<std::unique_ptr<crypto_worker> > travailleur;
232 std::unique_ptr<read_below> crypto_reader;
233 std::unique_ptr<write_below> crypto_writer;
234
235
236
238
// send an order to subthreads and gather acks from them
249 bool send_read_order(tronco_flags order, const infinint & for_offset = 0);
250
253
// wake up threads in read mode when necessary
255 void go_read();
256
259
261
268
270
// removing the ignore_stop_acks pending on the pipe
279 bool purge_unack_stop_order(const infinint & pos = 0);
280
282
292
295
298
301
304
305
// size of the ratelier structures: one and a half times the number of workers (integer division)
306 static U_I get_ratelier_size(U_I num_worker) { return num_worker + num_worker/2; };
// size of the heap for the given number of workers (defined out of this header)
307 static U_I get_heap_size(U_I num_worker);
308 };
309
310
312 //
313 // read_below subthread used by parallel_tronconneuse
314 // to dispatch chunks of encrypted data to the workers
315 //
316
317 class read_below: public libthreadar::thread
318 {
319 public:
320 read_below(const std::shared_ptr<libthreadar::ratelier_scatter<crypto_segment> > & to_workers,
321 const std::shared_ptr<libthreadar::barrier> & waiter,
322 U_I num_workers,
323 U_I clear_block_size,
324 generic_file* encrypted_side,
325 const std::shared_ptr<heap<crypto_segment> > xtas,
326 infinint init_shift):
327 workers(to_workers),
328 waiting(waiter),
329 num_w(num_workers),
330 clear_buf_size(clear_block_size),
331 encrypted(encrypted_side),
332 tas(xtas),
333 initial_shift(init_shift),
334 reof(false),
335 trailing_clear_data(nullptr)
336 { flag = tronco_flags::normal; };
337
338 ~read_below() { if(ptr) tas->put(move(ptr)); kill(); join(); };
339
343 void set_callback_trailing_clear_data(trailing_clear_data_callback call_back) { trailing_clear_data = call_back; };
344
345 // *** //
346 // *** the method above should not be used anymore once the thread is running *** //
347 // *** //
348
350 void set_initial_shift(const infinint & x) { initial_shift = x; };
351
353
358 void set_pos(const infinint & pos) { skip_to = pos; };
359
361
370 void set_flag(tronco_flags val) { flag = val; };
371
373
378 const infinint & get_clear_flow_start() const { return clear_flow_start; };
379
381
386 const infinint & get_pos_in_flow() const { return pos_in_flow; };
387
388
389 protected:
390 virtual void inherited_run() override;
391
392 private:
393 std::shared_ptr<libthreadar::ratelier_scatter <crypto_segment> > workers;
394 std::shared_ptr<libthreadar::barrier> waiting;
395 U_I num_w;
396 U_I clear_buf_size;
397 generic_file* encrypted;
398 archive_version version;
399 std::shared_ptr<heap<crypto_segment> > tas;
400 infinint initial_shift;
401 bool reof;
402 trailing_clear_data_callback trailing_clear_data;
403 std::unique_ptr<crypto_segment> ptr;
404 infinint index_num;
405
406
407 // initialized by inherited_run() / get_ready_for_new_offset()
408
409 U_I encrypted_buf_size;
410
411 // fields accessible by both the caller and the read_below thread
412
413 infinint skip_to;
414 tronco_flags flag;
415 infinint clear_flow_start;
416 infinint pos_in_flow;
417
418 void work();
419 infinint get_ready_for_new_offset();
420 void send_flag_to_workers(tronco_flags theflag);
421
422 // same function as the tronconneuse::position_clear2crypt
423 void position_clear2crypt(const infinint & pos,
424 infinint & file_buf_start,
425 infinint & clear_buf_start,
426 infinint & pos_in_buf,
427 infinint & block_num);
428
429 };
430
431
433 //
434 // write_below subthread used by parallel_tronconneuse
435 // to gather and write down encrypted data work from workers
436 //
437
438 class write_below: public libthreadar::thread
439 {
440 public:
441 write_below(const std::shared_ptr<libthreadar::ratelier_gather<crypto_segment> > & from_workers,
442 const std::shared_ptr<libthreadar::barrier> & waiter,
443 U_I num_workers,
444 generic_file* encrypted_side,
445 const std::shared_ptr<heap<crypto_segment> > xtas):
446 workers(from_workers),
447 waiting(waiter),
448 num_w(num_workers),
449 cur_num_w(0),
450 encrypted(encrypted_side),
451 tas(xtas),
452 error(false),
453 error_block(0)
454 { if(encrypted == nullptr) throw SRC_BUG; };
455
456 ~write_below() { kill(); join(); };
457
458 bool exception_pending() const { return error; };
459 const infinint & get_error_block() const { return error_block; };
460
461 protected:
462 virtual void inherited_run() override;
463
464 private:
465 std::shared_ptr<libthreadar::ratelier_gather<crypto_segment> > workers;
466 std::shared_ptr<libthreadar::barrier> waiting;
467 U_I num_w;
468 U_I cur_num_w;
469 generic_file* encrypted;
470 std::shared_ptr<heap<crypto_segment> > tas;
471 bool error;
472 infinint error_block; // last crypto block before error
473 std::deque<std::unique_ptr<crypto_segment> >ones;
474 std::deque<signed int> flags;
475
476 void work();
477 };
478
479
481 //
482 // the crypto_worker threads performing ciphering/deciphering
483 // of many data blocks in parallel
484 //
485
486
487 class crypto_worker: public libthreadar::thread
488 {
489 public:
490 crypto_worker(std::shared_ptr<libthreadar::ratelier_scatter <crypto_segment> > & read_side,
491 std::shared_ptr<libthreadar::ratelier_gather <crypto_segment> > & write_side,
492 std::shared_ptr<libthreadar::barrier> waiter,
493 std::unique_ptr<crypto_module> && ptr,
494 bool encrypt):
495 reader(read_side),
496 writer(write_side),
497 waiting(waiter),
498 crypto(move(ptr)),
499 do_encrypt(encrypt),
500 abort(status::fine)
501 { if(!reader || !writer || !waiting || !crypto) throw SRC_BUG; };
502
503 virtual ~crypto_worker() { kill(); join(); };
504
505 protected:
506 virtual void inherited_run() override;
507
508 private:
509 enum class status { fine, inform, sent };
510
511 std::shared_ptr<libthreadar::ratelier_scatter <crypto_segment> > & reader;
512 std::shared_ptr<libthreadar::ratelier_gather <crypto_segment> > & writer;
513 std::shared_ptr<libthreadar::barrier> waiting;
514 std::unique_ptr<crypto_module> crypto;
515 bool do_encrypt; // if false do decrypt
516 std::unique_ptr<crypto_segment> ptr;
517 unsigned int slot;
518 status abort;
519
520 void work();
521 };
522
523
525
526} // end of namespace
527
528#endif
class archive_version that rules which archive format to follow
class archive_version manages the version of the archive format
this is the interface class from which all other data transfer classes inherit
bool is_terminated() const
void sync_write()
write any pending data
the arbitrary large positive integer class
this is a partial implementation of the generic_file interface to cypher/decypher data block by block...
bool send_read_order(tronco_flags order, const infinint &for_offset=0)
send an order to subthreads and gather acks from them
void go_read()
wake up threads in read mode when necessary
virtual void inherited_terminate() override
this protected inherited method is now private for inherited classes of tronconneuse
virtual void inherited_write(const char *a, U_I size) override
inherited from generic_file
virtual U_32 get_clear_block_size() const override
returns the block size given to constructor
void run_threads()
reset the interthread datastructure and launch the threads
virtual void inherited_sync_write() override
this protected inherited method is now private for inherited classes of tronconneuse
virtual infinint get_position() const override
inherited from generic_file
void join_threads()
wait for threads to finish and eventually rethrow their exceptions in current thread
virtual void write_end_of_file() override
in write_only mode indicate that end of file is reached
virtual bool skippable(skippability direction, const infinint &amount) override
inherited from generic_file
bool purge_unack_stop_order(const infinint &pos=0)
removing the ignore_stop_acks pending on the pipe
virtual void set_callback_trailing_clear_data(trailing_clear_data_callback call_back) override
parallel_tronconneuse(parallel_tronconneuse &&ref)=default
move constructor
std::unique_ptr< crypto_module > crypto
the crypto module used to cipher / decipher blocks of data
tronco_flags purge_ratelier_from_next_order(infinint pos=0)
purge the ratelier from the next order which is provided as returned value
virtual bool skip_to_eof() override
inherited from generic_file
archive_version reading_ver
archive format we follow
parallel_tronconneuse(U_I workers, U_32 block_size, generic_file &encrypted_side, const archive_version &reading_ver, std::unique_ptr< crypto_module > &ptr)
This is the constructor.
virtual bool skip(const infinint &pos) override
inherited from generic_file
~parallel_tronconneuse() noexcept
destructor
parallel_tronconneuse & operator=(const parallel_tronconneuse &ref)=delete
assignment operator
virtual bool skip_relative(S_I x) override
inherited from generic_file
virtual void inherited_truncate(const infinint &pos) override
this protected inherited method is now private for inherited classes of tronconneuse
virtual U_I inherited_read(char *a, U_I size) override
this protected inherited method is now private for inherited classes of tronconneuse
bool check_bytes_to_skip
whether to check for bytes to skip
void send_write_order(tronco_flags order)
send order in write mode
parallel_tronconneuse(const parallel_tronconneuse &ref)=delete
copy constructor
U_I ignore_stop_acks
how much stop ack still to be read (aborted stop order context)
infinint initial_shift
the offset in the "encrypted" below layer at which starts the encrypted data
U_32 clear_block_size
size of a clear block
virtual void inherited_flush_read() override
this protected inherited method is now private for inherited classes of tronconneuse
virtual bool truncatable(const infinint &pos) const override
inherited from generic_file
U_I num_workers
number of worker threads
bool find_offset_in_lus_data(const infinint &pos)
flush lus_data/lus_flags up to requested pos offset to be found or all data has been removed
void stop_threads()
end threads taking into account the fact they may be suspended on the barrier
thread_status t_status
whether child threads are waiting for us on the barrier
void read_refill()
fill lus_data/lus_flags from ratelier_gather if these are empty
virtual void set_initial_shift(const infinint &x) override
this method to modify the initial shift. This overrides the constructor "no_initial_shift" of the con...
virtual void inherited_read_ahead(const infinint &amount) override
this protected inherited method is now private for inherited classes of tronconneuse
infinint current_position
current position for the upper layer perspective (modified by skip*, inherited_read/write,...
void join_workers_only()
call by join_threads() below just code simplification around exception handling
per block cryptography implementation
defines the unit block of information ciphered at once
infinint(* trailing_clear_data_callback)(generic_file &below, const archive_version &reading_ver)
the trailing_clear_data_callback callback is a means by which the upper layer can tell when encrypted...
tronco_flags
status flags used between parallel_tronconneuse and its sub-threads
heap data structure (relying on FIFO)
switch module to limitint (32 or 64 bit integers) or infinint
include macro defined by the configure script and some specific additional ones
libdar namespace encapsulate all libdar symbols
Definition archive.hpp:47
defines common interface for tronconneuse and parallel_tronconneuse