diff --git a/Sharp7.Rx.Tests/S7ValueConverterTests.cs b/Sharp7.Rx.Tests/S7ValueConverterTests.cs index a1eb657..032877d 100644 --- a/Sharp7.Rx.Tests/S7ValueConverterTests.cs +++ b/Sharp7.Rx.Tests/S7ValueConverterTests.cs @@ -1,5 +1,4 @@ -using System; -using NUnit.Framework; +using NUnit.Framework; using Sharp7.Rx.Interfaces; using Shouldly; @@ -64,4 +63,4 @@ public class S7ValueConverterTests //Act Should.Throw(() => S7ValueConverter.ConvertToType(data, variableAddress)); } -} \ No newline at end of file +} diff --git a/Sharp7.Rx.Tests/S7VariableNameParserTests.cs b/Sharp7.Rx.Tests/S7VariableNameParserTests.cs index b97641f..a5fa263 100644 --- a/Sharp7.Rx.Tests/S7VariableNameParserTests.cs +++ b/Sharp7.Rx.Tests/S7VariableNameParserTests.cs @@ -1,5 +1,4 @@ -using System.Collections.Generic; -using DeepEqual.Syntax; +using DeepEqual.Syntax; using NUnit.Framework; using Sharp7.Rx.Enums; @@ -39,4 +38,4 @@ internal class S7VariableNameParserTests { public override string ToString() => Input; } -} \ No newline at end of file +} diff --git a/Sharp7.Rx/Basics/ConcurrentSubjectDictionary.cs b/Sharp7.Rx/Basics/ConcurrentSubjectDictionary.cs index d831707..3ea9ef0 100644 --- a/Sharp7.Rx/Basics/ConcurrentSubjectDictionary.cs +++ b/Sharp7.Rx/Basics/ConcurrentSubjectDictionary.cs @@ -1,128 +1,125 @@ -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; +using System.Collections.Concurrent; using System.Reactive; using System.Reactive.Linq; using System.Reactive.Subjects; -namespace Sharp7.Rx.Basics +namespace Sharp7.Rx.Basics; + +internal class ConcurrentSubjectDictionary : IDisposable { - internal class ConcurrentSubjectDictionary : IDisposable + private readonly object dictionaryLock = new object(); + private readonly Func valueFactory; + private ConcurrentDictionary dictionary; + + public ConcurrentSubjectDictionary() { - private readonly object dictionaryLock = new object(); - private readonly Func valueFactory; - private ConcurrentDictionary dictionary; + dictionary = new ConcurrentDictionary(); + } - public ConcurrentSubjectDictionary() + public ConcurrentSubjectDictionary(IEqualityComparer comparer) + { + dictionary = new ConcurrentDictionary(comparer); + } + + public ConcurrentSubjectDictionary(TValue initialValue, IEqualityComparer comparer) + { + valueFactory = _ => initialValue; + dictionary = new ConcurrentDictionary(comparer); + } + + public ConcurrentSubjectDictionary(TValue initialValue) + { + valueFactory = _ => initialValue; + dictionary = new ConcurrentDictionary(); + } + + public ConcurrentSubjectDictionary(Func valueFactory = null) + { + this.valueFactory = valueFactory; + dictionary = new ConcurrentDictionary(); + } + + public IEnumerable ExistingKeys => dictionary.Keys; + + public bool IsDisposed { get; private set; } + + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + public DisposableItem GetOrCreateObservable(TKey key) + { + lock (dictionaryLock) { - dictionary = new ConcurrentDictionary(); - } - - public ConcurrentSubjectDictionary(IEqualityComparer comparer) - { - dictionary = new ConcurrentDictionary(comparer); - } - - public ConcurrentSubjectDictionary(TValue initialValue, IEqualityComparer comparer) - { - valueFactory = _ => initialValue; - dictionary = new ConcurrentDictionary(comparer); - } - - public ConcurrentSubjectDictionary(TValue initialValue) - { - valueFactory = _ => initialValue; - dictionary = new ConcurrentDictionary(); - } - - public ConcurrentSubjectDictionary(Func valueFactory = null) - { - this.valueFactory = valueFactory; - dictionary = new ConcurrentDictionary(); - } - - public IEnumerable ExistingKeys => dictionary.Keys; - - public bool IsDisposed { get; private set; } - - public void Dispose() - { - Dispose(true); - GC.SuppressFinalize(this); - } - - public DisposableItem GetOrCreateObservable(TKey key) - { - lock (dictionaryLock) + var subject = dictionary.AddOrUpdate(key, k => new SubjectWithRefCounter {Counter = 1, Subject = CreateSubject(k)}, (key1, counter) => { - var subject = dictionary.AddOrUpdate(key, k => new SubjectWithRefCounter {Counter = 1, Subject = CreateSubject(k)}, (key1, counter) => - { - counter.Counter = counter.Counter + 1; - return counter; - }); + counter.Counter = counter.Counter + 1; + return counter; + }); - return new DisposableItem(subject.Subject.AsObservable(), () => RemoveIfNoLongerInUse(key)); - } - } - - public bool TryGetObserver(TKey key, out IObserver subject) - { - SubjectWithRefCounter subjectWithRefCount; - if (dictionary.TryGetValue(key, out subjectWithRefCount)) - { - subject = subjectWithRefCount.Subject.AsObserver(); - return true; - } - - subject = null; - return false; - } - - protected virtual void Dispose(bool disposing) - { - if (IsDisposed) - return; - if (disposing && dictionary != null) - { - foreach (var subjectWithRefCounter in dictionary) - subjectWithRefCounter.Value.Subject.OnCompleted(); - dictionary.Clear(); - dictionary = null; - } - - IsDisposed = true; - } - - private ISubject CreateSubject(TKey key) - { - if (valueFactory == null) - return new Subject(); - return new BehaviorSubject(valueFactory(key)); - } - - private void RemoveIfNoLongerInUse(TKey variableName) - { - lock (dictionaryLock) - { - SubjectWithRefCounter subjectWithRefCount; - if (dictionary.TryGetValue(variableName, out subjectWithRefCount)) - { - if (subjectWithRefCount.Counter == 1) - dictionary.TryRemove(variableName, out subjectWithRefCount); - else subjectWithRefCount.Counter--; - } - } - } - - ~ConcurrentSubjectDictionary() - { - Dispose(false); - } - - class SubjectWithRefCounter - { - public int Counter { get; set; } - public ISubject Subject { get; set; } + return new DisposableItem(subject.Subject.AsObservable(), () => RemoveIfNoLongerInUse(key)); } } -} \ No newline at end of file + + public bool TryGetObserver(TKey key, out IObserver subject) + { + SubjectWithRefCounter subjectWithRefCount; + if (dictionary.TryGetValue(key, out subjectWithRefCount)) + { + subject = subjectWithRefCount.Subject.AsObserver(); + return true; + } + + subject = null; + return false; + } + + protected virtual void Dispose(bool disposing) + { + if (IsDisposed) + return; + if (disposing && dictionary != null) + { + foreach (var subjectWithRefCounter in dictionary) + subjectWithRefCounter.Value.Subject.OnCompleted(); + dictionary.Clear(); + dictionary = null; + } + + IsDisposed = true; + } + + private ISubject CreateSubject(TKey key) + { + if (valueFactory == null) + return new Subject(); + return new BehaviorSubject(valueFactory(key)); + } + + private void RemoveIfNoLongerInUse(TKey variableName) + { + lock (dictionaryLock) + { + SubjectWithRefCounter subjectWithRefCount; + if (dictionary.TryGetValue(variableName, out subjectWithRefCount)) + { + if (subjectWithRefCount.Counter == 1) + dictionary.TryRemove(variableName, out subjectWithRefCount); + else subjectWithRefCount.Counter--; + } + } + } + + ~ConcurrentSubjectDictionary() + { + Dispose(false); + } + + class SubjectWithRefCounter + { + public int Counter { get; set; } + public ISubject Subject { get; set; } + } +} diff --git a/Sharp7.Rx/Basics/DisposableItem.cs b/Sharp7.Rx/Basics/DisposableItem.cs index 29108a4..dd52a88 100644 --- a/Sharp7.Rx/Basics/DisposableItem.cs +++ b/Sharp7.Rx/Basics/DisposableItem.cs @@ -1,37 +1,34 @@ -using System; +namespace Sharp7.Rx.Basics; -namespace Sharp7.Rx.Basics +internal class DisposableItem : IDisposable { - internal class DisposableItem : IDisposable + private readonly Action disposeAction; + + bool disposed; + + public DisposableItem(IObservable observable, Action disposeAction) { - private readonly Action disposeAction; - - bool disposed; - - public DisposableItem(IObservable observable, Action disposeAction) - { - this.disposeAction = disposeAction; - Observable = observable; - } - - public IObservable Observable { get; } - - public void Dispose() - { - Dispose(true); - GC.SuppressFinalize(this); - } - - protected virtual void Dispose(bool disposing) - { - if (disposed) return; - - if (disposing) - { - disposeAction(); - } - - disposed = true; - } + this.disposeAction = disposeAction; + Observable = observable; } -} \ No newline at end of file + + public IObservable Observable { get; } + + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + protected virtual void Dispose(bool disposing) + { + if (disposed) return; + + if (disposing) + { + disposeAction(); + } + + disposed = true; + } +} diff --git a/Sharp7.Rx/Basics/LimitedConcurrencyLevelTaskScheduler.cs b/Sharp7.Rx/Basics/LimitedConcurrencyLevelTaskScheduler.cs index ac1754d..98c24c6 100644 --- a/Sharp7.Rx/Basics/LimitedConcurrencyLevelTaskScheduler.cs +++ b/Sharp7.Rx/Basics/LimitedConcurrencyLevelTaskScheduler.cs @@ -1,146 +1,139 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; +namespace Sharp7.Rx.Basics; -namespace Sharp7.Rx.Basics +/// +/// Provides a task scheduler that ensures a maximum concurrency level while +/// running on top of the ThreadPool. +/// from http://msdn.microsoft.com/en-us/library/ee789351.aspx +/// +internal class LimitedConcurrencyLevelTaskScheduler : TaskScheduler { + /// Whether the current thread is processing work items. + [ThreadStatic] private static bool currentThreadIsProcessingItems; + + /// The maximum concurrency level allowed by this scheduler. + private readonly int maxDegreeOfParallelism; + + /// The list of tasks to be executed. + private readonly LinkedList tasks = new LinkedList(); // protected by lock(_tasks) + + /// Whether the scheduler is currently processing work items. + private int delegatesQueuedOrRunning; // protected by lock(_tasks) + /// - /// Provides a task scheduler that ensures a maximum concurrency level while - /// running on top of the ThreadPool. - /// from http://msdn.microsoft.com/en-us/library/ee789351.aspx + /// Initializes an instance of the LimitedConcurrencyLevelTaskScheduler class with the + /// specified degree of parallelism. /// - internal class LimitedConcurrencyLevelTaskScheduler : TaskScheduler + /// The maximum degree of parallelism provided by this scheduler. + public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism) { - /// Whether the current thread is processing work items. - [ThreadStatic] private static bool currentThreadIsProcessingItems; + if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException("maxDegreeOfParallelism"); + this.maxDegreeOfParallelism = maxDegreeOfParallelism; + } - /// The maximum concurrency level allowed by this scheduler. - private readonly int maxDegreeOfParallelism; + /// Gets the maximum concurrency level supported by this scheduler. + public sealed override int MaximumConcurrencyLevel => maxDegreeOfParallelism; - /// The list of tasks to be executed. - private readonly LinkedList tasks = new LinkedList(); // protected by lock(_tasks) - - /// Whether the scheduler is currently processing work items. - private int delegatesQueuedOrRunning; // protected by lock(_tasks) - - /// - /// Initializes an instance of the LimitedConcurrencyLevelTaskScheduler class with the - /// specified degree of parallelism. - /// - /// The maximum degree of parallelism provided by this scheduler. - public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism) + /// Gets an enumerable of the tasks currently scheduled on this scheduler. + /// An enumerable of the tasks currently scheduled. + protected sealed override IEnumerable GetScheduledTasks() + { + var lockTaken = false; + try { - if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException("maxDegreeOfParallelism"); - this.maxDegreeOfParallelism = maxDegreeOfParallelism; + Monitor.TryEnter(tasks, ref lockTaken); + if (lockTaken) return tasks.ToArray(); + else throw new NotSupportedException(); } - - /// Gets the maximum concurrency level supported by this scheduler. - public sealed override int MaximumConcurrencyLevel => maxDegreeOfParallelism; - - /// Gets an enumerable of the tasks currently scheduled on this scheduler. - /// An enumerable of the tasks currently scheduled. - protected sealed override IEnumerable GetScheduledTasks() + finally { - var lockTaken = false; - try - { - Monitor.TryEnter(tasks, ref lockTaken); - if (lockTaken) return tasks.ToArray(); - else throw new NotSupportedException(); - } - finally - { - if (lockTaken) Monitor.Exit(tasks); - } - } - - /// Queues a task to the scheduler. - /// The task to be queued. - protected sealed override void QueueTask(Task task) - { - // Add the task to the list of tasks to be processed. If there aren't enough - // delegates currently queued or running to process tasks, schedule another. - lock (tasks) - { - tasks.AddLast(task); - if (delegatesQueuedOrRunning < maxDegreeOfParallelism) - { - ++delegatesQueuedOrRunning; - NotifyThreadPoolOfPendingWork(); - } - } - } - - /// Attempts to remove a previously scheduled task from the scheduler. - /// The task to be removed. - /// Whether the task could be found and removed. - protected sealed override bool TryDequeue(Task task) - { - lock (tasks) - { - return tasks.Remove(task); - } - } - - /// Attempts to execute the specified task on the current thread. - /// The task to be executed. - /// - /// Whether the task could be executed on the current thread. - protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) - { - // If this thread isn't already processing a task, we don't support inlining - if (!currentThreadIsProcessingItems) return false; - - // If the task was previously queued, remove it from the queue - if (taskWasPreviouslyQueued) TryDequeue(task); - - // Try to run the task. - return TryExecuteTask(task); - } - - /// - /// Informs the ThreadPool that there's work to be executed for this scheduler. - /// - private void NotifyThreadPoolOfPendingWork() - { - ThreadPool.UnsafeQueueUserWorkItem(_ => - { - // Note that the current thread is now processing work items. - // This is necessary to enable inlining of tasks into this thread. - currentThreadIsProcessingItems = true; - try - { - // Process all available items in the queue. - while (true) - { - Task item; - lock (tasks) - { - // When there are no more items to be processed, - // note that we're done processing, and get out. - if (tasks.Count == 0) - { - --delegatesQueuedOrRunning; - break; - } - - // Get the next item from the queue - item = tasks.First.Value; - tasks.RemoveFirst(); - } - - // Execute the task we pulled out of the queue - TryExecuteTask(item); - } - } - // We're done processing items on the current thread - finally - { - currentThreadIsProcessingItems = false; - } - }, null); + if (lockTaken) Monitor.Exit(tasks); } } -} \ No newline at end of file + + /// Queues a task to the scheduler. + /// The task to be queued. + protected sealed override void QueueTask(Task task) + { + // Add the task to the list of tasks to be processed. If there aren't enough + // delegates currently queued or running to process tasks, schedule another. + lock (tasks) + { + tasks.AddLast(task); + if (delegatesQueuedOrRunning < maxDegreeOfParallelism) + { + ++delegatesQueuedOrRunning; + NotifyThreadPoolOfPendingWork(); + } + } + } + + /// Attempts to remove a previously scheduled task from the scheduler. + /// The task to be removed. + /// Whether the task could be found and removed. + protected sealed override bool TryDequeue(Task task) + { + lock (tasks) + { + return tasks.Remove(task); + } + } + + /// Attempts to execute the specified task on the current thread. + /// The task to be executed. + /// + /// Whether the task could be executed on the current thread. + protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) + { + // If this thread isn't already processing a task, we don't support inlining + if (!currentThreadIsProcessingItems) return false; + + // If the task was previously queued, remove it from the queue + if (taskWasPreviouslyQueued) TryDequeue(task); + + // Try to run the task. + return TryExecuteTask(task); + } + + /// + /// Informs the ThreadPool that there's work to be executed for this scheduler. + /// + private void NotifyThreadPoolOfPendingWork() + { + ThreadPool.UnsafeQueueUserWorkItem(_ => + { + // Note that the current thread is now processing work items. + // This is necessary to enable inlining of tasks into this thread. + currentThreadIsProcessingItems = true; + try + { + // Process all available items in the queue. + while (true) + { + Task item; + lock (tasks) + { + // When there are no more items to be processed, + // note that we're done processing, and get out. + if (tasks.Count == 0) + { + --delegatesQueuedOrRunning; + break; + } + + // Get the next item from the queue + item = tasks.First.Value; + tasks.RemoveFirst(); + } + + // Execute the task we pulled out of the queue + TryExecuteTask(item); + } + } + // We're done processing items on the current thread + finally + { + currentThreadIsProcessingItems = false; + } + }, null); + } +} diff --git a/Sharp7.Rx/CacheVariableNameParser.cs b/Sharp7.Rx/CacheVariableNameParser.cs index ddd258b..d24ef98 100644 --- a/Sharp7.Rx/CacheVariableNameParser.cs +++ b/Sharp7.Rx/CacheVariableNameParser.cs @@ -1,20 +1,18 @@ -using System; -using System.Collections.Concurrent; +using System.Collections.Concurrent; using Sharp7.Rx.Interfaces; -namespace Sharp7.Rx +namespace Sharp7.Rx; + +internal class CacheVariableNameParser : IS7VariableNameParser { - internal class CacheVariableNameParser : IS7VariableNameParser + private static readonly ConcurrentDictionary addressCache = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); + + private readonly IS7VariableNameParser inner; + + public CacheVariableNameParser(IS7VariableNameParser inner) { - private static readonly ConcurrentDictionary addressCache = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); - - private readonly IS7VariableNameParser inner; - - public CacheVariableNameParser(IS7VariableNameParser inner) - { - this.inner = inner; - } - - public S7VariableAddress Parse(string input) => addressCache.GetOrAdd(input, inner.Parse); + this.inner = inner; } -} \ No newline at end of file + + public S7VariableAddress Parse(string input) => addressCache.GetOrAdd(input, inner.Parse); +} diff --git a/Sharp7.Rx/Enums/ConnectionState.cs b/Sharp7.Rx/Enums/ConnectionState.cs index 31e54ac..5cbedc5 100644 --- a/Sharp7.Rx/Enums/ConnectionState.cs +++ b/Sharp7.Rx/Enums/ConnectionState.cs @@ -1,10 +1,9 @@ -namespace Sharp7.Rx.Enums +namespace Sharp7.Rx.Enums; + +public enum ConnectionState { - public enum ConnectionState - { - Initial, - Connected, - DisconnectedByUser, - ConnectionLost - } -} \ No newline at end of file + Initial, + Connected, + DisconnectedByUser, + ConnectionLost +} diff --git a/Sharp7.Rx/Enums/CpuType.cs b/Sharp7.Rx/Enums/CpuType.cs index ad0a679..a923cbc 100644 --- a/Sharp7.Rx/Enums/CpuType.cs +++ b/Sharp7.Rx/Enums/CpuType.cs @@ -1,10 +1,9 @@ -namespace Sharp7.Rx.Enums +namespace Sharp7.Rx.Enums; + +internal enum CpuType { - internal enum CpuType - { - S7_300, - S7_400, - S7_1200, - S7_1500 - } -} \ No newline at end of file + S7_300, + S7_400, + S7_1200, + S7_1500 +} diff --git a/Sharp7.Rx/Enums/DbType.cs b/Sharp7.Rx/Enums/DbType.cs index 8a134ec..07066f8 100644 --- a/Sharp7.Rx/Enums/DbType.cs +++ b/Sharp7.Rx/Enums/DbType.cs @@ -1,13 +1,12 @@ -namespace Sharp7.Rx.Enums +namespace Sharp7.Rx.Enums; + +internal enum DbType { - internal enum DbType - { - Bit, - String, - Byte, - Double, - Integer, - DInteger, - ULong - } -} \ No newline at end of file + Bit, + String, + Byte, + Double, + Integer, + DInteger, + ULong +} diff --git a/Sharp7.Rx/Enums/Operand.cs b/Sharp7.Rx/Enums/Operand.cs index 4f6a687..6ad4970 100644 --- a/Sharp7.Rx/Enums/Operand.cs +++ b/Sharp7.Rx/Enums/Operand.cs @@ -1,10 +1,9 @@ -namespace Sharp7.Rx.Enums +namespace Sharp7.Rx.Enums; + +internal enum Operand : byte { - internal enum Operand : byte - { - Input = 69, - Output = 65, - Marker = 77, - Db = 68, - } -} \ No newline at end of file + Input = 69, + Output = 65, + Marker = 77, + Db = 68, +} diff --git a/Sharp7.Rx/Enums/TransmissionMode.cs b/Sharp7.Rx/Enums/TransmissionMode.cs index c33ef62..5ba9dc2 100644 --- a/Sharp7.Rx/Enums/TransmissionMode.cs +++ b/Sharp7.Rx/Enums/TransmissionMode.cs @@ -1,8 +1,7 @@ -namespace Sharp7.Rx.Enums +namespace Sharp7.Rx.Enums; + +public enum TransmissionMode { - public enum TransmissionMode - { - Cyclic = 3, - OnChange = 4, - } -} \ No newline at end of file + Cyclic = 3, + OnChange = 4, +} diff --git a/Sharp7.Rx/Extensions/DisposableExtensions.cs b/Sharp7.Rx/Extensions/DisposableExtensions.cs index 6f68c0f..5638168 100644 --- a/Sharp7.Rx/Extensions/DisposableExtensions.cs +++ b/Sharp7.Rx/Extensions/DisposableExtensions.cs @@ -1,13 +1,11 @@ -using System; -using System.Reactive.Disposables; +using System.Reactive.Disposables; -namespace Sharp7.Rx.Extensions +namespace Sharp7.Rx.Extensions; + +internal static class DisposableExtensions { - internal static class DisposableExtensions + public static void AddDisposableTo(this IDisposable disposable, CompositeDisposable compositeDisposable) { - public static void AddDisposableTo(this IDisposable disposable, CompositeDisposable compositeDisposable) - { - compositeDisposable.Add(disposable); - } + compositeDisposable.Add(disposable); } -} \ No newline at end of file +} diff --git a/Sharp7.Rx/Extensions/ObservableExtensions.cs b/Sharp7.Rx/Extensions/ObservableExtensions.cs index 8cdc26e..3c5735b 100644 --- a/Sharp7.Rx/Extensions/ObservableExtensions.cs +++ b/Sharp7.Rx/Extensions/ObservableExtensions.cs @@ -1,83 +1,81 @@ -using System; -using System.Reactive.Concurrency; +using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Reactive.Linq; using Microsoft.Extensions.Logging; -namespace Sharp7.Rx.Extensions -{ - internal static class ObservableExtensions - { - public static IObservable DisposeMany(this IObservable source) - { - return Observable.Create(obs => - { - var serialDisposable = new SerialDisposable(); - var subscription = - source.Subscribe( - item => - { - serialDisposable.Disposable = item as IDisposable; - obs.OnNext(item); - }, - obs.OnError, - obs.OnCompleted); - return new CompositeDisposable(serialDisposable, subscription); - }); - } +namespace Sharp7.Rx.Extensions; - public static IObservable LogAndRetry(this IObservable source, ILogger logger, string message) +internal static class ObservableExtensions +{ + public static IObservable DisposeMany(this IObservable source) + { + return Observable.Create(obs => { - return source + var serialDisposable = new SerialDisposable(); + var subscription = + source.Subscribe( + item => + { + serialDisposable.Disposable = item as IDisposable; + obs.OnNext(item); + }, + obs.OnError, + obs.OnCompleted); + return new CompositeDisposable(serialDisposable, subscription); + }); + } + + public static IObservable LogAndRetry(this IObservable source, ILogger logger, string message) + { + return source + .Do( + _ => { }, + ex => logger?.LogError(ex, message)) + .Retry(); + } + + public static IObservable LogAndRetryAfterDelay( + this IObservable source, + ILogger logger, + TimeSpan retryDelay, + string message, + int retryCount = -1, + IScheduler scheduler = null) + { + var sourceLogged = + source .Do( _ => { }, - ex => logger?.LogError(ex, message)) - .Retry(); - } + ex => logger?.LogError(ex, message)); - public static IObservable LogAndRetryAfterDelay( - this IObservable source, - ILogger logger, - TimeSpan retryDelay, - string message, - int retryCount = -1, - IScheduler scheduler = null) - { - var sourceLogged = - source - .Do( - _ => { }, - ex => logger?.LogError(ex, message)); - - return RetryAfterDelay(sourceLogged, retryDelay, retryCount, scheduler); - } - - public static IObservable RepeatAfterDelay( - this IObservable source, - TimeSpan retryDelay, - int repeatCount = -1, - IScheduler scheduler = null) - { - return RedoAfterDelay(source, retryDelay, repeatCount, scheduler, Observable.Repeat, Observable.Repeat); - } - - public static IObservable RetryAfterDelay( - this IObservable source, - TimeSpan retryDelay, - int retryCount = -1, - IScheduler scheduler = null) - { - return RedoAfterDelay(source, retryDelay, retryCount, scheduler, Observable.Retry, Observable.Retry); - } - - private static IObservable RedoAfterDelay(IObservable source, TimeSpan retryDelay, int retryCount, IScheduler scheduler, Func, IObservable> reDo, - Func, int, IObservable> reDoCount) - { - scheduler = scheduler ?? TaskPoolScheduler.Default; - var attempt = 0; - - var deferedObs = Observable.Defer(() => ((++attempt == 1) ? source : source.DelaySubscription(retryDelay, scheduler))); - return retryCount > 0 ? reDoCount(deferedObs, retryCount) : reDo(deferedObs); - } + return RetryAfterDelay(sourceLogged, retryDelay, retryCount, scheduler); } -} \ No newline at end of file + + public static IObservable RepeatAfterDelay( + this IObservable source, + TimeSpan retryDelay, + int repeatCount = -1, + IScheduler scheduler = null) + { + return RedoAfterDelay(source, retryDelay, repeatCount, scheduler, Observable.Repeat, Observable.Repeat); + } + + public static IObservable RetryAfterDelay( + this IObservable source, + TimeSpan retryDelay, + int retryCount = -1, + IScheduler scheduler = null) + { + return RedoAfterDelay(source, retryDelay, retryCount, scheduler, Observable.Retry, Observable.Retry); + } + + private static IObservable RedoAfterDelay(IObservable source, TimeSpan retryDelay, int retryCount, IScheduler scheduler, Func, IObservable> reDo, + Func, int, IObservable> reDoCount) + { + scheduler = scheduler ?? TaskPoolScheduler.Default; + var attempt = 0; + + var deferedObs = Observable.Defer(() => ((++attempt == 1) ? source : source.DelaySubscription(retryDelay, scheduler))); + return retryCount > 0 ? reDoCount(deferedObs, retryCount) : reDo(deferedObs); + } +} diff --git a/Sharp7.Rx/Extensions/PlcExtensions.cs b/Sharp7.Rx/Extensions/PlcExtensions.cs index 2cfbc85..b1f1cd2 100644 --- a/Sharp7.Rx/Extensions/PlcExtensions.cs +++ b/Sharp7.Rx/Extensions/PlcExtensions.cs @@ -1,74 +1,71 @@ -using System; -using System.Reactive; +using System.Reactive; using System.Reactive.Disposables; using System.Reactive.Linq; using System.Reactive.Threading.Tasks; -using System.Threading.Tasks; using Sharp7.Rx.Enums; using Sharp7.Rx.Interfaces; -namespace Sharp7.Rx.Extensions +namespace Sharp7.Rx.Extensions; + +public static class PlcExtensions { - public static class PlcExtensions + public static IObservable CreateDatatransferWithHandshake(this IPlc plc, string triggerAddress, string ackTriggerAddress, Func> readData, bool initialTransfer) { - public static IObservable CreateDatatransferWithHandshake(this IPlc plc, string triggerAddress, string ackTriggerAddress, Func> readData, bool initialTransfer) + return Observable.Create(async observer => { - return Observable.Create(async observer => + var subscriptions = new CompositeDisposable(); + + var notification = plc + .CreateNotification(triggerAddress, TransmissionMode.OnChange) + .Publish() + .RefCount(); + + if (initialTransfer) { - var subscriptions = new CompositeDisposable(); + await plc.ConnectionState.FirstAsync(state => state == ConnectionState.Connected).ToTask(); + var initialValue = await ReadData(plc, readData); + observer.OnNext(initialValue); + } - var notification = plc - .CreateNotification(triggerAddress, TransmissionMode.OnChange) - .Publish() - .RefCount(); + notification + .Where(trigger => trigger) + .SelectMany(_ => ReadDataAndAcknowlodge(plc, readData, ackTriggerAddress)) + .Subscribe(observer) + .AddDisposableTo(subscriptions); - if (initialTransfer) + notification + .Where(trigger => !trigger) + .SelectMany(async _ => { - await plc.ConnectionState.FirstAsync(state => state == ConnectionState.Connected).ToTask(); - var initialValue = await ReadData(plc, readData); - observer.OnNext(initialValue); - } + await plc.SetValue(ackTriggerAddress, false); + return Unit.Default; + }) + .Subscribe() + .AddDisposableTo(subscriptions); - notification - .Where(trigger => trigger) - .SelectMany(_ => ReadDataAndAcknowlodge(plc, readData, ackTriggerAddress)) - .Subscribe(observer) - .AddDisposableTo(subscriptions); + return subscriptions; + }); + } - notification - .Where(trigger => !trigger) - .SelectMany(async _ => - { - await plc.SetValue(ackTriggerAddress, false); - return Unit.Default; - }) - .Subscribe() - .AddDisposableTo(subscriptions); + public static IObservable CreateDatatransferWithHandshake(this IPlc plc, string triggerAddress, string ackTriggerAddress, Func> readData) + { + return CreateDatatransferWithHandshake(plc, triggerAddress, ackTriggerAddress, readData, false); + } - return subscriptions; - }); - } + private static async Task ReadData(IPlc plc, Func> receiveData) + { + return await receiveData(plc); + } - public static IObservable CreateDatatransferWithHandshake(this IPlc plc, string triggerAddress, string ackTriggerAddress, Func> readData) + private static async Task ReadDataAndAcknowlodge(IPlc plc, Func> readData, string ackTriggerAddress) + { + try { - return CreateDatatransferWithHandshake(plc, triggerAddress, ackTriggerAddress, readData, false); + return await ReadData(plc, readData); } - - private static async Task ReadData(IPlc plc, Func> receiveData) + finally { - return await receiveData(plc); - } - - private static async Task ReadDataAndAcknowlodge(IPlc plc, Func> readData, string ackTriggerAddress) - { - try - { - return await ReadData(plc, readData); - } - finally - { - await plc.SetValue(ackTriggerAddress, true); - } + await plc.SetValue(ackTriggerAddress, true); } } -} \ No newline at end of file +} diff --git a/Sharp7.Rx/Interfaces/IPlc.cs b/Sharp7.Rx/Interfaces/IPlc.cs index 683aed9..296e830 100644 --- a/Sharp7.Rx/Interfaces/IPlc.cs +++ b/Sharp7.Rx/Interfaces/IPlc.cs @@ -1,18 +1,15 @@ -using System; -using System.Threading.Tasks; -using JetBrains.Annotations; +using JetBrains.Annotations; using Microsoft.Extensions.Logging; using Sharp7.Rx.Enums; -namespace Sharp7.Rx.Interfaces +namespace Sharp7.Rx.Interfaces; + +[NoReorder] +public interface IPlc : IDisposable { - [NoReorder] - public interface IPlc : IDisposable - { - IObservable CreateNotification(string variableName, TransmissionMode transmissionMode); - Task SetValue(string variableName, TValue value); - Task GetValue(string variableName); - IObservable ConnectionState { get; } - ILogger Logger { get; } - } + IObservable CreateNotification(string variableName, TransmissionMode transmissionMode); + Task SetValue(string variableName, TValue value); + Task GetValue(string variableName); + IObservable ConnectionState { get; } + ILogger Logger { get; } } diff --git a/Sharp7.Rx/Interfaces/IS7Connector.cs b/Sharp7.Rx/Interfaces/IS7Connector.cs index 10aa850..5f9da81 100644 --- a/Sharp7.Rx/Interfaces/IS7Connector.cs +++ b/Sharp7.Rx/Interfaces/IS7Connector.cs @@ -1,26 +1,21 @@ -using System; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; -using JetBrains.Annotations; +using JetBrains.Annotations; using Sharp7.Rx.Enums; -namespace Sharp7.Rx.Interfaces +namespace Sharp7.Rx.Interfaces; + +[NoReorder] +internal interface IS7Connector : IDisposable { - [NoReorder] - internal interface IS7Connector : IDisposable - { - IObservable ConnectionState { get; } - Task InitializeAsync(); + IObservable ConnectionState { get; } + Task InitializeAsync(); - Task Connect(); - Task Disconnect(); + Task Connect(); + Task Disconnect(); - Task ReadBytes(Operand operand, ushort startByteAddress, ushort bytesToRead, ushort dBNr, CancellationToken token); + Task ReadBytes(Operand operand, ushort startByteAddress, ushort bytesToRead, ushort dBNr, CancellationToken token); - Task WriteBit(Operand operand, ushort startByteAddress, byte bitAdress, bool value, ushort dbNr, CancellationToken token); - Task WriteBytes(Operand operand, ushort startByteAdress, byte[] data, ushort dBNr, CancellationToken token); + Task WriteBit(Operand operand, ushort startByteAddress, byte bitAdress, bool value, ushort dbNr, CancellationToken token); + Task WriteBytes(Operand operand, ushort startByteAdress, byte[] data, ushort dBNr, CancellationToken token); - Task> ExecuteMultiVarRequest(IReadOnlyList variableNames); - } -} \ No newline at end of file + Task> ExecuteMultiVarRequest(IReadOnlyList variableNames); +} diff --git a/Sharp7.Rx/Interfaces/IS7VariableNameParser.cs b/Sharp7.Rx/Interfaces/IS7VariableNameParser.cs index dd8e272..07f4b9e 100644 --- a/Sharp7.Rx/Interfaces/IS7VariableNameParser.cs +++ b/Sharp7.Rx/Interfaces/IS7VariableNameParser.cs @@ -1,7 +1,6 @@ -namespace Sharp7.Rx.Interfaces +namespace Sharp7.Rx.Interfaces; + +internal interface IS7VariableNameParser { - internal interface IS7VariableNameParser - { - S7VariableAddress Parse(string input); - } -} \ No newline at end of file + S7VariableAddress Parse(string input); +} diff --git a/Sharp7.Rx/S7ErrorCodes.cs b/Sharp7.Rx/S7ErrorCodes.cs index db1b5eb..f076bf8 100644 --- a/Sharp7.Rx/S7ErrorCodes.cs +++ b/Sharp7.Rx/S7ErrorCodes.cs @@ -1,27 +1,24 @@ -using System.Collections.Generic; +namespace Sharp7.Rx; -namespace Sharp7.Rx +public static class S7ErrorCodes { - public static class S7ErrorCodes + /// + /// This list is not exhaustive and should be considered work in progress. + /// + private static readonly HashSet notDisconnectedErrorCodes = new HashSet { - /// - /// This list is not exhaustive and should be considered work in progress. - /// - private static readonly HashSet notDisconnectedErrorCodes = new HashSet - { - 0x000000, // OK - 0xC00000, // CPU: Item not available - 0x900000, // CPU: Address out of range - }; + 0x000000, // OK + 0xC00000, // CPU: Item not available + 0x900000, // CPU: Address out of range + }; - /// - /// Some error codes indicate connection lost, in which case, the driver tries to reestablish connection. - /// Other error codes indicate a user error, like reading from an unavailable DB or exceeding - /// the DBs range. In this case the driver should not consider the connection to be lost. - /// - public static bool AssumeConnectionLost(int errorCode) - { - return !notDisconnectedErrorCodes.Contains(errorCode); - } + /// + /// Some error codes indicate connection lost, in which case, the driver tries to reestablish connection. + /// Other error codes indicate a user error, like reading from an unavailable DB or exceeding + /// the DBs range. In this case the driver should not consider the connection to be lost. + /// + public static bool AssumeConnectionLost(int errorCode) + { + return !notDisconnectedErrorCodes.Contains(errorCode); } -} \ No newline at end of file +} diff --git a/Sharp7.Rx/S7ValueConverter.cs b/Sharp7.Rx/S7ValueConverter.cs index fb6f985..26b86c5 100644 --- a/Sharp7.Rx/S7ValueConverter.cs +++ b/Sharp7.Rx/S7ValueConverter.cs @@ -1,85 +1,83 @@ -using System; -using System.Buffers.Binary; +using System.Buffers.Binary; using System.Runtime.InteropServices; using System.Text; using Sharp7.Rx.Enums; -namespace Sharp7.Rx +namespace Sharp7.Rx; + +internal static class S7ValueConverter { - internal static class S7ValueConverter + public static TValue ConvertToType(byte[] buffer, S7VariableAddress address) { - public static TValue ConvertToType(byte[] buffer, S7VariableAddress address) + if (typeof(TValue) == typeof(bool)) + return (TValue) (object) (((buffer[0] >> address.Bit) & 1) > 0); + + if (typeof(TValue) == typeof(int)) { - if (typeof(TValue) == typeof(bool)) - return (TValue) (object) (((buffer[0] >> address.Bit) & 1) > 0); + if (address.Length == 2) + return (TValue) (object) (int) BinaryPrimitives.ReadInt16BigEndian(buffer); + if (address.Length == 4) + return (TValue) (object) BinaryPrimitives.ReadInt32BigEndian(buffer); - if (typeof(TValue) == typeof(int)) - { - if (address.Length == 2) - return (TValue) (object) (int) BinaryPrimitives.ReadInt16BigEndian(buffer); - if (address.Length == 4) - return (TValue) (object) BinaryPrimitives.ReadInt32BigEndian(buffer); - - throw new InvalidOperationException($"length must be 2 or 4 but is {address.Length}"); - } - - if (typeof(TValue) == typeof(long)) - return (TValue) (object) BinaryPrimitives.ReadInt64BigEndian(buffer); - - if (typeof(TValue) == typeof(ulong)) - return (TValue) (object) BinaryPrimitives.ReadUInt64BigEndian(buffer); - - if (typeof(TValue) == typeof(short)) - return (TValue) (object) BinaryPrimitives.ReadInt16BigEndian(buffer); - - if (typeof(TValue) == typeof(byte)) - return (TValue) (object) buffer[0]; - if (typeof(TValue) == typeof(char)) - return (TValue) (object) (char) buffer[0]; - - if (typeof(TValue) == typeof(byte[])) - return (TValue) (object) buffer; - - if (typeof(TValue) == typeof(double)) - { - var d = new UInt32SingleMap - { - UInt32 = BinaryPrimitives.ReadUInt32BigEndian(buffer) - }; - return (TValue) (object) (double) d.Single; - } - - if (typeof(TValue) == typeof(float)) - { - var d = new UInt32SingleMap - { - UInt32 = BinaryPrimitives.ReadUInt32BigEndian(buffer) - }; - return (TValue) (object) d.Single; - } - - if (typeof(TValue) == typeof(string)) - if (address.Type == DbType.String) - { - // First byte is maximal length - // Second byte is actual length - // https://cache.industry.siemens.com/dl/files/480/22506480/att_105176/v1/s7_scl_string_parameterzuweisung_e.pdf - - var length = Math.Min(address.Length, buffer[1]); - - return (TValue) (object) Encoding.ASCII.GetString(buffer, 2, length); - } - else - return (TValue) (object) Encoding.ASCII.GetString(buffer).Trim(); - - throw new InvalidOperationException(string.Format("type '{0}' not supported.", typeof(TValue))); + throw new InvalidOperationException($"length must be 2 or 4 but is {address.Length}"); } - [StructLayout(LayoutKind.Explicit)] - private struct UInt32SingleMap + if (typeof(TValue) == typeof(long)) + return (TValue) (object) BinaryPrimitives.ReadInt64BigEndian(buffer); + + if (typeof(TValue) == typeof(ulong)) + return (TValue) (object) BinaryPrimitives.ReadUInt64BigEndian(buffer); + + if (typeof(TValue) == typeof(short)) + return (TValue) (object) BinaryPrimitives.ReadInt16BigEndian(buffer); + + if (typeof(TValue) == typeof(byte)) + return (TValue) (object) buffer[0]; + if (typeof(TValue) == typeof(char)) + return (TValue) (object) (char) buffer[0]; + + if (typeof(TValue) == typeof(byte[])) + return (TValue) (object) buffer; + + if (typeof(TValue) == typeof(double)) { - [FieldOffset(0)] public uint UInt32; - [FieldOffset(0)] public float Single; + var d = new UInt32SingleMap + { + UInt32 = BinaryPrimitives.ReadUInt32BigEndian(buffer) + }; + return (TValue) (object) (double) d.Single; } + + if (typeof(TValue) == typeof(float)) + { + var d = new UInt32SingleMap + { + UInt32 = BinaryPrimitives.ReadUInt32BigEndian(buffer) + }; + return (TValue) (object) d.Single; + } + + if (typeof(TValue) == typeof(string)) + if (address.Type == DbType.String) + { + // First byte is maximal length + // Second byte is actual length + // https://cache.industry.siemens.com/dl/files/480/22506480/att_105176/v1/s7_scl_string_parameterzuweisung_e.pdf + + var length = Math.Min(address.Length, buffer[1]); + + return (TValue) (object) Encoding.ASCII.GetString(buffer, 2, length); + } + else + return (TValue) (object) Encoding.ASCII.GetString(buffer).Trim(); + + throw new InvalidOperationException(string.Format("type '{0}' not supported.", typeof(TValue))); } -} \ No newline at end of file + + [StructLayout(LayoutKind.Explicit)] + private struct UInt32SingleMap + { + [FieldOffset(0)] public uint UInt32; + [FieldOffset(0)] public float Single; + } +} diff --git a/Sharp7.Rx/S7VariableAddress.cs b/Sharp7.Rx/S7VariableAddress.cs index 7c751f7..ae5e440 100644 --- a/Sharp7.Rx/S7VariableAddress.cs +++ b/Sharp7.Rx/S7VariableAddress.cs @@ -1,16 +1,15 @@ using JetBrains.Annotations; using Sharp7.Rx.Enums; -namespace Sharp7.Rx +namespace Sharp7.Rx; + +[NoReorder] +internal class S7VariableAddress { - [NoReorder] - internal class S7VariableAddress - { - public Operand Operand { get; set; } - public ushort DbNr { get; set; } - public ushort Start { get; set; } - public ushort Length { get; set; } - public byte Bit { get; set; } - public DbType Type { get; set; } - } -} \ No newline at end of file + public Operand Operand { get; set; } + public ushort DbNr { get; set; } + public ushort Start { get; set; } + public ushort Length { get; set; } + public byte Bit { get; set; } + public DbType Type { get; set; } +} diff --git a/Sharp7.Rx/S7VariableNameParser.cs b/Sharp7.Rx/S7VariableNameParser.cs index df34b3a..80e9913 100644 --- a/Sharp7.Rx/S7VariableNameParser.cs +++ b/Sharp7.Rx/S7VariableNameParser.cs @@ -1,84 +1,81 @@ -using System; -using System.Collections.Generic; -using System.Globalization; +using System.Globalization; using System.Text.RegularExpressions; using Sharp7.Rx.Enums; using Sharp7.Rx.Interfaces; -namespace Sharp7.Rx +namespace Sharp7.Rx; + +internal class S7VariableNameParser : IS7VariableNameParser { - internal class S7VariableNameParser : IS7VariableNameParser + private static readonly Regex regex = new Regex(@"^(?db{1})(?\d{1,4})\.?(?dbx|x|s|string|b|dbb|d|int|dbw|w|dint|dul|dulint|dulong|){1}(?\d+)(\.(?\d+))?$", RegexOptions.IgnoreCase | RegexOptions.Compiled | RegexOptions.CultureInvariant); + + private static readonly IReadOnlyDictionary types = new Dictionary(StringComparer.OrdinalIgnoreCase) { - private static readonly Regex regex = new Regex(@"^(?db{1})(?\d{1,4})\.?(?dbx|x|s|string|b|dbb|d|int|dbw|w|dint|dul|dulint|dulong|){1}(?\d+)(\.(?\d+))?$", RegexOptions.IgnoreCase | RegexOptions.Compiled | RegexOptions.CultureInvariant); + {"x", DbType.Bit}, + {"dbx", DbType.Bit}, + {"s", DbType.String}, + {"string", DbType.String}, + {"b", DbType.Byte}, + {"dbb", DbType.Byte}, + {"d", DbType.Double}, + {"int", DbType.Integer}, + {"dint", DbType.DInteger}, + {"w", DbType.Integer}, + {"dbw", DbType.Integer}, + {"dul", DbType.ULong}, + {"dulint", DbType.ULong}, + {"dulong", DbType.ULong} + }; - private static readonly IReadOnlyDictionary types = new Dictionary(StringComparer.OrdinalIgnoreCase) + public S7VariableAddress Parse(string input) + { + var match = regex.Match(input); + if (match.Success) { - {"x", DbType.Bit}, - {"dbx", DbType.Bit}, - {"s", DbType.String}, - {"string", DbType.String}, - {"b", DbType.Byte}, - {"dbb", DbType.Byte}, - {"d", DbType.Double}, - {"int", DbType.Integer}, - {"dint", DbType.DInteger}, - {"w", DbType.Integer}, - {"dbw", DbType.Integer}, - {"dul", DbType.ULong}, - {"dulint", DbType.ULong}, - {"dulong", DbType.ULong} - }; + var operand = (Operand) Enum.Parse(typeof(Operand), match.Groups["operand"].Value, true); + var dbNr = ushort.Parse(match.Groups["dbNr"].Value, NumberStyles.Integer, CultureInfo.InvariantCulture); + var start = ushort.Parse(match.Groups["start"].Value, NumberStyles.Integer, CultureInfo.InvariantCulture); + if (!types.TryGetValue(match.Groups["type"].Value, out var type)) + return null; - public S7VariableAddress Parse(string input) - { - var match = regex.Match(input); - if (match.Success) + + var s7VariableAddress = new S7VariableAddress { - var operand = (Operand) Enum.Parse(typeof(Operand), match.Groups["operand"].Value, true); - var dbNr = ushort.Parse(match.Groups["dbNr"].Value, NumberStyles.Integer, CultureInfo.InvariantCulture); - var start = ushort.Parse(match.Groups["start"].Value, NumberStyles.Integer, CultureInfo.InvariantCulture); - if (!types.TryGetValue(match.Groups["type"].Value, out var type)) - return null; + Operand = operand, + DbNr = dbNr, + Start = start, + Type = type, + }; - - var s7VariableAddress = new S7VariableAddress - { - Operand = operand, - DbNr = dbNr, - Start = start, - Type = type, - }; - - switch (type) - { - case DbType.Bit: - s7VariableAddress.Length = 1; - s7VariableAddress.Bit = byte.Parse(match.Groups["bitOrLength"].Value); - break; - case DbType.Byte: - s7VariableAddress.Length = match.Groups["bitOrLength"].Success ? ushort.Parse(match.Groups["bitOrLength"].Value) : (ushort) 1; - break; - case DbType.String: - s7VariableAddress.Length = match.Groups["bitOrLength"].Success ? ushort.Parse(match.Groups["bitOrLength"].Value) : (ushort) 0; - break; - case DbType.Integer: - s7VariableAddress.Length = 2; - break; - case DbType.DInteger: - s7VariableAddress.Length = 4; - break; - case DbType.ULong: - s7VariableAddress.Length = 8; - break; - case DbType.Double: - s7VariableAddress.Length = 4; - break; - } - - return s7VariableAddress; + switch (type) + { + case DbType.Bit: + s7VariableAddress.Length = 1; + s7VariableAddress.Bit = byte.Parse(match.Groups["bitOrLength"].Value); + break; + case DbType.Byte: + s7VariableAddress.Length = match.Groups["bitOrLength"].Success ? ushort.Parse(match.Groups["bitOrLength"].Value) : (ushort) 1; + break; + case DbType.String: + s7VariableAddress.Length = match.Groups["bitOrLength"].Success ? ushort.Parse(match.Groups["bitOrLength"].Value) : (ushort) 0; + break; + case DbType.Integer: + s7VariableAddress.Length = 2; + break; + case DbType.DInteger: + s7VariableAddress.Length = 4; + break; + case DbType.ULong: + s7VariableAddress.Length = 8; + break; + case DbType.Double: + s7VariableAddress.Length = 4; + break; } - return null; + return s7VariableAddress; } + + return null; } -} \ No newline at end of file +} diff --git a/Sharp7.Rx/Settings/PlcConnectionSettings.cs b/Sharp7.Rx/Settings/PlcConnectionSettings.cs index 8adffbf..6ec2e0b 100644 --- a/Sharp7.Rx/Settings/PlcConnectionSettings.cs +++ b/Sharp7.Rx/Settings/PlcConnectionSettings.cs @@ -1,10 +1,9 @@ -namespace Sharp7.Rx.Settings +namespace Sharp7.Rx.Settings; + +internal class PlcConnectionSettings { - internal class PlcConnectionSettings - { - public int CpuMpiAddress { get; set; } - public string IpAddress { get; set; } - public int Port { get; set; } - public int RackNumber { get; set; } - } -} \ No newline at end of file + public int CpuMpiAddress { get; set; } + public string IpAddress { get; set; } + public int Port { get; set; } + public int RackNumber { get; set; } +} diff --git a/Sharp7.Rx/Sharp7Connector.cs b/Sharp7.Rx/Sharp7Connector.cs index 7adfa2e..e8a078a 100644 --- a/Sharp7.Rx/Sharp7Connector.cs +++ b/Sharp7.Rx/Sharp7Connector.cs @@ -1,11 +1,6 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Reactive.Disposables; +using System.Reactive.Disposables; using System.Reactive.Linq; using System.Reactive.Subjects; -using System.Threading; -using System.Threading.Tasks; using Microsoft.Extensions.Logging; using Sharp7.Rx.Basics; using Sharp7.Rx.Enums; @@ -14,266 +9,265 @@ using Sharp7.Rx.Interfaces; using Sharp7.Rx.Resources; using Sharp7.Rx.Settings; -namespace Sharp7.Rx +namespace Sharp7.Rx; + +internal class Sharp7Connector : IS7Connector { - internal class Sharp7Connector : IS7Connector + private readonly BehaviorSubject connectionStateSubject = new BehaviorSubject(Enums.ConnectionState.Initial); + private readonly int cpuSlotNr; + + private readonly CompositeDisposable disposables = new CompositeDisposable(); + private readonly string ipAddress; + private readonly int port; + private readonly int rackNr; + private readonly LimitedConcurrencyLevelTaskScheduler scheduler = new LimitedConcurrencyLevelTaskScheduler(maxDegreeOfParallelism: 1); + private readonly IS7VariableNameParser variableNameParser; + private bool disposed; + + private S7Client sharp7; + + + public Sharp7Connector(PlcConnectionSettings settings, IS7VariableNameParser variableNameParser) { - private readonly BehaviorSubject connectionStateSubject = new BehaviorSubject(Enums.ConnectionState.Initial); - private readonly int cpuSlotNr; + this.variableNameParser = variableNameParser; + ipAddress = settings.IpAddress; + cpuSlotNr = settings.CpuMpiAddress; + port = settings.Port; + rackNr = settings.RackNumber; - private readonly CompositeDisposable disposables = new CompositeDisposable(); - private readonly string ipAddress; - private readonly int port; - private readonly int rackNr; - private readonly LimitedConcurrencyLevelTaskScheduler scheduler = new LimitedConcurrencyLevelTaskScheduler(maxDegreeOfParallelism: 1); - private readonly IS7VariableNameParser variableNameParser; - private bool disposed; + ReconnectDelay = TimeSpan.FromSeconds(5); + } - private S7Client sharp7; + public IObservable ConnectionState => connectionStateSubject.DistinctUntilChanged().AsObservable(); + public ILogger Logger { get; set; } - public Sharp7Connector(PlcConnectionSettings settings, IS7VariableNameParser variableNameParser) + public TimeSpan ReconnectDelay { get; set; } + + private bool IsConnected => connectionStateSubject.Value == Enums.ConnectionState.Connected; + + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + public async Task Connect() + { + if (sharp7 == null) + throw new InvalidOperationException(StringResources.StrErrorS7DriverNotInitialized); + + try { - this.variableNameParser = variableNameParser; - ipAddress = settings.IpAddress; - cpuSlotNr = settings.CpuMpiAddress; - port = settings.Port; - rackNr = settings.RackNumber; - - ReconnectDelay = TimeSpan.FromSeconds(5); - } - - public IObservable ConnectionState => connectionStateSubject.DistinctUntilChanged().AsObservable(); - - public ILogger Logger { get; set; } - - public TimeSpan ReconnectDelay { get; set; } - - private bool IsConnected => connectionStateSubject.Value == Enums.ConnectionState.Connected; - - public void Dispose() - { - Dispose(true); - GC.SuppressFinalize(this); - } - - public async Task Connect() - { - if (sharp7 == null) - throw new InvalidOperationException(StringResources.StrErrorS7DriverNotInitialized); - - try + var errorCode = await Task.Factory.StartNew(() => sharp7.ConnectTo(ipAddress, rackNr, cpuSlotNr), CancellationToken.None, TaskCreationOptions.None, scheduler); + var success = EvaluateErrorCode(errorCode); + if (success) { - var errorCode = await Task.Factory.StartNew(() => sharp7.ConnectTo(ipAddress, rackNr, cpuSlotNr), CancellationToken.None, TaskCreationOptions.None, scheduler); - var success = EvaluateErrorCode(errorCode); - if (success) - { - connectionStateSubject.OnNext(Enums.ConnectionState.Connected); - return true; - } - } - catch (Exception ex) - { - // TODO: - } - - return false; - } - - - public async Task Disconnect() - { - connectionStateSubject.OnNext(Enums.ConnectionState.DisconnectedByUser); - await CloseConnection(); - } - - public async Task> ExecuteMultiVarRequest(IReadOnlyList variableNames) - { - if (variableNames.IsEmpty()) - return new Dictionary(); - - var s7MultiVar = new S7MultiVar(sharp7); - - var buffers = variableNames - .Select(key => new {VariableName = key, Address = variableNameParser.Parse(key)}) - .Select(x => - { - var buffer = new byte[x.Address.Length]; - s7MultiVar.Add(S7Consts.S7AreaDB, S7Consts.S7WLByte, x.Address.DbNr, x.Address.Start, x.Address.Length, ref buffer); - return new {x.VariableName, Buffer = buffer}; - }) - .ToArray(); - - var result = await Task.Factory.StartNew(() => s7MultiVar.Read(), CancellationToken.None, TaskCreationOptions.None, scheduler); - if (result != 0) - { - EvaluateErrorCode(result); - throw new InvalidOperationException($"Error in MultiVar request for variables: {string.Join(",", variableNames)}"); - } - - return buffers.ToDictionary(arg => arg.VariableName, arg => arg.Buffer); - } - - public Task InitializeAsync() - { - try - { - sharp7 = new S7Client(); - sharp7.PLCPort = port; - - var subscription = - ConnectionState - .Where(state => state == Enums.ConnectionState.ConnectionLost) - .Take(1) - .SelectMany(_ => Reconnect()) - .RepeatAfterDelay(ReconnectDelay) - .LogAndRetry(Logger, "Error while reconnecting to S7.") - .Subscribe(); - - disposables.Add(subscription); - } - catch (Exception ex) - { - Logger?.LogError(ex, StringResources.StrErrorS7DriverCouldNotBeInitialized); - } - - return Task.FromResult(true); - } - - public async Task ReadBytes(Operand operand, ushort startByteAddress, ushort bytesToRead, ushort dBNr, CancellationToken token) - { - EnsureConnectionValid(); - - var buffer = new byte[bytesToRead]; - - - var result = - await Task.Factory.StartNew(() => sharp7.ReadArea(operand.ToArea(), dBNr, startByteAddress, bytesToRead, S7WordLength.Byte, buffer), token, TaskCreationOptions.None, scheduler); - token.ThrowIfCancellationRequested(); - - if (result != 0) - { - EvaluateErrorCode(result); - var errorText = sharp7.ErrorText(result); - throw new InvalidOperationException($"Error reading {operand}{dBNr}:{startByteAddress}->{bytesToRead} ({errorText})"); - } - - return buffer; - } - - public async Task WriteBit(Operand operand, ushort startByteAddress, byte bitAdress, bool value, ushort dbNr, CancellationToken token) - { - EnsureConnectionValid(); - - var buffer = new[] {value ? (byte) 0xff : (byte) 0}; - - var offsetStart = (startByteAddress * 8) + bitAdress; - - var result = await Task.Factory.StartNew(() => sharp7.WriteArea(operand.ToArea(), dbNr, offsetStart, 1, S7WordLength.Bit, buffer), token, TaskCreationOptions.None, scheduler); - token.ThrowIfCancellationRequested(); - - if (result != 0) - { - EvaluateErrorCode(result); - return (false); - } - - return (true); - } - - public async Task WriteBytes(Operand operand, ushort startByteAdress, byte[] data, ushort dBNr, CancellationToken token) - { - EnsureConnectionValid(); - - var result = await Task.Factory.StartNew(() => sharp7.WriteArea(operand.ToArea(), dBNr, startByteAdress, data.Length, S7WordLength.Byte, data), token, TaskCreationOptions.None, scheduler); - token.ThrowIfCancellationRequested(); - - if (result != 0) - { - EvaluateErrorCode(result); - return 0; - } - - return (ushort) (data.Length); - } - - - protected virtual void Dispose(bool disposing) - { - if (!disposed) - { - if (disposing) - { - disposables.Dispose(); - - if (sharp7 != null) - { - sharp7.Disconnect(); - sharp7 = null; - } - - connectionStateSubject?.OnCompleted(); - connectionStateSubject?.Dispose(); - } - - disposed = true; - } - } - - private async Task CloseConnection() - { - if (sharp7 == null) - throw new InvalidOperationException(StringResources.StrErrorS7DriverNotInitialized); - - await Task.Factory.StartNew(() => sharp7.Disconnect(), CancellationToken.None, TaskCreationOptions.None, scheduler); - } - - private void EnsureConnectionValid() - { - if (disposed) - throw new ObjectDisposedException("S7Connector"); - - if (sharp7 == null) - throw new InvalidOperationException(StringResources.StrErrorS7DriverNotInitialized); - - if (!IsConnected) - throw new InvalidOperationException("Plc is not connected"); - } - - private bool EvaluateErrorCode(int errorCode) - { - if (errorCode == 0) + connectionStateSubject.OnNext(Enums.ConnectionState.Connected); return true; - - if (sharp7 == null) - throw new InvalidOperationException(StringResources.StrErrorS7DriverNotInitialized); - - var errorText = sharp7.ErrorText(errorCode); - Logger?.LogError($"Error Code {errorCode} {errorText}"); - - if (S7ErrorCodes.AssumeConnectionLost(errorCode)) - SetConnectionLostState(); - - return false; + } + } + catch (Exception ex) + { + // TODO: } - private async Task Reconnect() - { - await CloseConnection(); + return false; + } - return await Connect(); + + public async Task Disconnect() + { + connectionStateSubject.OnNext(Enums.ConnectionState.DisconnectedByUser); + await CloseConnection(); + } + + public async Task> ExecuteMultiVarRequest(IReadOnlyList variableNames) + { + if (variableNames.IsEmpty()) + return new Dictionary(); + + var s7MultiVar = new S7MultiVar(sharp7); + + var buffers = variableNames + .Select(key => new {VariableName = key, Address = variableNameParser.Parse(key)}) + .Select(x => + { + var buffer = new byte[x.Address.Length]; + s7MultiVar.Add(S7Consts.S7AreaDB, S7Consts.S7WLByte, x.Address.DbNr, x.Address.Start, x.Address.Length, ref buffer); + return new {x.VariableName, Buffer = buffer}; + }) + .ToArray(); + + var result = await Task.Factory.StartNew(() => s7MultiVar.Read(), CancellationToken.None, TaskCreationOptions.None, scheduler); + if (result != 0) + { + EvaluateErrorCode(result); + throw new InvalidOperationException($"Error in MultiVar request for variables: {string.Join(",", variableNames)}"); } - private void SetConnectionLostState() - { - if (connectionStateSubject.Value == Enums.ConnectionState.ConnectionLost) return; + return buffers.ToDictionary(arg => arg.VariableName, arg => arg.Buffer); + } - connectionStateSubject.OnNext(Enums.ConnectionState.ConnectionLost); + public Task InitializeAsync() + { + try + { + sharp7 = new S7Client(); + sharp7.PLCPort = port; + + var subscription = + ConnectionState + .Where(state => state == Enums.ConnectionState.ConnectionLost) + .Take(1) + .SelectMany(_ => Reconnect()) + .RepeatAfterDelay(ReconnectDelay) + .LogAndRetry(Logger, "Error while reconnecting to S7.") + .Subscribe(); + + disposables.Add(subscription); + } + catch (Exception ex) + { + Logger?.LogError(ex, StringResources.StrErrorS7DriverCouldNotBeInitialized); } - ~Sharp7Connector() + return Task.FromResult(true); + } + + public async Task ReadBytes(Operand operand, ushort startByteAddress, ushort bytesToRead, ushort dBNr, CancellationToken token) + { + EnsureConnectionValid(); + + var buffer = new byte[bytesToRead]; + + + var result = + await Task.Factory.StartNew(() => sharp7.ReadArea(operand.ToArea(), dBNr, startByteAddress, bytesToRead, S7WordLength.Byte, buffer), token, TaskCreationOptions.None, scheduler); + token.ThrowIfCancellationRequested(); + + if (result != 0) { - Dispose(false); + EvaluateErrorCode(result); + var errorText = sharp7.ErrorText(result); + throw new InvalidOperationException($"Error reading {operand}{dBNr}:{startByteAddress}->{bytesToRead} ({errorText})"); + } + + return buffer; + } + + public async Task WriteBit(Operand operand, ushort startByteAddress, byte bitAdress, bool value, ushort dbNr, CancellationToken token) + { + EnsureConnectionValid(); + + var buffer = new[] {value ? (byte) 0xff : (byte) 0}; + + var offsetStart = (startByteAddress * 8) + bitAdress; + + var result = await Task.Factory.StartNew(() => sharp7.WriteArea(operand.ToArea(), dbNr, offsetStart, 1, S7WordLength.Bit, buffer), token, TaskCreationOptions.None, scheduler); + token.ThrowIfCancellationRequested(); + + if (result != 0) + { + EvaluateErrorCode(result); + return (false); + } + + return (true); + } + + public async Task WriteBytes(Operand operand, ushort startByteAdress, byte[] data, ushort dBNr, CancellationToken token) + { + EnsureConnectionValid(); + + var result = await Task.Factory.StartNew(() => sharp7.WriteArea(operand.ToArea(), dBNr, startByteAdress, data.Length, S7WordLength.Byte, data), token, TaskCreationOptions.None, scheduler); + token.ThrowIfCancellationRequested(); + + if (result != 0) + { + EvaluateErrorCode(result); + return 0; + } + + return (ushort) (data.Length); + } + + + protected virtual void Dispose(bool disposing) + { + if (!disposed) + { + if (disposing) + { + disposables.Dispose(); + + if (sharp7 != null) + { + sharp7.Disconnect(); + sharp7 = null; + } + + connectionStateSubject?.OnCompleted(); + connectionStateSubject?.Dispose(); + } + + disposed = true; } } + + private async Task CloseConnection() + { + if (sharp7 == null) + throw new InvalidOperationException(StringResources.StrErrorS7DriverNotInitialized); + + await Task.Factory.StartNew(() => sharp7.Disconnect(), CancellationToken.None, TaskCreationOptions.None, scheduler); + } + + private void EnsureConnectionValid() + { + if (disposed) + throw new ObjectDisposedException("S7Connector"); + + if (sharp7 == null) + throw new InvalidOperationException(StringResources.StrErrorS7DriverNotInitialized); + + if (!IsConnected) + throw new InvalidOperationException("Plc is not connected"); + } + + private bool EvaluateErrorCode(int errorCode) + { + if (errorCode == 0) + return true; + + if (sharp7 == null) + throw new InvalidOperationException(StringResources.StrErrorS7DriverNotInitialized); + + var errorText = sharp7.ErrorText(errorCode); + Logger?.LogError($"Error Code {errorCode} {errorText}"); + + if (S7ErrorCodes.AssumeConnectionLost(errorCode)) + SetConnectionLostState(); + + return false; + } + + private async Task Reconnect() + { + await CloseConnection(); + + return await Connect(); + } + + private void SetConnectionLostState() + { + if (connectionStateSubject.Value == Enums.ConnectionState.ConnectionLost) return; + + connectionStateSubject.OnNext(Enums.ConnectionState.ConnectionLost); + } + + ~Sharp7Connector() + { + Dispose(false); + } } diff --git a/Sharp7.Rx/Sharp7Plc.cs b/Sharp7.Rx/Sharp7Plc.cs index 9902224..d57f63c 100644 --- a/Sharp7.Rx/Sharp7Plc.cs +++ b/Sharp7.Rx/Sharp7Plc.cs @@ -1,13 +1,8 @@ -using System; -using System.Collections.Generic; -using System.Diagnostics; -using System.Linq; +using System.Diagnostics; using System.Reactive; using System.Reactive.Disposables; using System.Reactive.Linq; using System.Text; -using System.Threading; -using System.Threading.Tasks; using Microsoft.Extensions.Logging; using Sharp7.Rx.Basics; using Sharp7.Rx.Enums; @@ -15,270 +10,269 @@ using Sharp7.Rx.Extensions; using Sharp7.Rx.Interfaces; using Sharp7.Rx.Settings; -namespace Sharp7.Rx +namespace Sharp7.Rx; + +public class Sharp7Plc : IPlc { - public class Sharp7Plc : IPlc + protected readonly CompositeDisposable Disposables = new CompositeDisposable(); + private readonly ConcurrentSubjectDictionary multiVariableSubscriptions = new ConcurrentSubjectDictionary(StringComparer.InvariantCultureIgnoreCase); + private readonly List performanceCoutner = new List(1000); + private readonly PlcConnectionSettings plcConnectionSettings; + private readonly IS7VariableNameParser varaibleNameParser = new CacheVariableNameParser(new S7VariableNameParser()); + private bool disposed; + private IS7Connector s7Connector; + + + /// + /// + /// + /// + /// + /// + /// + /// + /// Polling interval used to read multi variable requests from PLC. + /// + /// + /// This is the wait time between two successive reads from PLC and determines the + /// time resolution for all variable reads reated with CreateNotification. + /// + /// + /// Default is 100 ms. The minimum supported time is 5 ms. + /// + /// + public Sharp7Plc(string ipAddress, int rackNumber, int cpuMpiAddress, int port = 102, TimeSpan? multiVarRequestCycleTime = null) { - protected readonly CompositeDisposable Disposables = new CompositeDisposable(); - private readonly ConcurrentSubjectDictionary multiVariableSubscriptions = new ConcurrentSubjectDictionary(StringComparer.InvariantCultureIgnoreCase); - private readonly List performanceCoutner = new List(1000); - private readonly PlcConnectionSettings plcConnectionSettings; - private readonly IS7VariableNameParser varaibleNameParser = new CacheVariableNameParser(new S7VariableNameParser()); - private bool disposed; - private IS7Connector s7Connector; + plcConnectionSettings = new PlcConnectionSettings {IpAddress = ipAddress, RackNumber = rackNumber, CpuMpiAddress = cpuMpiAddress, Port = port}; + if (multiVarRequestCycleTime != null && multiVarRequestCycleTime > TimeSpan.FromMilliseconds(5)) + MultiVarRequestCycleTime = multiVarRequestCycleTime.Value; + } - /// - /// - /// - /// - /// - /// - /// - /// - /// Polling interval used to read multi variable requests from PLC. - /// - /// - /// This is the wait time between two successive reads from PLC and determines the - /// time resolution for all variable reads reated with CreateNotification. - /// - /// - /// Default is 100 ms. The minimum supported time is 5 ms. - /// - /// - public Sharp7Plc(string ipAddress, int rackNumber, int cpuMpiAddress, int port = 102, TimeSpan? multiVarRequestCycleTime = null) - { - plcConnectionSettings = new PlcConnectionSettings {IpAddress = ipAddress, RackNumber = rackNumber, CpuMpiAddress = cpuMpiAddress, Port = port}; + public IObservable ConnectionState { get; private set; } + public ILogger Logger { get; set; } + public TimeSpan MultiVarRequestCycleTime { get; } = TimeSpan.FromSeconds(0.1); - if (multiVarRequestCycleTime != null && multiVarRequestCycleTime > TimeSpan.FromMilliseconds(5)) - MultiVarRequestCycleTime = multiVarRequestCycleTime.Value; - } + public int MultiVarRequestMaxItems { get; set; } = 16; - public IObservable ConnectionState { get; private set; } - public ILogger Logger { get; set; } - public TimeSpan MultiVarRequestCycleTime { get; } = TimeSpan.FromSeconds(0.1); + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } - public int MultiVarRequestMaxItems { get; set; } = 16; - - public void Dispose() - { - Dispose(true); - GC.SuppressFinalize(this); - } - - public IObservable CreateNotification(string variableName, TransmissionMode transmissionMode) - { - return Observable.Create(observer => - { - var address = varaibleNameParser.Parse(variableName); - if (address == null) throw new ArgumentException("Input variable name is not valid", nameof(variableName)); - - var disposables = new CompositeDisposable(); - var disposeableContainer = multiVariableSubscriptions.GetOrCreateObservable(variableName); - disposeableContainer.AddDisposableTo(disposables); - - var observable = disposeableContainer.Observable - .Select(bytes => S7ValueConverter.ConvertToType(bytes, address)); - - if (transmissionMode == TransmissionMode.OnChange) - observable = observable.DistinctUntilChanged(); - - observable.Subscribe(observer) - .AddDisposableTo(disposables); - - return disposables; - }); - } - - public Task GetValue(string variableName) - { - return GetValue(variableName, CancellationToken.None); - } - - - public Task SetValue(string variableName, TValue value) - { - return SetValue(variableName, value, CancellationToken.None); - } - - - public async Task GetValue(string variableName, CancellationToken token) + public IObservable CreateNotification(string variableName, TransmissionMode transmissionMode) + { + return Observable.Create(observer => { var address = varaibleNameParser.Parse(variableName); if (address == null) throw new ArgumentException("Input variable name is not valid", nameof(variableName)); - var data = await s7Connector.ReadBytes(address.Operand, address.Start, address.Length, address.DbNr, token); - return S7ValueConverter.ConvertToType(data, address); - } + var disposables = new CompositeDisposable(); + var disposeableContainer = multiVariableSubscriptions.GetOrCreateObservable(variableName); + disposeableContainer.AddDisposableTo(disposables); - public async Task InitializeAsync() - { - s7Connector = new Sharp7Connector(plcConnectionSettings, varaibleNameParser) {Logger = Logger}; - ConnectionState = s7Connector.ConnectionState; + var observable = disposeableContainer.Observable + .Select(bytes => S7ValueConverter.ConvertToType(bytes, address)); - await s7Connector.InitializeAsync(); + if (transmissionMode == TransmissionMode.OnChange) + observable = observable.DistinctUntilChanged(); + + observable.Subscribe(observer) + .AddDisposableTo(disposables); + + return disposables; + }); + } + + public Task GetValue(string variableName) + { + return GetValue(variableName, CancellationToken.None); + } + + + public Task SetValue(string variableName, TValue value) + { + return SetValue(variableName, value, CancellationToken.None); + } + + + public async Task GetValue(string variableName, CancellationToken token) + { + var address = varaibleNameParser.Parse(variableName); + if (address == null) throw new ArgumentException("Input variable name is not valid", nameof(variableName)); + + var data = await s7Connector.ReadBytes(address.Operand, address.Start, address.Length, address.DbNr, token); + return S7ValueConverter.ConvertToType(data, address); + } + + public async Task InitializeAsync() + { + s7Connector = new Sharp7Connector(plcConnectionSettings, varaibleNameParser) {Logger = Logger}; + ConnectionState = s7Connector.ConnectionState; + + await s7Connector.InitializeAsync(); #pragma warning disable 4014 - Task.Run(async () => + Task.Run(async () => + { + try { - try - { - await s7Connector.Connect(); - } - catch (Exception e) - { - Logger?.LogError(e, "Error while connecting to PLC"); - } - }); + await s7Connector.Connect(); + } + catch (Exception e) + { + Logger?.LogError(e, "Error while connecting to PLC"); + } + }); #pragma warning restore 4014 - RunNotifications(s7Connector, MultiVarRequestCycleTime) - .AddDisposableTo(Disposables); + RunNotifications(s7Connector, MultiVarRequestCycleTime) + .AddDisposableTo(Disposables); - return true; - } + return true; + } - public async Task SetValue(string variableName, TValue value, CancellationToken token) + public async Task SetValue(string variableName, TValue value, CancellationToken token) + { + var address = varaibleNameParser.Parse(variableName); + if (address == null) throw new ArgumentException("Input variable name is not valid", "variableName"); + + if (typeof(TValue) == typeof(bool)) { - var address = varaibleNameParser.Parse(variableName); - if (address == null) throw new ArgumentException("Input variable name is not valid", "variableName"); + await s7Connector.WriteBit(address.Operand, address.Start, address.Bit, (bool) (object) value, address.DbNr, token); + } + else if (typeof(TValue) == typeof(int) || typeof(TValue) == typeof(short)) + { + byte[] bytes; + if (address.Length == 4) + bytes = BitConverter.GetBytes((int) (object) value); + else + bytes = BitConverter.GetBytes((short) (object) value); - if (typeof(TValue) == typeof(bool)) - { - await s7Connector.WriteBit(address.Operand, address.Start, address.Bit, (bool) (object) value, address.DbNr, token); - } - else if (typeof(TValue) == typeof(int) || typeof(TValue) == typeof(short)) - { - byte[] bytes; - if (address.Length == 4) - bytes = BitConverter.GetBytes((int) (object) value); - else - bytes = BitConverter.GetBytes((short) (object) value); + Array.Reverse(bytes); - Array.Reverse(bytes); + await s7Connector.WriteBytes(address.Operand, address.Start, bytes, address.DbNr, token); + } + else if (typeof(TValue) == typeof(byte) || typeof(TValue) == typeof(char)) + { + var bytes = new[] {Convert.ToByte(value)}; + await s7Connector.WriteBytes(address.Operand, address.Start, bytes, address.DbNr, token); + } + else if (typeof(TValue) == typeof(byte[])) + { + await s7Connector.WriteBytes(address.Operand, address.Start, (byte[]) (object) value, address.DbNr, token); + } + else if (typeof(TValue) == typeof(float)) + { + var buffer = new byte[sizeof(float)]; + buffer.SetRealAt(0, (float) (object) value); + await s7Connector.WriteBytes(address.Operand, address.Start, buffer, address.DbNr, token); + } + else if (typeof(TValue) == typeof(string)) + { + var stringValue = value as string; + if (stringValue == null) throw new ArgumentException("Value must be of type string", "value"); - await s7Connector.WriteBytes(address.Operand, address.Start, bytes, address.DbNr, token); - } - else if (typeof(TValue) == typeof(byte) || typeof(TValue) == typeof(char)) - { - var bytes = new[] {Convert.ToByte(value)}; - await s7Connector.WriteBytes(address.Operand, address.Start, bytes, address.DbNr, token); - } - else if (typeof(TValue) == typeof(byte[])) - { - await s7Connector.WriteBytes(address.Operand, address.Start, (byte[]) (object) value, address.DbNr, token); - } - else if (typeof(TValue) == typeof(float)) - { - var buffer = new byte[sizeof(float)]; - buffer.SetRealAt(0, (float) (object) value); - await s7Connector.WriteBytes(address.Operand, address.Start, buffer, address.DbNr, token); - } - else if (typeof(TValue) == typeof(string)) - { - var stringValue = value as string; - if (stringValue == null) throw new ArgumentException("Value must be of type string", "value"); + var bytes = Encoding.ASCII.GetBytes(stringValue); + Array.Resize(ref bytes, address.Length); - var bytes = Encoding.ASCII.GetBytes(stringValue); - Array.Resize(ref bytes, address.Length); - - if (address.Type == DbType.String) + if (address.Type == DbType.String) + { + var bytesWritten = await s7Connector.WriteBytes(address.Operand, address.Start, new[] {(byte) address.Length, (byte) bytes.Length}, address.DbNr, token); + token.ThrowIfCancellationRequested(); + if (bytesWritten == 2) { - var bytesWritten = await s7Connector.WriteBytes(address.Operand, address.Start, new[] {(byte) address.Length, (byte) bytes.Length}, address.DbNr, token); - token.ThrowIfCancellationRequested(); - if (bytesWritten == 2) - { - var stringStartAddress = (ushort) (address.Start + 2); - token.ThrowIfCancellationRequested(); - await s7Connector.WriteBytes(address.Operand, stringStartAddress, bytes, address.DbNr, token); - } - } - else - { - await s7Connector.WriteBytes(address.Operand, address.Start, bytes, address.DbNr, token); + var stringStartAddress = (ushort) (address.Start + 2); token.ThrowIfCancellationRequested(); + await s7Connector.WriteBytes(address.Operand, stringStartAddress, bytes, address.DbNr, token); } } else { - throw new InvalidOperationException($"type '{typeof(TValue)}' not supported."); + await s7Connector.WriteBytes(address.Operand, address.Start, bytes, address.DbNr, token); + token.ThrowIfCancellationRequested(); } } - - protected virtual void Dispose(bool disposing) + else { - if (disposed) return; - disposed = true; - - if (disposing) - { - Disposables.Dispose(); - - if (s7Connector != null) - { - s7Connector.Disconnect().Wait(); - s7Connector.Dispose(); - s7Connector = null; - } - - multiVariableSubscriptions.Dispose(); - } - } - - private async Task GetAllValues(bool connected, IS7Connector connector) - { - if (!connected) - return Unit.Default; - - if (multiVariableSubscriptions.ExistingKeys.IsEmpty()) - return Unit.Default; - - var stopWatch = Stopwatch.StartNew(); - foreach (var partsOfMultiVarRequest in multiVariableSubscriptions.ExistingKeys.Buffer(MultiVarRequestMaxItems)) - { - var multiVarRequest = await connector.ExecuteMultiVarRequest(partsOfMultiVarRequest as IReadOnlyList); - - foreach (var pair in multiVarRequest) - if (multiVariableSubscriptions.TryGetObserver(pair.Key, out var subject)) - subject.OnNext(pair.Value); - } - - stopWatch.Stop(); - performanceCoutner.Add(stopWatch.ElapsedMilliseconds); - - PrintAndResetPerformanceStatistik(); - - return Unit.Default; - } - - private void PrintAndResetPerformanceStatistik() - { - if (performanceCoutner.Count == performanceCoutner.Capacity) - { - var average = performanceCoutner.Average(); - var min = performanceCoutner.Min(); - var max = performanceCoutner.Max(); - - Logger?.LogTrace("Performance statistic during {0} elements of plc notification. Min: {1}, Max: {2}, Average: {3}, Plc: '{4}', Number of variables: {5}, Batch size: {6}", performanceCoutner.Capacity, min, max, average, plcConnectionSettings.IpAddress, - multiVariableSubscriptions.ExistingKeys.Count(), - MultiVarRequestMaxItems); - performanceCoutner.Clear(); - } - } - - private IDisposable RunNotifications(IS7Connector connector, TimeSpan cycle) - { - return ConnectionState.FirstAsync() - .Select(states => states == Enums.ConnectionState.Connected) - .SelectMany(connected => GetAllValues(connected, connector)) - .RepeatAfterDelay(MultiVarRequestCycleTime) - .LogAndRetryAfterDelay(Logger, cycle, "Error while getting batch notifications from plc") - .Subscribe(); - } - - ~Sharp7Plc() - { - Dispose(false); + throw new InvalidOperationException($"type '{typeof(TValue)}' not supported."); } } -} \ No newline at end of file + + protected virtual void Dispose(bool disposing) + { + if (disposed) return; + disposed = true; + + if (disposing) + { + Disposables.Dispose(); + + if (s7Connector != null) + { + s7Connector.Disconnect().Wait(); + s7Connector.Dispose(); + s7Connector = null; + } + + multiVariableSubscriptions.Dispose(); + } + } + + private async Task GetAllValues(bool connected, IS7Connector connector) + { + if (!connected) + return Unit.Default; + + if (multiVariableSubscriptions.ExistingKeys.IsEmpty()) + return Unit.Default; + + var stopWatch = Stopwatch.StartNew(); + foreach (var partsOfMultiVarRequest in multiVariableSubscriptions.ExistingKeys.Buffer(MultiVarRequestMaxItems)) + { + var multiVarRequest = await connector.ExecuteMultiVarRequest(partsOfMultiVarRequest as IReadOnlyList); + + foreach (var pair in multiVarRequest) + if (multiVariableSubscriptions.TryGetObserver(pair.Key, out var subject)) + subject.OnNext(pair.Value); + } + + stopWatch.Stop(); + performanceCoutner.Add(stopWatch.ElapsedMilliseconds); + + PrintAndResetPerformanceStatistik(); + + return Unit.Default; + } + + private void PrintAndResetPerformanceStatistik() + { + if (performanceCoutner.Count == performanceCoutner.Capacity) + { + var average = performanceCoutner.Average(); + var min = performanceCoutner.Min(); + var max = performanceCoutner.Max(); + + Logger?.LogTrace("Performance statistic during {0} elements of plc notification. Min: {1}, Max: {2}, Average: {3}, Plc: '{4}', Number of variables: {5}, Batch size: {6}", performanceCoutner.Capacity, min, max, average, plcConnectionSettings.IpAddress, + multiVariableSubscriptions.ExistingKeys.Count(), + MultiVarRequestMaxItems); + performanceCoutner.Clear(); + } + } + + private IDisposable RunNotifications(IS7Connector connector, TimeSpan cycle) + { + return ConnectionState.FirstAsync() + .Select(states => states == Enums.ConnectionState.Connected) + .SelectMany(connected => GetAllValues(connected, connector)) + .RepeatAfterDelay(MultiVarRequestCycleTime) + .LogAndRetryAfterDelay(Logger, cycle, "Error while getting batch notifications from plc") + .Subscribe(); + } + + ~Sharp7Plc() + { + Dispose(false); + } +}