Stream merge: Difference between revisions
(→REXX: added a second REXX version.) |
(→version 2: elided the use of a function, rolled the WTR code into the mainline program.) |
||
Line 258: | Line 258: | ||
do n=1 until @.n==$; call rdr n; end /*read any number of appropriate files.*/ |
do n=1 until @.n==$; call rdr n; end /*read any number of appropriate files.*/ |
||
/* [↑] read 'til a non─existent file. */ |
/* [↑] read 'til a non─existent file. */ |
||
do |
do forever; y=$; #=0 /*find the lowest value for N values.*/ |
||
do k=1 for n-1 /*traipse through the stemmed @ array.*/ |
do k=1 for n-1 /*traipse through the stemmed @ array.*/ |
||
if @.k==$ then call rdr k /*Not defined? Then read a file record*/ |
if @.k==$ then call rdr k /*Not defined? Then read a file record*/ |
||
if @.k<<y then do; y=@.k; #=k; end /*Lowest so far? Then mark this as min*/ |
if @.k<<y then do; y=@.k; #=k; end /*Lowest so far? Then mark this as min*/ |
||
end /*k*/ |
end /*k*/ |
||
if #==0 then exit /*stick a fork in it, we're all done. */ |
|||
⚫ | |||
⚫ | |||
end /*until*/ |
end /*until*/ |
||
⚫ | |||
/*──────────────────────────────────────────────────────────────────────────────────────*/ |
/*──────────────────────────────────────────────────────────────────────────────────────*/ |
||
rdr: parse arg z; @.z=$; if lines(z'.TXT')\==0 then @.z=linein(z".TXT"); return |
rdr: parse arg z; @.z=$; if lines(z'.TXT')\==0 then @.z=linein(z".TXT"); return</lang> |
||
⚫ | |||
'''output''' is the same as the 1<sup>st</sup> REXX version (when using the same input).
'''output''' is the same as the 1<sup>st</sup> REXX version (when using the same input).
||
Revision as of 22:46, 17 June 2016
You are encouraged to solve this task according to the task description, using any language you may know.
- 2-stream merge
- Read two sorted streams of items from external source (e.g. disk, or network), and write one stream of sorted items to external sink.
- Common algorithm: keep 1 buffered item from each source, select minimal of them, write it, fetch another item from that stream from which the written item was.
- N-stream merge
- The same as above, but reading from N sources.
- Common algorithm: same as above, but keep buffered items and their source descriptors in a heap.
Assume the streams are very big. You must not load them whole into memory; read them as streams.
Go
<lang go>package main

// Stream merge: read sorted integer streams and write one sorted stream.
import (
	"container/heap"
	"fmt"
	"io"
	"log"
	"os"
	"strings"
)

// Sample input "streams" (s3 is deliberately empty).
var s1 = "3 14 15"
var s2 = "2 17 18"
var s3 = ""
var s4 = "2 3 5 7"
func main() {
fmt.Print("merge2: ") merge2( os.Stdout, strings.NewReader(s1), strings.NewReader(s2)) fmt.Println()
fmt.Print("mergeN: ") mergeN( os.Stdout, strings.NewReader(s1), strings.NewReader(s2), strings.NewReader(s3), strings.NewReader(s4)) fmt.Println()
}
func r1(r io.Reader) (v int, ok bool) {
switch _, err := fmt.Fscan(r, &v); { case err == nil: return v, true case err != io.EOF: log.Fatal(err) } return
}
func merge2(m io.Writer, s1, s2 io.Reader) {
v1, d1 := r1(s1) v2, d2 := r1(s2) var v int for d1 || d2 { if !d2 || d1 && v1 < v2 { v = v1 v1, d1 = r1(s1) } else { v = v2 v2, d2 = r1(s2) } fmt.Fprint(m, v, " ") }
}
type sv struct {
s io.Reader v int
}
type sh []sv
func (s sh) Len() int { return len(s) } func (s sh) Less(i, j int) bool { return s[i].v < s[j].v } func (s sh) Swap(i, j int) { s[i], s[j] = s[j], s[i] } func (p *sh) Push(x interface{}) { *p = append(*p, x.(sv)) } func (p *sh) Pop() interface{} {
s := *p last := len(s) - 1 v := s[last] *p = s[:last] return v
}
func mergeN(m io.Writer, s ...io.Reader) {
var h sh for _, s := range s { if v, d := r1(s); d { h = append(h, sv{s, v}) } } heap.Init(&h) for len(h) > 0 { p := heap.Pop(&h).(sv) fmt.Fprint(m, p.v, " ") if v, d := r1(p.s); d { heap.Push(&h, sv{p.s, v}) } }
}</lang>
- Output:
merge2: 2 3 14 15 17 18 mergeN: 2 2 3 3 5 7 14 15 17 18
Haskell
There is no built-in iterator or stream type for file operations in Haskell. But several such libraries exist.
conduit
<lang haskell>-- stack runhaskell --package=conduit-extra --package=conduit-merge
-- Merge the line streams of the files named on the command line,
-- writing one merged line stream to stdout.

import           Control.Monad.Trans.Resource (runResourceT)
import qualified Data.ByteString.Char8        as BS
import           Data.Conduit                 (($$), (=$=))
import           Data.Conduit.Binary          (sinkHandle, sourceFile)
import qualified Data.Conduit.Binary          as Conduit
import qualified Data.Conduit.List            as Conduit
import           Data.Conduit.Merge           (mergeSources)
import           System.Environment           (getArgs)
import           System.IO                    (stdout)

main :: IO ()
main = do
    inputFileNames <- getArgs
    let inputs = [sourceFile file =$= Conduit.lines | file <- inputFileNames]
    runResourceT $ mergeSources inputs $$ sinkStdoutLn
  where
    -- Re-append the newline stripped by Conduit.lines before writing.
    sinkStdoutLn = Conduit.map (`BS.snoc` '\n') =$= sinkHandle stdout</lang>
pipes
<lang haskell>-- stack runhaskell --package=pipes-safe --package=pipes-interleave
-- Merge the line streams of the files named on the command line,
-- writing one merged line stream to stdout.

import Pipes              (runEffect, (>->))
import Pipes.Interleave   (interleave)
import Pipes.Prelude      (stdoutLn)
import Pipes.Safe         (runSafeT)
import Pipes.Safe.Prelude (readFile)
import Prelude hiding     (readFile)
import System.Environment (getArgs)

main :: IO ()
main = do
    sourceFileNames <- getArgs
    let sources = map readFile sourceFileNames
    runSafeT . runEffect $ interleave compare sources >-> stdoutLn</lang>
Perl 6
<lang perl6># Merge any number of sorted line streams, keeping one buffered
# HEAD line per stream; dry streams are pruned as they empty.
sub merge_streams ( @streams ) {
    my @s = @streams.map({ hash( STREAM => $_, HEAD => .get ) })\
                    .grep({ .<HEAD>.defined });

    return gather while @s {
        my $h = @s.min: *.<HEAD>;       # stream with the smallest head
        take $h<HEAD>;
        $h<HEAD> = $h<STREAM>.get
            orelse @s .= grep( { $_ !=== $h } );
    }
}

say merge_streams([ @*ARGS».&open ]);</lang>
Python
Built-in function open
opens a file for reading and returns a line-by-line iterator (stream) over the file.
There exists a standard library function heapq.merge
that takes any number of sorted stream iterators and merges them into one sorted iterator, using a heap.
<lang python>import heapq import sys
sources = sys.argv[1:] for item in heapq.merge(open(source) for source in sources):
print(item)</lang>
REXX
version 1
<lang rexx>/**********************************************************************
* Merge 1.txt ... n.txt into m.txt
* 1.txt  2.txt  3.txt  4.txt
*    1     19    1999    2e3
*   17     33    2999   3000
*    8    500    3999
**********************************************************************/
n=4
high='ffff'x                        /* sentinel: sorts after any data  */
p.=''                               /* previous value read per file    */
Do i=1 To n
  f.i=i'.txt'
  Call get i                        /* prime one buffered value each   */
  End
Do Forever
  min=high
  Do i=1 To n                       /* find stream with smallest head  */
    If x.i<<min Then Do             /* avoid numerical comparison      */
      imin=i
      min=x.i
      End
    End
  If min<<high Then Do
    Call o x.imin                   /* write it, then refill stream    */
    Call get imin
    End
  Else Do                           /* all streams exhausted           */
    Call lineout oid
    Leave
    End
  End
Exit
get: Procedure Expose f. x. high p.
  Parse Arg ii
  If lines(f.ii)=0 Then
    x.ii=high                       /* stream dry: park the sentinel   */
  Else Do
    x.ii=linein(f.ii)
    If x.ii<<p.ii Then Do           /* sanity: input must be sorted    */
      Say 'Input file' f.ii 'is not sorted ascendingly'
      Say p.ii 'precedes' x.ii
      Exit
      End
    p.ii=x.ii
    End
  Return
o: Say arg(1)
   Return lineout(oid,arg(1))</lang>
- Output:
1 17 19 1999 2999 2e3 3000 33 3999 500 8
version 2
This REXX version reads (in numerical order) any number of input files in the form of: nnn.TXT and
stops reading when
encountering a file that doesn't exist (or is empty).
<lang rexx>/*REXX pgm reads sorted files (1.TXT, 2.TXT, ···), and writes sorted data ───► ALL.TXT */
$=copies('FF'x, 1e5)                             /*no value should be larger than this. */
@.=$                                             /*the default value for the  @  array. */
     do n=1  until @.n==$;  call rdr n;  end     /*read any number of appropriate files.*/
                                                 /* [↑]  read 'til a non─existent file. */
     do forever;  y=$;  #=0                      /*find the lowest value for  N  values.*/
       do k=1  for n-1                           /*traipse through the stemmed @ array. */
       if @.k==$  then call rdr k                /*Not defined?  Then read a file record*/
       if @.k<<y  then do;  y=@.k;  #=k;  end    /*Lowest so far?  Then mark this as min*/
       end   /*k*/
     if #==0  then exit                          /*stick a fork in it,  we're all done. */
     call lineout 'ALL.TXT', @.#;   say @.#      /*output the value to ALL.TXT; also say*/
     call rdr #                                  /*repopulate this file's input value.  */
     end   /*until*/
/*──────────────────────────────────────────────────────────────────────────────────────*/
rdr: parse arg z;  @.z=$;  if lines(z'.TXT')\==0  then @.z=linein(z".TXT");       return</lang>
'''output''' is the same as the 1<sup>st</sup> REXX version (when using the same input).
Shell
sort --merge source1 source2 sourceN > sink
zkl
This solution uses iterators, doesn't care where the streams originate and only keeps the head of the stream on hand.
<lang zkl>// Merge any number of sorted streams into one lazy Walker.
fcn mergeStreams(s1,s2,etc){  //-->Walker
   streams:=vm.arglist.pump(List(),fcn(s){      // prime and prune
      if( (w:=s.walker())._next() ) return(w);
      Void.Skip                                 // stream is dry
   });
   Walker().tweak(fcn(streams){
      if(not streams) return(Void.Stop);        // all streams are dry
      values:=streams.apply("value");           // head of the streams
      v:=values.reduce('wrap(min,x){ if(min<=x) min else x });
      n:=values.find(v);  w:=streams[n];
      w._next();                                // read next value from min stream
      if(w.atEnd) streams.del(n);               // prune empty streams
      v
   }.fp(streams));
}</lang>
Using infinite streams:
<lang zkl>w:=mergeStreams([0..],[2..*,2],[3..*,3],T(5));
w.walk(20).println();</lang>
- Output:
L(0,1,2,2,3,3,4,4,5,5,6,6,6,7,8,8,9,9,10,10)
Using files:
<lang zkl>w:=mergeStreams(File("unixdict.txt"),File("2hkprimes.txt"),File("/dev/null"));
do(10){ w.read().print() }</lang>
- Output:
10th 1st 2 2nd 3 3rd 4th 5 5th 6th
Using the above example to squirt the merged stream to a file:
<lang zkl>mergeStreams(File("unixdict.txt"),File("2hkprimes.txt"),File("/dev/null"))
   .pump(File("foo.txt","w"));</lang>
- Output:
$ ls -l unixdict.txt 2hkprimes.txt foo.txt -rw-r--r-- 1 craigd craigd 1510484 Oct 29 2013 2hkprimes.txt -rw-r--r-- 1 craigd craigd 1716887 Jun 16 23:34 foo.txt -rw-r--r-- 1 craigd craigd 206403 Jun 11 2014 unixdict.txt