Reactive Extensions (Rx) Examples
Asynchronous Background Operations
Start - Run Code Asynchronously
public static async void StartBackgroundWork() {
Console.WriteLine("Shows use of Start to start on a background thread:");
var o = Observable.Start(() =>
{
//This starts on a background thread.
Console.WriteLine("From background thread. Does not block main thread.");
Console.WriteLine("Calculating...");
Thread.Sleep(1000);
Console.WriteLine("Background work completed.");
});
await o.FirstAsync(); // subscribe and wait for completion of background operation. If you remove await, the main thread will complete first.
Console.WriteLine("Main thread completed.");
}
Run a method asynchronously on demand
Execute a long-running method asynchronously. The method does not start running until there is a subscriber. It is started again every time the observable is subscribed, so more than one instance could be running at once.
// Synchronous operation
public DataType DoLongRunningOperation(string param)
{
...
}
public IObservable<DataType> LongRunningOperationAsync(string param)
{
return Observable.Create<DataType>(
o => Observable.ToAsync<string,DataType>(DoLongRunningOperation)(param).Subscribe(o)
);
}
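The on-demand behaviour described above can be checked with a small sketch. Note that it uses Observable.Defer around Observable.Start rather than the ToAsync form shown above, so it illustrates the same idea with a different technique. The class name, the counter, and the CountRuns helper are hypothetical and exist only for this illustration.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;

static class OnDemandSketch
{
    // Hypothetical helper: subscribes 'subscriptions' times and returns how often the work ran.
    public static int CountRuns(int subscriptions)
    {
        int runs = 0;
        // Defer postpones the call to Start until someone subscribes.
        IObservable<int> onDemand = Observable.Defer(
            () => Observable.Start(() => Interlocked.Increment(ref runs)));
        for (int i = 0; i < subscriptions; i++)
        {
            onDemand.Wait(); // subscribe and block until this run has finished
        }
        return runs;
    }

    static void Main()
    {
        Console.WriteLine(CountRuns(0)); // prints 0: nothing runs without a subscriber
        Console.WriteLine(CountRuns(2)); // prints 2: each subscription starts the work again
    }
}
```

Because every subscription triggers a fresh run, callers that want to share one result would need to cache or publish the observable themselves.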
CombineLatest - Parallel Execution
Merges the specified observable sequences into one observable sequence by emitting a list with the latest source elements whenever any of the observable sequences produces an element.
var o = Observable.CombineLatest(
Observable.Start(() => { Console.WriteLine("Executing 1st on Thread: {0}", Thread.CurrentThread.ManagedThreadId); return "Result A"; }),
Observable.Start(() => { Console.WriteLine("Executing 2nd on Thread: {0}", Thread.CurrentThread.ManagedThreadId); return "Result B"; }),
Observable.Start(() => { Console.WriteLine("Executing 3rd on Thread: {0}", Thread.CurrentThread.ManagedThreadId); return "Result C"; })
).Finally(() => Console.WriteLine("Done!"));
foreach (string r in o.First())
Console.WriteLine(r);
Result
Executing 1st on Thread: 3
Executing 2nd on Thread: 4
Executing 3rd on Thread: 3
Done!
Result A
Result B
Result C
(Note: this example originally used ForkJoin, which is no longer supported. CombineLatest gives the same result.)
Create With Disposable & Scheduler - Canceling an asynchronous operation
This sample starts a background operation that generates a sequence of integers until it is canceled by the main thread. To start the background operation on a new thread, a scheduler (NewThreadScheduler) is used, and a CancellationTokenSource is created indirectly inside Observable.Create by the CancellationDisposable. See the MSDN documentation on System.Threading.CancellationTokenSource to learn more about cancellation sources.
IObservable<int> ob =
Observable.Create<int>(o =>
{
var cancel = new CancellationDisposable(); // internally creates a new CancellationTokenSource
NewThreadScheduler.Default.Schedule(() =>
{
int i = 0;
for (; ; )
{
Thread.Sleep(200); // here we do the long lasting background operation
if (!cancel.Token.IsCancellationRequested) // check cancel token periodically
o.OnNext(i++);
else
{
Console.WriteLine("Aborting because cancel event was signaled!");
o.OnCompleted();
return;
}
}
}
);
return cancel;
}
);
IDisposable subscription = ob.Subscribe(i => Console.WriteLine(i));
Console.WriteLine("Press any key to cancel");
Console.ReadKey();
subscription.Dispose();
Console.WriteLine("Press any key to quit");
Console.ReadKey(); // give background thread chance to write the cancel acknowledge message
Observation Operators
Observing an Event - Simple
class ObserveEvent_Simple
{
public static event EventHandler SimpleEvent;
static void Main()
{
// To consume SimpleEvent as an IObservable:
IObservable<EventPattern<EventArgs>> eventAsObservable = Observable.FromEventPattern(
ev => SimpleEvent += ev,
ev => SimpleEvent -= ev);
}
}
Alternatively, you can declare the event as EventHandler<EventArgs>:
public static event EventHandler<EventArgs> SimpleEvent;
private static void Main(string[] args) {
IObservable<EventPattern<EventArgs>> eventAsObservable = Observable.FromEventPattern<EventArgs>
(ev => SimpleEvent += ev,
ev => SimpleEvent -= ev);
}
Observing an Event - Simple (expanded)
class ObserveEvent_Simple
{
public static event EventHandler SimpleEvent;
private static void Main()
{
Console.WriteLine("Setup observable");
// To consume SimpleEvent as an IObservable:
IObservable<EventPattern<EventArgs>> eventAsObservable = Observable.FromEventPattern(
ev => SimpleEvent += ev,
ev => SimpleEvent -= ev);
// SimpleEvent is null until we subscribe
Console.WriteLine(SimpleEvent == null ? "SimpleEvent == null" : "SimpleEvent != null");
Console.WriteLine("Subscribe");
//Create two event subscribers
var s = eventAsObservable.Subscribe(args => Console.WriteLine("Received event for s subscriber"));
var t = eventAsObservable.Subscribe(args => Console.WriteLine("Received event for t subscriber"));
// After subscribing the event handler has been added
Console.WriteLine(SimpleEvent == null ? "SimpleEvent == null" : "SimpleEvent != null");
Console.WriteLine("Raise event");
if (null != SimpleEvent)
{
SimpleEvent(null, EventArgs.Empty);
}
// Allow some time before unsubscribing or event may not happen
Thread.Sleep(100);
Console.WriteLine("Unsubscribe");
s.Dispose();
t.Dispose();
// After unsubscribing the event handler has been removed
Console.WriteLine(SimpleEvent == null ? "SimpleEvent == null" : "SimpleEvent != null");
Console.ReadKey();
}
}
Observing MouseMove in Silverlight
var mouseMove = Observable.FromEventPattern<MouseEventArgs>(this, "MouseMove");
mouseMove.ObserveOnDispatcher()
.Subscribe(args => Debug.WriteLine(args.EventArgs.GetPosition(this)));
Note that ObserveOnDispatcher requires a reference to System.Reactive.Windows.Threading, which is available on NuGet as Reactive Extensions - Silverlight Helpers.
Observing an Event - Generic
class ObserveEvent_Generic
{
public class SomeEventArgs : EventArgs { }
public static event EventHandler<SomeEventArgs> GenericEvent;
static void Main()
{
// To consume GenericEvent as an IObservable:
IObservable<EventPattern<SomeEventArgs>> eventAsObservable = Observable.FromEventPattern<SomeEventArgs>(
ev => GenericEvent += ev,
ev => GenericEvent -= ev );
}
}
Observing an Event - Non-Generic
class ObserveEvent_NonGeneric
{
public class SomeEventArgs : EventArgs { }
public delegate void SomeNonGenericEventHandler(object sender, SomeEventArgs e);
public static event SomeNonGenericEventHandler NonGenericEvent;
static void Main()
{
// To consume NonGenericEvent as an IObservable, first inspect the type of EventArgs used in the second parameter of the delegate.
// In this case, it is SomeEventArgs. Then, use as shown below.
IObservable<EventPattern<SomeEventArgs>> eventAsObservable = Observable.FromEventPattern<SomeNonGenericEventHandler, SomeEventArgs>(
(EventHandler<SomeEventArgs> ev) => new SomeNonGenericEventHandler(ev),
ev => NonGenericEvent += ev,
ev => NonGenericEvent -= ev);
}
}
Observing an Asynchronous Operation
class Observe_IAsync
{
static void Main()
{
// We will use Stream's BeginRead and EndRead for this sample.
Stream inputStream = Console.OpenStandardInput();
// To convert an asynchronous operation that uses the IAsyncResult pattern to a function that returns an IObservable, use the following format.
// For the generic arguments, specify the types of the arguments of the Begin* method, up to the AsyncCallback.
// If the End* method returns a value, append this as your final generic argument.
var read = Observable.FromAsyncPattern<byte[], int, int, int>(inputStream.BeginRead, inputStream.EndRead);
// Now, you can get an IObservable instead of an IAsyncResult when calling it.
byte[] someBytes = new byte[10];
IObservable<int> observable = read(someBytes, 0, 10);
}
}
See also: "Creating an observable sequence" and the Stack Overflow question "What is the proper way to create an Observable which reads a stream to the end".
Observing a Generic IEnumerable
class Observe_GenericIEnumerable
{
static void Main()
{
IEnumerable<int> someInts = new List<int> { 1, 2, 3, 4, 5 };
// To convert a generic IEnumerable into an IObservable, use the ToObservable extension method.
IObservable<int> observable = someInts.ToObservable();
}
}
Observing a Non-Generic IEnumerable - Single Type
class Observe_NonGenericIEnumerableSingleType
{
static void Main()
{
IEnumerable someInts = new object[] { 1, 2, 3, 4, 5 };
// To convert a non-generic IEnumerable that contains elements of a single type,
// first use Cast<> to change the non-generic enumerable into a generic enumerable,
// then use ToObservable.
IObservable<int> observable = someInts.Cast<int>().ToObservable();
}
}
Observing a Non-Generic IEnumerable - Multiple Types
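A minimal sketch for this case, assuming the goal is to observe only the elements of one type from a mixed, non-generic sequence. It uses LINQ's OfType<T>, which skips elements of other types, where Cast<T> would throw on them. The class name and the OnlyInts helper are hypothetical.

```csharp
using System;
using System.Collections;
using System.Linq;
using System.Reactive.Linq;

class Observe_NonGenericIEnumerableMultipleTypes
{
    // Hypothetical helper: keeps only the int elements and exposes them as an observable.
    public static IObservable<int> OnlyInts(IEnumerable mixed)
    {
        // OfType<int> filters by type; Cast<int> would throw InvalidCastException on a string.
        return mixed.OfType<int>().ToObservable();
    }

    static void Main()
    {
        IEnumerable mixed = new object[] { 1, "two", 3, "four", 5 };
        OnlyInts(mixed).Subscribe(n => Console.WriteLine(n)); // prints 1, 3 and 5
    }
}
```

To observe several types at once, one option is to call OfType<T> once per type and handle each resulting observable separately.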
Observing the Passing of Time
class Observe_Time
{
static void Main()
{
// To observe time passing, use the Observable.Interval function.
// It will notify you on a time interval you specify.
// 0 after 1s, 1 after 2s, 2 after 3s, etc.
IObservable<long> oneNumberPerSecond = Observable.Interval(TimeSpan.FromSeconds(1));
IObservable<long> alsoOneNumberPerSecond = Observable.Interval(TimeSpan.FromMilliseconds(1000));
}
}
Restriction Operators
Where - Simple
class Where_Simple
{
static void Main()
{
var oneNumberPerSecond = Observable.Interval(TimeSpan.FromSeconds(1));
var lowNums = from n in oneNumberPerSecond
where n < 5
select n;
Console.WriteLine("Numbers < 5:");
lowNums.Subscribe(lowNum =>
{
Console.WriteLine(lowNum);
});
Console.ReadKey();
}
}
Result
Numbers < 5:
0 (after 1s)
1 (after 2s)
2 (after 3s)
3 (after 4s)
4 (after 5s)
Where - Drilldown
class Where_DrillDown
{
class Customer
{
public Customer() { Orders = new ObservableCollection<Order>(); }
public string CustomerName { get; set; }
public string Region { get; set; }
public ObservableCollection<Order> Orders { get; private set; }
}
class Order
{
public int OrderId { get; set; }
public DateTimeOffset OrderDate { get; set; }
}
static void Main()
{
var customers = new ObservableCollection<Customer>();
var customerChanges = Observable.FromEventPattern(
(EventHandler<NotifyCollectionChangedEventArgs> ev)
=> new NotifyCollectionChangedEventHandler(ev),
ev => customers.CollectionChanged += ev,
ev => customers.CollectionChanged -= ev);
var watchForNewCustomersFromWashington =
from c in customerChanges
where c.EventArgs.Action == NotifyCollectionChangedAction.Add
from cus in c.EventArgs.NewItems.Cast<Customer>().ToObservable()
where cus.Region == "WA"
select cus;
Console.WriteLine("New customers from Washington and their orders:");
watchForNewCustomersFromWashington.Subscribe(cus =>
{
Console.WriteLine("Customer {0}:", cus.CustomerName);
foreach (var order in cus.Orders)
{
Console.WriteLine("Order {0}: {1}", order.OrderId, order.OrderDate);
}
});
customers.Add(new Customer
{
CustomerName = "Lazy K Kountry Store",
Region = "WA",
Orders = { new Order { OrderDate = DateTimeOffset.Now, OrderId = 1 } }
});
Thread.Sleep(1000);
customers.Add(new Customer
{
CustomerName = "Joe's Food Shop",
Region = "NY",
Orders = { new Order { OrderDate = DateTimeOffset.Now, OrderId = 2 } }
});
Thread.Sleep(1000);
customers.Add(new Customer
{
CustomerName = "Trail's Head Gourmet Provisioners",
Region = "WA",
Orders = { new Order { OrderDate = DateTimeOffset.Now, OrderId = 3 } }
});
Console.ReadKey();
}
}
Result
New customers from Washington and their orders:
Customer Lazy K Kountry Store: (after 0s)
Order 1: 11/20/2009 11:52:02 AM -06:00
Customer Trail's Head Gourmet Provisioners: (after 2s)
Order 3: 11/20/2009 11:52:04 AM -06:00
Projection Operators
Select - Simple
class Select_Simple
{
static void Main()
{
var oneNumberPerSecond = Observable.Interval(TimeSpan.FromSeconds(1));
var numbersTimesTwo = from n in oneNumberPerSecond
select n * 2;
Console.WriteLine("Numbers * 2:");
numbersTimesTwo.Subscribe(num =>
{
Console.WriteLine(num);
});
Console.ReadKey();
}
}
Result
Numbers * 2:
0 (after 1s)
2 (after 2s)
4 (after 3s)
6 (after 4s)
8 (after 5s)
Select - Transformation
class Select_Transform
{
static void Main()
{
var oneNumberPerSecond = Observable.Interval(TimeSpan.FromSeconds(1));
var stringsFromNumbers = from n in oneNumberPerSecond
select new string('*', (int)n);
Console.WriteLine("Strings from numbers:");
stringsFromNumbers.Subscribe(num =>
{
Console.WriteLine(num);
});
Console.ReadKey();
}
}
Result
Strings from numbers:
(after 1s)
* (after 2s)
** (after 3s)
*** (after 4s)
**** (after 5s)
***** (after 6s)
****** (after 7s)
Select - Indexed
class Select_Indexed
{
class TimeIndex
{
public TimeIndex(int index, DateTimeOffset time)
{
Index = index;
Time = time;
}
public int Index { get; set; }
public DateTimeOffset Time { get; set; }
}
static void Main()
{
var clock = Observable.Interval(TimeSpan.FromSeconds(1))
.Select((t, index) => new TimeIndex(index, DateTimeOffset.Now));
clock.Subscribe(timeIndex =>
{
Console.WriteLine(
"Ding dong. The time is now {0:T}. This is event number {1}.",
timeIndex.Time,
timeIndex.Index);
});
Console.ReadKey();
}
}
Result
Ding dong. The time is now 1:55:00 PM. This is event number 0. (after 1s)
Ding dong. The time is now 1:55:01 PM. This is event number 1. (after 2s)
Ding dong. The time is now 1:55:02 PM. This is event number 2. (after 3s)
Ding dong. The time is now 1:55:03 PM. This is event number 3. (after 4s)
Ding dong. The time is now 1:55:04 PM. This is event number 4. (after 5s)
Ding dong. The time is now 1:55:05 PM. This is event number 5. (after 6s)
Grouping
Group By - Simple
This example counts how many times you press each key as you furiously hit the keyboard. :)
class GroupBy_Simple
{
static IEnumerable<ConsoleKeyInfo> KeyPresses()
{
for (; ; )
{
var currentKey = Console.ReadKey(true);
if (currentKey.Key == ConsoleKey.Enter)
yield break;
else
yield return currentKey;
}
}
static void Main()
{
var timeToStop = new ManualResetEvent(false);
var keyPresses = KeyPresses().ToObservable();
var groupedKeyPresses =
from k in keyPresses
group k by k.Key into keyPressGroup
select keyPressGroup;
Console.WriteLine("Press Enter to stop. Now bang that keyboard!");
groupedKeyPresses.Subscribe(keyPressGroup =>
{
int numberPresses = 0;
keyPressGroup.Subscribe(keyPress =>
{
Console.WriteLine(
"You pressed the {0} key {1} time(s)!",
keyPress.Key,
++numberPresses);
},
() => timeToStop.Set());
});
timeToStop.WaitOne();
}
}
Result
Depends on what you press! But something like:
Press Enter to stop. Now bang that keyboard!
You pressed the A key 1 time(s)!
You pressed the A key 2 time(s)!
You pressed the B key 1 time(s)!
You pressed the B key 2 time(s)!
You pressed the C key 1 time(s)!
You pressed the C key 2 time(s)!
You pressed the C key 3 time(s)!
You pressed the A key 3 time(s)!
You pressed the B key 3 time(s)!
You pressed the A key 4 time(s)!
You pressed the A key 5 time(s)!
You pressed the C key 4 time(s)!
Time-Related Operators
Buffer - Simple
Buffer has a strange name, but a simple concept. Imagine an email program that checks for new mail every five minutes. While mail can arrive at any instant, you only receive it as a batch at each five-minute mark.
Let's use Buffer to simulate this.
class Buffer_Simple
{
static IEnumerable<string> EndlessBarrageOfEmail()
{
var random = new Random();
var emails = new List<String> { "Here is an email!", "Another email!", "Yet another email!" };
for (; ; )
{
// Return some random emails at random intervals.
yield return emails[random.Next(emails.Count)];
Thread.Sleep(random.Next(1000));
}
}
static void Main()
{
var myInbox = EndlessBarrageOfEmail().ToObservable();
// Instead of making you wait 5 minutes, we will just check every three seconds instead. :)
var getMailEveryThreeSeconds = myInbox.Buffer(TimeSpan.FromSeconds(3)); // Was .BufferWithTime(...
getMailEveryThreeSeconds.Subscribe(emails =>
{
Console.WriteLine("You've got {0} new messages! Here they are!", emails.Count());
foreach (var email in emails)
{
Console.WriteLine("> {0}", email);
}
Console.WriteLine();
});
Console.ReadKey();
}
}
Result
You've got 5 new messages! Here they are! (after 3s)
> Here is an email!
> Another email!
> Here is an email!
> Another email!
> Here is an email!

You've got 6 new messages! Here they are! (after 6s)
> Another email!
> Another email!
> Here is an email!
> Here is an email!
> Another email!
> Another email!
Delay - Simple
class Delay_Simple
{
static void Main()
{
var oneNumberEveryFiveSeconds = Observable.Interval(TimeSpan.FromSeconds(5));
// Instant echo
oneNumberEveryFiveSeconds.Subscribe(num =>
{
Console.WriteLine(num);
});
// One second delay
oneNumberEveryFiveSeconds.Delay(TimeSpan.FromSeconds(1)).Subscribe(num =>
{
Console.WriteLine("...{0}...", num);
});
// Two second delay
oneNumberEveryFiveSeconds.Delay(TimeSpan.FromSeconds(2)).Subscribe(num =>
{
Console.WriteLine("......{0}......", num);
});
Console.ReadKey();
}
}
Result
0 (after 5s)
...0... (after 6s)
......0...... (after 7s)
1 (after 10s)
...1... (after 11s)
......1...... (after 12s)
Interval - Simple
internal class Interval_Simple
{
private static void Main()
{
IObservable<long> observable = Observable.Interval(TimeSpan.FromSeconds(1));
using (observable.Subscribe(Console.WriteLine))
{
Console.WriteLine("Press any key to unsubscribe");
Console.ReadKey();
}
Console.WriteLine("Press any key to exit");
Console.ReadKey();
}
}
Result
0 (after 1s)
1 (after 2s)
2 (after 3s)
3 (after 4s)
…
Sample - Simple
internal class Sample_Simple
{
private static void Main()
{
// Generate sequence of numbers, (an interval of 50 ms seems to result in approx 16 per second).
IObservable<long> observable = Observable.Interval(TimeSpan.FromMilliseconds(50));
// Sample the sequence every second
using (observable.Sample(TimeSpan.FromSeconds(1)).Timestamp().Subscribe(
x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp)))
{
Console.WriteLine("Press any key to unsubscribe");
Console.ReadKey();
}
Console.WriteLine("Press any key to exit");
Console.ReadKey();
}
}
Result
15: 24/11/2009 15:40:45 (after 1s)
31: 24/11/2009 15:40:46 (after 2s)
47: 24/11/2009 15:40:47 (after 3s)
64: 24/11/2009 15:40:48 (after 4s)
…
Throttle - Simple
Throttle stops the flow of events until no more events are produced for a specified period of time. For example, if you throttle a TextChanged event of a textbox to 0.5 seconds, no events will be passed until the user has stopped typing for 0.5 seconds. This is useful in search boxes where you do not want to start a new search after every keystroke, but want to wait until the user pauses.
SearchTextChangedObservable = Observable.FromEventPattern<TextChangedEventArgs>(this.textBox, "TextChanged");
_currentSubscription = SearchTextChangedObservable.Throttle(TimeSpan.FromSeconds(.5)).ObserveOnDispatcher().Subscribe(e => this.ListItems.Add(this.textBox.Text));
internal class Throttle_Simple
{
// Generates events with interval that alternates between 500ms and 1000ms every 5 events
static IEnumerable<int> GenerateAlternatingFastAndSlowEvents()
{
int i = 0;
while(true)
{
if(i > 1000)
{
yield break;
}
yield return i;
Thread.Sleep( i++ % 10 < 5 ? 500 : 1000);
}
}
private static void Main()
{
var observable = GenerateAlternatingFastAndSlowEvents().ToObservable().Timestamp();
var throttled = observable.Throttle(TimeSpan.FromMilliseconds(750));
using (throttled.Subscribe(x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp)))
{
Console.WriteLine("Press any key to unsubscribe");
Console.ReadKey();
}
Console.WriteLine("Press any key to exit");
Console.ReadKey();
}
}
Result
5: <timestamp>
6: <timestamp>
7: <timestamp>
8: <timestamp>
9: <timestamp>
15: <timestamp>
16: <timestamp>
17: <timestamp>
18: <timestamp>
19: <timestamp>
…etc
Interval - With TimeInterval() - Simple
internal class TimeInterval_Simple
{
// Like Timestamp, but gives the time interval between successive values
private static void Main()
{
var observable = Observable.Interval(TimeSpan.FromMilliseconds(750)).TimeInterval();
using (observable.Subscribe(
x => Console.WriteLine("{0}: {1}", x.Value, x.Interval)))
{
Console.WriteLine("Press any key to unsubscribe");
Console.ReadKey();
}
Console.WriteLine("Press any key to exit");
Console.ReadKey();
}
}
Result
0: 00:00:00.8090459 (1st value)
1: 00:00:00.7610435 (2nd value)
2: 00:00:00.7650438 (3rd value)
…
Interval - With TimeInterval() - Remove
internal class TimeInterval_Remove
{
private static void Main()
{
// Add a time interval
var observable = Observable.Interval(TimeSpan.FromMilliseconds(750)).TimeInterval();
// Remove it again
using (observable.RemoveTimeInterval().Subscribe(Console.WriteLine))
{
Console.WriteLine("Press any key to unsubscribe");
Console.ReadKey();
}
Console.WriteLine("Press any key to exit");
Console.ReadKey();
}
}
Result
0
1
2
…
Timeout - Simple
internal class Timeout_Simple
{
private static void Main()
{
Console.WriteLine(DateTime.Now);
// create a single event in 10 seconds time
var observable = Observable.Timer(TimeSpan.FromSeconds(10)).Timestamp();
// raise exception if no event received within 9 seconds
var observableWithTimeout = Observable.Timeout(observable, TimeSpan.FromSeconds(9));
using (observableWithTimeout.Subscribe(
x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp),
ex => Console.WriteLine("{0} {1}", ex.Message, DateTime.Now)))
{
Console.WriteLine("Press any key to unsubscribe");
Console.ReadKey();
}
Console.WriteLine("Press any key to exit");
Console.ReadKey();
}
}
Result
02/12/2009 10:13:00
Press any key to unsubscribe
The operation has timed out. 02/12/2009 10:13:09
…
Timer - Simple
Observable.Interval is a simple wrapper around Observable.Timer.
internal class Timer_Simple
{
private static void Main()
{
Console.WriteLine(DateTime.Now);
var observable = Observable.Timer(TimeSpan.FromSeconds(5),
TimeSpan.FromSeconds(1)).Timestamp();
// or, equivalently
// var observable = Observable.Timer(DateTime.Now + TimeSpan.FromSeconds(5),
// TimeSpan.FromSeconds(1)).Timestamp();
using (observable.Subscribe(
x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp)))
{
Console.WriteLine("Press any key to unsubscribe");
Console.ReadKey();
}
Console.WriteLine("Press any key to exit");
Console.ReadKey();
}
}
Result
02/12/2009 10:02:29
Press any key to unsubscribe
0: 02/12/2009 10:02:34 (after 5s)
1: 02/12/2009 10:02:35 (after 6s)
2: 02/12/2009 10:02:36 (after 7s)
…
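The relationship between Interval and Timer mentioned above can be sketched as follows, assuming that a timer whose due time equals its period behaves like Interval for the purposes of this comparison. The class name and the FirstThree helper are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

static class IntervalViaTimerSketch
{
    // Hypothetical helper: collects the first three values, blocking until they arrive.
    public static IList<long> FirstThree(IObservable<long> source)
    {
        return source.Take(3).ToList().Wait();
    }

    static void Main()
    {
        TimeSpan period = TimeSpan.FromMilliseconds(100);
        IList<long> viaInterval = FirstThree(Observable.Interval(period));
        // A timer whose due time equals its period ticks on the same schedule as Interval.
        IList<long> viaTimer = FirstThree(Observable.Timer(period, period));
        Console.WriteLine(string.Join(",", viaInterval)); // prints 0,1,2
        Console.WriteLine(string.Join(",", viaTimer));    // prints 0,1,2
    }
}
```

Timer is the more general of the two because the first tick can be scheduled independently of the period, as in the Timer_Simple example.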
Timestamp - Simple
Adds a timestamp to each element using the system clock.
internal class Timestamp_Simple
{
private static void Main()
{
var observable = Observable.Interval(TimeSpan.FromSeconds(1)).Timestamp();
using (observable.Subscribe(
x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp)))
{
Console.WriteLine("Press any key to unsubscribe");
Console.ReadKey();
}
Console.WriteLine("Press any key to exit");
Console.ReadKey();
}
}
Result
0: 24/11/2009 15:40:45 (after 1s)
1: 24/11/2009 15:40:46 (after 2s)
2: 24/11/2009 15:40:47 (after 3s)
3: 24/11/2009 15:40:48 (after 4s)
…
Timestamp - Remove
internal class Timestamp_Remove
{
private static void Main()
{
// Add timestamp
var observable = Observable.Interval(TimeSpan.FromSeconds(1)).Timestamp();
// Remove it
using (observable.RemoveTimestamp().Subscribe(Console.WriteLine))
{
Console.WriteLine("Press any key to unsubscribe");
Console.ReadKey();
}
Console.WriteLine("Press any key to exit");
Console.ReadKey();
}
}
Result
0 (after 1s)
1 (after 2s)
2 (after 3s)
3 (after 4s)
…
Window and Joins
Window
Divides a stream into "windows" of time. For example, a five-second window contains all elements pushed during that five-second interval.
IObservable<long> mainSequence = Observable.Interval(TimeSpan.FromSeconds(1));
IObservable<IObservable<long>> seqWindowed = mainSequence.Window(() =>
{
IObservable<long> seqWindowControl = Observable.Interval(TimeSpan.FromSeconds(6));
return seqWindowControl;
});
seqWindowed.Subscribe(seqWindow =>
{
Console.WriteLine("\nA new window into the main sequence has opened: {0}\n",
DateTime.Now.ToString());
seqWindow.Subscribe(x => { Console.WriteLine("Integer : {0}", x); });
});
Console.ReadLine();
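For fixed-length time windows, the Window overload that takes a TimeSpan may be simpler than supplying a closing-selector sequence. The sketch below is an illustration under that assumption: it uses short durations so it finishes quickly, and the class name and the CollectWindows helper are hypothetical. How the ten values are distributed across windows depends on timing, so no exact output is shown.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

static class TimeWindowSketch
{
    // Hypothetical helper: splits ten quick ticks into time windows
    // and returns the contents of each window as a list.
    public static IList<IList<long>> CollectWindows()
    {
        return Observable.Interval(TimeSpan.FromMilliseconds(100))
            .Take(10)
            .Window(TimeSpan.FromMilliseconds(350))
            .SelectMany(window => window.ToList()) // one list per closed window
            .ToList()
            .Wait(); // block until the source has completed
    }

    static void Main()
    {
        foreach (IList<long> window in CollectWindows())
        {
            Console.WriteLine("Window: " + string.Join(",", window));
        }
    }
}
```

Every source element lands in exactly one window, so the window contents together always add up to the ten values produced.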
GroupJoin - Joins two streams matching by one of their attributes
var leftList = new List<string[]>();
leftList.Add(new string[] { "2013-01-01 02:00:00", "Batch1" });
leftList.Add(new string[] { "2013-01-01 03:00:00", "Batch2" });
leftList.Add(new string[] { "2013-01-01 04:00:00", "Batch3" });
var rightList = new List<string[]>();
rightList.Add(new string[] { "2013-01-01 01:00:00", "Production=2" });
rightList.Add(new string[] { "2013-01-01 02:00:00", "Production=0" });
rightList.Add(new string[] { "2013-01-01 03:00:00", "Production=3" });
var l = leftList.ToObservable();
var r = rightList.ToObservable();
var q = l.GroupJoin(r,
_ => Observable.Never<Unit>(), // windows from each left event going on forever
_ => Observable.Never<Unit>(), // windows from each right event going on forever
(left, obsOfRight) => Tuple.Create(left, obsOfRight)); // create tuple of left event with observable of right events
// e is a tuple with two items, left and obsOfRight
q.Subscribe(e =>
{
var xs = e.Item2;
xs.Where(
x => x[0] == e.Item1[0]) // filter only when datetime matches
.Subscribe(
v =>
{
Console.WriteLine(
string.Format("{0},{1} and {2},{3} occur at the same time",
e.Item1[0],
e.Item1[1],
v[0],
v[1]
));
});
});
Range
Generates a range of values. Useful for testing purposes.
Range - Prints from 1 to 10
IObservable<int> source = Observable.Range(1, 10);
IDisposable subscription = source.Subscribe(
x => Console.WriteLine("OnNext: {0}", x),
ex => Console.WriteLine("OnError: {0}", ex.Message),
() => Console.WriteLine("OnCompleted"));
Console.WriteLine("Press ENTER to unsubscribe...");
Console.ReadLine();
subscription.Dispose();
Generate
There are several overloads for Generate.
Generate - Simple
A simple use is to replicate Interval but have the sequence stop.
internal class Generate_Simple
{
private static void Main()
{
var observable =
Observable.Generate(1, x => x < 6, x => x + 1, x => x,
x=>TimeSpan.FromSeconds(1)).Timestamp();
using (observable.Subscribe(x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp)))
{
Console.WriteLine("Press any key to unsubscribe");
Console.ReadKey();
}
Console.WriteLine("Press any key to exit");
Console.ReadKey();
}
}
Result
1: 24/11/2009 15:40:45 (after 1s)
2: 24/11/2009 15:40:46 (after 2s)
3: 24/11/2009 15:40:47 (after 3s)
4: 24/11/2009 15:40:48 (after 4s)
5: 24/11/2009 15:40:49 (after 5s)
ISubject<T> and ISubject<T1, T2>
There are several implementations for ISubject.
Ping Pong Actor Model with ISubject<T1, T2>
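using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;     // Observable.Interval, Where, Select
using System.Reactive.Subjects; // ISubject<TSource, TResult>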
namespace RxPingPong
{
/// <summary>Simple Ping Pong Actor model using Rx </summary>
/// <remarks>
/// You'll need to install the Reactive Extensions (Rx) for this to work.
/// You can get the installer from <see href="http://msdn.microsoft.com/en-us/devlabs/ee794896.aspx"/>
/// </remarks>
class Program
{
static void Main(string[] args)
{
var ping = new Ping();
var pong = new Pong();
Console.WriteLine("Press any key to stop ...");
var pongSubscription = ping.Subscribe(pong);
var pingSubscription = pong.Subscribe(ping);
Console.ReadKey();
pongSubscription.Dispose();
pingSubscription.Dispose();
Console.WriteLine("Ping Pong has completed.");
}
}
class Ping : ISubject<Pong, Ping>
{
#region Implementation of IObserver<Pong>
/// <summary>
/// Notifies the observer of a new value in the sequence.
/// </summary>
public void OnNext(Pong value)
{
Console.WriteLine("Ping received Pong.");
}
/// <summary>
/// Notifies the observer that an exception has occurred.
/// </summary>
public void OnError(Exception exception)
{
Console.WriteLine("Ping experienced an exception and had to quit playing.");
}
/// <summary>
/// Notifies the observer of the end of the sequence.
/// </summary>
public void OnCompleted()
{
Console.WriteLine("Ping finished.");
}
#endregion
#region Implementation of IObservable<Ping>
/// <summary>
/// Subscribes an observer to the observable sequence.
/// </summary>
public IDisposable Subscribe(IObserver<Ping> observer)
{
return Observable.Interval(TimeSpan.FromSeconds(2))
.Where(n => n < 10)
.Select(n => this)
.Subscribe(observer);
}
#endregion
#region Implementation of IDisposable
/// <summary>
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
/// </summary>
/// <filterpriority>2</filterpriority>
public void Dispose()
{
OnCompleted();
}
#endregion
}
class Pong : ISubject<Ping, Pong>
{
#region Implementation of IObserver<Ping>
/// <summary>
/// Notifies the observer of a new value in the sequence.
/// </summary>
public void OnNext(Ping value)
{
Console.WriteLine("Pong received Ping.");
}
/// <summary>
/// Notifies the observer that an exception has occurred.
/// </summary>
public void OnError(Exception exception)
{
Console.WriteLine("Pong experienced an exception and had to quit playing.");
}
/// <summary>
/// Notifies the observer of the end of the sequence.
/// </summary>
public void OnCompleted()
{
Console.WriteLine("Pong finished.");
}
#endregion
#region Implementation of IObservable<Pong>
/// <summary>
/// Subscribes an observer to the observable sequence.
/// </summary>
public IDisposable Subscribe(IObserver<Pong> observer)
{
return Observable.Interval(TimeSpan.FromSeconds(1.5))
.Where(n => n < 10)
.Select(n => this)
.Subscribe(observer);
}
#endregion
#region Implementation of IDisposable
/// <summary>
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
/// </summary>
/// <filterpriority>2</filterpriority>
public void Dispose()
{
OnCompleted();
}
#endregion
}
}
Result
Ping received Pong.
Pong received Ping.
Ping received Pong.
Pong received Ping.
Ping received Pong.
Combination Operators
Merge
The Merge operator combines two or more sequences. In the following example, the two streams are merged into one so that both are printed with one subscription. Also note the use of "using" to wrap the subscription, thus ensuring it is disposed.
class Merge
{
private static IObservable<int> Xs
{
get { return Generate(0, new List<int> {1, 2, 2, 2, 2}); }
}
private static IObservable<int> Ys
{
get { return Generate(100, new List<int> {2, 2, 2, 2, 2}); }
}
private static IObservable<int> Generate(int initialValue, IList<int> intervals)
{
// work-around for Observable.Generate calling timeInterval before resultSelector
intervals.Add(0);
return Observable.Generate(initialValue,
x => x < initialValue + intervals.Count - 1,
x => x + 1,
x => x,
x => TimeSpan.FromSeconds(intervals[x - initialValue]));
}
private static void Main()
{
Console.WriteLine("Press any key to unsubscribe");
using (Xs.Merge(Ys).Timestamp().Subscribe(
z => Console.WriteLine("{0,3}: {1}", z.Value, z.Timestamp),
() => Console.WriteLine("Completed, press a key")))
{
Console.ReadKey();
}
Console.WriteLine("Press any key to exit");
Console.ReadKey();
}
}
Result
0: 11/12/2009 12:17:44
100: 11/12/2009 12:17:45
1: 11/12/2009 12:17:46
101: 11/12/2009 12:17:47
2: 11/12/2009 12:17:48
102: 11/12/2009 12:17:49
3: 11/12/2009 12:17:50
103: 11/12/2009 12:17:51
4: 11/12/2009 12:17:52
104: 11/12/2009 12:17:53
Publish - Sharing a subscription with multiple Observers
class Publish
{
private static void Main()
{
var unshared = Observable.Range(1, 4);
// Each subscription starts a new sequence
unshared.Subscribe(i => Console.WriteLine("Unshared Subscription #1: " + i));
unshared.Subscribe(i => Console.WriteLine("Unshared Subscription #2: " + i));
Console.WriteLine();
// By using publish the subscriptions are shared, but the sequence doesn't start until Connect() is called.
var shared = unshared.Publish();
shared.Subscribe(i => Console.WriteLine("Shared Subscription #1: " + i));
shared.Subscribe(i => Console.WriteLine("Shared Subscription #2: " + i));
shared.Connect();
Console.WriteLine("Press any key to exit");
Console.ReadKey();
}
}
Result
Unshared Subscription #1: 1
Unshared Subscription #1: 2
Unshared Subscription #1: 3
Unshared Subscription #1: 4
Unshared Subscription #2: 1
Unshared Subscription #2: 2
Unshared Subscription #2: 3
Unshared Subscription #2: 4

Shared Subscription #1: 1
Shared Subscription #2: 1
Shared Subscription #1: 2
Shared Subscription #2: 2
Shared Subscription #1: 3
Shared Subscription #2: 3
Shared Subscription #1: 4
Shared Subscription #2: 4
Zip
class Zip
{
// same code as above for Merge...
private static void Main()
{
Console.WriteLine("Press any key to unsubscribe");
using (Xs.Zip(Ys, (x, y) => x + y).Timestamp().Subscribe(
z => Console.WriteLine("{0,3}: {1}", z.Value, z.Timestamp),
() => Console.WriteLine("Completed, press a key")))
{
Console.ReadKey();
}
Console.WriteLine("Press any key to exit");
Console.ReadKey();
}
}
Result
100: 11/12/2009 12:17:45
102: 11/12/2009 12:17:47
104: 11/12/2009 12:17:49
106: 11/12/2009 12:17:51
108: 11/12/2009 12:17:53
CombineLatest
class CombineLatest
{
// same code as above for Merge...
private static void Main()
{
Console.WriteLine("Press any key to unsubscribe");
using (Xs.CombineLatest(Ys, (x, y) => x + y).Timestamp().Subscribe(
z => Console.WriteLine("{0,3}: {1}", z.Value, z.Timestamp),
() => Console.WriteLine("Completed, press a key")))
{
Console.ReadKey();
}
Console.WriteLine("Press any key to exit");
Console.ReadKey();
}
}
Result
100: 11/12/2009 12:17:45
101: 11/12/2009 12:17:46
102: 11/12/2009 12:17:47
103: 11/12/2009 12:17:48
104: 11/12/2009 12:17:49
105: 11/12/2009 12:17:50
106: 11/12/2009 12:17:51
107: 11/12/2009 12:17:52
108: 11/12/2009 12:17:53
Concat - cold observable
class ConcatCold
{
private static IObservable<int> Xs
{
get { return Generate(0, new List<int> {0, 1, 1}); }
}
private static IObservable<int> Ys
{
get { return Generate(100, new List<int> {1, 1, 1}); }
}
// same Generate() method as above for Merge...
private static void Main()
{
Console.WriteLine("Press any key to unsubscribe");
Console.WriteLine(DateTime.Now);
using (Xs.Concat(Ys).Timestamp().Subscribe(
z => Console.WriteLine("{0,3}: {1}", z.Value, z.Timestamp),
() => Console.WriteLine("Completed, press a key")))
{
Console.ReadKey();
}
Console.WriteLine("Press any key to exit");
Console.ReadKey();
}
}
Result
0: 11/12/2009 12:17:45
1: 11/12/2009 12:17:46
2: 11/12/2009 12:17:47
100: 11/12/2009 12:17:48
101: 11/12/2009 12:17:49
102: 11/12/2009 12:17:50
Concat - hot observable
class ConcatHot
{
private static IObservable<int> Xs
{
get { return Generate(0, new List<int> {0, 1, 1}); }
}
private static IConnectableObservable<int> Ys
{
get { return Generate(100, new List<int> {1, 1, 1}).Publish(); }
}
// same Generate() method as above for Merge...
private static void Main()
{
Console.WriteLine("Press any key to unsubscribe");
Console.WriteLine(DateTime.Now);
var ys = Ys;
// Connect right away so Ys is hot: it produces values whether or not Concat has subscribed to it yet.
using (ys.Connect())
using (Xs.Concat(ys).Timestamp().Subscribe(
z => Console.WriteLine("{0,3}: {1}", z.Value, z.Timestamp),
() => Console.WriteLine("Completed, press a key")))
{
Console.ReadKey();
}
Console.WriteLine("Press any key to exit");
Console.ReadKey();
}
}
Result
0: 11/12/2009 12:17:45
1: 11/12/2009 12:17:46
2: 11/12/2009 12:17:47
102: 11/12/2009 12:17:48
Make your class native to IObservable<T>
If you are about to build a new system, you could consider using just IObservable<T>.
Use Subject<T> as backend for IObservable<T>
class UseSubject
{
public class Order
{
private DateTime? _paidDate;
private readonly Subject<Order> _paidSubj = new Subject<Order>();
public IObservable<Order> Paid { get { return _paidSubj.AsObservable(); } }
public void MarkPaid(DateTime paidDate)
{
_paidDate = paidDate;
_paidSubj.OnNext(this); // Raise PAID event
}
}
private static void Main()
{
var order = new Order();
order.Paid.Subscribe(_ => Console.WriteLine("Paid")); // Subscribe
order.MarkPaid(DateTime.Now);
}
}