Stream merge

From Rosetta Code
Revision as of 22:59, 17 June 2016 by rosettacode>Gerard Schildberger (→‎version 2: added verbiage to the REXX section header.)
Task
Stream merge
You are encouraged to solve this task according to the task description, using any language you may know.
2-stream merge
Read two sorted streams of items from external source (e.g. disk, or network), and write one stream of sorted items to external sink.
Common algorithm: keep 1 buffered item from each source, select minimal of them, write it, fetch another item from that stream from which the written item was.
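The buffered-head approach can be sketched in Python for illustration (the function name and sample values here are mine, not part of the task; any sorted iterable serves as a "stream"):

```python
def merge2(a, b):
    """Merge two sorted iterators, keeping one buffered item per source."""
    a, b = iter(a), iter(b)
    DRY = object()                   # sentinel: this stream is exhausted
    va = next(a, DRY)                # buffered head of stream a
    vb = next(b, DRY)                # buffered head of stream b
    while va is not DRY or vb is not DRY:
        if vb is DRY or (va is not DRY and va <= vb):
            yield va                 # a's head is minimal: emit it...
            va = next(a, DRY)        # ...and refill from the same stream
        else:
            yield vb
            vb = next(b, DRY)

print(list(merge2([3, 14, 15], [2, 17, 18])))   # [2, 3, 14, 15, 17, 18]
```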
N-stream merge
The same as above, but reading from N sources.
Common algorithm: same as above, but keep buffered items and their source descriptors in a heap.
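The N-way variant can be sketched the same way (illustrative Python; the heap holds `(head, source_index, iterator)` entries ordered by head, with the index as a tie-breaker so iterators themselves are never compared):

```python
import heapq

def mergeN(*sources):
    """Merge N sorted iterators via a heap of (head, source_id, iterator)."""
    heap = []
    for i, s in enumerate(sources):
        it = iter(s)
        for head in it:                    # prime: buffer the first item, if any
            heap.append((head, i, it))
            break
    heapq.heapify(heap)
    while heap:
        head, i, it = heapq.heappop(heap)  # minimal buffered item overall
        yield head
        for head in it:                    # refill from the stream just consumed
            heapq.heappush(heap, (head, i, it))
            break

print(list(mergeN([3, 14, 15], [2, 17, 18], [], [2, 3, 5, 7])))
# [2, 2, 3, 3, 5, 7, 14, 15, 17, 18]
```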


Assume the streams are very big. You must not read them wholly into memory; process them as streams.

Go

<lang go>package main

import (

   "container/heap"
   "fmt"
   "io"
   "log"
   "os"
   "strings"

)

var s1 = "3 14 15"
var s2 = "2 17 18"
var s3 = ""
var s4 = "2 3 5 7"

func main() {

   fmt.Print("merge2: ")
   merge2(
       os.Stdout,
       strings.NewReader(s1),
       strings.NewReader(s2))
   fmt.Println()
   fmt.Print("mergeN: ")
   mergeN(
       os.Stdout,
       strings.NewReader(s1),
       strings.NewReader(s2),
       strings.NewReader(s3),
       strings.NewReader(s4))
   fmt.Println()

}

func r1(r io.Reader) (v int, ok bool) {

   switch _, err := fmt.Fscan(r, &v); {
   case err == nil:
       return v, true
   case err != io.EOF:
       log.Fatal(err)
   }
   return

}

func merge2(m io.Writer, s1, s2 io.Reader) {

   v1, d1 := r1(s1)
   v2, d2 := r1(s2)
   var v int
   for d1 || d2 {
       if !d2 || d1 && v1 < v2 {
           v = v1
           v1, d1 = r1(s1)
       } else {
           v = v2
           v2, d2 = r1(s2)
       }
       fmt.Fprint(m, v, " ")
   }

}

type sv struct {

   s io.Reader
   v int

}

type sh []sv

func (s sh) Len() int            { return len(s) }
func (s sh) Less(i, j int) bool  { return s[i].v < s[j].v }
func (s sh) Swap(i, j int)       { s[i], s[j] = s[j], s[i] }
func (p *sh) Push(x interface{}) { *p = append(*p, x.(sv)) }

func (p *sh) Pop() interface{} {

   s := *p
   last := len(s) - 1
   v := s[last]
   *p = s[:last]
   return v

}

func mergeN(m io.Writer, s ...io.Reader) {

   var h sh
   for _, s := range s {
       if v, d := r1(s); d {
           h = append(h, sv{s, v})
       }
   }
   heap.Init(&h)
   for len(h) > 0 {
       p := heap.Pop(&h).(sv)
       fmt.Fprint(m, p.v, " ")
       if v, d := r1(p.s); d {
           heap.Push(&h, sv{p.s, v})
       }
   }

}</lang>

Output:
merge2: 2 3 14 15 17 18 
mergeN: 2 2 3 3 5 7 14 15 17 18 

Haskell

There is no built-in iterator or stream type for file operations in Haskell, but several streaming libraries provide one.

conduit

<lang haskell>-- stack runhaskell --package=conduit-extra --package=conduit-merge

import Control.Monad.Trans.Resource (runResourceT)
import qualified Data.ByteString.Char8 as BS
import Data.Conduit (($$), (=$=))
import Data.Conduit.Binary (sinkHandle, sourceFile)
import qualified Data.Conduit.Binary as Conduit
import qualified Data.Conduit.List as Conduit
import Data.Conduit.Merge (mergeSources)
import System.Environment (getArgs)
import System.IO (stdout)

main :: IO ()
main = do

   inputFileNames <- getArgs
   let inputs = [sourceFile file =$= Conduit.lines | file <- inputFileNames]
   runResourceT $ mergeSources inputs $$ sinkStdoutLn
 where
   sinkStdoutLn = Conduit.map (`BS.snoc` '\n') =$= sinkHandle stdout</lang>

pipes

<lang haskell>-- stack runhaskell --package=pipes-safe --package=pipes-interleave

import Pipes (runEffect, (>->))
import Pipes.Interleave (interleave)
import Pipes.Prelude (stdoutLn)
import Pipes.Safe (runSafeT)
import Pipes.Safe.Prelude (readFile)
import Prelude hiding (readFile)
import System.Environment (getArgs)

main :: IO ()
main = do

   sourceFileNames <- getArgs
   let sources = map readFile sourceFileNames
   runSafeT . runEffect $ interleave compare sources >-> stdoutLn</lang>

Perl 6

<lang perl6>sub merge_streams ( @streams ) {

   my @s = @streams.map({ hash( STREAM => $_, HEAD => .get ) })\
                   .grep({ .<HEAD>.defined });
   return gather while @s {
       my $h = @s.min: *.<HEAD>;
       take $h<HEAD>;
       $h<HEAD> = $h<STREAM>.get
           orelse @s .= grep( { $_ !=== $h } );
   }

}

say merge_streams([ @*ARGS».&open ]);</lang>

Python

Built-in function open opens a file for reading and returns a line-by-line iterator (stream) over the file.

There exists a standard library function heapq.merge that takes any number of sorted stream iterators and merges them into one sorted iterator, using a heap.

<lang python>import heapq
import sys

sources = sys.argv[1:]
for item in heapq.merge(*(open(source) for source in sources)):

    print(item, end='')  # each line already carries its newline</lang>

REXX

version 1

<lang rexx>/**********************************************************************
* Merge 1.txt ... n.txt into m.txt
* 1.txt   2.txt   3.txt   4.txt
* 1       19      1999    2e3
* 17      33      2999    3000
* 8       500     3999
**********************************************************************/
oid='m.txt'
n=4
high='ffff'x
p.=''
Do i=1 To n

 f.i=i'.txt'
 Call get i
 End

Do Forever

 min=high
 Do i=1 To n
   If x.i<<min Then Do    /* avoid numerical comparison */
     imin=i
     min=x.i
     End
   End
 If min<<high Then Do
   Call o x.imin
   Call get imin
   End
 Else Do
   Call lineout oid
   Leave
   End
 End

Exit

get: Procedure Expose f. x. high p.

 Parse Arg ii
 If lines(f.ii)=0 Then
   x.ii=high
 Else Do
   x.ii=linein(f.ii)
   If x.ii<<p.ii Then Do
     Say 'Input file' f.ii 'is not sorted ascendingly'
     Say p.ii 'precedes' x.ii
     Exit
     End
   p.ii=x.ii
   End
 Return

o: Say arg(1)
   Return lineout(oid,arg(1))</lang>
Output:
1
17
19
1999
2999
2e3
3000
33
3999
500
8

version 2

This REXX version reads   (in numerical order)   any number of input files in the form of:   nnn.TXT   (starting at 1.TXT)   and
stops reading input files when encountering a file that doesn't exist   (or is empty).

No   heap   is needed to keep track of which record was written and needs replenishing from its input file.
<lang rexx>/*REXX pgm reads sorted files (1.TXT, 2.TXT, ···), and writes sorted data ───► ALL.TXT */
$=copies('FF'x, 1e5)                            /*no value should be larger than this. */
@.=$                                            /*the default value for the  @  array. */

    do n=1  until @.n==$;     call rdr n;   end /*read any number of appropriate files.*/
                                                /* [↑]  read 'til a non─existent file. */
    do  forever;              y=$;    #=0       /*find the lowest value for  N  values.*/
         do k=1  for n-1                        /*traipse through the stemmed  @ array.*/
         if @.k==$  then call rdr k             /*Not defined?  Then read a file record*/
         if @.k<<y  then do;  y=@.k;  #=k;  end /*Lowest so far?  Then mark this as min*/
         end   /*k*/
    if #==0  then exit                          /*stick a fork in it,  we're all done. */
    call lineout 'ALL.TXT', @.#;      say @.#   /*output the value to ALL.TXT; also say*/
    call rdr #                                  /*repopulate this file's input value.  */
    end   /*until*/

/*──────────────────────────────────────────────────────────────────────────────────────*/
rdr: parse arg z;  @.z=$;  if lines(z'.TXT')\==0  then @.z=linein(z".TXT");  return</lang>
The output is the same as that of the 1st REXX version (when using the same input).

Shell

<lang bash>sort --merge source1 source2 sourceN > sink</lang>

zkl

This solution uses iterators, doesn't care where the streams originate, and only keeps the head of each stream on hand.
<lang zkl>fcn mergeStreams(s1,s2,etc){ //-->Walker

  streams:=vm.arglist.pump(List(),fcn(s){ // prime and prune
     if( (w:=s.walker())._next() ) return(w);
     Void.Skip		// stream is dry
  });
  Walker().tweak(fcn(streams){
     if(not streams) return(Void.Stop);  // all streams are dry
     values:=streams.apply("value");	  // head of the streams
     v:=values.reduce('wrap(min,x){ if(min<=x) min else x });
     n:=values.find(v); w:=streams[n]; w._next();  // read next value from min stream
     if(w.atEnd) streams.del(n); // prune empty streams
     v
  }.fp(streams));

}</lang>
Using infinite streams:
<lang zkl>w:=mergeStreams([0..],[2..*,2],[3..*,3],T(5));
w.walk(20).println();</lang>

Output:
L(0,1,2,2,3,3,4,4,5,5,6,6,6,7,8,8,9,9,10,10)

Using files:
<lang zkl>w:=mergeStreams(File("unixdict.txt"),File("2hkprimes.txt"),File("/dev/null"));
do(10){ w.read().print() }</lang>

Output:
10th
1st
2
2nd
3
3rd
4th
5
5th
6th

Using the above example to squirt the merged stream to a file:
<lang zkl>mergeStreams(File("unixdict.txt"),File("2hkprimes.txt"),File("/dev/null"))
   .pump(File("foo.txt","w"));</lang>

Output:
$ ls -l unixdict.txt 2hkprimes.txt foo.txt 
-rw-r--r-- 1 craigd craigd 1510484 Oct 29  2013 2hkprimes.txt
-rw-r--r-- 1 craigd craigd 1716887 Jun 16 23:34 foo.txt
-rw-r--r-- 1 craigd craigd  206403 Jun 11  2014 unixdict.txt