Stream merge

From Rosetta Code
Revision as of 17:41, 20 June 2016 by rosettacode>Gerard Schildberger (added highlighting to the task's preamble.)
You are encouraged to solve this task according to the task description, using any language you may know.
2-stream merge
Read two sorted streams of items from external source (e.g. disk, or network), and write one stream of sorted items to external sink.
Common algorithm: keep one buffered item from each source, select the smaller of the two, write it, then fetch the next item from the stream that the written item came from.
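The 2-stream algorithm above can be sketched in Python as a generator. This is a minimal illustration, not part of the task's reference solutions; the name `merge2` and the `_DONE` sentinel are hypothetical, and any sorted iterables (open files, sockets wrapped as iterators) are assumed to work as sources.

```python
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")
_DONE = object()  # hypothetical sentinel marking an exhausted stream


def merge2(a: Iterable[T], b: Iterable[T]) -> Iterator[T]:
    """Lazily merge two sorted iterables, buffering one item from each."""
    ia, ib = iter(a), iter(b)
    x, y = next(ia, _DONE), next(ib, _DONE)
    while x is not _DONE or y is not _DONE:
        # Take from `a` when `b` is dry, or when a's head is not larger.
        if y is _DONE or (x is not _DONE and x <= y):
            yield x
            x = next(ia, _DONE)  # refill from the stream just written
        else:
            yield y
            y = next(ib, _DONE)
```

Because the function is a generator, only two items are held in memory at any time, however long the inputs are.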
N-stream merge
The same as above, but reading from   N   sources.
Common algorithm: same as above, but keep buffered items and their source descriptors in a heap.
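As a sketch of the heap-based variant, the following Python generator keeps one `(item, source index)` pair per live stream in a heap. The names `merge_n` and `_DONE` are hypothetical; the source index stands in for the "source descriptor" mentioned above.

```python
import heapq
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")
_DONE = object()  # hypothetical sentinel marking an exhausted stream


def merge_n(*streams: Iterable[T]) -> Iterator[T]:
    """Lazily merge any number of sorted iterables using a heap."""
    iters = [iter(s) for s in streams]
    heap = []
    for idx, it in enumerate(iters):
        head = next(it, _DONE)
        if head is not _DONE:          # skip streams that start out empty
            heap.append((head, idx))
    heapq.heapify(heap)
    while heap:
        item, idx = heap[0]            # smallest buffered item
        yield item
        nxt = next(iters[idx], _DONE)  # refill from the same stream
        if nxt is _DONE:
            heapq.heappop(heap)        # stream is dry: drop its entry
        else:
            heapq.heapreplace(heap, (nxt, idx))
```

Each step costs O(log N) comparisons rather than the O(N) of a linear scan over the buffered items.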

Assume the streams are very big. You must not load them whole into memory, but read them as streams.


ALGOL 68

NB, all the files (including the output files) must exist before running this. The output files are overwritten with the merged records.
<lang algol68># merge a number of input files to an output file                            #
PROC mergenf = ( []REF FILE inf, REF FILE out )VOID:
     BEGIN
        INT        eof count := 0;
        BOOL       at eof    := FALSE;
        []REF FILE inputs     = inf[ AT 1 ];
        INT   number of files = UPB inputs;
        [ number of files ]BOOL eof;
        [ number of files ]STRING line;
        FOR f TO number of files DO
            eof[ f ] := FALSE;
            on logical file end( inf[ f ], ( REF FILE f )BOOL:
                                           BEGIN
                                               # note that we reached EOF on the latest read #
                                               # and return TRUE so processing can continue #
                                               at eof := TRUE
                                           END
                               )
        OD;
        # read a line from one of the input files                              #
        PROC read line = ( INT file number )VOID:
             BEGIN
                 at eof := FALSE;
                 get( inputs[ file number ], ( line[ file number ], newline ) );
                 eof[ file number ] := at eof;
                 IF at eof THEN
                     # reached eof on this file                                #
                     eof count +:= 1
                 FI
             END; # read line #
        # get the first line from each input file                              #
        FOR f TO number of files DO read line( f ) OD;
        # merge the files                                                      #
        WHILE eof count < number of files DO
            # find the lowest line in the current set                          #
            INT    low pos     := 0;
            STRING low line    := "";
            BOOL   first file  := TRUE;
            FOR file pos TO number of files DO
                IF eof[ file pos ] THEN
                    # file is at eof - ignore it                               #
                    SKIP
                ELIF first file THEN
                    # this is the first file not at eof                        #
                    low pos    := file pos;
                    low line   := line[ file pos ];
                    first file := FALSE
                ELIF line[ file pos ] < low line THEN
                    # this line is lower than the previous one                 #
                    low pos    := file pos;
                    low line   := line[ file pos ]
                FI
            OD;
            # write the record from the lowest file and get the next record    #
            # from it                                                          #
            put( out, ( line[ low pos ], newline ) );
            read line( low pos )
        OD
     END; # mergenf #

# merges the files named in input list, the results are written to the file   #
# named output name                                                           #
# the output file must already exist and will be overwritten                  #
PROC mergen = ( []STRING input list, STRING output name )VOID:
     BEGIN
        []STRING inputs       = input list[ AT 1 ];
        INT number of files   = UPB inputs;
        [ number of files ]REF FILE inf;
        # open the input files                                                  #
        FOR f TO number of files DO
             inf[ f ] := LOC FILE;
             IF  open( inf[ f ], inputs[ f ], stand in channel ) /= 0
             THEN
                 # failed to open the input file #
                 print( (  "Unable to open """ + inputs[ f ] + """", newline ) );
                 stop
             FI
        OD;
        # open the output file (which must already exist & will be overwritten) #
        IF FILE output file;
           open( output file, output name, stand out channel ) /= 0
        THEN
            # failed to open the output file #
            print( (  "Unable to open """ + output name + """", newline ) );
            stop
        ELSE
            # files opened OK, merge them #
            mergenf( inf, output file );
            # close the files #
            close( output file );
            FOR f TO number of files DO close( inf[ f ] ) OD
        FI
     END; # mergen #

# merges the two files in1 and in2 to output file #
PROC merge2f = ( REF FILE in1, REF FILE in2, REF FILE output file )VOID: mergenf( ( in1, in2 ), output file );

# merges the two files named in1 and in2 to the file named output file #
PROC merge2 = ( STRING in1, STRING in2, STRING output file )VOID: mergen( ( in1, in2 ), output file );

# test the file merge #
merge2( "in1.txt", "in2.txt", "out2.txt" );
mergen( ( "in1.txt", "in2.txt", "in3.txt", "in4.txt" ), "outn.txt" )</lang>



Fortran

This is a classic problem, but even so, Fortran does not supply a library routine for this. So...
<lang Fortran>      SUBROUTINE FILEMERGE(N,INF,OUTF)	!Merge multiple inputs into one output.
       INTEGER N	!The number of input files.
       INTEGER INF(*)	!Their unit numbers.
       INTEGER OUTF	!The output file.
       INTEGER L(N)	!The length of each current record.
       INTEGER LIST(0:N)!In sorted order.
       LOGICAL LIVE(N)	!Until end-of-file.
       INTEGER ENUFF		!As ever, how long is a piece of string?
       PARAMETER (ENUFF = 666)	!Perhaps this will suffice.
       CHARACTER*(ENUFF) AREC(N)!One for each input file.
       INTEGER I,IT	!Assistants.
        LIST = 0	!LIST(0) fingers the leader.
        LIVE = .TRUE.	!All files are presumed live.
Charge the battery.
        DO I = 1,N	!Taste each.
          CALL GRAB(I)		!By obtaining the first record.
        END DO		!Also, preparing the LIST.
Chug away.
        DO WHILE(LIST(0).GT.0)	!Have we a leader?
          IT = LIST(0)		!Yes. Which is it?
          WRITE (OUTF,"(A)") AREC(IT)(1:L(IT))	!Send it forth.
          LIST(0) = LIST(IT)	!Head to the leader's follower.
          CALL GRAB(IT)		!Get the next candidate.
        END DO			!Try again.
       CONTAINS	!An assistant, called in two places.
        SUBROUTINE GRAB(IN)	!Get another record.
         INTEGER IN		!From this input file.
         INTEGER IT,P		!Linked-list stepping.
          IF (.NOT.LIVE(IN)) RETURN	!No more grist?
          READ (INF(IN),1,END = 10) L(IN),AREC(IN)(1:MIN(ENUFF,L(IN)))	!Burp.
    1     FORMAT (Q,A)		!Q = "length remaining", obviously.
Consider the place of AREC(IN) in the LIST. Entry LIST(IN) is to be linked back in.
          P = 0		!Finger the head of the LIST.
    2     IT = LIST(P)		!Which supplier is fingered?
          IF (IT.GT.0) THEN	!If we're not at the end,
            IF (AREC(IN)(1:L(IN)).GT.AREC(IT)(1:L(IT))) THEN	!Compare.
              P = IT			!The incomer follows this node.
              GO TO 2			!So, move to IT and check afresh.
            END IF		!So much for the comparison.
          END IF	!The record from supplier IN is to precede that from IT, fingered by LIST(P).
          LIST(IN) = IT		!So, IN's follower is IT.
          LIST(P) = IN		!And P's follower is now IN.
          RETURN	!Done.
   10     LIVE(IN) = .FALSE.	!No further input.
          LIST(IN) = -666	!This will cause trouble if accessed.
        END SUBROUTINE GRAB	!Grab input, and jostle for position.
      END SUBROUTINE FILEMERGE	!Simple...

      PROGRAM MASH
      INTEGER MANY
      PARAMETER (MANY = 4)	!Sufficient?
      INTEGER FI(MANY)	!Unit numbers for the input files.
      CHARACTER*(28) FNAME(MANY)	!Names of the input files.
      DATA FNAME/"FileAppend.for","FileChop.for",
     1 "FileExt.for","FileHack.for"/
      INTEGER I,F	!A stepper and the output unit.

      F = 10	!Safely past pre-defined unit numbers.
      OPEN (F,FILE="Merged.txt",STATUS="REPLACE",ACTION="WRITE")	!File for output.
      DO I = 1,MANY	!Go for the input files.
        FI(I) = F + I		!Choose another unit number.
        OPEN (FI(I),FILE=FNAME(I),STATUS="OLD",ACTION="READ")	!Open the input.
      END DO		!On to the next.

      CALL FILEMERGE(MANY,FI,F)	!E pluribus unum.

      END	!That was easy.</lang>

Obviously, there would be variations according to the nature of the data streams being merged, and whatever sort key was involved. For this example, input from disc files will do and the sort key is the entire record's text. This means there is no need to worry over the case where, having written a record from stream S and obtained the next record from stream S, it proves to have equal precedence with the waiting record for some other stream. Which now should take precedence? With entirely-equal records it obviously doesn't matter but if the sort key is only partial then different record content could be deemed equal and then a choice has an effect.
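To illustrate the question of precedence when the sort key is only part of the record, here is a small Python sketch (not a translation of the Fortran above). It assumes a caller-supplied `key` function and resolves ties by stream position, so that records with equal keys come out in the order of the streams that supplied them. The names `merge_by_key` and `_DONE` are hypothetical.

```python
import heapq
from typing import Any, Callable, Iterable, Iterator, Sequence

_DONE = object()  # hypothetical sentinel marking an exhausted stream


def merge_by_key(streams: Sequence[Iterable[Any]],
                 key: Callable[[Any], Any]) -> Iterator[Any]:
    """Merge streams sorted by key(record); equal keys go to the earlier stream."""
    iters = [iter(s) for s in streams]
    heap = []
    for idx, it in enumerate(iters):
        rec = next(it, _DONE)
        if rec is not _DONE:
            # idx is unique per heap entry, so the record itself is never compared.
            heap.append((key(rec), idx, rec))
    heapq.heapify(heap)
    while heap:
        _, idx, rec = heap[0]
        yield rec
        nxt = next(iters[idx], _DONE)
        if nxt is _DONE:
            heapq.heappop(heap)
        else:
            heapq.heapreplace(heap, (key(nxt), idx, nxt))
```

With the whole record as the key, as in the Fortran example, the tie-breaking rule has no visible effect.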

The method is straightforward: with a linked-list of stream source identifiers (here, indices to an array INF of unit numbers, so the values are 1,2,3,...N) ordered by the current record content, send forth the head element and obtain the next record from that stream, inserting its entry into the linked-list according to precedence. The routine does not check that each input stream presents its records in sorted order; if one does not, the merge still runs but the output will not be fully sorted. The key advantage of the linked-list is that when an input stream runs dry, its entry vanishes from the linked-list, having been unlinked when its record was written out. For the case N = 2, rather than write a special version with maddening compound tests, just use the general routine.
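The linked-list scheme can be sketched in Python along the same lines as the Fortran routine, with `link[0]` playing the role of LIST(0) and stream numbers running from 1 to N. The names `merge_linked`, `link`, `head` and `grab` are chosen for this illustration only.

```python
from typing import Iterable, Iterator, Sequence, TypeVar

T = TypeVar("T")
_DONE = object()  # hypothetical sentinel marking an exhausted stream


def merge_linked(streams: Sequence[Iterable[T]]) -> Iterator[T]:
    """Merge sorted streams via a linked list of stream numbers (1..N)."""
    n = len(streams)
    iters = [iter(s) for s in streams]
    head = [None] * (n + 1)  # head[i]: buffered record of stream i
    link = [0] * (n + 1)     # link[0]: leader; link[i]: follower of i; 0 ends the list

    def grab(i: int) -> None:
        rec = next(iters[i - 1], _DONE)
        if rec is _DONE:
            return           # a dry stream simply stays unlinked
        head[i] = rec
        p = 0                # linear search for the insertion point
        while link[p] != 0 and rec > head[link[p]]:
            p = link[p]
        link[i] = link[p]    # i's follower is p's old follower
        link[p] = i          # and p's follower is now i

    for i in range(1, n + 1):
        grab(i)
    while link[0] != 0:
        it = link[0]
        yield head[it]
        link[0] = link[it]   # unlink the leader before refilling it
        grab(it)
```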

The problem with linked-lists is that each time a new record for stream S is to be positioned, the linked-list has to be searched linearly. One could instead maintain an array XLIST fingering the streams in sorted order, which array allows random access and thus (say) a binary search. However, each time, the entry for S must be removed and XLIST compacted for the search, then, when its position is determined, it must be re-inserted after space has been made. Alternatively, an insertion sort could be used and again, there would be many array accesses.
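The array alternative can be sketched in Python with the standard `bisect` module: a sorted list of `(record, stream index)` pairs is searched by bisection, but removing the front entry and inserting the new one still shift array elements, which is the cost described above. The names `merge_sorted_array` and `xlist` are hypothetical.

```python
import bisect
from typing import Iterable, Iterator, Sequence, TypeVar

T = TypeVar("T")
_DONE = object()  # hypothetical sentinel marking an exhausted stream


def merge_sorted_array(streams: Sequence[Iterable[T]]) -> Iterator[T]:
    """Merge sorted streams by keeping their heads in a sorted array."""
    iters = [iter(s) for s in streams]
    xlist = []  # (record, stream index) pairs, kept in ascending order
    for idx, it in enumerate(iters):
        rec = next(it, _DONE)
        if rec is not _DONE:
            bisect.insort(xlist, (rec, idx))  # binary search, then shift
    while xlist:
        rec, idx = xlist.pop(0)               # remove the lowest entry (shifts the rest)
        yield rec
        nxt = next(iters[idx], _DONE)
        if nxt is not _DONE:
            bisect.insort(xlist, (nxt, idx))  # re-insert in sorted position
```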

The source file style is F77 except for the usage of an array having an element zero. One could play about with offsets to achieve the effect with an array starting at one, but F90 standardised the availability of specified lower bounds. A further requirement for F90 is that subroutine FILEMERGE declares arrays of size N, to suit the size of the problem. Older Fortrans do not allow this as standard (despite Algol allowing it from the start in the 1960s) so either the arrays have to be declared "surely big enough" or else they could be supplied as additional parameters by the caller, whose problem that becomes. Similarly, the maximum record size is unknown, so ENUFF = 666 seems "surely big enough", at least for this test. Without the Q format code, annoyances expand for any attempt at generality.

The source for subroutine GRAB is within subroutine FILEMERGE for the convenience in sharing and messing with variables important to both, but not to outsiders. This facility is standard in Algol-following languages but often omitted and was not added to Fortran until F90. In its absence, either more parameters are required for the separate routines, or there will be messing with COMMON storage areas.


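Go

<lang go>package main

import (
    "container/heap"
    "fmt"
    "io"
    "log"
    "os"
    "strings"
)

var s1 = "3 14 15"
var s2 = "2 17 18"
var s3 = ""
var s4 = "2 3 5 7"

func main() {
    fmt.Print("merge2: ")
    merge2(os.Stdout, strings.NewReader(s1), strings.NewReader(s2))
    fmt.Println()

    fmt.Print("mergeN: ")
    mergeN(os.Stdout,
        strings.NewReader(s1), strings.NewReader(s2),
        strings.NewReader(s3), strings.NewReader(s4))
    fmt.Println()
}

// r1 reads one integer from r; ok is false at end of stream.
func r1(r io.Reader) (v int, ok bool) {
    switch _, err := fmt.Fscan(r, &v); {
    case err == nil:
        return v, true
    case err != io.EOF:
        log.Fatal(err)
    }
    return
}

func merge2(m io.Writer, s1, s2 io.Reader) {
    v1, d1 := r1(s1)
    v2, d2 := r1(s2)
    var v int
    for d1 || d2 {
        if !d2 || d1 && v1 < v2 {
            v = v1
            v1, d1 = r1(s1)
        } else {
            v = v2
            v2, d2 = r1(s2)
        }
        fmt.Fprint(m, v, " ")
    }
}

// sv pairs a stream with the value currently buffered from it.
type sv struct {
    s io.Reader
    v int
}

// sh is a min-heap of buffered values, implementing heap.Interface.
type sh []sv

func (s sh) Len() int            { return len(s) }
func (s sh) Less(i, j int) bool  { return s[i].v < s[j].v }
func (s sh) Swap(i, j int)       { s[i], s[j] = s[j], s[i] }
func (p *sh) Push(x interface{}) { *p = append(*p, x.(sv)) }
func (p *sh) Pop() interface{} {
    s := *p
    last := len(s) - 1
    v := s[last]
    *p = s[:last]
    return v
}

func mergeN(m io.Writer, s ...io.Reader) {
    var h sh
    for _, s := range s {
        if v, d := r1(s); d {
            h = append(h, sv{s, v})
        }
    }
    heap.Init(&h)
    for len(h) > 0 {
        p := heap.Pop(&h).(sv)
        fmt.Fprint(m, p.v, " ")
        if v, d := r1(p.s); d {
            heap.Push(&h, sv{p.s, v})
        }
    }
}</lang>

Output: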

merge2: 2 3 14 15 17 18 
mergeN: 2 2 3 3 5 7 14 15 17 18 


Haskell

There is no built-in iterator or stream type for file operations in Haskell, but several libraries provide one.


<lang haskell>-- stack runhaskell --package=conduit-extra --package=conduit-merge

import           Control.Monad.Trans.Resource (runResourceT)
import qualified Data.ByteString.Char8        as BS
import           Data.Conduit                 (($$), (=$=))
import           Data.Conduit.Binary          (sinkHandle, sourceFile)
import qualified Data.Conduit.Binary          as Conduit
import qualified Data.Conduit.List            as Conduit
import           Data.Conduit.Merge           (mergeSources)
import           System.Environment           (getArgs)
import           System.IO                    (stdout)

main :: IO ()
main = do
    inputFileNames <- getArgs
    let inputs = [sourceFile file =$= Conduit.lines | file <- inputFileNames]
    runResourceT $ mergeSources inputs $$ sinkStdoutLn
  where
    -- re-append the newline that Conduit.lines stripped, then write to stdout
    sinkStdoutLn = Conduit.map (`BS.snoc` '\n') =$= sinkHandle stdout</lang>

See implementation in


<lang haskell>-- stack runhaskell --package=pipes-safe --package=pipes-interleave

import Pipes              (runEffect, (>->))
import Pipes.Interleave   (interleave)
import Pipes.Prelude      (stdoutLn)
import Pipes.Safe         (runSafeT)
import Pipes.Safe.Prelude (readFile)
import Prelude            hiding (readFile)
import System.Environment (getArgs)

main :: IO ()
main = do
    sourceFileNames <- getArgs
    let sources = map readFile sourceFileNames
    runSafeT . runEffect $ interleave compare sources >-> stdoutLn</lang>

See implementation in

Perl 6

<lang perl6>sub merge_streams ( @streams ) {
    my @s = @streams.map({ hash( STREAM => $_, HEAD => .get ) })\
                    .grep({ .<HEAD>.defined });

    return gather while @s {
        my $h = @s.min: *.<HEAD>;
        take $h<HEAD>;
        $h<HEAD> = $h<STREAM>.get
            orelse @s .= grep( { $_ !=== $h } );
    }
}

say merge_streams([ @*ARGS».&open ]);</lang>


Python

The built-in function open opens a file for reading and returns a line-by-line iterator (stream) over the file.

The standard library function heapq.merge takes any number of sorted iterables and merges them into one sorted iterator, using a heap.

<lang python>import heapq
import sys

sources = sys.argv[1:]
# heapq.merge expects each stream as a separate argument, hence the unpacking
for item in heapq.merge(*(open(source) for source in sources)):
    sys.stdout.write(item)  # each line already ends with its own newline</lang>



REXX

version 1

<lang rexx>/**********************************************************************
* Merge 1.txt ... n.txt into m.txt
* 1.txt 2.txt 3.txt 4.txt
* 1     19    1999  2e3
* 17    33    2999  3000
* 8     500   3999
**********************************************************************/
n=4
high='ffff'x
p.=''
oid='m.txt'                /* output file, as named in the header  */
Do i=1 To n
  f.i=i'.txt'
  Call get i
  End
Do Forever
  min=high
  Do i=1 To n
    If x.i<<min Then Do    /* avoid numerical comparison */
      imin=i
      min=x.i
      End
    End
  If min<<high Then Do
    Call o x.imin
    Call get imin
    End
  Else Do
    Call lineout oid
    Leave
    End
  End
Exit
get: Procedure Expose f. x. high p.
  Parse Arg ii
  If lines(f.ii)=0 Then
    x.ii=high
  Else Do
    x.ii=linein(f.ii)
    If x.ii<<p.ii Then Do
      Say 'Input file' f.ii 'is not sorted ascendingly'
      Say p.ii 'precedes' x.ii
      Exit
      End
    p.ii=x.ii
    End
  Return
o: Say arg(1)
   Return lineout(oid,arg(1))</lang>

version 2

This REXX version reads   (in numerical order)   any number of input files of the form:     nnn.TXT
and stops looking for further   new   input files when it encounters a file that doesn't exist   (or is empty).

No   heap   is needed to keep track of which record was written and needs replenishing from its input file.
<lang rexx>/*REXX pgm reads sorted files (1.TXT, 2.TXT, ···), and writes sorted data ───► ALL.TXT */
$=copies('FF'x, 1e5)                            /*no value should be larger than this. */
@.=$                                            /*the default value for the @ array.   */

    do n=1  until @.n==$;     call rdr n;   end /*read any number of appropriate files.*/
                                                /* [↑]  read 'til a non─existent file. */
    do  forever;              y=$;    #=0       /*find the lowest value for  N  values.*/
         do k=1  for n-1                        /*traipse through the stemmed  @ array.*/
         if @.k==$  then call rdr k             /*Not defined?  Then read a file record*/
         if @.k<<y  then do;  y=@.k;  #=k;  end /*Lowest so far?  Then mark this as min*/
         end   /*k*/
    if #==0  then exit                          /*stick a fork in it,  we're all done. */
    call lineout 'ALL.TXT', @.#;      say @.#   /*output the value to ALL.TXT; also say*/
    call rdr #                                  /*repopulate this file's input value.  */
    end   /*until*/

/*──────────────────────────────────────────────────────────────────────────────────────*/
rdr: parse arg z;  @.z=$;  if lines(z'.TXT')\==0  then @.z=linein(z".TXT");  return</lang>
output   is the same as the 1st REXX version   (when using the same input).
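The heap-free approach of this version, a linear scan over the buffered heads on every step, might look like this in Python. It is a sketch only: the names `merge_scan` and `_DONE` are hypothetical, and a sentinel object replaces the REXX program's "larger than any value" string.

```python
from typing import Iterable, Iterator, Sequence, TypeVar

T = TypeVar("T")
_DONE = object()  # hypothetical sentinel marking an exhausted stream


def merge_scan(streams: Sequence[Iterable[T]]) -> Iterator[T]:
    """Merge sorted streams by scanning all buffered heads for the lowest."""
    iters = [iter(s) for s in streams]
    heads = [next(it, _DONE) for it in iters]  # one buffered item per stream
    while True:
        low = None                             # index of the lowest live head
        for k, h in enumerate(heads):
            if h is not _DONE and (low is None or h < heads[low]):
                low = k
        if low is None:                        # every stream is dry
            return
        yield heads[low]
        heads[low] = next(iters[low], _DONE)   # replenish from that stream
```

For a handful of input files the O(N) scan per record is usually of little consequence; a heap starts to pay off as N grows.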

UNIX Shell

sort --merge source1 source2 sourceN > sink


zkl

This solution uses iterators, doesn't care where the streams originate and only keeps the head of each stream on hand.
<lang zkl>fcn mergeStreams(s1,s2,etc){ //-->Walker

  streams:=vm.arglist.pump(List(),fcn(s){ // prime and prune
     if( (w:=s.walker())._next() ) return(w);
     Void.Skip		// stream is dry
     if(not streams) return(Void.Stop);  // all streams are dry
     values:=streams.apply("value");	  // head of the streams
     v:=values.reduce('wrap(min,x){ if(min<=x) min else x });
     n:=values.find(v); w:=streams[n]; w._next();  // read next value from min stream
     if(w.atEnd) streams.del(n); // prune empty streams

}</lang>
Using infinite streams:
<lang zkl>w:=mergeStreams([0..],[2..*,2],[3..*,3],T(5));
w.walk(20).println();</lang>


Using files:
<lang zkl>w:=mergeStreams(File("unixdict.txt"),File("2hkprimes.txt"),File("/dev/null"));
do(10){ }</lang>


Using the above example to squirt the merged stream to a file:
<lang zkl>mergeStreams(File("unixdict.txt"),File("2hkprimes.txt"),File("/dev/null"))
.pump(File("foo.txt","w"));</lang>

$ ls -l unixdict.txt 2hkprimes.txt foo.txt 
-rw-r--r-- 1 craigd craigd 1510484 Oct 29  2013 2hkprimes.txt
-rw-r--r-- 1 craigd craigd 1716887 Jun 16 23:34 foo.txt
-rw-r--r-- 1 craigd craigd  206403 Jun 11  2014 unixdict.txt