Stream merge

From Rosetta Code
Revision as of 13:15, 15 June 2016 by rosettacode>Cblp (+ Haskell)
2-stream merge
Read two sorted streams of items from an external source (e.g. disk or network) and write one sorted stream of items to an external sink.
Common algorithm: keep one buffered item from each source, select the smaller of the two, write it, then fetch the next item from the stream that the written item came from.
N-stream merge
The same as above, but reading from N sources.
Common algorithm: same as above, but keep the buffered items, each paired with its source descriptor, in a heap.

Assume the streams are very large. You must not load them into memory in their entirety; read them incrementally, as streams.
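
The 2-stream algorithm described above can be sketched in Python as follows. This is a minimal illustration rather than a reference solution: the name merge_two is hypothetical, and in-memory io.StringIO objects stand in for the external sources and sink.

```python
import io


def merge_two(left, right):
    """Lazily merge two sorted iterables, buffering one item from each."""
    left, right = iter(left), iter(right)
    missing = object()  # sentinel marking an exhausted stream
    a, b = next(left, missing), next(right, missing)
    while a is not missing and b is not missing:
        if b < a:
            yield b
            b = next(right, missing)
        else:  # on ties, take from the left stream first
            yield a
            a = next(left, missing)
    # At most one stream still has items; drain it.
    while a is not missing:
        yield a
        a = next(left, missing)
    while b is not missing:
        yield b
        b = next(right, missing)


# Usage: in-memory text streams stand in for files or sockets (assumption).
source1 = io.StringIO("apple\ncherry\n")
source2 = io.StringIO("banana\ndate\n")
sink = io.StringIO()
for line in merge_two(source1, source2):
    sink.write(line)
```

Because merge_two is a generator, only one item per source is held in memory at any time, whatever the size of the inputs.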

Haskell

Haskell has no built-in iterator or stream type for file operations, but several libraries provide one.

pipes

<lang haskell>-- stack runhaskell --package=heap --package=pipes-safe

import Data.Either (rights)
import Data.Heap (MinPrioHeap)
import qualified Data.Heap as Heap
import Data.Traversable (for)
import Pipes (Producer, lift, next, runEffect, yield, (>->))
import Pipes.Prelude (stdoutLn)
import Pipes.Safe (runSafeT)
import Pipes.Safe.Prelude (readFile)
import Prelude hiding (readFile)
import System.Environment (getArgs)

-- Buffer the first item of every source, then merge through a heap.
merge :: (Ord a, Monad m) => [Producer a m ()] -> Producer a m ()
merge sources = do
    firstResults <- for sources $ \source ->
        lift $ next source
    -- 'rights' drops the sources that were empty from the start.
    let sourceHeap = Heap.fromList $ rights firstResults
    mergeHeap sourceHeap

-- The heap holds (buffered item, rest of its source), ordered by the item.
mergeHeap :: (Ord a, Monad m) => MinPrioHeap a (Producer a m ()) -> Producer a m ()
mergeHeap sourceHeap =
    case Heap.view sourceHeap of
        Nothing ->
            pure ()
        Just ((a, src1), sourceHeap1) -> do
            yield a
            -- Refill from the source that the emitted item came from.
            res <- lift $ next src1
            let sourceHeap2 = case res of
                    Left () -> sourceHeap1
                    Right (b, src2) -> Heap.insert (b, src2) sourceHeap1
            mergeHeap sourceHeap2

main :: IO ()
main = do
    sourceFileNames <- getArgs
    let sources = map readFile sourceFileNames
    runSafeT . runEffect $ merge sources >-> stdoutLn</lang>

Python

The built-in function open opens a file for reading and returns a line-by-line iterator (stream) over it.

The standard library function heapq.merge takes any number of sorted iterables, each passed as a separate argument, and lazily merges them into a single sorted iterator using a heap.

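<lang python>import heapq
import sys

sources = sys.argv[1:]
# heapq.merge takes each stream as a separate argument, hence the * unpacking.
for item in heapq.merge(*(open(source) for source in sources)):
    # Lines keep their trailing newline, so suppress the one print would add.
    print(item, end='')</lang>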
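
For comparison, the heap-based N-stream algorithm from the task description can also be written by hand. The following is a minimal sketch under stated assumptions, not the standard library's actual implementation: the name merge_streams is hypothetical, and the source index is stored in each heap entry only so that ties between equal items never fall through to comparing iterators.

```python
import heapq


def merge_streams(*sources):
    """Lazily merge any number of sorted iterables using a min-heap."""
    exhausted = object()  # sentinel marking an exhausted stream
    heap = []
    for index, source in enumerate(sources):
        iterator = iter(source)
        first = next(iterator, exhausted)
        if first is not exhausted:
            # Heap entry: (buffered item, source index, rest of the source).
            heap.append((first, index, iterator))
    heapq.heapify(heap)
    while heap:
        item, index, iterator = heap[0]  # smallest buffered item
        yield item
        following = next(iterator, exhausted)
        if following is exhausted:
            heapq.heappop(heap)  # this source is done; drop it
        else:
            # Replace the emitted item with the next one from the same source.
            heapq.heapreplace(heap, (following, index, iterator))


# Usage: lists stand in for open files (assumption).
merged = list(merge_streams([1, 4, 7], [2, 5, 8], [3, 6, 9]))
```

The heap never holds more than one entry per source, so memory use grows with the number of streams, not with their length.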

REXX

<lang rexx>/**********************************************************************
* Merge 1.txt ... n.txt into m.txt
* 1.txt 2.txt 3.txt 4.txt
* 1     19    1999  2e3
* 17    33    2999  3000
* 8     500   3999
**********************************************************************/
n=4
high='ffff'x
p.=''
oid='m.txt'                /* output file named in the header above */
Do i=1 To n
  f.i=i'.txt'
  Call get i
  End
Do Forever
  min=high
  Do i=1 To n
    If x.i<<min Then Do    /* avoid numerical comparison */
      imin=i
      min=x.i
      End
    End
  If min<<high Then Do
    Call o x.imin
    Call get imin
    End
  Else Do
    Call lineout oid       /* all inputs exhausted: close the output */
    Leave
    End
  End
Exit

get: Procedure Expose f. x. high p.
  Parse Arg ii
  If lines(f.ii)=0 Then
    x.ii=high
  Else Do
    x.ii=linein(f.ii)
    If x.ii<<p.ii Then Do
      Say 'Input file' f.ii 'is not sorted ascendingly'
      Say p.ii 'precedes' x.ii
      Exit
      End
    p.ii=x.ii
    End
  Return

o: Say arg(1)
   Return lineout(oid,arg(1))</lang>
Output:
1
17
19
1999
2999
2e3
3000
33
3999
500
8
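
The order above is plain character-by-character string order, which is why 2e3 falls between 2999 and 3000 and 8 comes last. As a cross-check, the following sketch feeds the same four sample inputs to Python's heapq.merge; the in-memory lists standing in for 1.txt through 4.txt are an assumption made for illustration.

```python
import heapq

# The four sample inputs from the REXX header, one list per file.
streams = [
    ["1", "17", "8"],
    ["19", "33", "500"],
    ["1999", "2999", "3999"],
    ["2e3", "3000"],
]

# Python compares str values character by character, never numerically,
# which is assumed here to match REXX's strict << comparison for this data.
merged = list(heapq.merge(*streams))
print("\n".join(merged))
```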

Shell

sort --merge source1 source2 sourceN > sink