mirror of
https://github.com/PotentiaRobotics/control-system.git
synced 2025-04-03 20:00:19 -04:00
Work
This commit is contained in:
parent
99f42d50da
commit
7cd3555b4b
209
Control System/Propioception.py
Normal file
209
Control System/Propioception.py
Normal file
|
@ -0,0 +1,209 @@
|
|||
"""
|
||||
This class keeps track of all the motors, sensors and import variables needed for the robot.
|
||||
"""
|
||||
class Propioception:
|
||||
"""
|
||||
Standard kwargs to give for this function
|
||||
|
||||
data = {
|
||||
# Motors (all units are in degrees)
|
||||
'left_hip_pitch': 0,
|
||||
'left_hip_yaw': 0,
|
||||
'left_hip_roll': 0,
|
||||
'left_knee_pitch': 0,
|
||||
'left_ankle_pitch': 0,
|
||||
'left_ankle_yaw': 0,
|
||||
'left_ankle_roll': 0,
|
||||
'right_hip_pitch': 0,
|
||||
'right_hip_yaw': 0,
|
||||
'right_hip_roll': 0,
|
||||
'right_knee_pitch': 0,
|
||||
'right_ankle_pitch': 0,
|
||||
'right_ankle_yaw': 0,
|
||||
'right_ankle_roll': 0,
|
||||
'left_shoulder_pitch': 0,
|
||||
'left_shoulder_yaw': 0,
|
||||
'left_shoulder_roll': 0,
|
||||
'left_elbow_pitch': 0,
|
||||
'right_shoulder_pitch': 0,
|
||||
'right_shoulder_yaw': 0,
|
||||
'right_shoulder_roll': 0,
|
||||
'right_elbow_pitch': 0,
|
||||
'torso_pitch': 0,
|
||||
'torso_yaw': 0,
|
||||
'torso_roll': 0,
|
||||
'neck_pitch': 0,
|
||||
'neck_yaw': 0,
|
||||
'neck_roll': 0,
|
||||
|
||||
# Sensors
|
||||
# Add later
|
||||
}
|
||||
|
||||
This will also have the constants
|
||||
"""
|
||||
def __init__(self, **kwargs):
|
||||
self.data = {
|
||||
# Add defualt later
|
||||
}
|
||||
self.constants = {
|
||||
# Add some constants here later
|
||||
}
|
||||
for key, value in kwargs.items():
|
||||
self.data[str(key)] = value
|
||||
|
||||
def update(self, partsAndValues):
|
||||
"""
|
||||
This function updates the dictionary
|
||||
|
||||
arg artsAndValues is {string --> int or float}
|
||||
|
||||
Error codes:
|
||||
1 is success
|
||||
-1 is the error for NoneType variables
|
||||
-2 is for incorrect datatypes
|
||||
-3 is for when the key is not in the dictionary
|
||||
"""
|
||||
|
||||
if partsAndValues == None:
|
||||
return -1
|
||||
|
||||
if type(partsAndValues) != dict:
|
||||
return -2
|
||||
|
||||
for key in partsAndValues.keys():
|
||||
if key in self.data:
|
||||
self.data[str(key)] = partsAndValues[key]
|
||||
else:
|
||||
return -3
|
||||
return 1
|
||||
|
||||
def add(self, partsAndValues):
|
||||
"""
|
||||
This function will add a part to the system if needed
|
||||
|
||||
arg artsAndValues is {string --> int or float}
|
||||
|
||||
Error codes:
|
||||
1 is success
|
||||
-1 is the error for NoneType variables
|
||||
-2 is for incorrect datatypes
|
||||
-3 is for when the key is in the dictionary
|
||||
"""
|
||||
|
||||
if partsAndValues == None:
|
||||
return -1
|
||||
|
||||
if type(partsAndValues) != dict:
|
||||
return -2
|
||||
|
||||
for key in partsAndValues.keys():
|
||||
if key in self.data:
|
||||
return -3
|
||||
else:
|
||||
self.data[str(key)] = partsAndValues[key]
|
||||
return 1
|
||||
|
||||
def remove(self, partsAndValues):
|
||||
"""
|
||||
This function will remove a part to the system if needed
|
||||
|
||||
arg artsAndValues is [string]
|
||||
|
||||
Error codes:
|
||||
1 is success
|
||||
-1 is the error for NoneType variables
|
||||
-2 is for incorrect datatypes
|
||||
-3 is for when the key is not in the dictionary
|
||||
"""
|
||||
|
||||
if partsAndValues == None:
|
||||
return -1
|
||||
|
||||
|
||||
if type(partsAndValues) != list:
|
||||
return -2
|
||||
|
||||
for key in partsAndValues:
|
||||
if key in self.data:
|
||||
self.data.pop(str(key))
|
||||
else:
|
||||
return -3
|
||||
return 1
|
||||
|
||||
|
||||
# Test
|
||||
stuff = Propioception(
|
||||
left_hip_pitch= 0,
|
||||
left_hip_yaw= 0,
|
||||
left_hip_roll= 0,
|
||||
left_knee_pitch= 0,
|
||||
left_ankle_pitch= 0,
|
||||
left_ankle_yaw= 0,
|
||||
left_ankle_roll= 0,
|
||||
right_hip_pitch= 0,
|
||||
right_hip_yaw= 0,
|
||||
right_hip_roll= 0,
|
||||
right_knee_pitch= 0,
|
||||
right_ankle_pitch= 0,
|
||||
right_ankle_yaw= 0,
|
||||
right_ankle_roll= 0,
|
||||
left_shoulder_pitch= 0,
|
||||
left_shoulder_yaw= 0,
|
||||
left_shoulder_roll= 0,
|
||||
left_elbow_pitch= 0,
|
||||
right_shoulder_pitch= 0,
|
||||
right_shoulder_yaw= 0,
|
||||
right_shoulder_roll= 0,
|
||||
right_elbow_pitch= 0,
|
||||
torso_pitch= 0,
|
||||
torso_yaw= 0,
|
||||
torso_roll= 0,
|
||||
neck_pitch= 0,
|
||||
neck_yaw= 0,
|
||||
neck_roll= 0
|
||||
)
|
||||
|
||||
"""Update cases"""
|
||||
# Success case
|
||||
print(stuff.data['neck_roll'])
|
||||
stuff.update({'neck_roll': 21})
|
||||
print(stuff.data['neck_roll'])
|
||||
|
||||
# -1 case
|
||||
print(stuff.update(None))
|
||||
|
||||
# -2 case
|
||||
print(stuff.update([]))
|
||||
|
||||
# -3 case
|
||||
print(stuff.update({'test': 21}))
|
||||
|
||||
"""Remove cases"""
|
||||
# Success case
|
||||
print(stuff.data['neck_roll'])
|
||||
print(stuff.remove(['neck_roll']))
|
||||
|
||||
# -1 case
|
||||
print(stuff.remove(None))
|
||||
|
||||
# # -2 case
|
||||
print(stuff.remove({}))
|
||||
|
||||
# -3 case
|
||||
print(stuff.remove(['test']))
|
||||
|
||||
"""Add cases"""
|
||||
# Success case
|
||||
print('test' in stuff.data.keys())
|
||||
stuff.add({'test': 21})
|
||||
print(stuff.data['test'])
|
||||
|
||||
# -1 case
|
||||
print(stuff.add(None))
|
||||
|
||||
# -2 case
|
||||
print(stuff.add([]))
|
||||
|
||||
# -3 case
|
||||
print(stuff.add({'test': 21}))
|
69
Control System/bootup.py
Normal file
69
Control System/bootup.py
Normal file
|
@ -0,0 +1,69 @@
|
|||
# Create priority queue
|
||||
# In main create three threads
|
||||
# 1 for managing actions (stores priority queue)
|
||||
# 1 for reading sensor data
|
||||
# Updates sensor data through wifi and propiosense system
|
||||
# 1 for executing the first action in the priority queue
|
||||
import threading
|
||||
import socket
|
||||
import heapq
|
||||
import time
|
||||
|
||||
class Receiver:
|
||||
def __init__(self, host, port):
|
||||
self.actions = ["Password L"]
|
||||
self.timer = 0
|
||||
self.HOST = host
|
||||
self.PORT = port
|
||||
|
||||
def priorityQueue(self):
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
print('Socket created')
|
||||
|
||||
#managing error exception
|
||||
try:
|
||||
s.bind((self.HOST, self.PORT))
|
||||
except socket.error:
|
||||
print ('Bind failed ')
|
||||
|
||||
s.listen(5)
|
||||
print ('Socket awaiting messages')
|
||||
(conn, addr) = s.accept()
|
||||
print ('Connected')
|
||||
|
||||
# awaiting for message
|
||||
while True:
|
||||
instruction = conn.recv(1024)
|
||||
action = instruction.decode('UTF-8')
|
||||
print("Action received:", action)
|
||||
if "Password" in action:
|
||||
heapq.heappush(self.actions,action)
|
||||
print(self.actions)
|
||||
else:
|
||||
print("Incorrect Command")
|
||||
|
||||
# Sending reply
|
||||
conn.send(action.encode()+bytes(" works",'utf-8'))
|
||||
|
||||
def execute(self):
|
||||
while len(self.actions) > 0:
|
||||
if self.timer == 0:
|
||||
print("Executed: ", heapq.heappop(self.actions))
|
||||
time.sleep(5)
|
||||
heapq.heappush(self.actions, "Password Balancing")
|
||||
print("Inside execute",self.actions)
|
||||
|
||||
def sensorData(self):
|
||||
print("Test")
|
||||
|
||||
def runSimul(self):
|
||||
threading.Thread(target=self.priorityQueue).start()
|
||||
threading.Thread(target=self.execute()).start()
|
||||
threading.Thread(target=self.sensorData()).start()
|
||||
|
||||
def main():
|
||||
simulation = Receiver('10.235.1.127',12345)
|
||||
simulation.runSimul()
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
3
Control System/instructions.py
Normal file
3
Control System/instructions.py
Normal file
|
@ -0,0 +1,3 @@
|
|||
# Use methods online to dehash data packages/ instructions
|
||||
# Add instruction to priority queue
|
||||
|
26
client1.py
26
client1.py
|
@ -1,13 +1,13 @@
|
|||
import socket
|
||||
|
||||
HOST = '10.235.1.146' # Enter IP or Hostname of your server
|
||||
PORT = 12345 # Pick an open Port (1000+ recommended), must match the server port
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
s.connect((HOST,PORT))
|
||||
|
||||
#Lets loop awaiting for your input
|
||||
while True:
|
||||
command = input('Enter your command: ')
|
||||
s.send(bytes(command, 'utf-8'))
|
||||
reply = s.recv(1024)
|
||||
print(reply)
|
||||
import socket
|
||||
|
||||
HOST = '10.235.1.127' # Enter IP or Hostname of your server
|
||||
PORT = 12345 # Pick an open Port (1000+ recommended), must match the server port
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
s.connect((HOST,PORT))
|
||||
|
||||
#Lets loop awaiting for your input
|
||||
while True:
|
||||
command = input('Enter your command: ')
|
||||
s.send(bytes(command, 'utf-8'))
|
||||
reply = s.recv(1024)
|
||||
print(reply)
|
|
@ -1,6 +1,6 @@
|
|||
import socket
|
||||
|
||||
HOST = '10.235.1.146' # Server IP or Hostname
|
||||
HOST = '10.235.1.127' # Use this for client
|
||||
PORT = 12345 # Pick an open Port (1000+ recommended), must match the client sport
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
print('Socket created')
|
||||
|
|
Loading…
Reference in New Issue
Block a user