#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import calendar
import operator
import time
from itertools import chain
from datetime import datetime
from typing import (
    Any,
    Callable,
    Generic,
    Hashable,
    Iterable,
    List,
    Optional,
    Tuple,
    TypeVar,
    Union,
    TYPE_CHECKING,
    cast,
    overload,
)

from py4j.protocol import Py4JJavaError
from py4j.java_gateway import JavaObject

from pyspark.storagelevel import StorageLevel
from pyspark.streaming.util import rddToFileName, TransformFunction
from pyspark.rdd import portable_hash, RDD
from pyspark.resultiterable import ResultIterable

if TYPE_CHECKING:
    from pyspark.serializers import Serializer
    from pyspark.streaming.context import StreamingContext

__all__ = ["DStream"]

S = TypeVar("S")
T = TypeVar("T")
T_co = TypeVar("T_co", covariant=True)
U = TypeVar("U")
K = TypeVar("K", bound=Hashable)
V = TypeVar("V")


class DStream(Generic[T_co]):
    """
    A Discretized Stream (DStream), the basic abstraction in Spark Streaming,
    is a continuous sequence of RDDs (of the same type) representing a
    continuous stream of data (see :class:`RDD` in the Spark core
    documentation for more details on RDDs).

    DStreams can either be created from live data (such as, data from TCP
    sockets, etc.) using a :class:`StreamingContext` or it can be
    generated by transforming existing DStreams using operations such as
    `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming
    program is running, each DStream periodically generates a RDD, either
    from live data or by transforming the RDD generated by a parent DStream.

    A DStream is internally characterized by a few basic properties:

     - A list of other DStreams that the DStream depends on
     - A time interval at which the DStream generates an RDD
     - A function that is used to generate an RDD after each time interval
    """

    def __init__(
        self,
        jdstream: JavaObject,
        ssc: "StreamingContext",
        jrdd_deserializer: "Serializer",
    ):
        self._jdstream = jdstream
        self._ssc = ssc
        # The SparkContext is taken from the streaming context.
        self._sc = ssc._sc
        self._jrdd_deserializer = jrdd_deserializer
        # Flags set by cache()/persist() and checkpoint() respectively.
        self.is_cached = False
        self.is_checkpointed = False

    def context(self) -> "StreamingContext":
        """
        Return the StreamingContext associated with this DStream
        """
        return self._ssc

    def count(self) -> "DStream[int]":
        """
        Return a new DStream in which each RDD has a single element
        generated by counting each RDD of this DStream.
        """
        # Count per partition first, then add the partial counts together.
        return self.mapPartitions(lambda i: [sum(1 for _ in i)]).reduce(operator.add)

    def filter(self: "DStream[T]", f: Callable[[T], bool]) -> "DStream[T]":
        """
        Return a new DStream containing only the elements that satisfy predicate.
        """

        def func(iterator: Iterable[T]) -> Iterable[T]:
            return filter(f, iterator)

        return self.mapPartitions(func, True)

    def flatMap(
        self: "DStream[T]",
        f: Callable[[T], Iterable[U]],
        preservesPartitioning: bool = False,
    ) -> "DStream[U]":
        """
        Return a new DStream by applying a function to all elements of
        this DStream, and then flattening the results
        """

        def func(s: int, iterator: Iterable[T]) -> Iterable[U]:
            return chain.from_iterable(map(f, iterator))

        return self.mapPartitionsWithIndex(func, preservesPartitioning)

    def map(
        self: "DStream[T]", f: Callable[[T], U], preservesPartitioning: bool = False
    ) -> "DStream[U]":
        """
        Return a new DStream by applying a function to each element of DStream.
        """

        def func(iterator: Iterable[T]) -> Iterable[U]:
            return map(f, iterator)

        return self.mapPartitions(func, preservesPartitioning)

    def mapPartitions(
        self: "DStream[T]",
        f: Callable[[Iterable[T]], Iterable[U]],
        preservesPartitioning: bool = False,
    ) -> "DStream[U]":
        """
        Return a new DStream in which each RDD is generated by applying
        mapPartitions() to each RDDs of this DStream.
        """

        def func(s: int, iterator: Iterable[T]) -> Iterable[U]:
            # The partition index is not used by the plain mapPartitions form.
            return f(iterator)

        return self.mapPartitionsWithIndex(func, preservesPartitioning)

    def mapPartitionsWithIndex(
        self: "DStream[T]",
        f: Callable[[int, Iterable[T]], Iterable[U]],
        preservesPartitioning: bool = False,
    ) -> "DStream[U]":
        """
        Return a new DStream in which each RDD is generated by applying
        mapPartitionsWithIndex() to each RDDs of this DStream.
        """
        return self.transform(lambda rdd: rdd.mapPartitionsWithIndex(f, preservesPartitioning))

    def reduce(self: "DStream[T]", func: Callable[[T, T], T]) -> "DStream[T]":
        """
        Return a new DStream in which each RDD has a single element
        generated by reducing each RDD of this DStream.
        """
        # Put every element under the same key so reduceByKey folds them all
        # together in a single partition, then drop the key again.
        return self.map(lambda x: (None, x)).reduceByKey(func, 1).map(lambda x: x[1])

    def reduceByKey(
        self: "DStream[Tuple[K, V]]",
        func: Callable[[V, V], V],
        numPartitions: Optional[int] = None,
    ) -> "DStream[Tuple[K, V]]":
        """
        Return a new DStream by applying reduceByKey to each RDD.
        """
        if numPartitions is None:
            numPartitions = self._sc.defaultParallelism
        return self.combineByKey(lambda x: x, func, func, numPartitions)

    def combineByKey(
        self: "DStream[Tuple[K, V]]",
        createCombiner: Callable[[V], U],
        mergeValue: Callable[[U, V], U],
        mergeCombiners: Callable[[U, U], U],
        numPartitions: Optional[int] = None,
    ) -> "DStream[Tuple[K, U]]":
        """
        Return a new DStream by applying combineByKey to each RDD.
        """
        if numPartitions is None:
            numPartitions = self._sc.defaultParallelism

        def func(rdd: RDD[Tuple[K, V]]) -> RDD[Tuple[K, U]]:
            return rdd.combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions)

        return self.transform(func)

    def partitionBy(
        self: "DStream[Tuple[K, V]]",
        numPartitions: int,
        partitionFunc: Callable[[K], int] = portable_hash,
    ) -> "DStream[Tuple[K, V]]":
        """
        Return a copy of the DStream in which each RDD is partitioned
        using the specified partitioner.
        """
        return self.transform(lambda rdd: rdd.partitionBy(numPartitions, partitionFunc))

    def foreachRDD(
        self: "DStream[T]",
        func: Union[Callable[[RDD[T]], None], Callable[[datetime, RDD[T]], None]],
    ) -> None:
        """
        Apply a function to each RDD in this DStream.

        `func` can have one argument of `rdd`, or have two arguments of
        (`time`, `rdd`).
        """
        if func.__code__.co_argcount == 1:
            # Adapt the one-argument form to the (time, rdd) form handed to
            # TransformFunction below.
            old_func = func

            def func(_: datetime, rdd: "RDD[T]") -> None:
                return old_func(rdd)  # type: ignore[call-arg, arg-type]

        jfunc = TransformFunction(self._sc, func, self._jrdd_deserializer)
        assert self._ssc._jvm is not None
        api = self._ssc._jvm.PythonDStream
        api.callForeachRDD(self._jdstream, jfunc)

    def pprint(self, num: int = 10) -> None:
        """
        Print the first num elements of each RDD generated in this DStream.

        Parameters
        ----------
        num : int, optional
            the number of leading elements of each RDD that will be printed.
        """

        def takeAndPrint(batch_time: datetime, rdd: RDD[T]) -> None:
            # Take one extra element so we can tell whether the output was truncated.
            taken = rdd.take(num + 1)
            print("-------------------------------------------")
            print("Time: %s" % batch_time)
            print("-------------------------------------------")
            for record in taken[:num]:
                print(record)
            if len(taken) > num:
                print("...")
            print("")

        self.foreachRDD(takeAndPrint)

    def mapValues(self: "DStream[Tuple[K, V]]", f: Callable[[V], U]) -> "DStream[Tuple[K, U]]":
        """
        Return a new DStream by applying a map function to the value of
        each key-value pairs in this DStream without changing the key.
        """

        def map_values_fn(kv: Tuple[K, V]) -> Tuple[K, U]:
            return kv[0], f(kv[1])

        return self.map(map_values_fn, preservesPartitioning=True)

    def flatMapValues(
        self: "DStream[Tuple[K, V]]", f: Callable[[V], Iterable[U]]
    ) -> "DStream[Tuple[K, U]]":
        """
        Return a new DStream by applying a flatmap function to the value
        of each key-value pairs in this DStream without changing the key.
        """

        def flat_map_fn(kv: Tuple[K, V]) -> Iterable[Tuple[K, U]]:
            return ((kv[0], x) for x in f(kv[1]))

        return self.flatMap(flat_map_fn, preservesPartitioning=True)

    def glom(self: "DStream[T]") -> "DStream[List[T]]":
        """
        Return a new DStream in which RDD is generated by applying glom()
        to RDD of this DStream.
        """

        def func(iterator: Iterable[T]) -> Iterable[List[T]]:
            yield list(iterator)

        return self.mapPartitions(func)

    def cache(self: "DStream[T]") -> "DStream[T]":
        """
        Persist the RDDs of this DStream with the default storage level
        (`MEMORY_ONLY`).
        """
        self.is_cached = True
        self.persist(StorageLevel.MEMORY_ONLY)
        return self

    def persist(self: "DStream[T]", storageLevel: StorageLevel) -> "DStream[T]":
        """
        Persist the RDDs of this DStream with the given storage level
        """
        self.is_cached = True
        javaStorageLevel = self._sc._getJavaStorageLevel(storageLevel)
        self._jdstream.persist(javaStorageLevel)
        return self

    def checkpoint(self: "DStream[T]", interval: int) -> "DStream[T]":
        """
        Enable periodic checkpointing of RDDs of this DStream

        Parameters
        ----------
        interval : int
            time in seconds, after each period of that, generated
            RDD will be checkpointed
        """
        self.is_checkpointed = True
        self._jdstream.checkpoint(self._ssc._jduration(interval))
        return self

    def groupByKey(
        self: "DStream[Tuple[K, V]]", numPartitions: Optional[int] = None
    ) -> "DStream[Tuple[K, Iterable[V]]]":
        """
        Return a new DStream by applying groupByKey on each RDD.
        """
        if numPartitions is None:
            numPartitions = self._sc.defaultParallelism
        return self.transform(lambda rdd: rdd.groupByKey(numPartitions))

    def countByValue(self: "DStream[K]") -> "DStream[Tuple[K, int]]":
        """
        Return a new DStream in which each RDD contains the counts of each
        distinct value in each RDD of this DStream.
        """
        return self.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)

    def saveAsTextFiles(self, prefix: str, suffix: Optional[str] = None) -> None:
        """
        Save each RDD in this DStream as a text file, using string
        representation of elements.

        The output path of each RDD is built by `rddToFileName` from
        `prefix`, `suffix` and the batch time.
        """

        def saveAsTextFile(t: Optional[datetime], rdd: RDD[T]) -> None:
            path = rddToFileName(prefix, suffix, t)
            try:
                rdd.saveAsTextFile(path)
            except Py4JJavaError as e:
                # after recovered from checkpointing, the foreachRDD may
                # be called twice
                if "FileAlreadyExistsException" not in str(e):
                    raise

        return self.foreachRDD(saveAsTextFile)

    # TODO: uncomment this once we have ssc.pickleFileStream()
    # def saveAsPickleFiles(self, prefix, suffix=None):
    #     """
    #     Save each RDD in this DStream as a binary file, the elements are
    #     serialized by pickle.
    #     """
    #     def saveAsPickleFile(t, rdd):
    #         path = rddToFileName(prefix, suffix, t)
    #         try:
    #             rdd.saveAsPickleFile(path)
    #         except Py4JJavaError as e:
    #             # after recovered from checkpointing, the foreachRDD may
    #             # be called twice
    #             if 'FileAlreadyExistsException' not in str(e):
    #                 raise
    #     return self.foreachRDD(saveAsPickleFile)

    @overload
    def transform(
        self: "DStream[T]", func: Callable[[RDD[T]], RDD[U]]
    ) -> "TransformedDStream[U]":
        ...

    @overload
    def transform(
        self: "DStream[T]", func: Callable[[datetime, RDD[T]], RDD[U]]
    ) -> "TransformedDStream[U]":
        ...

    def transform(
        self: "DStream[T]",
        func: Union[Callable[[RDD[T]], RDD[U]], Callable[[datetime, RDD[T]], RDD[U]]],
    ) -> "TransformedDStream[U]":
        """
        Return a new DStream in which each RDD is generated by applying a function
        on each RDD of this DStream.

        `func` can have one argument of `rdd`, or have two arguments of
        (`time`, `rdd`)
        """
        if func.__code__.co_argcount == 1:
            # Normalize the one-argument form to (time, rdd).
            oldfunc = func

            def func(_: datetime, rdd: RDD[T]) -> RDD[U]:
                return oldfunc(rdd)  # type: ignore[arg-type, call-arg]

        assert func.__code__.co_argcount == 2, "func should take one or two arguments"
        return TransformedDStream(self, func)

    def transformWith(
        self: "DStream[T]",
        func: Union[
            Callable[[RDD[T], RDD[U]], RDD[V]],
            Callable[[datetime, RDD[T], RDD[U]], RDD[V]],
        ],
        other: "DStream[U]",
        keepSerializer: bool = False,
    ) -> "DStream[V]":
        """
        Return a new DStream in which each RDD is generated by applying a function
        on each RDD of this DStream and 'other' DStream.

        `func` can have two arguments of (`rdd_a`, `rdd_b`) or have three
        arguments of (`time`, `rdd_a`, `rdd_b`)

        When `keepSerializer` is true the new DStream reuses this DStream's
        deserializer; otherwise it uses the SparkContext's serializer.
        """
        if func.__code__.co_argcount == 2:
            # Normalize the two-argument form to (time, rdd_a, rdd_b).
            oldfunc = func

            def func(_: datetime, a: RDD[T], b: RDD[U]) -> RDD[V]:
                return oldfunc(a, b)  # type: ignore[call-arg, arg-type]

        assert func.__code__.co_argcount == 3, "func should take two or three arguments"
        jfunc = TransformFunction(
            self._sc,
            func,
            self._jrdd_deserializer,
            other._jrdd_deserializer,
        )
        assert self._sc._jvm is not None
        dstream = self._sc._jvm.PythonTransformed2DStream(
            self._jdstream.dstream(), other._jdstream.dstream(), jfunc
        )
        jrdd_serializer = self._jrdd_deserializer if keepSerializer else self._sc.serializer
        return DStream(dstream.asJavaDStream(), self._ssc, jrdd_serializer)

    def repartition(self: "DStream[T]", numPartitions: int) -> "DStream[T]":
        """
        Return a new DStream with an increased or decreased level of parallelism.
        """
        return self.transform(lambda rdd: rdd.repartition(numPartitions))

    @property
    def _slideDuration(self) -> float:
        """
        Return the slideDuration in seconds of this DStream
        """
        return self._jdstream.dstream().slideDuration().milliseconds() / 1000.0

    def union(self: "DStream[T]", other: "DStream[U]") -> "DStream[Union[T, U]]":
        """
        Return a new DStream by unifying data of another DStream with this DStream.

        Parameters
        ----------
        other : :class:`DStream`
            Another DStream having the same interval (i.e., slideDuration)
            as this DStream.

        Raises
        ------
        ValueError
            If the two DStreams have different slide durations.
        """
        if self._slideDuration != other._slideDuration:
            raise ValueError("the two DStream should have same slide duration")
        return self.transformWith(lambda a, b: a.union(b), other, True)

    def cogroup(
        self: "DStream[Tuple[K, V]]",
        other: "DStream[Tuple[K, U]]",
        numPartitions: Optional[int] = None,
    ) -> "DStream[Tuple[K, Tuple[ResultIterable[V], ResultIterable[U]]]]":
        """
        Return a new DStream by applying 'cogroup' between RDDs of this
        DStream and `other` DStream.

        Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
        """
        if numPartitions is None:
            numPartitions = self._sc.defaultParallelism
        return self.transformWith(
            lambda a, b: a.cogroup(b, numPartitions),
            other,
        )

    def join(
        self: "DStream[Tuple[K, V]]",
        other: "DStream[Tuple[K, U]]",
        numPartitions: Optional[int] = None,
    ) -> "DStream[Tuple[K, Tuple[V, U]]]":
        """
        Return a new DStream by applying 'join' between RDDs of this DStream and
        `other` DStream.

        Hash partitioning is used to generate the RDDs with `numPartitions`
        partitions.
        """
        if numPartitions is None:
            numPartitions = self._sc.defaultParallelism
        return self.transformWith(lambda a, b: a.join(b, numPartitions), other)

    def leftOuterJoin(
        self: "DStream[Tuple[K, V]]",
        other: "DStream[Tuple[K, U]]",
        numPartitions: Optional[int] = None,
    ) -> "DStream[Tuple[K, Tuple[V, Optional[U]]]]":
        """
        Return a new DStream by applying 'left outer join' between RDDs of this DStream and
        `other` DStream.

        Hash partitioning is used to generate the RDDs with `numPartitions`
        partitions.
        """
        if numPartitions is None:
            numPartitions = self._sc.defaultParallelism
        return self.transformWith(lambda a, b: a.leftOuterJoin(b, numPartitions), other)

    def rightOuterJoin(
        self: "DStream[Tuple[K, V]]",
        other: "DStream[Tuple[K, U]]",
        numPartitions: Optional[int] = None,
    ) -> "DStream[Tuple[K, Tuple[Optional[V], U]]]":
        """
        Return a new DStream by applying 'right outer join' between RDDs of this DStream and
        `other` DStream.

        Hash partitioning is used to generate the RDDs with `numPartitions`
        partitions.
        """
        if numPartitions is None:
            numPartitions = self._sc.defaultParallelism
        return self.transformWith(lambda a, b: a.rightOuterJoin(b, numPartitions), other)

    def fullOuterJoin(
        self: "DStream[Tuple[K, V]]",
        other: "DStream[Tuple[K, U]]",
        numPartitions: Optional[int] = None,
    ) -> "DStream[Tuple[K, Tuple[Optional[V], Optional[U]]]]":
        """
        Return a new DStream by applying 'full outer join' between RDDs of this DStream and
        `other` DStream.

        Hash partitioning is used to generate the RDDs with `numPartitions`
        partitions.
        """
        if numPartitions is None:
            numPartitions = self._sc.defaultParallelism
        return self.transformWith(lambda a, b: a.fullOuterJoin(b, numPartitions), other)

    def _jtime(self, timestamp: Union[datetime, int, float]) -> JavaObject:
        """
        Convert datetime or unix_timestamp into Time

        A naive datetime is interpreted as local time (via `time.mktime`).
        A timezone-aware datetime is converted through UTC so that its
        offset is honoured. Both datetime paths have whole-second
        resolution. Numeric timestamps are taken as seconds since the epoch.
        """
        if isinstance(timestamp, datetime):
            if timestamp.utcoffset() is not None:
                # Aware datetime: mktime() would read the wall-clock fields as
                # local time and silently ignore tzinfo, so go through UTC.
                timestamp = calendar.timegm(timestamp.utctimetuple())
            else:
                timestamp = time.mktime(timestamp.timetuple())
        assert self._sc._jvm is not None
        return self._sc._jvm.Time(int(timestamp * 1000))

    def slice(self, begin: Union[datetime, int], end: Union[datetime, int]) -> List[RDD[T]]:
        """
        Return all the RDDs between 'begin' to 'end' (both included)

        `begin`, `end` could be datetime.datetime() or unix_timestamp
        """
        jrdds = self._jdstream.slice(self._jtime(begin), self._jtime(end))
        return [RDD(jrdd, self._sc, self._jrdd_deserializer) for jrdd in jrdds]

    def _validate_window_param(self, window: int, slide: Optional[int]) -> None:
        """
        Check that `window` and, when given, `slide` (both in seconds) are
        multiples of this DStream's slide duration.

        Raises
        ------
        ValueError
            If either value is not a multiple of the slide duration.
        """
        duration = self._jdstream.dstream().slideDuration().milliseconds()
        if int(window * 1000) % duration != 0:
            raise ValueError(
                "windowDuration must be multiple of the parent "
                "dstream's slide (batch) duration (%d ms)" % duration
            )
        if slide and int(slide * 1000) % duration != 0:
            raise ValueError(
                "slideDuration must be multiple of the parent "
                "dstream's slide (batch) duration (%d ms)" % duration
            )

    def window(self, windowDuration: int, slideDuration: Optional[int] = None) -> "DStream[T]":
        """
        Return a new DStream in which each RDD contains all the elements seen in a
        sliding window of time over this DStream.

        Parameters
        ----------
        windowDuration : int
            width of the window; must be a multiple of this DStream's
            batching interval
        slideDuration : int, optional
            sliding interval of the window (i.e., the interval after which
            the new DStream will generate RDDs); must be a multiple of this
            DStream's batching interval
        """
        self._validate_window_param(windowDuration, slideDuration)
        d = self._ssc._jduration(windowDuration)
        if slideDuration is None:
            return DStream(self._jdstream.window(d), self._ssc, self._jrdd_deserializer)
        s = self._ssc._jduration(slideDuration)
        return DStream(self._jdstream.window(d, s), self._ssc, self._jrdd_deserializer)

    def reduceByWindow(
        self: "DStream[T]",
        reduceFunc: Callable[[T, T], T],
        invReduceFunc: Optional[Callable[[T, T], T]],
        windowDuration: int,
        slideDuration: int,
    ) -> "DStream[T]":
        """
        Return a new DStream in which each RDD has a single element generated by reducing all
        elements in a sliding window over this DStream.

        if `invReduceFunc` is not None, the reduction is done incrementally
        using the old window's reduced value :

        1. reduce the new values that entered the window (e.g., adding new counts)

        2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
        This is more efficient than when `invReduceFunc` is None.

        Parameters
        ----------
        reduceFunc : function
            associative and commutative reduce function
        invReduceFunc : function
            inverse reduce function of `reduceFunc`; such that for all y,
            and invertible x:
            `invReduceFunc(reduceFunc(x, y), x) = y`
        windowDuration : int
            width of the window; must be a multiple of this DStream's
            batching interval
        slideDuration : int
            sliding interval of the window (i.e., the interval after which
            the new DStream will generate RDDs); must be a multiple of this
            DStream's batching interval
        """
        # Use one constant key so the keyed window reduction folds every element.
        keyed = self.map(lambda x: (1, x))
        reduced = keyed.reduceByKeyAndWindow(
            reduceFunc, invReduceFunc, windowDuration, slideDuration, 1
        )
        return reduced.map(lambda kv: kv[1])

    def countByWindow(
        self: "DStream[T]", windowDuration: int, slideDuration: int
    ) -> "DStream[int]":
        """
        Return a new DStream in which each RDD has a single element generated
        by counting the number of elements in a window over this DStream.
        windowDuration and slideDuration are as defined in the window() operation.

        This is equivalent to window(windowDuration, slideDuration).count(),
        but will be more efficient if window is large.
        """
        return self.map(lambda x: 1).reduceByWindow(
            operator.add, operator.sub, windowDuration, slideDuration
        )

    def countByValueAndWindow(
        self: "DStream[T]",
        windowDuration: int,
        slideDuration: int,
        numPartitions: Optional[int] = None,
    ) -> "DStream[Tuple[T, int]]":
        """
        Return a new DStream in which each RDD contains the count of distinct elements in
        RDDs in a sliding window over this DStream.

        Parameters
        ----------
        windowDuration : int
            width of the window; must be a multiple of this DStream's
            batching interval
        slideDuration : int
            sliding interval of the window (i.e., the interval after which
            the new DStream will generate RDDs); must be a multiple of this
            DStream's batching interval
        numPartitions : int, optional
            number of partitions of each RDD in the new DStream.
        """
        keyed = self.map(lambda x: (x, 1))
        counted = keyed.reduceByKeyAndWindow(
            operator.add, operator.sub, windowDuration, slideDuration, numPartitions
        )
        # Drop values whose count is no longer positive after the inverse reduce.
        return counted.filter(lambda kv: kv[1] > 0)

    def groupByKeyAndWindow(
        self: "DStream[Tuple[K, V]]",
        windowDuration: int,
        slideDuration: int,
        numPartitions: Optional[int] = None,
    ) -> "DStream[Tuple[K, Iterable[V]]]":
        """
        Return a new DStream by applying `groupByKey` over a sliding window.
        Similar to `DStream.groupByKey()`, but applies it over a sliding window.

        Parameters
        ----------
        windowDuration : int
            width of the window; must be a multiple of this DStream's
            batching interval
        slideDuration : int
            sliding interval of the window (i.e., the interval after which
            the new DStream will generate RDDs); must be a multiple of this
            DStream's batching interval
        numPartitions : int, optional
            Number of partitions of each RDD in the new DStream.
        """
        ls = self.mapValues(lambda x: [x])
        grouped = ls.reduceByKeyAndWindow(
            # Reduce: extend the accumulated list in place and return it.
            lambda a, b: a.extend(b) or a,  # type: ignore[func-returns-value]
            # Inverse reduce: drop as many leading items as the departing list holds.
            lambda a, b: a[len(b) :],
            windowDuration,
            slideDuration,
            numPartitions,
        )
        return grouped.mapValues(ResultIterable)

    def reduceByKeyAndWindow(
        self: "DStream[Tuple[K, V]]",
        func: Callable[[V, V], V],
        invFunc: Optional[Callable[[V, V], V]],
        windowDuration: int,
        slideDuration: Optional[int] = None,
        numPartitions: Optional[int] = None,
        filterFunc: Optional[Callable[[Tuple[K, V]], bool]] = None,
    ) -> "DStream[Tuple[K, V]]":
        """
        Return a new DStream by applying incremental `reduceByKey` over a sliding window.

        The reduced value over a new window is calculated using the old window's reduce value :
         1. reduce the new values that entered the window (e.g., adding new counts)
         2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)

        `invFunc` can be None, then it will reduce all the RDDs in window, could be slower
        than having `invFunc`.

        Parameters
        ----------
        func : function
            associative and commutative reduce function
        invFunc : function
            inverse function of `func`
        windowDuration : int
            width of the window; must be a multiple of this DStream's
            batching interval
        slideDuration : int, optional
            sliding interval of the window (i.e., the interval after which
            the new DStream will generate RDDs); must be a multiple of this
            DStream's batching interval
        numPartitions : int, optional
            number of partitions of each RDD in the new DStream.
        filterFunc : function, optional
            function to filter expired key-value pairs;
            only pairs that satisfy the function are retained
            set this to None if you do not want to filter
        """
        self._validate_window_param(windowDuration, slideDuration)
        if numPartitions is None:
            numPartitions = self._sc.defaultParallelism

        reduced = self.reduceByKey(func, numPartitions)

        if invFunc:

            def reduceFunc(t: datetime, a: Any, b: Any) -> Any:
                b = b.reduceByKey(func, numPartitions)
                r = a.union(b).reduceByKey(func, numPartitions) if a else b
                if filterFunc:
                    r = r.filter(filterFunc)
                return r

            def invReduceFunc(t: datetime, a: Any, b: Any) -> Any:
                b = b.reduceByKey(func, numPartitions)
                joined = a.leftOuterJoin(b, numPartitions)
                # Keys with nothing leaving the window keep their value unchanged.
                return joined.mapValues(
                    lambda kv: invFunc(kv[0], kv[1])  # type: ignore[misc]
                    if kv[1] is not None
                    else kv[0]
                )

            jreduceFunc = TransformFunction(self._sc, reduceFunc, reduced._jrdd_deserializer)
            jinvReduceFunc = TransformFunction(self._sc, invReduceFunc, reduced._jrdd_deserializer)
            # Fall back to this stream's own slide interval when none was given.
            slide_seconds = self._slideDuration if slideDuration is None else slideDuration
            assert self._sc._jvm is not None
            dstream = self._sc._jvm.PythonReducedWindowedDStream(
                reduced._jdstream.dstream(),
                jreduceFunc,
                jinvReduceFunc,
                self._ssc._jduration(windowDuration),
                self._ssc._jduration(slide_seconds),  # type: ignore[arg-type]
            )
            return DStream(dstream.asJavaDStream(), self._ssc, self._sc.serializer)
        else:
            return reduced.window(windowDuration, slideDuration).reduceByKey(
                func, numPartitions  # type: ignore[arg-type]
            )

    def updateStateByKey(
        self: "DStream[Tuple[K, V]]",
        updateFunc: Callable[[Iterable[V], Optional[S]], S],
        numPartitions: Optional[int] = None,
        initialRDD: Optional[Union[RDD[Tuple[K, S]], Iterable[Tuple[K, S]]]] = None,
    ) -> "DStream[Tuple[K, S]]":
        """
        Return a new "state" DStream where the state for each key is updated by applying
        the given function on the previous state of the key and the new values of the key.

        Parameters
        ----------
        updateFunc : function
            State update function. If this function returns None, then
            corresponding state key-value pair will be eliminated.
        numPartitions : int, optional
            number of partitions used when grouping; defaults to the
            SparkContext's `defaultParallelism`.
        initialRDD : :class:`RDD` or iterable, optional
            initial state as key-value pairs; a non-RDD iterable is
            parallelized into an RDD first.
        """
        if numPartitions is None:
            numPartitions = self._sc.defaultParallelism

        if initialRDD and not isinstance(initialRDD, RDD):
            initialRDD = self._sc.parallelize(initialRDD)

        def reduceFunc(t: datetime, a: Any, b: Any) -> Any:
            if a is None:
                # No previous state: every key starts with state None.
                g = b.groupByKey(numPartitions).mapValues(lambda vs: (list(vs), None))
            else:
                g = a.cogroup(b.partitionBy(cast(int, numPartitions)), numPartitions)
                # ab[0] holds the previous state (if any), ab[1] the new values.
                g = g.mapValues(lambda ab: (list(ab[1]), list(ab[0])[0] if len(ab[0]) else None))
            state = g.mapValues(lambda vs_s: updateFunc(vs_s[0], vs_s[1]))
            return state.filter(lambda k_v: k_v[1] is not None)

        jreduceFunc = TransformFunction(
            self._sc,
            reduceFunc,
            self._sc.serializer,
            self._jrdd_deserializer,
        )
        if initialRDD:
            initialRDD = cast(RDD[Tuple[K, S]], initialRDD)._reserialize(self._jrdd_deserializer)
            assert self._sc._jvm is not None
            dstream = self._sc._jvm.PythonStateDStream(
                self._jdstream.dstream(),
                jreduceFunc,
                initialRDD._jrdd,
            )
        else:
            assert self._sc._jvm is not None
            dstream = self._sc._jvm.PythonStateDStream(self._jdstream.dstream(), jreduceFunc)

        return DStream(dstream.asJavaDStream(), self._ssc, self._sc.serializer)


class TransformedDStream(DStream[U]):
    """
    A DStream produced by a Python function that turns each RDD of a
    parent DStream into another RDD.

    Consecutive transformations are fused into a single transformation
    where possible, so a chain of them is applied as one function.
    """

    @overload
    def __init__(self: DStream[U], prev: DStream[T], func: Callable[[RDD[T]], RDD[U]]):
        ...

    @overload
    def __init__(
        self: DStream[U],
        prev: DStream[T],
        func: Callable[[datetime, RDD[T]], RDD[U]],
    ):
        ...

    def __init__(
        self,
        prev: DStream[T],
        func: Union[Callable[[RDD[T]], RDD[U]], Callable[[datetime, RDD[T]], RDD[U]]],
    ):
        # The Java-side stream is built lazily by the `_jdstream` property.
        self._jdstream_val = None
        self.is_checkpointed = False
        self.is_cached = False
        self._ssc = prev._ssc
        self._sc = self._ssc._sc
        self._jrdd_deserializer = self._sc.serializer

        # Fuse only when `prev` is exactly a TransformedDStream (checked with
        # type() so subclasses are left alone) and it has been neither cached
        # nor checkpointed.
        fusable = type(prev) is TransformedDStream and not (
            prev.is_cached or prev.is_checkpointed
        )
        if not fusable:
            self.prev: DStream[T] = prev
            self.func: Union[
                Callable[[RDD[T]], RDD[U]], Callable[[datetime, RDD[T]], RDD[U]]
            ] = func
            return

        parent_func: Callable = prev.func
        outer_func = cast(Callable[[datetime, RDD[T]], RDD[U]], func)

        def chained(t, rdd):  # type: ignore[no-untyped-def]
            # Run the parent's function first, then this one, on the same batch time.
            return outer_func(t, parent_func(t, rdd))

        self.func = chained
        # Skip over the fused parent and attach directly to its own parent.
        self.prev = prev.prev

    @property
    def _jdstream(self) -> JavaObject:
        """Return the Java DStream, creating and caching it on first access."""
        if self._jdstream_val is None:
            jfunc = TransformFunction(self._sc, self.func, self.prev._jrdd_deserializer)
            assert self._sc._jvm is not None
            jvm_stream = self._sc._jvm.PythonTransformedDStream(
                self.prev._jdstream.dstream(), jfunc
            )
            self._jdstream_val = jvm_stream.asJavaDStream()
        return self._jdstream_val