Stream merge

Task

You are encouraged to solve this task according to the task description, using any language you may know.

2-stream merge
Read two sorted streams of items from an external source (e.g., disk or network), and write one sorted stream of items to an external sink.
Common algorithm: keep one buffered item from each source, select the smaller of them, write it out, then fetch the next item from the stream that supplied the item just written.

N-stream merge
The same as above, but reading from N sources.
Common algorithm: same as above, but keep the buffered items, together with their source descriptors, in a heap.

Assume the streams are very large: you must not slurp them whole into memory, but read them as streams.

Go

<lang go>package main

import (
    "container/heap"
    "fmt"
    "io"
    "log"
    "os"
    "strings"
)

var s1 = "3 14 15"
var s2 = "2 17 18"
var s3 = ""
var s4 = "2 3 5 7"

func main() {
    fmt.Print("merge2: ")
    merge2(
        os.Stdout,
        strings.NewReader(s1),
        strings.NewReader(s2))
    fmt.Println()
    fmt.Print("mergeN: ")
    mergeN(
        os.Stdout,
        strings.NewReader(s1),
        strings.NewReader(s2),
        strings.NewReader(s3),
        strings.NewReader(s4))
    fmt.Println()
}

// r1 reads a single item from a stream.  ok is false at end of stream.
func r1(r io.Reader) (v int, ok bool) {
    switch _, err := fmt.Fscan(r, &v); {
    case err == nil:
        return v, true
    case err != io.EOF:
        log.Fatal(err)
    }
    return
}

// merge2 keeps one buffered item (v1, v2) from each of the two streams,
// repeatedly writes the smaller, and refills from the stream it came from.
func merge2(m io.Writer, s1, s2 io.Reader) {
    v1, d1 := r1(s1)
    v2, d2 := r1(s2)
    var v int
    for d1 || d2 {
        if !d2 || d1 && v1 < v2 {
            v = v1
            v1, d1 = r1(s1)
        } else {
            v = v2
            v2, d2 = r1(s2)
        }
        fmt.Fprint(m, v, " ")
    }
}

// sv pairs a stream with its one buffered item.
type sv struct {
    s io.Reader
    v int
}

// sh is a heap of stream/value pairs ordered by value; it implements
// heap.Interface from the standard library's container/heap.
type sh []sv

func (s sh) Len() int            { return len(s) }
func (s sh) Less(i, j int) bool  { return s[i].v < s[j].v }
func (s sh) Swap(i, j int)       { s[i], s[j] = s[j], s[i] }
func (p *sh) Push(x interface{}) { *p = append(*p, x.(sv)) }
func (p *sh) Pop() interface{} {
    s := *p
    last := len(s) - 1
    v := s[last]
    *p = s[:last]
    return v
}

func mergeN(m io.Writer, s ...io.Reader) {
    var h sh
    for _, s := range s {
        if v, d := r1(s); d { // empty streams are dropped here
            h = append(h, sv{s, v})
        }
    }
    heap.Init(&h)
    for len(h) > 0 {
        p := heap.Pop(&h).(sv) // smallest buffered item
        fmt.Fprint(m, p.v, " ")
        if v, d := r1(p.s); d { // refill from the same stream
            heap.Push(&h, sv{p.s, v})
        }
    }
}</lang>

Output:
merge2: 2 3 14 15 17 18 
mergeN: 2 2 3 3 5 7 14 15 17 18 

Haskell

There is no built-in iterator or stream type for file operations in Haskell, but several streaming libraries exist; two are shown below.
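
The two-stream merge itself is easy to express over ordinary lazy lists. A minimal sketch for reference (the function merge2 below is our illustration, not part of either library solution):

<lang haskell>-- Merge two sorted lazy lists. Laziness means only the heads are
-- ever forced, so this also works on streams of unbounded length.
merge2 :: Ord a => [a] -> [a] -> [a]
merge2 [] ys = ys
merge2 xs [] = xs
merge2 xs@(x:xt) ys@(y:yt)
  | x <= y    = x : merge2 xt ys
  | otherwise = y : merge2 xs yt</lang>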

conduit

<lang haskell>-- stack runhaskell --package=conduit-extra --package=conduit-merge

import Control.Monad.Trans.Resource (runResourceT)
import qualified Data.ByteString.Char8 as BS
import Data.Conduit (($$), (=$=))
import Data.Conduit.Binary (sinkHandle, sourceFile)
import qualified Data.Conduit.Binary as Conduit
import qualified Data.Conduit.List as Conduit
import Data.Conduit.Merge (mergeSources)
import System.Environment (getArgs)
import System.IO (stdout)

main :: IO ()
main = do
    inputFileNames <- getArgs
    let inputs = [sourceFile file =$= Conduit.lines | file <- inputFileNames]
    runResourceT $ mergeSources inputs $$ sinkStdoutLn
  where
    sinkStdoutLn = Conduit.map (`BS.snoc` '\n') =$= sinkHandle stdout</lang>

pipes

<lang haskell>-- stack runhaskell --package=pipes-safe --package=pipes-interleave

import Pipes (runEffect, (>->))
import Pipes.Interleave (interleave)
import Pipes.Prelude (stdoutLn)
import Pipes.Safe (runSafeT)
import Pipes.Safe.Prelude (readFile)
import Prelude hiding (readFile)
import System.Environment (getArgs)

main :: IO ()
main = do
    sourceFileNames <- getArgs
    let sources = map readFile sourceFileNames
    runSafeT . runEffect $ interleave compare sources >-> stdoutLn</lang>

Perl 6

<lang perl6>sub merge_streams ( @streams ) {
    my @s = @streams.map({ hash( STREAM => $_, HEAD => .get ) })\
                    .grep({ .<HEAD>.defined });
    return gather while @s {
        my $h = @s.min: *.<HEAD>;
        take $h<HEAD>;
        $h<HEAD> = $h<STREAM>.get
            orelse @s .= grep( { $_ !=== $h } );
    }
}

say merge_streams([ @*ARGS».&open ]);</lang>
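
Note how exhausted streams are pruned: when `.get` returns an undefined value at end of stream, the `orelse` clause fires and removes that stream's entry from `@s`, so only live streams remain candidates for the minimum.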

Python

The built-in function open opens a file for reading and returns a line-by-line iterator (stream) over the file.

The standard library function heapq.merge takes any number of sorted stream iterators and merges them into one sorted iterator, using a heap.

<lang python>import heapq
import sys

sources = sys.argv[1:]
# heapq.merge takes each sorted stream as a separate argument,
# so the generator of open files must be unpacked with *.
for item in heapq.merge(*(open(source) for source in sources)):
    print(item, end='')  # lines keep their newline; don't add another</lang>
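
Assuming the script is saved as merge.py (a name chosen here for illustration), run it with the sorted input files as arguments: `python merge.py 1.txt 2.txt 3.txt`.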

REXX

<lang rexx>/**********************************************************************
* Merge 1.txt ... n.txt into m.txt
* 1.txt 2.txt 3.txt 4.txt
* 1     19    1999  2e3
* 17    33    2999  3000
* 8     500   3999
**********************************************************************/
n=4
high='ffff'x
p.=''
oid='m.txt'                    /* output file                        */
Do i=1 To n
  f.i=i'.txt'
  Call get i
  End
Do Forever
  min=high
  Do i=1 To n
    If x.i<<min Then Do        /* avoid numerical comparison         */
      imin=i
      min=x.i
      End
    End
  If min<<high Then Do
    Call o x.imin
    Call get imin
    End
  Else Do
    Call lineout oid
    Leave
    End
  End
Exit

get: Procedure Expose f. x. high p.
  Parse Arg ii
  If lines(f.ii)=0 Then
    x.ii=high
  Else Do
    x.ii=linein(f.ii)
    If x.ii<<p.ii Then Do
      Say 'Input file' f.ii 'is not sorted ascendingly'
      Say p.ii 'precedes' x.ii
      Exit
      End
    p.ii=x.ii
    End
  Return

o: Say arg(1)
   Return lineout(oid,arg(1))</lang>
Output:
1
17
19
1999
2999
2e3
3000
33
3999
500
8

Shell

sort --merge source1 source2 sourceN > sink
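
The --merge option (-m in POSIX spelling) tells sort that each input is already sorted, so it performs a pure N-way merge: the inputs are streamed rather than read whole and re-sorted.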

zkl

This solution uses iterators, doesn't care where the streams originate, and keeps only the head of each stream on hand.

<lang zkl>fcn mergeStreams(s1,s2,etc){ //-->Walker
   streams:=vm.arglist.pump(List(),fcn(s){ // prime and prune
      if( (w:=s.walker())._next() ) return(w);
      Void.Skip		// stream is dry
   });
   Walker().tweak(fcn(streams){
      if(not streams) return(Void.Stop);  // all streams are dry
      values:=streams.apply("value");	  // head of the streams
      v:=values.reduce('wrap(min,x){ if(min<=x) min else x });
      n:=values.find(v); w:=streams[n]; w._next();  // read next value from min stream
      if(w.atEnd) streams.del(n); // prune empty streams
      v
   }.fp(streams));
}</lang>

Using infinite streams:

<lang zkl>w:=mergeStreams([0..],[2..*,2],[3..*,3],T(5));
w.walk(20).println();</lang>

Output:
L(0,1,2,2,3,3,4,4,5,5,6,6,6,7,8,8,9,9,10,10)

Using files:

<lang zkl>w:=mergeStreams(File("unixdict.txt"),File("2hkprimes.txt"),File("/dev/null"));
do(10){ w.read().print() }</lang>

Output:
10th
1st
2
2nd
3
3rd
4th
5
5th
6th

Using the above example to squirt the merged stream to a file:

<lang zkl>mergeStreams(File("unixdict.txt"),File("2hkprimes.txt"),File("/dev/null"))
   .pump(File("foo.txt","w"));</lang>

Output:
$ ls -l unixdict.txt 2hkprimes.txt foo.txt 
-rw-r--r-- 1 craigd craigd 1510484 Oct 29  2013 2hkprimes.txt
-rw-r--r-- 1 craigd craigd 1716887 Jun 16 23:34 foo.txt
-rw-r--r-- 1 craigd craigd  206403 Jun 11  2014 unixdict.txt