Using C# .NET to send data to Microsoft Azure Event Hubs

In this article I will show you how to use C# .NET to send data to Azure Event Hubs. This program comes in really handy when we want to simulate live streaming data and consume it via Azure Event Hubs. The data can then be used in tools like Power BI for true real-time analytics, and combined with Azure Anomaly Detector for anomaly detection. Here is a quick introduction to Azure Stream Analytics.

Azure Stream Analytics is a real-time analytics and complex event-processing engine that is designed to analyze and process high volumes of fast streaming data from multiple sources simultaneously. Patterns and relationships can be identified in information extracted from a number of input sources including devices, sensors, clickstreams, social media feeds, and applications. These patterns can be used to trigger actions and initiate workflows such as creating alerts, feeding information to a reporting tool, or storing transformed data for later use. Stream Analytics is also available on the Azure IoT Edge runtime, so data can be processed on IoT devices.

The following are some examples of when you can use Stream Analytics:

  • Journey Management
  • Analyze real-time telemetry streams from IoT devices
  • Web logs/clickstream analytics
  • Geospatial analytics for fleet management and driverless vehicles
  • Remote monitoring and predictive maintenance of high value assets
  • Real-time analytics on Point of Sale data for inventory control and anomaly detection

How does Stream Analytics work?

An Azure Stream Analytics job consists of an input, a query, and an output. Stream Analytics ingests data from Azure Event Hubs (including Azure Event Hubs from Apache Kafka), Azure IoT Hub, or Azure Blob Storage. The query, which is based on the SQL query language, can be used to filter, sort, aggregate, and join streaming data over a period of time. You can also extend this SQL language with JavaScript and C# user-defined functions (UDFs). Event ordering options and the duration of time windows used in aggregations can be adjusted through simple language constructs and configuration settings.
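
To make the idea of aggregating over a time window more concrete, here is a minimal in-memory sketch in C#. It is not Stream Analytics code; in a real job the same grouping would be expressed in the query with a windowing function such as TumblingWindow. The `Reading` type and the `TumblingWindowSketch` class are hypothetical names made up for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical event type for illustration; not part of any Azure SDK.
public sealed class Reading
{
    public DateTime Time { get; set; }
    public string Building { get; set; }
    public double Temperature { get; set; }
}

public static class TumblingWindowSketch
{
    // Groups readings into fixed, non-overlapping windows per building and
    // returns the average temperature for each (building, window start) pair.
    public static List<(string Building, DateTime WindowStart, double AvgTemperature)> Average(
        IEnumerable<Reading> readings, TimeSpan window)
    {
        return readings
            // Round each timestamp down to the start of its window.
            .GroupBy(r => (Building: r.Building,
                           WindowStart: new DateTime(r.Time.Ticks - r.Time.Ticks % window.Ticks)))
            .Select(g => (Building: g.Key.Building,
                          WindowStart: g.Key.WindowStart,
                          AvgTemperature: g.Average(r => r.Temperature)))
            .OrderBy(x => x.WindowStart)
            .ThenBy(x => x.Building)
            .ToList();
    }

    public static void Main()
    {
        var start = new DateTime(2020, 1, 1, 12, 0, 0);
        var readings = new[]
        {
            new Reading { Time = start.AddSeconds(5),  Building = "Building 0", Temperature = 70 },
            new Reading { Time = start.AddSeconds(20), Building = "Building 0", Temperature = 74 },
            new Reading { Time = start.AddSeconds(40), Building = "Building 0", Temperature = 200 },
        };

        foreach (var row in Average(readings, TimeSpan.FromSeconds(30)))
        {
            Console.WriteLine($"{row.Building} {row.WindowStart:o} {row.AvgTemperature}");
        }
    }
}
```

With 30-second windows, the first two readings fall into the same window and average to 72, while the reading of 200 lands alone in the next window. That is the kind of jump an alerting rule or anomaly detector would pick up.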

Each job has one or several outputs for the transformed data, and you can control what happens in response to the information you’ve analyzed. For example, you can:

  • Send data to services such as Azure Functions, Service Bus Topics or Queues to trigger communications or custom workflows downstream.
  • Send data to a Power BI dashboard for real-time dashboarding.
  • Store data in other Azure storage services (for example, Azure Data Lake, Azure Synapse Analytics, etc.) to train a machine learning model based on historical data or perform batch analytics.

For more details, refer to the Microsoft documentation on Azure Stream Analytics.

In this article I want to show you how you can use Azure Event Hubs with C# .NET. Let's start by creating a C# .NET console application. To create it you need an Integrated Development Environment (IDE) such as Microsoft Visual Studio; for this example I used Microsoft Visual Studio Professional 2019. We also need to install the NuGet package Microsoft.Azure.EventHubs. Note that Microsoft now recommends the newer Azure.Messaging.EventHubs package for new projects; the code in this article uses the older Microsoft.Azure.EventHubs API.

One of the resources in your Visual Studio project where you can define connections and variables is the app.config file. Create a connection string for your Azure Event Hub in app.config. Please note that this article assumes you have already created the Event Hub resource in Azure. If not, please refer to the Microsoft guide for creating one.

<connectionStrings>
  <add name="conn_eventHub" connectionString="Endpoint=sb://myeventsnamespace.servicebus.windows.net/;SharedAccessKeyName=sendconn;SharedAccessKey=your shared access key here;EntityPath=eventhub1" />
</connectionStrings>
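
The connection string is a semicolon-separated list of key=value pairs (Endpoint, SharedAccessKeyName, SharedAccessKey, EntityPath). As a rough sketch of how it is laid out, the hypothetical helper below splits one into its parts. It is for illustration only and is not part of the Event Hubs SDK, which ships its own connection string builder. The key value in the example is a placeholder.

```csharp
using System;
using System.Collections.Generic;

public static class ConnectionStringSketch
{
    // Splits "Key=Value;Key=Value" into a dictionary. Only the first '=' in
    // each pair is treated as the separator, because Base64-encoded shared
    // access keys often end in one or two '=' padding characters.
    public static Dictionary<string, string> Parse(string connectionString)
    {
        var parts = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);
        var pairs = connectionString.Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries);

        foreach (var pair in pairs)
        {
            int idx = pair.IndexOf('=');
            if (idx <= 0)
            {
                throw new FormatException($"Malformed segment: '{pair}'");
            }
            parts[pair.Substring(0, idx).Trim()] = pair.Substring(idx + 1).Trim();
        }
        return parts;
    }

    public static void Main()
    {
        // Placeholder values only; never print a real SharedAccessKey.
        var parts = Parse(
            "Endpoint=sb://myeventsnamespace.servicebus.windows.net/;" +
            "SharedAccessKeyName=sendconn;SharedAccessKey=placeholder==;EntityPath=eventhub1");

        Console.WriteLine(parts["Endpoint"]);
        Console.WriteLine(parts["EntityPath"]);
    }
}
```

A quick check like this can help spot a missing EntityPath or a truncated key before the program tries to connect.
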
// Required namespaces
using System;
using System.Configuration;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.EventHubs;

namespace AzureStreamAnalyticsUpload
{
    class Program
    {
        private static EventHubClient eventHubClient;
        private const string EventHubName = "eventhub1";

        private static async Task MainAsync(string[] args)
        {
            // Read the connection string defined in app.config.
            string eventHubConnectionString =
                ConfigurationManager.ConnectionStrings["conn_eventHub"].ConnectionString;

            // EntityPath names the event hub inside the namespace.
            var connectionStringBuilder = new EventHubsConnectionStringBuilder(eventHubConnectionString)
            {
                EntityPath = EventHubName
            };

            eventHubClient = EventHubClient.CreateFromConnectionString(connectionStringBuilder.ToString());

            await SendMessagesToEventHub(500);
            await eventHubClient.CloseAsync();

            Console.WriteLine("Press ENTER to exit.");
            Console.ReadLine();
        }

        // Runs numPasses passes. Each pass sends one reading per floor (0-19)
        // of each building (0-2), which is 60 messages per pass.
        private static async Task SendMessagesToEventHub(int numPasses)
        {
            int sent = 0;

            for (var i = 0; i < numPasses; i++)
            {
                try
                {
                    for (int fl = 0; fl < 20; fl++)
                    {
                        for (int bd = 0; bd < 3; bd++)
                        {
                            PrepDataBuilding building = new PrepDataBuilding(bd, fl);

                            // On every fifth pass, force a temperature peak on floor 5 of
                            // building 0 so there is an anomaly to detect downstream.
                            if (i % 5 == 0 && bd == 0 && fl == 5)
                            {
                                building.temperature = 200;
                            }

                            // One-element JSON array; all values are sent as strings.
                            // The timestamp uses the round-trip ("o") format so that
                            // parsing does not depend on the machine's culture settings.
                            string messageBody =
                                $"[{{\"Building\":\"Building {bd}\",\"Floor\":\"{fl}\"," +
                                $"\"ElectricityUsage\":\"{building.electricityUsage}\"," +
                                $"\"waterUsage\":\"{building.waterUsage}\"," +
                                $"\"temperature\":\"{building.temperature}\"," +
                                $"\"Time\":\"{building.localDate:o}\"}}]";

                            Console.WriteLine($"Sending message: {messageBody}");

                            await eventHubClient.SendAsync(new EventData(Encoding.UTF8.GetBytes(messageBody)));
                            sent++;

                            await Task.Delay(5000); // wait 5 seconds between messages
                        }
                    }

                    await Task.Delay(5000); // pause 5 seconds between passes

                    // Alternative VM-metrics payload, left disabled:
                    // int memoryusage = new Random().Next(0, 100);
                    // var message = $"[{{\"vmname\":\"servername\",\"environment\":\"DA\",\"subscription\":\"Azure\",\"memoryusage\":\"{memoryusage}\",\"Time\":\"{DateTime.Now:o}\"}}]";
                }
                catch (Exception exception)
                {
                    Console.WriteLine($"{DateTime.Now} > Exception: {exception.Message}");
                }

                await Task.Delay(10);
            }

            Console.WriteLine($"{sent} messages sent.");
        }

        static void Main(string[] args)
        {
            // Block until the asynchronous work has finished.
            MainAsync(args).GetAwaiter().GetResult();
        }
    }
}
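
The message body in the program above is built by hand as a string, which is easy to get wrong once a value contains a quote or a backslash. As an alternative, here is a minimal sketch that lets a serializer produce the JSON. It assumes the System.Text.Json library is available (it is built into .NET Core 3.0 and later, and is a separate NuGet package on .NET Framework). The `BuildingReading` and `PayloadSketch` names are hypothetical and only mirror the fields used above. Unlike the hand-built string, numeric fields are written as JSON numbers rather than strings, so a downstream query may need adjusting.

```csharp
using System;
using System.Text;
using System.Text.Json;

// Hypothetical payload type mirroring the fields of the hand-built message.
// Property names keep the casing used in the article's JSON.
public sealed class BuildingReading
{
    public string Building { get; set; }
    public int Floor { get; set; }
    public float ElectricityUsage { get; set; }
    public float waterUsage { get; set; }
    public int temperature { get; set; }
    public DateTime Time { get; set; }
}

public static class PayloadSketch
{
    // Serializes one reading as a one-element JSON array and returns the
    // UTF-8 bytes, ready to be wrapped in an event.
    public static byte[] ToEventBody(BuildingReading reading)
    {
        string json = JsonSerializer.Serialize(new[] { reading });
        return Encoding.UTF8.GetBytes(json);
    }

    public static void Main()
    {
        var reading = new BuildingReading
        {
            Building = "Building 0",
            Floor = 5,
            ElectricityUsage = 450,
            waterUsage = 75,
            temperature = 200,
            Time = DateTime.Now
        };

        byte[] body = ToEventBody(reading);
        Console.WriteLine(Encoding.UTF8.GetString(body));
    }
}
```

The resulting byte array could be passed to `new EventData(...)` in place of `Encoding.UTF8.GetBytes(messageBody)`.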

This is the PrepDataBuilding class used by the main program above:


using System;

namespace AzureStreamAnalyticsUpload
{
    // One simulated sensor reading for a single floor of a building.
    class PrepDataBuilding
    {
        // Shared generator. Creating a new Random for every instance can give
        // identical values for objects constructed in quick succession on
        // .NET Framework, where the default seed comes from the system clock.
        private static readonly Random random = new Random();

        public int building;
        public int floor;
        public float electricityUsage;
        public float waterUsage;
        public int temperature;
        public DateTime localDate;

        public PrepDataBuilding(int buildingno, int floorno)
        {
            // Random.Next(min, max) excludes the upper bound.
            this.temperature = random.Next(65, 80);        // degrees Fahrenheit
            this.electricityUsage = random.Next(400, 600); // kWh
            this.waterUsage = random.Next(50, 100);        // gallons
            this.building = buildingno;
            this.floor = floorno;
            this.localDate = DateTime.Now;
        }
    }
}
