I'm getting a weird issue while trying to run (locally, and on Azure) my Azure Func. I got this :
[2024-11-27T08:47:28.789Z] Error receiving event: Error { context: Custom(Custom { kind: Other, error: Fe2o3 Error: Receiver attach error RemoteClosedWithError(Error { condition: AmqpError(UnauthorizedAccess), description: Some("Unauthorized access. 'Listen' claim(s) are required to perform this operation. Resource: 'sb://backendiothub.servicebus.windows.net/backendiothubns/consumergroups/backendiotcg/partitions/0'. TrackingId:f5b003e1e2c54481adbcafaebc948090_G30, SystemTracker:gateway5, Timestamp:2024-11-27T08:47:28"), info: None }) }) }
But I'm pretty sure, I've granted full access to my func :
1/Azure portal - Role assignment 2/Azure portal - Event Hub Namespace 3/Azure portal - Environment variables
UPDATE - Here is the code :
config.rs :
use std::{fmt, sync::Arc};
use anyhow::Result;
use azure_identity::DefaultAzureCredential;
use azure_messaging_eventhubs::consumer::ConsumerClient;
use figment::{
providers::Env,
Figment,
};
use serde::Deserialize;
use tracing::debug;
/// Structure contaning the config of the Azure EventHub.
#[derive(Deserialize, Clone, Default)]
pub struct EventHub {
host: String,
name: String,
#[serde(alias="consumergroup")]
consumer_group: String,
#[serde(skip)]
pub client: Option<Arc<ConsumerClient>>,
}
impl fmt::Debug for EventHub {
/// Custom Debug implementation as ConsumerClient do not implement it.
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("EventHub")
.field("host", &self.host)
.field("name", &self.name)
.field("consumer_group", &self.name)
.finish_non_exhaustive()
}
}
/// Structure containing the configuaration of the application.
/// derive:
/// - Deserialize: can deserialize data to this structure.
/// - Clone: can be explicitly cloned if needed.
/// - Default: can be initialized empty. (mostly for testing)
/// - Debug: can be printed in debug output.
/// all the sub structure must implement or derive those trait;
#[derive(Deserialize, Clone, Default, Debug)]
pub struct Config {
pub eventhub: EventHub,
}
impl Config {
/// Create a new Config from the environment.
///
/// It use the folowing environment variable:
/// - EVENTHUB_HOST (fully qualified namespace of the EventHub)
/// - EVENTHUB_NAME (name of the EventHub)
/// the funcion then generates the credential from the default azure
/// environment variables and use it to create a client that is added to
/// the config.
pub fn new() -> Result<Self> {
let mut config: Config = Figment::new()
.merge(
Env::raw()
.filter(|k| k.starts_with("EVENTHUB_"))
.split("_")
).extract()?;
let credential = DefaultAzureCredential::new()?;
let client = ConsumerClient::new(
config.eventhub.host.clone(),
config.eventhub.name.clone(),
Some(config.eventhub.consumer_group.clone()),
credential,
None,
);
config.eventhub.client = Some(Arc::new(client));
debug!("Config: {config:?}");
Ok(config)
}
}
main.rs :
use anyhow::{bail, Result};
use tracing::info;
use futures::{pin_mut, StreamExt};
use tracing_subscriber::{fmt, EnvFilter};
mod config;
use config::Config;
async fn receive_events(config: &Config) -> Result<()>{
let Some(ref client) = config.eventhub.client else {
bail!("EventHub not correctly initialized!")
};
client.open().await.unwrap();
info!("Connected to the event hub.");
let event_stream = client
.receive_events_on_partition(
"0".to_string(),
Some(
azure_messaging_eventhubs::consumer::ReceiveOptions{
start_position: Some(azure_messaging_eventhubs::consumer::StartPosition{
location: azure_messaging_eventhubs::consumer::StartLocation::Earliest,
..Default::default()
}),
..Default::default()
},
))
.await;
pin_mut!(event_stream);
while let Some(event_result) = event_stream.next().await {
match event_result {
Ok(event) => {
// Process the received event
println!("Received event: {:?}", event);
}
Err(err) => {
// Handle the error
eprintln!("Error receiving event: {:?}", err);
}
}
}
Ok(())
}
#[tokio::main]
async fn main() -> Result<()> {
fmt()
.with_target(false)
.with_level(true)
.with_env_filter(EnvFilter::from_default_env())
.init();
let config = Config::new()?;
receive_events(&config).await?;
println!("funcion terminated");
Ok(())
}
The lib I use : https://github.com/Azure/azure-sdk-for-rust/blob/main/sdk/eventhubs/azure_messaging_eventhubs/README.md
Any help is more than welcome. Thanks in advance
I tried your code, and the same error is occurring due to the azure_messaging_eventhubs
package. I recommend switching to azeventhubs
.
I referred to this documentation to create an Azure Function in Rust using Visual Studio Code .
Additionally, I used this documentation for azeventhubs
in Rust.
The Rust code below creates an HTTP server using the Warp framework, which integrates with Azure Event Hub to send messages based on HTTP GET requests.
use std::collections::HashMap;
use std::env;
use std::net::Ipv4Addr;
use warp::{http::Response, Filter};
use azeventhubs::producer::{EventHubProducerClient, EventHubProducerClientOptions, SendEventOptions};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut producer_client = EventHubProducerClient::new_from_connection_string(
"<CONNECTION_STRING>",
"<EVENT_HUB_NAME>".to_string(),
EventHubProducerClientOptions::default()
).await?;
let partition_ids = producer_client.get_partition_ids().await?;
let example1 = warp::get()
.and(warp::path("api"))
.and(warp::path("HttpExample"))
.and(warp::query::<HashMap<String, String>>())
.map(|p: HashMap<String, String>| {
if let Some(name) = p.get("name") {
// Prepare and send event to the first partition
let event = format!("Hello, {} from Warp Server!", name);
let options = SendEventOptions::new().with_partition_id(&partition_ids[0]);
tokio::spawn(async move {
if let Err(e) = producer_client.send_event(&event, options).await {
eprintln!("Error sending event: {}", e);
}
});
Response::builder().body(format!("Hello, {}. This HTTP triggered function executed successfully.", name))
} else {
Response::builder().body(String::from("This HTTP triggered function executed successfully. Pass a name in the query string for a personalized response."))
}
});
let port_key = "FUNCTIONS_CUSTOMHANDLER_PORT";
let port: u16 = match env::var(port_key) {
Ok(val) => val.parse().expect("Custom Handler port is not a number!"),
Err(_) => 3000,
};
warp::serve(example1).run((Ipv4Addr::LOCALHOST, port)).await;
producer_client.close().await?;
Ok(())
}