jt
2021-06-10 5d0d028456874576560552f5a5c4e8b801786f11
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
//#define Trace
 
// ParallelBZip2OutputStream.cs
// ------------------------------------------------------------------
//
// Copyright (c) 2011 Dino Chiesa.
// All rights reserved.
//
// This code module is part of DotNetZip, a zipfile class library.
//
// ------------------------------------------------------------------
//
// This code is licensed under the Microsoft Public License.
// See the file License.txt for the license details.
// More info on: http://dotnetzip.codeplex.com
//
// ------------------------------------------------------------------
//
// Last Saved: <2011-August-02 16:44:24>
//
// ------------------------------------------------------------------
//
// This module defines the ParallelBZip2OutputStream class, which is a
// BZip2 compressing stream. This code was derived in part from Apache
// commons source code. The license below applies to the original Apache
// code.
//
// ------------------------------------------------------------------
// flymake: csc.exe /t:module BZip2InputStream.cs BZip2Compressor.cs Rand.cs BCRC32.cs @@FILE@@
 
 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
 
 
// Design Notes:
//
// This class follows the classic Decorator pattern: it is a Stream that
// wraps itself around a Stream, and in doing so provides bzip2
// compression as callers Write into it.  It is exactly the same in
// outward function as the BZip2OutputStream, except that this class can
// perform compression using multiple independent threads. Because of
// that, and because of the CPU-intensive nature of BZip2 compression,
// this class can perform significantly better (in terms of wall-click
// time) than the single-threaded variant, at the expense of memory and
// CPU utilization.
//
// BZip2 is a straightforward data format: there are 4 magic bytes at
// the top of the file, followed by 1 or more compressed blocks. There
// is a small "magic byte" trailer after all compressed blocks.
//
// In concept parallelizing BZip2 is simple: do the CPU-intensive
// compression for each block in a separate thread, then emit the
// compressed output, in order, to the output stream. Each block can be
// compressed independently, so a block is the natural candidate for the
// parcel of work that can be passed to an independent worker thread.
//
// The design approach used here is simple: within the Write() method of
// the stream, fill a block.  When the block is full, pass it to a
// background worker thread for compression.  When the compressor thread
// completes its work, the main thread (the application thread that
// calls Write()) can send the compressed data to the output stream,
// being careful to respect the order of the compressed blocks.
//
// The challenge of ordering the compressed data is a solved and
// well-understood problem - it is the same approach here as DotNetZip
// uses in the ParallelDeflateOutputStream. It is a map/reduce approach
// in design intent.
//
// One new twist for BZip2 is that the compressor output is not
// byte-aligned. In other words the final output of a compressed block
// will in general be a number of bits that is not a multiple of
// 8. Therefore, combining the ordered results of the N compressor
// threads requires additional byte-shredding by the parent
// stream. Hence this stream uses a BitWriter to adapt bit-oriented
// BZip2 output to the byte-oriented .NET Stream.
//
// The approach used here creates N instances of the BZip2Compressor
// type, where N is governed by the number of cores (cpus) and limited
// by the MaxWorkers property exposed by this class. Each
// BZip2Compressor instance gets its own MemoryStream, to which it
// writes its data, via a BitWriter.
//
// along with the bit accumulator described above. The MemoryStream
// would gather the byte-aligned compressed output of the compressor.
 
// When reducing the output of the various workers, this class must
// again do the byte-shredding thing. The data from the compressors is
// therefore shredded twice: once when being placed into the
// MemoryStream, and again when emitted into the final output stream
// that this class decorates. This is an unfortunate and seemingly
// unavoidable inefficiency. Two rounds of byte-shredding will use more
// CPU than we'd like, but I haven't imagined a way to avoid it.
//
// The BZip2Compressor is designed to write directly into the parent
// stream's accumulator (BitWriter) when possible, and write into a
// distinct BitWriter when necessary.  The former can be used in a
// single-thread scenario, while the latter is required in a
// multi-thread scenario.
//
// ----
//
// Regarding the Apache code base: Most of the code in this particular
// class is related to stream operations and thread synchronization, and
// is my own code. It largely does not rely on any code obtained from
// Apache commons. If you compare this code with the Apache commons
// BZip2OutputStream, you will see very little code that is common,
// except for the nearly-boilerplate structure that is common to all
// subtypes of System.IO.Stream. There may be some small remnants of
// code in this module derived from the Apache stuff, which is why I
// left the license in here. Most of the Apache commons compressor magic
// has been ported into the BZip2Compressor class.
//
 
using System;
using System.IO;
using System.Collections.Generic;
using System.Threading;
 
namespace HH.WMS.Utils.Ionic.BZip2
{
    internal class WorkItem
    {
        public int index;
        public BZip2Compressor Compressor { get; private set; }
        public MemoryStream ms;
        public int ordinal;
        public BitWriter bw;
 
        public WorkItem(int ix, int blockSize)
        {
            // compressed data gets written to a MemoryStream
            this.ms = new MemoryStream();
            this.bw = new BitWriter(ms);
            this.Compressor = new BZip2Compressor(bw, blockSize);
            this.index = ix;
        }
    }
 
 
    /// <summary>
    ///   A write-only decorator stream that compresses data as it is
    ///   written using the BZip2 algorithm. This stream compresses by
    ///   block using multiple threads.
    /// </summary>
    /// <para>
    ///   This class performs BZIP2 compression through writing.  For
    ///   more information on the BZIP2 algorithm, see
    ///   <see href="http://en.wikipedia.org/wiki/BZIP2"/>.
    /// </para>
    ///
    /// <para>
    ///   This class is similar to <see cref="HH.WMS.Utils.Ionic.BZip2.BZip2OutputStream"/>,
    ///   except that this implementation uses an approach that employs multiple
    ///   worker threads to perform the compression.  On a multi-cpu or multi-core
    ///   computer, the performance of this class can be significantly higher than
    ///   the single-threaded BZip2OutputStream, particularly for larger streams.
    ///   How large?  Anything over 10mb is a good candidate for parallel
    ///   compression.
    /// </para>
    ///
    /// <para>
    ///   The tradeoff is that this class uses more memory and more CPU than the
    ///   vanilla <c>BZip2OutputStream</c>. Also, for small files, the
    ///   <c>ParallelBZip2OutputStream</c> can be much slower than the vanilla
    ///   <c>BZip2OutputStream</c>, because of the overhead associated to using the
    ///   thread pool.
    /// </para>
    ///
    /// <seealso cref="HH.WMS.Utils.Ionic.BZip2.BZip2OutputStream" />
    public class ParallelBZip2OutputStream : System.IO.Stream
    {
        private static readonly int BufferPairsPerCore = 4;
        private int _maxWorkers;
        private bool firstWriteDone;
        private int lastFilled;
        private int lastWritten;
        private int latestCompressed;
        private int currentlyFilling;
        private volatile Exception pendingException;
        private bool handlingException;
        private bool emitting;
        private System.Collections.Generic.Queue<int>     toWrite;
        private System.Collections.Generic.Queue<int>     toFill;
        private System.Collections.Generic.List<WorkItem> pool;
        private object latestLock = new object();
        private object eLock = new object(); // for exceptions
        private object outputLock = new object(); // for multi-thread output
        private AutoResetEvent newlyCompressedBlob;
 
        long totalBytesWrittenIn;
        long totalBytesWrittenOut;
        bool leaveOpen;
        uint combinedCRC;
        Stream output;
        BitWriter bw;
        int blockSize100k;  // 0...9
 
        private TraceBits desiredTrace = TraceBits.Crc | TraceBits.Write;
 
        /// <summary>
        ///   Constructs a new <c>ParallelBZip2OutputStream</c>, that sends its
        ///   compressed output to the given output stream.
        /// </summary>
        ///
        /// <param name='output'>
        ///   The destination stream, to which compressed output will be sent.
        /// </param>
        ///
        /// <example>
        ///
        ///   This example reads a file, then compresses it with bzip2 file,
        ///   and writes the compressed data into a newly created file.
        ///
        ///   <code>
        ///   var fname = "logfile.log";
        ///   using (var fs = File.OpenRead(fname))
        ///   {
        ///       var outFname = fname + ".bz2";
        ///       using (var output = File.Create(outFname))
        ///       {
        ///           using (var compressor = new HH.WMS.Utils.Ionic.BZip2.ParallelBZip2OutputStream(output))
        ///           {
        ///               byte[] buffer = new byte[2048];
        ///               int n;
        ///               while ((n = fs.Read(buffer, 0, buffer.Length)) > 0)
        ///               {
        ///                   compressor.Write(buffer, 0, n);
        ///               }
        ///           }
        ///       }
        ///   }
        ///   </code>
        /// </example>
        public ParallelBZip2OutputStream(Stream output)
            : this(output, BZip2.MaxBlockSize, false)
        {
        }
 
        /// <summary>
        ///   Constructs a new <c>ParallelBZip2OutputStream</c> with specified blocksize.
        /// </summary>
        /// <param name = "output">the destination stream.</param>
        /// <param name = "blockSize">
        ///   The blockSize in units of 100000 bytes.
        ///   The valid range is 1..9.
        /// </param>
        public ParallelBZip2OutputStream(Stream output, int blockSize)
            : this(output, blockSize, false)
        {
        }
 
        /// <summary>
        ///   Constructs a new <c>ParallelBZip2OutputStream</c>.
        /// </summary>
        ///   <param name = "output">the destination stream.</param>
        /// <param name = "leaveOpen">
        ///   whether to leave the captive stream open upon closing this stream.
        /// </param>
        public ParallelBZip2OutputStream(Stream output, bool leaveOpen)
            : this(output, BZip2.MaxBlockSize, leaveOpen)
        {
        }
 
        /// <summary>
        ///   Constructs a new <c>ParallelBZip2OutputStream</c> with specified blocksize,
        ///   and explicitly specifies whether to leave the wrapped stream open.
        /// </summary>
        ///
        /// <param name = "output">the destination stream.</param>
        /// <param name = "blockSize">
        ///   The blockSize in units of 100000 bytes.
        ///   The valid range is 1..9.
        /// </param>
        /// <param name = "leaveOpen">
        ///   whether to leave the captive stream open upon closing this stream.
        /// </param>
        public ParallelBZip2OutputStream(Stream output, int blockSize, bool leaveOpen)
        {
            if (blockSize < BZip2.MinBlockSize || blockSize > BZip2.MaxBlockSize)
            {
                var msg = String.Format("blockSize={0} is out of range; must be between {1} and {2}",
                                        blockSize,
                                        BZip2.MinBlockSize, BZip2.MaxBlockSize);
                throw new ArgumentException(msg, "blockSize");
            }
 
            this.output = output;
            if (!this.output.CanWrite)
                throw new ArgumentException("The stream is not writable.", "output");
 
            this.bw = new BitWriter(this.output);
            this.blockSize100k = blockSize;
            this.leaveOpen = leaveOpen;
            this.combinedCRC = 0;
            this.MaxWorkers = 16; // default
            EmitHeader();
        }
 
 
        private void InitializePoolOfWorkItems()
        {
            this.toWrite = new Queue<int>();
            this.toFill = new Queue<int>();
            this.pool = new System.Collections.Generic.List<WorkItem>();
            int nWorkers = BufferPairsPerCore * Environment.ProcessorCount;
            nWorkers = Math.Min(nWorkers, this.MaxWorkers);
            for(int i=0; i < nWorkers; i++)
            {
                this.pool.Add(new WorkItem(i, this.blockSize100k));
                this.toFill.Enqueue(i);
            }
 
            this.newlyCompressedBlob = new AutoResetEvent(false);
            this.currentlyFilling = -1;
            this.lastFilled = -1;
            this.lastWritten = -1;
            this.latestCompressed = -1;
        }
 
 
        /// <summary>
        ///   The maximum number of concurrent compression worker threads to use.
        /// </summary>
        ///
        /// <remarks>
        /// <para>
        ///   This property sets an upper limit on the number of concurrent worker
        ///   threads to employ for compression. The implementation of this stream
        ///   employs multiple threads from the .NET thread pool, via <see
        ///   cref="System.Threading.ThreadPool.QueueUserWorkItem(WaitCallback)">
        ///   ThreadPool.QueueUserWorkItem()</see>, to compress the incoming data by
        ///   block.  As each block of data is compressed, this stream re-orders the
        ///   compressed blocks and writes them to the output stream.
        /// </para>
        ///
        /// <para>
        ///   A higher number of workers enables a higher degree of
        ///   parallelism, which tends to increase the speed of compression on
        ///   multi-cpu computers.  On the other hand, a higher number of buffer
        ///   pairs also implies a larger memory consumption, more active worker
        ///   threads, and a higher cpu utilization for any compression. This
        ///   property enables the application to limit its memory consumption and
        ///   CPU utilization behavior depending on requirements.
        /// </para>
        ///
        /// <para>
        ///   By default, DotNetZip allocates 4 workers per CPU core, subject to the
        ///   upper limit specified in this property. For example, suppose the
        ///   application sets this property to 16.  Then, on a machine with 2
        ///   cores, DotNetZip will use 8 workers; that number does not exceed the
        ///   upper limit specified by this property, so the actual number of
        ///   workers used will be 4 * 2 = 8.  On a machine with 4 cores, DotNetZip
        ///   will use 16 workers; again, the limit does not apply. On a machine
        ///   with 8 cores, DotNetZip will use 16 workers, because of the limit.
        /// </para>
        ///
        /// <para>
        ///   For each compression "worker thread" that occurs in parallel, there is
        ///   up to 2mb of memory allocated, for buffering and processing. The
        ///   actual number depends on the <see cref="BlockSize"/> property.
        /// </para>
        ///
        /// <para>
        ///   CPU utilization will also go up with additional workers, because a
        ///   larger number of buffer pairs allows a larger number of background
        ///   threads to compress in parallel. If you find that parallel
        ///   compression is consuming too much memory or CPU, you can adjust this
        ///   value downward.
        /// </para>
        ///
        /// <para>
        ///   The default value is 16. Different values may deliver better or
        ///   worse results, depending on your priorities and the dynamic
        ///   performance characteristics of your storage and compute resources.
        /// </para>
        ///
        /// <para>
        ///   The application can set this value at any time, but it is effective
        ///   only before the first call to Write(), which is when the buffers are
        ///   allocated.
        /// </para>
        /// </remarks>
        public int MaxWorkers
        {
            get
            {
                return _maxWorkers;
            }
            set
            {
                if (value < 4)
                    throw new ArgumentException("MaxWorkers",
                                                "Value must be 4 or greater.");
                _maxWorkers = value;
            }
        }
 
        /// <summary>
        ///   Close the stream.
        /// </summary>
        /// <remarks>
        ///   <para>
        ///     This may or may not close the underlying stream.  Check the
        ///     constructors that accept a bool value.
        ///   </para>
        /// </remarks>
        public override void Close()
        {
            if (this.pendingException != null)
            {
                this.handlingException = true;
                var pe = this.pendingException;
                this.pendingException = null;
                throw pe;
            }
 
            if (this.handlingException)
                return;
 
            if (output == null)
                return;
 
            Stream o = this.output;
 
            try
            {
                FlushOutput(true);
            }
            finally
            {
                this.output = null;
                this.bw = null;
            }
 
            if (!leaveOpen)
                o.Close();
        }
 
 
        private void FlushOutput(bool lastInput)
        {
            if (this.emitting) return;
 
            // compress and write whatever is ready
            if (this.currentlyFilling >= 0)
            {
                WorkItem workitem = this.pool[this.currentlyFilling];
                CompressOne(workitem);
                this.currentlyFilling = -1; // get a new buffer next Write()
            }
 
            if (lastInput)
            {
                EmitPendingBuffers(true, false);
                EmitTrailer();
            }
            else
            {
                EmitPendingBuffers(false, false);
            }
        }
 
 
 
        /// <summary>
        ///   Flush the stream.
        /// </summary>
        public override void Flush()
        {
            if (this.output != null)
            {
                FlushOutput(false);
                this.bw.Flush();
                this.output.Flush();
            }
        }
 
        private void EmitHeader()
        {
            var magic = new byte[] {
                (byte) 'B',
                (byte) 'Z',
                (byte) 'h',
                (byte) ('0' + this.blockSize100k)
            };
 
            // not necessary to shred the initial magic bytes
            this.output.Write(magic, 0, magic.Length);
        }
 
        private void EmitTrailer()
        {
            // A magic 48-bit number, 0x177245385090, to indicate the end
            // of the last block. (sqrt(pi), if you want to know)
 
            TraceOutput(TraceBits.Write, "total written out: {0} (0x{0:X})",
                        this.bw.TotalBytesWrittenOut);
 
            // must shred
            this.bw.WriteByte(0x17);
            this.bw.WriteByte(0x72);
            this.bw.WriteByte(0x45);
            this.bw.WriteByte(0x38);
            this.bw.WriteByte(0x50);
            this.bw.WriteByte(0x90);
 
            this.bw.WriteInt(this.combinedCRC);
 
            this.bw.FinishAndPad();
 
            TraceOutput(TraceBits.Write, "final total : {0} (0x{0:X})",
                        this.bw.TotalBytesWrittenOut);
        }
 
 
        /// <summary>
        ///   The blocksize parameter specified at construction time.
        /// </summary>
        public int BlockSize
        {
            get { return this.blockSize100k; }
        }
 
 
        /// <summary>
        ///   Write data to the stream.
        /// </summary>
        /// <remarks>
        ///
        /// <para>
        ///   Use the <c>ParallelBZip2OutputStream</c> to compress data while
        ///   writing: create a <c>ParallelBZip2OutputStream</c> with a writable
        ///   output stream.  Then call <c>Write()</c> on that
        ///   <c>ParallelBZip2OutputStream</c>, providing uncompressed data as
        ///   input.  The data sent to the output stream will be the compressed
        ///   form of the input data.
        /// </para>
        ///
        /// <para>
        ///   A <c>ParallelBZip2OutputStream</c> can be used only for
        ///   <c>Write()</c> not for <c>Read()</c>.
        /// </para>
        ///
        /// </remarks>
        ///
        /// <param name="buffer">The buffer holding data to write to the stream.</param>
        /// <param name="offset">the offset within that data array to find the first byte to write.</param>
        /// <param name="count">the number of bytes to write.</param>
        public override void Write(byte[] buffer, int offset, int count)
        {
            bool mustWait = false;
 
            // This method does this:
            //   0. handles any pending exceptions
            //   1. write any buffers that are ready to be written
            //   2. fills a compressor buffer; when full, flip state to 'Filled',
            //   3. if more data to be written,  goto step 1
 
            if (this.output == null)
                throw new IOException("the stream is not open");
 
            // dispense any exceptions that occurred on the BG threads
            if (this.pendingException != null)
            {
                this.handlingException = true;
                var pe = this.pendingException;
                this.pendingException = null;
                throw pe;
            }
 
            if (offset < 0)
                throw new IndexOutOfRangeException(String.Format("offset ({0}) must be > 0", offset));
            if (count < 0)
                throw new IndexOutOfRangeException(String.Format("count ({0}) must be > 0", count));
            if (offset + count > buffer.Length)
                throw new IndexOutOfRangeException(String.Format("offset({0}) count({1}) bLength({2})",
                                                                 offset, count, buffer.Length));
 
 
            if (count == 0) return;  // nothing to do
 
 
            if (!this.firstWriteDone)
            {
                // Want to do this on first Write, first session, and not in the
                // constructor.  Must allow the MaxWorkers to change after
                // construction, but before first Write().
                InitializePoolOfWorkItems();
                this.firstWriteDone = true;
            }
 
            int bytesWritten = 0;
            int bytesRemaining = count;
 
            do
            {
                // may need to make buffers available
                EmitPendingBuffers(false, mustWait);
 
                mustWait = false;
 
                // get a compressor to fill
                int ix = -1;
                if (this.currentlyFilling >= 0)
                {
                    ix = this.currentlyFilling;
                }
                else
                {
                    if (this.toFill.Count == 0)
                    {
                        // No compressors available to fill, so... need to emit
                        // compressed buffers.
                        mustWait = true;
                        continue;
                    }
 
                    ix = this.toFill.Dequeue();
                    ++this.lastFilled;
                }
 
                WorkItem workitem = this.pool[ix];
                workitem.ordinal = this.lastFilled;
 
                int n = workitem.Compressor.Fill(buffer, offset, bytesRemaining);
                if (n != bytesRemaining)
                {
                    if (!ThreadPool.QueueUserWorkItem( CompressOne, workitem ))
                        throw new Exception("Cannot enqueue workitem");
 
                    this.currentlyFilling = -1; // will get a new buffer next time
                    offset += n;
                }
                else
                    this.currentlyFilling = ix;
 
                bytesRemaining -= n;
                bytesWritten += n;
            }
            while (bytesRemaining > 0);
 
            totalBytesWrittenIn += bytesWritten;
            return;
        }
 
 
 
        private void EmitPendingBuffers(bool doAll, bool mustWait)
        {
            // When combining parallel compression with a ZipSegmentedStream, it's
            // possible for the ZSS to throw from within this method.  In that
            // case, Close/Dispose will be called on this stream, if this stream
            // is employed within a using or try/finally pair as required. But
            // this stream is unaware of the pending exception, so the Close()
            // method invokes this method AGAIN. This can lead to a deadlock.
            // Therefore, failfast if re-entering.
 
            if (emitting) return;
            emitting = true;
 
            if (doAll || mustWait)
                this.newlyCompressedBlob.WaitOne();
 
            do
            {
                int firstSkip = -1;
                int millisecondsToWait = doAll ? 200 : (mustWait ? -1 : 0);
                int nextToWrite = -1;
 
                do
                {
                    if (Monitor.TryEnter(this.toWrite, millisecondsToWait))
                    {
                        nextToWrite = -1;
                        try
                        {
                            if (this.toWrite.Count > 0)
                                nextToWrite = this.toWrite.Dequeue();
                        }
                        finally
                        {
                            Monitor.Exit(this.toWrite);
                        }
 
                        if (nextToWrite >= 0)
                        {
                            WorkItem workitem = this.pool[nextToWrite];
                            if (workitem.ordinal != this.lastWritten + 1)
                            {
                                // out of order. requeue and try again.
                                lock(this.toWrite)
                                {
                                    this.toWrite.Enqueue(nextToWrite);
                                }
 
                                if (firstSkip == nextToWrite)
                                {
                                    // We went around the list once.
                                    // None of the items in the list is the one we want.
                                    // Now wait for a compressor to signal again.
                                    this.newlyCompressedBlob.WaitOne();
                                    firstSkip = -1;
                                }
                                else if (firstSkip == -1)
                                    firstSkip = nextToWrite;
 
                                continue;
                            }
 
                            firstSkip = -1;
 
                            TraceOutput(TraceBits.Write,
                                        "Writing block {0}", workitem.ordinal);
 
                            // write the data to the output
                            var bw2 = workitem.bw;
                            bw2.Flush(); // not bw2.FinishAndPad()!
                            var ms = workitem.ms;
                            ms.Seek(0,SeekOrigin.Begin);
 
                            // cannot dump bytes!!
                            // ms.WriteTo(this.output);
                            //
                            // must do byte shredding:
                            int n;
                            int y = -1;
                            long totOut = 0;
                            var buffer = new byte[1024];
                            while ((n = ms.Read(buffer,0,buffer.Length)) > 0)
                            {
#if Trace
                                if (y == -1) // diagnostics only
                                {
                                    var sb1 = new System.Text.StringBuilder();
                                    sb1.Append("first 16 whole bytes in block: ");
                                    for (int z=0; z < 16; z++)
                                        sb1.Append(String.Format(" {0:X2}", buffer[z]));
                                    TraceOutput(TraceBits.Write, sb1.ToString());
                                }
#endif
                                y = n;
                                for (int k=0; k < n; k++)
                                {
                                    this.bw.WriteByte(buffer[k]);
                                }
                                totOut += n;
                            }
#if Trace
                            TraceOutput(TraceBits.Write,"out block length (bytes): {0} (0x{0:X})", totOut);
                            var sb = new System.Text.StringBuilder();
                            sb.Append("final 16 whole bytes in block: ");
                            for (int z=0; z < 16; z++)
                                sb.Append(String.Format(" {0:X2}", buffer[y-1-12+z]));
                            TraceOutput(TraceBits.Write, sb.ToString());
#endif
 
                            // and now any remaining bits
                            TraceOutput(TraceBits.Write,
                                        " remaining bits: {0} 0x{1:X}",
                                        bw2.NumRemainingBits,
                                        bw2.RemainingBits);
                            if (bw2.NumRemainingBits > 0)
                            {
                                this.bw.WriteBits(bw2.NumRemainingBits, bw2.RemainingBits);
                            }
 
                            TraceOutput(TraceBits.Crc," combined CRC (before): {0:X8}",
                                        this.combinedCRC);
                            this.combinedCRC = (this.combinedCRC << 1) | (this.combinedCRC >> 31);
                            this.combinedCRC ^= (uint) workitem.Compressor.Crc32;
 
                            TraceOutput(TraceBits.Crc,
                                        " block    CRC         : {0:X8}",
                                        workitem.Compressor.Crc32);
                            TraceOutput(TraceBits.Crc,
                                        " combined CRC (after) : {0:X8}",
                                        this.combinedCRC);
                            TraceOutput(TraceBits.Write,
                                        "total written out: {0} (0x{0:X})",
                                        this.bw.TotalBytesWrittenOut);
                            TraceOutput(TraceBits.Write | TraceBits.Crc, "");
 
                            this.totalBytesWrittenOut += totOut;
 
                            bw2.Reset();
                            this.lastWritten = workitem.ordinal;
                            workitem.ordinal = -1;
                            this.toFill.Enqueue(workitem.index);
 
                            // don't wait next time through
                            if (millisecondsToWait == -1) millisecondsToWait = 0;
                        }
                    }
                    else
                        nextToWrite = -1;
 
                } while (nextToWrite >= 0);
 
            } while (doAll && (this.lastWritten != this.latestCompressed));
 
            if (doAll)
            {
                TraceOutput(TraceBits.Crc,
                            " combined CRC (final) : {0:X8}", this.combinedCRC);
            }
 
            emitting = false;
        }
 
 
        private void CompressOne(Object wi)
        {
            // compress and one buffer
            WorkItem workitem = (WorkItem) wi;
            try
            {
                // compress and write to the compressor's MemoryStream
                workitem.Compressor.CompressAndWrite();
 
                lock(this.latestLock)
                {
                    if (workitem.ordinal > this.latestCompressed)
                        this.latestCompressed = workitem.ordinal;
                }
                lock (this.toWrite)
                {
                    this.toWrite.Enqueue(workitem.index);
                }
                this.newlyCompressedBlob.Set();
            }
            catch (System.Exception exc1)
            {
                lock(this.eLock)
                {
                    // expose the exception to the main thread
                    if (this.pendingException!=null)
                        this.pendingException = exc1;
                }
            }
        }
 
 
 
 
        /// <summary>
        /// Indicates whether the stream can be read.
        /// </summary>
        /// <remarks>
        /// The return value is always false.
        /// </remarks>
        public override bool CanRead
        {
            get { return false; }
        }
 
        /// <summary>
        /// Indicates whether the stream supports Seek operations.
        /// </summary>
        /// <remarks>
        /// Always returns false.
        /// </remarks>
        public override bool CanSeek
        {
            get { return false; }
        }
 
        /// <summary>
        /// Indicates whether the stream can be written.
        /// </summary>
        /// <remarks>
        /// The return value depends on whether the captive stream supports writing.
        /// </remarks>
        public override bool CanWrite
        {
            get
            {
                if (this.output == null) throw new ObjectDisposedException("BZip2Stream");
                return this.output.CanWrite;
            }
        }
 
        /// <summary>
        /// Reading this property always throws a <see cref="NotImplementedException"/>.
        /// </summary>
        public override long Length
        {
            get { throw new NotImplementedException(); }
        }
 
        /// <summary>
        /// The position of the stream pointer.
        /// </summary>
        ///
        /// <remarks>
        ///   Setting this property always throws a <see
        ///   cref="NotImplementedException"/>. Reading will return the
        ///   total number of uncompressed bytes written through.
        /// </remarks>
        public override long Position
        {
            get
            {
                return this.totalBytesWrittenIn;
            }
            set { throw new NotImplementedException(); }
        }
 
        /// <summary>
        /// The total number of bytes written out by the stream.
        /// </summary>
        /// <remarks>
        /// This value is meaningful only after a call to Close().
        /// </remarks>
        public Int64 BytesWrittenOut { get { return totalBytesWrittenOut; } }
 
        /// <summary>
        /// Calling this method always throws a <see cref="NotImplementedException"/>.
        /// </summary>
        /// <param name="offset">this is irrelevant, since it will always throw!</param>
        /// <param name="origin">this is irrelevant, since it will always throw!</param>
        /// <returns>irrelevant!</returns>
        public override long Seek(long offset, System.IO.SeekOrigin origin)
        {
            throw new NotImplementedException();
        }
 
        /// <summary>
        /// Calling this method always throws a <see cref="NotImplementedException"/>.
        /// </summary>
        /// <param name="value">this is irrelevant, since it will always throw!</param>
        public override void SetLength(long value)
        {
            throw new NotImplementedException();
        }
 
        /// <summary>
        /// Calling this method always throws a <see cref="NotImplementedException"/>.
        /// </summary>
        /// <param name='buffer'>this parameter is never used</param>
        /// <param name='offset'>this parameter is never used</param>
        /// <param name='count'>this parameter is never used</param>
        /// <returns>never returns anything; always throws</returns>
        public override int Read(byte[] buffer, int offset, int count)
        {
            throw new NotImplementedException();
        }
 
 
        // used only when Trace is defined
        [Flags]
        enum TraceBits : uint
        {
            None         = 0,
            Crc          = 1,
            Write        = 2,
            All          = 0xffffffff,
        }
 
 
        [System.Diagnostics.ConditionalAttribute("Trace")]
        private void TraceOutput(TraceBits bits, string format, params object[] varParams)
        {
            if ((bits & this.desiredTrace) != 0)
            {
                lock(outputLock)
                {
                    int tid = Thread.CurrentThread.GetHashCode();
#if !SILVERLIGHT
                    Console.ForegroundColor = (ConsoleColor) (tid % 8 + 10);
#endif
                    Console.Write("{0:000} PBOS ", tid);
                    Console.WriteLine(format, varParams);
#if !SILVERLIGHT
                    Console.ResetColor();
#endif
                }
            }
        }
 
    }
 
}